You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2017/01/16 12:43:11 UTC
ambari git commit: AMBARI-19520. Ambari agents not recovering from
heart beat lost state immediately after successful re-registering with
server. (stoader)
Repository: ambari
Updated Branches:
refs/heads/trunk 5f222d9ce -> 816939498
AMBARI-19520. Ambari agents not recovering from heart beat lost state immediately after successful re-registering with server. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/81693949
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/81693949
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/81693949
Branch: refs/heads/trunk
Commit: 8169394988ef1a06c2eb1013df46ee14445d49a9
Parents: 5f222d9
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Mon Jan 16 13:43:01 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Mon Jan 16 13:43:01 2017 +0100
----------------------------------------------------------------------
.../src/main/python/ambari_agent/Controller.py | 63 ++++++++++++++------
.../src/main/python/ambari_agent/main.py | 18 +++---
2 files changed, 53 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/81693949/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 6b1b196..d00ffae 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -86,6 +86,10 @@ class Controller(threading.Thread):
self.max_reconnect_retry_delay = int(config.get('server','max_reconnect_retry_delay', default=30))
self.hasMappedComponents = True
self.statusCommandsExecutor = None
+
+ # this lock is used to control which thread spawns/kills the StatusCommandExecutor child process
+ self.spawnKillStatusCommandExecutorLock = threading.RLock()
+
# Event is used for synchronizing heartbeat iterations (to make possible
# manual wait() interruption between heartbeats )
self.heartbeat_stop_callback = heartbeat_stop_callback
@@ -199,11 +203,9 @@ class Controller(threading.Thread):
self.config.update_configuration_from_registration(ret)
logger.debug("Updated config:" + str(self.config))
- if self.statusCommandsExecutor is None:
- self.spawnStatusCommandsExecutorProcess()
- elif self.statusCommandsExecutor.is_alive():
- logger.info("Terminating statusCommandsExecutor as agent re-registered with server.")
- self.killStatusCommandsExecutorProcess()
+ # Start StatusCommandExecutor child process or restart it if already running
+ # in order to receive up to date agent config.
+ self.spawnStatusCommandsExecutorProcess()
if 'statusCommands' in ret.keys():
logger.debug("Got status commands on registration.")
@@ -458,22 +460,43 @@ class Controller(threading.Thread):
self.DEBUG_STOP_HEARTBEATING=True
def spawnStatusCommandsExecutorProcess(self):
- # Re-create the status command queue as in case the consumer
- # process is killed the queue may deadlock (see http://bugs.python.org/issue20527).
- # The queue must be re-created by the producer process.
- if self.actionQueue.statusCommandQueue is not None:
- self.actionQueue.statusCommandQueue.close()
- self.actionQueue.statusCommandQueue.join_thread()
-
- self.actionQueue.statusCommandQueue = multiprocessing.Queue()
-
- self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
- self.statusCommandsExecutor.start()
+ '''
+ Starts a new StatusCommandExecutor child process. If there is already a running instance,
+ it is restarted by simply killing it and starting a new one.
+ This function is thread-safe.
+ '''
+ with self.getSpawnKillStatusCommandExecutorLock():
+ # if there is already an instance of StatusCommandExecutor kill it first
+ self.killStatusCommandsExecutorProcess()
+
+ # Re-create the status command queue as in case the consumer
+ # process is killed the queue may deadlock (see http://bugs.python.org/issue20527).
+ # The queue must be re-created by the producer process.
+ statusCommandQueue = self.actionQueue.statusCommandQueue
+ self.actionQueue.statusCommandQueue = multiprocessing.Queue()
+
+ if statusCommandQueue is not None:
+ statusCommandQueue.close()
+
+ logger.info("Spawning statusCommandsExecutor")
+ self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
+ self.statusCommandsExecutor.start()
def killStatusCommandsExecutorProcess(self):
- self.statusCommandsExecutor.kill()
-
-
+ '''
+ Kills the StatusCommandExecutor child process if it exists. This function is thread-safe.
+ '''
+ with self.getSpawnKillStatusCommandExecutorLock():
+ if self.statusCommandsExecutor is not None and self.statusCommandsExecutor.is_alive():
+ logger.info("Terminating statusCommandsExecutor.")
+ self.statusCommandsExecutor.kill()
+
+ def getSpawnKillStatusCommandExecutorLock(self):
+ '''
+ Re-entrant lock to be used to synchronize the spawning or killing of
+ StatusCommandExecutor child process in multi-thread environment.
+ '''
+ return self.spawnKillStatusCommandExecutorLock;
def getStatusCommandsExecutor(self):
return self.statusCommandsExecutor
@@ -586,6 +609,8 @@ class Controller(threading.Thread):
except Exception, e:
logger.info("Exception in move_data_dir_mount_file(). Error: {0}".format(str(e)))
+
+
def main(argv=None):
# Allow Ctrl-C
http://git-wip-us.apache.org/repos/asf/ambari/blob/81693949/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 2e0517b..6927b15 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -309,15 +309,15 @@ def run_threads(server_hostname, heartbeat_stop_callback):
while controller.is_alive():
time.sleep(0.1)
- if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
- if controller.getStatusCommandsExecutor().is_alive():
- logger.info("Terminating statusCommandsExecutor")
- controller.killStatusCommandsExecutorProcess()
- logger.info("Respawning statusCommandsExecutor")
- controller.spawnStatusCommandsExecutorProcess()
-
- if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive():
- controller.killStatusCommandsExecutorProcess()
+ with controller.getSpawnKillStatusCommandExecutorLock():
+ # We need to lock as Controller.py may try to spawn StatusCommandExecutor child in parallel as well
+ if controller.getStatusCommandsExecutor() is not None \
+ and (not controller.getStatusCommandsExecutor().is_alive()
+ or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
+ controller.spawnStatusCommandsExecutorProcess()
+
+
+ controller.killStatusCommandsExecutorProcess()
# event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
# we need this for windows os, where no sigterm available