You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by jo...@apache.org on 2014/08/20 17:30:26 UTC
[25/50] [abbrv] git commit: AMBARI-6808 - Ambari Agent DataCleaner should delete log files when a max size in MB is reached (Alejandro Fernandez via jonathanhurley)
AMBARI-6808 - Ambari Agent DataCleaner should delete log files when a max size in MB is reached (Alejandro Fernandez via jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/30f8a87a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/30f8a87a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/30f8a87a
Branch: refs/heads/branch-alerts-dev
Commit: 30f8a87a657993f2b0632289142fd513e7948e75
Parents: 1c5ceb2
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Tue Aug 19 14:21:16 2014 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Tue Aug 19 14:21:16 2014 -0400
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 1 +
.../main/python/ambari_agent/AmbariConfig.py | 1 +
.../src/main/python/ambari_agent/DataCleaner.py | 60 +++++++++++++++++---
.../test/python/ambari_agent/TestDataCleaner.py | 39 ++++++++-----
4 files changed, 79 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 9bbef26..162041a 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -24,6 +24,7 @@ tmp_dir=/var/lib/ambari-agent/data/tmp
loglevel=INFO
data_cleanup_interval=86400
data_cleanup_max_age=2592000
+data_cleanup_max_size_MB = 100
ping_port=8670
cache_dir=/var/lib/ambari-agent/cache
tolerate_download_failures=true
http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 4330eb3..4453161 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -35,6 +35,7 @@ prefix=/tmp/ambari-agent
tmp_dir=/tmp/ambari-agent/tmp
data_cleanup_interval=86400
data_cleanup_max_age=2592000
+data_cleanup_max_size_MB = 100
ping_port=8670
cache_dir=/var/lib/ambari-agent/cache
http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
index e42caac..0102eef 100644
--- a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
+++ b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
@@ -28,23 +28,34 @@ import logging
logger = logging.getLogger()
class DataCleaner(threading.Thread):
- FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp'
+ FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json'
def __init__(self, config):
threading.Thread.__init__(self)
self.daemon = True
logger.info('Data cleanup thread started')
self.config = config
- self.file_max_age = int(config.get('agent','data_cleanup_max_age'))
- if self.file_max_age < 86400:
+
+ self.file_max_age = config.get('agent','data_cleanup_max_age')
+ self.file_max_age = int(self.file_max_age) if self.file_max_age else None
+ if self.file_max_age is None or self.file_max_age < 86400: # keep for at least 24h
logger.warn('The minimum value allowed for data_cleanup_max_age is 1 '
'day. Setting data_cleanup_max_age to 86400.')
self.file_max_age = 86400
- self.cleanup_interval = int(config.get('agent','data_cleanup_interval'))
- if self.cleanup_interval < 3600:
+
+ self.cleanup_interval = config.get('agent','data_cleanup_interval')
+ self.cleanup_interval = int(self.cleanup_interval) if self.cleanup_interval else None
+ if self.cleanup_interval is None or self.cleanup_interval < 3600: # wait at least 1 hour between runs
logger.warn('The minimum value allowed for data_cleanup_interval is 1 '
'hour. Setting data_cleanup_interval to 3600.')
- self.file_max_age = 3600
+ self.cleanup_interval = 3600
+
+ self.cleanup_max_size_MB = config.get('agent', 'data_cleanup_max_size_MB')
+ self.cleanup_max_size_MB = int(self.cleanup_max_size_MB) if self.cleanup_max_size_MB else None
+ if self.cleanup_max_size_MB is None or self.cleanup_max_size_MB > 10000: # no more than 10 GBs
+ logger.warn('The maximum value allowed for cleanup_max_size_MB is 10000 MB (10 GB). '
+ 'Setting cleanup_max_size_MB to 10000.')
+ self.cleanup_max_size_MB = 10000
self.data_dir = config.get('agent','prefix')
self.compiled_pattern = re.compile(self.FILE_NAME_PATTERN)
@@ -54,17 +65,52 @@ class DataCleaner(threading.Thread):
logger.info('Data cleanup thread killed.')
def cleanup(self):
+ logger.debug("Cleaning up inside directory " + self.data_dir)
+ now = time.time()
+ total_size_bytes = 0
+ file_path_to_timestamp = {}
+ file_path_to_size = {}
+
for root, dirs, files in os.walk(self.data_dir):
for f in files:
file_path = os.path.join(root, f)
if self.compiled_pattern.match(f):
try:
- if time.time() - os.path.getmtime(file_path) > self.file_max_age:
+ file_age = now - os.path.getmtime(file_path)
+ if file_age > self.file_max_age:
os.remove(os.path.join(file_path))
logger.debug('Removed file: ' + file_path)
+ else:
+ # Since file wasn't deleted in first pass, consider it for the second one with oldest files first
+ file_size = os.path.getsize(file_path)
+ total_size_bytes += file_size
+ file_path_to_timestamp[file_path] = file_age
+ file_path_to_size[file_path] = file_size
except Exception:
logger.error('Error when removing file: ' + file_path)
+ target_size_bytes = self.cleanup_max_size_MB * 1000000
+ if len(file_path_to_timestamp) and total_size_bytes > target_size_bytes:
+ logger.info("DataCleaner values need to be more aggressive. Current size in bytes for all log files is %d, "
+ "and will try to clean to reach %d bytes." % (total_size_bytes, target_size_bytes))
+ # Prune oldest files first
+ count = 0
+ file_path_oldest_first_list = sorted(file_path_to_timestamp, key=file_path_to_timestamp.get, reverse=True)
+ for file_path in file_path_oldest_first_list:
+ try:
+ os.remove(os.path.join(file_path))
+ total_size_bytes -= file_path_to_size[file_path]
+ count += 1
+ if total_size_bytes <= target_size_bytes:
+ # Finally reached below the cap
+ break
+ except Exception:
+ pass
+ else:
+ # Did not reach below cap.
+ logger.warn("DataCleaner deleted an additional %d files, currently log files occupy %d bytes." %
+ (count, total_size_bytes))
+ pass
def run(self):
while not self.stopped:
http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py b/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py
index d385697..b64dc44 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py
@@ -28,14 +28,15 @@ class TestDataCleaner(unittest.TestCase):
def setUp(self):
self.test_dir = [('/test_path', [],
- ['errors-12.txt','output-12.txt','site-12.pp','site-13.pp','site-15.pp','version'])]
+ ['errors-12.txt', 'output-12.txt', 'site-12.pp', 'site-13.pp', 'site-15.pp',
+ 'structured-out-13.json', 'command-13.json', 'version'])]
self.config = MagicMock()
- self.config.get.side_effect = [2592000,3600 + 1,"/test_path"]
+ self.config.get.side_effect = [2592000, (3600 + 1), 10000, "/test_path"]
DataCleaner.logger = MagicMock()
def test_init_success(self):
config = MagicMock()
- config.get.return_value = 2592000
+ config.get.side_effect = [2592000, (3600 + 1), 10000, "/test_path"]
DataCleaner.logger.reset_mock()
cleaner = DataCleaner.DataCleaner(config)
self.assertFalse(DataCleaner.logger.warn.called)
@@ -43,45 +44,53 @@ class TestDataCleaner(unittest.TestCase):
def test_init_warn(self):
config = MagicMock()
- config.get.return_value = 10
+ config.get.side_effect = [1, (3600 - 1), (10000 + 1), "/test_path"]
DataCleaner.logger.reset_mock()
cleaner = DataCleaner.DataCleaner(config)
self.assertTrue(DataCleaner.logger.warn.called)
- self.assertTrue(cleaner.file_max_age == 3600)
+ self.assertTrue(cleaner.file_max_age == 86400)
+ self.assertTrue(cleaner.cleanup_interval == 3600)
+ self.assertTrue(cleaner.cleanup_max_size_MB == 10000)
@patch('os.walk')
@patch('time.time')
@patch('os.path.getmtime')
@patch('os.remove')
- def test_cleanup_success(self,remMock,mtimeMock,timeMock,walkMock):
+ @patch('os.path.getsize')
+ def test_cleanup_success(self, sizeMock, remMock, mtimeMock, timeMock, walkMock):
self.config.reset_mock()
DataCleaner.logger.reset_mock()
walkMock.return_value = iter(self.test_dir)
timeMock.return_value = 2592000 + 2
- mtimeMock.side_effect = [1,1,1,2,1,1]
+ mtimeMock.side_effect = [1, 1, 1, 2, 1, 1, 1, 1]
+ sizeMock.return_value = 100
cleaner = DataCleaner.DataCleaner(self.config)
cleaner.cleanup()
- self.assertTrue(len(remMock.call_args_list) == 4)
- remMock.assert_any_call('/test_path/errors-12.txt');
- remMock.assert_any_call('/test_path/output-12.txt');
- remMock.assert_any_call('/test_path/site-12.pp');
- remMock.assert_any_call('/test_path/site-15.pp');
+ self.assertTrue(len(remMock.call_args_list) == 6)
+ remMock.assert_any_call('/test_path/errors-12.txt')
+ remMock.assert_any_call('/test_path/output-12.txt')
+ remMock.assert_any_call('/test_path/site-12.pp')
+ remMock.assert_any_call('/test_path/site-15.pp')
+ remMock.assert_any_call('/test_path/structured-out-13.json')
+ remMock.assert_any_call('/test_path/command-13.json')
pass
@patch('os.walk')
@patch('time.time')
@patch('os.path.getmtime')
@patch('os.remove')
- def test_cleanup_remove_error(self,remMock,mtimeMock,timeMock,walkMock):
+ @patch('os.path.getsize')
+ def test_cleanup_remove_error(self, sizeMock, remMock, mtimeMock, timeMock, walkMock):
self.config.reset_mock()
DataCleaner.logger.reset_mock()
walkMock.return_value = iter(self.test_dir)
timeMock.return_value = 2592000 + 2
- mtimeMock.side_effect = [1,1,1,2,1,1]
+ mtimeMock.side_effect = [1, 1, 1, 2, 1, 1, 1, 1]
+ sizeMock.return_value = 100
def side_effect(arg):
if arg == '/test_path/site-15.pp':
@@ -92,7 +101,7 @@ class TestDataCleaner(unittest.TestCase):
cleaner = DataCleaner.DataCleaner(self.config)
cleaner.cleanup()
- self.assertTrue(len(remMock.call_args_list) == 4)
+ self.assertTrue(len(remMock.call_args_list) == 6)
self.assertTrue(DataCleaner.logger.error.call_count == 1)
pass