You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2017/01/18 08:42:42 UTC
[1/5] ambari git commit: AMBARI-18505. Ambari Status commands should
enforce a timeout < heartbeat interval (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-2.5 40aeeb9c0 -> b512b26ae
AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f8ccb478
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f8ccb478
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f8ccb478
Branch: refs/heads/branch-2.5
Commit: f8ccb478902438c2d1bd43f182c8a7bd935b1088
Parents: 40aeeb9
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Tue Nov 8 11:32:46 2016 +0200
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Wed Jan 18 09:41:54 2017 +0100
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 1 +
.../src/main/python/ambari_agent/ActionQueue.py | 41 ++++-----
.../src/main/python/ambari_agent/Controller.py | 10 +++
.../ambari_agent/PythonReflectiveExecutor.py | 20 +++--
.../ambari_agent/StatusCommandsExecutor.py | 91 ++++++++++++++++++++
.../src/main/python/ambari_agent/main.py | 21 ++++-
.../test/python/ambari_agent/TestActionQueue.py | 76 ++++------------
.../test/python/ambari_agent/TestController.py | 1 +
.../src/test/python/ambari_agent/TestMain.py | 3 +-
9 files changed, 175 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index f2c8846..56fa605 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -35,6 +35,7 @@ tolerate_download_failures=true
run_as_user=root
parallel_execution=0
alert_grace_period=5
+status_command_timeout=5
alert_kinit_timeout=14400000
system_resource_overrides=/etc/resource_overrides
; memory_threshold_soft_mb=400
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index a9567c4..aeae954 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
import Queue
+import multiprocessing
import logging
import traceback
@@ -74,7 +75,8 @@ class ActionQueue(threading.Thread):
def __init__(self, config, controller):
super(ActionQueue, self).__init__()
self.commandQueue = Queue.Queue()
- self.statusCommandQueue = Queue.Queue()
+ self.statusCommandQueue = multiprocessing.Queue()
+ self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatusCommandsExecutor.
self.backgroundCommandQueue = Queue.Queue()
self.commandStatuses = CommandStatusDict(callback_action =
self.status_update_callback)
@@ -96,13 +98,9 @@ class ActionQueue(threading.Thread):
return self._stop.isSet()
def put_status(self, commands):
- if not self.statusCommandQueue.empty():
- #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones
- statusCommandQueueSize = 0
- while not self.statusCommandQueue.empty():
- self.statusCommandQueue.get()
- statusCommandQueueSize = statusCommandQueueSize + 1
- logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize))
+ #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones
+ while not self.statusCommandQueue.empty():
+ self.statusCommandQueue.get()
for command in commands:
logger.info("Adding " + command['commandType'] + " for component " + \
@@ -158,7 +156,7 @@ class ActionQueue(threading.Thread):
try:
while not self.stopped():
self.processBackgroundQueueSafeEmpty();
- self.processStatusCommandQueueSafeEmpty();
+ self.processStatusCommandResultQueueSafeEmpty();
try:
if self.parallel_execution == 0:
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -202,14 +200,19 @@ class ActionQueue(threading.Thread):
except Queue.Empty:
pass
- def processStatusCommandQueueSafeEmpty(self):
- while not self.statusCommandQueue.empty():
+ def processStatusCommandResultQueueSafeEmpty(self):
+ while not self.statusCommandResultQueue.empty():
try:
- command = self.statusCommandQueue.get(False)
- self.process_command(command)
+ result = self.statusCommandResultQueue.get(False)
+ self.process_status_command_result(result)
except Queue.Empty:
pass
-
+ except IOError:
+ # A race condition can occur in multiprocessing.Queue if get/put and a thread kill are executed at the same time.
+ # During queue.close an IOError will be thrown (this prevents a permanently dead-locked get).
+ pass
+ except UnicodeDecodeError:
+ pass
def createCommandHandle(self, command):
if command.has_key('__handle'):
@@ -230,8 +233,6 @@ class ActionQueue(threading.Thread):
finally:
if self.controller.recovery_manager.enabled():
self.controller.recovery_manager.stop_execution_command()
- elif commandType == self.STATUS_COMMAND:
- self.execute_status_command(command)
else:
logger.error("Unrecognized command " + pprint.pformat(command))
except Exception:
@@ -518,11 +519,12 @@ class ActionQueue(threading.Thread):
self.commandStatuses.put_command_status(handle.command, roleResult)
- def execute_status_command(self, command):
+ def process_status_command_result(self, result):
'''
Executes commands of type STATUS_COMMAND
'''
try:
+ command, component_status_result, component_security_status_result = result
cluster = command['clusterName']
service = command['serviceName']
component = command['componentName']
@@ -537,11 +539,6 @@ class ActionQueue(threading.Thread):
component_extra = None
- # For custom services, responsibility to determine service status is
- # delegated to python scripts
- component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
- component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(command)
-
if component_status_result['exitcode'] == 0:
component_status = LiveStatus.LIVE_STATUS
if self.controller.recovery_manager.enabled() \
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index beeaad9..f6296d8 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -39,6 +39,7 @@ import AmbariConfig
from ambari_agent.Heartbeat import Heartbeat
from ambari_agent.Register import Register
from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor
from ambari_agent.FileCache import FileCache
from ambari_agent.NetUtil import NetUtil
from ambari_agent.LiveStatus import LiveStatus
@@ -83,6 +84,7 @@ class Controller(threading.Thread):
self.cachedconnect = None
self.max_reconnect_retry_delay = int(config.get('server','max_reconnect_retry_delay', default=30))
self.hasMappedComponents = True
+ self.statusCommandsExecutor = None
# Event is used for synchronizing heartbeat iterations (to make possible
# manual wait() interruption between heartbeats )
self.heartbeat_stop_callback = heartbeat_stop_callback
@@ -448,10 +450,18 @@ class Controller(threading.Thread):
logger.info("Stop event received")
self.DEBUG_STOP_HEARTBEATING=True
+ def spawnStatusCommandsExecutorProcess(self):
+ self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
+ self.statusCommandsExecutor.start()
+
+ def getStatusCommandsExecutor(self):
+ return self.statusCommandsExecutor
+
def run(self):
try:
self.actionQueue = ActionQueue(self.config, controller=self)
self.actionQueue.start()
+ self.spawnStatusCommandsExecutorProcess()
self.register = Register(self.config)
self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
index 655b2fc..b27d7d1 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
@@ -53,7 +53,9 @@ class PythonReflectiveExecutor(PythonExecutor):
returncode = 1
try:
- with PythonContext(script_dir, pythonCommand):
+ current_context = PythonContext(script_dir, pythonCommand)
+ PythonReflectiveExecutor.last_context = current_context
+ with current_context:
imp.load_source('__main__', script)
except SystemExit as e:
returncode = e.code
@@ -76,6 +78,8 @@ class PythonContext:
def __init__(self, script_dir, pythonCommand):
self.script_dir = script_dir
self.pythonCommand = pythonCommand
+ self.is_reverted = False
+ self.is_forced_revert = False
def __enter__(self):
self.old_sys_path = copy.copy(sys.path)
@@ -88,12 +92,18 @@ class PythonContext:
sys.argv = self.pythonCommand[1:]
def __exit__(self, exc_type, exc_val, exc_tb):
- sys.path = self.old_sys_path
- sys.argv = self.old_agv
- logging.disable(self.old_logging_disable)
- self.revert_sys_modules(self.old_sys_modules)
+ self.revert(is_forced_revert=False)
return False
+ def revert(self, is_forced_revert=True):
+ if not self.is_reverted:
+ self.is_forced_revert = is_forced_revert
+ self.is_reverted = True
+ sys.path = self.old_sys_path
+ sys.argv = self.old_agv
+ logging.disable(self.old_logging_disable)
+ self.revert_sys_modules(self.old_sys_modules)
+
def revert_sys_modules(self, value):
sys.modules.update(value)
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
new file mode 100644
index 0000000..5d3607a
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import signal
+import threading
+import logging
+import multiprocessing
+from ambari_agent.PythonReflectiveExecutor import PythonReflectiveExecutor
+from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
+
+logger = logging.getLogger(__name__)
+
+class StatusCommandsExecutor(multiprocessing.Process):
+ """
+ A process which executes status/security status commands.
+
+ It dies and respawns itself on timeout of the command, which is the most graceful way to end the currently running status command.
+ """
+ def __init__(self, config, actionQueue):
+ multiprocessing.Process.__init__(self)
+
+ self.config = config
+ self.actionQueue = actionQueue
+
+ self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) # in seconds
+ self.hasTimeoutedEvent = multiprocessing.Event()
+
+ def run(self):
+ try:
+ bind_debug_signal_handlers()
+ while True:
+ command = self.actionQueue.statusCommandQueue.get(True) # blocks until a status command appears
+ logger.debug("Running status command for {0}".format(command['componentName']))
+
+ timeout_timer = threading.Timer( self.status_command_timeout, self.respawn, [command])
+ timeout_timer.start()
+
+ self.process_status_command(command)
+
+ timeout_timer.cancel()
+ logger.debug("Completed status command for {0}".format(command['componentName']))
+ except:
+ logger.exception("StatusCommandsExecutor process failed with exception:")
+ raise
+
+ logger.warn("StatusCommandsExecutor process has finished")
+
+ def process_status_command(self, command):
+ component_status_result = self.actionQueue.customServiceOrchestrator.requestComponentStatus(command)
+ component_security_status_result = self.actionQueue.customServiceOrchestrator.requestComponentSecurityState(command)
+ result = (command, component_status_result, component_security_status_result)
+
+ self.actionQueue.statusCommandResultQueue.put(result)
+
+ def respawn(self, command):
+ try:
+ if hasattr(PythonReflectiveExecutor, "last_context"):
+ # Force context to reset to normal. By context we mean sys.path, imports, etc. They are set by specific status command, and are not relevant to ambari-agent.
+ PythonReflectiveExecutor.last_context.revert()
+
+ logger.warn("Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(command['commandType'], command['componentName'], self.status_command_timeout))
+
+ self.hasTimeoutedEvent.set()
+ except:
+ logger.exception("StatusCommandsExecutor.finish thread failed with exception:")
+ raise
+
+ def kill(self):
+ os.kill(self.pid, signal.SIGKILL)
+
+ # Prevent the queue from ending up with non-freed semaphores or locks during put, which would result in a dead-lock in the process executing get.
+ self.actionQueue.statusCommandResultQueue.close()
+ self.actionQueue.statusCommandResultQueue.join_thread()
+ self.actionQueue.statusCommandResultQueue = multiprocessing.Queue()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 4eb478a..968b828 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -284,6 +284,22 @@ def reset_agent(options):
MAX_RETRIES = 10
+def run_threads(server_hostname, heartbeat_stop_callback):
+ # Launch Controller communication
+ controller = Controller(config, server_hostname, heartbeat_stop_callback)
+ controller.start()
+ while controller.is_alive():
+ time.sleep(0.1)
+
+ if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
+ if controller.getStatusCommandsExecutor().is_alive():
+ logger.info("Terminating statusCommandsExecutor")
+ controller.getStatusCommandsExecutor().kill()
+ logger.info("Respawning statusCommandsExecutor")
+ controller.spawnStatusCommandsExecutorProcess()
+
+ controller.getStatusCommandsExecutor().kill()
+
# event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
# we need this for windows os, where no sigterm available
def main(heartbeat_stop_callback=None):
@@ -388,10 +404,7 @@ def main(heartbeat_stop_callback=None):
# Set the active server
active_server = server_hostname
# Launch Controller communication
- controller = Controller(config, server_hostname, heartbeat_stop_callback)
- controller.start()
- while controller.is_alive():
- time.sleep(0.1)
+ run_threads(server_hostname, heartbeat_stop_callback)
#
# If Ambari Agent connected to the server or
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 6a9bad1..d4f5436 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -325,9 +325,7 @@ class TestActionQueue(TestCase):
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch("logging.RootLogger.exception")
@patch.object(ActionQueue, "execute_command")
- @patch.object(ActionQueue, "execute_status_command")
- def test_process_command(self, execute_status_command_mock,
- execute_command_mock, log_exc_mock):
+ def test_process_command(self, execute_command_mock, log_exc_mock):
dummy_controller = MagicMock()
config = AmbariConfig()
config.set('agent', 'tolerate_download_failures', "true")
@@ -344,29 +342,19 @@ class TestActionQueue(TestCase):
# Try wrong command
actionQueue.process_command(wrong_command)
self.assertFalse(execute_command_mock.called)
- self.assertFalse(execute_status_command_mock.called)
self.assertFalse(log_exc_mock.called)
execute_command_mock.reset_mock()
- execute_status_command_mock.reset_mock()
log_exc_mock.reset_mock()
# Try normal execution
actionQueue.process_command(execution_command)
self.assertTrue(execute_command_mock.called)
- self.assertFalse(execute_status_command_mock.called)
self.assertFalse(log_exc_mock.called)
execute_command_mock.reset_mock()
- execute_status_command_mock.reset_mock()
log_exc_mock.reset_mock()
- actionQueue.process_command(status_command)
- self.assertFalse(execute_command_mock.called)
- self.assertTrue(execute_status_command_mock.called)
- self.assertFalse(log_exc_mock.called)
-
execute_command_mock.reset_mock()
- execute_status_command_mock.reset_mock()
log_exc_mock.reset_mock()
# Try exception to check proper logging
@@ -378,7 +366,6 @@ class TestActionQueue(TestCase):
log_exc_mock.reset_mock()
- execute_status_command_mock.side_effect = side_effect
actionQueue.process_command(execution_command)
self.assertTrue(log_exc_mock.called)
@@ -944,14 +931,11 @@ class TestActionQueue(TestCase):
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch.object(ActionQueue, "status_update_callback")
- @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
- @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
@patch.object(ActionQueue, "execute_command")
@patch.object(LiveStatus, "build")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_execute_status_command(self, CustomServiceOrchestrator_mock,
- build_mock, execute_command_mock, requestComponentSecurityState_mock,
- requestComponentStatus_mock,
+ build_mock, execute_command_mock,
status_update_callback):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
@@ -961,33 +945,25 @@ class TestActionQueue(TestCase):
dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
- requestComponentStatus_mock.reset_mock()
- requestComponentStatus_mock.return_value = {'exitcode': 0 }
+ result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
- requestComponentSecurityState_mock.reset_mock()
- requestComponentSecurityState_mock.return_value = 'UNKNOWN'
-
- actionQueue.execute_status_command(self.status_command)
+ actionQueue.process_status_command_result(result)
report = actionQueue.result()
expected = {'dummy report': '',
'securityState' : 'UNKNOWN'}
self.assertEqual(len(report['componentStatus']), 1)
self.assertEqual(report['componentStatus'][0], expected)
- self.assertTrue(requestComponentStatus_mock.called)
@patch.object(RecoveryManager, "command_exists")
@patch.object(RecoveryManager, "requires_recovery")
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch.object(ActionQueue, "status_update_callback")
- @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
- @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
@patch.object(ActionQueue, "execute_command")
@patch.object(LiveStatus, "build")
@patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_status_command_recovery(self, CustomServiceOrchestrator_mock,
- build_mock, execute_command_mock, requestComponentSecurityState_mock,
- requestComponentStatus_mock,
+ def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock,
+ build_mock, execute_command_mock,
status_update_callback, requires_recovery_mock,
command_exists_mock):
CustomServiceOrchestrator_mock.return_value = None
@@ -1000,13 +976,9 @@ class TestActionQueue(TestCase):
dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)
- requestComponentStatus_mock.reset_mock()
- requestComponentStatus_mock.return_value = {'exitcode': 0 }
-
- requestComponentSecurityState_mock.reset_mock()
- requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+ result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
- actionQueue.execute_status_command(self.status_command)
+ actionQueue.process_status_command_result(result)
report = actionQueue.result()
expected = {'dummy report': '',
'securityState' : 'UNKNOWN',
@@ -1014,17 +986,13 @@ class TestActionQueue(TestCase):
self.assertEqual(len(report['componentStatus']), 1)
self.assertEqual(report['componentStatus'][0], expected)
- self.assertTrue(requestComponentStatus_mock.called)
requires_recovery_mock.return_value = True
command_exists_mock.return_value = True
- requestComponentStatus_mock.reset_mock()
- requestComponentStatus_mock.return_value = {'exitcode': 0 }
-
- requestComponentSecurityState_mock.reset_mock()
- requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+
+ result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
- actionQueue.execute_status_command(self.status_command)
+ actionQueue.process_status_command_result(result)
report = actionQueue.result()
expected = {'dummy report': '',
'securityState' : 'UNKNOWN',
@@ -1032,39 +1000,33 @@ class TestActionQueue(TestCase):
self.assertEqual(len(report['componentStatus']), 1)
self.assertEqual(report['componentStatus'][0], expected)
- self.assertTrue(requestComponentStatus_mock.called)
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch.object(ActionQueue, "status_update_callback")
- @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
- @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
@patch.object(ActionQueue, "execute_command")
@patch.object(LiveStatus, "build")
@patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock,
- requestComponentSecurityState_mock,
+ def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock,
build_mock, execute_command_mock,
- requestComponentStatus_mock,
status_update_callback):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
-
- requestComponentStatus_mock.reset_mock()
- requestComponentStatus_mock.return_value = {
+ command_return_value = {
'exitcode': 0,
'stdout': 'out',
'stderr': 'err',
'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
}
+
+ result = (self.status_command_for_alerts, command_return_value, command_return_value)
+
build_mock.return_value = {'somestatusresult': 'aresult'}
- actionQueue.execute_status_command(self.status_command_for_alerts)
+ actionQueue.process_status_command_result(result)
report = actionQueue.result()
- self.assertTrue(requestComponentStatus_mock.called)
self.assertEqual(len(report['componentStatus']), 1)
self.assertTrue(report['componentStatus'][0].has_key('alerts'))
@@ -1324,7 +1286,7 @@ class TestActionQueue(TestCase):
execute_command = copy.deepcopy(self.background_command)
actionQueue.put([execute_command])
actionQueue.processBackgroundQueueSafeEmpty();
- actionQueue.processStatusCommandQueueSafeEmpty();
+ actionQueue.processStatusCommandResultQueueSafeEmpty();
#assert that python execturor start
self.assertTrue(runCommand_mock.called)
@@ -1368,7 +1330,7 @@ class TestActionQueue(TestCase):
None, command_complete_w)
actionQueue.put([self.background_command])
actionQueue.processBackgroundQueueSafeEmpty();
- actionQueue.processStatusCommandQueueSafeEmpty();
+ actionQueue.processStatusCommandResultQueueSafeEmpty();
with lock:
complete_done.wait(0.1)
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 59b41cd..b47af03 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -44,6 +44,7 @@ import ambari_commons
@not_for_platform(PLATFORM_WINDOWS)
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+@patch.object(Controller.Controller, "spawnStatusCommandsExecutorProcess", new = MagicMock())
class TestController(unittest.TestCase):
logger = logging.getLogger()
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/test/python/ambari_agent/TestMain.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py
index 400241f..998b778 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestMain.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py
@@ -324,6 +324,7 @@ class TestMain(unittest.TestCase):
@patch.object(Controller, "__init__")
@patch.object(Controller, "is_alive")
@patch.object(Controller, "start")
+ @patch.object(Controller, "getStatusCommandsExecutor")
@patch("optparse.OptionParser.parse_args")
@patch.object(DataCleaner,"start")
@patch.object(DataCleaner,"__init__")
@@ -332,7 +333,7 @@ class TestMain(unittest.TestCase):
@patch.object(ExitHelper,"execute_cleanup")
@patch.object(ExitHelper, "exit")
def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
- parse_args_mock, start_mock, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
+ parse_args_mock, start_mock, Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
ambari_config_mock,
stop_mock, bind_signal_handlers_mock,
[5/5] ambari git commit: AMBARI-19520. Ambari agents not recovering
from heart beat lost state immediately after successful re-registering with
server. (stoader)
Posted by st...@apache.org.
AMBARI-19520. Ambari agents not recovering from heart beat lost state immediately after successful re-registering with server. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b512b26a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b512b26a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b512b26a
Branch: refs/heads/branch-2.5
Commit: b512b26ae48c92df0b8d884c08f5f07cf9a2875b
Parents: 36f7422
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Mon Jan 16 13:43:01 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Wed Jan 18 09:41:56 2017 +0100
----------------------------------------------------------------------
.../src/main/python/ambari_agent/Controller.py | 63 ++++++++++++++------
.../src/main/python/ambari_agent/main.py | 18 +++---
2 files changed, 53 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b512b26a/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 2244d30..09ab1e6 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -86,6 +86,10 @@ class Controller(threading.Thread):
self.max_reconnect_retry_delay = int(config.get('server','max_reconnect_retry_delay', default=30))
self.hasMappedComponents = True
self.statusCommandsExecutor = None
+
+ # this lock is used to control which thread spawns/kills the StatusCommandExecutor child process
+ self.spawnKillStatusCommandExecutorLock = threading.RLock()
+
# Event is used for synchronizing heartbeat iterations (to make possible
# manual wait() interruption between heartbeats )
self.heartbeat_stop_callback = heartbeat_stop_callback
@@ -199,11 +203,9 @@ class Controller(threading.Thread):
self.config.update_configuration_from_registration(ret)
logger.debug("Updated config:" + str(self.config))
- if self.statusCommandsExecutor is None:
- self.spawnStatusCommandsExecutorProcess()
- elif self.statusCommandsExecutor.is_alive():
- logger.info("Terminating statusCommandsExecutor as agent re-registered with server.")
- self.killStatusCommandsExecutorProcess()
+ # Start StatusCommandExecutor child process or restart it if already running
+ # in order to receive up to date agent config.
+ self.spawnStatusCommandsExecutorProcess()
if 'statusCommands' in ret.keys():
logger.debug("Got status commands on registration.")
@@ -458,22 +460,43 @@ class Controller(threading.Thread):
self.DEBUG_STOP_HEARTBEATING=True
def spawnStatusCommandsExecutorProcess(self):
- # Re-create the status command queue as in case the consumer
- # process is killed the queue may deadlock (see http://bugs.python.org/issue20527).
- # The queue must be re-created by the producer process.
- if self.actionQueue.statusCommandQueue is not None:
- self.actionQueue.statusCommandQueue.close()
- self.actionQueue.statusCommandQueue.join_thread()
-
- self.actionQueue.statusCommandQueue = multiprocessing.Queue()
-
- self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
- self.statusCommandsExecutor.start()
+ '''
+ Starts a new StatusCommandExecutor child process. If there is already a running instance,
+ it is restarted by simply killing it and starting a new one.
+ This function is thread-safe.
+ '''
+ with self.getSpawnKillStatusCommandExecutorLock():
+ # if there is already an instance of StatusCommandExecutor kill it first
+ self.killStatusCommandsExecutorProcess()
+
+ # Re-create the status command queue as in case the consumer
+ # process is killed the queue may deadlock (see http://bugs.python.org/issue20527).
+ # The queue must be re-created by the producer process.
+ statusCommandQueue = self.actionQueue.statusCommandQueue
+ self.actionQueue.statusCommandQueue = multiprocessing.Queue()
+
+ if statusCommandQueue is not None:
+ statusCommandQueue.close()
+
+ logger.info("Spawning statusCommandsExecutor")
+ self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
+ self.statusCommandsExecutor.start()
def killStatusCommandsExecutorProcess(self):
- self.statusCommandsExecutor.kill()
-
-
+ '''
+ Kills the StatusExecutorChild process if exists. This function is thread-safe.
+ '''
+ with self.getSpawnKillStatusCommandExecutorLock():
+ if self.statusCommandsExecutor is not None and self.statusCommandsExecutor.is_alive():
+ logger.info("Terminating statusCommandsExecutor.")
+ self.statusCommandsExecutor.kill()
+
+ def getSpawnKillStatusCommandExecutorLock(self):
+ '''
+ Re-entrant lock to be used to synchronize the spawning or killing of
+ StatusCommandExecutor child process in multi-thread environment.
+ '''
+ return self.spawnKillStatusCommandExecutorLock;
def getStatusCommandsExecutor(self):
return self.statusCommandsExecutor
@@ -586,6 +609,8 @@ class Controller(threading.Thread):
except Exception, e:
logger.info("Exception in move_data_dir_mount_file(). Error: {0}".format(str(e)))
+
+
def main(argv=None):
# Allow Ctrl-C
http://git-wip-us.apache.org/repos/asf/ambari/blob/b512b26a/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 2e1124e..8e577a5 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -291,15 +291,15 @@ def run_threads(server_hostname, heartbeat_stop_callback):
while controller.is_alive():
time.sleep(0.1)
- if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
- if controller.getStatusCommandsExecutor().is_alive():
- logger.info("Terminating statusCommandsExecutor")
- controller.killStatusCommandsExecutorProcess()
- logger.info("Respawning statusCommandsExecutor")
- controller.spawnStatusCommandsExecutorProcess()
-
- if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive():
- controller.killStatusCommandsExecutorProcess()
+ with controller.getSpawnKillStatusCommandExecutorLock():
+ # We need to lock as Controller.py may try to spawn StatusCommandExecutor child in parallel as well
+ if controller.getStatusCommandsExecutor() is not None \
+ and (not controller.getStatusCommandsExecutor().is_alive()
+ or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
+ controller.spawnStatusCommandsExecutorProcess()
+
+
+ controller.killStatusCommandsExecutorProcess()
# event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
# we need this for windows os, where no sigterm available
[4/5] ambari git commit: AMBARI-19416. Ambari agents remain in
heartbeat lost state after ambari server restart. (stoader)
Posted by st...@apache.org.
AMBARI-19416. Ambari agents remain in heartbeat lost state after ambari server restart. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/36f74224
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/36f74224
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/36f74224
Branch: refs/heads/branch-2.5
Commit: 36f742246530af98913051bb27dcd3b20368e474
Parents: b5d3e07
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Mon Jan 9 13:18:53 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Wed Jan 18 09:41:55 2017 +0100
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 5 ++++-
.../src/main/python/ambari_agent/Controller.py | 17 ++++++++++++++++-
ambari-agent/src/main/python/ambari_agent/main.py | 5 +++--
.../src/test/python/ambari_agent/TestHeartbeat.py | 2 ++
.../src/test/python/ambari_agent/TestMain.py | 4 +++-
5 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 3726286..18d7c2a 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -75,7 +75,10 @@ class ActionQueue(threading.Thread):
def __init__(self, config, controller):
super(ActionQueue, self).__init__()
self.commandQueue = Queue.Queue()
- self.statusCommandQueue = multiprocessing.Queue()
+ self.statusCommandQueue = None # the queue this field points to is re-created whenever
+ # a new StatusCommandExecutor child process is spawned
+ # by Controller
+ # multiprocessing.Queue()
self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor.
self.backgroundCommandQueue = Queue.Queue()
self.commandStatuses = CommandStatusDict(callback_action =
http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 11b98f4..2244d30 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
+import multiprocessing
import logging
import ambari_simplejson as json
import sys
@@ -202,7 +203,7 @@ class Controller(threading.Thread):
self.spawnStatusCommandsExecutorProcess()
elif self.statusCommandsExecutor.is_alive():
logger.info("Terminating statusCommandsExecutor as agent re-registered with server.")
- self.statusCommandsExecutor.kill()
+ self.killStatusCommandsExecutorProcess()
if 'statusCommands' in ret.keys():
logger.debug("Got status commands on registration.")
@@ -457,9 +458,23 @@ class Controller(threading.Thread):
self.DEBUG_STOP_HEARTBEATING=True
def spawnStatusCommandsExecutorProcess(self):
+ # Re-create the status command queue as in case the consumer
+ # process is killed the queue may deadlock (see http://bugs.python.org/issue20527).
+ # The queue must be re-created by the producer process.
+ if self.actionQueue.statusCommandQueue is not None:
+ self.actionQueue.statusCommandQueue.close()
+ self.actionQueue.statusCommandQueue.join_thread()
+
+ self.actionQueue.statusCommandQueue = multiprocessing.Queue()
+
self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
self.statusCommandsExecutor.start()
+ def killStatusCommandsExecutorProcess(self):
+ self.statusCommandsExecutor.kill()
+
+
+
def getStatusCommandsExecutor(self):
return self.statusCommandsExecutor
http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 968b828..2e1124e 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -294,11 +294,12 @@ def run_threads(server_hostname, heartbeat_stop_callback):
if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
if controller.getStatusCommandsExecutor().is_alive():
logger.info("Terminating statusCommandsExecutor")
- controller.getStatusCommandsExecutor().kill()
+ controller.killStatusCommandsExecutorProcess()
logger.info("Respawning statusCommandsExecutor")
controller.spawnStatusCommandsExecutorProcess()
- controller.getStatusCommandsExecutor().kill()
+ if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive():
+ controller.killStatusCommandsExecutorProcess()
# event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
# we need this for windows os, where no sigterm available
http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 19fad56..de07743 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -24,6 +24,7 @@ import tempfile
from mock.mock import patch, MagicMock, call
import StringIO
import sys
+import multiprocessing
from ambari_agent.RecoveryManager import RecoveryManager
@@ -212,6 +213,7 @@ class TestHeartbeat(TestCase):
dummy_controller = MagicMock()
actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.statusCommandQueue = multiprocessing.Queue()
statusCommand = {
"serviceName" : 'HDFS',
"commandType" : "STATUS_COMMAND",
http://git-wip-us.apache.org/repos/asf/ambari/blob/36f74224/ambari-agent/src/test/python/ambari_agent/TestMain.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py
index 998b778..97c448b 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestMain.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py
@@ -325,6 +325,7 @@ class TestMain(unittest.TestCase):
@patch.object(Controller, "is_alive")
@patch.object(Controller, "start")
@patch.object(Controller, "getStatusCommandsExecutor")
+ @patch.object(Controller, "killStatusCommandsExecutorProcess")
@patch("optparse.OptionParser.parse_args")
@patch.object(DataCleaner,"start")
@patch.object(DataCleaner,"__init__")
@@ -333,7 +334,8 @@ class TestMain(unittest.TestCase):
@patch.object(ExitHelper,"execute_cleanup")
@patch.object(ExitHelper, "exit")
def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
- parse_args_mock, start_mock, Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
+ parse_args_mock, start_mock, Controller_killStatusCommandsExecutorProcess,
+ Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
ambari_config_mock,
stop_mock, bind_signal_handlers_mock,
[3/5] ambari git commit: AMBARI-18922 Agent Auto Restart Doesn't
Release Ping Port (dsen)
Posted by st...@apache.org.
AMBARI-18922 Agent Auto Restart Doesn't Release Ping Port (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d46461a3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d46461a3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d46461a3
Branch: refs/heads/branch-2.5
Commit: d46461a308762f29c789d044873b685643fc63bc
Parents: f8ccb47
Author: Dmytro Sen <ds...@apache.org>
Authored: Mon Nov 21 12:01:31 2016 +0200
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Wed Jan 18 09:41:55 2017 +0100
----------------------------------------------------------------------
.../src/main/python/ambari_agent/AmbariAgent.py | 19 +++++++++++++------
.../ambari_agent/StatusCommandsExecutor.py | 4 +++-
.../test/python/ambari_agent/TestAmbariAgent.py | 7 ++++++-
3 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/d46461a3/ambari-agent/src/main/python/ambari_agent/AmbariAgent.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariAgent.py b/ambari-agent/src/main/python/ambari_agent/AmbariAgent.py
index d701e49..28b9528 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariAgent.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariAgent.py
@@ -21,6 +21,7 @@ limitations under the License.
import os
import sys
import subprocess
+import signal
from Controller import AGENT_AUTO_RESTART_EXIT_CODE
if os.environ.has_key("PYTHON_BIN"):
@@ -48,12 +49,18 @@ def main():
mergedArgs = [PYTHON, AGENT_SCRIPT] + args
- while status == AGENT_AUTO_RESTART_EXIT_CODE:
- mainProcess = subprocess.Popen(mergedArgs)
- mainProcess.communicate()
- status = mainProcess.returncode
- if os.path.isfile(AGENT_PID_FILE) and status == AGENT_AUTO_RESTART_EXIT_CODE:
- os.remove(AGENT_PID_FILE)
+ # Become a parent for all subprocesses
+ os.setpgrp()
+
+ try:
+ while status == AGENT_AUTO_RESTART_EXIT_CODE:
+ mainProcess = subprocess.Popen(mergedArgs)
+ mainProcess.communicate()
+ status = mainProcess.returncode
+ if os.path.isfile(AGENT_PID_FILE) and status == AGENT_AUTO_RESTART_EXIT_CODE:
+ os.remove(AGENT_PID_FILE)
+ finally:
+ os.killpg(0, signal.SIGKILL)
if __name__ == "__main__":
main()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/d46461a3/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 5d3607a..fbb29f4 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -24,6 +24,7 @@ import logging
import multiprocessing
from ambari_agent.PythonReflectiveExecutor import PythonReflectiveExecutor
from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
+from ambari_agent.ExitHelper import ExitHelper
logger = logging.getLogger(__name__)
@@ -41,6 +42,7 @@ class StatusCommandsExecutor(multiprocessing.Process):
self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) # in seconds
self.hasTimeoutedEvent = multiprocessing.Event()
+ ExitHelper().register(self.kill)
def run(self):
try:
@@ -88,4 +90,4 @@ class StatusCommandsExecutor(multiprocessing.Process):
# prevent queue from ending up with non-freed semaphores, locks during put. Which would result in dead-lock in process executing get.
self.actionQueue.statusCommandResultQueue.close()
self.actionQueue.statusCommandResultQueue.join_thread()
- self.actionQueue.statusCommandResultQueue = multiprocessing.Queue()
\ No newline at end of file
+ self.actionQueue.statusCommandResultQueue = multiprocessing.Queue()
http://git-wip-us.apache.org/repos/asf/ambari/blob/d46461a3/ambari-agent/src/test/python/ambari_agent/TestAmbariAgent.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAmbariAgent.py b/ambari-agent/src/test/python/ambari_agent/TestAmbariAgent.py
index 1be487c..8ff192a 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAmbariAgent.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAmbariAgent.py
@@ -34,7 +34,10 @@ class TestAmbariAgent(unittest.TestCase):
@patch.object(subprocess, "Popen")
@patch("os.path.isfile")
@patch("os.remove")
- def test_main(self, os_remove_mock, os_path_isfile_mock, subprocess_popen_mock):
+ @patch("os.killpg")
+ @patch("os.setpgrp")
+ def test_main(self, os_setpgrp_mock, os_killpg_mock, os_remove_mock,
+ os_path_isfile_mock, subprocess_popen_mock):
facter1 = MagicMock()
facter2 = MagicMock()
subprocess_popen_mock.side_effect = [facter1, facter2]
@@ -46,6 +49,7 @@ class TestAmbariAgent(unittest.TestCase):
sys.argv[0] = "test data"
AmbariAgent.main()
+ self.assertTrue(os_setpgrp_mock.called)
self.assertTrue(subprocess_popen_mock.called)
self.assertTrue(subprocess_popen_mock.call_count == 2)
self.assertTrue(facter1.communicate.called)
@@ -53,6 +57,7 @@ class TestAmbariAgent(unittest.TestCase):
self.assertTrue(os_path_isfile_mock.called)
self.assertTrue(os_path_isfile_mock.call_count == 2)
self.assertTrue(os_remove_mock.called)
+ self.assertTrue(os_killpg_mock.called)
#
# Test AmbariConfig.getLogFile() for ambari-agent
[2/5] ambari git commit: AMBARI-19392. Status command executor may
use obsolete settings. (stoader)
Posted by st...@apache.org.
AMBARI-19392. Status command executor may use obsolete settings. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b5d3e072
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b5d3e072
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b5d3e072
Branch: refs/heads/branch-2.5
Commit: b5d3e072fe44fb3ceb124b769310d8bb75e9c88a
Parents: d46461a
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Fri Jan 6 23:45:31 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Wed Jan 18 09:41:55 2017 +0100
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 14 +++++++++++---
.../src/main/python/ambari_agent/Controller.py | 7 ++++++-
2 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b5d3e072/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index aeae954..3726286 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -98,9 +98,17 @@ class ActionQueue(threading.Thread):
return self._stop.isSet()
def put_status(self, commands):
- #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones
- while not self.statusCommandQueue.empty():
- self.statusCommandQueue.get()
+ if not self.statusCommandQueue.empty():
+ #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones
+ statusCommandQueueSize = 0
+ try:
+ while not self.statusCommandQueue.empty():
+ self.statusCommandQueue.get(False)
+ statusCommandQueueSize = statusCommandQueueSize + 1
+ except Queue.Empty:
+ pass
+
+ logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize))
for command in commands:
logger.info("Adding " + command['commandType'] + " for component " + \
http://git-wip-us.apache.org/repos/asf/ambari/blob/b5d3e072/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index f6296d8..11b98f4 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -198,6 +198,12 @@ class Controller(threading.Thread):
self.config.update_configuration_from_registration(ret)
logger.debug("Updated config:" + str(self.config))
+ if self.statusCommandsExecutor is None:
+ self.spawnStatusCommandsExecutorProcess()
+ elif self.statusCommandsExecutor.is_alive():
+ logger.info("Terminating statusCommandsExecutor as agent re-registered with server.")
+ self.statusCommandsExecutor.kill()
+
if 'statusCommands' in ret.keys():
logger.debug("Got status commands on registration.")
self.addToStatusQueue(ret['statusCommands'])
@@ -461,7 +467,6 @@ class Controller(threading.Thread):
try:
self.actionQueue = ActionQueue(self.config, controller=self)
self.actionQueue.start()
- self.spawnStatusCommandsExecutorProcess()
self.register = Register(self.config)
self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())