You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2014/04/14 20:11:16 UTC

git commit: AMBARI-5441. Flume: add process reporting mechanism (ncole)

Repository: ambari
Updated Branches:
  refs/heads/trunk e124c7d80 -> b740b57b7


AMBARI-5441. Flume: add process reporting mechanism (ncole)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b740b57b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b740b57b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b740b57b

Branch: refs/heads/trunk
Commit: b740b57b7f7ce68d4863d31ea2aa357aac726e0e
Parents: e124c7d
Author: Nate Cole <nc...@hortonworks.com>
Authored: Fri Apr 11 12:55:48 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Mon Apr 14 13:44:10 2014 -0400

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 17 +++++-
 .../ambari_agent/CustomServiceOrchestrator.py   |  8 +--
 .../main/python/ambari_agent/PythonExecutor.py  | 11 +++-
 .../libraries/script/script.py                  |  3 +-
 .../test/python/ambari_agent/TestActionQueue.py |  1 +
 .../TestCustomServiceOrchestrator.py            |  5 +-
 .../python/ambari_agent/TestPuppetExecutor.py   |  2 -
 .../python/ambari_agent/TestPythonExecutor.py   |  2 +-
 .../services/FLUME/package/scripts/flume.py     | 62 +++++++++++++++-----
 .../FLUME/package/scripts/flume_handler.py      | 10 +++-
 .../python/stacks/2.0.6/FLUME/test_flume.py     | 41 ++++++++++---
 11 files changed, 123 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 551b59e..53191d7 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -250,16 +250,31 @@ class ActionQueue(threading.Thread):
 
       livestatus = LiveStatus(cluster, service, component,
                               globalConfig, self.config, self.configTags)
+
       component_status = None
+      component_extra = None
       if command_format == self.COMMAND_FORMAT_V2:
         # For custom services, responsibility to determine service status is
         # delegated to python scripts
-        component_status = self.customServiceOrchestrator.requestComponentStatus(command)
+        component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
+
+        if component_status_result['exitcode'] == 0:
+          component_status = LiveStatus.LIVE_STATUS
+        else:
+          component_status = LiveStatus.DEAD_STATUS
+
+        if component_status_result.has_key('structuredOut'):
+          component_extra = component_status_result['structuredOut']
 
       result = livestatus.build(forsed_component_status= component_status)
+
+      if component_extra is not None and len(component_extra) != 0:
+        result['extra'] = component_extra
+
       logger.debug("Got live status for component " + component + \
                    " of service " + str(service) + \
                    " of cluster " + str(cluster))
+
       logger.debug(pprint.pformat(result))
       if result is not None:
         self.commandStatuses.put_command_status(command, result)

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 91d1b8d..a1e59a8 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -102,9 +102,9 @@ class CustomServiceOrchestrator():
         script_path = self.resolve_script_path(base_dir, script, script_type)
         script_tuple = (script_path, base_dir)
 
-
       tmpstrucoutfile = os.path.join(self.tmp_dir,
                                     "structured-out-{0}.json".format(task_id))
+
       if script_type.upper() != self.SCRIPT_TYPE_PYTHON:
       # We don't support anything else yet
         message = "Unknown script type {0}".format(script_type)
@@ -160,11 +160,7 @@ class CustomServiceOrchestrator():
     res = self.runCommand(command, self.status_commands_stdout,
                           self.status_commands_stderr, self.COMMAND_NAME_STATUS,
                           override_output_files=override_output_files)
-    if res['exitcode'] == 0:
-      return LiveStatus.LIVE_STATUS
-    else:
-      return LiveStatus.DEAD_STATUS
-
+    return res
 
   def resolve_script_path(self, base_dir, script, script_type):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 3b175a1..f6a679d 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -64,6 +64,15 @@ class PythonExecutor:
     else: # Append to files
       tmpout =  open(tmpoutfile, 'a')
       tmperr =  open(tmperrfile, 'a')
+
+    # need to remove this file for the following case:
+    # status call 1 does not write to file; call 2 writes to file;
+    # call 3 does not write to file, so contents are still call 2's result
+    try:
+      os.unlink(tmpstructedoutfile)
+    except OSError:
+      pass # no error
+
     script_params += [tmpstructedoutfile]
     pythonCommand = self.python_command(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
@@ -94,7 +103,7 @@ class PythonExecutor:
         }
         logger.warn(structured_out)
       else:
-        structured_out = '{}'
+        structured_out = {}
 
     if self.python_process_has_been_killed:
       error = str(error) + "\n Python script has been killed due to timeout"

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/libraries/script/script.py b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
index d62bcd1..a51c9e8 100644
--- a/ambari-agent/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
@@ -58,9 +58,8 @@ class Script(object):
   def put_structured_out(self, sout):
     Script.structuredOut.update(sout)
     try:
