You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2014/08/20 17:30:26 UTC

[25/50] [abbrv] git commit: AMBARI-6808 - Ambari Agent DataCleaner should delete log files when a max size in MB is reached (Alejandro Fernandez via jonathanhurley)

AMBARI-6808 - Ambari Agent DataCleaner should delete log files when a max size in MB is reached (Alejandro Fernandez via jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/30f8a87a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/30f8a87a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/30f8a87a

Branch: refs/heads/branch-alerts-dev
Commit: 30f8a87a657993f2b0632289142fd513e7948e75
Parents: 1c5ceb2
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Tue Aug 19 14:21:16 2014 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Tue Aug 19 14:21:16 2014 -0400

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  1 +
 .../main/python/ambari_agent/AmbariConfig.py    |  1 +
 .../src/main/python/ambari_agent/DataCleaner.py | 60 +++++++++++++++++---
 .../test/python/ambari_agent/TestDataCleaner.py | 39 ++++++++-----
 4 files changed, 79 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 9bbef26..162041a 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -24,6 +24,7 @@ tmp_dir=/var/lib/ambari-agent/data/tmp
 loglevel=INFO
 data_cleanup_interval=86400
 data_cleanup_max_age=2592000
+data_cleanup_max_size_MB = 100
 ping_port=8670
 cache_dir=/var/lib/ambari-agent/cache
 tolerate_download_failures=true

http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 4330eb3..4453161 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -35,6 +35,7 @@ prefix=/tmp/ambari-agent
 tmp_dir=/tmp/ambari-agent/tmp
 data_cleanup_interval=86400
 data_cleanup_max_age=2592000
+data_cleanup_max_size_MB = 100
 ping_port=8670
 cache_dir=/var/lib/ambari-agent/cache
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
index e42caac..0102eef 100644
--- a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
+++ b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
@@ -28,23 +28,34 @@ import logging
 logger = logging.getLogger()
 
 class DataCleaner(threading.Thread):
-  FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp'
+  FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json'
 
   def __init__(self, config):
     threading.Thread.__init__(self)
     self.daemon = True
     logger.info('Data cleanup thread started')
     self.config = config
-    self.file_max_age = int(config.get('agent','data_cleanup_max_age'))
-    if self.file_max_age < 86400:
+
+    self.file_max_age = config.get('agent','data_cleanup_max_age')
+    self.file_max_age = int(self.file_max_age) if self.file_max_age else None
+    if self.file_max_age is None or self.file_max_age < 86400:       # keep for at least 24h
       logger.warn('The minimum value allowed for data_cleanup_max_age is 1 '
                   'day. Setting data_cleanup_max_age to 86400.')
       self.file_max_age = 86400
-    self.cleanup_interval = int(config.get('agent','data_cleanup_interval'))
-    if self.cleanup_interval < 3600:
+
+    self.cleanup_interval = config.get('agent','data_cleanup_interval')
+    self.cleanup_interval = int(self.cleanup_interval) if self.cleanup_interval else None
+    if self.cleanup_interval is None or self.cleanup_interval < 3600:    # wait at least 1 hour between runs
       logger.warn('The minimum value allowed for data_cleanup_interval is 1 '
                   'hour. Setting data_cleanup_interval to 3600.')
-      self.file_max_age = 3600
+      self.cleanup_interval = 3600
+
+    self.cleanup_max_size_MB = config.get('agent', 'data_cleanup_max_size_MB')
+    self.cleanup_max_size_MB = int(self.cleanup_max_size_MB) if self.cleanup_max_size_MB else None
+    if self.cleanup_max_size_MB is None or self.cleanup_max_size_MB > 10000:  # no more than 10 GBs
+      logger.warn('The maximum value allowed for cleanup_max_size_MB is 10000 MB (10 GB). '
+                        'Setting cleanup_max_size_MB to 10000.')
+      self.cleanup_max_size_MB = 10000
 
     self.data_dir = config.get('agent','prefix')
     self.compiled_pattern = re.compile(self.FILE_NAME_PATTERN)
@@ -54,17 +65,52 @@ class DataCleaner(threading.Thread):
     logger.info('Data cleanup thread killed.')
 
   def cleanup(self):
+    logger.debug("Cleaning up inside directory " + self.data_dir)
+    now = time.time()  # single reference time so every file's age is measured consistently
+    total_size_bytes = 0  # combined size of matching files that survive the age pass
+    file_path_to_timestamp = {}  # path -> age in seconds (now - mtime), used to order the size pass
+    file_path_to_size = {}  # path -> size in bytes
+    # Pass 1: delete matching files older than file_max_age; remember the rest for the size-cap pass.
     for root, dirs, files in os.walk(self.data_dir):
       for f in files:
         file_path = os.path.join(root, f)
         if self.compiled_pattern.match(f):
           try:
-            if time.time() - os.path.getmtime(file_path) > self.file_max_age:
+            file_age = now - os.path.getmtime(file_path)
+            if file_age > self.file_max_age:
               os.remove(os.path.join(file_path))
               logger.debug('Removed file: ' + file_path)
+            else:
+              # Since file wasn't deleted in first pass, consider it for the second one with oldest files first
+              file_size = os.path.getsize(file_path)
+              total_size_bytes += file_size
+              file_path_to_timestamp[file_path] = file_age
+              file_path_to_size[file_path] = file_size
           except Exception:
             logger.error('Error when removing file: ' + file_path)
 
+    target_size_bytes = self.cleanup_max_size_MB * 1000000  # decimal megabytes (10^6 bytes each)
+    if file_path_to_timestamp and total_size_bytes > target_size_bytes:
+      logger.info("DataCleaner values need to be more aggressive. Current size in bytes for all log files is %d, "
+                  "and will try to clean to reach %d bytes." % (total_size_bytes, target_size_bytes))
+      # Pass 2: prune oldest files first until the total drops to the cap
+      count = 0
+      file_path_oldest_first_list = sorted(file_path_to_timestamp, key=file_path_to_timestamp.get, reverse=True)  # largest age first
+      for file_path in file_path_oldest_first_list:
+        try:
+          os.remove(file_path)
+          total_size_bytes -= file_path_to_size[file_path]
+          count += 1
+          if total_size_bytes <= target_size_bytes:
+            # Finally reached below the cap
+            break
+        except Exception:
+          logger.error('Error when removing file: ' + file_path)
+      else:
+        # Loop exhausted the candidates without breaking, so the cap was not reached.
+        logger.warn("DataCleaner deleted an additional %d files, currently log files occupy %d bytes." %
+                    (count, total_size_bytes))
+        pass
 
   def run(self):
     while not self.stopped:

http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py b/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py
index d385697..b64dc44 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py
@@ -28,14 +28,15 @@ class TestDataCleaner(unittest.TestCase):
 
   def setUp(self):
     self.test_dir = [('/test_path', [],
-                      ['errors-12.txt','output-12.txt','site-12.pp','site-13.pp','site-15.pp','version'])]
+                      ['errors-12.txt', 'output-12.txt', 'site-12.pp', 'site-13.pp', 'site-15.pp',
+                       'structured-out-13.json', 'command-13.json', 'version'])]  # 'version' does not match FILE_NAME_PATTERN
     self.config = MagicMock()
