You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@ambari.apache.org by rz...@apache.org on 2017/02/17 22:56:49 UTC

[11/51] [abbrv] ambari git commit: AMBARI-19802. Debug: agent randomly losing heartbeat with the server. (Attila Doroszlai via stoader)

AMBARI-19802. Debug: agent randomly losing heartbeat with the server. (Attila Doroszlai via stoader)

(cherry picked from commit d071727734468fd78fdf02a94f490b80cba8cdb2)

Change-Id: I12749ac0225a1c8c13a79b71e44b84b75162d770


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/44f45199
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/44f45199
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/44f45199

Branch: refs/heads/branch-feature-BUG-74026
Commit: 44f45199abbac060767f2cda997406ef8f092f60
Parents: f04d7ba
Author: Attila Doroszlai <ad...@hortonworks.com>
Authored: Wed Feb 1 19:06:29 2017 +0100
Committer: Zuul <re...@hortonworks.com>
Committed: Mon Feb 6 21:48:54 2017 -0800

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/Controller.py  | 51 +++++++++++++-------
 .../ambari_agent/StatusCommandsExecutor.py      |  1 +
 2 files changed, 34 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/44f45199/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 09f15de..61a74e6 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -274,7 +274,7 @@ class Controller(threading.Thread):
     self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
     retry = False
     certVerifFailed = False
-    state_interval = self.config.get('heartbeat', 'state_interval_seconds', '60')
+    state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60'))
 
     # last time when state was successfully sent to server
     last_state_timestamp = 0.0
@@ -289,27 +289,34 @@ class Controller(threading.Thread):
     getrecoverycommands_timestamp = 0.0
     getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
 
+    heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+
     while not self.DEBUG_STOP_HEARTBEATING:
-      heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+      current_time = time.time()
+      logging_level = logging.DEBUG
+      if current_time - heartbeat_running_msg_timestamp > state_interval:
+        # log more steps every minute or so
+        logging_level = logging.INFO
+        heartbeat_running_msg_timestamp = current_time
 
       try:
-        crt_time = time.time()
-        if crt_time - heartbeat_running_msg_timestamp > int(state_interval):
-          logger.info("Heartbeat (response id = %s) with server is running...", self.responseId)
-          heartbeat_running_msg_timestamp = crt_time
+        logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)
 
         send_state = False
         if not retry:
-          if crt_time - last_state_timestamp > int(state_interval):
+          if current_time - last_state_timestamp > state_interval:
             send_state = True
 
-          data = json.dumps(
-              self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
+          logger.log(logging_level, "Building heartbeat message")
+
+          data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1
 
-
-        logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
+        if logger.isEnabledFor(logging.DEBUG):
+          logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
+        else:
+          logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)
 
         response = self.sendRequest(self.heartbeatUrl, data)
         exitStatus = 0
@@ -321,8 +328,7 @@ class Controller(threading.Thread):
 
         serverId = int(response['responseId'])
 
-
-        logger.debug('Heartbeat response received (id = %s)', serverId)
+        logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)
 
         cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
 
@@ -331,8 +337,7 @@ class Controller(threading.Thread):
           if cluster_size > 0 \
           else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
 
-
-        logger.debug("Heartbeat interval is %s seconds", heartbeat_interval)
+        logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)
 
         if 'hasMappedComponents' in response.keys():
           self.hasMappedComponents = response['hasMappedComponents'] is not False
@@ -364,10 +369,11 @@ class Controller(threading.Thread):
         else:
           self.responseId = serverId
           if send_state:
-            last_state_timestamp = time.time()
+            last_state_timestamp = current_time
 
         # if the response contains configurations, update the in-memory and
         # disk-based configuration cache (execution and alert commands have this)
+        logger.log(logging_level, "Updating configurations from heartbeat")
         self.cluster_configuration.update_configurations_from_heartbeat(response)
 
         response_keys = response.keys()
@@ -375,6 +381,8 @@ class Controller(threading.Thread):
         # there's case when canceled task can be processed in Action Queue.execute before adding rescheduled task to queue
         # this can cause command failure instead result suppression
         # so canceling and putting rescheduled commands should be executed atomically
+        if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
+          logger.log(logging_level, "Adding cancel/execution commands")
         with self.actionQueue.lock:
           if 'cancelCommands' in response_keys:
             self.cancelCommandInQueue(response['cancelCommands'])
@@ -388,9 +396,10 @@ class Controller(threading.Thread):
           # try storing execution command details and desired state
           self.addToStatusQueue(response['statusCommands'])
 
-        if crt_time - getrecoverycommands_timestamp > int(getrecoverycommands_interval):
-          getrecoverycommands_timestamp = crt_time
+        if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
+          getrecoverycommands_timestamp = current_time
           if not self.actionQueue.tasks_in_progress_or_pending():
+            logger.log(logging_level, "Adding recovery commands")
             recovery_commands = self.recovery_manager.get_recovery_commands()
             for recovery_command in recovery_commands:
               logger.info("Adding recovery command %s for component %s",
@@ -398,9 +407,11 @@ class Controller(threading.Thread):
               self.addToQueue([recovery_command])
 
         if 'alertDefinitionCommands' in response_keys:
+          logger.log(logging_level, "Updating alert definitions")
           self.alert_scheduler_handler.update_definitions(response)
 
         if 'alertExecutionCommands' in response_keys:
+          logger.log(logging_level, "Executing alert commands")
           self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
 
         if "true" == response['restartAgent']:
@@ -414,6 +425,7 @@ class Controller(threading.Thread):
 
         if "recoveryConfig" in response:
           # update the list of components enabled for recovery
+          logger.log(logging_level, "Updating recovery config")
           self.recovery_manager.update_configuration_from_registration(response)
 
         retry = False
@@ -455,12 +467,15 @@ class Controller(threading.Thread):
 
       # Sleep for some time
       timeout = heartbeat_interval - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
+      logger.log(logging_level, "Waiting %s for next heartbeat", timeout)
 
       if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
         # Stop loop when stop event received
         logger.info("Stop event received")
         self.DEBUG_STOP_HEARTBEATING=True
 
+      logger.log(logging_level, "Wait for next heartbeat over")
+
   def spawnStatusCommandsExecutorProcess(self):
     '''
     Starts a new StatusCommandExecutor child process. In case there is a running instance

http://git-wip-us.apache.org/repos/asf/ambari/blob/44f45199/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index fbb29f4..2f15770 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -47,6 +47,7 @@ class StatusCommandsExecutor(multiprocessing.Process):
   def run(self):
     try:
       bind_debug_signal_handlers()
+      logger.info("StatusCommandsExecutor starting")
       while True:
         command = self.actionQueue.statusCommandQueue.get(True) # blocks until status status command appears
         logger.debug("Running status command for {0}".format(command['componentName']))