You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/05/29 08:29:23 UTC
ambari git commit: AMBARI-21134. Get initial metadata, topology,
configs from another endpoint (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf b094c753e -> 6bad191bb
AMBARI-21134. Get initial metadata, topology, configs from another endpoint (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bad191b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bad191b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bad191b
Branch: refs/heads/branch-3.0-perf
Commit: 6bad191bb3cf009cbe7d1d7a92942d47770ab31d
Parents: b094c75
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon May 29 11:28:44 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon May 29 11:28:44 2017 +0300
----------------------------------------------------------------------
.../main/python/ambari_agent/ClusterCache.py | 3 +-
.../python/ambari_agent/CommandStatusDict.py | 3 +-
.../ambari_agent/CommandStatusReporter.py | 7 +-
.../ambari_agent/ComponentStatusExecutor.py | 7 +-
.../src/main/python/ambari_agent/Constants.py | 10 +-
.../main/python/ambari_agent/HeartbeatThread.py | 48 ++++----
.../python/ambari_agent/InitializerModule.py | 2 +
.../src/main/python/ambari_agent/Utils.py | 3 +
.../listeners/ServerResponsesListener.py | 23 +++-
.../python/ambari_agent/listeners/__init__.py | 13 ++-
.../src/main/python/ambari_agent/security.py | 8 +-
.../ambari_agent/BaseStompServerTestCase.py | 35 +++++-
.../ambari_agent/TestAgentStompResponses.py | 115 ++++++++-----------
13 files changed, 169 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index 70b44b4..4b88f71 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -97,11 +97,10 @@ class ClusterCache(dict):
with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f:
json.dump(self, f, indent=2)
- def get_md5_hashsum(self, cluster_id):
+ def get_md5_hashsum(self):
"""
Thread-safe method for writing out the specified cluster cache
and updating the in-memory representation.
- :param cluster_id:
:param cache:
:return:
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index bb0cea3..afaf77d 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -21,7 +21,6 @@ limitations under the License.
import logging
import threading
import copy
-import json
from Grep import Grep
from ambari_agent import Constants
@@ -57,7 +56,7 @@ class CommandStatusDict():
self.force_update_to_server([new_report])
def force_update_to_server(self, reports):
- self.initializer_module.connection.send(body=json.dumps(reports), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+ self.initializer_module.connection.send(message=reports, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
def get_command_status(self, taskId):
with self.lock:
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
index acee3b1..216f20b 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-import json
import logging
import threading
@@ -43,10 +42,10 @@ class CommandStatusReporter(threading.Thread):
while not self.stop_event.is_set():
try:
- # TODO STOMP: what if not registered?
+ # TODO STOMP: if not registered, reports should not be on agent until next registration
report = self.commandStatuses.generate_report()
- if report:
- self.initializer_module.connection.send(body=json.dumps(report), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+ if report and self.initializer_module.is_registered:
+ self.initializer_module.connection.send(message=report, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
self.stop_event.wait(self.command_reports_interval)
except:
logger.exception("Exception in CommandStatusReporter. Re-running it")
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 1f6a7dc..6783138 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-import json
import random
import logging
import threading
@@ -98,8 +97,10 @@ class ComponentStatusExecutor(threading.Thread):
def send_updates_to_server(self, cluster_reports):
# TODO STOMP: override send to send dicts and lists? and not use json.dump
- # TODO STOMP: skip this if server is down?
- self.initializer_module.connection.send(body=json.dumps(cluster_reports), destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
+ if not cluster_reports or not self.initializer_module.is_registered:
+ return
+
+ self.initializer_module.connection.send(message=cluster_reports, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
for cluster_id, reports in cluster_reports.iteritems():
for report in reports:
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 6a054cc..e15d1d8 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -21,12 +21,16 @@ limitations under the License.
COMMANDS_TOPIC = '/user/commands'
CONFIGURATIONS_TOPIC = '/user/configs'
-METADATA_TOPIC = '/user/metadata'
-TOPOLOGIES_TOPIC = '/user/topologies'
+METADATA_TOPIC = '/events/metadata'
+TOPOLOGIES_TOPIC = '/events/topology'
SERVER_RESPONSES_TOPIC = '/user/'
-TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
+PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC]
+POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
+TOPOLOGY_REQUEST_ENDPOINT = '/agents/topology'
+METADATA_REQUEST_ENDPOINT = '/agents/metadata'
+CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 70fe7e7..e88fee7 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -18,11 +18,9 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-import json
import logging
import ambari_stomp
import threading
-from collections import defaultdict
from ambari_agent import Constants
from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
@@ -41,7 +39,6 @@ class HeartbeatThread(threading.Thread):
"""
def __init__(self, initializer_module):
threading.Thread.__init__(self)
- self.is_registered = False
self.heartbeat_interval = HEARTBEAT_INTERVAL
self.stop_event = initializer_module.stop_event
@@ -55,6 +52,12 @@ class HeartbeatThread(threading.Thread):
self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+ self.post_registration_requests = [
+ (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener),
+ (Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener),
+ (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener)
+ ]
+
def run(self):
"""
@@ -63,7 +66,7 @@ class HeartbeatThread(threading.Thread):
# TODO STOMP: stop the thread on SIGTERM
while not self.stop_event.is_set():
try:
- if not self.is_registered:
+ if not self.initializer_module.is_registered:
self.register()
heartbeat_body = self.get_heartbeat_body()
@@ -75,16 +78,19 @@ class HeartbeatThread(threading.Thread):
# TODO STOMP: handle heartbeat reponse
except:
logger.exception("Exception in HeartbeatThread. Re-running the registration")
- # TODO STOMP: re-connect here
- self.is_registered = False
+ self.initializer_module.is_registered = False
+ self.initializer_module.connection.disconnect()
pass
+
+ self.initializer_module.connection.disconnect()
logger.info("HeartbeatThread has successfully finished")
def register(self):
"""
Subscribe to topics, register with server, wait for server's response.
"""
- self.subscribe_and_listen()
+ self.add_listeners()
+ self.subscribe_to_topics(Constants.PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE)
registration_request = self.get_registration_request()
logger.info("Sending registration request")
@@ -96,20 +102,19 @@ class HeartbeatThread(threading.Thread):
logger.debug("Registration response is {0}".format(response))
self.registration_response = response
- self.registered = True
+
+ for endpoint, cache, listener in self.post_registration_requests:
+ response = self.blocking_request({'hash': cache.get_md5_hashsum()}, endpoint)
+ listener.on_event({}, response)
+
+ self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
+ self.initializer_module.is_registered = True
def get_registration_request(self):
"""
Get registration request body to send it to server
"""
- request = {'clusters':defaultdict(lambda:{})}
-
- for cache in self.caches:
- cache_key_name = cache.get_cache_name() + '_hash'
- for cluster_id in cache.get_cluster_ids():
- request['clusters'][cluster_id][cache_key_name] = cache.get_md5_hashsum(cluster_id)
-
- return request
+ return {'registration-response':'true'}
def get_heartbeat_body(self):
"""
@@ -117,19 +122,20 @@ class HeartbeatThread(threading.Thread):
"""
return {'hostname':'true'}
- def subscribe_and_listen(self):
+ def add_listeners(self):
"""
Subscribe to topics and set listener classes.
"""
for listener in self.listeners:
self.initializer_module.connection.add_listener(listener)
- for topic_name in Constants.TOPICS_TO_SUBSCRIBE:
+ def subscribe_to_topics(self, topics_list):
+ for topic_name in topics_list:
self.initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual')
- def blocking_request(self, body, destination):
+ def blocking_request(self, message, destination):
"""
Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id.
"""
- self.initializer_module.connection.send(body=json.dumps(body), destination=destination)
- return self.server_responses_listener.responses.blocking_pop(str(self.initializer_module.connection.correlation_id))
\ No newline at end of file
+ correlation_id = self.initializer_module.connection.send(message=message, destination=destination)
+ return self.server_responses_listener.responses.blocking_pop(str(correlation_id))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index c36bd68..4d0ac9b 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -63,6 +63,8 @@ class InitializerModule:
"""
self.stop_event = threading.Event()
+ self.is_registered = False
+
self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir)
self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir)
self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index d6f0294..e48aa5f 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -43,6 +43,9 @@ class BlockingDictionary():
"""
Block until a key in dictionary is available and than pop it.
"""
+ if key in self.dict:
+ return self.dict.pop(key)
+
while True:
self.put_event.wait()
self.put_event.clear()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
index 8502507..6d23c37 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
@@ -33,6 +33,7 @@ class ServerResponsesListener(EventListener):
"""
def __init__(self):
self.responses = Utils.BlockingDictionary()
+ self.listener_functions = {}
def on_event(self, headers, message):
"""
@@ -44,9 +45,25 @@ class ServerResponsesListener(EventListener):
@param message: message payload dictionary
"""
if Constants.CORRELATION_ID_STRING in headers:
- self.responses.put(headers[Constants.CORRELATION_ID_STRING], message)
+ correlation_id = headers[Constants.CORRELATION_ID_STRING]
+ self.responses.put(correlation_id, message)
+
+ if correlation_id in self.listener_functions:
+ self.listener_functions[correlation_id](headers, message)
+ del self.listener_functions[correlation_id]
else:
- logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING))\
+ logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING))
def get_handled_path(self):
- return Constants.SERVER_RESPONSES_TOPIC
\ No newline at end of file
+ return Constants.SERVER_RESPONSES_TOPIC
+
+ def get_log_message(self, headers, message_json):
+ """
+ This string will be used to log received messsage of this type
+ """
+ if Constants.CORRELATION_ID_STRING in headers:
+ correlation_id = headers[Constants.CORRELATION_ID_STRING]
+ return " (correlation_id={0}): {1}".format(correlation_id, message_json)
+ return str(message_json)
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 45b38ed..f05f8da 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -46,12 +46,11 @@ class EventListener(ambari_stomp.ConnectionListener):
logger.exception("Received event from server does not a valid json as a message. Message is:\n{0}".format(message))
return
- logger.info("Received event from {0}".format(destination))
- logger.debug("Received event from {0}: headers={1} ; message={2}".format(destination, headers, message))
+ logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, message_json)))
try:
self.on_event(headers, message_json)
except:
- logger.exception("Exception while handing event from {0}: headers={1} ; message={2}".format(destination, headers, message))
+ logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message))
def on_event(self, headers, message):
"""
@@ -60,4 +59,10 @@ class EventListener(ambari_stomp.ConnectionListener):
@param headers: headers dictionary
@param message: message payload dictionary
"""
- raise NotImplementedError()
\ No newline at end of file
+ raise NotImplementedError()
+
+ def get_log_message(self, headers, message_json):
+ """
+ This string will be used to log received messsage of this type
+ """
+ return ": " + str(message_json)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index b3cb16e..df45699 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -106,10 +106,16 @@ class AmbariStompConnection(WsConnection):
self.correlation_id = -1
WsConnection.__init__(self, url)
- def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
+ def send(self, destination, message, content_type=None, headers=None, **keyword_headers):
self.correlation_id += 1
+
+ logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, self.correlation_id, message))
+
+ body = json.dumps(message)
WsConnection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
+ return self.correlation_id
+
def add_listener(self, listener):
self.set_listener(listener.__class__.__name__, listener)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
index 671387a..7380727 100644
--- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
+import json
import ambari_stomp
import os
import sys
@@ -42,6 +43,8 @@ from coilmq.store.memory import MemoryQueue
from coilmq.scheduler import FavorReliableSubscriberScheduler, RandomQueueScheduler
from coilmq.protocol import STOMP10
+logger = logging.getLogger(__name__)
+
class BaseStompServerTestCase(unittest.TestCase):
"""
Base class for test cases provides the fixtures for setting up the multi-threaded
@@ -58,6 +61,7 @@ class BaseStompServerTestCase(unittest.TestCase):
self.ready_event = threading.Event()
addr_bound = threading.Event()
+ self.init_stdout_logger()
def start_server():
self.server = TestStompServer(('127.0.0.1', 21613),
@@ -123,6 +127,30 @@ class BaseStompServerTestCase(unittest.TestCase):
with open(filepath) as f:
return f.read()
+ def init_stdout_logger(self):
+ format='%(levelname)s %(asctime)s - %(message)s'
+
+ logger = logging.getLogger()
+ logger.setLevel(logging.INFO)
+ formatter = logging.Formatter(format)
+ chout = logging.StreamHandler(sys.stdout)
+ chout.setLevel(logging.INFO)
+ chout.setFormatter(formatter)
+ cherr = logging.StreamHandler(sys.stderr)
+ cherr.setLevel(logging.ERROR)
+ cherr.setFormatter(formatter)
+ logger.handlers = []
+ logger.addHandler(cherr)
+ logger.addHandler(chout)
+
+ logging.getLogger('stomp.py').setLevel(logging.WARN)
+ logging.getLogger('coilmq').setLevel(logging.INFO)
+
+ def remove_files(self, filepathes):
+ for filepath in filepathes:
+ if os.path.isfile(filepath):
+ os.remove(filepath)
+
class TestStompServer(ThreadedStompServer):
"""
@@ -235,9 +263,14 @@ class TestCaseTcpConnection(ambari_stomp.Connection):
self.correlation_id = -1
ambari_stomp.Connection.__init__(self, host_and_ports=[('127.0.0.1', 21613)])
- def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
+ def send(self, destination, message, content_type=None, headers=None, **keyword_headers):
self.correlation_id += 1
+
+ logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, self.correlation_id, message))
+
+ body = json.dumps(message)
ambari_stomp.Connection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
+ return self.correlation_id
def add_listener(self, listener):
self.set_listener(listener.__class__.__name__, listener)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 9d59222..1f3a6e7 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -21,6 +21,7 @@ import os
import sys
import logging
import json
+import time
from coilmq.util import frames
from coilmq.util.frames import Frame
@@ -38,9 +39,8 @@ class TestAgentStompResponses(BaseStompServerTestCase):
@patch.object(CustomServiceOrchestrator, "runCommand")
def test_mock_server_can_start(self, runCommand_mock):
runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
- self.init_stdout_logger()
- self.remove(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
+ self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
if not os.path.exists("/tmp/ambari-agent"):
os.mkdir("/tmp/ambari-agent")
@@ -52,48 +52,55 @@ class TestAgentStompResponses(BaseStompServerTestCase):
action_queue = initializer_module.action_queue
action_queue.start()
- connect_frame = self.server.frames_queue.get()
- users_subscribe_frame = self.server.frames_queue.get()
- commands_subscribe_frame = self.server.frames_queue.get()
- configurations_subscribe_frame = self.server.frames_queue.get()
- metadata_subscribe_frame = self.server.frames_queue.get()
- topologies_subscribe_frame = self.server.frames_queue.get()
- registration_frame = self.server.frames_queue.get()
-
component_status_executor = ComponentStatusExecutor(initializer_module)
component_status_executor.start()
command_status_reporter = CommandStatusReporter(initializer_module)
command_status_reporter.start()
- status_reports_frame = self.server.frames_queue.get()
+ connect_frame = self.server.frames_queue.get()
+ users_subscribe_frame = self.server.frames_queue.get()
+ registration_frame = self.server.frames_queue.get()
# server sends registration response
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/user/configs'}, body=self.get_json("configurations_update.json"))
+
+ # response to /initial_topology
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=self.get_json("topology_update.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/user/commands'}, body=self.get_json("execution_commands.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=self.get_json("metadata_after_registration.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/user/metadata'}, body=self.get_json("metadata_after_registration.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body=self.get_json("configurations_update.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/user/topologies'}, body=self.get_json("topology_update.json"))
+ initial_topology_request = self.server.frames_queue.get()
+ initial_metadata_request = self.server.frames_queue.get()
+ initial_configs_request = self.server.frames_queue.get()
+
+ while not initializer_module.is_registered:
+ time.sleep(0.1)
+
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/commands'}, body=self.get_json("execution_commands.json"))
self.server.topic_manager.send(f)
+ commands_subscribe_frame = self.server.frames_queue.get()
+ configurations_subscribe_frame = self.server.frames_queue.get()
+ metadata_subscribe_frame = self.server.frames_queue.get()
+ topologies_subscribe_frame = self.server.frames_queue.get()
heartbeat_frame = self.server.frames_queue.get()
- dn_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
- dn_status_failed_frame = json.loads(self.server.frames_queue.get().body)
- zk_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
- zk_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+ dn_start_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+ dn_start_failed_frame = json.loads(self.server.frames_queue.get().body)
+ zk_start_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+ zk_start_failed_frame = json.loads(self.server.frames_queue.get().body)
action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=json.dumps({'heartbeat-response':'true'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
@@ -104,10 +111,10 @@ class TestAgentStompResponses(BaseStompServerTestCase):
self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',))
self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181')
- self.assertEquals(dn_status_in_progress_frame[0]['roleCommand'], 'START')
- self.assertEquals(dn_status_in_progress_frame[0]['role'], 'DATANODE')
- self.assertEquals(dn_status_in_progress_frame[0]['status'], 'IN_PROGRESS')
- self.assertEquals(dn_status_failed_frame[0]['status'], 'FAILED')
+ self.assertEquals(dn_start_in_progress_frame[0]['roleCommand'], 'START')
+ self.assertEquals(dn_start_in_progress_frame[0]['role'], 'DATANODE')
+ self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS')
+ self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED')
"""
============================================================================================
@@ -123,62 +130,42 @@ class TestAgentStompResponses(BaseStompServerTestCase):
action_queue = initializer_module.action_queue
action_queue.start()
- connect_frame = self.server.frames_queue.get()
- users_subscribe_frame = self.server.frames_queue.get()
- commands_subscribe_frame = self.server.frames_queue.get()
- configurations_subscribe_frame = self.server.frames_queue.get()
- metadata_subscribe_frame = self.server.frames_queue.get()
- topologies_subscribe_frame = self.server.frames_queue.get()
- registration_frame_json = json.loads(self.server.frames_queue.get().body)
- clusters_hashes = registration_frame_json['clusters']['0']
-
component_status_executor = ComponentStatusExecutor(initializer_module)
component_status_executor.start()
command_status_reporter = CommandStatusReporter(initializer_module)
command_status_reporter.start()
- status_reports_frame = self.server.frames_queue.get()
-
- self.assertEquals(clusters_hashes['metadata_hash'], '21724f6ffa7aff0fe91a0c0c5b765dba')
- self.assertEquals(clusters_hashes['configurations_hash'], '04c968412ded7c8ffe7858036bae03ce')
- self.assertEquals(clusters_hashes['topology_hash'], '0de1df56fd594873fe594cf02ea61f4b')
+ connect_frame = self.server.frames_queue.get()
+ users_subscribe_frame = self.server.frames_queue.get()
+ registration_frame = self.server.frames_queue.get()
# server sends registration response
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
self.server.topic_manager.send(f)
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body='{}')
+ self.server.topic_manager.send(f)
+
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}')
+ self.server.topic_manager.send(f)
+
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
+ self.server.topic_manager.send(f)
+
+ commands_subscribe_frame = self.server.frames_queue.get()
+ configurations_subscribe_frame = self.server.frames_queue.get()
+ metadata_subscribe_frame = self.server.frames_queue.get()
+ topologies_subscribe_frame = self.server.frames_queue.get()
heartbeat_frame = self.server.frames_queue.get()
+ status_reports_frame = self.server.frames_queue.get()
+
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=json.dumps({'heartbeat-response':'true'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
component_status_executor.join()
command_status_reporter.join()
- action_queue.join()
-
- def remove(self, filepathes):
- for filepath in filepathes:
- if os.path.isfile(filepath):
- os.remove(filepath)
-
- def init_stdout_logger(self):
- format='%(levelname)s %(asctime)s - %(message)s'
-
- logger = logging.getLogger()
- logger.setLevel(logging.INFO)
- formatter = logging.Formatter(format)
- chout = logging.StreamHandler(sys.stdout)
- chout.setLevel(logging.INFO)
- chout.setFormatter(formatter)
- cherr = logging.StreamHandler(sys.stderr)
- cherr.setLevel(logging.ERROR)
- cherr.setFormatter(formatter)
- logger.handlers = []
- logger.addHandler(cherr)
- logger.addHandler(chout)
-
- logging.getLogger('stomp.py').setLevel(logging.WARN)
- logging.getLogger('coilmq').setLevel(logging.INFO)
\ No newline at end of file
+ action_queue.join()
\ No newline at end of file