You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/24 12:02:06 UTC

ambari git commit: AMBARI-20828. Heartbeat and register with real server instead of mock server (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 8cc384cae -> 2a493704f


AMBARI-20828. Heartbeat and register with real server instead of mock server (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2a493704
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2a493704
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2a493704

Branch: refs/heads/branch-3.0-perf
Commit: 2a493704fdf16e8cc38f27c062c9c9bbf374734a
Parents: 8cc384c
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Apr 24 15:02:08 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Apr 24 15:02:08 2017 +0300

----------------------------------------------------------------------
 .../main/python/ambari_agent/AmbariConfig.py    |   2 +-
 .../main/python/ambari_agent/ClusterCache.py    |   8 +-
 .../src/main/python/ambari_agent/Constants.py   |   6 +-
 .../main/python/ambari_agent/HeartbeatThread.py |  30 ++--
 .../python/ambari_agent/InitializerModule.py    |  80 +++++++++
 .../src/main/python/ambari_agent/Utils.py       |  22 ++-
 .../main/python/ambari_agent/client_example.py  |   8 +-
 .../src/main/python/ambari_agent/security.py    |  33 +---
 .../ambari_agent/BaseStompServerTestCase.py     | 164 ++++++++++---------
 .../ambari_agent/TestAgentStompResponses.py     |  37 ++---
 .../python/ambari_stomp/adapter/websocket.py    |   2 +-
 11 files changed, 234 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index cf48189..fe48870 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -43,7 +43,7 @@ data_cleanup_interval=86400
 data_cleanup_max_age=2592000
 data_cleanup_max_size_MB = 100
 ping_port=8670
-cache_dir={ps}var{ps}lib{ps}ambari-agent{ps}cache
+cache_dir={ps}tmp
 parallel_execution=0
 system_resource_overrides={ps}etc{ps}resource_overrides
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index f2ac8ed..4bd94ae 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -46,10 +46,6 @@ class ClusterCache(dict):
     self._cache_lock = threading.RLock()
     self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, self.get_cache_name()+'.json')
 
-    # ensure that our cache directory exists
-    if not os.path.exists(cluster_cache_dir):
-      os.makedirs(cluster_cache_dir)
-
     # if the file exists, then load it
     cache_dict = {}
     if os.path.isfile(self.__current_cache_json_file):
@@ -84,6 +80,10 @@ class ClusterCache(dict):
 
 
     with self.__file_lock:
+      # ensure that our cache directory exists
+      if not os.path.exists(self.cluster_cache_dir):
+        os.makedirs(self.cluster_cache_dir)
+
       with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f:
         json.dump(self, f, indent=2)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 9ad5f2e..50fac9e 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -23,12 +23,12 @@ COMMANDS_TOPIC = '/events/commands'
 CONFIGURATIONS_TOPIC = '/events/configurations'
 METADATA_TOPIC = '/events/metadata'
 TOPOLOGIES_TOPIC = '/events/topologies'
-SERVER_RESPONSES_TOPIC = '/user'
+SERVER_RESPONSES_TOPIC = '/user/'
 
 TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
 
-HEARTBEAT_ENDPOINT = '/agent/heartbeat'
-REGISTRATION_ENDPOINT = '/agent/registration'
+HEARTBEAT_ENDPOINT = '/heartbeat'
+REGISTRATION_ENDPOINT = '/register'
 
 AGENT_TMP_DIR = "/var/lib/ambari-agent/tmp"
 CORRELATION_ID_STRING = 'correlationId'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index b2469d2..63674d5 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -27,9 +27,7 @@ import security
 from collections import defaultdict
 
 from ambari_agent import Constants
-from ambari_agent.ClusterConfigurationCache import  ClusterConfigurationCache
-from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
-from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
+from ambari_agent.InitializerModule import initializer_module
 from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
 from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
 from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
@@ -45,25 +43,17 @@ class HeartbeatThread(threading.Thread):
   """
   def __init__(self):
     threading.Thread.__init__(self)
-    self.stomp_connector = security.StompConnector()
     self.is_registered = False
     self.heartbeat_interval = HEARTBEAT_INTERVAL
     self._stop = threading.Event()
 
-    # TODO STOMP: change this once is integrated with ambari config
-    cluster_cache_dir = '/tmp'
-
-    # caches
-    self.metadata_cache = ClusterMetadataCache(cluster_cache_dir)
-    self.topology_cache = ClusterTopologyCache(cluster_cache_dir)
-    self.configurations_cache = ClusterConfigurationCache(cluster_cache_dir)
-    self.caches = [self.metadata_cache, self.topology_cache, self.configurations_cache]
+    self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache]
 
     # listeners
     self.server_responses_listener = ServerResponsesListener()
-    self.metadata_events_listener = MetadataEventListener(self.metadata_cache)
-    self.topology_events_listener = TopologyEventListener(self.topology_cache)
-    self.configuration_events_listener = ConfigurationEventListener(self.configurations_cache)
+    self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
+    self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
+    self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
     self.listeners = [self.server_responses_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
 
   def run(self):
@@ -97,7 +87,7 @@ class HeartbeatThread(threading.Thread):
     self.subscribe_and_listen()
 
     registration_request = self.get_registration_request()
-    logger.info("Registration request received")
+    logger.info("Sending registration request")
     logger.debug("Registration request is {0}".format(registration_request))
 
     response = self.blocking_request(registration_request, Constants.REGISTRATION_ENDPOINT)
@@ -132,14 +122,14 @@ class HeartbeatThread(threading.Thread):
     Subscribe to topics and set listener classes.
     """
     for listener in self.listeners:
-      self.stomp_connector.add_listener(listener)
+      initializer_module.connection.add_listener(listener)
 
     for topic_name in Constants.TOPICS_TO_SUBSCRIBE:
-      self.stomp_connector.subscribe(destination=topic_name, id='sub', ack='client-individual')
+      initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual')
 
   def blocking_request(self, body, destination):
     """
     Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id.
     """
-    self.stomp_connector.send(body=json.dumps(body), destination=destination)
-    return self.server_responses_listener.responses.blocking_pop(str(self.stomp_connector.correlation_id))
\ No newline at end of file
+    initializer_module.connection.send(body=json.dumps(body), destination=destination)
+    return self.server_responses_listener.responses.blocking_pop(str(initializer_module.connection.correlation_id))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
new file mode 100644
index 0000000..76d14e5
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import os
+from ambari_agent.FileCache import FileCache
+from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache
+from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
+from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
+from ambari_agent.Utils import lazy_property
+from ambari_agent.security import AmbariStompConnection
+
+logger = logging.getLogger()
+
+class InitializerModule:
+  """
+  - Instantiate some singleton classes or widely used instances along with providing their dependencies.
+  - Reduce cross modules dependencies.
+  - Make other components code cleaner.
+  - Provide an easier way to mock some dependencies.
+  """
+  def __init__(self):
+    self.initConfigs()
+    self.init()
+
+  def initConfigs(self):
+    """
+    Initialize every property got from ambari-agent.ini
+    """
+    self.ambariConfig = AmbariConfig.get_resolved_config()
+
+    self.server_hostname = self.ambariConfig.get('server', 'hostname')
+    self.secured_url_port = self.ambariConfig.get('server', 'secured_url_port')
+
+    self.cache_dir = self.ambariConfig.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
+    self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
+
+  def init(self):
+    """
+    Initialize properties
+    """
+    self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir)
+    self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir)
+    self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
+
+  @lazy_property
+  def connection(self):
+    """
+    Create a stomp connection
+    """
+    # TODO STOMP: handle if agent.ssl=false?
+    connection_url = 'wss://{0}:{1}/agent/stomp/v1'.format(self.server_hostname, self.secured_url_port)
+
+    logging.info("Connecting to {0}".format(connection_url))
+
+    conn = AmbariStompConnection(connection_url)
+    conn.start()
+    conn.connect(wait=True)
+
+    return conn
+
+initializer_module = InitializerModule()

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index 8078ad2..6e919c0 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 import threading
+from functools import wraps
 
 class BlockingDictionary():
   """
@@ -86,4 +87,23 @@ ImmutableDictionary.__setitem__ = raise_immutable_error
 ImmutableDictionary.__delitem__ = raise_immutable_error
 ImmutableDictionary.clear = raise_immutable_error
 ImmutableDictionary.pop = raise_immutable_error
-ImmutableDictionary.update = raise_immutable_error
\ No newline at end of file
+ImmutableDictionary.update = raise_immutable_error
+
+
+def lazy_property(undecorated):
+  """
+  Only run the function decorated once. Next time return cached value.
+  """
+  name = '_' + undecorated.__name__
+
+  @property
+  @wraps(undecorated)
+  def decorated(self):
+    try:
+      return getattr(self, name)
+    except AttributeError:
+      v = undecorated(self)
+      setattr(self, name, v)
+      return v
+
+  return decorated
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/client_example.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/client_example.py b/ambari-agent/src/main/python/ambari_agent/client_example.py
index 96e76be..4be0d47 100644
--- a/ambari-agent/src/main/python/ambari_agent/client_example.py
+++ b/ambari-agent/src/main/python/ambari_agent/client_example.py
@@ -29,7 +29,6 @@ def get_headers():
   global correlationId
   correlationId += 1
   headers = {
-    "content-type": "text/plain",
     "correlationId": correlationId
   }
   return headers
@@ -50,8 +49,9 @@ class MyStatsListener(ambari_stomp.StatsListener):
 
 read_messages = []
 
-conn = websocket.WsConnection('ws://gc6401:8080/api/stomp/v1')
-conn.transport.ws.extra_headers = [("Authorization", "Basic " + base64.b64encode('admin:admin'))]
+#conn = websocket.WsConnection('ws://gc6401:8080/api/stomp/v1')
+#conn.transport.ws.extra_headers = [("Authorization", "Basic " + base64.b64encode('admin:admin'))]
+conn = websocket.WsConnection('wss://gc6401:8441/agent/stomp/v1')
 conn.set_listener('my_listener', MyListener())
 conn.set_listener('stats_listener', MyStatsListener())
 conn.start()
@@ -61,7 +61,7 @@ conn.connect(wait=True, headers=get_headers())
 conn.subscribe(destination='/user/', id='sub-0', ack='client-individual')
 
 #conn.send(body="", destination='/test/time', headers=get_headers())
-conn.send(body="some message", destination='/test/echo', headers=get_headers())
+conn.send(body="{}", destination='/register', headers=get_headers())
 time.sleep(1)
 for message in read_messages:
   conn.ack(message['id'], message['subscription'])

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index b1599be..b3cb16e 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -30,6 +30,7 @@ import traceback
 import hostname
 import platform
 import ambari_stomp
+from ambari_stomp.adapter.websocket import WsConnection
 
 logger = logging.getLogger(__name__)
 
@@ -100,39 +101,17 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
 
     return sock
 
-# TODO STOMP: When server part is ready re-write this class by extending WsConnection.
-class StompConnector:
-  def __init__(self):
+class AmbariStompConnection(WsConnection):
+  def __init__(self, url):
     self.correlation_id = -1
-    self._connection = None
-
-  # TODO STOMP: re-init this on_disconnect
-  def _get_connection(self):
-    if not self._connection:
-      self._connection = self._create_new_connection()
-    return self._connection
-
-  def _create_new_connection(self):
-    # Connection for unit tests. TODO STOMP: fix this
-    hosts = [('127.0.0.1', 21613)]
-    connection = ambari_stomp.Connection(host_and_ports=hosts)
-    connection.start()
-    connection.connect(wait=True)
-
-    return connection
+    WsConnection.__init__(self, url)
 
   def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
     self.correlation_id += 1
-    self._get_connection().send(destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
-
-  def subscribe(self, *args, **kwargs):
-    self._get_connection().subscribe(*args, **kwargs)
-
-  def untrack_connection(self):
-    self.conn = None
+    WsConnection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
 
   def add_listener(self, listener):
-    self._get_connection().set_listener(listener.__class__.__name__, listener)
+    self.set_listener(listener.__class__.__name__, listener)
 
 class CachedHTTPSConnection:
   """ Caches a ssl socket and uses a single https connection to the server. """

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
index 194002f..671387a 100644
--- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import ambari_stomp
 import os
 import sys
 import time
@@ -42,85 +43,85 @@ from coilmq.scheduler import FavorReliableSubscriberScheduler, RandomQueueSchedu
 from coilmq.protocol import STOMP10
 
 class BaseStompServerTestCase(unittest.TestCase):
+  """
+  Base class for test cases provides the fixtures for setting up the multi-threaded
+  unit test infrastructure.
+  We use a combination of C{threading.Event} and C{Queue.Queue} objects to faciliate
+  inter-thread communication and lock-stepping the assertions.
+  """
+
+  def setUp(self):
+
+    self.clients = []
+    self.server = None  # This gets set in the server thread.
+    self.server_address = None  # This gets set in the server thread.
+    self.ready_event = threading.Event()
+
+    addr_bound = threading.Event()
+
+    def start_server():
+      self.server = TestStompServer(('127.0.0.1', 21613),
+                                    ready_event=self.ready_event,
+                                    authenticator=None,
+                                    queue_manager=self._queuemanager(),
+                                    topic_manager=self._topicmanager())
+      self.server_address = self.server.socket.getsockname()
+      addr_bound.set()
+      self.server.serve_forever()
+
+    self.server_thread = threading.Thread(
+        target=start_server, name='server')
+    self.server_thread.start()
+    self.ready_event.wait()
+    addr_bound.wait()
+
+  def _queuemanager(self):
     """