-    self.config.get.side_effect = [2592000,3600 + 1,"/test_path"]
+    self.config.get.side_effect = [2592000, (3600 + 1), 10000, "/test_path"]  # order read by DataCleaner.__init__: max_age, interval, max_size_MB, prefix
     DataCleaner.logger = MagicMock()
 
   def test_init_success(self):
     config = MagicMock()
-    config.get.return_value = 2592000
+    config.get.side_effect = [2592000, (3600 + 1), 10000, "/test_path"]
     DataCleaner.logger.reset_mock()
     cleaner = DataCleaner.DataCleaner(config)
     self.assertFalse(DataCleaner.logger.warn.called)
@@ -43,45 +44,53 @@ class TestDataCleaner(unittest.TestCase):
 
   def test_init_warn(self):
     config = MagicMock()
-    config.get.return_value = 10
+    config.get.side_effect = [1, (3600 - 1), (10000 + 1), "/test_path"]  # max_age, interval, max_size_MB all out of range
     DataCleaner.logger.reset_mock()
     cleaner = DataCleaner.DataCleaner(config)
     self.assertTrue(DataCleaner.logger.warn.called)
-    self.assertTrue(cleaner.file_max_age == 3600)
+    self.assertEqual(cleaner.file_max_age, 86400)  # raised to the 1-day minimum
+    self.assertEqual(cleaner.cleanup_interval, 3600)  # raised to the 1-hour minimum
+    self.assertEqual(cleaner.cleanup_max_size_MB, 10000)  # lowered to the 10000 MB maximum
 
   @patch('os.walk')
   @patch('time.time')
   @patch('os.path.getmtime')
   @patch('os.remove')
