You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/24 12:02:06 UTC
ambari git commit: AMBARI-20828. Heartbeat and register with real server instead of mock server (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf 8cc384cae -> 2a493704f
AMBARI-20828. Heartbeat and register with real server instead of mock server (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2a493704
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2a493704
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2a493704
Branch: refs/heads/branch-3.0-perf
Commit: 2a493704fdf16e8cc38f27c062c9c9bbf374734a
Parents: 8cc384c
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Apr 24 15:02:08 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Apr 24 15:02:08 2017 +0300
----------------------------------------------------------------------
.../main/python/ambari_agent/AmbariConfig.py | 2 +-
.../main/python/ambari_agent/ClusterCache.py | 8 +-
.../src/main/python/ambari_agent/Constants.py | 6 +-
.../main/python/ambari_agent/HeartbeatThread.py | 30 ++--
.../python/ambari_agent/InitializerModule.py | 80 +++++++++
.../src/main/python/ambari_agent/Utils.py | 22 ++-
.../main/python/ambari_agent/client_example.py | 8 +-
.../src/main/python/ambari_agent/security.py | 33 +---
.../ambari_agent/BaseStompServerTestCase.py | 164 ++++++++++---------
.../ambari_agent/TestAgentStompResponses.py | 37 ++---
.../python/ambari_stomp/adapter/websocket.py | 2 +-
11 files changed, 234 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index cf48189..fe48870 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -43,7 +43,7 @@ data_cleanup_interval=86400
data_cleanup_max_age=2592000
data_cleanup_max_size_MB = 100
ping_port=8670
-cache_dir={ps}var{ps}lib{ps}ambari-agent{ps}cache
+cache_dir={ps}tmp
parallel_execution=0
system_resource_overrides={ps}etc{ps}resource_overrides
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index f2ac8ed..4bd94ae 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -46,10 +46,6 @@ class ClusterCache(dict):
self._cache_lock = threading.RLock()
self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, self.get_cache_name()+'.json')
- # ensure that our cache directory exists
- if not os.path.exists(cluster_cache_dir):
- os.makedirs(cluster_cache_dir)
-
# if the file exists, then load it
cache_dict = {}
if os.path.isfile(self.__current_cache_json_file):
@@ -84,6 +80,10 @@ class ClusterCache(dict):
with self.__file_lock:
+ # ensure that our cache directory exists
+ if not os.path.exists(self.cluster_cache_dir):
+ os.makedirs(self.cluster_cache_dir)
+
with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f:
json.dump(self, f, indent=2)
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 9ad5f2e..50fac9e 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -23,12 +23,12 @@ COMMANDS_TOPIC = '/events/commands'
CONFIGURATIONS_TOPIC = '/events/configurations'
METADATA_TOPIC = '/events/metadata'
TOPOLOGIES_TOPIC = '/events/topologies'
-SERVER_RESPONSES_TOPIC = '/user'
+SERVER_RESPONSES_TOPIC = '/user/'
TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
-HEARTBEAT_ENDPOINT = '/agent/heartbeat'
-REGISTRATION_ENDPOINT = '/agent/registration'
+HEARTBEAT_ENDPOINT = '/heartbeat'
+REGISTRATION_ENDPOINT = '/register'
AGENT_TMP_DIR = "/var/lib/ambari-agent/tmp"
CORRELATION_ID_STRING = 'correlationId'
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index b2469d2..63674d5 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -27,9 +27,7 @@ import security
from collections import defaultdict
from ambari_agent import Constants
-from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache
-from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
-from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
+from ambari_agent.InitializerModule import initializer_module
from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
@@ -45,25 +43,17 @@ class HeartbeatThread(threading.Thread):
"""
def __init__(self):
threading.Thread.__init__(self)
- self.stomp_connector = security.StompConnector()
self.is_registered = False
self.heartbeat_interval = HEARTBEAT_INTERVAL
self._stop = threading.Event()
- # TODO STOMP: change this once is integrated with ambari config
- cluster_cache_dir = '/tmp'
-
- # caches
- self.metadata_cache = ClusterMetadataCache(cluster_cache_dir)
- self.topology_cache = ClusterTopologyCache(cluster_cache_dir)
- self.configurations_cache = ClusterConfigurationCache(cluster_cache_dir)
- self.caches = [self.metadata_cache, self.topology_cache, self.configurations_cache]
+ self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache]
# listeners
self.server_responses_listener = ServerResponsesListener()
- self.metadata_events_listener = MetadataEventListener(self.metadata_cache)
- self.topology_events_listener = TopologyEventListener(self.topology_cache)
- self.configuration_events_listener = ConfigurationEventListener(self.configurations_cache)
+ self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
+ self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
+ self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
self.listeners = [self.server_responses_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
def run(self):
@@ -97,7 +87,7 @@ class HeartbeatThread(threading.Thread):
self.subscribe_and_listen()
registration_request = self.get_registration_request()
- logger.info("Registration request received")
+ logger.info("Sending registration request")
logger.debug("Registration request is {0}".format(registration_request))
response = self.blocking_request(registration_request, Constants.REGISTRATION_ENDPOINT)
@@ -132,14 +122,14 @@ class HeartbeatThread(threading.Thread):
Subscribe to topics and set listener classes.
"""
for listener in self.listeners:
- self.stomp_connector.add_listener(listener)
+ initializer_module.connection.add_listener(listener)
for topic_name in Constants.TOPICS_TO_SUBSCRIBE:
- self.stomp_connector.subscribe(destination=topic_name, id='sub', ack='client-individual')
+ initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual')
def blocking_request(self, body, destination):
"""
Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id.
"""
- self.stomp_connector.send(body=json.dumps(body), destination=destination)
- return self.server_responses_listener.responses.blocking_pop(str(self.stomp_connector.correlation_id))
\ No newline at end of file
+ initializer_module.connection.send(body=json.dumps(body), destination=destination)
+ return self.server_responses_listener.responses.blocking_pop(str(initializer_module.connection.correlation_id))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
new file mode 100644
index 0000000..76d14e5
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import os
+from ambari_agent.FileCache import FileCache
+from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache
+from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
+from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
+from ambari_agent.Utils import lazy_property
+from ambari_agent.security import AmbariStompConnection
+
+logger = logging.getLogger()
+
+class InitializerModule:
+ """
+ - Instantiate some singleton classes or widely used instances along with providing their dependencies.
+ - Reduce cross modules dependencies.
+ - Make other components code cleaner.
+ - Provide an easier way to mock some dependencies.
+ """
+ def __init__(self):
+ self.initConfigs()
+ self.init()
+
+ def initConfigs(self):
+ """
+ Initialize every property got from ambari-agent.ini
+ """
+ self.ambariConfig = AmbariConfig.get_resolved_config()
+
+ self.server_hostname = self.ambariConfig.get('server', 'hostname')
+ self.secured_url_port = self.ambariConfig.get('server', 'secured_url_port')
+
+ self.cache_dir = self.ambariConfig.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
+ self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
+
+ def init(self):
+ """
+ Initialize properties
+ """
+ self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir)
+ self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir)
+ self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
+
+ @lazy_property
+ def connection(self):
+ """
+ Create a stomp connection
+ """
+ # TODO STOMP: handle if agent.ssl=false?
+ connection_url = 'wss://{0}:{1}/agent/stomp/v1'.format(self.server_hostname, self.secured_url_port)
+
+ logging.info("Connecting to {0}".format(connection_url))
+
+ conn = AmbariStompConnection(connection_url)
+ conn.start()
+ conn.connect(wait=True)
+
+ return conn
+
+initializer_module = InitializerModule()
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index 8078ad2..6e919c0 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import threading
+from functools import wraps
class BlockingDictionary():
"""
@@ -86,4 +87,23 @@ ImmutableDictionary.__setitem__ = raise_immutable_error
ImmutableDictionary.__delitem__ = raise_immutable_error
ImmutableDictionary.clear = raise_immutable_error
ImmutableDictionary.pop = raise_immutable_error
-ImmutableDictionary.update = raise_immutable_error
\ No newline at end of file
+ImmutableDictionary.update = raise_immutable_error
+
+
+def lazy_property(undecorated):
+ """
+ Only run the function decorated once. Next time return cached value.
+ """
+ name = '_' + undecorated.__name__
+
+ @property
+ @wraps(undecorated)
+ def decorated(self):
+ try:
+ return getattr(self, name)
+ except AttributeError:
+ v = undecorated(self)
+ setattr(self, name, v)
+ return v
+
+ return decorated
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/client_example.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/client_example.py b/ambari-agent/src/main/python/ambari_agent/client_example.py
index 96e76be..4be0d47 100644
--- a/ambari-agent/src/main/python/ambari_agent/client_example.py
+++ b/ambari-agent/src/main/python/ambari_agent/client_example.py
@@ -29,7 +29,6 @@ def get_headers():
global correlationId
correlationId += 1
headers = {
- "content-type": "text/plain",
"correlationId": correlationId
}
return headers
@@ -50,8 +49,9 @@ class MyStatsListener(ambari_stomp.StatsListener):
read_messages = []
-conn = websocket.WsConnection('ws://gc6401:8080/api/stomp/v1')
-conn.transport.ws.extra_headers = [("Authorization", "Basic " + base64.b64encode('admin:admin'))]
+#conn = websocket.WsConnection('ws://gc6401:8080/api/stomp/v1')
+#conn.transport.ws.extra_headers = [("Authorization", "Basic " + base64.b64encode('admin:admin'))]
+conn = websocket.WsConnection('wss://gc6401:8441/agent/stomp/v1')
conn.set_listener('my_listener', MyListener())
conn.set_listener('stats_listener', MyStatsListener())
conn.start()
@@ -61,7 +61,7 @@ conn.connect(wait=True, headers=get_headers())
conn.subscribe(destination='/user/', id='sub-0', ack='client-individual')
#conn.send(body="", destination='/test/time', headers=get_headers())
-conn.send(body="some message", destination='/test/echo', headers=get_headers())
+conn.send(body="{}", destination='/register', headers=get_headers())
time.sleep(1)
for message in read_messages:
conn.ack(message['id'], message['subscription'])
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index b1599be..b3cb16e 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -30,6 +30,7 @@ import traceback
import hostname
import platform
import ambari_stomp
+from ambari_stomp.adapter.websocket import WsConnection
logger = logging.getLogger(__name__)
@@ -100,39 +101,17 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
return sock
-# TODO STOMP: When server part is ready re-write this class by extending WsConnection.
-class StompConnector:
- def __init__(self):
+class AmbariStompConnection(WsConnection):
+ def __init__(self, url):
self.correlation_id = -1
- self._connection = None
-
- # TODO STOMP: re-init this on_disconnect
- def _get_connection(self):
- if not self._connection:
- self._connection = self._create_new_connection()
- return self._connection
-
- def _create_new_connection(self):
- # Connection for unit tests. TODO STOMP: fix this
- hosts = [('127.0.0.1', 21613)]
- connection = ambari_stomp.Connection(host_and_ports=hosts)
- connection.start()
- connection.connect(wait=True)
-
- return connection
+ WsConnection.__init__(self, url)
def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
self.correlation_id += 1
- self._get_connection().send(destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
-
- def subscribe(self, *args, **kwargs):
- self._get_connection().subscribe(*args, **kwargs)
-
- def untrack_connection(self):
- self.conn = None
+ WsConnection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
def add_listener(self, listener):
- self._get_connection().set_listener(listener.__class__.__name__, listener)
+ self.set_listener(listener.__class__.__name__, listener)
class CachedHTTPSConnection:
""" Caches a ssl socket and uses a single https connection to the server. """
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
index 194002f..671387a 100644
--- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
+import ambari_stomp
import os
import sys
import time
@@ -42,85 +43,85 @@ from coilmq.scheduler import FavorReliableSubscriberScheduler, RandomQueueSchedu
from coilmq.protocol import STOMP10
class BaseStompServerTestCase(unittest.TestCase):
+ """
+ Base class for test cases provides the fixtures for setting up the multi-threaded
+ unit test infrastructure.
+ We use a combination of C{threading.Event} and C{Queue.Queue} objects to faciliate
+ inter-thread communication and lock-stepping the assertions.
+ """
+
+ def setUp(self):
+
+ self.clients = []
+ self.server = None # This gets set in the server thread.
+ self.server_address = None # This gets set in the server thread.
+ self.ready_event = threading.Event()
+
+ addr_bound = threading.Event()
+
+ def start_server():
+ self.server = TestStompServer(('127.0.0.1', 21613),
+ ready_event=self.ready_event,
+ authenticator=None,
+ queue_manager=self._queuemanager(),
+ topic_manager=self._topicmanager())
+ self.server_address = self.server.socket.getsockname()
+ addr_bound.set()
+ self.server.serve_forever()
+
+ self.server_thread = threading.Thread(
+ target=start_server, name='server')
+ self.server_thread.start()
+ self.ready_event.wait()
+ addr_bound.wait()
+
+ def _queuemanager(self):
"""
- Base class for test cases provides the fixtures for setting up the multi-threaded
- unit test infrastructure.
- We use a combination of C{threading.Event} and C{Queue.Queue} objects to faciliate
- inter-thread communication and lock-stepping the assertions.
+ Returns the configured L{QueueManager} instance to use.
+ Can be overridden by subclasses that wish to change out any queue mgr parameters.
+ @rtype: L{QueueManager}
"""
+ return QueueManager(store=MemoryQueue(),
+ subscriber_scheduler=FavorReliableSubscriberScheduler(),
+ queue_scheduler=RandomQueueScheduler())
- def setUp(self):
-
- self.clients = []
- self.server = None # This gets set in the server thread.
- self.server_address = None # This gets set in the server thread.
- self.ready_event = threading.Event()
-
- addr_bound = threading.Event()
-
- def start_server():
- self.server = TestStompServer(('127.0.0.1', 21613),
- ready_event=self.ready_event,
- authenticator=None,
- queue_manager=self._queuemanager(),
- topic_manager=self._topicmanager())
- self.server_address = self.server.socket.getsockname()
- addr_bound.set()
- self.server.serve_forever()
-
- self.server_thread = threading.Thread(
- target=start_server, name='server')
- self.server_thread.start()
- self.ready_event.wait()
- addr_bound.wait()
-
- def _queuemanager(self):
- """
- Returns the configured L{QueueManager} instance to use.
- Can be overridden by subclasses that wish to change out any queue mgr parameters.
- @rtype: L{QueueManager}
- """
- return QueueManager(store=MemoryQueue(),
- subscriber_scheduler=FavorReliableSubscriberScheduler(),
- queue_scheduler=RandomQueueScheduler())
-
- def _topicmanager(self):
- """
- Returns the configured L{TopicManager} instance to use.
- Can be overridden by subclasses that wish to change out any topic mgr parameters.
- @rtype: L{TopicManager}
- """
- return TopicManager()
+ def _topicmanager(self):
+ """
+ Returns the configured L{TopicManager} instance to use.
+ Can be overridden by subclasses that wish to change out any topic mgr parameters.
+ @rtype: L{TopicManager}
+ """
+ return TopicManager()
- def tearDown(self):
- for c in self.clients:
- c.close()
- self.server.shutdown() # server_close takes too much time
- self.server_thread.join()
- self.ready_event.clear()
- del self.server_thread
+ def tearDown(self):
+ for c in self.clients:
+ c.close()
+ self.server.shutdown() # server_close takes too much time
+ self.server_thread.join()
+ self.ready_event.clear()
+ del self.server_thread
- def _new_client(self, connect=True):
- """
- Get a new L{TestStompClient} connected to our test server.
- The client will also be registered for close in the tearDown method.
- @param connect: Whether to issue the CONNECT command.
- @type connect: C{bool}
- @rtype: L{TestStompClient}
- """
- client = TestStompClient(self.server_address)
- self.clients.append(client)
- if connect:
- client.connect()
- res = client.received_frames.get(timeout=1)
- self.assertEqual(res.cmd, frames.CONNECTED)
- return client
+ def _new_client(self, connect=True):
+ """
+ Get a new L{TestStompClient} connected to our test server.
+ The client will also be registered for close in the tearDown method.
+ @param connect: Whether to issue the CONNECT command.
+ @type connect: C{bool}
+ @rtype: L{TestStompClient}
+ """
+ client = TestStompClient(self.server_address)
+ self.clients.append(client)
+ if connect:
+ client.connect()
+ res = client.received_frames.get(timeout=1)
+ self.assertEqual(res.cmd, frames.CONNECTED)
+ return client
- def get_json(self, filename):
- filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "dummy_files", "stomp", filename)
+ def get_json(self, filename):
+ filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "dummy_files", "stomp", filename)
- with open(filepath) as f:
- return f.read()
+ with open(filepath) as f:
+ return f.read()
class TestStompServer(ThreadedStompServer):
@@ -151,7 +152,7 @@ class TestStompServer(ThreadedStompServer):
class TestStompClient(object):
"""
A stomp client for use in testing.
- This client spawns a listener thread and pushes anything that comes in onto the
+ This client spawns a listener thread and pushes anything that comes in onto the
read_frames queue.
@ivar received_frames: A queue of Frame instances that have been received.
@type received_frames: C{Queue.Queue} containing any received C{stompclient.frame.Frame}
@@ -227,4 +228,19 @@ class TestStompClient(object):
raise RuntimeError("Not connected")
self.connected = False
self.read_stopped.wait(timeout=0.5)
- self.sock.close()
\ No newline at end of file
+ self.sock.close()
+
+class TestCaseTcpConnection(ambari_stomp.Connection):
+ def __init__(self, url):
+ self.correlation_id = -1
+ ambari_stomp.Connection.__init__(self, host_and_ports=[('127.0.0.1', 21613)])
+
+ def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
+ self.correlation_id += 1
+ ambari_stomp.Connection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
+
+ def add_listener(self, listener):
+ self.set_listener(listener.__class__.__name__, listener)
+
+from ambari_agent import security
+security.AmbariStompConnection = TestCaseTcpConnection
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index d2a83ff..f5caa7b 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -27,6 +27,7 @@ from coilmq.util.frames import Frame
from BaseStompServerTestCase import BaseStompServerTestCase
from ambari_agent import HeartbeatThread
+from ambari_agent.InitializerModule import initializer_module
from mock.mock import MagicMock, patch
@@ -34,6 +35,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
def test_mock_server_can_start(self):
self.init_stdout_logger()
+ #initializer_module.server_hostname = 'gc6401'
+ #initializer_module.init()
+
self.remove(['/tmp/configurations.json', '/tmp/metadata.json', '/tmp/topology.json'])
heartbeat_thread = HeartbeatThread.HeartbeatThread()
@@ -49,7 +53,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
registration_frame = self.server.frames_queue.get()
# server sends registration response
- f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
self.server.topic_manager.send(f)
f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json"))
@@ -67,26 +71,27 @@ class TestAgentStompResponses(BaseStompServerTestCase):
heartbeat_frame = self.server.frames_queue.get()
heartbeat_thread._stop.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
- self.assertEquals(heartbeat_thread.topology_cache['cl1']['topology']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
- self.assertEquals(heartbeat_thread.metadata_cache['cl1']['metadata']['status_commands_to_run'], ('STATUS',))
- self.assertEquals(heartbeat_thread.configurations_cache['cl1']['configurations']['zoo.cfg']['clientPort'], '2181')
+ self.assertEquals(initializer_module.topology_cache['cl1']['topology']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
+ self.assertEquals(initializer_module.metadata_cache['cl1']['metadata']['status_commands_to_run'], ('STATUS',))
+ self.assertEquals(initializer_module.configurations_cache['cl1']['configurations']['zoo.cfg']['clientPort'], '2181')
"""
============================================================================================
============================================================================================
"""
+ delattr(initializer_module,'_connection')
+ self.server.frames_queue.queue.clear()
+
heartbeat_thread = HeartbeatThread.HeartbeatThread()
heartbeat_thread.heartbeat_interval = 0
heartbeat_thread.start()
- self.server.frames_queue.queue.clear()
-
connect_frame = self.server.frames_queue.get()
users_subscribe_frame = self.server.frames_queue.get()
commands_subscribe_frame = self.server.frames_queue.get()
@@ -101,31 +106,17 @@ class TestAgentStompResponses(BaseStompServerTestCase):
self.assertEquals(clusters_hashes['topology_hash'], 'd14ca943e4a69ad0dd640f32d713d2b9')
# server sends registration response
- f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
self.server.topic_manager.send(f)
heartbeat_frame = self.server.frames_queue.get()
heartbeat_thread._stop.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
-
- def _other(self):
- f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json"))
- self.server.topic_manager.send(f)
-
- f = Frame(frames.MESSAGE, headers={'destination': '/events/commands'}, body=self.get_json("execution_commands.json"))
- self.server.topic_manager.send(f)
-
- f = Frame(frames.MESSAGE, headers={'destination': '/events/metadata'}, body=self.get_json("metadata_update.json"))
- self.server.topic_manager.send(f)
-
- f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_update.json"))
- self.server.topic_manager.send(f)
-
def remove(self, filepathes):
for filepath in filepathes:
if os.path.isfile(filepath):
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
index 8416a27..91eaaac 100644
--- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
@@ -95,7 +95,7 @@ class WsTransport(Transport):
Transport.stop(self)
class WsConnection(BaseConnection, Protocol12):
- def __init__(self, url, wait_on_receipt=False):
+ def __init__(self, url):
self.transport = WsTransport(url)
self.transport.set_listener('ws-listener', self)
self.transactions = {}