-    Base class for test cases provides the fixtures for setting up the multi-threaded
-    unit test infrastructure.
-    We use a combination of C{threading.Event} and C{Queue.Queue} objects to faciliate
-    inter-thread communication and lock-stepping the assertions. 
+    Returns the configured L{QueueManager} instance to use.
+    Can be overridden by subclasses that wish to change out any queue mgr parameters.
+    @rtype: L{QueueManager}
     """
+    return QueueManager(store=MemoryQueue(),
+                        subscriber_scheduler=FavorReliableSubscriberScheduler(),
+                        queue_scheduler=RandomQueueScheduler())
 
-    def setUp(self):
-
-        self.clients = []
-        self.server = None  # This gets set in the server thread.
-        self.server_address = None  # This gets set in the server thread.
-        self.ready_event = threading.Event()
-
-        addr_bound = threading.Event()
-
-        def start_server():
-            self.server = TestStompServer(('127.0.0.1', 21613),
-                                          ready_event=self.ready_event,
-                                          authenticator=None,
-                                          queue_manager=self._queuemanager(),
-                                          topic_manager=self._topicmanager())
-            self.server_address = self.server.socket.getsockname()
-            addr_bound.set()
-            self.server.serve_forever()
-
-        self.server_thread = threading.Thread(
-            target=start_server, name='server')
-        self.server_thread.start()
-        self.ready_event.wait()
-        addr_bound.wait()
-
-    def _queuemanager(self):
-        """
-        Returns the configured L{QueueManager} instance to use.
-        Can be overridden by subclasses that wish to change out any queue mgr parameters.
-        @rtype: L{QueueManager}
-        """
-        return QueueManager(store=MemoryQueue(),
-                            subscriber_scheduler=FavorReliableSubscriberScheduler(),
-                            queue_scheduler=RandomQueueScheduler())
-
-    def _topicmanager(self):
-        """
-        Returns the configured L{TopicManager} instance to use.
-        Can be overridden by subclasses that wish to change out any topic mgr parameters.
-        @rtype: L{TopicManager}
-        """
-        return TopicManager()
+  def _topicmanager(self):
+    """
+    Returns the configured L{TopicManager} instance to use.
+    Can be overridden by subclasses that wish to change out any topic mgr parameters.
+    @rtype: L{TopicManager}
+    """
+    return TopicManager()
 
-    def tearDown(self):
-        for c in self.clients:
-            c.close()
-        self.server.shutdown() # server_close takes too much time
-        self.server_thread.join()
-        self.ready_event.clear()
-        del self.server_thread
+  def tearDown(self):
+    for c in self.clients:
+      c.close()
+    self.server.shutdown() # server_close takes too much time
+    self.server_thread.join()
+    self.ready_event.clear()
+    del self.server_thread
 
-    def _new_client(self, connect=True):
-        """
-        Get a new L{TestStompClient} connected to our test server. 
-        The client will also be registered for close in the tearDown method.
-        @param connect: Whether to issue the CONNECT command.
-        @type connect: C{bool}
-        @rtype: L{TestStompClient}
-        """
-        client = TestStompClient(self.server_address)
-        self.clients.append(client)
-        if connect:
-            client.connect()
-            res = client.received_frames.get(timeout=1)
-            self.assertEqual(res.cmd, frames.CONNECTED)
-        return client
+  def _new_client(self, connect=True):
+    """
+    Get a new L{TestStompClient} connected to our test server.
+    The client will also be registered for close in the tearDown method.
+    @param connect: Whether to issue the CONNECT command.
+    @type connect: C{bool}
+    @rtype: L{TestStompClient}
+    """
+    client = TestStompClient(self.server_address)
+    self.clients.append(client)
+    if connect:
+      client.connect()
+      res = client.received_frames.get(timeout=1)
+      self.assertEqual(res.cmd, frames.CONNECTED)
+    return client
 
-    def get_json(self, filename):
-      filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "dummy_files", "stomp", filename)
+  def get_json(self, filename):
+    filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "dummy_files", "stomp", filename)
 
-      with open(filepath) as f:
-        return f.read()
+    with open(filepath) as f:
+      return f.read()
 
 
 class TestStompServer(ThreadedStompServer):
@@ -151,7 +152,7 @@ class TestStompServer(ThreadedStompServer):
 class TestStompClient(object):
     """
     A stomp client for use in testing.
