You are viewing a plain-text version of this content. (The link to the canonical version was lost when this copy was converted to plain text.)
Posted to commits@ambari.apache.org by ao...@apache.org on 2016/06/06 16:47:07 UTC
[1/2] ambari git commit: AMBARI-17030. Agents should automatically restart if there is a memory leak (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-2.4 400537b9c -> 7869e4df8
refs/heads/trunk 4937b6ddd -> 11543ed4f
AMBARI-17030. Agents should automatically restart if there is a memory leak (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/11543ed4
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/11543ed4
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/11543ed4
Branch: refs/heads/trunk
Commit: 11543ed4f7ad03bbbbae579183c1547a6ed828fa
Parents: 4937b6d
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Jun 6 19:46:55 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Jun 6 19:46:55 2016 +0300
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 2 ++
.../src/main/python/ambari_agent/Controller.py | 15 +++++++++++++++
.../src/test/python/ambari_agent/TestController.py | 10 ++++++----
.../src/main/python/ambari_commons/os_utils.py | 6 ++++++
4 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/11543ed4/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index aacbb8a..8f2ab1b 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -34,6 +34,8 @@ parallel_execution=0
alert_grace_period=5
alert_kinit_timeout=14400000
system_resource_overrides=/etc/resource_overrides
+; memory_threshold_soft_mb=400
+; memory_threshold_hard_mb=1000
[security]
keysdir=/var/lib/ambari-agent/keys
http://git-wip-us.apache.org/repos/asf/ambari/blob/11543ed4/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 91bc586..e981a76 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -47,11 +47,14 @@ from ambari_agent.RecoveryManager import RecoveryManager
from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
from ambari_agent.ExitHelper import ExitHelper
from resource_management.libraries.functions.version import compare_versions
+from ambari_commons.os_utils import get_used_ram
logger = logging.getLogger(__name__)
AGENT_AUTO_RESTART_EXIT_CODE = 77
+AGENT_RAM_OVERUSE_MESSAGE = "Ambari-agent RAM usage {used_ram} MB went above {config_name}={max_ram} MB. Restarting ambari-agent to clean the RAM."
+
class Controller(threading.Thread):
def __init__(self, config, server_hostname, heartbeat_stop_callback = None, range=30):
@@ -90,6 +93,9 @@ class Controller(threading.Thread):
if cache_dir is None:
cache_dir = '/var/lib/ambari-agent/cache'
+ self.max_ram_soft = int(config.get('agent','memory_threshold_soft_mb', default=0))
+ self.max_ram_hard = int(config.get('agent','memory_threshold_hard_mb', default=0))
+
stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
@@ -279,6 +285,15 @@ class Controller(threading.Thread):
self.repeatRegistration = True
return
+ used_ram = get_used_ram()/1000
+ # dealing with a possible memory leaks
+ if self.max_ram_soft and used_ram >= self.max_ram_soft and not self.actionQueue.tasks_in_progress_or_pending():
+ logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_soft_mb", max_ram=self.max_ram_soft))
+ self.restartAgent()
+ if self.max_ram_hard and used_ram >= self.max_ram_hard:
+ logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_hard_mb", max_ram=self.max_ram_hard))
+ self.restartAgent()
+
if serverId != self.responseId + 1:
logger.error("Error in responseId sequence - restarting")
self.restartAgent()
http://git-wip-us.apache.org/repos/asf/ambari/blob/11543ed4/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 5604769..59b6276 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -412,10 +412,11 @@ class TestController(unittest.TestCase):
exceptionMessage, str(e))
+ @patch.object(ExitHelper, "exit")
@patch.object(threading._Event, "wait")
@patch("time.sleep")
@patch("ambari_simplejson.dumps")
- def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock):
+ def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock, exit_mock):
out = StringIO.StringIO()
sys.stdout = out
@@ -509,7 +510,7 @@ class TestController(unittest.TestCase):
self.controller.DEBUG_STOP_HEARTBEATING = False
self.controller.heartbeatWithServer()
- restartAgent.assert_called_once_with()
+ restartAgent.assert_called_with()
# executionCommands
self.controller.responseId = 1
@@ -539,7 +540,7 @@ class TestController(unittest.TestCase):
self.controller.restartAgent = restartAgent
self.controller.heartbeatWithServer()
- restartAgent.assert_called_once_with()
+ restartAgent.assert_called_with()
# actionQueue not idle
self.controller.responseId = 1
@@ -675,10 +676,11 @@ class TestController(unittest.TestCase):
self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
pass
+ @patch.object(ExitHelper, "exit")
@patch.object(threading._Event, "wait")
@patch("time.sleep")
@patch("ambari_simplejson.dumps")
- def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock):
+ def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock, exit_mock):
out = StringIO.StringIO()
sys.stdout = out
http://git-wip-us.apache.org/repos/asf/ambari/blob/11543ed4/ambari-common/src/main/python/ambari_commons/os_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/os_utils.py b/ambari-common/src/main/python/ambari_commons/os_utils.py
index 4a636d9..620bd0e 100644
--- a/ambari-common/src/main/python/ambari_commons/os_utils.py
+++ b/ambari-common/src/main/python/ambari_commons/os_utils.py
@@ -22,6 +22,7 @@ import re
import os
import shutil
import string
+import resource
from ambari_commons import OSCheck
from string import Template
@@ -42,6 +43,11 @@ else:
from ambari_commons.exceptions import FatalException
from ambari_commons.logging_utils import print_info_msg, print_warning_msg
+def get_used_ram():
+ """
+ Returns the PEAK resident set size (ru_maxrss high-water mark) of the current process, not its current RSS; units follow getrusage(2): kilobytes on Linux, bytes on macOS. Unix-only: the resource module is unavailable on Windows.
+ """
+ return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
def is_valid_filepath(filepath):
if not filepath or not os.path.exists(filepath) or os.path.isdir(filepath):
[2/2] ambari git commit: AMBARI-17030. Agents should automatically restart if there is a memory leak (aonishuk)
Posted by ao...@apache.org.
AMBARI-17030. Agents should automatically restart if there is a memory leak (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7869e4df
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7869e4df
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7869e4df
Branch: refs/heads/branch-2.4
Commit: 7869e4df8e803794cd9363c6f3cee7ce37da9663
Parents: 400537b
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Jun 6 19:46:58 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Jun 6 19:46:58 2016 +0300
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 2 ++
.../src/main/python/ambari_agent/Controller.py | 15 +++++++++++++++
.../src/test/python/ambari_agent/TestController.py | 10 ++++++----
.../src/main/python/ambari_commons/os_utils.py | 6 ++++++
4 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/7869e4df/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index aacbb8a..8f2ab1b 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -34,6 +34,8 @@ parallel_execution=0
alert_grace_period=5
alert_kinit_timeout=14400000
system_resource_overrides=/etc/resource_overrides
+; memory_threshold_soft_mb=400
+; memory_threshold_hard_mb=1000
[security]
keysdir=/var/lib/ambari-agent/keys
http://git-wip-us.apache.org/repos/asf/ambari/blob/7869e4df/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 91bc586..e981a76 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -47,11 +47,14 @@ from ambari_agent.RecoveryManager import RecoveryManager
from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
from ambari_agent.ExitHelper import ExitHelper
from resource_management.libraries.functions.version import compare_versions
+from ambari_commons.os_utils import get_used_ram
logger = logging.getLogger(__name__)
AGENT_AUTO_RESTART_EXIT_CODE = 77
+AGENT_RAM_OVERUSE_MESSAGE = "Ambari-agent RAM usage {used_ram} MB went above {config_name}={max_ram} MB. Restarting ambari-agent to clean the RAM."
+
class Controller(threading.Thread):
def __init__(self, config, server_hostname, heartbeat_stop_callback = None, range=30):
@@ -90,6 +93,9 @@ class Controller(threading.Thread):
if cache_dir is None:
cache_dir = '/var/lib/ambari-agent/cache'
+ self.max_ram_soft = int(config.get('agent','memory_threshold_soft_mb', default=0))
+ self.max_ram_hard = int(config.get('agent','memory_threshold_hard_mb', default=0))
+
stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
@@ -279,6 +285,15 @@ class Controller(threading.Thread):
self.repeatRegistration = True
return
+ used_ram = get_used_ram()/1000
+ # dealing with a possible memory leaks
+ if self.max_ram_soft and used_ram >= self.max_ram_soft and not self.actionQueue.tasks_in_progress_or_pending():
+ logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_soft_mb", max_ram=self.max_ram_soft))
+ self.restartAgent()
+ if self.max_ram_hard and used_ram >= self.max_ram_hard:
+ logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_hard_mb", max_ram=self.max_ram_hard))
+ self.restartAgent()
+
if serverId != self.responseId + 1:
logger.error("Error in responseId sequence - restarting")
self.restartAgent()
http://git-wip-us.apache.org/repos/asf/ambari/blob/7869e4df/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 5604769..59b6276 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -412,10 +412,11 @@ class TestController(unittest.TestCase):
exceptionMessage, str(e))
+ @patch.object(ExitHelper, "exit")
@patch.object(threading._Event, "wait")
@patch("time.sleep")
@patch("ambari_simplejson.dumps")
- def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock):
+ def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock, exit_mock):
out = StringIO.StringIO()
sys.stdout = out
@@ -509,7 +510,7 @@ class TestController(unittest.TestCase):
self.controller.DEBUG_STOP_HEARTBEATING = False
self.controller.heartbeatWithServer()
- restartAgent.assert_called_once_with()
+ restartAgent.assert_called_with()
# executionCommands
self.controller.responseId = 1
@@ -539,7 +540,7 @@ class TestController(unittest.TestCase):
self.controller.restartAgent = restartAgent
self.controller.heartbeatWithServer()
- restartAgent.assert_called_once_with()
+ restartAgent.assert_called_with()
# actionQueue not idle
self.controller.responseId = 1
@@ -675,10 +676,11 @@ class TestController(unittest.TestCase):
self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
pass
+ @patch.object(ExitHelper, "exit")
@patch.object(threading._Event, "wait")
@patch("time.sleep")
@patch("ambari_simplejson.dumps")
- def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock):
+ def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock, exit_mock):
out = StringIO.StringIO()
sys.stdout = out
http://git-wip-us.apache.org/repos/asf/ambari/blob/7869e4df/ambari-common/src/main/python/ambari_commons/os_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/os_utils.py b/ambari-common/src/main/python/ambari_commons/os_utils.py
index 4a636d9..620bd0e 100644
--- a/ambari-common/src/main/python/ambari_commons/os_utils.py
+++ b/ambari-common/src/main/python/ambari_commons/os_utils.py
@@ -22,6 +22,7 @@ import re
import os
import shutil
import string
+import resource
from ambari_commons import OSCheck
from string import Template
@@ -42,6 +43,11 @@ else:
from ambari_commons.exceptions import FatalException
from ambari_commons.logging_utils import print_info_msg, print_warning_msg
+def get_used_ram():
+ """
+ Returns the PEAK resident set size (ru_maxrss high-water mark) of the current process, not its current RSS; units follow getrusage(2): kilobytes on Linux, bytes on macOS. Unix-only: the resource module is unavailable on Windows.
+ """
+ return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
def is_valid_filepath(filepath):
if not filepath or not os.path.exists(filepath) or os.path.isdir(filepath):