You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2016/04/22 11:56:53 UTC

ambari git commit: AMBARI-16036. Add logging for problems in ambari-agent Controller and ActionQueue (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/trunk 3e3bb2dab -> 33c01273e


AMBARI-16036. Add logging for problems in ambari-agent Controller and ActionQueue (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/33c01273
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/33c01273
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/33c01273

Branch: refs/heads/trunk
Commit: 33c01273e919e344f5a062d5666ec9404b19d513
Parents: 3e3bb2d
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Fri Apr 22 12:54:36 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Fri Apr 22 12:56:23 2016 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 68 +++++++++++---------
 .../src/main/python/ambari_agent/Controller.py  | 36 +++++++----
 .../python/ambari_agent/HeartbeatHandlers.py    |  1 +
 .../test/python/ambari_agent/TestActionQueue.py | 22 +++----
 4 files changed, 71 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/33c01273/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index a0596a2..c5340a0 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -144,37 +144,43 @@ class ActionQueue(threading.Thread):
       self.customServiceOrchestrator.cancel_command(task_id, reason)
 
   def run(self):
-    while not self.stopped():
-      self.processBackgroundQueueSafeEmpty();
-      self.processStatusCommandQueueSafeEmpty();
-      try:
-        if self.parallel_execution == 0:
-          command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
-          self.process_command(command)
-        else:
-          # If parallel execution is enabled, just kick off all available
-          # commands using separate threads
-          while (True):
+    try:
+      while not self.stopped():
+        self.processBackgroundQueueSafeEmpty();
+        self.processStatusCommandQueueSafeEmpty();
+        try:
+          if self.parallel_execution == 0:
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
-            # If command is not retry_enabled then do not start them in parallel
-            # checking just one command is enough as all commands for a stage is sent
-            # at the same time and retry is only enabled for initial start/install
-            retryAble = False
-            if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
-              retryAble = command['commandParams']['command_retry_enabled'] == "true"
-            if retryAble:
-              logger.info("Kicking off a thread for the command, id=" +
-                          str(command['commandId']) + " taskId=" + str(command['taskId']))
-              t = threading.Thread(target=self.process_command, args=(command,))
-              t.daemon = True
-              t.start()
-            else:
-              self.process_command(command)
-              break;
+            self.process_command(command)
+          else:
+            # If parallel execution is enabled, just kick off all available
+            # commands using separate threads
+            while (True):
+              command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+              # If command is not retry_enabled then do not start them in parallel
+              # checking just one command is enough as all commands for a stage is sent
+              # at the same time and retry is only enabled for initial start/install
+              retryAble = False
+              if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
+                retryAble = command['commandParams']['command_retry_enabled'] == "true"
+              if retryAble:
+                logger.info("Kicking off a thread for the command, id=" +
+                            str(command['commandId']) + " taskId=" + str(command['taskId']))
+                t = threading.Thread(target=self.process_command, args=(command,))
+                t.daemon = True
+                t.start()
+              else:
+                self.process_command(command)
+                break;
+              pass
             pass
+        except (Queue.Empty):
           pass
-      except (Queue.Empty):
-        pass
+    except:
+      logger.exception("ActionQueue thread failed with exception:")
+      raise
+    
+    logger.info("ActionQueue thread has successfully finished")
 
   def processBackgroundQueueSafeEmpty(self):
     while not self.backgroundCommandQueue.empty():
@@ -217,10 +223,8 @@ class ActionQueue(threading.Thread):
         self.execute_status_command(command)
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
-    except Exception, err:
-      # Should not happen
-      traceback.print_exc()
-      logger.warn(err)
+    except Exception:
+      logger.exception("Exception while processing {0} command".format(commandType))
 
   def tasks_in_progress_or_pending(self):
     return_val = False

http://git-wip-us.apache.org/repos/asf/ambari/blob/33c01273/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 40114ca..aee0eec 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -336,6 +336,7 @@ class Controller(threading.Thread):
       except ssl.SSLError:
         self.repeatRegistration=False
         self.isRegistered = False
+        logger.exception("SSLError while trying to heartbeat.")
         return
       except Exception, err:
         if "code" in err:
@@ -373,19 +374,26 @@ class Controller(threading.Thread):
         self.DEBUG_STOP_HEARTBEATING=True
 
   def run(self):
-    self.actionQueue = ActionQueue(self.config, controller=self)
-    self.actionQueue.start()
-    self.register = Register(self.config)
-    self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
-
-    opener = urllib2.build_opener()
-    urllib2.install_opener(opener)
-
-    while True:
-      self.repeatRegistration = False
-      self.registerAndHeartbeat()
-      if not self.repeatRegistration:
-        break
+    try:
+      self.actionQueue = ActionQueue(self.config, controller=self)
+      self.actionQueue.start()
+      self.register = Register(self.config)
+      self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
+  
+      opener = urllib2.build_opener()
+      urllib2.install_opener(opener)
+  
+      while True:
+        self.repeatRegistration = False
+        self.registerAndHeartbeat()
+        if not self.repeatRegistration:
+          logger.info("Finished heartbeating and registering cycle")
+          break
+    except:
+      logger.exception("Controller thread failed with exception:")
+      raise
+    
+    logger.info("Controller thread has successfully finished")
 
   def registerAndHeartbeat(self):
     registerResponse = self.registerWithServer()
@@ -406,6 +414,8 @@ class Controller(threading.Thread):
 
         time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
         self.heartbeatWithServer()
+      else:
+        logger.info("Registration response from {0} didn't contain 'response' as a key".format(self.serverHostname))
 
   def restartAgent(self):
     ExitHelper().exit(AGENT_AUTO_RESTART_EXIT_CODE)

http://git-wip-us.apache.org/repos/asf/ambari/blob/33c01273/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
index 7a9797d..67e3c77 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
@@ -79,6 +79,7 @@ class HeartbeatStopHandlersWindows(HeartbeatStopHandlers):
 
 def signal_handler(signum, frame):
   global _handler
+  logger.info("Ambari-agent received {0} signal, stopping...".format(signum))
   _handler.set_stop()
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/33c01273/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index fcf2965..bca506e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -279,11 +279,11 @@ class TestActionQueue(TestCase):
 
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch("traceback.print_exc")
+  @patch("logging.RootLogger.exception")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(ActionQueue, "execute_status_command")
   def test_process_command(self, execute_status_command_mock,
-                           execute_command_mock, print_exc_mock):
+                           execute_command_mock, log_exc_mock):
     dummy_controller = MagicMock()
     config = AmbariConfig()
     config.set('agent', 'tolerate_download_failures', "true")
@@ -301,42 +301,42 @@ class TestActionQueue(TestCase):
     actionQueue.process_command(wrong_command)
     self.assertFalse(execute_command_mock.called)
     self.assertFalse(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
+    self.assertFalse(log_exc_mock.called)
 
     execute_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
+    log_exc_mock.reset_mock()
     # Try normal execution
     actionQueue.process_command(execution_command)
     self.assertTrue(execute_command_mock.called)
     self.assertFalse(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
+    self.assertFalse(log_exc_mock.called)
 
     execute_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
+    log_exc_mock.reset_mock()
 
     actionQueue.process_command(status_command)
     self.assertFalse(execute_command_mock.called)
     self.assertTrue(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
+    self.assertFalse(log_exc_mock.called)
 
     execute_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
+    log_exc_mock.reset_mock()
 
     # Try exception to check proper logging
     def side_effect(self):
       raise Exception("TerribleException")
     execute_command_mock.side_effect = side_effect
     actionQueue.process_command(execution_command)
-    self.assertTrue(print_exc_mock.called)
+    self.assertTrue(log_exc_mock.called)
 
-    print_exc_mock.reset_mock()
+    log_exc_mock.reset_mock()
 
     execute_status_command_mock.side_effect = side_effect
     actionQueue.process_command(execution_command)
-    self.assertTrue(print_exc_mock.called)
+    self.assertTrue(log_exc_mock.called)
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")