You are viewing a plain-text version of this content. The canonical link to it ("here" in the original page) was not preserved in this copy.
Posted to commits@ambari.apache.org by ao...@apache.org on 2015/04/22 16:26:18 UTC

ambari git commit: AMBARI-10657. Ambari restart/stop operation loses control of Flume agents (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/trunk f23f12618 -> 6ea66b63e


AMBARI-10657. Ambari restart/stop operation loses control of Flume agents (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6ea66b63
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6ea66b63
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6ea66b63

Branch: refs/heads/trunk
Commit: 6ea66b63ebae3b4fa71a6741893fd14ee7d592db
Parents: f23f126
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed Apr 22 17:26:10 2015 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed Apr 22 17:26:10 2015 +0300

----------------------------------------------------------------------
 .../libraries/functions/flume_agent_helper.py    | 19 ++++++++++++++++++-
 .../FLUME/1.4.0.2.0/package/scripts/flume.py     |  3 +++
 .../test/python/stacks/2.0.6/FLUME/test_flume.py | 10 ++++++++--
 3 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6ea66b63/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py b/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py
index 4070006..94f96ca 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py
@@ -21,6 +21,7 @@ limitations under the License.
 import json
 import glob
 import os
+import time
 
 from resource_management.core.exceptions import ComponentIsNotRunning
 from resource_management.libraries.functions import check_process_status
@@ -113,4 +114,20 @@ def get_live_status(pid_file, flume_conf_directory):
   except:
     pass
 
-  return res
\ No newline at end of file
+  return res
+
+
+def await_flume_process_termination(pid_file, try_count=20, retry_delay=2):
+  """
+  Poll until is_flume_process_live(pid_file) reports the agent as not live, or retries run out.
+  :param pid_file: the PID file of the agent to check
+  :param try_count: number of checks inside the polling loop (one extra check follows the loop)
+  :param retry_delay: seconds to sleep after each check that still finds the agent live
+  :return: True if the agent was stopped, False otherwise
+  """
+  for i in range(0, try_count):  # i is unused; the range only bounds the number of polls
+    if not is_flume_process_live(pid_file):
+      return True
+    else:
+      time.sleep(retry_delay)
+  return not is_flume_process_live(pid_file)  # final check so the last sleep is not wasted

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ea66b63/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
index ee1ed00..11cae75 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
@@ -23,6 +23,7 @@ import os
 from resource_management import *
 from resource_management.libraries.functions.flume_agent_helper import is_flume_process_live
 from resource_management.libraries.functions.flume_agent_helper import find_expected_agent_names
+from resource_management.libraries.functions.flume_agent_helper import await_flume_process_termination
 from ambari_commons import OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 
@@ -178,6 +179,8 @@ def flume(action = None):
       pid_file = params.flume_run_dir + os.sep + agent + '.pid'
       pid = format('`cat {pid_file}` > /dev/null 2>&1')
       Execute(format('kill {pid}'), ignore_failures=True)
+      if not await_flume_process_termination(pid_file):
+        raise Fail("Can't stop flume agent: {0}".format(agent))
       File(pid_file, action = 'delete')
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ea66b63/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
index 77494af..2317cf7 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
@@ -74,8 +74,10 @@ class TestFlumeHandler(RMFTestCase):
 
   @patch("glob.glob")
   @patch("flume._set_desired_state")
-  def test_stop_default(self, set_desired_mock, glob_mock):
+  @patch("flume.await_flume_process_termination")
+  def test_stop_default(self, await_flume_process_termination_mock, set_desired_mock, glob_mock):
     glob_mock.side_effect = [['/var/run/flume/a1/pid'], ['/etc/flume/conf/a1/ambari-meta.json']]
+    await_flume_process_termination_mock.return_value = True
 
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
                        classname = "FlumeHandler",
@@ -85,6 +87,7 @@ class TestFlumeHandler(RMFTestCase):
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
 
     self.assertTrue(glob_mock.called)
+    await_flume_process_termination_mock.assert_called_with('/var/run/flume/a1.pid')
 
     self.assertTrue(set_desired_mock.called)
     self.assertTrue(set_desired_mock.call_args[0][0] == 'INSTALLED')
@@ -311,8 +314,10 @@ class TestFlumeHandler(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("glob.glob")
-  def test_stop_single(self, glob_mock):
+  @patch("flume.await_flume_process_termination")
+  def test_stop_single(self, await_flume_process_termination_mock, glob_mock):
     glob_mock.return_value = ['/var/run/flume/b1.pid']
+    await_flume_process_termination_mock.return_value = True
 
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
                        classname = "FlumeHandler",
@@ -322,6 +327,7 @@ class TestFlumeHandler(RMFTestCase):
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
 
     self.assertTrue(glob_mock.called)
+    await_flume_process_termination_mock.assert_called_with('/var/run/flume/b1.pid')
 
     self.assertResourceCalled('Execute', 'kill `cat /var/run/flume/b1.pid` > /dev/null 2>&1',
       ignore_failures = True)