You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/09/04 14:39:06 UTC

[ambari] branch trunk updated: AMBARI-24583. Ambari agent status could be reported stale just after execution command thread has finished execution (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a41262e  AMBARI-24583. Ambari agent status could be reported stale just after execution command thread has finished execution (aonishuk)
a41262e is described below

commit a41262e2fa1728f665bc16a4aaf00e389b000f11
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Sun Sep 2 12:37:48 2018 +0300

    AMBARI-24583. Ambari agent status could be reported stale just after execution command thread has finished execution (aonishuk)
---
 .../python/ambari_agent/ComponentStatusExecutor.py | 35 ++++++++++++++++++++++
 1 file changed, 35 insertions(+)

diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 64e6ae7..df72c88 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -21,10 +21,12 @@ import threading
 
 from ambari_agent import Constants
 from ambari_agent.LiveStatus import LiveStatus
+from ambari_agent.Utils import Utils
 from collections import defaultdict
 
 from ambari_agent.models.commands import AgentCommand
 from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
+from resource_management.libraries.functions.default import default
 
 
 class ComponentStatusExecutor(threading.Thread):
@@ -39,6 +41,8 @@ class ComponentStatusExecutor(threading.Thread):
     self.reported_component_status = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda:None))) # component statuses which were received by server
     self.server_responses_listener = initializer_module.server_responses_listener
     self.logger = logging.getLogger(__name__)
+    self.reports_to_discard = []
+    self.reports_to_discard_lock = threading.RLock()
     threading.Thread.__init__(self)
 
   def run(self):
@@ -54,6 +58,9 @@ class ComponentStatusExecutor(threading.Thread):
         self.clean_not_existing_clusters_info()
         cluster_reports = defaultdict(lambda:[])
 
+        with self.reports_to_discard_lock:
+          self.reports_to_discard = []
+
         for cluster_id in self.topology_cache.get_cluster_ids():
           # TODO: check if we can make clusters immutable too
           try:
@@ -104,6 +111,8 @@ class ComponentStatusExecutor(threading.Thread):
               if result:
                 cluster_reports[cluster_id].append(result)
 
+
+        cluster_reports = self.discard_stale_reports(cluster_reports)
         self.send_updates_to_server(cluster_reports)
       except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
         pass
@@ -113,6 +122,29 @@ class ComponentStatusExecutor(threading.Thread):
       self.stop_event.wait(self.status_commands_run_interval)
     self.logger.info("ComponentStatusExecutor has successfully finished")
 
+  def discard_stale_reports(self, cluster_reports):
+    """
+    Remove reports which are already stale (meaning other process has already updated status to something different)
+    """
+    with self.reports_to_discard_lock:
+      # nothing to discard
+      if not self.reports_to_discard:
+        return cluster_reports
+
+      reports_to_discard = self.reports_to_discard[:]
+
+    new_cluster_reports = defaultdict(lambda:[])
+    for cluster_id, cluster_reports in cluster_reports.iteritems():
+      for cluster_report in cluster_reports:
+        for discarded_report in reports_to_discard:
+          if Utils.are_dicts_equal(cluster_report, discarded_report, keys_to_skip=['status']):
+            self.logger.info("Discarding outdated status {0} before sending".format(cluster_report))
+            break
+        else:
+          new_cluster_reports[cluster_id].append(cluster_report)
+
+    return new_cluster_reports
+
   def check_component_status(self, cluster_id, service_name, component_name, command_name, report=False):
     """
     Returns components status if it has changed, otherwise None.
@@ -151,6 +183,9 @@ class ComponentStatusExecutor(threading.Thread):
       self.recovery_manager.handle_status_change(component_name, status)
 
       if report:
+        with self.reports_to_discard_lock:
+          self.reports_to_discard.append(result)
+
         self.send_updates_to_server({cluster_id: [result]})
 
       return result