-      structuredOut = json.dumps(Script.structuredOut)
       with open(self.stroutfile, 'w') as fp:
-        json.dump(structuredOut, fp)
+        json.dump(Script.structuredOut, fp)
     except IOError:
       Script.structuredOut.update({"errMsg" : "Unable to write to " + self.stroutfile})
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index abc6edc..bf93c10 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -448,6 +448,7 @@ class TestActionQueue(TestCase):
 
     # Check execution ov V2 status command
     requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {'exitcode': 0}
     determine_command_format_version_mock.return_value = ActionQueue.COMMAND_FORMAT_V2
     actionQueue.execute_status_command(self.status_command)
     report = actionQueue.result()

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 54c17a6..ac1194e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -307,15 +307,16 @@ class TestCustomServiceOrchestrator(TestCase):
     runCommand_mock.return_value = {
       "exitcode" : 0
     }
+
     status = orchestrator.requestComponentStatus(status_command)
-    self.assertEqual(LiveStatus.LIVE_STATUS, status)
+    self.assertEqual(runCommand_mock.return_value, status)
 
     # Test dead case
     runCommand_mock.return_value = {
       "exitcode" : 1
     }
     status = orchestrator.requestComponentStatus(status_command)
-    self.assertEqual(LiveStatus.DEAD_STATUS, status)
+    self.assertEqual(runCommand_mock.return_value, status)
 
 
   def tearDown(self):

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py
index eb9cf45..ee7f9d8 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py
@@ -231,8 +231,6 @@ class TestPuppetExecutor(TestCase):
 
   class  PuppetExecutor_mock(PuppetExecutor):
 
-
-
     def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config, subprocess_mockup):
       self.subprocess_mockup = subprocess_mockup
       PuppetExecutor.__init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config)

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
index 8a73410..abfef7c 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
@@ -114,7 +114,7 @@ class TestPythonExecutor(TestCase):
     subproc_mock.should_finish_event.set()
     result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstroutfile)
     self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
