You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/12 09:14:46 UTC

[9/9] ambari git commit: AMBARI-20734. Handle caching of topologies, configs and metadata (aonishuk)

AMBARI-20734. Handle caching of topologies,configs and metadata  (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6b76fc90
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6b76fc90
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6b76fc90

Branch: refs/heads/branch-3.0-perf
Commit: 6b76fc9026725d0981e1fdc539301e58249c9612
Parents: 6934475
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed Apr 12 12:14:26 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed Apr 12 12:14:26 2017 +0300

----------------------------------------------------------------------
 .../main/python/ambari_agent/ClusterCache.py    |  16 ++-
 .../ambari_agent/ClusterConfigurationCache.py   |   4 +-
 .../python/ambari_agent/ClusterMetadataCache.py |   4 +-
 .../python/ambari_agent/ClusterTopologyCache.py |   4 +-
 .../src/main/python/ambari_agent/Constants.py   |  12 ++
 .../src/main/python/ambari_agent/Controller.py  |   6 +-
 .../main/python/ambari_agent/HeartbeatThread.py |  94 +++++++++++---
 .../listeners/ConfigurationEventListener.py     |  46 +++++++
 .../listeners/MetadataEventListener.py          |  46 +++++++
 .../listeners/ServerResponsesListener.py        |  31 +++--
 .../listeners/TopologyEventListener.py          |  27 +++-
 .../python/ambari_agent/listeners/__init__.py   |  45 ++++++-
 .../src/main/python/ambari_agent/security.py    |   6 +-
 .../ambari_agent/TestAgentStompResponses.py     |  76 +++++++++--
 .../stomp/metadata_after_registration.json      |  37 ++++++
 .../stomp/registration_response.json            | 125 +------------------
 16 files changed, 399 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index 1986733..f2ac8ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -44,7 +44,7 @@ class ClusterCache(dict):
 
     self.__file_lock = threading.RLock()
     self._cache_lock = threading.RLock()
-    self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, self.get_file_name())
+    self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, self.get_cache_name()+'.json')
 
     # ensure that our cache directory exists
     if not os.path.exists(cluster_cache_dir):
@@ -58,7 +58,14 @@ class ClusterCache(dict):
 
     super(ClusterCache, self).__init__(cache_dict)
 
-  def update_cache(self, cluster_name, cache):
+  def get_cluster_names(self):  # the top-level keys of this dict-based cache are cluster names
+    return self.keys()
+
+  def update_cache(self, cache):  # cache is expected to look like {'clusters': {cluster_name: cluster_cache}}
+    for cluster_name, cluster_cache in cache['clusters'].iteritems():
+      self.update_cluster_cache(cluster_name, cluster_cache)
+
+  def update_cluster_cache(self, cluster_name, cache):
     """
     Thread-safe method for writing out the specified cluster cache
     and updating the in-memory representation.
@@ -98,4 +105,7 @@ class ClusterCache(dict):
 
     logger.info("Cache value for {0} is {1}".format(self.__class__.__name__, result))
 
-    return result
\ No newline at end of file
+    return result
+
+  def get_cache_name(self):  # abstract: subclasses return the cache name (basename of the '<name>.json' cache file)
+    raise NotImplementedError()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
index 61fdf94..03e9645 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
@@ -38,8 +38,8 @@ class ClusterConfigurationCache(ClusterCache):
     """
     super(ClusterConfigurationCache, self).__init__(cluster_cache_dir)
 
-  def get_file_name(self):
-    return 'configurations.json'
+  def get_cache_name(self):
+    return 'configurations'
 
   def get_configuration_value(self, cluster_name, key):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py
index 4859b3f..2488ef0 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py
@@ -38,5 +38,5 @@ class ClusterMetadataCache(ClusterCache):
     """
     super(ClusterMetadataCache, self).__init__(cluster_cache_dir)
 
-  def get_file_name(self):
-    return 'metadata.json'
+  def get_cache_name(self):
+    return 'metadata'

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
index 19313c5..0cfe5a4 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -38,5 +38,5 @@ class ClusterTopologyCache(ClusterCache):
     """
     super(ClusterTopologyCache, self).__init__(cluster_cache_dir)
 