-    This client spawns a listener thread and pushes anything that comes in onto the 
+    This client spawns a listener thread and pushes anything that comes in onto the
     read_frames queue.
     @ivar received_frames: A queue of Frame instances that have been received.
     @type received_frames: C{Queue.Queue} containing any received C{stompclient.frame.Frame}
@@ -227,4 +228,19 @@ class TestStompClient(object):
             raise RuntimeError("Not connected")
         self.connected = False
         self.read_stopped.wait(timeout=0.5)
-        self.sock.close()
\ No newline at end of file
+        self.sock.close()
+
+class TestCaseTcpConnection(ambari_stomp.Connection):
+  def __init__(self, url):
+    self.correlation_id = -1
+    ambari_stomp.Connection.__init__(self, host_and_ports=[('127.0.0.1', 21613)])
+
+  def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
+    self.correlation_id += 1
+    ambari_stomp.Connection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
+
+  def add_listener(self, listener):
+    self.set_listener(listener.__class__.__name__, listener)
+
+from ambari_agent import security
+security.AmbariStompConnection = TestCaseTcpConnection
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index d2a83ff..f5caa7b 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -27,6 +27,7 @@ from coilmq.util.frames import Frame
 from BaseStompServerTestCase import BaseStompServerTestCase
 
 from ambari_agent import HeartbeatThread
