You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/20 13:16:04 UTC

[GitHub] aoen closed pull request #3747: [AIRFLOW-2895] Prevent scheduler from spamming heartbeats/logs

aoen closed pull request #3747: [AIRFLOW-2895] Prevent scheduler from spamming heartbeats/logs
URL: https://github.com/apache/incubator-airflow/pull/3747
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a fork-based pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/README.md b/README.md
index 0f5c71446c..b68ae286d7 100644
--- a/README.md
+++ b/README.md
@@ -246,6 +246,7 @@ Currently **officially** using Airflow:
 1. [Tile](https://tile.com/) [[@ranjanmanish](https://github.com/ranjanmanish)]
 1. [Tokopedia](https://www.tokopedia.com/) [@topedmaria](https://github.com/topedmaria)
 1. [Twine Labs](https://www.twinelabs.com/) [[@ivorpeles](https://github.com/ivorpeles)]
+1. [Twitter](https://www.twitter.com/) [[@aoen](https://github.com/aoen)]
 1. [T2 Systems](http://t2systems.com) [[@unclaimedpants](https://github.com/unclaimedpants)]
 1. [Ubisoft](https://www.ubisoft.com/) [[@Walkoss](https://github.com/Walkoss)]
 1. [United Airlines](https://www.united.com/) [[@ilopezfr](https://github.com/ilopezfr)]
diff --git a/UPDATING.md b/UPDATING.md
index af10729085..78b8327f05 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -12,6 +12,11 @@ so you might need to update your config.
 
 `task_runner = StandardTaskRunner`
 
+### min_file_parsing_loop_time config option temporarily disabled
+
+The scheduler.min_file_parsing_loop_time config option has been temporarily removed due to
+some bugs.
+
 ## Airflow 1.10
 
 Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or
@@ -428,7 +433,7 @@ indefinitely. This is only available on the command line.
 After how much time should an updated DAG be picked up from the filesystem.
 
 #### min_file_parsing_loop_time
-
+CURRENTLY DISABLED DUE TO A BUG
 How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
 
 #### dag_dir_list_interval
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 76c66c90f6..18c486cb1e 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -435,9 +435,6 @@ run_duration = -1
 # after how much time (seconds) a new DAGs should be picked up from the filesystem
 min_file_process_interval = 0
 
-# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
-min_file_parsing_loop_time = 1
-
 # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
 dag_dir_list_interval = 300
 
diff --git a/airflow/jobs.py b/airflow/jobs.py
index d51e7c2537..75d6d3951c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -533,8 +533,7 @@ def __init__(
             num_runs=-1,
             file_process_interval=conf.getint('scheduler',
                                               'min_file_process_interval'),
-            min_file_parsing_loop_time=conf.getint('scheduler',
-                                                   'min_file_parsing_loop_time'),
+            processor_poll_interval=1.0,
             run_duration=None,
             do_pickle=False,
             log=None,
@@ -549,6 +548,8 @@ def __init__(
         :type subdir: unicode
         :param num_runs: The number of times to try to schedule each DAG file.
         -1 for unlimited within the run_duration.
+        :param processor_poll_interval: The number of seconds to wait between
+        polls of running processors
         :param run_duration: how long to run (in seconds) before exiting
         :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the Python
@@ -565,6 +566,7 @@ def __init__(
 
         self.num_runs = num_runs
         self.run_duration = run_duration
+        self._processor_poll_interval = processor_poll_interval
 
         self.do_pickle = do_pickle
         super(SchedulerJob, self).__init__(*args, **kwargs)
@@ -592,10 +594,7 @@ def __init__(
 
         self.file_process_interval = file_process_interval
 
-        # Wait until at least this many seconds have passed before parsing files once all
-        # files have finished parsing.
-        self.min_file_parsing_loop_time = min_file_parsing_loop_time
-
+        self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
                                             'run_duration')
@@ -1557,16 +1556,18 @@ def _execute(self):
         # DAGs in parallel. By processing them in separate processes,
         # we can get parallelism and isolation from potentially harmful
         # user code.
-        self.log.info("Processing files using up to %s processes at a time",
-                      self.max_threads)
+        self.log.info(
+            "Processing files using up to %s processes at a time",
+            self.max_threads)
         self.log.info("Running execute loop for %s seconds", self.run_duration)
         self.log.info("Processing each file at most %s times", self.num_runs)
-        self.log.info("Process each file at most once every %s seconds",
-                      self.file_process_interval)
-        self.log.info("Wait until at least %s seconds have passed between file parsing "
-                      "loops", self.min_file_parsing_loop_time)
-        self.log.info("Checking for new files in %s every %s seconds",
-                      self.subdir, self.dag_dir_list_interval)
+        self.log.info(
+            "Process each file at most once every %s seconds",
+            self.file_process_interval)
+        self.log.info(
+            "Checking for new files in %s every %s seconds",
+            self.subdir,
+            self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
         self.log.info("Searching for files in %s", self.subdir)
@@ -1582,7 +1583,6 @@ def processor_factory(file_path):
                                                     known_file_paths,
                                                     self.max_threads,
                                                     self.file_process_interval,
-                                                    self.min_file_parsing_loop_time,
                                                     self.num_runs,
                                                     processor_factory)
 
@@ -1734,13 +1734,17 @@ def _execute_helper(self, processor_manager):
                 last_stat_print_time = timezone.utcnow()
 
             loop_end_time = time.time()
-            self.log.debug("Ran scheduling loop in %.2f seconds",
-                           loop_end_time - loop_start_time)
+            self.log.debug(
+                "Ran scheduling loop in %.2f seconds",
+                loop_end_time - loop_start_time)
+            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
+            time.sleep(self._processor_poll_interval)
 
             # Exit early for a test mode
             if processor_manager.max_runs_reached():
-                self.log.info("Exiting loop as all files have been processed %s times",
-                              self.num_runs)
+                self.log.info(
+                    "Exiting loop as all files have been processed %s times",
+                    self.num_runs)
                 break
 
         # Stop any processors
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 3796f3e950..89e5701cf1 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -326,7 +326,6 @@ def __init__(self,
                  file_paths,
                  parallelism,
                  process_file_interval,
-                 min_file_parsing_loop_time,
                  max_runs,
                  processor_factory):
         """
@@ -340,9 +339,6 @@ def __init__(self,
         :param process_file_interval: process a file at most once every this
         many seconds
         :type process_file_interval: float
-        :param min_file_parsing_loop_time: wait until at least this many seconds have
-        passed before parsing files once all files have finished parsing.
-        :type min_file_parsing_loop_time: float
         :param max_runs: The number of times to parse and schedule each file. -1
         for unlimited.
         :type max_runs: int
@@ -358,7 +354,6 @@ def __init__(self,
         self._dag_directory = dag_directory
         self._max_runs = max_runs
         self._process_file_interval = process_file_interval
-        self._min_file_parsing_loop_time = min_file_parsing_loop_time
         self._processor_factory = processor_factory
         # Map from file path to the processor
         self._processors = {}
@@ -529,24 +524,12 @@ def heartbeat(self):
             file_paths_in_progress = self._processors.keys()
             now = timezone.utcnow()
             file_paths_recently_processed = []
-
-            longest_parse_duration = 0
             for file_path in self._file_paths:
                 last_finish_time = self.get_last_finish_time(file_path)
-                if last_finish_time is not None:
-                    duration = now - last_finish_time
-                    longest_parse_duration = max(duration.total_seconds(),
-                                                 longest_parse_duration)
-                    if duration.total_seconds() < self._process_file_interval:
-                        file_paths_recently_processed.append(file_path)
-
-            sleep_length = max(self._min_file_parsing_loop_time - longest_parse_duration,
-                               0)
-            if sleep_length > 0:
-                self.log.debug("Sleeping for %.2f seconds to prevent excessive "
-                               "logging",
-                               sleep_length)
-                time.sleep(sleep_length)
+                if (last_finish_time is not None and
+                    (now - last_finish_time).total_seconds() <
+                        self._process_file_interval):
+                    file_paths_recently_processed.append(file_path)
 
             files_paths_at_run_limit = [file_path
                                         for file_path, num_runs in self._run_count.items()
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
index 3e64ae4e47..f8e99778f5 100644
--- a/scripts/ci/kubernetes/kube/configmaps.yaml
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -57,9 +57,6 @@ data:
     statsd_port = 8125
     statsd_prefix = airflow
 
-    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
-    min_file_parsing_loop_time = 1
-
     print_stats_interval = 30
     scheduler_zombie_task_threshold = 300
     max_tis_per_query = 0
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 7abe7efe9b..f29e384b8c 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -32,7 +32,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
             parallelism=1,
             process_file_interval=1,
             max_runs=1,
-            min_file_parsing_loop_time=0,
             processor_factory=MagicMock().return_value)
 
         mock_processor = MagicMock()
@@ -52,7 +51,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
             parallelism=1,
             process_file_interval=1,
             max_runs=1,
-            min_file_parsing_loop_time=0,
             processor_factory=MagicMock().return_value)
 
         mock_processor = MagicMock()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services