-  def get_file_name(self):
-    return 'topology.json'
+  def get_cache_name(self):
+    return 'topology'

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 0ff9eb9..9ad5f2e 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -18,5 +18,17 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+
+COMMANDS_TOPIC = '/events/commands'
+CONFIGURATIONS_TOPIC = '/events/configurations'
+METADATA_TOPIC = '/events/metadata'
+TOPOLOGIES_TOPIC = '/events/topologies'
+SERVER_RESPONSES_TOPIC = '/user'
+
+TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
+
+HEARTBEAT_ENDPOINT = '/agent/heartbeat'
+REGISTRATION_ENDPOINT = '/agent/registration'
+
 AGENT_TMP_DIR = "/var/lib/ambari-agent/tmp"
 CORRELATION_ID_STRING = 'correlationId'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 4d81947..4f6edcc 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -245,8 +245,8 @@ class Controller(threading.Thread):
           cluster_name = command['clusterName']
           configurations = command['configurations']
           topology = command['clusterHostInfo']
-          self.cluster_configuration_cache.update_cache(cluster_name, configurations)
-          self.cluster_topology_cache.update_cache(cluster_name, topology)
+          self.cluster_configuration_cache.update_cluster_cache(cluster_name, configurations)
+          self.cluster_topology_cache.update_cluster_cache(cluster_name, topology)
 
           # TODO: use this once server part is ready.
           self.cluster_topology_cache.get_md5_hashsum(cluster_name)
@@ -259,7 +259,7 @@ class Controller(threading.Thread):
         if 'clusterName' in command and 'configurations' in command:
           cluster_name = command['clusterName']
           configurations = command['configurations']
-          self.cluster_configuration_cache.update_cache(cluster_name, configurations)
+          self.cluster_configuration_cache.update_cluster_cache(cluster_name, configurations)
 
           # TODO: use this once server part is ready.
           self.cluster_configuration_cache.get_md5_hashsum(cluster_name)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 2c2f9d8..b2469d2 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -24,12 +24,17 @@ import logging
 import ambari_stomp
 import threading
 import security
+from collections import defaultdict
 
+from ambari_agent import Constants
+from ambari_agent.ClusterConfigurationCache import  ClusterConfigurationCache
+from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
+from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
 from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
+from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
+from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
+from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
 
-HEARTBEAT_ENDPOINT = '/agent/heartbeat'
-REGISTRATION_ENDPOINT = '/agent/registration'
-SERVER_RESPONSES_ENDPOINT = '/user'
 HEARTBEAT_INTERVAL = 10
 
 logger = logging.getLogger(__name__)
@@ -45,7 +50,27 @@ class HeartbeatThread(threading.Thread):
     self.heartbeat_interval = HEARTBEAT_INTERVAL
     self._stop = threading.Event()
 
