You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/04/23 11:51:25 UTC

[ambari] branch trunk updated: AMBARI-23657. Ambari-agent should handle delivery status responses from server (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b22d22c  AMBARI-23657. Ambari-agent should handle delivery status responses from server (aonishuk)
b22d22c is described below

commit b22d22c556f50d58ac597a7636f4d3dd9bd192aa
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Mon Apr 23 13:05:54 2018 +0300

    AMBARI-23657. Ambari-agent should handle delivery status responses from server (aonishuk)
---
 .../python/ambari_agent/AlertStatusReporter.py     |  7 ++---
 .../main/python/ambari_agent/CommandStatusDict.py  | 31 ++++++++++++++--------
 .../python/ambari_agent/ComponentStatusExecutor.py |  5 +++-
 .../main/python/ambari_agent/HeartbeatThread.py    |  6 ++---
 .../main/python/ambari_agent/HostStatusReporter.py |  9 +++++--
 .../main/python/ambari_agent/InitializerModule.py  |  3 +++
 .../listeners/ServerResponsesListener.py           | 23 +++++++++++++---
 7 files changed, 60 insertions(+), 24 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
index cef0996..173c03b 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
@@ -40,6 +40,7 @@ class AlertStatusReporter(threading.Thread):
     self.alert_reports_interval = initializer_module.config.alert_reports_interval
     self.alert_definitions_cache = initializer_module.alert_definitions_cache
     self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
+    self.server_responses_listener = initializer_module.server_responses_listener
     self.reported_alerts = defaultdict(lambda:defaultdict(lambda:[]))
     threading.Thread.__init__(self)
 
@@ -57,12 +58,12 @@ class AlertStatusReporter(threading.Thread):
           self.clean_not_existing_clusters_info()
           alerts = self.collector.alerts()
           self.stale_alerts_monitor.save_executed_alerts(alerts)
-
           changed_alerts = self.get_changed_alerts(alerts)
 
           if changed_alerts and self.initializer_module.is_registered:
-            self.initializer_module.connection.send(message=changed_alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT, log_message_function=AlertStatusReporter.log_sending)
-            self.save_results(changed_alerts)
+            correlation_id = self.initializer_module.connection.send(message=changed_alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT, log_message_function=AlertStatusReporter.log_sending)
+            self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_results(changed_alerts)
+
       except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
         pass
       except:
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 0ddc1d9..49d925c 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -47,6 +47,7 @@ class CommandStatusDict():
     self.lock = threading.RLock()
     self.initializer_module = initializer_module
     self.command_update_output = initializer_module.config.command_update_output
+    self.server_responses_listener = initializer_module.server_responses_listener
     self.reported_reports = set()
 
 
@@ -63,30 +64,37 @@ class CommandStatusDict():
       self.reported_reports.discard(key)
       self.current_state.pop(key, None)
 
-    is_sent = self.force_update_to_server({command['clusterId']: [report]})
+    is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]})
     updatable = report['status'] == ActionQueue.IN_PROGRESS_STATUS and self.command_update_output
 
     if not is_sent or updatable:
-      # if sending is not successful send later
-      with self.lock:
-        self.current_state[key] = (command, report)
-        self.reported_reports.discard(key)
+      self.queue_report_sending(key, command, report)
+    else:
+      self.server_responses_listener.listener_functions_on_error[correlation_id] = lambda headers, message: self.queue_report_sending(key, command, report)
+
+  def queue_report_sending(self, key, command, report):
+    with self.lock:
+      self.current_state[key] = (command, report)
+      self.reported_reports.discard(key)
 
   def force_update_to_server(self, reports_dict):
     if not self.initializer_module.is_registered:
-      return False
+      return False, None
 
     try:
-      self.initializer_module.connection.send(message={'clusters':reports_dict}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT, log_message_function=CommandStatusDict.log_sending)
-      return True
+      correlation_id = self.initializer_module.connection.send(message={'clusters':reports_dict}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT, log_message_function=CommandStatusDict.log_sending)
+      return True, correlation_id
     except ConnectionIsAlreadyClosed:
-      return False
+      return False, None
 
   def report(self):
     report = self.generate_report()
 
-    if report and self.force_update_to_server(report):
-      self.clear_reported_reports()
+    if report:
+      success, correlation_id = self.force_update_to_server(report)
+
+      if success:
+        self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.clear_reported_reports()
 
   def get_command_status(self, taskId):
     with self.lock:
@@ -99,6 +107,7 @@ class CommandStatusDict():
     FAILED. Statuses for COMPLETE or FAILED commands are forgotten after
     generation
     """
+    logger.info("Reporting {0}".format(self.current_state))
     self.generated_reports = []
     from ActionQueue import ActionQueue
     with self.lock: # Synchronized
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index cebf190..e2c73bd 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -38,6 +38,7 @@ class ComponentStatusExecutor(threading.Thread):
     self.stop_event = initializer_module.stop_event
     self.recovery_manager = initializer_module.recovery_manager
     self.reported_component_status = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda:None))) # component statuses which were received by server
+    self.server_responses_listener = initializer_module.server_responses_listener
     threading.Thread.__init__(self)
 
   def run(self):
@@ -145,8 +146,10 @@ class ComponentStatusExecutor(threading.Thread):
     if not cluster_reports or not self.initializer_module.is_registered:
       return
 
-    self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
+    correlation_id = self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
+    self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_reported_component_status(cluster_reports)
 
+  def save_reported_component_status(self, cluster_reports):
     for cluster_id, reports in cluster_reports.iteritems():
       for report in reports:
         component_name = report['componentName']
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 4885e8b..0c04ad2 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -58,7 +58,7 @@ class HeartbeatThread(threading.Thread):
     self.config = initializer_module.config
 
     # listeners
-    self.server_responses_listener = ServerResponsesListener()
+    self.server_responses_listener = initializer_module.server_responses_listener
     self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
     self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
     self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
@@ -243,7 +243,7 @@ class HeartbeatThread(threading.Thread):
     """
     def presend_hook(correlation_id):
       if log_handler:
