You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2017/01/09 12:19:03 UTC
ambari git commit: AMBARI-19416. Ambari agents remain in heartbeat
lost state after ambari server restart. (stoader)
Repository: ambari
Updated Branches:
refs/heads/trunk 8fa580614 -> 1535bc982
AMBARI-19416. Ambari agents remain in heartbeat lost state after ambari server restart. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1535bc98
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1535bc98
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1535bc98
Branch: refs/heads/trunk
Commit: 1535bc9823d7a9e58d1a3f195949a8c6d43b91d6
Parents: 8fa5806
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Mon Jan 9 13:18:53 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Mon Jan 9 13:18:53 2017 +0100
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 5 ++++-
.../src/main/python/ambari_agent/Controller.py | 17 ++++++++++++++++-
ambari-agent/src/main/python/ambari_agent/main.py | 5 +++--
.../src/test/python/ambari_agent/TestHeartbeat.py | 2 ++
.../src/test/python/ambari_agent/TestMain.py | 4 +++-
5 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 3726286..18d7c2a 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -75,7 +75,10 @@ class ActionQueue(threading.Thread):
def __init__(self, config, controller):
super(ActionQueue, self).__init__()
self.commandQueue = Queue.Queue()
- self.statusCommandQueue = multiprocessing.Queue()
+ self.statusCommandQueue = None # the queue this field points to is re-created whenever
+ # a new StatusCommandsExecutor child process is spawned
+ # by Controller
+ # multiprocessing.Queue()
self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor.
self.backgroundCommandQueue = Queue.Queue()
self.commandStatuses = CommandStatusDict(callback_action =
http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index f6bda1e..6b1b196 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
+import multiprocessing
import logging
import ambari_simplejson as json
import sys
@@ -202,7 +203,7 @@ class Controller(threading.Thread):
self.spawnStatusCommandsExecutorProcess()
elif self.statusCommandsExecutor.is_alive():
logger.info("Terminating statusCommandsExecutor as agent re-registered with server.")
- self.statusCommandsExecutor.kill()
+ self.killStatusCommandsExecutorProcess()
if 'statusCommands' in ret.keys():
logger.debug("Got status commands on registration.")
@@ -457,9 +458,23 @@ class Controller(threading.Thread):
self.DEBUG_STOP_HEARTBEATING=True
def spawnStatusCommandsExecutorProcess(self):
+ # Re-create the status command queue: if the consumer process is killed,
+ # the queue may deadlock (see http://bugs.python.org/issue20527).
+ # The queue must be re-created by the producer process.
+ if self.actionQueue.statusCommandQueue is not None:
+ self.actionQueue.statusCommandQueue.close()
+ self.actionQueue.statusCommandQueue.join_thread()
+
+ self.actionQueue.statusCommandQueue = multiprocessing.Queue()
+
self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
self.statusCommandsExecutor.start()
+ def killStatusCommandsExecutorProcess(self):
+ self.statusCommandsExecutor.kill()
+
+
+
def getStatusCommandsExecutor(self):
return self.statusCommandsExecutor
http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index f812226..2e0517b 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -312,11 +312,12 @@ def run_threads(server_hostname, heartbeat_stop_callback):
if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
if controller.getStatusCommandsExecutor().is_alive():
logger.info("Terminating statusCommandsExecutor")
- controller.getStatusCommandsExecutor().kill()
+ controller.killStatusCommandsExecutorProcess()
logger.info("Respawning statusCommandsExecutor")
controller.spawnStatusCommandsExecutorProcess()
- controller.getStatusCommandsExecutor().kill()
+ if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive():
+ controller.killStatusCommandsExecutorProcess()
# event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
# we need this for windows os, where no sigterm available
http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 19fad56..de07743 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -24,6 +24,7 @@ import tempfile
from mock.mock import patch, MagicMock, call
import StringIO
import sys
+import multiprocessing
from ambari_agent.RecoveryManager import RecoveryManager
@@ -212,6 +213,7 @@ class TestHeartbeat(TestCase):
dummy_controller = MagicMock()
actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.statusCommandQueue = multiprocessing.Queue()
statusCommand = {
"serviceName" : 'HDFS',
"commandType" : "STATUS_COMMAND",
http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/test/python/ambari_agent/TestMain.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py
index 6f38410..2deca20 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestMain.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py
@@ -341,6 +341,7 @@ class TestMain(unittest.TestCase):
@patch.object(Controller, "is_alive")
@patch.object(Controller, "start")
@patch.object(Controller, "getStatusCommandsExecutor")
+ @patch.object(Controller, "killStatusCommandsExecutorProcess")
@patch("optparse.OptionParser.parse_args")
@patch.object(DataCleaner,"start")
@patch.object(DataCleaner,"__init__")
@@ -349,7 +350,8 @@ class TestMain(unittest.TestCase):
@patch.object(ExitHelper,"execute_cleanup")
@patch.object(ExitHelper, "exit")
def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
- parse_args_mock, start_mock, Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
+ parse_args_mock, start_mock, Controller_killStatusCommandsExecutorProcess,
+ Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
ambari_config_mock,
stop_mock, bind_signal_handlers_mock,