-                               'structuredOut': {'msg': 'Unable to read structured output from ' + tmpstroutfile}})
+                               'structuredOut': {}})
 
 
   def test_is_successfull(self):

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
index a24ad4a..431935b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
@@ -17,6 +17,7 @@ limitations under the License.
 
 """
 
+import glob
 import os
 from resource_management import *
 
@@ -25,7 +26,7 @@ def flume(action = None):
 
   flume_agents = {}
   if params.flume_conf_content is not None:
-    flume_agents = buildFlumeTopology(params.flume_conf_content)
+    flume_agents = build_flume_topology(params.flume_conf_content)
 
   if action == 'config':
     Directory(params.flume_conf_dir)
@@ -58,26 +59,25 @@ def flume(action = None):
       flume_agent_conf_file = flume_agent_conf_dir + os.sep + "flume.conf"
       flume_agent_pid_file = params.flume_run_dir + os.sep + agent + ".pid"
 
-      # TODO someday make the ganglia ports configurable
-      extra_args = ''
-      if params.ganglia_server_host is not None:
-        extra_args = '-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts={0}:{1}'
-        extra_args = extra_args.format(params.ganglia_server_host, '8655')
+      if not is_live(flume_agent_pid_file):
+        # TODO someday make the ganglia ports configurable
+        extra_args = ''
+        if params.ganglia_server_host is not None:
+          extra_args = '-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts={0}:{1}'
+          extra_args = extra_args.format(params.ganglia_server_host, '8655')
 
-      flume_cmd = flume_base.format(agent, flume_agent_conf_dir,
-         flume_agent_conf_file, extra_args)
+        flume_cmd = flume_base.format(agent, flume_agent_conf_dir,
+           flume_agent_conf_file, extra_args)
 
-      Execute(flume_cmd, wait_for_finish=False)
+        Execute(flume_cmd, wait_for_finish=False)
 
-      # TODO sometimes startup spawns a couple of threads - so only the first line may count
-      pid_cmd = format('pgrep -f {flume_agent_conf_file} > {flume_agent_pid_file}')
+        # sometimes startup spawns a couple of threads - so only the first line may count
+        pid_cmd = format('pgrep -o -f {flume_agent_conf_file} > {flume_agent_pid_file}')
 
-      Execute(pid_cmd, logoutput=True, tries=5, try_sleep=10)
+        Execute(pid_cmd, logoutput=True, tries=5, try_sleep=10)
 
     pass
   elif action == 'stop':
-    import glob
-
     pid_files = glob.glob(params.flume_run_dir + os.sep + "*.pid")
 
     if 0 == len(pid_files):
@@ -96,7 +96,7 @@ def flume(action = None):
 
 # define a map of dictionaries, where the key is agent name
 # and the dictionary is the name/value pair
-def buildFlumeTopology(content):
+def build_flume_topology(content):
   import ConfigParser
   import StringIO
 
@@ -129,3 +129,35 @@ def buildFlumeTopology(content):
 
   return result
 
+def is_live(pid_file):
+  live = False
+
+  try:
+    check_process_status(pid_file)
+    live = True
+  except ComponentIsNotRunning:
+    pass
+
+  return live
+
+def live_status(pid_file):
+  res = {}
+  res['name'] = pid_file.split(os.sep).pop()
+  res['status'] = 'RUNNING' if is_live(pid_file) else 'NOT_RUNNING'
+
+  return res
+  
+
+def flume_status():
+  import params
+  
+  procs = []
+
+  pid_files = glob.glob(params.flume_run_dir + os.sep + "*.pid")
+
+  if 0 != len(pid_files):
+    for pid_file in pid_files:
+      procs.append(live_status(pid_file))
+
+  return procs
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
index 341379d..f796435 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
@@ -19,6 +19,7 @@ limitations under the License.
 
 from resource_management import *
 from flume import flume
+from flume import flume_status
 
 class FlumeHandler(Script):
   def install(self, env):
@@ -50,7 +51,14 @@ class FlumeHandler(Script):
     flume(action='config')
 
   def status(self, env):
-    pass
+    import params
+
+    env.set_params(params)
+
+    json = {}
+    json['processes'] = flume_status()
+
+    self.put_structured_out(json)
 
 if __name__ == "__main__":
   FlumeHandler().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
index 48920e1..2cb51df 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
@@ -29,8 +29,8 @@ class TestFlumeHandler(RMFTestCase):
     self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
                        classname = "FlumeHandler",
                        command = "configure",
-                       config_file="default.json"
-    )
+                       config_file="default.json")
+
     self.assert_configure_default()
     self.assertNoMoreResources()
 
@@ -38,8 +38,7 @@ class TestFlumeHandler(RMFTestCase):
     self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
                        classname = "FlumeHandler",
                        command = "start",
-                       config_file="default.json"
-    )
+                       config_file="default.json")
 
     self.assert_configure_default()
 
@@ -51,7 +50,7 @@ class TestFlumeHandler(RMFTestCase):
       '-Dflume.monitoring.hosts=c6401.ambari.apache.org:8655'),
       wait_for_finish = False)
 
-    self.assertResourceCalled('Execute', 'pgrep -f /etc/flume/conf/a1/flume.conf > /var/run/flume/a1.pid',
+    self.assertResourceCalled('Execute', 'pgrep -o -f /etc/flume/conf/a1/flume.conf > /var/run/flume/a1.pid',
       logoutput = True,
       tries = 5,
       try_sleep = 10)
@@ -65,8 +64,7 @@ class TestFlumeHandler(RMFTestCase):
     self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
                        classname = "FlumeHandler",
                        command = "stop",
-                       config_file="default.json"
-    )
+                       config_file="default.json")
 
     self.assertTrue(glob_mock.called)
 
@@ -77,6 +75,34 @@ class TestFlumeHandler(RMFTestCase):
 
     self.assertNoMoreResources()
 
+  @patch("resource_management.libraries.script.Script.put_structured_out")
+  def test_status_default(self, structured_out_mock):
+    self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "status",
+                       config_file="default.json")
+    
+    # test that the method was called with empty processes
+    self.assertTrue(structured_out_mock.called)
+    structured_out_mock.assert_called_with({'processes': []})
+
+    self.assertNoMoreResources()
+
+  @patch("resource_management.libraries.script.Script.put_structured_out")
+  @patch("glob.glob")
+  def test_status_with_result(self, glob_mock, structured_out_mock):
+    glob_mock.return_value = ['/var/run/flume/a1.pid']
+
+    self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "status",
+                       config_file="default.json")
+    
+    self.assertTrue(structured_out_mock.called)
+    structured_out_mock.assert_called_with({'processes': [{'status': 'NOT_RUNNING', 'name': 'a1.pid'}]})
+
+    self.assertNoMoreResources()
+
   def assert_configure_default(self):
 
     self.assertResourceCalled('Directory', '/etc/flume/conf')
@@ -95,7 +121,6 @@ class TestFlumeHandler(RMFTestCase):
       content = Template('log4j.properties.j2', agent_name = 'a1'),
       mode = 0644)
 
-
 def buildFlumeTopology(content):
   import os
   import ConfigParser