+    # TODO STOMP: change this once is integrated with ambari config
+    cluster_cache_dir = '/tmp'
+
+    # caches
+    self.metadata_cache = ClusterMetadataCache(cluster_cache_dir)
+    self.topology_cache = ClusterTopologyCache(cluster_cache_dir)
+    self.configurations_cache = ClusterConfigurationCache(cluster_cache_dir)
+    self.caches = [self.metadata_cache, self.topology_cache, self.configurations_cache]
+
+    # listeners
+    self.server_responses_listener = ServerResponsesListener()
+    self.metadata_events_listener = MetadataEventListener(self.metadata_cache)
+    self.topology_events_listener = TopologyEventListener(self.topology_cache)
+    self.configuration_events_listener = ConfigurationEventListener(self.configurations_cache)
+    self.listeners = [self.server_responses_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+
   def run(self):
+    """
+    Run an endless loop of heartbeat with registration upon init or exception in heartbeating.
+    """
+    # TODO STOMP: stop the thread on SIGTERM
     while not self._stop.is_set():
       try:
         if not self.is_registered:
@@ -53,7 +78,7 @@ class HeartbeatThread(threading.Thread):
 
         heartbeat_body = self.get_heartbeat_body()
         logger.debug("Heartbeat body is {0}".format(heartbeat_body))
-        response = self.blocking_request(heartbeat_body, HEARTBEAT_ENDPOINT)
+        response = self.blocking_request(heartbeat_body, Constants.HEARTBEAT_ENDPOINT)
         logger.debug("Heartbeat response is {0}".format(response))
 
         time.sleep(self.heartbeat_interval)
@@ -63,25 +88,58 @@ class HeartbeatThread(threading.Thread):
         # TODO STOMP: re-connect here
         self.is_registered = False
         pass
-
-  def blocking_request(self, body, destination):
-    self.stomp_connector.send(body=json.dumps(body), destination=destination)
-    return self.server_responses_listener.responses.blocking_pop(str(self.stomp_connector.correlation_id))
+    logger.info("HeartbeatThread has successfully finished")
 
   def register(self):
-    # TODO STOMP: prepare data to register
-    data = {'registration-test':'true'}
-    self.server_responses_listener = ServerResponsesListener()
-    self.stomp_connector._connection = self.stomp_connector._create_new_connection(self.server_responses_listener)
-    self.stomp_connector.add_listener(self.server_responses_listener)
-    self.stomp_connector.subscribe(destination=SERVER_RESPONSES_ENDPOINT, id=1, ack='client-individual')
+    """
+    Subscribe to topics, register with server, wait for server's response, then mark the agent as registered.
+    """
+    self.subscribe_and_listen()
+
+    registration_request = self.get_registration_request()
+    logger.info("Registration request received")
+    logger.debug("Registration request is {0}".format(registration_request))
 
-    logger.debug("Registration request is {0}".format(data))
-    response = self.blocking_request(data, REGISTRATION_ENDPOINT)
+    response = self.blocking_request(registration_request, Constants.REGISTRATION_ENDPOINT)
+
+    logger.info("Registration response received")
     logger.debug("Registration response is {0}".format(response))
 
-    # TODO STOMP: handle registration response
+    self.registration_response = response
-    self.registered = True
+    self.is_registered = True
 
+  def get_registration_request(self):
+    """
+    Build the registration request body: for every cluster, the md5 hashsum of each local cache.
+    """
+    request = {'clusters': defaultdict(dict)}
+
+    for cache in self.caches:
+      hash_key = cache.get_cache_name() + '_hash'
+      for cluster_name in cache.get_cluster_names():
+        request['clusters'][cluster_name][hash_key] = cache.get_md5_hashsum(cluster_name)
+
+    return request
+
   def get_heartbeat_body(self):
-    return {'heartbeat-request-test':'true'}
\ No newline at end of file
+    """
+    Heartbeat body to be sent to server
+    """
+    return {'heartbeat-request-test':'true'}
+
+  def subscribe_and_listen(self):
+    """
+    Set listener classes and subscribe to topics; each subscription gets its own id, as STOMP requires per connection.
+    """
+    for listener in self.listeners:
+      self.stomp_connector.add_listener(listener)
+
+    for subscription_id, topic_name in enumerate(Constants.TOPICS_TO_SUBSCRIBE, 1):
+      self.stomp_connector.subscribe(destination=topic_name, id=subscription_id, ack='client-individual')
+
+  def blocking_request(self, body, destination):
+    """
+    Send a request to server and wait for the response to it. The response is matched to the request by correlation_id.
+    """
+    self.stomp_connector.send(body=json.dumps(body), destination=destination)
+    return self.server_responses_listener.responses.blocking_pop(str(self.stomp_connector.correlation_id))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
new file mode 100644
index 0000000..722ec3c
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent.listeners import EventListener
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class ConfigurationEventListener(EventListener):
+  """
+  Listener of Constants.CONFIGURATIONS_TOPIC events from server.
+  """
+  def __init__(self, configuration_cache):
+    self.configuration_cache = configuration_cache
+
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event to Constants.CONFIGURATIONS_TOPIC topic is received; passes it to the configuration cache.
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    self.configuration_cache.update_cache(message)
+
+  def get_handled_path(self):
+    return Constants.CONFIGURATIONS_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
new file mode 100644
index 0000000..c738ea2
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent.listeners import EventListener
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class MetadataEventListener(EventListener):
+  """
+  Listener of Constants.METADATA_TOPIC events from server.
+  """
+  def __init__(self, configuration_cache):  # parameter name kept as-is for callers; it receives the metadata cache
+    self.metadata_cache = configuration_cache
+
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event to Constants.METADATA_TOPIC topic is received; passes it to the metadata cache.
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    self.metadata_cache.update_cache(message)
+
+  def get_handled_path(self):
+    return Constants.METADATA_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
index c7ad082..8502507 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
@@ -21,19 +21,32 @@ limitations under the License.
 import logging
 import ambari_stomp
 
+from ambari_agent.listeners import EventListener
 from ambari_agent import Utils
-from ambari_agent.Constants import CORRELATION_ID_STRING
+from ambari_agent import Constants
 
-logging = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
 
-class ServerResponsesListener(ambari_stomp.ConnectionListener):
+class ServerResponsesListener(EventListener):
+  """
+  Listener of Constants.SERVER_RESPONSES_TOPIC events from server.
+  """
   def __init__(self):
     self.responses = Utils.BlockingDictionary()
 
-  def on_message(self, headers, message):
-    logging.debug("Received headers={0} ; message={1}".format(headers, message))
-
-    if CORRELATION_ID_STRING in headers:
-      self.responses.put(headers[CORRELATION_ID_STRING], message)
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event to Constants.SERVER_RESPONSES_TOPIC topic is received from server.
+    This type of event is a general response to an agent request; its 'correlationId' header identifies the request
+    it responds to, and the response is stored under that id.
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    if Constants.CORRELATION_ID_STRING in headers:
+      self.responses.put(headers[Constants.CORRELATION_ID_STRING], message)
     else:
-      logging.warn("Received a message from server without a '{0}' header. Ignoring the message".format(CORRELATION_ID_STRING))
\ No newline at end of file
+      logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING))
+
+  def get_handled_path(self):
+    return Constants.SERVER_RESPONSES_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
index 44afc48..61e89bd 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
@@ -18,8 +18,29 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import logging
 import ambari_stomp
 
