You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2014/08/15 15:16:31 UTC
[05/12] git commit: AMBARI-6864. Flume: with no agents defined, state should be determined by overall desired state (ncole)
AMBARI-6864. Flume: with no agents defined, state should be determined by overall desired state (ncole)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/60db8871
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/60db8871
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/60db8871
Branch: refs/heads/branch-alerts-dev
Commit: 60db8871554f210ba7a2409cdc80eac7eac4363b
Parents: acb133f
Author: Nate Cole <nc...@hortonworks.com>
Authored: Thu Aug 14 12:29:04 2014 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Fri Aug 15 09:15:57 2014 -0400
----------------------------------------------------------------------
.../services/FLUME/package/scripts/flume.py | 46 ++++++++++++++------
.../FLUME/package/scripts/flume_handler.py | 5 ++-
.../python/stacks/2.0.6/FLUME/test_flume.py | 24 ++++++----
3 files changed, 53 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/60db8871/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
index 4016c79..6109d3e 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py
@@ -58,6 +58,10 @@ def flume(action = None):
mode = 0644)
elif action == 'start':
+ # desired state for service should be STARTED
+ if len(params.flume_command_targets) == 0:
+ _set_desired_state('STARTED')
+
flume_base = format('su -s /bin/bash {flume_user} -c "export JAVA_HOME={java_home}; '
'/usr/bin/flume-ng agent --name {{0}} --conf {{1}} --conf-file {{2}} {{3}}"')
@@ -83,10 +87,14 @@ def flume(action = None):
# sometimes startup spawns a couple of threads - so only the first line may count
pid_cmd = format('pgrep -o -u {flume_user} -f ^{java_home}.*{agent}.* > {flume_agent_pid_file}')
- Execute(pid_cmd, logoutput=True, tries=10, try_sleep=1)
+ Execute(pid_cmd, logoutput=True, tries=10, try_sleep=6)
pass
elif action == 'stop':
+ # desired state for service should be INSTALLED
+ if len(params.flume_command_targets) == 0:
+ _set_desired_state('INSTALLED')
+
pid_files = glob.glob(params.flume_run_dir + os.sep + "*.pid")
if 0 == len(pid_files):
@@ -94,18 +102,13 @@ def flume(action = None):
agent_names = cmd_target_names()
- if len(agent_names) > 0:
- for agent in agent_names:
- pid_file = params.flume_run_dir + os.sep + agent + '.pid'
- pid = format('`cat {pid_file}` > /dev/null 2>&1')
- Execute(format('kill {pid}'), ignore_failures=True)
- File(pid_file, action = 'delete')
- else:
- for pid_file in pid_files:
- pid = format('`cat {pid_file}` > /dev/null 2>&1')
- Execute(format('kill {pid}'), ignore_failures=True)
- File(pid_file, action = 'delete')
-
+
+ for agent in agent_names:
+ pid_file = params.flume_run_dir + os.sep + agent + '.pid'
+ pid = format('`cat {pid_file}` > /dev/null 2>&1')
+ Execute(format('kill {pid}'), ignore_failures=True)
+ File(pid_file, action = 'delete')
+
def ambari_meta(agent_name, agent_conf):
res = {}
@@ -228,3 +231,20 @@ def cmd_target_names():
else:
return find_expected_agent_names()
+def _set_desired_state(state):
+ import params
+ try:
+ with open(os.path.join(params.flume_run_dir, 'ambari-state.txt'), 'w') as fp:
+ fp.write(state)
+ except:
+ pass
+
+def get_desired_state():
+ import params
+
+ try:
+ with open(os.path.join(params.flume_run_dir, 'ambari-state.txt'), 'r') as fp:
+ return fp.read()
+ except:
+ return 'INSTALLED'
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/60db8871/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
index 99c29cb..42ac560 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
@@ -21,6 +21,7 @@ from resource_management import *
from flume import flume
from flume import flume_status
from flume import find_expected_agent_names
+from flume import get_desired_state
class FlumeHandler(Script):
def install(self, env):
@@ -108,11 +109,13 @@ class FlumeHandler(Script):
# only throw an exception if there are agents defined and there is a
# problem with the processes; if there are no agents defined, then
- # the service should report STARTED (green) instead of INSTALLED (red)
+ # the service should report STARTED (green) ONLY if the desired state is STARTED; otherwise, it should report INSTALLED (red)
if len(expected_agents) > 0:
for proc in processes:
if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING':
raise ComponentIsNotRunning()
+ elif len(expected_agents) == 0 and 'INSTALLED' == get_desired_state():
+ raise ComponentIsNotRunning()
if __name__ == "__main__":
FlumeHandler().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/60db8871/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
index e4fd61d..13f3049 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
@@ -36,19 +36,22 @@ class TestFlumeHandler(RMFTestCase):
@patch("os.path.isfile")
@patch("flume.cmd_target_names")
- def test_start_default(self, cmd_target_names_mock, os_path_isfile_mock):
+ @patch("flume._set_desired_state")
+ def test_start_default(self, set_desired_mock, cmd_target_names_mock, os_path_isfile_mock):
# 1st call is to check if the conf file is there - that should be True
# 2nd call is to check if the process is live - that should be False
os_path_isfile_mock.side_effect = [True, False]
cmd_target_names_mock.return_value = ["a1"]
self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
- classname = "FlumeHandler",
- command = "start",
+ classname = "FlumeHandler", command = "start",
config_file="default.json")
self.assert_configure_default()
+ self.assertTrue(set_desired_mock.called)
+ self.assertTrue(set_desired_mock.call_args[0][0] == 'STARTED')
+
self.assertResourceCalled('Execute', format('su -s /bin/bash flume -c "export JAVA_HOME=/usr/jdk64/jdk1.7.0_45; /usr/bin/flume-ng agent '
'--name a1 '
'--conf /etc/flume/conf/a1 '
@@ -60,12 +63,13 @@ class TestFlumeHandler(RMFTestCase):
self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*a1.* > /var/run/flume/a1.pid',
logoutput = True,
tries = 10,
- try_sleep = 1)
+ try_sleep = 6)
self.assertNoMoreResources()
@patch("glob.glob")
- def test_stop_default(self, glob_mock):
+ @patch("flume._set_desired_state")
+ def test_stop_default(self, set_desired_mock, glob_mock):
glob_mock.side_effect = [['/var/run/flume/a1/pid'], ['/etc/flume/conf/a1/ambari-meta.json']]
self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
@@ -75,6 +79,9 @@ class TestFlumeHandler(RMFTestCase):
self.assertTrue(glob_mock.called)
+ self.assertTrue(set_desired_mock.called)
+ self.assertTrue(set_desired_mock.call_args[0][0] == 'INSTALLED')
+
self.assertResourceCalled('Execute', 'kill `cat /var/run/flume/a1.pid` > /dev/null 2>&1',
ignore_failures = True)
@@ -137,7 +144,8 @@ class TestFlumeHandler(RMFTestCase):
command = "status",
config_file="default.json")
except:
- self.fail("Exception not expected when no agents are defined")
+ # expected since ComponentIsNotRunning gets raised
+ pass
self.assertTrue(structured_out_mock.called)
@@ -230,7 +238,7 @@ class TestFlumeHandler(RMFTestCase):
self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*b1.* > /var/run/flume/b1.pid',
logoutput = True,
tries = 10,
- try_sleep = 1)
+ try_sleep = 6)
self.assertNoMoreResources()
@@ -258,7 +266,7 @@ class TestFlumeHandler(RMFTestCase):
self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*b1.* > /var/run/flume/b1.pid',
logoutput = True,
tries = 10,
- try_sleep = 1)
+ try_sleep = 6)
self.assertNoMoreResources()