You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/09/24 22:15:45 UTC

[ambari] branch trunk updated: [AMBARI-24679] Fix race condition in agent during registration and topology updates. (#2368)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5a9b6e0  [AMBARI-24679] Fix race condition in agent during registration and topology updates. (#2368)
5a9b6e0 is described below

commit 5a9b6e03bc27493f427043b0ae735b20815cfd1e
Author: avijayanhwx <av...@hortonworks.com>
AuthorDate: Mon Sep 24 15:15:39 2018 -0700

    [AMBARI-24679] Fix race condition in agent during registration and topology updates. (#2368)
---
 .../main/python/ambari_agent/HeartbeatThread.py    |  6 +++-
 .../main/python/ambari_agent/listeners/__init__.py | 38 ++++++++++++++++++++--
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index b5510d1..ded5edd 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -145,7 +145,11 @@ class HeartbeatThread(threading.Thread):
           logger.exception("Exception while handing response to request at {0}. {1}".format(endpoint, response))
           raise
       finally:
-        listener.enabled = True
+        with listener.event_queue_lock:
+          logger.info("Enabling events for listener {0}".format(listener))
+          listener.enabled = True
+          # Process queued messages if any
+          listener.dequeue_unprocessed_events()
 
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
 
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 1f541aa..b50bdaa 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -25,16 +25,40 @@ import copy
 from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
 from ambari_agent import Constants
 from ambari_agent.Utils import Utils
+from Queue import Queue
+import threading
 
 logger = logging.getLogger(__name__)
 
 class EventListener(ambari_stomp.ConnectionListener):
+
+  unprocessed_messages_queue = Queue(100)
+
   """
   Base abstract class for event listeners on specific topics.
   """
   def __init__(self, initializer_module):
     self.initializer_module = initializer_module
     self.enabled = True
+    self.event_queue_lock = threading.RLock()
+
+  def dequeue_unprocessed_events(self):
+    while not self.unprocessed_messages_queue.empty():
+      payload = self.unprocessed_messages_queue.get_nowait()
+      if payload:
+        logger.info("Processing event from unprocessed queue {0} {1}".format(payload[0], payload[1]))
+        destination = payload[0]
+        headers = payload[1]
+        message_json = payload[2]
+        message = payload[3]
+        try:
+          self.on_event(headers, message_json)
+        except Exception as ex:
+          logger.exception("Exception while handing event from {0} {1} {2}".format(destination, headers, message))
+          self.report_status_to_sender(headers, message, ex)
+        else:
+          self.report_status_to_sender(headers, message)
+
 
   def on_message(self, headers, message):
     """
@@ -58,13 +82,21 @@ class EventListener(ambari_stomp.ConnectionListener):
       logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, copy.deepcopy(message_json))))
 
       if not self.enabled:
-        logger.info("Ignoring event to {0} since event listener is disabled".format(destination))
-        return
+        with self.event_queue_lock:
+          if not self.enabled:
+            logger.info("Queuing event as unprocessed {0} since event "
+                        "listener is disabled".format(destination))
+            try:
+              self.unprocessed_messages_queue.put_nowait((destination, headers, message_json, message))
+            except Exception as ex:
+              logger.warning("Cannot queue any more unprocessed events since "
+                           "queue is full! {0} {1}".format(destination, message))
+            return
 
       try:
         self.on_event(headers, message_json)
       except Exception as ex:
-        logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message))
+        logger.exception("Exception while handing event from {0} {1} {2}".format(destination, headers, message))
         self.report_status_to_sender(headers, message, ex)
       else:
         self.report_status_to_sender(headers, message)