You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2017/01/16 12:43:11 UTC

ambari git commit: AMBARI-19520. Ambari agents not recovering from heart beat lost state immediately after successful re-registering with server. (stoader)

Repository: ambari
Updated Branches:
  refs/heads/trunk 5f222d9ce -> 816939498


AMBARI-19520. Ambari agents not recovering from heart beat lost state immediately after successful re-registering with server. (stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/81693949
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/81693949
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/81693949

Branch: refs/heads/trunk
Commit: 8169394988ef1a06c2eb1013df46ee14445d49a9
Parents: 5f222d9
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Mon Jan 16 13:43:01 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Mon Jan 16 13:43:01 2017 +0100

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/Controller.py  | 63 ++++++++++++++------
 .../src/main/python/ambari_agent/main.py        | 18 +++---
 2 files changed, 53 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/81693949/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 6b1b196..d00ffae 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -86,6 +86,10 @@ class Controller(threading.Thread):
     self.max_reconnect_retry_delay = int(config.get('server','max_reconnect_retry_delay', default=30))
     self.hasMappedComponents = True
     self.statusCommandsExecutor = None
+
+    # this lock is used to control which thread spawns/kills the StatusCommandExecutor child process
+    self.spawnKillStatusCommandExecutorLock = threading.RLock()
+
     # Event is used for synchronizing heartbeat iterations (to make possible
     # manual wait() interruption between heartbeats )
     self.heartbeat_stop_callback = heartbeat_stop_callback
@@ -199,11 +203,9 @@ class Controller(threading.Thread):
         self.config.update_configuration_from_registration(ret)
         logger.debug("Updated config:" + str(self.config))
 
-        if self.statusCommandsExecutor is None:
-          self.spawnStatusCommandsExecutorProcess()
-        elif self.statusCommandsExecutor.is_alive():
-          logger.info("Terminating statusCommandsExecutor as agent re-registered with server.")
-          self.killStatusCommandsExecutorProcess()
+        # Start StatusCommandExecutor child process or restart it if already running
+        # in order to receive up to date agent config.
+        self.spawnStatusCommandsExecutorProcess()
 
         if 'statusCommands' in ret.keys():
           logger.debug("Got status commands on registration.")
@@ -458,22 +460,43 @@ class Controller(threading.Thread):
         self.DEBUG_STOP_HEARTBEATING=True
 
   def spawnStatusCommandsExecutorProcess(self):
-    # Re-create the status command queue as in case the consumer
-    # process is killed the queue may deadlock (see http://bugs.python.org/issue20527).
-    # The queue must be re-created by the producer process.
-    if self.actionQueue.statusCommandQueue is not None:
-      self.actionQueue.statusCommandQueue.close()
-      self.actionQueue.statusCommandQueue.join_thread()
-
-    self.actionQueue.statusCommandQueue = multiprocessing.Queue()
-
-    self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
-    self.statusCommandsExecutor.start()
+    '''
+    Starts a new StatusCommandExecutor child process. If an instance is already
+     running, restarts it by simply killing it and starting a new one.
+     This function is thread-safe.
+    '''
+    with self.getSpawnKillStatusCommandExecutorLock():
+      # if there is already an instance of StatusCommandExecutor kill it first
+      self.killStatusCommandsExecutorProcess()
+
+      # Re-create the status command queue as in case the consumer
+      # process is killed the queue may deadlock (see http://bugs.python.org/issue20527).
+      # The queue must be re-created by the producer process.
+      statusCommandQueue = self.actionQueue.statusCommandQueue
+      self.actionQueue.statusCommandQueue = multiprocessing.Queue()
+
+      if statusCommandQueue is not None:
+        statusCommandQueue.close()
+
+      logger.info("Spawning statusCommandsExecutor")
+      self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
+      self.statusCommandsExecutor.start()
 
   def killStatusCommandsExecutorProcess(self):
-    self.statusCommandsExecutor.kill()
-
-
+    '''
+    Kills the StatusCommandExecutor child process if it exists. This function is thread-safe.
+    '''
+    with self.getSpawnKillStatusCommandExecutorLock():
+      if self.statusCommandsExecutor is not None and self.statusCommandsExecutor.is_alive():
+        logger.info("Terminating statusCommandsExecutor.")
+        self.statusCommandsExecutor.kill()
+
+  def getSpawnKillStatusCommandExecutorLock(self):
+    '''
+    Re-entrant lock to be used to synchronize the spawning or killing of
+    StatusCommandExecutor child process in multi-thread environment.
+    '''
+    return self.spawnKillStatusCommandExecutorLock;
 
   def getStatusCommandsExecutor(self):
     return self.statusCommandsExecutor
@@ -586,6 +609,8 @@ class Controller(threading.Thread):
     except Exception, e:
       logger.info("Exception in move_data_dir_mount_file(). Error: {0}".format(str(e)))
 
+
+
 def main(argv=None):
   # Allow Ctrl-C
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/81693949/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 2e0517b..6927b15 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -309,15 +309,15 @@ def run_threads(server_hostname, heartbeat_stop_callback):
   while controller.is_alive():
     time.sleep(0.1)
 
-    if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
-      if controller.getStatusCommandsExecutor().is_alive():
-        logger.info("Terminating statusCommandsExecutor")
-        controller.killStatusCommandsExecutorProcess()
-      logger.info("Respawning statusCommandsExecutor")
-      controller.spawnStatusCommandsExecutorProcess()
-
-  if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive():
-    controller.killStatusCommandsExecutorProcess()
+    with controller.getSpawnKillStatusCommandExecutorLock():
+      # We need to lock as Controller.py may try to spawn StatusCommandExecutor child in parallel as well
+      if controller.getStatusCommandsExecutor() is not None \
+          and (not controller.getStatusCommandsExecutor().is_alive()
+              or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
+        controller.spawnStatusCommandsExecutorProcess()
+
+
+  controller.killStatusCommandsExecutorProcess()
 
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available