You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2015/04/22 16:26:18 UTC
ambari git commit: AMBARI-10657. Ambari restart/stop operation loses control of Flume agents (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/trunk f23f12618 -> 6ea66b63e
AMBARI-10657. Ambari restart/stop operation loses control of Flume agents (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6ea66b63
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6ea66b63
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6ea66b63
Branch: refs/heads/trunk
Commit: 6ea66b63ebae3b4fa71a6741893fd14ee7d592db
Parents: f23f126
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed Apr 22 17:26:10 2015 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed Apr 22 17:26:10 2015 +0300
----------------------------------------------------------------------
.../libraries/functions/flume_agent_helper.py | 19 ++++++++++++++++++-
.../FLUME/1.4.0.2.0/package/scripts/flume.py | 3 +++
.../test/python/stacks/2.0.6/FLUME/test_flume.py | 10 ++++++++--
3 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ea66b63/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py b/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py
index 4070006..94f96ca 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py
@@ -21,6 +21,7 @@ limitations under the License.
import json
import glob
import os
+import time
from resource_management.core.exceptions import ComponentIsNotRunning
from resource_management.libraries.functions import check_process_status
@@ -113,4 +114,20 @@ def get_live_status(pid_file, flume_conf_directory):
except:
pass
- return res
\ No newline at end of file
+ return res
+
+
+def await_flume_process_termination(pid_file, try_count=20, retry_delay=2):
+ """
+ Waits while the flume agent represented by the specified file is being stopped.
+ :param pid_file: the PID file of the agent to check
+ :param try_count: the count of checks
+ :param retry_delay: time between checks in seconds
+ :return: True if the agent was stopped, False otherwise
+ """
+ for i in range(0, try_count):
+ if not is_flume_process_live(pid_file):
+ return True
+ else:
+ time.sleep(retry_delay)
+ return not is_flume_process_live(pid_file)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ea66b63/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
index ee1ed00..11cae75 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
@@ -23,6 +23,7 @@ import os
from resource_management import *
from resource_management.libraries.functions.flume_agent_helper import is_flume_process_live
from resource_management.libraries.functions.flume_agent_helper import find_expected_agent_names
+from resource_management.libraries.functions.flume_agent_helper import await_flume_process_termination
from ambari_commons import OSConst
from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
@@ -178,6 +179,8 @@ def flume(action = None):
pid_file = params.flume_run_dir + os.sep + agent + '.pid'
pid = format('`cat {pid_file}` > /dev/null 2>&1')
Execute(format('kill {pid}'), ignore_failures=True)
+ if not await_flume_process_termination(pid_file):
+ raise Fail("Can't stop flume agent: {0}".format(agent))
File(pid_file, action = 'delete')
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ea66b63/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
index 77494af..2317cf7 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
@@ -74,8 +74,10 @@ class TestFlumeHandler(RMFTestCase):
@patch("glob.glob")
@patch("flume._set_desired_state")
- def test_stop_default(self, set_desired_mock, glob_mock):
+ @patch("flume.await_flume_process_termination")
+ def test_stop_default(self, await_flume_process_termination_mock, set_desired_mock, glob_mock):
glob_mock.side_effect = [['/var/run/flume/a1/pid'], ['/etc/flume/conf/a1/ambari-meta.json']]
+ await_flume_process_termination_mock.return_value = True
self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
classname = "FlumeHandler",
@@ -85,6 +87,7 @@ class TestFlumeHandler(RMFTestCase):
target = RMFTestCase.TARGET_COMMON_SERVICES)
self.assertTrue(glob_mock.called)
+ await_flume_process_termination_mock.assert_called_with('/var/run/flume/a1.pid')
self.assertTrue(set_desired_mock.called)
self.assertTrue(set_desired_mock.call_args[0][0] == 'INSTALLED')
@@ -311,8 +314,10 @@ class TestFlumeHandler(RMFTestCase):
self.assertNoMoreResources()
@patch("glob.glob")
- def test_stop_single(self, glob_mock):
+ @patch("flume.await_flume_process_termination")
+ def test_stop_single(self, await_flume_process_termination_mock, glob_mock):
glob_mock.return_value = ['/var/run/flume/b1.pid']
+ await_flume_process_termination_mock.return_value = True
self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
classname = "FlumeHandler",
@@ -322,6 +327,7 @@ class TestFlumeHandler(RMFTestCase):
target = RMFTestCase.TARGET_COMMON_SERVICES)
self.assertTrue(glob_mock.called)
+ await_flume_process_termination_mock.assert_called_with('/var/run/flume/b1.pid')
self.assertResourceCalled('Execute', 'kill `cat /var/run/flume/b1.pid` > /dev/null 2>&1',
ignore_failures = True)