-  def test_cleanup_success(self,remMock,mtimeMock,timeMock,walkMock):
+  @patch('os.path.getsize')
+  def test_cleanup_success(self, sizeMock, remMock, mtimeMock, timeMock, walkMock):
     self.config.reset_mock()
     DataCleaner.logger.reset_mock()
 
     walkMock.return_value = iter(self.test_dir)
     timeMock.return_value = 2592000 + 2
-    mtimeMock.side_effect = [1,1,1,2,1,1]
+    mtimeMock.side_effect = [1, 1, 1, 2, 1, 1, 1]  # one per matching file; site-13.pp (mtime 2) is exactly max_age old, so kept
+    sizeMock.return_value = 100  # far below the 10000 MB cap, so the size-cap pass removes nothing
 
     cleaner = DataCleaner.DataCleaner(self.config)
     cleaner.cleanup()
 
-    self.assertTrue(len(remMock.call_args_list) == 4)
-    remMock.assert_any_call('/test_path/errors-12.txt');
-    remMock.assert_any_call('/test_path/output-12.txt');
-    remMock.assert_any_call('/test_path/site-12.pp');
-    remMock.assert_any_call('/test_path/site-15.pp');
+    self.assertEqual(remMock.call_count, 6)
+    remMock.assert_any_call('/test_path/errors-12.txt')
+    remMock.assert_any_call('/test_path/output-12.txt')
+    remMock.assert_any_call('/test_path/site-12.pp')
+    remMock.assert_any_call('/test_path/site-15.pp')
+    remMock.assert_any_call('/test_path/structured-out-13.json')
+    remMock.assert_any_call('/test_path/command-13.json')
     pass
 
   @patch('os.walk')
   @patch('time.time')
   @patch('os.path.getmtime')
   @patch('os.remove')
-  def test_cleanup_remove_error(self,remMock,mtimeMock,timeMock,walkMock):
+  @patch('os.path.getsize')
+  def test_cleanup_remove_error(self, sizeMock, remMock, mtimeMock, timeMock, walkMock):
     self.config.reset_mock()
     DataCleaner.logger.reset_mock()
 
     walkMock.return_value = iter(self.test_dir)
     timeMock.return_value = 2592000 + 2
-    mtimeMock.side_effect = [1,1,1,2,1,1]
+    mtimeMock.side_effect = [1, 1, 1, 2, 1, 1, 1, 1]
+    sizeMock.return_value = 100
 
     def side_effect(arg):
       if arg == '/test_path/site-15.pp':
@@ -92,7 +101,7 @@ class TestDataCleaner(unittest.TestCase):
     cleaner = DataCleaner.DataCleaner(self.config)
     cleaner.cleanup()
 
-    self.assertTrue(len(remMock.call_args_list) == 4)
+    self.assertTrue(len(remMock.call_args_list) == 6)
     self.assertTrue(DataCleaner.logger.error.call_count == 1)
     pass