You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2014/04/14 20:11:16 UTC
git commit: AMBARI-5441. Flume: add process reporting mechanism (ncole)
Repository: ambari
Updated Branches:
refs/heads/trunk e124c7d80 -> b740b57b7
AMBARI-5441. Flume: add process reporting mechanism (ncole)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b740b57b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b740b57b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b740b57b
Branch: refs/heads/trunk
Commit: b740b57b7f7ce68d4863d31ea2aa357aac726e0e
Parents: e124c7d
Author: Nate Cole <nc...@hortonworks.com>
Authored: Fri Apr 11 12:55:48 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Mon Apr 14 13:44:10 2014 -0400
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 17 +++++-
.../ambari_agent/CustomServiceOrchestrator.py | 8 +--
.../main/python/ambari_agent/PythonExecutor.py | 11 +++-
.../libraries/script/script.py | 3 +-
.../test/python/ambari_agent/TestActionQueue.py | 1 +
.../TestCustomServiceOrchestrator.py | 5 +-
.../python/ambari_agent/TestPuppetExecutor.py | 2 -
.../python/ambari_agent/TestPythonExecutor.py | 2 +-
.../services/FLUME/package/scripts/flume.py | 62 +++++++++++++++-----
.../FLUME/package/scripts/flume_handler.py | 10 +++-
.../python/stacks/2.0.6/FLUME/test_flume.py | 41 ++++++++++---
11 files changed, 123 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 551b59e..53191d7 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -250,16 +250,31 @@ class ActionQueue(threading.Thread):
livestatus = LiveStatus(cluster, service, component,
globalConfig, self.config, self.configTags)
+
component_status = None
+ component_extra = None
if command_format == self.COMMAND_FORMAT_V2:
# For custom services, responsibility to determine service status is
# delegated to python scripts
- component_status = self.customServiceOrchestrator.requestComponentStatus(command)
+ component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
+
+ if component_status_result['exitcode'] == 0:
+ component_status = LiveStatus.LIVE_STATUS
+ else:
+ component_status = LiveStatus.DEAD_STATUS
+
+ if component_status_result.has_key('structuredOut'):
+ component_extra = component_status_result['structuredOut']
result = livestatus.build(forsed_component_status= component_status)
+
+ if component_extra is not None and len(component_extra) != 0:
+ result['extra'] = component_extra
+
logger.debug("Got live status for component " + component + \
" of service " + str(service) + \
" of cluster " + str(cluster))
+
logger.debug(pprint.pformat(result))
if result is not None:
self.commandStatuses.put_command_status(command, result)
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 91d1b8d..a1e59a8 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -102,9 +102,9 @@ class CustomServiceOrchestrator():
script_path = self.resolve_script_path(base_dir, script, script_type)
script_tuple = (script_path, base_dir)
-
tmpstrucoutfile = os.path.join(self.tmp_dir,
"structured-out-{0}.json".format(task_id))
+
if script_type.upper() != self.SCRIPT_TYPE_PYTHON:
# We don't support anything else yet
message = "Unknown script type {0}".format(script_type)
@@ -160,11 +160,7 @@ class CustomServiceOrchestrator():
res = self.runCommand(command, self.status_commands_stdout,
self.status_commands_stderr, self.COMMAND_NAME_STATUS,
override_output_files=override_output_files)
- if res['exitcode'] == 0:
- return LiveStatus.LIVE_STATUS
- else:
- return LiveStatus.DEAD_STATUS
-
+ return res
def resolve_script_path(self, base_dir, script, script_type):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 3b175a1..f6a679d 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -64,6 +64,15 @@ class PythonExecutor:
else: # Append to files
tmpout = open(tmpoutfile, 'a')
tmperr = open(tmperrfile, 'a')
+
+ # need to remove this file for the following case:
+ # status call 1 does not write to file; call 2 writes to file;
+ # call 3 does not write to file, so contents are still call 2's result
+ try:
+ os.unlink(tmpstructedoutfile)
+ except OSError:
+ pass # no error
+
script_params += [tmpstructedoutfile]
pythonCommand = self.python_command(script, script_params)
logger.info("Running command " + pprint.pformat(pythonCommand))
@@ -94,7 +103,7 @@ class PythonExecutor:
}
logger.warn(structured_out)
else:
- structured_out = '{}'
+ structured_out = {}
if self.python_process_has_been_killed:
error = str(error) + "\n Python script has been killed due to timeout"
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/libraries/script/script.py b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
index d62bcd1..a51c9e8 100644
--- a/ambari-agent/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
@@ -58,9 +58,8 @@ class Script(object):
def put_structured_out(self, sout):
Script.structuredOut.update(sout)
try:
- structuredOut = json.dumps(Script.structuredOut)
with open(self.stroutfile, 'w') as fp:
- json.dump(structuredOut, fp)
+ json.dump(Script.structuredOut, fp)
except IOError:
Script.structuredOut.update({"errMsg" : "Unable to write to " + self.stroutfile})
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index abc6edc..bf93c10 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -448,6 +448,7 @@ class TestActionQueue(TestCase):
# Check execution ov V2 status command
requestComponentStatus_mock.reset_mock()
+ requestComponentStatus_mock.return_value = {'exitcode': 0}
determine_command_format_version_mock.return_value = ActionQueue.COMMAND_FORMAT_V2
actionQueue.execute_status_command(self.status_command)
report = actionQueue.result()
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 54c17a6..ac1194e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -307,15 +307,16 @@ class TestCustomServiceOrchestrator(TestCase):
runCommand_mock.return_value = {
"exitcode" : 0
}
+
status = orchestrator.requestComponentStatus(status_command)
- self.assertEqual(LiveStatus.LIVE_STATUS, status)
+ self.assertEqual(runCommand_mock.return_value, status)
# Test dead case
runCommand_mock.return_value = {
"exitcode" : 1
}
status = orchestrator.requestComponentStatus(status_command)
- self.assertEqual(LiveStatus.DEAD_STATUS, status)
+ self.assertEqual(runCommand_mock.return_value, status)
def tearDown(self):
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py
index eb9cf45..ee7f9d8 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py
@@ -231,8 +231,6 @@ class TestPuppetExecutor(TestCase):
class PuppetExecutor_mock(PuppetExecutor):
-
-
def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config, subprocess_mockup):
self.subprocess_mockup = subprocess_mockup
PuppetExecutor.__init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config)
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
index 8a73410..abfef7c 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
@@ -114,7 +114,7 @@ class TestPythonExecutor(TestCase):
subproc_mock.should_finish_event.set()
result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstroutfile)
self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
- 'structuredOut': {'msg': 'Unable to read structured output from ' + tmpstroutfile}})
+ 'structuredOut': {}})
def test_is_successfull(self):
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
index a24ad4a..431935b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
@@ -17,6 +17,7 @@ limitations under the License.
"""
+import glob
import os
from resource_management import *
@@ -25,7 +26,7 @@ def flume(action = None):
flume_agents = {}
if params.flume_conf_content is not None:
- flume_agents = buildFlumeTopology(params.flume_conf_content)
+ flume_agents = build_flume_topology(params.flume_conf_content)
if action == 'config':
Directory(params.flume_conf_dir)
@@ -58,26 +59,25 @@ def flume(action = None):
flume_agent_conf_file = flume_agent_conf_dir + os.sep + "flume.conf"
flume_agent_pid_file = params.flume_run_dir + os.sep + agent + ".pid"
- # TODO someday make the ganglia ports configurable
- extra_args = ''
- if params.ganglia_server_host is not None:
- extra_args = '-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts={0}:{1}'
- extra_args = extra_args.format(params.ganglia_server_host, '8655')
+ if not is_live(flume_agent_pid_file):
+ # TODO someday make the ganglia ports configurable
+ extra_args = ''
+ if params.ganglia_server_host is not None:
+ extra_args = '-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts={0}:{1}'
+ extra_args = extra_args.format(params.ganglia_server_host, '8655')
- flume_cmd = flume_base.format(agent, flume_agent_conf_dir,
- flume_agent_conf_file, extra_args)
+ flume_cmd = flume_base.format(agent, flume_agent_conf_dir,
+ flume_agent_conf_file, extra_args)
- Execute(flume_cmd, wait_for_finish=False)
+ Execute(flume_cmd, wait_for_finish=False)
- # TODO sometimes startup spawns a couple of threads - so only the first line may count
- pid_cmd = format('pgrep -f {flume_agent_conf_file} > {flume_agent_pid_file}')
+ # sometimes startup spawns a couple of threads - so only the first line may count
+ pid_cmd = format('pgrep -o -f {flume_agent_conf_file} > {flume_agent_pid_file}')
- Execute(pid_cmd, logoutput=True, tries=5, try_sleep=10)
+ Execute(pid_cmd, logoutput=True, tries=5, try_sleep=10)
pass
elif action == 'stop':
- import glob
-
pid_files = glob.glob(params.flume_run_dir + os.sep + "*.pid")
if 0 == len(pid_files):
@@ -96,7 +96,7 @@ def flume(action = None):
# define a map of dictionaries, where the key is agent name
# and the dictionary is the name/value pair
-def buildFlumeTopology(content):
+def build_flume_topology(content):
import ConfigParser
import StringIO
@@ -129,3 +129,35 @@ def buildFlumeTopology(content):
return result
+def is_live(pid_file):
+ live = False
+
+ try:
+ check_process_status(pid_file)
+ live = True
+ except ComponentIsNotRunning:
+ pass
+
+ return live
+
+def live_status(pid_file):
+ res = {}
+ res['name'] = pid_file.split(os.sep).pop()
+ res['status'] = 'RUNNING' if is_live(pid_file) else 'NOT_RUNNING'
+
+ return res
+
+
+def flume_status():
+ import params
+
+ procs = []
+
+ pid_files = glob.glob(params.flume_run_dir + os.sep + "*.pid")
+
+ if 0 != len(pid_files):
+ for pid_file in pid_files:
+ procs.append(live_status(pid_file))
+
+ return procs
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
index 341379d..f796435 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
@@ -19,6 +19,7 @@ limitations under the License.
from resource_management import *
from flume import flume
+from flume import flume_status
class FlumeHandler(Script):
def install(self, env):
@@ -50,7 +51,14 @@ class FlumeHandler(Script):
flume(action='config')
def status(self, env):
- pass
+ import params
+
+ env.set_params(params)
+
+ json = {}
+ json['processes'] = flume_status()
+
+ self.put_structured_out(json)
if __name__ == "__main__":
FlumeHandler().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/b740b57b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
index 48920e1..2cb51df 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
@@ -29,8 +29,8 @@ class TestFlumeHandler(RMFTestCase):
self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
classname = "FlumeHandler",
command = "configure",
- config_file="default.json"
- )
+ config_file="default.json")
+
self.assert_configure_default()
self.assertNoMoreResources()
@@ -38,8 +38,7 @@ class TestFlumeHandler(RMFTestCase):
self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
classname = "FlumeHandler",
command = "start",
- config_file="default.json"
- )
+ config_file="default.json")
self.assert_configure_default()
@@ -51,7 +50,7 @@ class TestFlumeHandler(RMFTestCase):
'-Dflume.monitoring.hosts=c6401.ambari.apache.org:8655'),
wait_for_finish = False)
- self.assertResourceCalled('Execute', 'pgrep -f /etc/flume/conf/a1/flume.conf > /var/run/flume/a1.pid',
+ self.assertResourceCalled('Execute', 'pgrep -o -f /etc/flume/conf/a1/flume.conf > /var/run/flume/a1.pid',
logoutput = True,
tries = 5,
try_sleep = 10)
@@ -65,8 +64,7 @@ class TestFlumeHandler(RMFTestCase):
self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
classname = "FlumeHandler",
command = "stop",
- config_file="default.json"
- )
+ config_file="default.json")
self.assertTrue(glob_mock.called)
@@ -77,6 +75,34 @@ class TestFlumeHandler(RMFTestCase):
self.assertNoMoreResources()
+ @patch("resource_management.libraries.script.Script.put_structured_out")
+ def test_status_default(self, structured_out_mock):
+ self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+ classname = "FlumeHandler",
+ command = "status",
+ config_file="default.json")
+
+ # test that the method was called with empty processes
+ self.assertTrue(structured_out_mock.called)
+ structured_out_mock.assert_called_with({'processes': []})
+
+ self.assertNoMoreResources()
+
+ @patch("resource_management.libraries.script.Script.put_structured_out")
+ @patch("glob.glob")
+ def test_status_with_result(self, glob_mock, structured_out_mock):
+ glob_mock.return_value = ['/var/run/flume/a1.pid']
+
+ self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+ classname = "FlumeHandler",
+ command = "status",
+ config_file="default.json")
+
+ self.assertTrue(structured_out_mock.called)
+ structured_out_mock.assert_called_with({'processes': [{'status': 'NOT_RUNNING', 'name': 'a1.pid'}]})
+
+ self.assertNoMoreResources()
+
def assert_configure_default(self):
self.assertResourceCalled('Directory', '/etc/flume/conf')
@@ -95,7 +121,6 @@ class TestFlumeHandler(RMFTestCase):
content = Template('log4j.properties.j2', agent_name = 'a1'),
mode = 0644)
-
def buildFlumeTopology(content):
import os
import ConfigParser