Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 10:55:56 UTC

[airflow] branch v1-10-test updated (13e5349 -> 3ca09b9)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    omit 13e5349  Add docs about reload_on_plugin_change option (#9575)
    omit 38e2794  [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)
    omit b6307ef  Fix failing tests from #8997 (#9576)
     new ed0df62  fixup! Reload gunicorn when plugins have been changed (#8997)
     new 78e6a4b  Fix failing tests from #8997 (#9576)
     new 942cd79  Add docs about reload_on_plugin_change option (#9575)
     new 3ca09b9  [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (13e5349)
            \
             N -- N -- N   refs/heads/v1-10-test (3ca09b9)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
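
One hypothetical way such a history arises, sketched as a small driver
script (the refs "B", "fix-1" and "fix-2" below are placeholders, not
objects in this repository):

    # Rewrite the last few commits (the O revisions) into new ones (the
    # N revisions), then force-push. All refs here are placeholders.
    import subprocess

    for cmd in (
        ["git", "reset", "--hard", "B"],           # back to the common base B
        ["git", "cherry-pick", "fix-1", "fix-2"],  # re-apply work as new commits N
        ["git", "push", "--force", "origin", "v1-10-test"],
    ):
        subprocess.run(cmd, check=True)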


Summary of changes:
 airflow/bin/cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[airflow] 04/04: [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3ca09b92e0fa1f8e961ab634baedfc15296899c5
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Mon Feb 24 16:15:38 2020 +0100

    [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)
    
    (cherry picked from commit 83d826b9925ce0eb2bd1fe403f5151fbef310b63)
---
 airflow/utils/dag_processing.py | 129 ++++++++++++++++++++--------------------
 1 file changed, 63 insertions(+), 66 deletions(-)

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 3aac8fd..e4ecc29 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -763,7 +763,7 @@ class DagFileProcessorManager(LoggingMixin):
         # Map from file path to the processor
         self._processors = {}
 
-        self._heartbeat_count = 0
+        self._num_run = 0
 
         # Map from file path to stats about the file
         self._file_stats = {}  # type: dict(str, DagFileStat)
@@ -843,11 +843,24 @@ class DagFileProcessorManager(LoggingMixin):
                 # are told to (as that would open another connection to the
                 # SQLite DB which isn't a good practice)
                 continue
-
+            # pylint: enable=no-else-break
             self._refresh_dag_dir()
-            self._find_zombies()
+            self._find_zombies()  # pylint: disable=no-value-for-parameter
+
+            self._kill_timed_out_processors()
+            simple_dags = self.collect_results()
+
+            # Generate more file paths to process if we processed all the files
+            # already.
+            if not self._file_path_queue:
+                self.emit_metrics()
+                self.prepare_file_path_queue()
+
+            self.start_new_processes()
+
+            # Update the number of loop iterations.
+            self._num_run += 1
 
-            simple_dags = self.heartbeat()
             for simple_dag in simple_dags:
                 self._signal_conn.send(simple_dag)
 
@@ -1197,65 +1210,11 @@ class DagFileProcessorManager(LoggingMixin):
 
         return simple_dags
 
-    def heartbeat(self):
-        """
-        This should be periodically called by the manager loop. This method will
-        kick off new processes to process DAG definition files and read the
-        results from the finished processors.
-
-        :return: a list of SimpleDags that were produced by processors that
-            have finished since the last time this was called
-        :rtype: list[airflow.utils.dag_processing.SimpleDag]
+    def start_new_processes(self):
+        """"
+        Start more processors if we have enough slots and files to process
         """
-        simple_dags = self.collect_results()
-
-        # Generate more file paths to process if we processed all the files
-        # already.
-        if len(self._file_path_queue) == 0:
-            self.emit_metrics()
-
-            self._parsing_start_time = timezone.utcnow()
-            # If the file path is already being processed, or if a file was
-            # processed recently, wait until the next batch
-            file_paths_in_progress = self._processors.keys()
-            now = timezone.utcnow()
-            file_paths_recently_processed = []
-            for file_path in self._file_paths:
-                last_finish_time = self.get_last_finish_time(file_path)
-                if (last_finish_time is not None and
-                    (now - last_finish_time).total_seconds() <
-                        self._file_process_interval):
-                    file_paths_recently_processed.append(file_path)
-
-            files_paths_at_run_limit = [file_path
-                                        for file_path, stat in self._file_stats.items()
-                                        if stat.run_count == self._max_runs]
-
-            files_paths_to_queue = list(set(self._file_paths) -
-                                        set(file_paths_in_progress) -
-                                        set(file_paths_recently_processed) -
-                                        set(files_paths_at_run_limit))
-
-            for file_path, processor in self._processors.items():
-                self.log.debug(
-                    "File path %s is still being processed (started: %s)",
-                    processor.file_path, processor.start_time.isoformat()
-                )
-
-            self.log.debug(
-                "Queuing the following files for processing:\n\t%s",
-                "\n\t".join(files_paths_to_queue)
-            )
-
-            for file_path in files_paths_to_queue:
-                if file_path not in self._file_stats:
-                    self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
-
-            self._file_path_queue.extend(files_paths_to_queue)
-
-        # Start more processors if we have enough slots and files to process
-        while (self._parallelism - len(self._processors) > 0 and
-               len(self._file_path_queue) > 0):
+        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
             processor = self._processor_factory(file_path, self._zombies)
             Stats.incr('dag_processing.processes')
@@ -1267,10 +1226,48 @@ class DagFileProcessorManager(LoggingMixin):
             )
             self._processors[file_path] = processor
 
-        # Update heartbeat count.
-        self._heartbeat_count += 1
+    def prepare_file_path_queue(self):
+        """
+        Generate more file paths to process. Results are saved in _file_path_queue.
+        """
+        self._parsing_start_time = timezone.utcnow()
+        # If the file path is already being processed, or if a file was
+        # processed recently, wait until the next batch
+        file_paths_in_progress = self._processors.keys()
+        now = timezone.utcnow()
+        file_paths_recently_processed = []
+        for file_path in self._file_paths:
+            last_finish_time = self.get_last_finish_time(file_path)
+            if (last_finish_time is not None and
+                (now - last_finish_time).total_seconds() <
+                    self._file_process_interval):
+                file_paths_recently_processed.append(file_path)
+
+        files_paths_at_run_limit = [file_path
+                                    for file_path, stat in self._file_stats.items()
+                                    if stat.run_count == self._max_runs]
+
+        files_paths_to_queue = list(set(self._file_paths) -
+                                    set(file_paths_in_progress) -
+                                    set(file_paths_recently_processed) -
+                                    set(files_paths_at_run_limit))
 
-        return simple_dags
+        for file_path, processor in self._processors.items():
+            self.log.debug(
+                "File path %s is still being processed (started: %s)",
+                processor.file_path, processor.start_time.isoformat()
+            )
+
+        self.log.debug(
+            "Queuing the following files for processing:\n\t%s",
+            "\n\t".join(files_paths_to_queue)
+        )
+
+        for file_path in files_paths_to_queue:
+            if file_path not in self._file_stats:
+                self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
+
+        self._file_path_queue.extend(files_paths_to_queue)
 
     @provide_session
     def _find_zombies(self, session):
@@ -1338,7 +1335,7 @@ class DagFileProcessorManager(LoggingMixin):
         for stat in self._file_stats.values():
             if stat.run_count < self._max_runs:
                 return False
-        if self._heartbeat_count < self._max_runs:
+        if self._num_run < self._max_runs:
             return False
         return True
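
The set arithmetic in prepare_file_path_queue() is the core of this
refactoring: a file is queued only if it is not currently being
processed, was not finished recently, and has not hit the run limit. A
minimal, self-contained sketch of that rule follows; the file names and
stats are hypothetical, and DagFileStat is a stand-in with assumed field
names for the five-slot DagFileStat(0, 0, None, None, 0) seen in the
diff. Only the exclusion logic mirrors the change.

    from collections import namedtuple
    from datetime import datetime, timedelta

    # Stand-in for airflow.utils.dag_processing.DagFileStat (field names assumed).
    DagFileStat = namedtuple(
        "DagFileStat",
        ["num_dags", "import_errors", "last_finish_time", "last_duration", "run_count"],
    )

    file_process_interval = 30  # seconds before the same file may be re-parsed
    max_runs = 5

    now = datetime.utcnow()
    file_paths = ["a.py", "b.py", "c.py", "d.py"]
    processors = {"a.py": None}  # a.py is still being processed
    file_stats = {
        "b.py": DagFileStat(1, 0, now - timedelta(seconds=10), 1.0, 2),   # finished recently
        "c.py": DagFileStat(1, 0, now - timedelta(seconds=120), 1.0, 5),  # at the run limit
    }

    recently_processed = [
        path
        for path in file_paths
        if path in file_stats
        and file_stats[path].last_finish_time is not None
        and (now - file_stats[path].last_finish_time).total_seconds() < file_process_interval
    ]
    at_run_limit = [path for path, stat in file_stats.items() if stat.run_count == max_runs]

    to_queue = list(
        set(file_paths)
        - set(processors)
        - set(recently_processed)
        - set(at_run_limit)
    )
    print(to_queue)  # ['d.py'] -- idle, not recently parsed, under the run limit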
 


[airflow] 01/04: fixup! Reload gunicorn when plugins have been changed (#8997)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ed0df6253d514fbefad4c0bf05cfd7223d207a93
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Jun 30 11:54:31 2020 +0100

    fixup! Reload gunicorn when plugins have been changed (#8997)
---
 airflow/bin/cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index b81c7b1..8140206 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1112,7 +1112,7 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]
 
         if args.daemon:
-            run_args += ['--deamon']
+            run_args += ['--daemon']
 
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
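
The fix above is a single character, but gunicorn parses only the flags
it knows, so the misspelled "--deamon" was rejected and the webserver
could not start as a daemon. A quick illustration with a hypothetical
two-flag stand-in parser (not gunicorn's real option table):

    import argparse

    # Hypothetical stand-in for gunicorn's CLI: unknown flags are rejected.
    parser = argparse.ArgumentParser(prog="gunicorn")
    parser.add_argument("--daemon", action="store_true")

    print(parser.parse_args(["--daemon"]).daemon)  # True
    try:
        parser.parse_args(["--deamon"])            # the old, misspelled flag
    except SystemExit:
        print("'--deamon' is rejected as an unknown flag")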


[airflow] 02/04: Fix failing tests from #8997 (#9576)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 78e6a4b6095838f0f672ee8b37463977f3478d54
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Mon Jun 29 21:02:44 2020 +0200

    Fix failing tests from #8997 (#9576)
    
    (cherry picked from commit 9858294f7252e626bec7b701280e81d4f27df452)
---
 airflow/bin/cli.py                           | 4 +++-
 airflow/config_templates/config.yml          | 4 ++--
 airflow/config_templates/default_airflow.cfg | 4 ++--
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8140206..9643660 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1135,7 +1135,9 @@ def webserver(args):
                 master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
                 worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=10),
                 worker_refresh_batch_size=conf.getint('webserver', 'worker_refresh_batch_size', fallback=1),
-                reload_on_plugin_change=conf.getint('webserver', 'reload_on_plugin_change', fallback=1),
+                reload_on_plugin_change=conf.getboolean(
+                    'webserver', 'reload_on_plugin_change', fallback=False
+                ),
             ).start()
 
         if args.daemon:
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1f697b1..f632cd5 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -701,12 +701,12 @@
       default: "30"
     - name: reload_on_plugin_change
       description: |
-        If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+        If set to True, Airflow will track files in plugins_folder directory. When it detects changes,
         then reload the gunicorn.
       version_added: ~
       type: boolean
       example: ~
-      default: False
+      default: "False"
     - name: secret_key
       description: |
         Secret key used to run your flask app
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index cfc92ad..a061d46 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -341,9 +341,9 @@ worker_refresh_batch_size = 1
 # Number of seconds to wait before refreshing a batch of workers.
 worker_refresh_interval = 30
 
-# If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+# If set to True, Airflow will track files in plugins_folder directory. When it detects changes,
 # then reload the gunicorn.
-reload_on_plugin_change =
+reload_on_plugin_change = False
 
 # Secret key used to run your flask app
 # It should be as random as possible
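
The getint-to-getboolean swap above is the substance of the fix:
reload_on_plugin_change is documented as a boolean and now defaults to
the string "False", which getboolean() understands but getint() cannot
parse. The behaviour is easy to reproduce with the stdlib ConfigParser
that Airflow's configuration object builds on:

    from configparser import ConfigParser

    parser = ConfigParser()
    parser.read_string("[webserver]\nreload_on_plugin_change = False\n")

    print(parser.getboolean("webserver", "reload_on_plugin_change"))  # False
    try:
        parser.getint("webserver", "reload_on_plugin_change")
    except ValueError as exc:
        # invalid literal for int() with base 10: 'False'
        print("getint() fails:", exc)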


[airflow] 03/04: Add docs about reload_on_plugin_change option (#9575)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 942cd7930468d89031b57d6c09818cc077dccb1c
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Mon Jun 29 23:24:24 2020 +0200

    Add docs about reload_on_plugin_change option (#9575)
    
    
    (cherry picked from commit 656c48da9c1d2ea13b928bd6b968ccd86bc7a95d)
---
 docs/plugins.rst | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/docs/plugins.rst b/docs/plugins.rst
index bfe08aa..88559e0 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -318,3 +318,12 @@ This will create a hook, and an operator accessible at:
 
 - ``airflow.hooks.my_namespace.MyHook``
 - ``airflow.operators.my_namespace.MyOperator``
+
+Automatic webserver reloading
+-----------------------------
+
+To enable automatic reloading of the webserver when changes in the plugins directory are detected,
+set the ``reload_on_plugin_change`` option in the ``[webserver]`` section to ``True``.
+
+.. note::
+    For more information on setting the configuration, see :doc:`/howto/set-config`
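
If you configure Airflow through the environment rather than by editing
airflow.cfg, the same option can be flipped with the standard
AIRFLOW__{SECTION}__{KEY} override convention, for example from a wrapper
script that then launches the webserver:

    import os
    import subprocess

    # Equivalent to reload_on_plugin_change = True in the [webserver]
    # section of airflow.cfg; the variable must be set before the
    # webserver process starts, which then inherits it.
    os.environ["AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE"] = "True"
    subprocess.run(["airflow", "webserver"], check=True)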