-class MyListener(ambari_stomp.ConnectionListener):
-  def on_message(self, headers, message):
-    print "Received {0}".format(message)
\ No newline at end of file
+from ambari_agent.listeners import EventListener
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class TopologyEventListener(EventListener):
+  """
+  Listener of Constants.TOPOLOGIES_TOPIC events from server; it updates the topology cache given to it.
+  """
+  def __init__(self, topology_cache):
+    self.topology_cache = topology_cache
+
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event to Constants.TOPOLOGIES_TOPIC topic is received; passes it to the topology cache.
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    self.topology_cache.update_cache(message)
+
+  def get_handled_path(self):
+    return Constants.TOPOLOGIES_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 8cfcc3f..2b7e9bc 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -15,4 +15,47 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-"""
\ No newline at end of file
+"""
+
+import json
+import ambari_stomp
+import logging
+
+logger = logging.getLogger(__name__)
+
+class EventListener(ambari_stomp.ConnectionListener):
+  """
+  Base abstract class for event listeners on specific topics. Subclasses provide on_event and get_handled_path.
+  """
+  def on_message(self, headers, message):
+    """
+    This method is triggered by stomp when a message from server is received.
+
+    Here we check whether the message is addressed to this specific event listener and, if so, decode it from json.
+    """
+    if 'destination' not in headers:
+      logger.warn("Received event from server which does not contain 'destination' header")
+      return
+
+    destination = headers['destination']
+
+    if destination.rstrip('/') == self.get_handled_path().rstrip('/'):
+      try:
+        message_json = json.loads(message)
+      except ValueError:
+        logger.exception("Received event from server does not have a valid json as a message. Message is:\n{0}".format(message))
+        return
+
+      logger.info("Received event from {0}".format(destination))
+      logger.debug("Received event from {0}: headers={1} ; message={2}".format(destination, headers, message))
+
+      self.on_event(headers, message_json)
+
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event for this specific listener is received; must be overridden by subclasses.
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    raise NotImplementedError()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index 32ad556..b1599be 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -112,11 +112,10 @@ class StompConnector:
       self._connection = self._create_new_connection()
     return self._connection
 