+from ambari_agent.InitializerModule import initializer_module
 
 from mock.mock import MagicMock, patch
 
@@ -34,6 +35,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
   def test_mock_server_can_start(self):
     self.init_stdout_logger()
 
+    #initializer_module.server_hostname = 'gc6401'
+    #initializer_module.init()
+
     self.remove(['/tmp/configurations.json', '/tmp/metadata.json', '/tmp/topology.json'])
 
     heartbeat_thread = HeartbeatThread.HeartbeatThread()
@@ -49,7 +53,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     registration_frame = self.server.frames_queue.get()
 
     # server sends registration response
-    f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
     self.server.topic_manager.send(f)
 
     f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json"))
@@ -67,26 +71,27 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     heartbeat_frame = self.server.frames_queue.get()
     heartbeat_thread._stop.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
 
-    self.assertEquals(heartbeat_thread.topology_cache['cl1']['topology']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
-    self.assertEquals(heartbeat_thread.metadata_cache['cl1']['metadata']['status_commands_to_run'], ('STATUS',))
-    self.assertEquals(heartbeat_thread.configurations_cache['cl1']['configurations']['zoo.cfg']['clientPort'], '2181')
+    self.assertEquals(initializer_module.topology_cache['cl1']['topology']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
+    self.assertEquals(initializer_module.metadata_cache['cl1']['metadata']['status_commands_to_run'], ('STATUS',))
+    self.assertEquals(initializer_module.configurations_cache['cl1']['configurations']['zoo.cfg']['clientPort'], '2181')
 
     """
     ============================================================================================
     ============================================================================================
     """
 
+    delattr(initializer_module,'_connection')
+    self.server.frames_queue.queue.clear()
+
     heartbeat_thread = HeartbeatThread.HeartbeatThread()
     heartbeat_thread.heartbeat_interval = 0
     heartbeat_thread.start()
 
-    self.server.frames_queue.queue.clear()
-
     connect_frame = self.server.frames_queue.get()
     users_subscribe_frame = self.server.frames_queue.get()
     commands_subscribe_frame = self.server.frames_queue.get()
@@ -101,31 +106,17 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.assertEquals(clusters_hashes['topology_hash'], 'd14ca943e4a69ad0dd640f32d713d2b9')
 
     # server sends registration response
-    f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
     self.server.topic_manager.send(f)
 
     heartbeat_frame = self.server.frames_queue.get()
     heartbeat_thread._stop.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
 
-
-  def _other(self):
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json"))
-    self.server.topic_manager.send(f)
-
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/commands'}, body=self.get_json("execution_commands.json"))
-    self.server.topic_manager.send(f)
-
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/metadata'}, body=self.get_json("metadata_update.json"))
-    self.server.topic_manager.send(f)
-
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_update.json"))
-    self.server.topic_manager.send(f)
-
   def remove(self, filepathes):
     for filepath in filepathes:
       if os.path.isfile(filepath):

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
index 8416a27..91eaaac 100644
--- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
@@ -95,7 +95,7 @@ class WsTransport(Transport):
     Transport.stop(self)
 
 class WsConnection(BaseConnection, Protocol12):
-  def __init__(self, url, wait_on_receipt=False):
+  def __init__(self, url):
     self.transport = WsTransport(url)
     self.transport.set_listener('ws-listener', self)
     self.transactions = {}