You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/04/23 11:51:25 UTC
[ambari] branch trunk updated: AMBARI-23657. Ambari-agent should handle delivery status responses from server (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new b22d22c AMBARI-23657. Ambari-agent should handle delivery status responses from server (aonishuk)
b22d22c is described below
commit b22d22c556f50d58ac597a7636f4d3dd9bd192aa
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Mon Apr 23 13:05:54 2018 +0300
AMBARI-23657. Ambari-agent should handle delivery status responses from server (aonishuk)
---
.../python/ambari_agent/AlertStatusReporter.py | 7 ++---
.../main/python/ambari_agent/CommandStatusDict.py | 31 ++++++++++++++--------
.../python/ambari_agent/ComponentStatusExecutor.py | 5 +++-
.../main/python/ambari_agent/HeartbeatThread.py | 6 ++---
.../main/python/ambari_agent/HostStatusReporter.py | 9 +++++--
.../main/python/ambari_agent/InitializerModule.py | 3 +++
.../listeners/ServerResponsesListener.py | 23 +++++++++++++---
7 files changed, 60 insertions(+), 24 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
index cef0996..173c03b 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
@@ -40,6 +40,7 @@ class AlertStatusReporter(threading.Thread):
self.alert_reports_interval = initializer_module.config.alert_reports_interval
self.alert_definitions_cache = initializer_module.alert_definitions_cache
self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
+ self.server_responses_listener = initializer_module.server_responses_listener
self.reported_alerts = defaultdict(lambda:defaultdict(lambda:[]))
threading.Thread.__init__(self)
@@ -57,12 +58,12 @@ class AlertStatusReporter(threading.Thread):
self.clean_not_existing_clusters_info()
alerts = self.collector.alerts()
self.stale_alerts_monitor.save_executed_alerts(alerts)
-
changed_alerts = self.get_changed_alerts(alerts)
if changed_alerts and self.initializer_module.is_registered:
- self.initializer_module.connection.send(message=changed_alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT, log_message_function=AlertStatusReporter.log_sending)
- self.save_results(changed_alerts)
+ correlation_id = self.initializer_module.connection.send(message=changed_alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT, log_message_function=AlertStatusReporter.log_sending)
+ self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_results(changed_alerts)
+
except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
pass
except:
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 0ddc1d9..49d925c 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -47,6 +47,7 @@ class CommandStatusDict():
self.lock = threading.RLock()
self.initializer_module = initializer_module
self.command_update_output = initializer_module.config.command_update_output
+ self.server_responses_listener = initializer_module.server_responses_listener
self.reported_reports = set()
@@ -63,30 +64,37 @@ class CommandStatusDict():
self.reported_reports.discard(key)
self.current_state.pop(key, None)
- is_sent = self.force_update_to_server({command['clusterId']: [report]})
+ is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]})
updatable = report['status'] == ActionQueue.IN_PROGRESS_STATUS and self.command_update_output
if not is_sent or updatable:
- # if sending is not successful send later
- with self.lock:
- self.current_state[key] = (command, report)
- self.reported_reports.discard(key)
+ self.queue_report_sending(key, command, report)
+ else:
+ self.server_responses_listener.listener_functions_on_error[correlation_id] = lambda headers, message: self.queue_report_sending(key, command, report)
+
+ def queue_report_sending(self, key, command, report):
+ with self.lock:
+ self.current_state[key] = (command, report)
+ self.reported_reports.discard(key)
def force_update_to_server(self, reports_dict):
if not self.initializer_module.is_registered:
- return False
+ return False, None
try:
- self.initializer_module.connection.send(message={'clusters':reports_dict}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT, log_message_function=CommandStatusDict.log_sending)
- return True
+ correlation_id = self.initializer_module.connection.send(message={'clusters':reports_dict}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT, log_message_function=CommandStatusDict.log_sending)
+ return True, correlation_id
except ConnectionIsAlreadyClosed:
- return False
+ return False, None
def report(self):
report = self.generate_report()
- if report and self.force_update_to_server(report):
- self.clear_reported_reports()
+ if report:
+ success, correlation_id = self.force_update_to_server(report)
+
+ if success:
+ self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.clear_reported_reports()
def get_command_status(self, taskId):
with self.lock:
@@ -99,6 +107,7 @@ class CommandStatusDict():
FAILED. Statuses for COMPLETE or FAILED commands are forgotten after
generation
"""
+ logger.info("Reporting {0}".format(self.current_state))
self.generated_reports = []
from ActionQueue import ActionQueue
with self.lock: # Synchronized
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index cebf190..e2c73bd 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -38,6 +38,7 @@ class ComponentStatusExecutor(threading.Thread):
self.stop_event = initializer_module.stop_event
self.recovery_manager = initializer_module.recovery_manager
self.reported_component_status = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda:None))) # component statuses which were received by server
+ self.server_responses_listener = initializer_module.server_responses_listener
threading.Thread.__init__(self)
def run(self):
@@ -145,8 +146,10 @@ class ComponentStatusExecutor(threading.Thread):
if not cluster_reports or not self.initializer_module.is_registered:
return
- self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
+ correlation_id = self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
+ self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_reported_component_status(cluster_reports)
+ def save_reported_component_status(self, cluster_reports):
for cluster_id, reports in cluster_reports.iteritems():
for report in reports:
component_name = report['componentName']
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 4885e8b..0c04ad2 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -58,7 +58,7 @@ class HeartbeatThread(threading.Thread):
self.config = initializer_module.config
# listeners
- self.server_responses_listener = ServerResponsesListener()
+ self.server_responses_listener = initializer_module.server_responses_listener
self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
@@ -243,7 +243,7 @@ class HeartbeatThread(threading.Thread):
"""
def presend_hook(correlation_id):
if log_handler:
- self.server_responses_listener.logging_handlers[str(correlation_id)] = log_handler
+ self.server_responses_listener.logging_handlers[correlation_id] = log_handler
try:
correlation_id = self.connection.send(message=message, destination=destination, presend_hook=presend_hook)
@@ -253,6 +253,6 @@ class HeartbeatThread(threading.Thread):
raise
try:
- return self.server_responses_listener.responses.blocking_pop(str(correlation_id), timeout=timeout)
+ return self.server_responses_listener.responses.blocking_pop(correlation_id, timeout=timeout)
except BlockingDictionary.DictionaryPopTimeout:
raise Exception("{0} seconds timeout expired waiting for response from server at {1} to message from {2}".format(timeout, Constants.SERVER_RESPONSES_TOPIC, destination))
diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
index 622c4bd..c82fa9b 100644
--- a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
@@ -38,6 +38,7 @@ class HostStatusReporter(threading.Thread):
self.config = initializer_module.config
self.host_info = HostInfo(initializer_module.config)
self.last_report = {}
+ self.server_responses_listener = initializer_module.server_responses_listener
self.hardware = Hardware(config=initializer_module.config, cache_info=False)
threading.Thread.__init__(self)
@@ -48,8 +49,9 @@ class HostStatusReporter(threading.Thread):
report = self.get_report()
if self.initializer_module.is_registered and not Utils.are_dicts_equal(report, self.last_report, keys_to_skip=["agentTimeStampAtReporting"]):
- self.initializer_module.connection.send(message=report, destination=Constants.HOST_STATUS_REPORTS_ENDPOINT)
- self.last_report = report
+ correlation_id = self.initializer_module.connection.send(message=report, destination=Constants.HOST_STATUS_REPORTS_ENDPOINT)
+ self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_last_report(report)
+
except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
pass
except:
@@ -59,6 +61,9 @@ class HostStatusReporter(threading.Thread):
logger.info("HostStatusReporter has successfully finished")
+ def save_last_report(self, report):
+ self.last_report = report
+
def get_report(self):
host_info_dict = {}
self.host_info.register(host_info_dict)
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index fbfab47..61c1e65 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -36,6 +36,7 @@ from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
from ambari_agent.ConfigurationBuilder import ConfigurationBuilder
from ambari_agent.StaleAlertsMonitor import StaleAlertsMonitor
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
+from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
logger = logging.getLogger(__name__)
@@ -65,6 +66,8 @@ class InitializerModule:
self.configuration_builder = ConfigurationBuilder(self)
self.stale_alerts_monitor = StaleAlertsMonitor(self)
+ self.server_responses_listener = ServerResponsesListener()
+
self.file_cache = FileCache(self.config)
self.customServiceOrchestrator = CustomServiceOrchestrator(self)
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
index bb0a746..5320ab7 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
@@ -31,9 +31,10 @@ class ServerResponsesListener(EventListener):
"""
Listener of Constants.SERVER_RESPONSES_TOPIC events from server.
"""
+ RESPONSE_STATUS_STRING = 'status'
+ RESPONSE_STATUS_SUCCESS = 'OK'
+
def __init__(self):
- self.listener_functions = {}
- self.logging_handlers = {}
self.reset_responses()
def on_event(self, headers, message):
@@ -46,12 +47,21 @@ class ServerResponsesListener(EventListener):
@param message: message payload dictionary
"""
if Constants.CORRELATION_ID_STRING in headers:
- correlation_id = headers[Constants.CORRELATION_ID_STRING]
+ correlation_id = int(headers[Constants.CORRELATION_ID_STRING])
self.responses.put(correlation_id, message)
if correlation_id in self.listener_functions:
self.listener_functions[correlation_id](headers, message)
del self.listener_functions[correlation_id]
+
+ if self.RESPONSE_STATUS_STRING in message and message[self.RESPONSE_STATUS_STRING] == self.RESPONSE_STATUS_SUCCESS:
+ if correlation_id in self.listener_functions_on_success:
+ self.listener_functions_on_success[correlation_id](headers, message)
+ del self.listener_functions_on_success[correlation_id]
+ else:
+ if correlation_id in self.listener_functions_on_error:
+ self.listener_functions_on_error[correlation_id](headers, message)
+ del self.listener_functions_on_error[correlation_id]
else:
logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING))
@@ -76,8 +86,13 @@ class ServerResponsesListener(EventListener):
def reset_responses(self):
"""
- Clear responses dictionary
+ Resets data saved on per-response basis.
+ Should be called when correlationIds are reset to 0, i.e. in the re-registration case.
"""
self.responses = Utils.BlockingDictionary()
+ self.listener_functions_on_success = {}
+ self.listener_functions_on_error = {}
+ self.listener_functions = {}
+ self.logging_handlers = {}
--
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.