-  def _create_new_connection(self, listener):
+  def _create_new_connection(self):
     # Connection for unit tests. TODO STOMP: fix this
     hosts = [('127.0.0.1', 21613)]
     connection = ambari_stomp.Connection(host_and_ports=hosts)
-    connection.set_listener('my_listener', listener)
     connection.start()
     connection.connect(wait=True)
 
@@ -133,8 +132,7 @@ class StompConnector:
     self.conn = None
 
   def add_listener(self, listener):
-    pass
-    #self._get_connection().set_listener('my_listener', listener)
+    self._get_connection().set_listener(listener.__class__.__name__, listener)  # NOTE(review): named by class; presumably one listener per class - confirm
 
 class CachedHTTPSConnection:
   """ Caches a ssl socket and uses a single https connection to the server. """

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index b0f3a16..d2a83ff 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -21,7 +21,6 @@ import os
 import sys
 import logging
 import json
-import time
 from coilmq.util import frames
 from coilmq.util.frames import Frame
 
@@ -31,34 +30,88 @@ from ambari_agent import HeartbeatThread
 
 from mock.mock import MagicMock, patch
 
-# TODO: where agent sends?
-
 class TestAgentStompResponses(BaseStompServerTestCase):
-  @patch.object(HeartbeatThread, "time")
-  def test_mock_server_can_start(self, time_mock):
+  def test_mock_server_can_start(self):
     self.init_stdout_logger()
 
+    self.remove(['/tmp/configurations.json', '/tmp/metadata.json', '/tmp/topology.json'])
+
     heartbeat_thread = HeartbeatThread.HeartbeatThread()
+    heartbeat_thread.heartbeat_interval = 0
     heartbeat_thread.start()
 
     connect_frame = self.server.frames_queue.get()
     users_subscribe_frame = self.server.frames_queue.get()
+    commands_subscribe_frame = self.server.frames_queue.get()
+    configurations_subscribe_frame = self.server.frames_queue.get()
+    metadata_subscribe_frame = self.server.frames_queue.get()
+    topologies_subscribe_frame = self.server.frames_queue.get()
     registration_frame = self.server.frames_queue.get()
 
     # server sends registration response
     f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
     self.server.topic_manager.send(f)
 
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/commands'}, body=self.get_json("execution_commands.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/metadata'}, body=self.get_json("metadata_after_registration.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_update.json"))
+    self.server.topic_manager.send(f)
+
     heartbeat_frame = self.server.frames_queue.get()
