Posted to commits@airflow.apache.org by as...@apache.org on 2021/04/01 13:47:15 UTC

[airflow] branch v2-0-test updated (fb81726 -> 663985d)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a change to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from fb81726  Pass queue to BaseExecutor.execute_async like in airflow 1.10 (#14861)
     new 9313082  Fix typo in doc docker-stack (#14928)
     new 48f5149  Compare string values, not if strings are the same object (#14942)
     new 7c06e2c  Docs: Clarify behavior of delete_worker_pods_on_failure (#14958)
     new c952545  Add missing comma in docs for KubernetesExecutor (#15035)
     new e66c870  More proper default value for namespace in K8S cleanup-pods CLI (#15060)
     new c50cb01  Fixed deprecated code example in Concepts doc (#15098)
     new 924c660  Remove 'conf' from search_columns in DagRun View (#15099)
     new f649b8a  Re-introduce dagrun.schedule_delay metric (#15105)
     new cae8ead  Allow pathlib.Path in DagBag and various util fns (#15110)
     new 132825b  Avoid scheduler/parser manager deadlock by using non-blocking IO (#15112)
     new 663985d  Fix bug in airflow.stats timing that broke dogstatsd mode (#15132)

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/cli/cli_parser.py                    |   4 +-
 airflow/config_templates/config.yml          |   2 +
 airflow/config_templates/default_airflow.cfg |   2 +
 airflow/executors/kubernetes_executor.py     |   2 +-
 airflow/jobs/scheduler_job.py                |  10 +-
 airflow/models/dagbag.py                     |  20 +--
 airflow/models/dagrun.py                     |   4 +-
 airflow/stats.py                             |   2 +-
 airflow/utils/dag_processing.py              |  39 ++++--
 airflow/utils/file.py                        |  13 +-
 airflow/www/views.py                         |   1 -
 docs/apache-airflow/concepts.rst             |   3 +-
 docs/apache-airflow/executor/kubernetes.rst  |   2 +-
 docs/docker-stack/entrypoint.rst             |   2 +-
 pylintrc-tests                               |   4 +-
 tests/core/test_stats.py                     | 193 ++++++++++++++-------------
 tests/jobs/test_scheduler_job.py             |  19 ++-
 tests/utils/test_dag_processing.py           |  99 ++++++++++++--
 18 files changed, 283 insertions(+), 138 deletions(-)

[airflow] 01/11: Fix typo in doc docker-stack (#14928)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 93130820d55d47b14c8f5813f2ef37ec7f5b1487
Author: raphaelauv <ra...@users.noreply.github.com>
AuthorDate: Mon Mar 22 11:02:54 2021 +0100

    Fix typo in doc docker-stack (#14928)
    
    Fix a typo in the doc: _AIRFLOW_WWW_USER_PASSWORD_CMD was repeated twice
    
    (cherry picked from commit fa92657490bc188d272fb4d8f3b09933815bcea1)
---
 docs/docker-stack/entrypoint.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/docker-stack/entrypoint.rst b/docs/docker-stack/entrypoint.rst
index a7889c4..829b37e 100644
--- a/docs/docker-stack/entrypoint.rst
+++ b/docs/docker-stack/entrypoint.rst
@@ -130,7 +130,7 @@ Creating admin user
 The entrypoint can also create webserver user automatically when you enter it. you need to set
 ``_AIRFLOW_WWW_USER_CREATE`` to a non-empty value in order to do that. This is not intended for
 production, it is only useful if you would like to run a quick test with the production image.
-You need to pass at least password to create such user via ``_AIRFLOW_WWW_USER_PASSWORD_CMD`` or
+You need to pass at least password to create such user via ``_AIRFLOW_WWW_USER_PASSWORD`` or
 ``_AIRFLOW_WWW_USER_PASSWORD_CMD`` similarly like for other ``*_CMD`` variables, the content of
 the ``*_CMD`` will be evaluated as shell command and it's output will be set as password.
 
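For context, the ``*_CMD`` convention described in the hunk above can be sketched in Python. This is only an illustration of the documented behaviour, not the actual entrypoint code; resolve_admin_password() is a made-up helper.

    # Illustration only: roughly how a *_CMD variable is resolved per the doc above.
    # resolve_admin_password() is a hypothetical helper, not Airflow code.
    import os
    import subprocess


    def resolve_admin_password() -> str:
        password = os.environ.get("_AIRFLOW_WWW_USER_PASSWORD")
        if password:
            return password
        cmd = os.environ.get("_AIRFLOW_WWW_USER_PASSWORD_CMD")
        if cmd:
            # The *_CMD value is run as a shell command and its stdout
            # (minus the trailing newline) becomes the password.
            return subprocess.check_output(cmd, shell=True, text=True).strip()
        raise ValueError("Set _AIRFLOW_WWW_USER_PASSWORD or _AIRFLOW_WWW_USER_PASSWORD_CMD")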

[airflow] 09/11: Allow pathlib.Path in DagBag and various util fns (#15110)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cae8eadb49c904f1114b6167305fb5b3cfa9fa84
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Mar 31 15:48:46 2021 +0100

    Allow pathlib.Path in DagBag and various util fns (#15110)
    
    We do a lot of path manipulation in this test file, and it's easier to
    understand by using pathlib without all the nested `os.path.*` calls.
    
    This change adds "support" for passing Path objects to DagBag and
    util functions.
    
    (cherry picked from commit 6e99ae05642758691361dfe9d7b767cfc9a2b616)
---
 airflow/models/dagbag.py           | 20 ++++++++++++--------
 airflow/utils/dag_processing.py    |  7 +++++--
 airflow/utils/file.py              | 13 ++++++++-----
 tests/utils/test_dag_processing.py | 19 ++++++++++---------
 4 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 9ddf013..8228659 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -27,7 +27,7 @@ import traceback
 import warnings
 import zipfile
 from datetime import datetime, timedelta
-from typing import Dict, List, NamedTuple, Optional
+from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Union
 
 from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter
 from sqlalchemy.exc import OperationalError
@@ -46,6 +46,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
 from airflow.utils.timeout import timeout
 
+if TYPE_CHECKING:
+    import pathlib
+
 
 class FileLoadStat(NamedTuple):
     """Information about single file"""
@@ -89,7 +92,7 @@ class DagBag(LoggingMixin):
 
     def __init__(
         self,
-        dag_folder: Optional[str] = None,
+        dag_folder: Union[str, "pathlib.Path", None] = None,
         include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
         include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
         safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
@@ -424,11 +427,11 @@ class DagBag(LoggingMixin):
 
     def collect_dags(
         self,
-        dag_folder=None,
-        only_if_updated=True,
-        include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
-        include_smart_sensor=conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
-        safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
+        dag_folder: Union[str, "pathlib.Path", None] = None,
+        only_if_updated: bool = True,
+        include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
+        include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
+        safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
     ):
         """
         Given a file path or a folder, this method looks for python modules,
@@ -450,7 +453,8 @@ class DagBag(LoggingMixin):
         # Used to store stats around DagBag processing
         stats = []
 
-        dag_folder = correct_maybe_zipped(dag_folder)
+        # Ensure dag_folder is a str -- it may have been a pathlib.Path
+        dag_folder = correct_maybe_zipped(str(dag_folder))
         for filepath in list_py_file_paths(
             dag_folder,
             safe_mode=safe_mode,
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 7e98c11..f93847c 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -30,7 +30,7 @@ from collections import defaultdict
 from datetime import datetime, timedelta
 from importlib import import_module
 from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Union, cast
 
 from setproctitle import setproctitle  # pylint: disable=no-name-in-module
 from sqlalchemy import or_
@@ -52,6 +52,9 @@ from airflow.utils.process_utils import kill_child_processes_by_pids, reap_proce
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
+if TYPE_CHECKING:
+    import pathlib
+
 
 class AbstractDagFileProcessorProcess(metaclass=ABCMeta):
     """Processes a DAG file. See SchedulerJob.process_file() for more details."""
@@ -489,7 +492,7 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
 
     def __init__(
         self,
-        dag_directory: str,
+        dag_directory: Union[str, "pathlib.Path"],
         max_runs: int,
         processor_factory: Callable[[str, List[CallbackRequest]], AbstractDagFileProcessorProcess],
         processor_timeout: timedelta,
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 553c506..03343cd 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -20,10 +20,13 @@ import os
 import re
 import zipfile
 from pathlib import Path
-from typing import Dict, Generator, List, Optional, Pattern
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Pattern, Union
 
 from airflow.configuration import conf
 
+if TYPE_CHECKING:
+    import pathlib
+
 log = logging.getLogger(__name__)
 
 
@@ -130,7 +133,7 @@ def find_path_from_directory(base_dir_path: str, ignore_file_name: str) -> Gener
 
 
 def list_py_file_paths(
-    directory: str,
+    directory: Union[str, "pathlib.Path"],
     safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True),
     include_examples: Optional[bool] = None,
     include_smart_sensor: Optional[bool] = conf.getboolean('smart_sensor', 'use_smart_sensor'),
@@ -158,7 +161,7 @@ def list_py_file_paths(
     if directory is None:
         file_paths = []
     elif os.path.isfile(directory):
-        file_paths = [directory]
+        file_paths = [str(directory)]
     elif os.path.isdir(directory):
         find_dag_file_paths(directory, file_paths, safe_mode)
     if include_examples:
@@ -174,9 +177,9 @@ def list_py_file_paths(
     return file_paths
 
 
-def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
+def find_dag_file_paths(directory: Union[str, "pathlib.Path"], file_paths: list, safe_mode: bool):
     """Finds file paths of all DAG files."""
-    for file_path in find_path_from_directory(directory, ".airflowignore"):
+    for file_path in find_path_from_directory(str(directory), ".airflowignore"):
         try:
             if not os.path.isfile(file_path):
                 continue
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index dc08210..68a950f 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -18,6 +18,7 @@
 
 import multiprocessing
 import os
+import pathlib
 import sys
 import unittest
 from datetime import datetime, timedelta
@@ -49,7 +50,7 @@ from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
 
-TEST_DAG_FOLDER = os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, 'dags')
+TEST_DAG_FOLDER = pathlib.Path(__file__).parent.parent / 'dags'
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
@@ -276,7 +277,7 @@ class TestDagFileProcessorManager(unittest.TestCase):
         Check that the same set of failure callback with zombies are passed to the dag
         file processors until the next zombie detection logic is invoked.
         """
-        test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py')
+        test_dag_path = TEST_DAG_FOLDER / 'test_example_bash_operator.py'
         with conf_vars({('scheduler', 'parsing_processes'): '1', ('core', 'load_examples'): 'False'}):
             dagbag = DagBag(test_dag_path, read_dags_from_db=False)
             with create_session() as session:
@@ -305,7 +306,7 @@ class TestDagFileProcessorManager(unittest.TestCase):
                     )
                 ]
 
-            test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py')
+            test_dag_path = TEST_DAG_FOLDER / 'test_example_bash_operator.py'
 
             child_pipe, parent_pipe = multiprocessing.Pipe()
             async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
@@ -334,12 +335,12 @@ class TestDagFileProcessorManager(unittest.TestCase):
             if async_mode:
                 # Once for initial parse, and then again for the add_callback_to_queue
                 assert len(fake_processors) == 2
-                assert fake_processors[0]._file_path == test_dag_path
+                assert fake_processors[0]._file_path == str(test_dag_path)
                 assert fake_processors[0]._callback_requests == []
             else:
                 assert len(fake_processors) == 1
 
-            assert fake_processors[-1]._file_path == test_dag_path
+            assert fake_processors[-1]._file_path == str(test_dag_path)
             callback_requests = fake_processors[-1]._callback_requests
             assert {zombie.simple_task_instance.key for zombie in expected_failure_callback_requests} == {
                 result.simple_task_instance.key for result in callback_requests
@@ -403,7 +404,7 @@ class TestDagFileProcessorManager(unittest.TestCase):
         from airflow.jobs.scheduler_job import SchedulerJob
 
         dag_id = 'exit_test_dag'
-        dag_directory = os.path.normpath(os.path.join(TEST_DAG_FOLDER, os.pardir, "dags_with_system_exit"))
+        dag_directory = TEST_DAG_FOLDER.parent / 'dags_with_system_exit'
 
         # Delete the one valid DAG/SerializedDAG, and check that it gets re-created
         clear_db_dags()
@@ -465,7 +466,7 @@ class TestDagFileProcessorAgent(unittest.TestCase):
         with settings_context(SETTINGS_FILE_VALID):
             # Launch a process through DagFileProcessorAgent, which will try
             # reload the logging module.
-            test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py')
+            test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py'
             async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
             log_file_loc = conf.get('logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
 
@@ -493,7 +494,7 @@ class TestDagFileProcessorAgent(unittest.TestCase):
         clear_db_serialized_dags()
         clear_db_dags()
 
-        test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py')
+        test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py'
         async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
         processor_agent = DagFileProcessorAgent(
             test_dag_path, 1, type(self)._processor_factory, timedelta.max, [], False, async_mode
@@ -517,7 +518,7 @@ class TestDagFileProcessorAgent(unittest.TestCase):
             assert dag_ids == [('test_start_date_scheduling',), ('test_task_start_date_scheduling',)]
 
     def test_launch_process(self):
-        test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py')
+        test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py'
         async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
 
         log_file_loc = conf.get('logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
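
A short usage sketch of the change above: DagBag and the file utilities now accept pathlib.Path as well as str (the dags folder path below is a placeholder).

    # Sketch only; the dags folder path is a placeholder.
    import pathlib

    from airflow.models import DagBag
    from airflow.utils.file import list_py_file_paths

    dag_folder = pathlib.Path("/opt/airflow/dags")
    dagbag = DagBag(dag_folder, include_examples=False)                # Path now accepted
    py_files = list_py_file_paths(dag_folder, include_examples=False)  # also accepts Path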

[airflow] 02/11: Compare string values, not if strings are the same object (#14942)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 48f5149bc78ef6b8fc37e1757abc1943fca6daf1
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Mar 22 18:55:39 2021 -0600

    Compare string values, not if strings are the same object (#14942)
    
    I found this when investigating why the delete_worker_pods_on_failure flag wasn't working. The feature has sufficient test coverage, but the tests don't fail, simply because the strings happen to be the same object when running in the test suite -- which is exactly what does not happen in practice.
    
    flake8/pylint also don't seem to raise their respective failures unless one side is literally a string literal, even though typing is applied 🤷‍♂️.
    
    I fixed 2 other occurrences I found while I was at it.
    
    (cherry picked from commit 6d30464319216981d10eec1d373646f043fe766c)
---
 airflow/executors/kubernetes_executor.py | 2 +-
 airflow/models/dagrun.py                 | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index c42531a..7e3d82b 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -576,7 +576,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             if self.kube_config.delete_worker_pods:
                 if not self.kube_scheduler:
                     raise AirflowException(NOT_STARTED_MESSAGE)
-                if state is not State.FAILED or self.kube_config.delete_worker_pods_on_failure:
+                if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
                     self.kube_scheduler.delete_pod(pod_id, namespace)
                     self.log.info('Deleted pod: %s in namespace %s', str(key), str(namespace))
             try:
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 674d4df..f1c32b1 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -619,7 +619,7 @@ class DagRun(Base, LoggingMixin):
             return
 
         duration = self.end_date - self.start_date
-        if self.state is State.SUCCESS:
+        if self.state == State.SUCCESS:
             Stats.timing(f'dagrun.duration.success.{self.dag_id}', duration)
         elif self.state == State.FAILED:
             Stats.timing(f'dagrun.duration.failed.{self.dag_id}', duration)
@@ -647,7 +647,7 @@ class DagRun(Base, LoggingMixin):
             except AirflowException:
                 if ti.state == State.REMOVED:
                     pass  # ti has already been removed, just ignore it
-                elif self.state is not State.RUNNING and not dag.partial:
+                elif self.state != State.RUNNING and not dag.partial:
                     self.log.warning("Failed to get task '%s' for dag '%s'. Marking it as removed.", ti, dag)
                     Stats.incr(f"task_removed_from_dag.{dag.dag_id}", 1, 1)
                     ti.state = State.REMOVED
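
A minimal illustration of the bug class fixed above: `is` compares object identity, so two equal strings that are distinct objects (for example one loaded from the database) fail an `is` check even though `==` passes.

    # Identity vs equality for strings -- illustration only.
    FAILED = "failed"                    # akin to State.FAILED
    state = "".join(["fai", "led"])      # built at runtime, usually a distinct object

    print(state == FAILED)               # True: value comparison, what the code wants
    print(state is FAILED)               # typically False: identity comparison, the bug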

[airflow] 04/11: Add missing comma in docs for KubernetesExecutor (#15035)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c952545118421a0cba9d5e65a1ae50dab09d0d62
Author: tpilewicz <31...@users.noreply.github.com>
AuthorDate: Fri Mar 26 21:09:23 2021 +0100

    Add missing comma in docs for KubernetesExecutor (#15035)
    
    (cherry picked from commit 0e43b600df123fe2fe3206d5477eaa2c9b4bc1ef)
---
 docs/apache-airflow/executor/kubernetes.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst
index a0df9db..217a29c 100644
--- a/docs/apache-airflow/executor/kubernetes.rst
+++ b/docs/apache-airflow/executor/kubernetes.rst
@@ -113,7 +113,7 @@ create a V1pod with a single container, and overwrite the fields as follows:
     :start-after: [START task_with_volume]
     :end-before: [END task_with_volume]
 
-Note that volume mounts environment variables, ports, and devices will all be extended instead of overwritten.
+Note that volume mounts, environment variables, ports, and devices will all be extended instead of overwritten.
 
 To add a sidecar container to the launched pod, create a V1pod with an empty first container with the
 name ``base`` and a second container containing your desired sidecar.

[airflow] 11/11: Fix bug in airflow.stats timing that broke dogstatsd mode (#15132)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 663985d8972a7ddb66684a413f17a29278ee509d
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Thu Apr 1 14:46:32 2021 +0100

    Fix bug in airflow.stats timing that broke dogstatsd mode (#15132)
    
    The fix for this was very easy -- just a `timer` -> `timed` typo.
    
    However, it turns out that the tests for airflow.stats were insufficient
    and didn't catch this, so I have extended the tests in two ways:

    1. Test all the other stat methods, not just incr (gauge, timer, timing,
       decr)
    2. Use the "auto-speccing" feature of Mock to ensure that we can't call
       made-up methods on a mock object.
    
       > Autospeccing is based on the existing spec feature of mock.
       > It limits the api of mocks to the api of an original object (the
       > spec), but it is recursive (implemented lazily) so that attributes of
       > mocks only have the same api as the attributes of the spec. In
       > addition mocked functions / methods have the same call signature as
       > the original so they raise a TypeError if they are called
       > incorrectly.
    
    (cherry picked from commit b7cd2df056ac3ab113d77c5f6b65f02a77337907)
---
 airflow/stats.py         |   2 +-
 tests/core/test_stats.py | 193 +++++++++++++++++++++++++----------------------
 2 files changed, 104 insertions(+), 91 deletions(-)

diff --git a/airflow/stats.py b/airflow/stats.py
index 8207220..34677da 100644
--- a/airflow/stats.py
+++ b/airflow/stats.py
@@ -343,7 +343,7 @@ class SafeDogStatsdLogger:
         """Timer metric that can be cancelled"""
         if stat and self.allow_list_validator.test(stat):
             tags = tags or []
-            return Timer(self.dogstatsd.timer(stat, *args, tags=tags, **kwargs))
+            return Timer(self.dogstatsd.timed(stat, *args, tags=tags, **kwargs))
         return Timer()
 
 
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index b635e62..83169e2 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -31,17 +31,7 @@ from tests.test_utils.config import conf_vars
 
 
 class CustomStatsd(statsd.StatsClient):
-    incr_calls = 0
-
-    def __init__(self, host=None, port=None, prefix=None):
-        super().__init__()
-
-    def incr(self, stat, count=1, rate=1):  # pylint: disable=unused-argument
-        CustomStatsd.incr_calls += 1
-
-    @classmethod
-    def _reset(cls):
-        cls.incr_calls = 0
+    pass
 
 
 class InvalidCustomStatsd:
@@ -50,25 +40,14 @@ class InvalidCustomStatsd:
     statsd.StatsClient.
     """
 
-    incr_calls = 0
-
     def __init__(self, host=None, port=None, prefix=None):
         pass
 
-    def incr(self, stat, count=1, rate=1):  # pylint: disable=unused-argument
-        InvalidCustomStatsd.incr_calls += 1
-
-    @classmethod
-    def _reset(cls):
-        cls.incr_calls = 0
-
 
 class TestStats(unittest.TestCase):
     def setUp(self):
-        self.statsd_client = Mock()
+        self.statsd_client = Mock(spec=statsd.StatsClient)
         self.stats = SafeStatsdLogger(self.statsd_client)
-        CustomStatsd._reset()
-        InvalidCustomStatsd._reset()
 
     def test_increment_counter_with_valid_name(self):
         self.stats.incr('test_stats_run')
@@ -86,49 +65,56 @@ class TestStats(unittest.TestCase):
         self.stats.incr('test/$tats')
         self.statsd_client.assert_not_called()
 
-    @conf_vars({('metrics', 'statsd_on'): 'True'})
-    @mock.patch("statsd.StatsClient")
-    def test_does_send_stats_using_statsd(self, mock_statsd):
-        importlib.reload(airflow.stats)
-        airflow.stats.Stats.incr("dummy_key")
-        mock_statsd.return_value.incr.assert_called_once_with('dummy_key', 1, 1)
+    def test_timer(self):
+        with self.stats.timer("dummy_timer"):
+            pass
+        self.statsd_client.timer.assert_called_once_with('dummy_timer')
 
-    @conf_vars({('metrics', 'statsd_on'): 'True'})
-    @mock.patch("datadog.DogStatsd")
-    def test_does_not_send_stats_using_dogstatsd(self, mock_dogstatsd):
-        importlib.reload(airflow.stats)
-        airflow.stats.Stats.incr("dummy_key")
-        mock_dogstatsd.return_value.assert_not_called()
+    def test_empty_timer(self):
+        with self.stats.timer():
+            pass
+        self.statsd_client.timer.assert_not_called()
 
-    @conf_vars(
-        {
-            ("metrics", "statsd_on"): "True",
-            ("metrics", "statsd_custom_client_path"): "tests.core.test_stats.CustomStatsd",
-        }
-    )
-    def test_load_custom_statsd_client(self):
+    def test_timing(self):
+        self.stats.timing("dummy_timer", 123)
+        self.statsd_client.timing.assert_called_once_with('dummy_timer', 123)
+
+    def test_gauge(self):
+        self.stats.gauge("dummy", 123)
+        self.statsd_client.gauge.assert_called_once_with('dummy', 123, 1, False)
+
+    def test_decr(self):
+        self.stats.decr("dummy")
+        self.statsd_client.decr.assert_called_once_with('dummy', 1, 1)
+
+    def test_enabled_by_config(self):
+        """Test that enabling this sets the right instance properties"""
+        with conf_vars({('metrics', 'statsd_on'): 'True'}):
+            importlib.reload(airflow.stats)
+            assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient)
+            assert not hasattr(airflow.stats.Stats, 'dogstatsd')
+        # Avoid side-effects
         importlib.reload(airflow.stats)
-        assert 'CustomStatsd' == type(airflow.stats.Stats.statsd).__name__  # noqa: E721
 
-    @conf_vars(
-        {
-            ("metrics", "statsd_on"): "True",
-            ("metrics", "statsd_custom_client_path"): "tests.core.test_stats.CustomStatsd",
-        }
-    )
-    def test_does_use_custom_statsd_client(self):
+    def test_load_custom_statsd_client(self):
+        with conf_vars(
+            {
+                ("metrics", "statsd_on"): "True",
+                ("metrics", "statsd_custom_client_path"): f"{__name__}.CustomStatsd",
+            }
+        ):
+            importlib.reload(airflow.stats)
+            assert isinstance(airflow.stats.Stats.statsd, CustomStatsd)
+        # Avoid side-effects
         importlib.reload(airflow.stats)
-        airflow.stats.Stats.incr("dummy_key")
-        assert airflow.stats.Stats.statsd.incr_calls == 1
 
-    @conf_vars(
-        {
-            ("metrics", "statsd_on"): "True",
-            ("metrics", "statsd_custom_client_path"): "tests.core.test_stats.InvalidCustomStatsd",
-        }
-    )
     def test_load_invalid_custom_stats_client(self):
-        with pytest.raises(
+        with conf_vars(
+            {
+                ("metrics", "statsd_on"): "True",
+                ("metrics", "statsd_custom_client_path"): f"{__name__}.InvalidCustomStatsd",
+            }
+        ), pytest.raises(
             AirflowConfigException,
             match=re.escape(
                 'Your custom Statsd client must extend the statsd.'
@@ -137,15 +123,15 @@ class TestStats(unittest.TestCase):
         ):
             importlib.reload(airflow.stats)
             airflow.stats.Stats.incr("dummy_key")
-
-    def tearDown(self) -> None:
-        # To avoid side-effect
         importlib.reload(airflow.stats)
 
 
 class TestDogStats(unittest.TestCase):
     def setUp(self):
-        self.dogstatsd_client = Mock()
+        pytest.importorskip('datadog')
+        from datadog import DogStatsd
+
+        self.dogstatsd_client = Mock(spec=DogStatsd)
         self.dogstatsd = SafeDogStatsdLogger(self.dogstatsd_client)
 
     def test_increment_counter_with_valid_name_with_dogstatsd(self):
@@ -166,48 +152,72 @@ class TestDogStats(unittest.TestCase):
         self.dogstatsd.incr('test/$tats')
         self.dogstatsd_client.assert_not_called()
 
-    @conf_vars({('metrics', 'statsd_datadog_enabled'): 'True'})
-    @mock.patch("datadog.DogStatsd")
-    def test_does_send_stats_using_dogstatsd_when_dogstatsd_on(self, mock_dogstatsd):
-        importlib.reload(airflow.stats)
-        airflow.stats.Stats.incr("dummy_key")
-        mock_dogstatsd.return_value.increment.assert_called_once_with(
+    def test_does_send_stats_using_dogstatsd_when_dogstatsd_on(self):
+        self.dogstatsd.incr("dummy_key")
+        self.dogstatsd_client.increment.assert_called_once_with(
             metric='dummy_key', sample_rate=1, tags=[], value=1
         )
 
-    @conf_vars({('metrics', 'statsd_datadog_enabled'): 'True'})
-    @mock.patch("datadog.DogStatsd")
-    def test_does_send_stats_using_dogstatsd_with_tags(self, mock_dogstatsd):
-        importlib.reload(airflow.stats)
-        airflow.stats.Stats.incr("dummy_key", 1, 1, ['key1:value1', 'key2:value2'])
-        mock_dogstatsd.return_value.increment.assert_called_once_with(
+    def test_does_send_stats_using_dogstatsd_with_tags(self):
+        self.dogstatsd.incr("dummy_key", 1, 1, ['key1:value1', 'key2:value2'])
+        self.dogstatsd_client.increment.assert_called_once_with(
             metric='dummy_key', sample_rate=1, tags=['key1:value1', 'key2:value2'], value=1
         )
 
-    @conf_vars({('metrics', 'statsd_on'): 'True', ('metrics', 'statsd_datadog_enabled'): 'True'})
-    @mock.patch("datadog.DogStatsd")
-    def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self, mock_dogstatsd):
-        importlib.reload(airflow.stats)
-        airflow.stats.Stats.incr("dummy_key")
-        mock_dogstatsd.return_value.increment.assert_called_once_with(
+    def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self):
+        self.dogstatsd.incr("dummy_key")
+        self.dogstatsd_client.increment.assert_called_once_with(
             metric='dummy_key', sample_rate=1, tags=[], value=1
         )
 
-    @conf_vars({('metrics', 'statsd_on'): 'True', ('metrics', 'statsd_datadog_enabled'): 'True'})
-    @mock.patch("statsd.StatsClient")
-    def test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self, mock_statsd):
+    def test_timer(self):
+        with self.dogstatsd.timer("dummy_timer"):
+            pass
+        self.dogstatsd_client.timed.assert_called_once_with('dummy_timer', tags=[])
+
+    def test_empty_timer(self):
+        with self.dogstatsd.timer():
+            pass
+        self.dogstatsd_client.timed.assert_not_called()
+
+    def test_timing(self):
+        self.dogstatsd.timing("dummy_timer", 123)
+        self.dogstatsd_client.timing.assert_called_once_with(metric='dummy_timer', value=123, tags=[])
+
+    def test_gauge(self):
+        self.dogstatsd.gauge("dummy", 123)
+        self.dogstatsd_client.gauge.assert_called_once_with(metric='dummy', sample_rate=1, value=123, tags=[])
+
+    def test_decr(self):
+        self.dogstatsd.decr("dummy")
+        self.dogstatsd_client.decrement.assert_called_once_with(
+            metric='dummy', sample_rate=1, value=1, tags=[]
+        )
+
+    def test_enabled_by_config(self):
+        """Test that enabling this sets the right instance properties"""
+        from datadog import DogStatsd
+
+        with conf_vars({('metrics', 'statsd_datadog_enabled'): 'True'}):
+            importlib.reload(airflow.stats)
+            assert isinstance(airflow.stats.Stats.dogstatsd, DogStatsd)
+            assert not hasattr(airflow.stats.Stats, 'statsd')
+        # Avoid side-effects
         importlib.reload(airflow.stats)
-        airflow.stats.Stats.incr("dummy_key")
-        mock_statsd.return_value.assert_not_called()
 
-    def tearDown(self) -> None:
-        # To avoid side-effect
+    def test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self):
+        from datadog import DogStatsd
+
+        with conf_vars({('metrics', 'statsd_on'): 'True', ('metrics', 'statsd_datadog_enabled'): 'True'}):
+            importlib.reload(airflow.stats)
+            assert isinstance(airflow.stats.Stats.dogstatsd, DogStatsd)
+            assert not hasattr(airflow.stats.Stats, 'statsd')
         importlib.reload(airflow.stats)
 
 
 class TestStatsWithAllowList(unittest.TestCase):
     def setUp(self):
-        self.statsd_client = Mock()
+        self.statsd_client = Mock(spec=statsd.StatsClient)
         self.stats = SafeStatsdLogger(self.statsd_client, AllowListValidator("stats_one, stats_two"))
 
     def test_increment_counter_with_allowed_key(self):
@@ -225,7 +235,10 @@ class TestStatsWithAllowList(unittest.TestCase):
 
 class TestDogStatsWithAllowList(unittest.TestCase):
     def setUp(self):
-        self.dogstatsd_client = Mock()
+        pytest.importorskip('datadog')
+        from datadog import DogStatsd
+
+        self.dogstatsd_client = Mock(spec=DogStatsd)
         self.dogstats = SafeDogStatsdLogger(self.dogstatsd_client, AllowListValidator("stats_one, stats_two"))
 
     def test_increment_counter_with_allowed_key(self):
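
A small, self-contained sketch of the spec behaviour the new tests rely on (assuming the statsd package is installed): with spec=..., accessing a method that does not exist on the real class raises instead of silently returning a child mock, which is how a timer/timed mix-up now gets caught.

    # Sketch of Mock spec-ing; assumes the `statsd` package is available.
    from unittest.mock import Mock

    import statsd

    client = Mock(spec=statsd.StatsClient)
    client.timer("dummy_timer")        # fine: StatsClient really has timer()
    try:
        client.timed("dummy_timer")    # StatsClient has no timed() -> AttributeError
    except AttributeError as err:
        print(err)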

[airflow] 05/11: More proper default value for namespace in K8S cleanup-pods CLI (#15060)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e66c870cda62441c93d18ec87074346fe11d20b4
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Mon Mar 29 00:00:15 2021 +0200

    More proper default value for namespace in K8S cleanup-pods CLI (#15060)
    
    Currently the default value for namespace is always 'default'.
    
    However, `conf.get('kubernetes', 'namespace')` may be a more appropriate
    default value for the namespace in this case.
    
    (cherry picked from commit b8cf46a12fba5701d9ffc0b31aac8375fbca37f9)
---
 airflow/cli/cli_parser.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index e62346a..c33a854 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -678,8 +678,8 @@ ARG_OPTION = Arg(
 # kubernetes cleanup-pods
 ARG_NAMESPACE = Arg(
     ("--namespace",),
-    default='default',
-    help="Kubernetes Namespace",
+    default=conf.get('kubernetes', 'namespace'),
+    help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in configuration.",
 )
 
 ALTERNATIVE_CONN_SPECS_ARGS = [

[airflow] 10/11: Avoid scheduler/parser manager deadlock by using non-blocking IO (#15112)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 132825be0c67e1ca855e3500736fc9e87d691d8c
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Mar 31 23:18:01 2021 +0100

    Avoid scheduler/parser manager deadlock by using non-blocking IO (#15112)
    
    There have been long-standing issues where the scheduler would "stop
    responding" that we haven't been able to track down.
    
    Someone was able to catch the scheduler in this state in 2.0.1 and
    inspect it with py-spy (thanks, MatthewRBruce!)
    
    The stack traces (slightly shortened) were:
    
    ```
    Process 6: /usr/local/bin/python /usr/local/bin/airflow scheduler
    Python v3.8.7 (/usr/local/bin/python3.8)
    Thread 0x7FF5C09C8740 (active): "MainThread"
        _send (multiprocessing/connection.py:368)
        _send_bytes (multiprocessing/connection.py:411)
        send (multiprocessing/connection.py:206)
        send_callback_to_execute (airflow/utils/dag_processing.py:283)
        _send_dag_callbacks_to_processor (airflow/jobs/scheduler_job.py:1795)
        _schedule_dag_run (airflow/jobs/scheduler_job.py:1762)
    
    Process 77: airflow scheduler -- DagFileProcessorManager
    Python v3.8.7 (/usr/local/bin/python3.8)
    Thread 0x7FF5C09C8740 (active): "MainThread"
        _send (multiprocessing/connection.py:368)
        _send_bytes (multiprocessing/connection.py:405)
        send (multiprocessing/connection.py:206)
        _run_parsing_loop (airflow/utils/dag_processing.py:698)
        start (airflow/utils/dag_processing.py:596)
    ```
    
    What this shows is that both processes are stuck trying to send data to
    each other, and neither can proceed because both buffers are full; since
    both are trying to send, neither side is going to read and make more
    space in the buffer. A classic deadlock!
    
    The fix for this is twofold:
    
    1) Enable non-blocking IO on the DagFileProcessorManager side.
    
       The only thing the Manager sends back up the pipe is (now, as of 2.0)
       the DagParsingStat object, and the scheduler will happily continue
       without receiving these, so in the case of a blocking error, it is
       simply better to ignore the error, continue the loop and try sending
       one again later.
    
    2) Reduce the size of DagParsingStat
    
       In the case of a large number of dag files we included the path of
       each and every one (in full) in _each_ parsing stat. Not only did the
       scheduler do nothing with this field (so the object was larger than it
       needed to be), but making it such a large object increased the
       likelihood of hitting this send-buffer-full deadlock case!
    
    (cherry picked from commit b0e68ebcb88af8352919a883e38e759e6ceb4d2c)
---
 airflow/utils/dag_processing.py    | 32 +++++++++++----
 pylintrc-tests                     |  4 +-
 tests/utils/test_dag_processing.py | 80 +++++++++++++++++++++++++++++++++++++-
 3 files changed, 106 insertions(+), 10 deletions(-)

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index f93847c..5ba44e3 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -139,7 +139,6 @@ class AbstractDagFileProcessorProcess(metaclass=ABCMeta):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
     done: bool
     all_files_processed: bool
 
@@ -513,6 +512,15 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
         self._async_mode = async_mode
         self._parsing_start_time: Optional[int] = None
 
+        # Set the signal conn to non-blocking mode, so that attempting to
+        # send when the buffer is full raises an error, rather than hanging
+        # forever while attempting to send (this is to avoid deadlocks!)
+        #
+        # Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
+        # continue the scheduler
+        if self._async_mode:
+            os.set_blocking(self._signal_conn.fileno(), False)
+
         self._parallelism = conf.getint('scheduler', 'parsing_processes')
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1:
             self.log.warning(
@@ -621,6 +629,7 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
             ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
             if self._signal_conn in ready:
                 agent_signal = self._signal_conn.recv()
+
                 self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
                 if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
                     self.terminate()
@@ -693,12 +702,21 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
             all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths)
             max_runs_reached = self.max_runs_reached()
 
-            dag_parsing_stat = DagParsingStat(
-                self._file_paths,
-                max_runs_reached,
-                all_files_processed,
-            )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(
+                    DagParsingStat(
+                        max_runs_reached,
+                        all_files_processed,
+                    )
+                )
+            except BlockingIOError:
+                # Try again next time around the loop!
+
+                # It is better to fail than to deadlock. This should
+                # "almost never happen" since the DagParsingStat object is
+                # small, and in async mode this stat is not actually _required_
+                # for normal operation (It only drives "max runs")
+                self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring")
 
             if max_runs_reached:
                 self.log.info(
diff --git a/pylintrc-tests b/pylintrc-tests
index 5b360a9..8a7e56e 100644
--- a/pylintrc-tests
+++ b/pylintrc-tests
@@ -439,6 +439,7 @@ good-names=e,
            i,
            j,
            k,
+           n,
            v, # Commonly used when iterating dict.items()
            _,
            ti,  # Commonly used in Airflow as shorthand for taskinstance
@@ -450,7 +451,8 @@ good-names=e,
            cm,  # Commonly used as shorthand for context manager
            ds,  # Used in Airflow templates
            ts,  # Used in Airflow templates
-           id   # Commonly used as shorthand for identifier
+           id,  # Commonly used as shorthand for identifier
+           fd,  # aka "file-descriptor" -- common in socket code
 
 # Include a hint for the correct naming format with invalid-name.
 include-naming-hint=no
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 68a950f..913711d2 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -16,10 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import logging
 import multiprocessing
 import os
 import pathlib
+import socket
 import sys
+import threading
 import unittest
 from datetime import datetime, timedelta
 from tempfile import TemporaryDirectory
@@ -35,7 +38,7 @@ from airflow.models import DagBag, DagModel, TaskInstance as TI
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.utils import timezone
-from airflow.utils.callback_requests import TaskCallbackRequest
+from airflow.utils.callback_requests import CallbackRequest, TaskCallbackRequest
 from airflow.utils.dag_processing import (
     DagFileProcessorAgent,
     DagFileProcessorManager,
@@ -425,17 +428,90 @@ class TestDagFileProcessorManager(unittest.TestCase):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert sum(stat.run_count for stat in manager._file_stats.values()) == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    @pytest.mark.execution_timeout(30)
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's buffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            n = 0
+            while True:
+                if exit_event.is_set():
+                    break
+
+                req = CallbackRequest(str(dag_filepath))
+                try:
+                    logging.debug("Sending CallbackRequests %d", n + 1)
+                    pipe.send(req)
+                except TypeError:
+                    # This is actually the error you get when the parent pipe
+                    # is closed! Nicely handled, eh?
+                    break
+                except OSError:
+                    break
+                n += 1
+                logging.debug("   Sent %d CallbackRequests", n)
+
+        thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
+
+        fake_processors = []
+
+        def fake_processor_factory(*args, **kwargs):
+            nonlocal fake_processors
+            processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
+            fake_processors.append(processor)
+            return processor
+
+        manager = DagFileProcessorManager(
+            dag_directory=dag_filepath,
+            dag_ids=[],
+            # A reasonably large number to ensure that we trigger the deadlock
+            max_runs=100,
+            processor_factory=fake_processor_factory,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        try:
+            thread.start()
+
+            # If this completes without hanging, then the test is good!
+            manager._run_parsing_loop()
+            exit_event.set()
+        finally:
+            logging.info("Closing pipes")
+            parent_pipe.close()
+            child_pipe.close()
+            thread.join(timeout=1.0)
+
 
 class TestDagFileProcessorAgent(unittest.TestCase):
     def setUp(self):
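
A standalone sketch (Unix-only, illustration rather than Airflow code) of the non-blocking-send pattern the fix above uses: put the sending end of a multiprocessing Pipe into non-blocking mode and treat a full buffer (BlockingIOError) as "drop this message and try again later" instead of blocking forever.

    # Non-blocking pipe send; Unix-only illustration.
    import multiprocessing
    import os

    reader, writer = multiprocessing.Pipe()
    os.set_blocking(writer.fileno(), False)  # same call the manager now makes

    payload = b"x" * 65536                   # big enough to fill the buffer quickly
    sent = dropped = 0
    for _ in range(50):
        try:
            writer.send(payload)
            sent += 1
        except BlockingIOError:
            # Buffer is full and nobody is reading: skip the message rather
            # than deadlock, the same trade-off made for DagParsingStat above.
            dropped += 1

    print(f"sent={sent} dropped={dropped}")
    writer.close()
    reader.close()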

[airflow] 03/11: Docs: Clarify behavior of delete_worker_pods_on_failure (#14958)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7c06e2c62e4d1515ae085fce06ae329267a54633
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Mar 23 11:21:26 2021 -0600

    Docs: Clarify behavior of delete_worker_pods_on_failure (#14958)
    
    Clarify that the `delete_worker_pods_on_failure` flag only applies to worker failures, not task failures as well.
    
    (cherry picked from commit 7c2ed5394e12aa02ff280431b8d35af80d37b1f0)
---
 airflow/config_templates/config.yml          | 2 ++
 airflow/config_templates/default_airflow.cfg | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index a350a07..c53a6ca 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1987,6 +1987,8 @@
       description: |
         If False (and delete_worker_pods is True),
         failed worker pods will not be deleted so users can investigate them.
+        This only prevents removal of worker pods where the worker itself failed,
+        not when the task it ran failed.
       version_added: 1.10.11
       type: string
       example: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 72e2e43..8e3ebf0 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -970,6 +970,8 @@ delete_worker_pods = True
 
 # If False (and delete_worker_pods is True),
 # failed worker pods will not be deleted so users can investigate them.
+# This only prevents removal of worker pods where the worker itself failed,
+# not when the task it ran failed.
 delete_worker_pods_on_failure = False
 
 # Number of Kubernetes Worker Pod creation calls per scheduler loop.

[airflow] 08/11: Re-introduce dagrun.schedule_delay metric (#15105)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f649b8a22b9bf109fe0355af6937324d69191f34
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Mar 31 13:38:14 2021 +0100

    Re-introduce dagrun.schedule_delay metric (#15105)
    
    This was mistakenly removed in the HA scheduler refactor work.
    
    It is now added back, and has tests this time so we will notice if it
    breaks in future.
    
    By using freezegun we can assert the _exact_ value of the metric emitted,
    making sure it is correct without introducing timing-based flakiness.
    
    (cherry picked from commit ca4c4f3d343dea0a034546a896072b9c87244e71)
---
 airflow/jobs/scheduler_job.py    | 10 +++++++++-
 tests/jobs/test_scheduler_job.py | 19 ++++++++++++++++---
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e380512..1076cb6 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1637,7 +1637,7 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                dag.create_dagrun(
+                run = dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
                     start_date=timezone.utcnow(),
@@ -1648,6 +1648,14 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                     creating_job_id=self.id,
                 )
 
+                expected_start_date = dag.following_schedule(run.execution_date)
+                if expected_start_date:
+                    schedule_delay = run.start_date - expected_start_date
+                    Stats.timing(
+                        f'dagrun.schedule_delay.{dag.dag_id}',
+                        schedule_delay,
+                    )
+
         self._update_dag_next_dagruns(dag_models, session)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9347fa4..7a9e273 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -30,6 +30,7 @@ from zipfile import ZipFile
 
 import psutil
 import pytest
+from freezegun import freeze_time
 from parameterized import parameterized
 from sqlalchemy import func
 
@@ -3643,8 +3644,16 @@ class TestSchedulerJob(unittest.TestCase):
                 full_filepath=dag.fileloc, dag_id=dag_id
             )
 
-    def test_scheduler_sets_job_id_on_dag_run(self):
-        dag = DAG(dag_id='test_scheduler_sets_job_id_on_dag_run', start_date=DEFAULT_DATE)
+    @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
+    @mock.patch('airflow.jobs.scheduler_job.Stats.timing')
+    def test_create_dag_runs(self, stats_timing):
+        """
+        Test various invariants of _create_dag_runs.
+
+        - That the run created has the creating_job_id set
+        - That we emit the right DagRun metrics
+        """
+        dag = DAG(dag_id='test_create_dag_runs', start_date=DEFAULT_DATE)
 
         DummyOperator(
             task_id='dummy',
@@ -3652,7 +3661,7 @@ class TestSchedulerJob(unittest.TestCase):
         )
 
         dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            dag_folder=os.devnull,
             include_examples=False,
             read_dags_from_db=True,
         )
@@ -3666,6 +3675,10 @@ class TestSchedulerJob(unittest.TestCase):
         with create_session() as session:
             self.scheduler_job._create_dag_runs([dag_model], session)
 
+        stats_timing.assert_called_once_with(
+            "dagrun.schedule_delay.test_create_dag_runs", datetime.timedelta(seconds=9)
+        )
+
         assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
 
     def test_extra_operator_links_not_loaded_in_scheduler_loop(self):
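
For reference, the metric being re-introduced is simple arithmetic: the delay is the run's actual start time minus the time it was scheduled to start, emitted as a per-DAG timing. A minimal sketch with made-up values:

    # Illustration of the schedule_delay computation; the values are made up.
    from datetime import datetime, timedelta

    expected_start = datetime(2016, 1, 2)                  # when the run should have started
    actual_start = expected_start + timedelta(seconds=9)   # when the scheduler created it

    schedule_delay = actual_start - expected_start
    print(schedule_delay)  # 0:00:09 -> Stats.timing(f'dagrun.schedule_delay.{dag_id}', schedule_delay)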

[airflow] 07/11: Remove 'conf' from search_columns in DagRun View (#15099)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 924c6609eed326c6ff99e636919fc3961a555aef
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Tue Mar 30 21:10:18 2021 +0200

    Remove 'conf' from search_columns in DagRun View (#15099)
    
    i.e. filtering by the 'conf' column in the DagRun View is no longer supported.
    
    This cannot be supported because FAB uses ILIKE under the hood,
    which is not supported by 'bytea' type in Postgres or 'BLOB' in SQLite.
    
    Closes issue #14374
    
    (cherry picked from commit 3585b3c54ce930d2ce2eaeddc238201a0a867018)
---
 airflow/www/views.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index e768464..f0116b3 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3307,7 +3307,6 @@ class DagRunModelView(AirflowModelView):
         'start_date',
         'end_date',
         'external_trigger',
-        'conf',
     ]
     edit_columns = ['state', 'dag_id', 'execution_date', 'start_date', 'end_date', 'run_id', 'conf']
 

[airflow] 06/11: Fixed deprecated code example in Concepts doc (#15098)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c50cb0165825b55976ae89d949240868dbacebe6
Author: Andrew Godwin <an...@aeracode.org>
AuthorDate: Tue Mar 30 13:53:07 2021 -0600

    Fixed deprecated code example in Concepts doc (#15098)
    
    This document used the now-deprecated import for "task";
    this updates it to come from `airflow.decorators`
    so it won't raise a DeprecationWarning if copied and used.
    
    (cherry picked from commit 1521b9665762e3457b24cef18a080aa1ee7402ef)
---
 docs/apache-airflow/concepts.rst | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docs/apache-airflow/concepts.rst b/docs/apache-airflow/concepts.rst
index 6b5b2c3..2637b78 100644
--- a/docs/apache-airflow/concepts.rst
+++ b/docs/apache-airflow/concepts.rst
@@ -366,7 +366,8 @@ using ``@task`` decorator.
 
 .. code-block:: python
 
-    from airflow.operators.python import task, get_current_context
+    from airflow.decorators import task
+    from airflow.operators.python import get_current_context
 
     @task
     def my_task():