-        self.server_responses_listener.logging_handlers[str(correlation_id)] = log_handler 
+        self.server_responses_listener.logging_handlers[correlation_id] = log_handler
            
     try:
       correlation_id = self.connection.send(message=message, destination=destination, presend_hook=presend_hook)
@@ -253,6 +253,6 @@ class HeartbeatThread(threading.Thread):
       raise
 
     try:
-      return self.server_responses_listener.responses.blocking_pop(str(correlation_id), timeout=timeout)
+      return self.server_responses_listener.responses.blocking_pop(correlation_id, timeout=timeout)
     except BlockingDictionary.DictionaryPopTimeout:
       raise Exception("{0} seconds timeout expired waiting for response from server at {1} to message from {2}".format(timeout, Constants.SERVER_RESPONSES_TOPIC, destination))
diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
index 622c4bd..c82fa9b 100644
--- a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
@@ -38,6 +38,7 @@ class HostStatusReporter(threading.Thread):
     self.config = initializer_module.config
     self.host_info = HostInfo(initializer_module.config)
     self.last_report = {}
+    self.server_responses_listener = initializer_module.server_responses_listener
     self.hardware = Hardware(config=initializer_module.config, cache_info=False)
     threading.Thread.__init__(self)
 
@@ -48,8 +49,9 @@ class HostStatusReporter(threading.Thread):
           report = self.get_report()
 
           if self.initializer_module.is_registered and not Utils.are_dicts_equal(report, self.last_report, keys_to_skip=["agentTimeStampAtReporting"]):
-            self.initializer_module.connection.send(message=report, destination=Constants.HOST_STATUS_REPORTS_ENDPOINT)
-            self.last_report = report
+            correlation_id = self.initializer_module.connection.send(message=report, destination=Constants.HOST_STATUS_REPORTS_ENDPOINT)
+            self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_last_report(report)
+
       except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
         pass
       except:
@@ -59,6 +61,9 @@ class HostStatusReporter(threading.Thread):
 
     logger.info("HostStatusReporter has successfully finished")
 
+  def save_last_report(self, report):
+    self.last_report = report
+
   def get_report(self):
     host_info_dict = {}
     self.host_info.register(host_info_dict)
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index fbfab47..61c1e65 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -36,6 +36,7 @@ from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 from ambari_agent.ConfigurationBuilder import ConfigurationBuilder
 from ambari_agent.StaleAlertsMonitor import StaleAlertsMonitor
 from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
+from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
 
 logger = logging.getLogger(__name__)
 
@@ -65,6 +66,8 @@ class InitializerModule:
     self.configuration_builder = ConfigurationBuilder(self)
     self.stale_alerts_monitor = StaleAlertsMonitor(self)
 
+    self.server_responses_listener = ServerResponsesListener()
+
     self.file_cache = FileCache(self.config)
 
     self.customServiceOrchestrator = CustomServiceOrchestrator(self)
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
index bb0a746..5320ab7 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
@@ -31,9 +31,10 @@ class ServerResponsesListener(EventListener):
   """
   Listener of Constants.SERVER_RESPONSES_TOPIC events from server.
   """
+  RESPONSE_STATUS_STRING = 'status'
+  RESPONSE_STATUS_SUCCESS = 'OK'
+
   def __init__(self):
-    self.listener_functions = {}
-    self.logging_handlers = {}
     self.reset_responses()
 
   def on_event(self, headers, message):
@@ -46,12 +47,21 @@ class ServerResponsesListener(EventListener):
     @param message: message payload dictionary
     """
     if Constants.CORRELATION_ID_STRING in headers:
-      correlation_id = headers[Constants.CORRELATION_ID_STRING]
+      correlation_id = int(headers[Constants.CORRELATION_ID_STRING])
       self.responses.put(correlation_id, message)
 
       if correlation_id in self.listener_functions:
         self.listener_functions[correlation_id](headers, message)
         del self.listener_functions[correlation_id]
+
+      if self.RESPONSE_STATUS_STRING in message and message[self.RESPONSE_STATUS_STRING] == self.RESPONSE_STATUS_SUCCESS:
+        if correlation_id in self.listener_functions_on_success:
+          self.listener_functions_on_success[correlation_id](headers, message)
+          del self.listener_functions_on_success[correlation_id]
+      else:
+        if correlation_id in self.listener_functions_on_error:
+          self.listener_functions_on_error[correlation_id](headers, message)
+          del self.listener_functions_on_error[correlation_id]
     else:
       logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING))
 
@@ -76,8 +86,13 @@ class ServerResponsesListener(EventListener):
 
   def reset_responses(self):
     """
-    Clear responses dictionary
+    Resets data saved on a per-response basis.
+    Should be called when correlation ids are reset to 0, i.e. on re-registration.
     """
     self.responses = Utils.BlockingDictionary()
+    self.listener_functions_on_success = {}
+    self.listener_functions_on_error = {}
+    self.listener_functions = {}
+    self.logging_handlers = {}
 
 

-- 
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.