You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2016/04/22 11:56:53 UTC
ambari git commit: AMBARI-16036. Add logging for problems in
ambari-agent Controller and ActionQueue (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/trunk 3e3bb2dab -> 33c01273e
AMBARI-16036. Add logging for problems in ambari-agent Controller and ActionQueue (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/33c01273
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/33c01273
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/33c01273
Branch: refs/heads/trunk
Commit: 33c01273e919e344f5a062d5666ec9404b19d513
Parents: 3e3bb2d
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Fri Apr 22 12:54:36 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Fri Apr 22 12:56:23 2016 +0300
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 68 +++++++++++---------
.../src/main/python/ambari_agent/Controller.py | 36 +++++++----
.../python/ambari_agent/HeartbeatHandlers.py | 1 +
.../test/python/ambari_agent/TestActionQueue.py | 22 +++----
4 files changed, 71 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/33c01273/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index a0596a2..c5340a0 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -144,37 +144,43 @@ class ActionQueue(threading.Thread):
self.customServiceOrchestrator.cancel_command(task_id, reason)
def run(self):
- while not self.stopped():
- self.processBackgroundQueueSafeEmpty();
- self.processStatusCommandQueueSafeEmpty();
- try:
- if self.parallel_execution == 0:
- command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
- self.process_command(command)
- else:
- # If parallel execution is enabled, just kick off all available
- # commands using separate threads
- while (True):
+ try:
+ while not self.stopped():
+ self.processBackgroundQueueSafeEmpty();
+ self.processStatusCommandQueueSafeEmpty();
+ try:
+ if self.parallel_execution == 0:
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
- # If command is not retry_enabled then do not start them in parallel
- # checking just one command is enough as all commands for a stage is sent
- # at the same time and retry is only enabled for initial start/install
- retryAble = False
- if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
- retryAble = command['commandParams']['command_retry_enabled'] == "true"
- if retryAble:
- logger.info("Kicking off a thread for the command, id=" +
- str(command['commandId']) + " taskId=" + str(command['taskId']))
- t = threading.Thread(target=self.process_command, args=(command,))
- t.daemon = True
- t.start()
- else:
- self.process_command(command)
- break;
+ self.process_command(command)
+ else:
+ # If parallel execution is enabled, just kick off all available
+ # commands using separate threads
+ while (True):
+ command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+ # If command is not retry_enabled then do not start them in parallel
+ # checking just one command is enough as all commands for a stage is sent
+ # at the same time and retry is only enabled for initial start/install
+ retryAble = False
+ if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
+ retryAble = command['commandParams']['command_retry_enabled'] == "true"
+ if retryAble:
+ logger.info("Kicking off a thread for the command, id=" +
+ str(command['commandId']) + " taskId=" + str(command['taskId']))
+ t = threading.Thread(target=self.process_command, args=(command,))
+ t.daemon = True
+ t.start()
+ else:
+ self.process_command(command)
+ break;
+ pass
pass
+ except (Queue.Empty):
pass
- except (Queue.Empty):
- pass
+ except:
+ logger.exception("ActionQueue thread failed with exception:")
+ raise
+
+ logger.info("ActionQueue thread has successfully finished")
def processBackgroundQueueSafeEmpty(self):
while not self.backgroundCommandQueue.empty():
@@ -217,10 +223,8 @@ class ActionQueue(threading.Thread):
self.execute_status_command(command)
else:
logger.error("Unrecognized command " + pprint.pformat(command))
- except Exception, err:
- # Should not happen
- traceback.print_exc()
- logger.warn(err)
+ except Exception:
+ logger.exception("Exception while processing {0} command".format(commandType))
def tasks_in_progress_or_pending(self):
return_val = False
http://git-wip-us.apache.org/repos/asf/ambari/blob/33c01273/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 40114ca..aee0eec 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -336,6 +336,7 @@ class Controller(threading.Thread):
except ssl.SSLError:
self.repeatRegistration=False
self.isRegistered = False
+ logger.exception("SSLError while trying to heartbeat.")
return
except Exception, err:
if "code" in err:
@@ -373,19 +374,26 @@ class Controller(threading.Thread):
self.DEBUG_STOP_HEARTBEATING=True
def run(self):
- self.actionQueue = ActionQueue(self.config, controller=self)
- self.actionQueue.start()
- self.register = Register(self.config)
- self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
-
- opener = urllib2.build_opener()
- urllib2.install_opener(opener)
-
- while True:
- self.repeatRegistration = False
- self.registerAndHeartbeat()
- if not self.repeatRegistration:
- break
+ try:
+ self.actionQueue = ActionQueue(self.config, controller=self)
+ self.actionQueue.start()
+ self.register = Register(self.config)
+ self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
+
+ opener = urllib2.build_opener()
+ urllib2.install_opener(opener)
+
+ while True:
+ self.repeatRegistration = False
+ self.registerAndHeartbeat()
+ if not self.repeatRegistration:
+ logger.info("Finished heartbeating and registering cycle")
+ break
+ except:
+ logger.exception("Controller thread failed with exception:")
+ raise
+
+ logger.info("Controller thread has successfully finished")
def registerAndHeartbeat(self):
registerResponse = self.registerWithServer()
@@ -406,6 +414,8 @@ class Controller(threading.Thread):
time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
self.heartbeatWithServer()
+ else:
+ logger.info("Registration response from {0} didn't contain 'response' as a key".format(self.serverHostname))
def restartAgent(self):
ExitHelper().exit(AGENT_AUTO_RESTART_EXIT_CODE)
http://git-wip-us.apache.org/repos/asf/ambari/blob/33c01273/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
index 7a9797d..67e3c77 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
@@ -79,6 +79,7 @@ class HeartbeatStopHandlersWindows(HeartbeatStopHandlers):
def signal_handler(signum, frame):
global _handler
+ logger.info("Ambari-agent received {0} signal, stopping...".format(signum))
_handler.set_stop()
http://git-wip-us.apache.org/repos/asf/ambari/blob/33c01273/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index fcf2965..bca506e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -279,11 +279,11 @@ class TestActionQueue(TestCase):
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch("traceback.print_exc")
+ @patch("logging.RootLogger.exception")
@patch.object(ActionQueue, "execute_command")
@patch.object(ActionQueue, "execute_status_command")
def test_process_command(self, execute_status_command_mock,
- execute_command_mock, print_exc_mock):
+ execute_command_mock, log_exc_mock):
dummy_controller = MagicMock()
config = AmbariConfig()
config.set('agent', 'tolerate_download_failures', "true")
@@ -301,42 +301,42 @@ class TestActionQueue(TestCase):
actionQueue.process_command(wrong_command)
self.assertFalse(execute_command_mock.called)
self.assertFalse(execute_status_command_mock.called)
- self.assertFalse(print_exc_mock.called)
+ self.assertFalse(log_exc_mock.called)
execute_command_mock.reset_mock()
execute_status_command_mock.reset_mock()
- print_exc_mock.reset_mock()
+ log_exc_mock.reset_mock()
# Try normal execution
actionQueue.process_command(execution_command)
self.assertTrue(execute_command_mock.called)
self.assertFalse(execute_status_command_mock.called)
- self.assertFalse(print_exc_mock.called)
+ self.assertFalse(log_exc_mock.called)
execute_command_mock.reset_mock()
execute_status_command_mock.reset_mock()
- print_exc_mock.reset_mock()
+ log_exc_mock.reset_mock()
actionQueue.process_command(status_command)
self.assertFalse(execute_command_mock.called)
self.assertTrue(execute_status_command_mock.called)
- self.assertFalse(print_exc_mock.called)
+ self.assertFalse(log_exc_mock.called)
execute_command_mock.reset_mock()
execute_status_command_mock.reset_mock()
- print_exc_mock.reset_mock()
+ log_exc_mock.reset_mock()
# Try exception to check proper logging
def side_effect(self):
raise Exception("TerribleException")
execute_command_mock.side_effect = side_effect
actionQueue.process_command(execution_command)
- self.assertTrue(print_exc_mock.called)
+ self.assertTrue(log_exc_mock.called)
- print_exc_mock.reset_mock()
+ log_exc_mock.reset_mock()
execute_status_command_mock.side_effect = side_effect
actionQueue.process_command(execution_command)
- self.assertTrue(print_exc_mock.called)
+ self.assertTrue(log_exc_mock.called)
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")