You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/07/30 12:50:44 UTC

[ambari] branch branch-2.7 updated: AMBARI-24270. Agent Status Commands Are Randomly Failing With Empty stderr (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 814a212  AMBARI-24270. Agent Status Commands Are Randomly Failing With Empty stderr (aonishuk)
814a212 is described below

commit 814a21252543543712007fa51ee0108a5bd04dac
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Sun Jul 29 11:29:49 2018 +0300

    AMBARI-24270. Agent Status Commands Are Randomly Failing With Empty stderr (aonishuk)
---
 .../ambari_agent/CustomServiceOrchestrator.py      | 43 ++++++++++++++--------
 1 file changed, 27 insertions(+), 16 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 0da28ab..1dd4fa0 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -24,6 +24,7 @@ from ConfigParser import NoOptionError
 
 import ambari_simplejson as json
 import sys
+import uuid
 
 from ambari_agent.models.commands import AgentCommand
 from ambari_commons import shell
@@ -89,9 +90,9 @@ class CustomServiceOrchestrator(object):
     self.exec_tmp_dir = AGENT_TMP_DIR
     self.file_cache = initializer_module.file_cache
     self.status_commands_stdout = os.path.join(self.tmp_dir,
-                                               'status_command_stdout.txt')
+                                               'status_command_stdout_{0}.txt')
     self.status_commands_stderr = os.path.join(self.tmp_dir,
-                                               'status_command_stderr.txt')
+                                               'status_command_stderr_{0}.txt')
 
     # Construct the hadoop credential lib JARs path
     self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir',
@@ -100,13 +101,6 @@ class CustomServiceOrchestrator(object):
     self.credential_conf_dir = self.config.get('security', 'credential_conf_dir', self.DEFAULT_CREDENTIAL_CONF_DIR)
 
     self.credential_shell_cmd = self.config.get('security', 'credential_shell_cmd', self.DEFAULT_CREDENTIAL_SHELL_CMD)
-
-    # Clean up old status command files if any
-    try:
-      os.unlink(self.status_commands_stdout)
-      os.unlink(self.status_commands_stderr)
-    except OSError:
-      pass # Ignore fail
     self.commands_in_progress_lock = threading.RLock()
     self.commands_in_progress = {}
 
@@ -464,7 +458,13 @@ class CustomServiceOrchestrator(object):
         self.commands_for_component_in_progress[cluster_id][command['role']] -= 1
 
       if json_path:
-        self.conditionally_remove_command_file(json_path, ret)
+        if is_status_command:
+          try:
+            os.unlink(json_path)
+          except OSError:
+            pass  # Ignore failure
+        else:
+          self.conditionally_remove_command_file(json_path, ret)
 
     return ret
 
@@ -518,9 +518,21 @@ class CustomServiceOrchestrator(object):
     if logger.level == logging.DEBUG:
       override_output_files = False
 
-    res = self.runCommand(command_header, self.status_commands_stdout,
-                          self.status_commands_stderr, self.COMMAND_NAME_STATUS,
-                          override_output_files=override_output_files, is_status_command=True)
+    # make sure status commands that run in parallel don't use the same files
+    status_commands_stdout = self.status_commands_stdout.format(uuid.uuid4())
+    status_commands_stderr = self.status_commands_stderr.format(uuid.uuid4())
+
+    try:
+      res = self.runCommand(command_header, status_commands_stdout,
+                            status_commands_stderr, self.COMMAND_NAME_STATUS,
+                            override_output_files=override_output_files, is_status_command=True)
+    finally:
+      try:
+        os.unlink(status_commands_stdout)
+        os.unlink(status_commands_stderr)
+      except OSError:
+        pass # Ignore failure
+
     return res
 
   def resolve_script_path(self, base_dir, script):
@@ -541,9 +553,8 @@ class CustomServiceOrchestrator(object):
     command_type = command['commandType']
 
     if command_type == AgentCommand.status:
-      # These files are frequently created, that's why we don't
-      # store them all, but only the latest one
-      file_path = os.path.join(self.tmp_dir, "status_command.json")
+      # make sure status commands that run in parallel don't use the same files
+      file_path = os.path.join(self.tmp_dir, "status_command_{0}.json".format(uuid.uuid4()))
     else:
       task_id = command['taskId']
       file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(task_id))