You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2017/01/18 08:42:45 UTC

[4/5] ambari git commit: AMBARI-19416. Ambari agents remain in heartbeat lost state after ambari server restart. (stoader)

AMBARI-19416. Ambari agents remain in heartbeat lost state after ambari server restart. (stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/36f74224
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/36f74224
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/36f74224

Branch: refs/heads/branch-2.5
Commit: 36f742246530af98913051bb27dcd3b20368e474
Parents: b5d3e07
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Mon Jan 9 13:18:53 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Wed Jan 18 09:41:55 2017 +0100

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py    |  5 ++++-
 .../src/main/python/ambari_agent/Controller.py     | 17 ++++++++++++++++-
 ambari-agent/src/main/python/ambari_agent/main.py  |  5 +++--
 .../src/test/python/ambari_agent/TestHeartbeat.py  |  2 ++
 .../src/test/python/ambari_agent/TestMain.py       |  4 +++-
 5 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 3726286..18d7c2a 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -75,7 +75,10 @@ class ActionQueue(threading.Thread):
   def __init__(self, config, controller):
     super(ActionQueue, self).__init__()
     self.commandQueue = Queue.Queue()
-    self.statusCommandQueue = multiprocessing.Queue()
+    self.statusCommandQueue = None # the queue this field points to is re-created whenever
+                                   # a new StatusCommandsExecutor child process is spawned
+                                   # by Controller
+    # (once created, it is a multiprocessing.Queue())
     self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor.
     self.backgroundCommandQueue = Queue.Queue()
     self.commandStatuses = CommandStatusDict(callback_action =

http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 11b98f4..2244d30 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import multiprocessing
 import logging
 import ambari_simplejson as json
 import sys
@@ -202,7 +203,7 @@ class Controller(threading.Thread):
           self.spawnStatusCommandsExecutorProcess()
         elif self.statusCommandsExecutor.is_alive():
           logger.info("Terminating statusCommandsExecutor as agent re-registered with server.")
-          self.statusCommandsExecutor.kill()
+          self.killStatusCommandsExecutorProcess()
 
         if 'statusCommands' in ret.keys():
           logger.debug("Got status commands on registration.")
@@ -457,9 +458,23 @@ class Controller(threading.Thread):
         self.DEBUG_STOP_HEARTBEATING=True
 
   def spawnStatusCommandsExecutorProcess(self):
+    # Re-create the status command queue: if the consumer process is killed,
+    # the existing queue may deadlock (see http://bugs.python.org/issue20527).
+    # The queue must be re-created by the producer process.
+    if self.actionQueue.statusCommandQueue is not None:
+      self.actionQueue.statusCommandQueue.close()
+      self.actionQueue.statusCommandQueue.join_thread()
+
+    self.actionQueue.statusCommandQueue = multiprocessing.Queue()
+
     self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
     self.statusCommandsExecutor.start()
 
+  def killStatusCommandsExecutorProcess(self):
+    self.statusCommandsExecutor.kill()
+
+
+
   def getStatusCommandsExecutor(self):
     return self.statusCommandsExecutor
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 968b828..2e1124e 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -294,11 +294,12 @@ def run_threads(server_hostname, heartbeat_stop_callback):
     if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
       if controller.getStatusCommandsExecutor().is_alive():
         logger.info("Terminating statusCommandsExecutor")
-        controller.getStatusCommandsExecutor().kill()
+        controller.killStatusCommandsExecutorProcess()
       logger.info("Respawning statusCommandsExecutor")
       controller.spawnStatusCommandsExecutorProcess()
 
-  controller.getStatusCommandsExecutor().kill()
+  if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive():
+    controller.killStatusCommandsExecutorProcess()
 
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available

http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 19fad56..de07743 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -24,6 +24,7 @@ import tempfile
 from mock.mock import patch, MagicMock, call
 import StringIO
 import sys
+import multiprocessing
 from ambari_agent.RecoveryManager import RecoveryManager
 
 
@@ -212,6 +213,7 @@ class TestHeartbeat(TestCase):
 
     dummy_controller = MagicMock()
     actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.statusCommandQueue = multiprocessing.Queue()
     statusCommand = {
       "serviceName" : 'HDFS',
       "commandType" : "STATUS_COMMAND",

http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/test/python/ambari_agent/TestMain.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py
index 998b778..97c448b 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestMain.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py
@@ -325,6 +325,7 @@ class TestMain(unittest.TestCase):
   @patch.object(Controller, "is_alive")
   @patch.object(Controller, "start")
   @patch.object(Controller, "getStatusCommandsExecutor")
+  @patch.object(Controller, "killStatusCommandsExecutorProcess")
   @patch("optparse.OptionParser.parse_args")
   @patch.object(DataCleaner,"start")
   @patch.object(DataCleaner,"__init__")
@@ -333,7 +334,8 @@ class TestMain(unittest.TestCase):
   @patch.object(ExitHelper,"execute_cleanup")
   @patch.object(ExitHelper, "exit")
   def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
-                parse_args_mock, start_mock, Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
+                parse_args_mock, start_mock, Controller_killStatusCommandsExecutorProcess,
+                Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
                 update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
                 ambari_config_mock,
                 stop_mock, bind_signal_handlers_mock,