+    heartbeat_thread._stop.set()
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
+    self.server.topic_manager.send(f)
+
+    heartbeat_thread.join()
+
+    self.assertEquals(heartbeat_thread.topology_cache['cl1']['topology']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
+    self.assertEquals(heartbeat_thread.metadata_cache['cl1']['metadata']['status_commands_to_run'], ('STATUS',))
+    self.assertEquals(heartbeat_thread.configurations_cache['cl1']['configurations']['zoo.cfg']['clientPort'], '2181')
+
+    """
+    ============================================================================================
+    ============================================================================================
+    """
+
+    heartbeat_thread = HeartbeatThread.HeartbeatThread()
+    heartbeat_thread.heartbeat_interval = 0
+    heartbeat_thread.start()
+
+    self.server.frames_queue.queue.clear()
+
+    connect_frame = self.server.frames_queue.get()
+    users_subscribe_frame = self.server.frames_queue.get()
+    commands_subscribe_frame = self.server.frames_queue.get()
+    configurations_subscribe_frame = self.server.frames_queue.get()
+    metadata_subscribe_frame = self.server.frames_queue.get()
+    topologies_subscribe_frame = self.server.frames_queue.get()
+    registration_frame_json = json.loads(self.server.frames_queue.get().body)
+    clusters_hashes = registration_frame_json['clusters']['cl1']
+
+    self.assertEquals(clusters_hashes['metadata_hash'], '20089c8c8682cf03e361cdab3e668ed1')
+    self.assertEquals(clusters_hashes['configurations_hash'], 'bc54fe976cade95c48eafbfdff188661')
+    self.assertEquals(clusters_hashes['topology_hash'], 'd14ca943e4a69ad0dd640f32d713d2b9')
 
+    # server sends registration response
+    f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
+    self.server.topic_manager.send(f)
+
+    heartbeat_frame = self.server.frames_queue.get()
     heartbeat_thread._stop.set()
 
-    # server sends heartbeat response
     f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
-    print "Thread successfully finished"
+
 
   def _other(self):
     f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json"))
@@ -73,14 +126,19 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_update.json"))
     self.server.topic_manager.send(f)
 
+  def remove(self, filepathes):  # Delete each path in the iterable `filepathes` that currently exists as a file.
+    for filepath in filepathes:
+      if os.path.isfile(filepath):  # only paths for which isfile() is true are removed; all others are silently skipped
+        os.remove(filepath)
+
   def init_stdout_logger(self):
     format='%(levelname)s %(asctime)s - %(message)s'
 
     logger = logging.getLogger()
-    logger.setLevel(logging.DEBUG)
+    logger.setLevel(logging.INFO)
     formatter = logging.Formatter(format)
     chout = logging.StreamHandler(sys.stdout)
-    chout.setLevel(logging.DEBUG)
+    chout.setLevel(logging.INFO)
     chout.setFormatter(formatter)
     cherr = logging.StreamHandler(sys.stderr)
     cherr.setLevel(logging.ERROR)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
new file mode 100644
index 0000000..8ccd8e8
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
@@ -0,0 +1,37 @@
+{
+  "clusters":{
+    "cl1":{
+      "metadata":{
+        "status_commands_to_run":[
+          "STATUS"
+        ],
+        "status_commands_timeout":"900",
+        "hooks_folder":"HDP/2.0.6/hooks",
+        "credentialStoreEnabled":"false",
+        "availableServices":{
+          "SQOOP":"1.4.6.2.5",
+          "AMBARI_METRICS":"0.1.0",
+          "KERBEROS":"1.10.3-10",
+          "RANGER":"0.6.0.2.5",
+          "ZEPPELIN":"0.6.0.2.5",
+          "HDFS":"2.7.3.2.6",
+          "ZOOKEEPER":"3.4.6"
+        },
+        "agentConfigParams":{
+          "agent":{
+            "parallel_execution":0,
+            "use_system_proxy_settings":true
+          }
+        },
+        "recoveryConfig":{
+          "type":"DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL",
+          "maxCount":10,
+          "windowInMinutes":60,
+          "retryGap":0,
+          "components":"ZOOKEEPER_CLIENT,ZOOKEEPER_SERVER",
+          "recoveryTimestamp":1458150424380
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6b76fc90/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
index 779f139..f744410 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
@@ -4,128 +4,5 @@
   "errorMessage":"",
   "clusters_list":[
     "c1"
-  ],
-  "clusters":{
-    "cl1":{
-      "metadata":{
-        "status_commands_to_run":[
-          "STATUS",
-          "SECURITY_STATUS"
-        ],
-        "status_commands_timeout":"900",
-        "hooks_folder":"HDP/2.0.6/hooks",
-        "credentialStoreEnabled":"false",
-        "availableServices":{
-          "SQOOP":"1.4.6.2.5",
-          "AMBARI_METRICS":"0.1.0",
-          "KERBEROS":"1.10.3-10",
-          "RANGER":"0.6.0.2.5",
-          "ZEPPELIN":"0.6.0.2.5",
-          "HDFS":"2.7.3.2.6",
-          "ZOOKEEPER":"3.4.6"
-        },
-        "agentConfigParams":{
-          "agent":{
-            "parallel_execution":0,
-            "use_system_proxy_settings":true
-          }
-        },
-        "recoveryConfig":{
-          "type":"DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL",
-          "maxCount":10,
-          "windowInMinutes":60,
-          "retryGap":0,
-          "components":"ZOOKEEPER_CLIENT,ZOOKEEPER_SERVER",
-          "recoveryTimestamp":1458150424380
-        }
-      },
-      "topology":{
-        "components":{
-          "ZOOKEEPER_SERVER":{
-            "hosts":[
-              0,
-              1,
-              4
-            ],
-            "statusCommandsParams":{
-              "script":"scripts/zookeeper_server.py",
-              "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package"
-            }
-          },
-          "ZOOKEEPER_CLIENT":{
-            "hosts":[
-              0,
-              1,
-              2,
-              3,
-              4
-            ],
-            "statusCommandsParams":{
-              "script":"scripts/zookeeper_client.py",
-              "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package"
-            }
-          },
-          "hosts":[
-            {
-              "hostname":"c6401.ambari.apache.org",
-              "rack_id":0,
-              "ipv4_ip":"10.240.0.240"
-            },
-            {
-              "hostname":"c6402.ambari.apache.org",
-              "rack_id":1,
-              "ipv4_ip":"10.240.0.241"
-            },
-            {
-              "hostname":"c6403.ambari.apache.org",
-              "rack_id":0,
-              "ipv4_ip":"10.240.0.242"
-            },
-            {
-              "hostname":"c6404.ambari.apache.org",
-              "rack_id":0,
-              "ipv4_ip":"10.240.0.243"
-            },
-            {
-              "hostname":"c6405.ambari.apache.org",
-              "rack_id":1,
-              "ipv4_ip":"10.240.0.244"
-            }
-          ],
-          "racks":[
-            {
-              "name":"/default-rack"
-            },
-            {
-              "name":"/another-rack"
-            }
-          ]
-        }
-      },
-      "configurations":{
-        "zookeeper-env":{
-          "zk_user":"zookeeper",
-          "zk_log_dir":"/var/log/zookeeper",
-          "content":"\nexport JAVA_HOME={{java64_home}}\nexport ZOO_LOG_DIR={{zk_log_dir}}\nexport ZOOPIDFILE={{zk_pid_file}}\nexport SERVER_JVMFLAGS={{zk_server_heapsize}}\nexport JAVA=$JAVA_HOME/bin/java\nexport CLASSPATH=$CLASSPATH:/usr/share/zookeeper/*\n\n{% if security_enabled %}\nexport SERVER_JVMFLAGS=\"$SERVER_JVMFLAGS -Djava.security.auth.login.config={{zk_server_jaas_file}}\"\nexport CLIENT_JVMFLAGS=\"$CLIENT_JVMFLAGS -Djava.security.auth.login.config={{zk_client_jaas_file}}\"\n{% endif %}",
-          "zk_pid_dir":"/var/run/zookeeper",
-          "zookeeper_principal_name":"zookeeper/_HOST@EXAMPLE.COM",
-          "zookeeper_keytab_path":"/etc/security/keytabs/zk.service.keytab"
-        },
-        "zoo.cfg":{
-          "clientPort":"2181",
-          "syncLimit":"5",
-          "initLimit":"10",
-          "dataDir":"/hadoop/zookeeper",
-          "tickTime":"2000"
-        }
-      },
-      "configuration_attributes":{
-        "core-site":{
-          "final":{
-            "fs.defaultFS":"true"
-          }
-        }
-      }
-    }
-  }
+  ]
 }
\ No newline at end of file