Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 19:46:37 UTC

[airflow] branch v2-2-test updated (d3e99c8 -> b50aae1)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard d3e99c8  Add changelog for 2.2.5rc1
 discard 74b6712  Fix Tasks getting stuck in scheduled state (#19747)
 discard 533dad7  Reduce DB load incurred by Stale DAG deactivation (#21399)
 discard 5782ef2  adding `on_execute_callback` to callbacks docs (#22362)
 discard 36bc0dc  Add documentation on specifying a DB schema. (#22347)
 discard 8bb8d25  Fix race condition between triggerer and scheduler (#21316)
 discard a445da8  Fix postgres part of pipeline example of tutorial (#21586)
 discard 7477ad9  Log traceback in trigger excs (#21213)
 discard b5169f4  Fix duplicate trigger creation race condition (#20699)
 discard 260f854  Set X-Frame-Options header to DENY only if X_FRAME_ENABLED is set to true. (#19491)
 discard 8822956  Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)
 discard aca5249  fix: Update custom connection field processing (#20883)
 discard 5cf7edf  A trigger might use a connection; make sure we mask passwords (#21207)
 discard 266bf02  Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)
 discard 330dd07  Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
 discard 28f52ce  Fix Resources __eq__ check (#21442)
 discard 66c31a6  Filter out default configs when overrides exist. (#21539)
 discard 4023d14  Fix logging JDBC SQL error when task fails (#21540)
 discard 7d1abed  Disable default_pool delete on web ui (#21658)
 discard 310e979  Log exception in local executor (#21667)
 discard 7467614  Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)
 discard 6fd3167  Fix stray order_by(TaskInstance.execution_date) (#21705)
 discard 8d486ff  Fix filesystem sensor for directories (#21729)
 discard ecdbacf  Fix graph autorefresh on page load (#21736)
 discard 5ed7a9b  Correct a couple grammatical errors in docs (#21750)
 discard df5cb3d  Fix triggerer --capacity parameter (#21753)
 discard ce50097  Fix the triggerer capacity test (#21760)
 discard 6f7ecb7  Fix assignment of unassigned triggers (#21770)
 discard 60b48fa  Fix incorrect data provided to tries & landing times charts (#21928)
 discard a1af418  DB upgrade is required when updating Airflow (#22061)
 discard 0d80900  Bump version to 2.2.5
     new ebb8727  Bump version to 2.2.5
     new a58a274  DB upgrade is required when updating Airflow (#22061)
     new c5f7925  Fix incorrect data provided to tries & landing times charts (#21928)
     new 33b75ff  Fix assignment of unassigned triggers (#21770)
     new 3a2fd04  Fix the triggerer capacity test (#21760)
     new eb3c4b6  Fix triggerer --capacity parameter (#21753)
     new 318a5ed  Correct a couple grammatical errors in docs (#21750)
     new 534140e  Fix graph autorefresh on page load (#21736)
     new 467c759  Fix filesystem sensor for directories (#21729)
     new 6c88c0a  Fix stray order_by(TaskInstance.execution_date) (#21705)
     new c47323d  Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)
     new b1e549a  Log exception in local executor (#21667)
     new 5cb0b40  Disable default_pool delete on web ui (#21658)
     new 5f9a275  Fix logging JDBC SQL error when task fails (#21540)
     new f092ed5  Filter out default configs when overrides exist. (#21539)
     new a99d290  Fix Resources __eq__ check (#21442)
     new ed25c60  Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
     new f4118c3  Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)
     new 84557a5  A trigger might use a connection; make sure we mask passwords (#21207)
     new 235e802  fix: Update custom connection field processing (#20883)
     new 504e845  Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)
     new 58f09cd  Set X-Frame-Options header to DENY only if X_FRAME_ENABLED is set to true. (#19491)
     new 7a4bbdb  Fix duplicate trigger creation race condition (#20699)
     new f8e8ffb  Log traceback in trigger excs (#21213)
     new fe8f560  Fix postgres part of pipeline example of tutorial (#21586)
     new 994beac  Fix race condition between triggerer and scheduler (#21316)
     new 50a0dee  Add documentation on specifying a DB schema. (#22347)
     new 5d53fcb  adding `on_execute_callback` to callbacks docs (#22362)
     new ae573d8  Reduce DB load incurred by Stale DAG deactivation (#21399)
     new 6784c8c  Fix Tasks getting stuck in scheduled state (#19747)
     new b50aae1  Add 2.2.5 to CHANGELOG.txt and UPDATING.md

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d3e99c8)
            \
             N -- N -- N   refs/heads/v2-2-test (b50aae1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 31 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
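
The relationship between the "discard" and "new" revisions described above can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions: both histories are linear lists ordered oldest first, the commit names (A, B, O1, N1, ...) and the helper classify_force_push are hypothetical, and the "omit" case (revisions still reachable from other references) is ignored. Real git finds the common base on a commit graph rather than on lists.

```python
def classify_force_push(old_history, new_history):
    """Split two linear histories (oldest first) at their common base.

    Returns (base, discarded, new), where `discarded` holds the revisions
    only on the old branch tip and `new` those only on the new branch tip.
    """
    base = None
    split = 0
    for old_rev, new_rev in zip(old_history, new_history):
        if old_rev != new_rev:
            break
        base = old_rev
        split += 1
    return base, old_history[split:], new_history[split:]


# Hypothetical commit names mirroring the B / O / N diagram above.
old_branch = ["A", "B", "O1", "O2", "O3"]  # old tip, in the role of d3e99c8
new_branch = ["A", "B", "N1", "N2", "N3"]  # new tip, in the role of b50aae1

base, discarded, new = classify_force_push(old_branch, new_branch)
print("base:", base)
print("discard:", discarded)
print("new:", new)
```

In this toy model the O revisions are the ones reported as "discard" and the N revisions are the ones reported as "new".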


Summary of changes:
 UPDATING.md                                             | 4 ----
 docs/apache-airflow/installation/supported-versions.rst | 2 +-
 2 files changed, 1 insertion(+), 5 deletions(-)

[airflow] 26/31: Fix race condition between triggerer and scheduler (#21316)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 994beac9844bf01821f46bd31c0c80733d9bfa94
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Tue Feb 15 13:12:51 2022 +0000

    Fix race condition between triggerer and scheduler (#21316)
    
    (cherry picked from commit 2a6792d94d153c6f2dd116843a43ee63cd296c8d)
---
 airflow/executors/base_executor.py    | 36 ++++++++++++++++++---
 tests/executors/test_base_executor.py | 60 ++++++++++++++++++++++++++++++++---
 2 files changed, 87 insertions(+), 9 deletions(-)

diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index f7ad45a..1d993bb 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -17,7 +17,7 @@
 """Base executor - this is the base class for all the implemented executors."""
 import sys
 from collections import OrderedDict
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Counter, Dict, List, Optional, Set, Tuple
 
 from airflow.configuration import conf
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
@@ -29,6 +29,8 @@ PARALLELISM: int = conf.getint('core', 'PARALLELISM')
 
 NOT_STARTED_MESSAGE = "The executor should be started first!"
 
+QUEUEING_ATTEMPTS = 5
+
 # Command to execute - list of strings
 # the first element is always "airflow".
 # It should be result of TaskInstance.generate_command method.q
@@ -63,6 +65,7 @@ class BaseExecutor(LoggingMixin):
         self.queued_tasks: OrderedDict[TaskInstanceKey, QueuedTaskInstanceType] = OrderedDict()
         self.running: Set[TaskInstanceKey] = set()
         self.event_buffer: Dict[TaskInstanceKey, EventBufferValueType] = {}
+        self.attempts: Counter[TaskInstanceKey] = Counter()
 
     def __repr__(self):
         return f"{self.__class__.__name__}(parallelism={self.parallelism})"
@@ -78,7 +81,7 @@ class BaseExecutor(LoggingMixin):
         queue: Optional[str] = None,
     ):
         """Queues command to task"""
-        if task_instance.key not in self.queued_tasks and task_instance.key not in self.running:
+        if task_instance.key not in self.queued_tasks:
             self.log.info("Adding to queue: %s", command)
             self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
         else:
@@ -183,9 +186,32 @@ class BaseExecutor(LoggingMixin):
 
         for _ in range(min((open_slots, len(self.queued_tasks)))):
             key, (command, _, queue, ti) = sorted_queue.pop(0)
-            self.queued_tasks.pop(key)
-            self.running.add(key)
-            self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)
+
+            # If a task makes it here but is still understood by the executor
+            # to be running, it generally means that the task has been killed
+            # externally and not yet been marked as failed.
+            #
+            # However, when a task is deferred, there is also a possibility of
+            # a race condition where a task might be scheduled again during
+            # trigger processing, even before we are able to register that the
+            # deferred task has completed. In this case and for this reason,
+            # we make a small number of attempts to see if the task has been
+            # removed from the running set in the meantime.
+            if key in self.running:
+                attempt = self.attempts[key]
+                if attempt < QUEUEING_ATTEMPTS - 1:
+                    self.attempts[key] = attempt + 1
+                    self.log.info("task %s is still running", key)
+                    continue
+
+                # We give up and remove the task from the queue.
+                self.log.error("could not queue task %s (still running after %d attempts)", key, attempt)
+                del self.attempts[key]
+                del self.queued_tasks[key]
+            else:
+                del self.queued_tasks[key]
+                self.running.add(key)
+                self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)
 
     def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
         """
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index 49d6c01..40bf8eb 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -18,7 +18,9 @@
 from datetime import timedelta
 from unittest import mock
 
-from airflow.executors.base_executor import BaseExecutor
+from pytest import mark
+
+from airflow.executors.base_executor import QUEUEING_ATTEMPTS, BaseExecutor
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.taskinstance import TaskInstanceKey
 from airflow.utils import timezone
@@ -57,7 +59,7 @@ def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync)
     mock_stats_gauge.assert_has_calls(calls)
 
 
-def test_try_adopt_task_instances(dag_maker):
+def setup_dagrun(dag_maker):
     date = timezone.utcnow()
     start_date = date - timedelta(days=2)
 
@@ -66,8 +68,58 @@ def test_try_adopt_task_instances(dag_maker):
         BaseOperator(task_id="task_2", start_date=start_date)
         BaseOperator(task_id="task_3", start_date=start_date)
 
-    dagrun = dag_maker.create_dagrun(execution_date=date)
-    tis = dagrun.task_instances
+    return dag_maker.create_dagrun(execution_date=date)
 
+
+def test_try_adopt_task_instances(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    tis = dagrun.task_instances
     assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
     assert BaseExecutor().try_adopt_task_instances(tis) == tis
+
+
+def enqueue_tasks(executor, dagrun):
+    for task_instance in dagrun.task_instances:
+        executor.queue_command(task_instance, ["airflow"])
+
+
+def setup_trigger_tasks(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    executor = BaseExecutor()
+    executor.execute_async = mock.Mock()
+    enqueue_tasks(executor, dagrun)
+    return executor, dagrun
+
+
+@mark.parametrize("open_slots", [1, 2, 3])
+def test_trigger_queued_tasks(dag_maker, open_slots):
+    executor, _ = setup_trigger_tasks(dag_maker)
+    executor.trigger_tasks(open_slots)
+    assert len(executor.execute_async.mock_calls) == open_slots
+
+
+@mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS + 2))
+def test_trigger_running_tasks(dag_maker, change_state_attempt):
+    executor, dagrun = setup_trigger_tasks(dag_maker)
+    open_slots = 100
+    executor.trigger_tasks(open_slots)
+    expected_calls = len(dagrun.task_instances)  # initially `execute_async` called for each task
+    assert len(executor.execute_async.mock_calls) == expected_calls
+
+    # All the tasks are now "running", so while we enqueue them again here,
+    # they won't be executed again until the executor has been notified of a state change.
+    enqueue_tasks(executor, dagrun)
+
+    for attempt in range(QUEUEING_ATTEMPTS + 2):
+        # On the configured attempt, we notify the executor that the task has succeeded.
+        if attempt == change_state_attempt:
+            executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
+            # If we have not exceeded QUEUEING_ATTEMPTS, we should expect an additional "execute" call
+            if attempt < QUEUEING_ATTEMPTS:
+                expected_calls += 1
+        executor.trigger_tasks(open_slots)
+        assert len(executor.execute_async.mock_calls) == expected_calls
+    if change_state_attempt < QUEUEING_ATTEMPTS:
+        assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances) + 1
+    else:
+        assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances)
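
The retry logic added to trigger_tasks in the diff above can be tried out in isolation with a small stand-in. The sketch below is not Airflow code: ToyExecutor is a hypothetical class that keeps only the queued/running/attempts bookkeeping, uses plain strings as task keys, and ignores priority sorting. QUEUEING_ATTEMPTS has the same value as in the patch.

```python
from collections import Counter

QUEUEING_ATTEMPTS = 5  # same value as in the patch


class ToyExecutor:
    """Hypothetical stand-in that mirrors only the bookkeeping of the patch."""

    def __init__(self):
        self.queued = {}           # key -> command
        self.running = set()       # keys the executor believes are running
        self.attempts = Counter()  # key -> number of skipped queueing attempts
        self.executed = []         # record of every "execute_async" call

    def queue_command(self, key, command):
        # Like the patched queue_command: only the queue is checked here.
        if key not in self.queued:
            self.queued[key] = command

    def trigger_tasks(self, open_slots):
        for key in list(self.queued)[:open_slots]:
            if key in self.running:
                attempt = self.attempts[key]
                if attempt < QUEUEING_ATTEMPTS - 1:
                    # Still running: leave it queued and try again next time.
                    self.attempts[key] = attempt + 1
                    continue
                # Give up and drop the task from the queue.
                del self.attempts[key]
                del self.queued[key]
            else:
                del self.queued[key]
                self.running.add(key)
                self.executed.append(key)

    def change_state(self, key):
        # The task finished, so it is no longer running.
        self.running.discard(key)


# Case 1: the task never leaves the running set, so the re-queue is dropped.
stuck = ToyExecutor()
stuck.queue_command("t1", ["airflow"])
stuck.trigger_tasks(open_slots=10)
stuck.queue_command("t1", ["airflow"])
for _ in range(QUEUEING_ATTEMPTS):
    stuck.trigger_tasks(open_slots=10)

# Case 2: the state change arrives after two skipped attempts.
racy = ToyExecutor()
racy.queue_command("t1", ["airflow"])
racy.trigger_tasks(open_slots=10)
racy.queue_command("t1", ["airflow"])
racy.trigger_tasks(open_slots=10)
racy.trigger_tasks(open_slots=10)
racy.change_state("t1")
racy.trigger_tasks(open_slots=10)

print(stuck.executed, racy.executed)
```

In the first case the re-queued task is executed only once and then removed from the queue; in the second it is executed a second time as soon as the executor learns that the first run has finished.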

[airflow] 01/31: Bump version to 2.2.5

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ebb8727be5c4a39d090c8cbc907894399a1dff46
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Mar 22 20:41:47 2022 +0100

    Bump version to 2.2.5
---
 README.md                                                  | 14 +++++++-------
 docs/docker-stack/README.md                                |  6 +++---
 .../docker-examples/extending/add-apt-packages/Dockerfile  |  2 +-
 .../extending/add-build-essential-extend/Dockerfile        |  2 +-
 .../docker-examples/extending/add-providers/Dockerfile     |  2 +-
 .../docker-examples/extending/add-pypi-packages/Dockerfile |  2 +-
 .../docker-examples/extending/embedding-dags/Dockerfile    |  2 +-
 .../extending/writable-directory/Dockerfile                |  2 +-
 docs/docker-stack/entrypoint.rst                           | 14 +++++++-------
 scripts/ci/pre_commit/supported_versions.py                |  2 +-
 setup.py                                                   |  2 +-
 11 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/README.md b/README.md
index 35243f0..0d54c3c 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ Airflow is not a streaming solution, but it is often used to process real-time d
 
 Apache Airflow is tested with:
 
-|                      | Main version (dev)        | Stable version (2.2.4)   |
+|                      | Main version (dev)        | Stable version (2.2.5)   |
 | -------------------- | ------------------------- | ------------------------ |
 | Python               | 3.6, 3.7, 3.8, 3.9        | 3.6, 3.7, 3.8, 3.9       |
 | Kubernetes           | 1.18, 1.19, 1.20          | 1.18, 1.19, 1.20         |
@@ -153,15 +153,15 @@ them to the appropriate format and workflow that your tool requires.
 
 
 ```bash
-pip install 'apache-airflow==2.2.4' \
- --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.4/constraints-3.7.txt"
+pip install 'apache-airflow==2.2.5' \
+ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.7.txt"
 ```
 
 2. Installing with extras (i.e., postgres, google)
 
 ```bash
-pip install 'apache-airflow[postgres,google]==2.2.4' \
- --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.4/constraints-3.7.txt"
+pip install 'apache-airflow[postgres,google]==2.2.5' \
+ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.7.txt"
 ```
 
 For information on installing provider packages, check
@@ -266,7 +266,7 @@ Apache Airflow version life cycle:
 
 | Version   | Current Patch/Minor   | State     | First Release   | Limited Support   | EOL/Terminated   |
 |-----------|-----------------------|-----------|-----------------|-------------------|------------------|
-| 2         | 2.2.4                 | Supported | Dec 17, 2020    | TBD               | TBD              |
+| 2         | 2.2.5                 | Supported | Dec 17, 2020    | TBD               | TBD              |
 | 1.10      | 1.10.15               | EOL       | Aug 27, 2018    | Dec 17, 2020      | June 17, 2021    |
 | 1.9       | 1.9.0                 | EOL       | Jan 03, 2018    | Aug 27, 2018      | Aug 27, 2018     |
 | 1.8       | 1.8.2                 | EOL       | Mar 19, 2017    | Jan 03, 2018      | Jan 03, 2018     |
@@ -295,7 +295,7 @@ They are based on the official release schedule of Python and Kubernetes, nicely
 2. The "oldest" supported version of Python/Kubernetes is the default one until we decide to switch to
    later version. "Default" is only meaningful in terms of "smoke tests" in CI PRs, which are run using this
    default version and the default reference image available. Currently `apache/airflow:latest`
-   and `apache/airflow:2.2.4` images are Python 3.7 images as we are preparing for 23.12.2021 when will
+   and `apache/airflow:2.2.5` images are Python 3.7 images as we are preparing for 23.12.2021 when will
    Python 3.6 reaches end of life.
 
 3. We support a new version of Python/Kubernetes in main after they are officially released, as soon as we
diff --git a/docs/docker-stack/README.md b/docs/docker-stack/README.md
index d6f1d81..9965b2a 100644
--- a/docs/docker-stack/README.md
+++ b/docs/docker-stack/README.md
@@ -31,12 +31,12 @@ Every time a new version of Airflow is released, the images are prepared in the
 [apache/airflow DockerHub](https://hub.docker.com/r/apache/airflow)
 for all the supported Python versions.
 
-You can find the following images there (Assuming Airflow version `2.2.4`):
+You can find the following images there (Assuming Airflow version `2.2.5`):
 
 * `apache/airflow:latest` - the latest released Airflow image with default Python version (3.7 currently)
 * `apache/airflow:latest-pythonX.Y` - the latest released Airflow image with specific Python version
-* `apache/airflow:2.2.4` - the versioned Airflow image with default Python version (3.7 currently)
-* `apache/airflow:2.2.4-pythonX.Y` - the versioned Airflow image with specific Python version
+* `apache/airflow:2.2.5` - the versioned Airflow image with default Python version (3.7 currently)
+* `apache/airflow:2.2.5-pythonX.Y` - the versioned Airflow image with specific Python version
 
 Those are "reference" images. They contain the most common set of extras, dependencies and providers that are
 often used by the users and they are good to "try-things-out" when you want to just take Airflow for a spin,
diff --git a/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile b/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
index 18d5461..fe8e2ee 100644
--- a/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
@@ -15,7 +15,7 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.4
+FROM apache/airflow:2.2.5
 USER root
 RUN apt-get update \
   && apt-get install -y --no-install-recommends \
diff --git a/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile b/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
index b5d5cd1..4ca0899 100644
--- a/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
@@ -15,7 +15,7 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.4
+FROM apache/airflow:2.2.5
 USER root
 RUN apt-get update \
   && apt-get install -y --no-install-recommends \
diff --git a/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile b/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
index 1786f2e..0ada4cd 100644
--- a/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
@@ -15,6 +15,6 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.4
+FROM apache/airflow:2.2.5
 RUN pip install --no-cache-dir apache-airflow-providers-docker==2.1.0
 # [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile b/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
index feaf714..14e5227 100644
--- a/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
@@ -15,6 +15,6 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.4
+FROM apache/airflow:2.2.5
 RUN pip install --no-cache-dir lxml
 # [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile b/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
index 9342fae..3bdd5b9 100644
--- a/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
@@ -15,7 +15,7 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.4
+FROM apache/airflow:2.2.5
 
 COPY --chown=airflow:root test_dag.py /opt/airflow/dags
 
diff --git a/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile b/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
index ffcb8ad..10cb839 100644
--- a/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
@@ -15,7 +15,7 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.4
+FROM apache/airflow:2.2.5
 RUN umask 0002; \
     mkdir -p ~/writeable-directory
 # [END Dockerfile]
diff --git a/docs/docker-stack/entrypoint.rst b/docs/docker-stack/entrypoint.rst
index f66ac1a..2ab55cf 100644
--- a/docs/docker-stack/entrypoint.rst
+++ b/docs/docker-stack/entrypoint.rst
@@ -132,7 +132,7 @@ if you specify extra arguments. For example:
 
 .. code-block:: bash
 
-  docker run -it apache/airflow:2.2.4-python3.6 bash -c "ls -la"
+  docker run -it apache/airflow:2.2.5-python3.6 bash -c "ls -la"
   total 16
   drwxr-xr-x 4 airflow root 4096 Jun  5 18:12 .
   drwxr-xr-x 1 root    root 4096 Jun  5 18:12 ..
@@ -144,7 +144,7 @@ you pass extra parameters. For example:
 
 .. code-block:: bash
 
-  > docker run -it apache/airflow:2.2.4-python3.6 python -c "print('test')"
+  > docker run -it apache/airflow:2.2.5-python3.6 python -c "print('test')"
   test
 
 If first argument equals to "airflow" - the rest of the arguments is treated as an airflow command
@@ -152,13 +152,13 @@ to execute. Example:
 
 .. code-block:: bash
 
-   docker run -it apache/airflow:2.2.4-python3.6 airflow webserver
+   docker run -it apache/airflow:2.2.5-python3.6 airflow webserver
 
 If there are any other arguments - they are simply passed to the "airflow" command
 
 .. code-block:: bash
 
-  > docker run -it apache/airflow:2.2.4-python3.6 help
+  > docker run -it apache/airflow:2.2.5-python3.6 help
     usage: airflow [-h] GROUP_OR_COMMAND ...
 
     positional arguments:
@@ -363,7 +363,7 @@ database and creating an ``admin/admin`` Admin user with the following command:
     --env "_AIRFLOW_DB_UPGRADE=true" \
     --env "_AIRFLOW_WWW_USER_CREATE=true" \
     --env "_AIRFLOW_WWW_USER_PASSWORD=admin" \
-      apache/airflow:2.2.4-python3.8 webserver
+      apache/airflow:2.2.5-python3.8 webserver
 
 
 .. code-block:: bash
@@ -372,7 +372,7 @@ database and creating an ``admin/admin`` Admin user with the following command:
     --env "_AIRFLOW_DB_UPGRADE=true" \
     --env "_AIRFLOW_WWW_USER_CREATE=true" \
     --env "_AIRFLOW_WWW_USER_PASSWORD_CMD=echo admin" \
-      apache/airflow:2.2.4-python3.8 webserver
+      apache/airflow:2.2.5-python3.8 webserver
 
 The commands above perform initialization of the SQLite database, create admin user with admin password
 and Admin role. They also forward local port ``8080`` to the webserver port and finally start the webserver.
@@ -412,6 +412,6 @@ Example:
     --env "_AIRFLOW_DB_UPGRADE=true" \
     --env "_AIRFLOW_WWW_USER_CREATE=true" \
     --env "_AIRFLOW_WWW_USER_PASSWORD_CMD=echo admin" \
-      apache/airflow:2.2.4-python3.8 webserver
+      apache/airflow:2.2.5-python3.8 webserver
 
 This method is only available starting from Docker image of Airflow 2.1.1 and above.
diff --git a/scripts/ci/pre_commit/supported_versions.py b/scripts/ci/pre_commit/supported_versions.py
index c345006..8b52476 100755
--- a/scripts/ci/pre_commit/supported_versions.py
+++ b/scripts/ci/pre_commit/supported_versions.py
@@ -25,7 +25,7 @@ AIRFLOW_SOURCES = Path(__file__).resolve().parent.parent.parent.parent
 HEADERS = ("Version", "Current Patch/Minor", "State", "First Release", "Limited Support", "EOL/Terminated")
 
 SUPPORTED_VERSIONS = (
-    ("2", "2.2.4", "Supported", "Dec 17, 2020", "TBD", "TBD"),
+    ("2", "2.2.5", "Supported", "Dec 17, 2020", "TBD", "TBD"),
     ("1.10", "1.10.15", "EOL", "Aug 27, 2018", "Dec 17, 2020", "June 17, 2021"),
     ("1.9", "1.9.0", "EOL", "Jan 03, 2018", "Aug 27, 2018", "Aug 27, 2018"),
     ("1.8", "1.8.2", "EOL", "Mar 19, 2017", "Jan 03, 2018", "Jan 03, 2018"),
diff --git a/setup.py b/setup.py
index b82eb81..5d8f050 100644
--- a/setup.py
+++ b/setup.py
@@ -41,7 +41,7 @@ PY39 = sys.version_info >= (3, 9)
 
 logger = logging.getLogger(__name__)
 
-version = '2.2.4'
+version = '2.2.5'
 
 my_dir = dirname(__file__)
 

[airflow] 17/31: Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ed25c60fa5c5fd780d726d9da73908f14f388331
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Feb 24 08:12:12 2022 +0100

    Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
    
    The finished dagrun was still being seen as running when we call dag.get_num_active_runs
    because the session was not flushed. This PR fixes it by flushing the session.
    
    (cherry picked from commit feea143af9b1db3b1f8cd8d29677f0b2b2ab757a)
---
 airflow/models/dagrun.py         |  2 ++
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 20ec7cd..c42604d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -604,11 +604,13 @@ class DagRun(Base, LoggingMixin):
                 self.data_interval_end,
                 self.dag_hash,
             )
+            session.flush()
 
         self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
         self._emit_duration_stats_for_finished_state()
 
         session.merge(self)
+        # We do not flush here for performance reasons(It increases queries count by +20)
 
         return schedulable_tis, callback
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 7185720..168d452 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1180,6 +1180,41 @@ class TestSchedulerJob:
         assert session.query(DagRun.state).filter(DagRun.state == State.QUEUED).count() == 0
         assert orm_dag.next_dagrun_create_after is None
 
+    def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached the runs does not stick
+        """
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        with dag_maker(max_active_runs=1, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        dag_run = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            session=session,
+        )
+
+        # Reach max_active_runs
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        # Complete dagrun
+        # Add dag_run back in to the session (_do_scheduling does an expunge_all)
+        dag_run = session.merge(dag_run)
+        session.refresh(dag_run)
+        dag_run.get_task_instance(task_id='task', session=session).state = State.SUCCESS
+
+        # create new run
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        # Assert that new runs has created
+        dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(dag_runs) == 2
+
     def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
         """
         Test if a a dagrun will not be scheduled if max_dag_runs
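
Why a missing flush can make a finished dagrun still count as running is easier to see with a toy model. The sketch below does not use SQLAlchemy or Airflow: ToySession is a hypothetical class in which queries read only flushed data, which is the behaviour one would expect from a session that does not autoflush (an assumption made here for illustration).

```python
class ToySession:
    """Toy unit of work: queries read `stored`, edits wait in `pending`."""

    def __init__(self, stored):
        self.stored = dict(stored)  # what a query can see
        self.pending = {}           # changes made but not yet flushed

    def set_state(self, run_id, state):
        self.pending[run_id] = state

    def flush(self):
        # Push pending changes so that later queries can see them.
        self.stored.update(self.pending)
        self.pending.clear()

    def count_active_runs(self):
        return sum(1 for state in self.stored.values() if state == "running")


session = ToySession({"run_1": "running"})
session.set_state("run_1", "success")

before_flush = session.count_active_runs()  # the run still looks active
session.flush()
after_flush = session.count_active_runs()   # the finished run is now visible

print(before_flush, after_flush)
```

With max_active_runs=1, a count of one active run before the flush is enough to stop a new run from being created, which matches the symptom described in the commit message.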

[airflow] 10/31: Fix stray order_by(TaskInstance.execution_date) (#21705)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6c88c0af7d68f1f3d7f5a441c4d676e9650801b4
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Feb 23 04:12:22 2022 +0800

    Fix stray order_by(TaskInstance.execution_date) (#21705)
    
    (cherry picked from commit bb577a98494369b22ae252ac8d23fb8e95508a1c)
---
 airflow/models/baseoperator.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 9e4cedd..c758aeb 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1253,18 +1253,18 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         end_date: Optional[datetime] = None,
         session: Session = None,
     ) -> List[TaskInstance]:
-        """
-        Get a set of task instance related to this task for a specific date
-        range.
-        """
+        """Get task instances related to this task for a specific date range."""
+        from airflow.models import DagRun
+
         end_date = end_date or timezone.utcnow()
         return (
             session.query(TaskInstance)
+            .join(TaskInstance.dag_run)
             .filter(TaskInstance.dag_id == self.dag_id)
             .filter(TaskInstance.task_id == self.task_id)
-            .filter(TaskInstance.execution_date >= start_date)
-            .filter(TaskInstance.execution_date <= end_date)
-            .order_by(TaskInstance.execution_date)
+            .filter(DagRun.execution_date >= start_date)
+            .filter(DagRun.execution_date <= end_date)
+            .order_by(DagRun.execution_date)
             .all()
         )
 

[airflow] 30/31: Fix Tasks getting stuck in scheduled state (#19747)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6784c8c8ab6aad216105e14ccc1b6718889dbc08
Author: Tanel Kiis <ta...@users.noreply.github.com>
AuthorDate: Tue Mar 22 19:30:37 2022 +0200

    Fix Tasks getting stuck in scheduled state (#19747)
    
    The scheduler_job can get stuck in a state where it is unable to queue new tasks. It recovers on its own, but how long that takes depends on the runtime of the currently running tasks, which could be several hours or even days.
    
    If the scheduler can't queue any tasks because of concurrency limits (per pool, DAG or task), then on subsequent iterations of the scheduler loop it will try to queue the same tasks again. Meanwhile, scheduled tasks with a lower priority_weight that could be queued remain waiting.
    
    The proposed solution is to keep track of the DAG and task ids that are concurrency limited, and then repeat the query with those DAGs and tasks filtered out.
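
The retry-with-filters idea described above can be sketched in plain Python. This is only an illustration under simplifying assumptions, not the scheduler's actual code: `Task`, `pick_executable`, and the in-memory "query" are hypothetical stand-ins, and only the per-DAG limit is modelled (pools and per-task limits are left out).

```python
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass(frozen=True)
class Task:
    """Hypothetical stand-in for a scheduled task instance."""
    dag_id: str
    task_id: str
    priority_weight: int


def pick_executable(
    scheduled: List[Task],
    active_per_dag: Dict[str, int],
    dag_limit: Dict[str, int],
    max_tis: int,
) -> List[Task]:
    """Return tasks that can be queued, skipping DAGs that hit their limit."""
    active = dict(active_per_dag)  # work on a copy, leave the caller's dict alone
    starved_dags: Set[str] = set()
    executable: List[Task] = []

    while True:
        # Stand-in for the DB query: highest priority first, known-starved
        # DAGs filtered out, result truncated to max_tis rows.
        candidates = sorted(
            (t for t in scheduled if t.dag_id not in starved_dags),
            key=lambda t: -t.priority_weight,
        )[:max_tis]
        if not candidates:
            break

        known_starved = len(starved_dags)
        for task in candidates:
            limit = dag_limit.get(task.dag_id)
            if limit is not None and active.get(task.dag_id, 0) >= limit:
                # Remember the DAG so the next "query" excludes it.
                starved_dags.add(task.dag_id)
                continue
            executable.append(task)
            active[task.dag_id] = active.get(task.dag_id, 0) + 1

        # Stop once something was queued, the query was not truncated,
        # or no new filter was learned (guards against looping forever).
        truncated = len(candidates) == max_tis
        if executable or not truncated or len(starved_dags) == known_starved:
            break

    return executable
```

With `max_tis=1`, a single pass would only ever see the highest-priority task; if that task's DAG is at its limit, nothing gets queued. The second pass, with the starved DAG filtered out, reaches the lower-priority task.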
    
    Co-authored-by: Tanel Kiis <ta...@reach-u.com>
    (cherry picked from commit cd68540ef19b36180fdd1ebe38435637586747d4)
---
 airflow/jobs/scheduler_job.py    | 339 ++++++++++++++++++++++++---------------
 tests/jobs/test_scheduler_job.py |  94 ++++++++++-
 2 files changed, 300 insertions(+), 133 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 490d507..391697c 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -27,7 +27,7 @@ import time
 import warnings
 from collections import defaultdict
 from datetime import timedelta
-from typing import Collection, DefaultDict, Dict, Iterator, List, Optional, Tuple
+from typing import Collection, DefaultDict, Dict, Iterator, List, Optional, Set, Tuple
 
 from sqlalchemy import and_, func, not_, or_, tuple_
 from sqlalchemy.exc import OperationalError
@@ -259,54 +259,16 @@ class SchedulerJob(BaseJob):
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state != DagRunState.QUEUED)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == State.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]
-        dag_max_active_tasks_map, task_concurrency_map = self.__get_concurrency_maps(
+        dag_active_tasks_map, task_concurrency_map = self.__get_concurrency_maps(
             states=list(EXECUTION_STATES), session=session
         )
 
@@ -314,124 +276,237 @@ class SchedulerJob(BaseJob):
         # Number of tasks that cannot be scheduled because of no open slot in pool
         num_starving_tasks_total = 0
 
-        # Go through each pool, and queue up a task for execution if there are
-        # any open slots in the pool.
+        # dag and task ids that can't be queued because of concurrency limits
+        starved_dags: Set[str] = set()
+        starved_tasks: Set[Tuple[str, str]] = set()
 
-        for pool, task_instances in pool_to_task_instances.items():
-            pool_name = pool
-            if pool not in pools:
-                self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool)
-                continue
+        pool_num_starving_tasks: DefaultDict[str, int] = defaultdict(int)
+
+        for loop_count in itertools.count(start=1):
 
-            open_slots = pools[pool]["open"]
+            num_starved_pools = len(starved_pools)
+            num_starved_dags = len(starved_dags)
+            num_starved_tasks = len(starved_tasks)
 
-            num_ready = len(task_instances)
-            self.log.info(
-                "Figuring out tasks to run in Pool(name=%s) with %s open slots "
-                "and %s task instances ready to be queued",
-                pool,
-                open_slots,
-                num_ready,
+            # Get task instances associated with scheduled
+            # DagRuns which are not backfilled, in the given states,
+            # and the dag is not paused
+            query = (
+                session.query(TI)
+                .join(TI.dag_run)
+                .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
+                .join(TI.dag_model)
+                .filter(not_(DM.is_paused))
+                .filter(TI.state == TaskInstanceState.SCHEDULED)
+                .options(selectinload('dag_model'))
+                .order_by(-TI.priority_weight, DR.execution_date)
             )
 
-            priority_sorted_task_instances = sorted(
-                task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)
+            if starved_pools:
+                query = query.filter(not_(TI.pool.in_(starved_pools)))
+
+            if starved_dags:
+                query = query.filter(not_(TI.dag_id.in_(starved_dags)))
+
+            if starved_tasks:
+                if settings.engine.dialect.name == 'mssql':
+                    task_filter = or_(
+                        and_(
+                            TaskInstance.dag_id == dag_id,
+                            TaskInstance.task_id == task_id,
+                        )
+                        for (dag_id, task_id) in starved_tasks
+                    )
+                else:
+                    task_filter = tuple_(TaskInstance.dag_id, TaskInstance.task_id).in_(starved_tasks)
+
+                query = query.filter(not_(task_filter))
+
+            query = query.limit(max_tis)
+
+            task_instances_to_examine: List[TI] = with_row_locks(
+                query,
+                of=TI,
+                session=session,
+                **skip_locked(session=session),
+            ).all()
+            # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
+            # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
+
+            if len(task_instances_to_examine) == 0:
+                self.log.debug("No tasks to consider for execution.")
+                break
+
+            # Put one task instance on each line
+            task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
+            self.log.info(
+                "%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str
             )
 
-            num_starving_tasks = 0
-            for current_index, task_instance in enumerate(priority_sorted_task_instances):
-                if open_slots <= 0:
-                    self.log.info("Not scheduling since there are %s open slots in pool %s", open_slots, pool)
-                    # Can't schedule any more since there are no more open slots.
-                    num_unhandled = len(priority_sorted_task_instances) - current_index
-                    num_starving_tasks += num_unhandled
-                    num_starving_tasks_total += num_unhandled
-                    break
-
-                # Check to make sure that the task max_active_tasks of the DAG hasn't been
-                # reached.
-                dag_id = task_instance.dag_id
-
-                current_max_active_tasks_per_dag = dag_max_active_tasks_map[dag_id]
-                max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks
+            pool_to_task_instances: DefaultDict[str, List[TI]] = defaultdict(list)
+            for task_instance in task_instances_to_examine:
+                pool_to_task_instances[task_instance.pool].append(task_instance)
+
+            # Go through each pool, and queue up a task for execution if there are
+            # any open slots in the pool.
+
+            for pool, task_instances in pool_to_task_instances.items():
+                pool_name = pool
+                if pool not in pools:
+                    self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool)
+                    starved_pools.add(pool_name)
+                    continue
+
+                pool_total = pools[pool]["total"]
+                open_slots = pools[pool]["open"]
+
+                num_ready = len(task_instances)
                 self.log.info(
-                    "DAG %s has %s/%s running and queued tasks",
-                    dag_id,
-                    current_max_active_tasks_per_dag,
-                    max_active_tasks_per_dag_limit,
+                    "Figuring out tasks to run in Pool(name=%s) with %s open slots "
+                    "and %s task instances ready to be queued",
+                    pool,
+                    open_slots,
+                    num_ready,
                 )
-                if current_max_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
+
+                priority_sorted_task_instances = sorted(
+                    task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)
+                )
+
+                for current_index, task_instance in enumerate(priority_sorted_task_instances):
+                    if open_slots <= 0:
+                        self.log.info(
+                            "Not scheduling since there are %s open slots in pool %s", open_slots, pool
+                        )
+                        # Can't schedule any more since there are no more open slots.
+                        num_unhandled = len(priority_sorted_task_instances) - current_index
+                        pool_num_starving_tasks[pool_name] += num_unhandled
+                        num_starving_tasks_total += num_unhandled
+                        starved_pools.add(pool_name)
+                        break
+
+                    if task_instance.pool_slots > pool_total:
+                        self.log.warning(
+                            "Not executing %s. Requested pool slots (%s) are greater than "
+                            "total pool slots: '%s' for pool: %s.",
+                            task_instance,
+                            task_instance.pool_slots,
+                            pool_total,
+                            pool,
+                        )
+
+                        starved_tasks.add((task_instance.dag_id, task_instance.task_id))
+                        continue
+
+                    if task_instance.pool_slots > open_slots:
+                        self.log.info(
+                            "Not executing %s since it requires %s slots "
+                            "but there are %s open slots in the pool %s.",
+                            task_instance,
+                            task_instance.pool_slots,
+                            open_slots,
+                            pool,
+                        )
+                        pool_num_starving_tasks[pool_name] += 1
+                        num_starving_tasks_total += 1
+                        starved_tasks.add((task_instance.dag_id, task_instance.task_id))
+                        # Though we can execute tasks with lower priority if there's enough room
+                        continue
+
+                    # Check to make sure that the task max_active_tasks of the DAG hasn't been
+                    # reached.
+                    dag_id = task_instance.dag_id
+
+                    current_active_tasks_per_dag = dag_active_tasks_map[dag_id]
+                    max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks
                     self.log.info(
-                        "Not executing %s since the number of tasks running or queued "
-                        "from DAG %s is >= to the DAG's max_active_tasks limit of %s",
-                        task_instance,
+                        "DAG %s has %s/%s running and queued tasks",
                         dag_id,
+                        current_active_tasks_per_dag,
                         max_active_tasks_per_dag_limit,
                     )
-                    continue
-
-                task_concurrency_limit: Optional[int] = None
-                if task_instance.dag_model.has_task_concurrency_limits:
-                    # Many dags don't have a task_concurrency, so where we can avoid loading the full
-                    # serialized DAG the better.
-                    serialized_dag = self.dagbag.get_dag(dag_id, session=session)
-                    # If the dag is missing, fail the task and continue to the next task.
-                    if not serialized_dag:
-                        self.log.error(
-                            "DAG '%s' for task instance %s not found in serialized_dag table",
-                            dag_id,
+                    if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
+                        self.log.info(
+                            "Not executing %s since the number of tasks running or queued "
+                            "from DAG %s is >= to the DAG's max_active_tasks limit of %s",
                             task_instance,
+                            dag_id,
+                            max_active_tasks_per_dag_limit,
                         )
-                        session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
-                            {TI.state: State.FAILED}, synchronize_session='fetch'
-                        )
+                        starved_dags.add(dag_id)
                         continue
-                    if serialized_dag.has_task(task_instance.task_id):
-                        task_concurrency_limit = serialized_dag.get_task(
-                            task_instance.task_id
-                        ).max_active_tis_per_dag
-
-                    if task_concurrency_limit is not None:
-                        current_task_concurrency = task_concurrency_map[
-                            (task_instance.dag_id, task_instance.task_id)
-                        ]
-
-                        if current_task_concurrency >= task_concurrency_limit:
-                            self.log.info(
-                                "Not executing %s since the task concurrency for"
-                                " this task has been reached.",
+
+                    if task_instance.dag_model.has_task_concurrency_limits:
+                        # Many dags don't have a task_concurrency, so where we can avoid loading the full
+                        # serialized DAG the better.
+                        serialized_dag = self.dagbag.get_dag(dag_id, session=session)
+                        # If the dag is missing, fail the task and continue to the next task.
+                        if not serialized_dag:
+                            self.log.error(
+                                "DAG '%s' for task instance %s not found in serialized_dag table",
+                                dag_id,
                                 task_instance,
                             )
+                            session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
+                                {TI.state: State.FAILED}, synchronize_session='fetch'
+                            )
                             continue
 
-                if task_instance.pool_slots > open_slots:
-                    self.log.info(
-                        "Not executing %s since it requires %s slots "
-                        "but there are %s open slots in the pool %s.",
-                        task_instance,
-                        task_instance.pool_slots,
-                        open_slots,
-                        pool,
-                    )
-                    num_starving_tasks += 1
-                    num_starving_tasks_total += 1
-                    # Though we can execute tasks with lower priority if there's enough room
-                    continue
+                        task_concurrency_limit: Optional[int] = None
+                        if serialized_dag.has_task(task_instance.task_id):
+                            task_concurrency_limit = serialized_dag.get_task(
+                                task_instance.task_id
+                            ).max_active_tis_per_dag
+
+                        if task_concurrency_limit is not None:
+                            current_task_concurrency = task_concurrency_map[
+                                (task_instance.dag_id, task_instance.task_id)
+                            ]
+
+                            if current_task_concurrency >= task_concurrency_limit:
+                                self.log.info(
+                                    "Not executing %s since the task concurrency for"
+                                    " this task has been reached.",
+                                    task_instance,
+                                )
+                                starved_tasks.add((task_instance.dag_id, task_instance.task_id))
+                                continue
+
+                    executable_tis.append(task_instance)
+                    open_slots -= task_instance.pool_slots
+                    dag_active_tasks_map[dag_id] += 1
+                    task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
+
+                pools[pool]["open"] = open_slots
+
+            is_done = executable_tis or len(task_instances_to_examine) < max_tis
+            # Check this to avoid accidental infinite loops
+            found_new_filters = (
+                len(starved_pools) > num_starved_pools
+                or len(starved_dags) > num_starved_dags
+                or len(starved_tasks) > num_starved_tasks
+            )
 
-                executable_tis.append(task_instance)
-                open_slots -= task_instance.pool_slots
-                dag_max_active_tasks_map[dag_id] += 1
-                task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
+            if is_done or not found_new_filters:
+                break
+
+            self.log.debug(
+                "Found no task instances to queue on the %s. iteration "
+                "but there could be more candidate task instances to check.",
+                loop_count,
+            )
 
+        for pool_name, num_starving_tasks in pool_num_starving_tasks.items():
             Stats.gauge(f'pool.starving_tasks.{pool_name}', num_starving_tasks)
 
         Stats.gauge('scheduler.tasks.starving', num_starving_tasks_total)
         Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
         Stats.gauge('scheduler.tasks.executable', len(executable_tis))
 
-        task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
-        self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)
         if len(executable_tis) > 0:
+            task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
+            self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)
+
             # set TIs to queued state
             filter_for_tis = TI.filter_for_tis(executable_tis)
             session.query(TI).filter(filter_for_tis).update(
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 168d452..db39977 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -466,6 +466,7 @@ class TestSchedulerJob:
             dr2.get_task_instance(task_id_1, session=session),
             dr2.get_task_instance(task_id_2, session=session),
         ]
+        tis = sorted(tis, key=lambda ti: ti.key)
         for ti in tis:
             ti.state = State.SCHEDULED
             session.merge(ti)
@@ -482,7 +483,7 @@ class TestSchedulerJob:
         for ti in res:
             res_keys.append(ti.key)
         assert tis[0].key in res_keys
-        assert tis[1].key in res_keys
+        assert tis[2].key in res_keys
         assert tis[3].key in res_keys
         session.rollback()
 
@@ -899,6 +900,97 @@ class TestSchedulerJob:
 
         session.rollback()
 
+    def test_find_executable_task_instances_not_enough_pool_slots_for_first(self, dag_maker):
+        set_default_pool_slots(1)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_enough_pool_slots_for_first'
+        with dag_maker(dag_id=dag_id):
+            op1 = DummyOperator(task_id='dummy1', priority_weight=2, pool_slots=2)
+            op2 = DummyOperator(task_id='dummy2', priority_weight=1, pool_slots=1)
+
+        dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ti1 = dr1.get_task_instance(op1.task_id, session)
+        ti2 = dr1.get_task_instance(op2.task_id, session)
+        ti1.state = State.SCHEDULED
+        ti2.state = State.SCHEDULED
+        session.flush()
+
+        # Schedule ti with lower priority,
+        # because the one with higher priority is limited by a concurrency limit
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 1 == len(res)
+        assert res[0].key == ti2.key
+
+        session.rollback()
+
+    def test_find_executable_task_instances_not_enough_dag_concurrency_for_first(self, dag_maker):
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_id_1 = (
+            'SchedulerJobTest.test_find_executable_task_instances_not_enough_dag_concurrency_for_first-a'
+        )
+        dag_id_2 = (
+            'SchedulerJobTest.test_find_executable_task_instances_not_enough_dag_concurrency_for_first-b'
+        )
+
+        with dag_maker(dag_id=dag_id_1, max_active_tasks=1):
+            op1a = DummyOperator(task_id='dummy1-a', priority_weight=2)
+            op1b = DummyOperator(task_id='dummy1-b', priority_weight=2)
+        dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        with dag_maker(dag_id=dag_id_2):
+            op2 = DummyOperator(task_id='dummy2', priority_weight=1)
+        dr2 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ti1a = dr1.get_task_instance(op1a.task_id, session)
+        ti1b = dr1.get_task_instance(op1b.task_id, session)
+        ti2 = dr2.get_task_instance(op2.task_id, session)
+        ti1a.state = State.RUNNING
+        ti1b.state = State.SCHEDULED
+        ti2.state = State.SCHEDULED
+        session.flush()
+
+        # Schedule ti with lower priority,
+        # because the one with higher priority is limited by a concurrency limit
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        assert 1 == len(res)
+        assert res[0].key == ti2.key
+
+        session.rollback()
+
+    def test_find_executable_task_instances_not_enough_task_concurrency_for_first(self, dag_maker):
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_enough_task_concurrency_for_first'
+
+        with dag_maker(dag_id=dag_id):
+            op1a = DummyOperator(task_id='dummy1-a', priority_weight=2, max_active_tis_per_dag=1)
+            op1b = DummyOperator(task_id='dummy1-b', priority_weight=1)
+        dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+        dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
+
+        ti1a = dr1.get_task_instance(op1a.task_id, session)
+        ti1b = dr1.get_task_instance(op1b.task_id, session)
+        ti2a = dr2.get_task_instance(op1a.task_id, session)
+        ti1a.state = State.RUNNING
+        ti1b.state = State.SCHEDULED
+        ti2a.state = State.SCHEDULED
+        session.flush()
+
+        # Schedule ti with lower priority,
+        # because the one with higher priority is limited by a concurrency limit
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        assert 1 == len(res)
+        assert res[0].key == ti1b.key
+
+        session.rollback()
+
     def test_enqueue_task_instances_with_queued_state(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_enqueue_task_instances_with_queued_state'
         task_id_1 = 'dummy'

[airflow] 29/31: Reduce DB load incurred by Stale DAG deactivation (#21399)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ae573d847db344b5b99406cf0d685adf49afd540
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Sun Mar 20 00:17:42 2022 -0700

    Reduce DB load incurred by Stale DAG deactivation (#21399)
    
    Deactivating stale DAGs periodically in bulk
    
    By moving this logic into the DagFileProcessorManager and running it across all processed files periodically, we can avoid un-indexed queries.
    
    The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the last_parsed_time of an entry in the dag table. If the file has been processed significantly more recently than the DAG has been updated, then it's safe to assume that the DAG is missing and can be marked inactive.
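
As a rough illustration of that comparison, the check can be written as a small pure function. The names `DagRow` and `find_stale_dags` are hypothetical and exist only for this sketch; the tolerance is assumed to be the processor timeout, as in the patch further down.

```python
from datetime import datetime, timedelta
from typing import Dict, List, NamedTuple, Set


class DagRow(NamedTuple):
    """Hypothetical stand-in for the (dag_id, fileloc, last_parsed_time) rows."""
    dag_id: str
    fileloc: str
    last_parsed_time: datetime


def find_stale_dags(
    dags: List[DagRow],
    file_last_finished: Dict[str, datetime],
    processor_timeout: timedelta,
) -> Set[str]:
    """Return ids of DAGs whose file was processed well after the DAG was last parsed."""
    stale: Set[str] = set()
    for dag in dags:
        finished = file_last_finished.get(dag.fileloc)
        # Files that have not finished processing yet give no evidence either way.
        if finished is None:
            continue
        # A gap larger than the timeout means the file was parsed without
        # producing this DAG, so the DAG is assumed to be gone.
        if dag.last_parsed_time + processor_timeout < finished:
            stale.add(dag.dag_id)
    return stale
```

A DAG whose file has no recorded finish time is left untouched, so a slow or not-yet-started parse never causes a deactivation on its own.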
    
    (cherry picked from commit f309ea78f7d8b62383bc41eac217681a0916382b)
---
 airflow/config_templates/config.yml          |  8 +++++
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/dag_processing/manager.py            | 43 ++++++++++++++++++++++++-
 airflow/dag_processing/processor.py          | 11 -------
 tests/dag_processing/test_manager.py         | 48 +++++++++++++++++++++++++++-
 tests/dag_processing/test_processor.py       | 25 ---------------
 6 files changed, 101 insertions(+), 38 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1e77041..cc5817a 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1767,6 +1767,14 @@
       type: string
       example: ~
       default: "30"
+    - name: deactivate_stale_dags_interval
+      description: |
+        How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
+        the expected files) which should be deactivated.
+      version_added: 2.3.0
+      type: integer
+      example: ~
+      default: "60"
     - name: dag_dir_list_interval
       description: |
         How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 826eaf4..8bd20ed 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -886,6 +886,10 @@ scheduler_idle_sleep_time = 1
 # this interval. Keeping this number low will increase CPU usage.
 min_file_process_interval = 30
 
+# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
+# the expected files) which should be deactivated.
+deactivate_stale_dags_interval = 60
+
 # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
 dag_dir_list_interval = 300
 
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 6c78aa6..315a3a5 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -468,9 +468,12 @@ class DagFileProcessorManager(LoggingMixin):
         self.last_stat_print_time = 0
         # TODO: Remove magic number
         self._zombie_query_interval = 10
+        # Last time we cleaned up DAGs which are no longer in files
+        self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
+        # How often to check for DAGs which are no longer in files
+        self.deactivate_stale_dags_interval = conf.getint('scheduler', 'deactivate_stale_dags_interval')
         # How long to wait before timing out a process to parse a DAG file
         self._processor_timeout = processor_timeout
-
         # How often to scan the DAGs directory for new files. Default to 5 minutes.
         self.dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval')
 
@@ -519,6 +522,43 @@ class DagFileProcessorManager(LoggingMixin):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        """Detects DAGs which are no longer present in files and deactivate them."""
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
+                # last_parsed_time is _processor_timeout. Longer than that indicates that the DAG is
+                # no longer present in the file.
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + self._processor_timeout) < last_parsed[dag.fileloc]
+                ):
+                    self.log.info(f"DAG {dag.dag_id} is missing and will be deactivated.")
+                    to_deactivate.add(dag.dag_id)
+
+            if to_deactivate:
+                deactivated = (
+                    session.query(DagModel)
+                    .filter(DagModel.dag_id.in_(to_deactivate))
+                    .update({DagModel.is_active: False}, synchronize_session="fetch")
+                )
+                if deactivated:
+                    self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated)
+
+            self.last_deactivate_stale_dags_time = timezone.utcnow()
+
     def _run_parsing_loop(self):
 
         # In sync mode we want timeout=None -- wait forever until a message is received
@@ -581,6 +621,7 @@ class DagFileProcessorManager(LoggingMixin):
                 self.waitables.pop(sentinel)
                 self._processors.pop(processor.file_path)
 
+            self._deactivate_stale_dags()
             self._refresh_dag_dir()
             self._find_zombies()
 
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index e59c818..a6cf372 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -648,8 +648,6 @@ class DagFileProcessor(LoggingMixin):
             Stats.incr('dag_file_refresh_error', 1, 1)
             return 0, 0
 
-        self._deactivate_missing_dags(session, dagbag, file_path)
-
         if len(dagbag.dags) > 0:
             self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
         else:
@@ -679,12 +677,3 @@ class DagFileProcessor(LoggingMixin):
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
-
-    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
-        deactivated = (
-            session.query(DagModel)
-            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
-            .update({DagModel.is_active: False}, synchronize_session="fetch")
-        )
-        if deactivated:
-            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index b549b2b..abba415 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -465,7 +465,6 @@ class TestDagFileProcessorManager:
             pickle_dags=False,
             async_mode=True,
         )
-
         dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
         with create_session() as session:
             session.query(LJ).delete()
@@ -596,6 +595,53 @@ class TestDagFileProcessorManager:
             child_pipe.close()
             parent_pipe.close()
 
+    def test_deactivate_stale_dags(self):
+        """
+        Ensure that DAGs are marked inactive when the file is parsed but the
+        DagModel.last_parsed_time is not updated.
+        """
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_timeout=timedelta(minutes=10),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+        test_dag_path = str(TEST_DAG_FOLDER / 'test_example_bash_operator.py')
+        dagbag = DagBag(test_dag_path, read_dags_from_db=False)
+
+        with create_session() as session:
+            # Add stale DAG to the DB
+            dag = dagbag.get_dag('test_example_bash_operator')
+            dag.last_parsed_time = timezone.utcnow()
+            dag.sync_to_db()
+
+            # Add DAG to the file_parsing_stats
+            stat = DagFileStat(
+                num_dags=1,
+                import_errors=0,
+                last_finish_time=timezone.utcnow() + timedelta(hours=1),
+                last_duration=1,
+                run_count=1,
+            )
+            manager._file_paths = [test_dag_path]
+            manager._file_stats[test_dag_path] = stat
+
+            active_dags = (
+                session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
+            )
+            assert len(active_dags) == 1
+
+            manager._file_stats[test_dag_path] = stat
+            manager._deactivate_stale_dags()
+            active_dags = (
+                session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
+            )
+
+            assert len(active_dags) == 0
+
     @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
     @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
     def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index c9ecfb0..c0d2267 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -700,31 +700,6 @@ class TestDagFileProcessor:
             assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename)
             session.rollback()
 
-    def test_process_file_should_deactivate_missing_dags(self):
-
-        dag_file = os.path.join(
-            os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
-        )
-
-        # write a DAG into the DB which is not present in its specified file
-        with create_session() as session:
-            orm_dag = DagModel(dag_id='missing_dag', is_active=True, fileloc=dag_file)
-            session.merge(orm_dag)
-            session.commit()
-
-        session = settings.Session()
-
-        dags = session.query(DagModel).all()
-        assert [dag.dag_id for dag in dags if dag.is_active] == ['missing_dag']
-
-        # re-parse the file and see that the DAG is no longer there
-        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
-        dag_file_processor.process_file(dag_file, [])
-
-        dags = session.query(DagModel).all()
-        assert [dag.dag_id for dag in dags if dag.is_active] == ['test_only_dummy_tasks']
-        assert [dag.dag_id for dag in dags if not dag.is_active] == ['missing_dag']
-
 
 class TestProcessorAgent:
     @pytest.fixture(autouse=True)
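
The staleness rule added to the manager above can be tried in isolation. The following is a minimal sketch, not the Airflow implementation: the function name find_stale_dag_ids, the tuple shape used for DAG rows, and the sample paths are assumptions made for illustration. It applies the same comparison as the diff, treating a DAG as stale when its last_parsed_time plus the processor timeout is still earlier than the last finish time of its file.

```python
from datetime import datetime, timedelta, timezone


def find_stale_dag_ids(dags, last_finish_times, processor_timeout):
    """Return ids of DAGs whose file finished parsing well after the DAG was last refreshed.

    dags: iterable of (dag_id, fileloc, last_parsed_time) tuples (hypothetical shape).
    last_finish_times: mapping of file path -> time the file last finished parsing.
    """
    stale = set()
    for dag_id, fileloc, last_parsed_time in dags:
        finished = last_finish_times.get(fileloc)
        # A file with no recorded finish time gives no evidence either way.
        if finished is None:
            continue
        # Same comparison as in the diff: a gap larger than the timeout means
        # the file was parsed again without this DAG being refreshed.
        if last_parsed_time + processor_timeout < finished:
            stale.add(dag_id)
    return stale


now = datetime(2022, 3, 22, tzinfo=timezone.utc)
dags = [
    ("fresh_dag", "/dags/a.py", now),
    ("removed_dag", "/dags/a.py", now - timedelta(hours=1)),
    ("unparsed_dag", "/dags/b.py", now - timedelta(days=1)),
]
stale_ids = find_stale_dag_ids(dags, {"/dags/a.py": now}, timedelta(minutes=10))
print(stale_ids)
```

Only removed_dag is reported here: fresh_dag was refreshed together with its file, and unparsed_dag is skipped because its file has no finish time yet.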

[airflow] 27/31: Add documentation on specifying a DB schema. (#22347)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 50a0dee334a6b2fe369fa7f5ea5e51a97cfc2bf4
Author: nick <ni...@shook.family>
AuthorDate: Sun Mar 20 00:28:19 2022 -0700

    Add documentation on specifying a DB schema. (#22347)
    
    * Add documentation on specifying a DB schema.
    
    From request - https://github.com/apache/airflow/issues/17374#issuecomment-1060019956
    
    Co-authored-by: Nick Shook <ni...@apple.com>
    (cherry picked from commit 3bc0da326e7963eab9154913d5cafbc1be1c1a67)
---
 docs/apache-airflow/howto/set-up-database.rst | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/docs/apache-airflow/howto/set-up-database.rst b/docs/apache-airflow/howto/set-up-database.rst
index 14051b3..e556ec6 100644
--- a/docs/apache-airflow/howto/set-up-database.rst
+++ b/docs/apache-airflow/howto/set-up-database.rst
@@ -314,6 +314,16 @@ Other configuration options
 
 There are more configuration options for configuring SQLAlchemy behavior. For details, see :ref:`reference documentation <config:core>` for ``sqlalchemy_*`` option in ``[core]`` section.
 
+For instance, you can specify a database schema where Airflow will create its required tables. If you want Airflow to install its tables in the ``airflow`` schema of a PostgreSQL database, specify these environment variables:
+
+.. code-block:: bash
+
+    export AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
+    export AIRFLOW__CORE__SQL_ALCHEMY_SCHEMA="airflow"
+
+Note the ``search_path`` at the end of the ``SQL_ALCHEMY_CONN`` database URL.
+
+
 Initialize the database
 -----------------------
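
The %3D in the connection URL above is the percent-encoded "=" of the PostgreSQL option -csearch_path=airflow. As a small sketch, the URL can be assembled with the standard library instead of encoding it by hand. The host, user and database name are the placeholder values from the documentation example, and the variable names are assumptions for illustration.

```python
from urllib.parse import quote

schema = "airflow"

# Percent-encode the option so the "=" inside it is not read as part of
# the URL's own query-string syntax.
options = quote(f"-csearch_path={schema}")

conn_url = f"postgresql://postgres@localhost:5432/my_database?options={options}"
print(conn_url)
```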
 

[airflow] 02/31: DB upgrade is required when updating Airflow (#22061)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a58a274f221aee844bcac46e775e3fa2767162eb
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Mar 7 15:05:58 2022 -0700

    DB upgrade is required when updating Airflow (#22061)
    
    Just strengthen the language that it is "required", not "recommended" to
    run `airflow db upgrade` when upgrading Airflow versions.
    
    (cherry picked from commit e7fed6bb3d7c8def86b47204176cebbfc6c401ff)
---
 docs/apache-airflow/installation/upgrading.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/apache-airflow/installation/upgrading.rst b/docs/apache-airflow/installation/upgrading.rst
index 929c604..1f7687e 100644
--- a/docs/apache-airflow/installation/upgrading.rst
+++ b/docs/apache-airflow/installation/upgrading.rst
@@ -21,9 +21,9 @@ Upgrading Airflow to a newer version
 Why you need to upgrade
 =======================
 
-Newer Airflow versions can contain Database migrations so it is recommended that you run
-``airflow db upgrade`` to Upgrade your Database with the schema changes in the Airflow version
-you are upgrading to.
+Newer Airflow versions can contain database migrations so you must run ``airflow db upgrade``
+to upgrade your database with the schema changes in the Airflow version you are upgrading to.
+Don't worry, it's safe to run even if there are no migrations to perform.
 
 When you need to upgrade
 ========================

[airflow] 03/31: Fix incorrect data provided to tries & landing times charts (#21928)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c5f7925545a844ffad0c5585115592c6296c9e96
Author: Alexander Millin <a....@city-mobil.ru>
AuthorDate: Wed Mar 2 13:29:07 2022 +0300

    Fix incorrect data provided to tries & landing times charts (#21928)
    
    (cherry picked from commit 2c57ad4ff9ddde8102c62f2e25c2a2e82cceb3e7)
---
 airflow/www/views.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9ebe899..9fc61b5 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2727,6 +2727,8 @@ class Airflow(AirflowBaseView):
             y_points = []
             x_points = []
             for ti in tis:
+                if ti.task_id != task.task_id:
+                    continue
                 dttm = wwwutils.epoch(ti.execution_date)
                 x_points.append(dttm)
                 # y value should reflect completed tries to have a 0 baseline.
@@ -2802,6 +2804,8 @@ class Airflow(AirflowBaseView):
             y_points[task_id] = []
             x_points[task_id] = []
             for ti in tis:
+                if ti.task_id != task.task_id:
+                    continue
                 ts = dag.get_run_data_interval(ti.dag_run).end
                 if ti.end_date:
                     dttm = wwwutils.epoch(ti.execution_date)
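
The two added guards skip task instances that belong to a different task than the one whose series is being built. A simplified sketch of the effect, using a hypothetical TaskInstanceRow tuple and a points_per_task helper rather than the real view code:

```python
from collections import namedtuple

# Hypothetical stand-in for a task instance row; only the fields needed here.
TaskInstanceRow = namedtuple("TaskInstanceRow", ["task_id", "try_number"])


def points_per_task(task_ids, tis):
    """Build one list of y values per task, ignoring rows of other tasks."""
    series = {}
    for task_id in task_ids:
        points = []
        for ti in tis:
            # Without this guard every task's chart would receive all rows.
            if ti.task_id != task_id:
                continue
            points.append(ti.try_number)
        series[task_id] = points
    return series


tis = [
    TaskInstanceRow("extract", 1),
    TaskInstanceRow("load", 3),
    TaskInstanceRow("extract", 2),
]
series = points_per_task(["extract", "load"], tis)
print(series)
```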

[airflow] 24/31: Log traceback in trigger excs (#21213)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f8e8ffbbf0877e9082d427d1eff11410f6c629ce
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Mon Feb 28 22:41:39 2022 +0000

    Log traceback in trigger excs (#21213)
    
    (cherry picked from commit 4ad21f5f7c2d416cf813a860564bc2bf3e161d46)
---
 airflow/__init__.py              |  1 +
 airflow/jobs/triggerer_job.py    | 18 ++++++++++--------
 airflow/models/taskinstance.py   |  3 +++
 airflow/models/trigger.py        |  6 ++++--
 tests/jobs/test_triggerer_job.py | 11 ++++++++---
 5 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/airflow/__init__.py b/airflow/__init__.py
index 173adc5..3421220 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -26,6 +26,7 @@ in their PYTHONPATH. airflow_login should be based off the
 isort:skip_file
 """
 
+
 # flake8: noqa: F401
 
 import sys
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index dff0e0f..bd145f7 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -168,8 +168,8 @@ class TriggererJob(BaseJob):
         """
         while self.runner.failed_triggers:
             # Tell the model to fail this trigger's deps
-            trigger_id = self.runner.failed_triggers.popleft()
-            Trigger.submit_failure(trigger_id=trigger_id)
+            trigger_id, saved_exc = self.runner.failed_triggers.popleft()
+            Trigger.submit_failure(trigger_id=trigger_id, exc=saved_exc)
             # Emit stat event
             Stats.incr('triggers.failed')
 
@@ -211,7 +211,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
     events: Deque[Tuple[int, TriggerEvent]]
 
     # Outbound queue of failed triggers
-    failed_triggers: Deque[int]
+    failed_triggers: Deque[Tuple[int, BaseException]]
 
     # Should-we-stop flag
     stop: bool = False
@@ -291,6 +291,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         for trigger_id, details in list(self.triggers.items()):  # pylint: disable=too-many-nested-blocks
             if details["task"].done():
                 # Check to see if it exited for good reasons
+                saved_exc = None
                 try:
                     result = details["task"].result()
                 except (asyncio.CancelledError, SystemExit, KeyboardInterrupt):
@@ -301,7 +302,8 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                     continue
                 except BaseException as e:
                     # This is potentially bad, so log it.
-                    self.log.error("Trigger %s exited with error %s", details["name"], e)
+                    self.log.exception("Trigger %s exited with error %s", details["name"], e)
+                    saved_exc = e
                 else:
                     # See if they foolishly returned a TriggerEvent
                     if isinstance(result, TriggerEvent):
@@ -315,7 +317,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                         "Trigger %s exited without sending an event. Dependent tasks will be failed.",
                         details["name"],
                     )
-                    self.failed_triggers.append(trigger_id)
+                    self.failed_triggers.append((trigger_id, saved_exc))
                 del self.triggers[trigger_id]
             await asyncio.sleep(0)
 
@@ -386,7 +388,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             running_trigger_ids.union(x[0] for x in self.events)
             .union(self.to_cancel)
             .union(x[0] for x in self.to_create)
-            .union(self.failed_triggers)
+            .union(trigger[0] for trigger in self.failed_triggers)
         )
         # Work out the two difference sets
         new_trigger_ids = requested_trigger_ids - known_trigger_ids
@@ -402,9 +404,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             # Resolve trigger record into an actual class instance
             try:
                 trigger_class = self.get_trigger_by_classpath(new_triggers[new_id].classpath)
-            except BaseException:
+            except BaseException as e:
                 # Either the trigger code or the path to it is bad. Fail the trigger.
-                self.failed_triggers.append(new_id)
+                self.failed_triggers.append((new_id, e))
                 continue
             self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
         # Enqueue orphaned triggers for cancellation
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2dcc923..f6440b4 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1486,6 +1486,9 @@ class TaskInstance(Base, LoggingMixin):
             # this task was scheduled specifically to fail.
             if self.next_method == "__fail__":
                 next_kwargs = self.next_kwargs or {}
+                traceback = next_kwargs.get("traceback")
+                if traceback is not None:
+                    self.log.error("Trigger failed:\n%s", "\n".join(traceback))
                 raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
             # Grab the callable off the Operator/Task and add in any kwargs
             execute_callable = getattr(task_copy, self.next_method)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index aa0d2b1..d222fc7 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import datetime
+from traceback import format_exception
 from typing import Any, Dict, List, Optional
 
 from sqlalchemy import Column, Integer, String, func, or_
@@ -124,7 +125,7 @@ class Trigger(Base):
 
     @classmethod
     @provide_session
-    def submit_failure(cls, trigger_id, session=None):
+    def submit_failure(cls, trigger_id, exc=None, session=None):
         """
         Called when a trigger has failed unexpectedly, and we need to mark
         everything that depended on it as failed. Notably, we have to actually
@@ -144,8 +145,9 @@ class Trigger(Base):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the error and set the next_method to the fail state
+            traceback = format_exception(type(exc), exc, exc.__traceback__) if exc else None
             task_instance.next_method = "__fail__"
-            task_instance.next_kwargs = {"error": "Trigger failure"}
+            task_instance.next_kwargs = {"error": "Trigger failure", "traceback": traceback}
             # Remove ourselves as its trigger
             task_instance.trigger_id = None
             # Finally, mark it as scheduled so it gets re-queued
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 870116a..eee5ee2 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -390,7 +390,11 @@ def test_trigger_failing(session):
         # Wait for up to 3 seconds for it to fire and appear in the event queue
         for _ in range(30):
             if job.runner.failed_triggers:
-                assert list(job.runner.failed_triggers) == [1]
+                assert len(job.runner.failed_triggers) == 1
+                trigger_id, exc = list(job.runner.failed_triggers)[0]
+                assert trigger_id == 1
+                assert isinstance(exc, ValueError)
+                assert exc.args[0] == "Deliberate trigger failure"
                 break
             time.sleep(0.1)
         else:
@@ -448,7 +452,7 @@ def test_invalid_trigger(session, dag_maker):
     job.load_triggers()
 
     # Make sure it turned up in the failed queue
-    assert list(job.runner.failed_triggers) == [1]
+    assert len(job.runner.failed_triggers) == 1
 
     # Run the failed trigger handler
     job.handle_failed_triggers()
@@ -458,4 +462,5 @@ def test_invalid_trigger(session, dag_maker):
     task_instance.refresh_from_db()
     assert task_instance.state == TaskInstanceState.SCHEDULED
     assert task_instance.next_method == "__fail__"
-    assert task_instance.next_kwargs == {'error': 'Trigger failure'}
+    assert task_instance.next_kwargs['error'] == 'Trigger failure'
+    assert task_instance.next_kwargs['traceback'][-1] == "ModuleNotFoundError: No module named 'fake'\n"
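
The change above queues (trigger_id, exception) pairs and later turns the exception into a list of traceback lines with traceback.format_exception. A self-contained sketch of that flow, outside of Airflow; the variable names mirror the diff, but nothing here touches the database or the real TriggerRunner:

```python
from collections import deque
from traceback import format_exception

# Outbound queue of (trigger_id, exception) pairs, as in the diff.
failed_triggers = deque()

try:
    raise ValueError("Deliberate trigger failure")
except ValueError as e:
    failed_triggers.append((1, e))

trigger_id, exc = failed_triggers.popleft()

# format_exception returns a list of strings, each ending in a newline;
# the final entry is the "ExceptionType: message" line.
traceback_lines = format_exception(type(exc), exc, exc.__traceback__) if exc else None
next_kwargs = {"error": "Trigger failure", "traceback": traceback_lines}

print("".join(next_kwargs["traceback"]))
```

Storing the formatted lines rather than the exception object keeps next_kwargs limited to plain strings, which is what the test above checks through the last traceback line.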

[airflow] 14/31: Fix logging JDBC SQL error when task fails (#21540)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5f9a27536c348771db4f5308f5437581fa365717
Author: hubert-pietron <94...@users.noreply.github.com>
AuthorDate: Sun Feb 27 14:07:14 2022 +0100

    Fix logging JDBC SQL error when task fails (#21540)
    
    (cherry picked from commit bc1b422e1ce3a5b170618a7a6589f8ae2fc33ad6)
---
 airflow/utils/log/secrets_masker.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index c263cbb..6cdfd0e 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -145,7 +145,12 @@ class SecretsMasker(logging.Filter):
         return frozenset(record.__dict__).difference({'msg', 'args'})
 
     def _redact_exception_with_context(self, exception):
-        exception.args = (self.redact(v) for v in exception.args)
+        # Exception class may not be modifiable (e.g. declared by an
+        # extension module such as JDBC).
+        try:
+            exception.args = (self.redact(v) for v in exception.args)
+        except AttributeError:
+            pass
         if exception.__context__:
             self._redact_exception_with_context(exception.__context__)
         if exception.__cause__ and exception.__cause__ is not exception.__context__:
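
The try/except above covers exception classes whose args attribute cannot be reassigned. This can be reproduced without JDBC by using a property with no setter. In the sketch below, ReadOnlyArgsError, redact_args and the masking function are hypothetical names for illustration and are not part of SecretsMasker.

```python
class ReadOnlyArgsError(Exception):
    """Hypothetical stand-in for an extension-module exception with read-only args."""

    @property
    def args(self):
        return ("password=hunter2",)


def redact_args(exception, redact):
    """Try to redact exception.args in place and report whether it worked."""
    try:
        exception.args = tuple(redact(v) for v in exception.args)
    except AttributeError:
        # The class does not allow assigning to args; leave it untouched.
        return False
    return True


def mask(value):
    return "***"


plain = ValueError("password=hunter2")
print(redact_args(plain, mask), plain.args)
print(redact_args(ReadOnlyArgsError(), mask))
```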

[airflow] 06/31: Fix triggerer --capacity parameter (#21753)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit eb3c4b60a5e7f5f18d85a6c45ae5360a6a8b07a4
Author: Sumit Maheshwari <ms...@users.noreply.github.com>
AuthorDate: Wed Feb 23 15:50:13 2022 +0530

    Fix triggerer --capacity parameter (#21753)
    
    (cherry picked from commit 9076b67c05cdba23e8fa51ebe5ad7f7d53e1c2ba)
---
 airflow/cli/cli_parser.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 64cc582..e9939be 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -758,7 +758,7 @@ ARG_INCLUDE_DAGS = Arg(
 # triggerer
 ARG_CAPACITY = Arg(
     ("--capacity",),
-    type=str,
+    type=positive_int(allow_zero=False),
     help="The maximum number of triggers that a Triggerer will run at one time.",
 )
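
With type=str the parsed --capacity value stayed a string, so the change swaps in a validating integer type. The sketch below shows how such an argparse type callable can work. positive_int_sketch is a hypothetical helper written for this example and is not claimed to match Airflow's positive_int line for line.

```python
import argparse


def positive_int_sketch(allow_zero):
    """Return an argparse type callable that accepts only positive integers."""

    def _check(value):
        try:
            number = int(value)
        except ValueError:
            number = None
        if number is not None and (number > 0 or (allow_zero and number == 0)):
            return number
        # argparse turns this into a usage error for the offending argument.
        raise argparse.ArgumentTypeError(f"invalid positive int value: {value!r}")

    return _check


parser = argparse.ArgumentParser(prog="triggerer-sketch")
parser.add_argument("--capacity", type=positive_int_sketch(allow_zero=False))

args = parser.parse_args(["--capacity=42"])
print(type(args.capacity).__name__, args.capacity)
```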
 

[airflow] 13/31: Disable default_pool delete on web ui (#21658)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5cb0b401c0f4a57befcba3bbfed7117179e05d0c
Author: Chenglong Yan <al...@gmail.com>
AuthorDate: Wed Mar 16 16:31:12 2022 +0800

    Disable default_pool delete on web ui (#21658)
    
    (cherry picked from commit df6058c862a910a99fbb86858502d9d93fdbe1e5)
---
 airflow/api/common/experimental/pool.py |  2 +-
 airflow/models/pool.py                  | 19 ++++++++++++++++++-
 airflow/www/views.py                    | 13 ++++++++++++-
 tests/models/test_pool.py               |  6 ++++++
 4 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index fe4f161..b1ca9f0 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -83,7 +83,7 @@ def delete_pool(name, session=None):
         raise AirflowBadRequest("Pool name shouldn't be empty")
 
     if name == Pool.DEFAULT_POOL_NAME:
-        raise AirflowBadRequest("default_pool cannot be deleted")
+        raise AirflowBadRequest(f"{Pool.DEFAULT_POOL_NAME} cannot be deleted")
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 8ae88aa..7d092f7 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -86,6 +86,23 @@ class Pool(Base):
 
     @staticmethod
     @provide_session
+    def is_default_pool(id: int, session: Session = NEW_SESSION) -> bool:
+        """
+        Check if id is the default_pool.
+
+        :param id: pool id
+        :param session: SQLAlchemy ORM Session
+        :return: True if id is default_pool, otherwise False
+        """
+        return (
+            session.query(func.count(Pool.id))
+            .filter(Pool.id == id, Pool.pool == Pool.DEFAULT_POOL_NAME)
+            .scalar()
+            > 0
+        )
+
+    @staticmethod
+    @provide_session
     def create_or_update_pool(name: str, slots: int, description: str, session: Session = NEW_SESSION):
         """Create a pool with given parameters or update it if it already exists."""
         if not name:
@@ -107,7 +124,7 @@ class Pool(Base):
     def delete_pool(name: str, session: Session = NEW_SESSION):
         """Delete pool by a given name."""
         if name == Pool.DEFAULT_POOL_NAME:
-            raise AirflowException("default_pool cannot be deleted")
+            raise AirflowException(f"{Pool.DEFAULT_POOL_NAME} cannot be deleted")
 
         pool = session.query(Pool).filter_by(pool=name).first()
         if pool is None:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9fc61b5..d6bc40b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3727,13 +3727,24 @@ class PoolModelView(AirflowModelView):
     def action_muldelete(self, items):
         """Multiple delete."""
         if any(item.pool == models.Pool.DEFAULT_POOL_NAME for item in items):
-            flash("default_pool cannot be deleted", 'error')
+            flash(f"{models.Pool.DEFAULT_POOL_NAME} cannot be deleted", 'error')
             self.update_redirect()
             return redirect(self.get_redirect())
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @expose("/delete/<pk>", methods=["GET", "POST"])
+    @has_access
+    def delete(self, pk):
+        """Single delete."""
+        if models.Pool.is_default_pool(pk):
+            flash(f"{models.Pool.DEFAULT_POOL_NAME} cannot be deleted", 'error')
+            self.update_redirect()
+            return redirect(self.get_redirect())
+
+        return super().delete(pk)
+
     def pool_link(self):
         """Pool link rendering."""
         pool_id = self.get('pool')
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 95e585e..1c5bbe1 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -220,3 +220,9 @@ class TestPool:
     def test_delete_default_pool_not_allowed(self):
         with pytest.raises(AirflowException, match="^default_pool cannot be deleted$"):
             Pool.delete_pool(Pool.DEFAULT_POOL_NAME)
+
+    def test_is_default_pool(self):
+        pool = Pool.create_or_update_pool(name="not_default_pool", slots=1, description="test")
+        default_pool = Pool.get_default_pool()
+        assert not Pool.is_default_pool(id=pool.id)
+        assert Pool.is_default_pool(str(default_pool.id))

[airflow] 16/31: Fix Resources __eq__ check (#21442)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a99d290311f9e22762e750c2513e28863c41dad3
Author: Ping Zhang <pi...@umich.edu>
AuthorDate: Thu Feb 10 04:40:51 2022 -0800

    Fix Resources __eq__ check (#21442)
    
    (cherry picked from commit 6b308446eae2f83bf379f976c7d7801aa53370a3)
---
 airflow/utils/operator_resources.py    |  4 ++++
 tests/utils/test_operator_resources.py | 35 ++++++++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py
index 8781021..010bf33 100644
--- a/airflow/utils/operator_resources.py
+++ b/airflow/utils/operator_resources.py
@@ -53,6 +53,8 @@ class Resource:
         self._qty = qty
 
     def __eq__(self, other):
+        if not isinstance(other, self.__class__):
+            return NotImplemented
         return self.__dict__ == other.__dict__
 
     def __repr__(self):
@@ -133,6 +135,8 @@ class Resources:
         self.gpus = GpuResource(gpus)
 
     def __eq__(self, other):
+        if not isinstance(other, self.__class__):
+            return NotImplemented
         return self.__dict__ == other.__dict__
 
     def __repr__(self):
diff --git a/tests/utils/test_operator_resources.py b/tests/utils/test_operator_resources.py
new file mode 100644
index 0000000..fb15580
--- /dev/null
+++ b/tests/utils/test_operator_resources.py
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.utils.operator_resources import Resources
+
+
+class TestResources(unittest.TestCase):
+    def test_resource_eq(self):
+        r = Resources(cpus=0.1, ram=2048)
+        assert r not in [{}, [], None]
+        assert r == r
+
+        r2 = Resources(cpus=0.1, ram=2048)
+        assert r == r2
+        assert r2 == r
+
+        r3 = Resources(cpus=0.2, ram=2048)
+        assert r != r3
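
Returning NotImplemented for foreign types lets Python fall back to its default comparison instead of failing on other.__dict__ (None, lists and dicts have no __dict__). A minimal sketch of the pattern with a hypothetical Quantity class standing in for Resource:

```python
class Quantity:
    """Hypothetical stand-in for a Resource-like value object."""

    def __init__(self, qty):
        self.qty = qty

    def __eq__(self, other):
        # Decline to compare with unrelated types; Python then tries the
        # reflected operation and finally falls back to identity.
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.__dict__ == other.__dict__


q = Quantity(2048)
print(q == Quantity(2048))
print(q == None)  # noqa: E711 - deliberately exercising __eq__ with None
print(q in [{}, [], None])
```

Without the isinstance guard, the second and third comparisons would raise AttributeError rather than evaluate to False.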

[airflow] 05/31: Fix the triggerer capacity test (#21760)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3a2fd04c12d0d8bf27b5edd893e62c6c36582d6b
Author: Mark Norman Francis <no...@201created.com>
AuthorDate: Wed Feb 23 12:38:27 2022 +0000

    Fix the triggerer capacity test (#21760)
    
    Commit 9076b67 changed the triggerer logic to use int not string.
    
    (cherry picked from commit e1fe30c70d0fe9c033db9daf9d4420f7fa815b2d)
---
 tests/cli/commands/test_triggerer_command.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/cli/commands/test_triggerer_command.py b/tests/cli/commands/test_triggerer_command.py
index 6b20e6f..26a46bd 100644
--- a/tests/cli/commands/test_triggerer_command.py
+++ b/tests/cli/commands/test_triggerer_command.py
@@ -43,4 +43,4 @@ class TestTriggererCommand(unittest.TestCase):
         """Ensure that the capacity argument is passed correctly"""
         args = self.parser.parse_args(['triggerer', '--capacity=42'])
         triggerer_command.triggerer(args)
-        mock_scheduler_job.assert_called_once_with(capacity="42")
+        mock_scheduler_job.assert_called_once_with(capacity=42)
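
Why the expected value changes from `"42"` to `42` can be illustrated with plain `argparse`. The sketch below assumes the flag is declared with an integer type; the parser shown is hypothetical and is not Airflow's actual CLI definition.

```python
import argparse

# Hypothetical parser: the flag name matches the test above, but this
# declaration is an assumption, not Airflow's real argument definition.
parser = argparse.ArgumentParser(prog="triggerer-sketch")
parser.add_argument("--capacity", type=int)

# argparse applies `type=int` to the raw string before storing it.
args = parser.parse_args(["--capacity=42"])
```

Under that assumption `args.capacity` is the integer `42`, so an assertion against the string `"42"` no longer matches.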

[airflow] 25/31: Fix postgres part of pipeline example of tutorial (#21586)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fe8f560a622e12d8c01fe4cc4182a4a596c60311
Author: KevinYanesG <75...@users.noreply.github.com>
AuthorDate: Tue Feb 15 19:26:30 2022 +0100

    Fix postgres part of pipeline example of tutorial (#21586)
    
    (cherry picked from commit 40028f3ea3e78a9cf0db9de6b16fa67fa730dd7a)
---
 docs/apache-airflow/tutorial.rst | 67 ++++++++++++++++++++++++----------------
 1 file changed, 41 insertions(+), 26 deletions(-)

diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 085be42..7a2245f 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -381,11 +381,30 @@ We need to have docker and postgres installed.
 We will be using this `docker file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_
 Follow the instructions properly to set up Airflow.
 
-Create a Employee table in postgres using this:
+You can use the postgres_default connection:
+
+- Conn id: postgres_default
+- Conn Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+
+
+After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection. For
+
+
+Open up a postgres shell:
+
+.. code-block:: bash
+
+  ./airflow.sh airflow db shell
+
+Create the Employees table with:
 
 .. code-block:: sql
 
-  CREATE TABLE "Employees"
+  CREATE TABLE EMPLOYEES
   (
       "Serial Number" NUMERIC PRIMARY KEY,
       "Company Name" TEXT,
@@ -394,7 +413,11 @@ Create a Employee table in postgres using this:
       "Leave" INTEGER
   );
 
-  CREATE TABLE "Employees_temp"
+Afterwards, create the Employees_temp table:
+
+.. code-block:: sql
+
+  CREATE TABLE EMPLOYEES_TEMP
   (
       "Serial Number" NUMERIC PRIMARY KEY,
       "Company Name" TEXT,
@@ -403,17 +426,9 @@ Create a Employee table in postgres using this:
       "Leave" INTEGER
   );
 
-We also need to add a connection to postgres. Go to the UI and click "Admin" >> "Connections". Specify the following for each field:
+We are now ready to write the DAG.
 
-- Conn id: LOCAL
-- Conn Type: postgres
-- Host: postgres
-- Schema: <DATABASE_NAME>
-- Login: airflow
-- Password: airflow
-- Port: 5432
 
-After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection and we are now ready write the DAG.
 
 Let's break this down into 2 steps: get data & merge data:
 
@@ -436,12 +451,12 @@ Let's break this down into 2 steps: get data & merge data:
       with open(data_path, "w") as file:
           file.write(response.text)
 
-      postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+      postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
       conn = postgres_hook.get_conn()
       cur = conn.cursor()
       with open(data_path, "r") as file:
           cur.copy_expert(
-              "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+              "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
               file,
           )
       conn.commit()
@@ -457,16 +472,16 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i
   @task
   def merge_data():
       query = """
-          DELETE FROM "Employees" e
-          USING "Employees_temp" et
+          DELETE FROM EMPLOYEES e
+          USING EMPLOYEES_TEMP et
           WHERE e."Serial Number" = et."Serial Number";
 
-          INSERT INTO "Employees"
+          INSERT INTO EMPLOYEES
           SELECT *
-          FROM "Employees_temp";
+          FROM EMPLOYEES_TEMP;
       """
       try:
-          postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+          postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
           conn = postgres_hook.get_conn()
           cur = conn.cursor()
           cur.execute(query)
@@ -509,12 +524,12 @@ Lets look at our DAG:
           with open(data_path, "w") as file:
               file.write(response.text)
 
-          postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+          postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
           conn = postgres_hook.get_conn()
           cur = conn.cursor()
           with open(data_path, "r") as file:
               cur.copy_expert(
-                  "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+                  "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                   file,
               )
           conn.commit()
@@ -522,16 +537,16 @@ Lets look at our DAG:
       @task
       def merge_data():
           query = """
-                  DELETE FROM "Employees" e
-                  USING "Employees_temp" et
+                  DELETE FROM EMPLOYEES e
+                  USING EMPLOYEES_TEMP et
                   WHERE e."Serial Number" = et."Serial Number";
 
-                  INSERT INTO "Employees"
+                  INSERT INTO EMPLOYEES
                   SELECT *
-                  FROM "Employees_temp";
+                  FROM EMPLOYEES_TEMP;
                   """
           try:
-              postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+              postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
               conn = postgres_hook.get_conn()
               cur = conn.cursor()
               cur.execute(query)

[airflow] 19/31: A trigger might use a connection; make sure we mask passwords (#21207)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 84557a592fcb3f11e1758b1879cddcc5721db2ed
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Sat Jan 29 16:10:23 2022 +0000

    A trigger might use a connection; make sure we mask passwords (#21207)
    
    (cherry picked from commit 3d0c1aea5a85a4d31d3ade530e4c5b85b045503a)
---
 airflow/cli/commands/triggerer_command.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py
index 40b1b0e..110ff4f 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -30,6 +30,7 @@ from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, si
 @cli_utils.action_logging
 def triggerer(args):
     """Starts Airflow Triggerer"""
+    settings.MASK_SECRETS_IN_LOGS = True
     print(settings.HEADER)
     job = TriggererJob(capacity=args.capacity)
 

[airflow] 22/31: Set X-Frame-Options header to DENY only if X_FRAME_ENABLED is set to true. (#19491)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 58f09cdadbc2ff6e0d46a2c1aa0d71740f343097
Author: Kanthi <su...@gmail.com>
AuthorDate: Sat Jan 22 18:09:51 2022 -0500

    Set X-Frame-Options header to DENY only if X_FRAME_ENABLED is set to true. (#19491)
    
    (cherry picked from commit 084079f446570ba43114857ea1a54df896201419)
---
 airflow/www/extensions/init_security.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/www/extensions/init_security.py b/airflow/www/extensions/init_security.py
index 544deeb..2481a96 100644
--- a/airflow/www/extensions/init_security.py
+++ b/airflow/www/extensions/init_security.py
@@ -35,7 +35,8 @@ def init_xframe_protection(app):
         return
 
     def apply_caching(response):
-        response.headers["X-Frame-Options"] = "DENY"
+        if not x_frame_enabled:
+            response.headers["X-Frame-Options"] = "DENY"
         return response
 
     app.after_request(apply_caching)

[airflow] 20/31: fix: Update custom connection field processing (#20883)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 235e8028ed4735c0a17d6af8eaea493cf232b3e9
Author: Mike McDonald <30...@users.noreply.github.com>
AuthorDate: Sun Jan 23 16:33:06 2022 -0800

    fix: Update custom connection field processing (#20883)
    
    * fix: Update custom connection field processing
    
    Fixes an issue where custom connection fields are not updated because the `extra` field is in the form and holds previous values, overriding the custom field values.
    Adds portion of connection form tests to test functionality.
    
    (cherry picked from commit 44df1420582b358594c8d7344865811cff02956c)
---
 airflow/www/views.py                     | 28 +++++++++++++++++-----------
 tests/www/views/test_views_connection.py | 15 +++++++++++++++
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index d6bc40b..750cbd9 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3518,19 +3518,17 @@ class ConnectionModelView(AirflowModelView):
         """Process form data."""
         conn_type = form.data['conn_type']
         conn_id = form.data["conn_id"]
-        extra = {
-            key: form.data[key]
-            for key in self.extra_fields
-            if key in form.data and key.startswith(f"extra__{conn_type}__")
-        }
 
-        # If parameters are added to the classic `Extra` field, include these values along with
-        # custom-field extras.
-        extra_conn_params = form.data.get("extra")
+        # The extra value is the combination of custom fields for this conn_type and the Extra field.
+        # The extra form field with all extra values (including custom fields) is in the form being processed
+        # so we start with those values, and override them with anything in the custom fields.
+        extra = {}
+
+        extra_field = form.data.get("extra")
 
-        if extra_conn_params:
+        if extra_field:
             try:
-                extra.update(json.loads(extra_conn_params))
+                extra.update(json.loads(extra_field))
             except (JSONDecodeError, TypeError):
                 flash(
                     Markup(
@@ -3539,11 +3537,19 @@ class ConnectionModelView(AirflowModelView):
                         "<p>If connection parameters need to be added to <em>Extra</em>, "
                         "please make sure they are in the form of a single, valid JSON object.</p><br>"
                         "The following <em>Extra</em> parameters were <b>not</b> added to the connection:<br>"
-                        f"{extra_conn_params}",
+                        f"{extra_field}",
                     ),
                     category="error",
                 )
 
+        custom_fields = {
+            key: form.data[key]
+            for key in self.extra_fields
+            if key in form.data and key.startswith(f"extra__{conn_type}__")
+        }
+
+        extra.update(custom_fields)
+
         if extra.keys():
             form.extra.data = json.dumps(extra)
 
diff --git a/tests/www/views/test_views_connection.py b/tests/www/views/test_views_connection.py
index 249bf2a..d64d261 100644
--- a/tests/www/views/test_views_connection.py
+++ b/tests/www/views/test_views_connection.py
@@ -107,6 +107,21 @@ def test_process_form_extras():
 
     assert json.loads(mock_form.extra.data) == {"extra__test3__custom_field": "custom_field_val3"}
 
+    # Testing parameters set in both extra and custom fields (connection updates).
+    mock_form = mock.Mock()
+    mock_form.data = {
+        "conn_type": "test4",
+        "conn_id": "extras_test4",
+        "extra": '{"extra__test4__custom_field": "custom_field_val3"}',
+        "extra__test4__custom_field": "custom_field_val4",
+    }
+
+    cmv = ConnectionModelView()
+    cmv.extra_fields = ["extra__test4__custom_field"]  # Custom field
+    cmv.process_form(form=mock_form, is_created=True)
+
+    assert json.loads(mock_form.extra.data) == {"extra__test4__custom_field": "custom_field_val4"}
+
 
 def test_duplicate_connection(admin_client):
     """Test Duplicate multiple connection with suffix"""
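
The ordering that the `views.py` change establishes (start from the Extra field's JSON, then let the custom fields override it) can be sketched in isolation. The helper name `merge_extras` and the sample keys are hypothetical, and the invalid-JSON handling from the real view is left out for brevity.

```python
import json


def merge_extras(extra_json, custom_fields):
    """Start from the Extra field's JSON, then let custom fields win."""
    extra = {}
    if extra_json:
        extra.update(json.loads(extra_json))
    # Applied last, so a stale value carried in the Extra JSON cannot
    # overwrite what was just entered in a custom field.
    extra.update(custom_fields)
    return extra


merged = merge_extras(
    '{"extra__test4__custom_field": "old", "timeout": 30}',
    {"extra__test4__custom_field": "new"},
)
```

Here the custom field's `"new"` value replaces `"old"`, while the unrelated `timeout` key from the Extra JSON is preserved.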

[airflow] 31/31: Add 2.2.5 to CHANGELOG.txt and UPDATING.md

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b50aae1c4d645047fd545fe9a511ab64a22eac53
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Mar 22 20:45:06 2022 +0100

    Add 2.2.5 to CHANGELOG.txt and UPDATING.md
---
 CHANGELOG.txt | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index cb50c54..636077b 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -1,3 +1,39 @@
+Airflow 2.2.5, 2022-03-25
+-------------------------
+
+bug-fix
+"""""""
+- Fix Tasks getting stuck in scheduled state (#19747)
+- Reduce DB load incurred by Stale DAG deactivation (#21399)
+- Fix race condition between triggerer and scheduler (#21316)
+- Log traceback in trigger excs (#21213)
+- Fix duplicate trigger creation race condition (#20699)
+- Set X-Frame-Options header to DENY only if X_FRAME_ENABLED is set to true. (#19491)
+- Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)
+- fix: Update custom connection field processing (#20883)
+- A trigger might use a connection; make sure we mask passwords (#21207)
+- Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
+- Fix Resources __eq__ check (#21442)
+- Filter out default configs when overrides exist. (#21539)
+- Fix logging JDBC SQL error when task fails (#21540)
+- Disable default_pool delete on web ui (#21658)
+- Log exception in local executor (#21667)
+- Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)
+- Fix stray order_by(TaskInstance.execution_date) (#21705)
+- Fix filesystem sensor for directories (#21729)
+- Fix graph autorefresh on page load (#21736)
+- Fix triggerer --capacity parameter (#21753)
+- Fix assignment of unassigned triggers (#21770)
+- Fix incorrect data provided to tries & landing times charts (#21928)
+
+doc-only
+""""""""
+- adding `on_execute_callback` to callbacks docs (#22362)
+- Add documentation on specifying a DB schema. (#22347)
+- Fix postgres part of pipeline example of tutorial (#21586)
+- Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)
+- DB upgrade is required when updating Airflow (#22061)
+
 Airflow 2.2.4, 2022-02-22
 -------------------------
 

[airflow] 11/31: Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c47323d9c4ac5cf4137b3266be3ac558aa63fca9
Author: Davy <ka...@gmail.com>
AuthorDate: Thu Feb 24 15:40:57 2022 +0800

    Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)
    
    (cherry picked from commit 919b75ba20083cc83c4e84e35aae8102af2b5871)
---
 airflow/secrets/local_filesystem.py    |  7 +++----
 tests/secrets/test_local_filesystem.py | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/airflow/secrets/local_filesystem.py b/airflow/secrets/local_filesystem.py
index d23969f..227a77c 100644
--- a/airflow/secrets/local_filesystem.py
+++ b/airflow/secrets/local_filesystem.py
@@ -75,8 +75,8 @@ def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSynt
             # Ignore comments
             continue
 
-        var_parts: List[str] = line.split("=", 2)
-        if len(var_parts) != 2:
+        key, sep, value = line.partition("=")
+        if not sep:
             errors.append(
                 FileSyntaxError(
                     line_no=line_no,
@@ -85,8 +85,7 @@ def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSynt
             )
             continue
 
-        key, value = var_parts
-        if not key:
+        if not value:
             errors.append(
                 FileSyntaxError(
                     line_no=line_no,
diff --git a/tests/secrets/test_local_filesystem.py b/tests/secrets/test_local_filesystem.py
index 85f0aaa..5993eb3 100644
--- a/tests/secrets/test_local_filesystem.py
+++ b/tests/secrets/test_local_filesystem.py
@@ -155,6 +155,23 @@ class TestLoadConnection(unittest.TestCase):
 
     @parameterized.expand(
         (
+            (
+                "CONN_ID=mysql://host_1?param1=val1&param2=val2",
+                {"CONN_ID": "mysql://host_1?param1=val1&param2=val2"},
+            ),
+        )
+    )
+    def test_parsing_with_params(self, content, expected_connection_uris):
+        with mock_local_file(content):
+            connections_by_conn_id = local_filesystem.load_connections_dict("a.env")
+            connection_uris_by_conn_id = {
+                conn_id: connection.get_uri() for conn_id, connection in connections_by_conn_id.items()
+            }
+
+            assert expected_connection_uris == connection_uris_by_conn_id
+
+    @parameterized.expand(
+        (
             ("AA", 'Invalid line format. The line should contain at least one equal sign ("=")'),
             ("=", "Invalid line format. Key is empty."),
         )
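
The difference between the two parsing approaches in this diff can be seen with standard string methods alone. The sample line is taken from the new test case; the variable names are only for illustration.

```python
line = "CONN_ID=mysql://host_1?param1=val1&param2=val2"

# maxsplit=2 permits up to three parts, so a value containing "=" is
# split again and the old `len(var_parts) != 2` check rejects the line.
parts = line.split("=", 2)

# partition() cuts at the first "=" only and also returns the separator,
# which is an empty string when the line contains no "=" at all.
key, sep, value = line.partition("=")
no_separator = "AA".partition("=")
```

`parts` has three elements here, while `partition` keeps the whole URI, query string included, as the value.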

[airflow] 28/31: adding `on_execute_callback` to callbacks docs (#22362)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5d53fcbc32deff5fa0b559309b333e2fbab05ce4
Author: aspain <ad...@gmail.com>
AuthorDate: Mon Mar 21 16:38:25 2022 -0400

    adding `on_execute_callback` to callbacks docs (#22362)
    
    * adding on_execute_callback
    
    `on_execute_callback` is not listed but is an available callback to use. Could not find a docs page to link the way that the rest of them have.
    
    * Update docs/apache-airflow/logging-monitoring/callbacks.rst
    
    Co-authored-by: Josh Fell <48...@users.noreply.github.com>
    
    * Update docs/apache-airflow/logging-monitoring/callbacks.rst
    
    Co-authored-by: Josh Fell <48...@users.noreply.github.com>
    Co-authored-by: eladkal <45...@users.noreply.github.com>
    (cherry picked from commit 179c5b67e9c838f2f6df2b2b9a4451bfa09db42d)
---
 docs/apache-airflow/logging-monitoring/callbacks.rst | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docs/apache-airflow/logging-monitoring/callbacks.rst b/docs/apache-airflow/logging-monitoring/callbacks.rst
index 15bbacb..4b1b754 100644
--- a/docs/apache-airflow/logging-monitoring/callbacks.rst
+++ b/docs/apache-airflow/logging-monitoring/callbacks.rst
@@ -32,7 +32,7 @@ For example, you may wish to alert when certain tasks have failed, or have the l
 Callback Types
 --------------
 
-There are four types of task events that can trigger a callback:
+There are five types of task events that can trigger a callback:
 
 =========================================== ================================================================
 Name                                        Description
@@ -41,6 +41,7 @@ Name                                        Description
 ``on_failure_callback``                     Invoked when the task :ref:`fails <concepts:task-instances>`
 ``sla_miss_callback``                       Invoked when a task misses its defined :ref:`SLA <concepts:slas>`
 ``on_retry_callback``                       Invoked when the task is :ref:`up for retry <concepts:task-instances>`
+``on_execute_callback``                     Invoked right before the task begins executing.
 =========================================== ================================================================
 
 

[airflow] 23/31: Fix duplicate trigger creation race condition (#20699)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7a4bbdb7134a0095ebb9cd5ec8405a1dbf4bab8f
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Thu Jan 6 15:16:02 2022 -0800

    Fix duplicate trigger creation race condition (#20699)
    
    The process for queueing up a trigger, for execution by the TriggerRunner, is handled by the TriggerJob's `load_triggers` method. It fetches the triggers that should be running according to the database, checks if they are running and if not it adds them to `TriggerRunner.to_create`. The problem is that there's a small window of time between the moment a trigger (upon termination) is purged from the `TriggerRunner.triggers` set, and the time that the database is updated to reflect t [...]
    
    To resolve this what we do here is, before adding a trigger to the `to_create` queue, instead of comparing against the "running" triggers, we compare against all triggers known to the TriggerRunner instance. When triggers move out of the `triggers` set they move into other data structures such as `events` and `failed_triggers` and `to_cancel`. So we union all of these and only create those triggers which the database indicates should exist _and_ which are not already being handled  [...]
    
    (cherry picked from commit 16b8c476518ed76e3689966ec4b0b788be935410)
---
 airflow/jobs/triggerer_job.py    |  12 +++-
 tests/jobs/test_triggerer_job.py | 136 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 142 insertions(+), 6 deletions(-)

diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index 25a4c79..dff0e0f 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -381,10 +381,16 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         # line's execution, but we consider that safe, since there's a strict
         # add -> remove -> never again lifecycle this function is already
         # handling.
-        current_trigger_ids = set(self.triggers.keys())
+        running_trigger_ids = set(self.triggers.keys())
+        known_trigger_ids = (
+            running_trigger_ids.union(x[0] for x in self.events)
+            .union(self.to_cancel)
+            .union(x[0] for x in self.to_create)
+            .union(self.failed_triggers)
+        )
         # Work out the two difference sets
-        new_trigger_ids = requested_trigger_ids.difference(current_trigger_ids)
-        cancel_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
+        new_trigger_ids = requested_trigger_ids - known_trigger_ids
+        cancel_trigger_ids = running_trigger_ids - requested_trigger_ids
         # Bulk-fetch new trigger records
         new_triggers = Trigger.bulk_fetch(new_trigger_ids)
         # Add in new triggers
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 5adc91f..870116a 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -16,29 +16,53 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import asyncio
 import datetime
 import sys
 import time
+from threading import Thread
 
 import pytest
 
-from airflow.jobs.triggerer_job import TriggererJob
-from airflow.models import Trigger
+from airflow.jobs.triggerer_job import TriggererJob, TriggerRunner
+from airflow.models import DagModel, DagRun, TaskInstance, Trigger
 from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
 from airflow.triggers.base import TriggerEvent
 from airflow.triggers.temporal import TimeDeltaTrigger
 from airflow.triggers.testing import FailureTrigger, SuccessTrigger
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State, TaskInstanceState
-from tests.test_utils.db import clear_db_runs
+from tests.test_utils.db import clear_db_dags, clear_db_runs
+
+
+class TimeDeltaTrigger_(TimeDeltaTrigger):
+    def __init__(self, delta, filename):
+        super().__init__(delta=delta)
+        self.filename = filename
+        self.delta = delta
+
+    async def run(self):
+        with open(self.filename, 'at') as f:
+            f.write('hi\n')
+        async for event in super().run():
+            yield event
+
+    def serialize(self):
+        return (
+            "tests.jobs.test_triggerer_job.TimeDeltaTrigger_",
+            {"delta": self.delta, "filename": self.filename},
+        )
 
 
 @pytest.fixture(autouse=True)
 def clean_database():
     """Fixture that cleans the database before and after every test."""
     clear_db_runs()
+    clear_db_dags()
     yield  # Test runs here
+    clear_db_dags()
     clear_db_runs()
 
 
@@ -160,6 +184,112 @@ def test_trigger_lifecycle(session):
 
 
 @pytest.mark.skipif(sys.version_info.minor <= 6 and sys.version_info.major <= 3, reason="No triggerer on 3.6")
+def test_trigger_create_race_condition_18392(session, tmp_path):
+    """
+    This verifies the resolution of race condition documented in github issue #18392.
+    Triggers are queued for creation by TriggerJob.load_triggers.
+    There was a race condition where multiple triggers would be created unnecessarily.
+    What happens is the runner completes the trigger and purges from the "running" list.
+    Then job.load_triggers is called and it looks like the trigger is not running but should,
+    so it queues it again.
+
+    The scenario is as follows:
+        1. job.load_triggers (trigger now queued)
+        2. runner.create_triggers (trigger now running)
+        3. job.handle_events (trigger still appears running so state not updated in DB)
+        4. runner.cleanup_finished_triggers (trigger completed at this point; trigger removed from "running" set)
+        5. job.load_triggers (trigger not running, but also not purged from DB, so it is queued again)
+        6. runner.create_triggers (trigger created again)
+
+    This test verifies that under this scenario only one trigger is created.
+    """
+    path = tmp_path / 'test_trigger_bad_respawn.txt'
+
+    class TriggerRunner_(TriggerRunner):
+        """We do some waiting for main thread looping"""
+
+        async def wait_for_job_method_count(self, method, count):
+            for _ in range(30):
+                await asyncio.sleep(0.1)
+                if getattr(self, f'{method}_count', 0) >= count:
+                    break
+            else:
+                pytest.fail(f"did not observe count {count} in job method {method}")
+
+        async def create_triggers(self):
+            """
+            On first run, wait for job.load_triggers to make sure they are queued
+            """
+            if getattr(self, 'loop_count', 0) == 0:
+                await self.wait_for_job_method_count('load_triggers', 1)
+            await super().create_triggers()
+            self.loop_count = getattr(self, 'loop_count', 0) + 1
+
+        async def cleanup_finished_triggers(self):
+            """On loop 1, make sure that job.handle_events was already called"""
+            if self.loop_count == 1:
+                await self.wait_for_job_method_count('handle_events', 1)
+            await super().cleanup_finished_triggers()
+
+    class TriggererJob_(TriggererJob):
+        """We do some waiting for runner thread looping (and track calls in job thread)"""
+
+        def wait_for_runner_loop(self, runner_loop_count):
+            for _ in range(30):
+                time.sleep(0.1)
+                if getattr(self.runner, 'call_count', 0) >= runner_loop_count:
+                    break
+            else:
+                pytest.fail("did not observe 2 loops in the runner thread")
+
+        def load_triggers(self):
+            """On second run, make sure that runner has called create_triggers in its second loop"""
+            super().load_triggers()
+            self.runner.load_triggers_count = getattr(self.runner, 'load_triggers_count', 0) + 1
+            if self.runner.load_triggers_count == 2:
+                self.wait_for_runner_loop(runner_loop_count=2)
+
+        def handle_events(self):
+            super().handle_events()
+            self.runner.handle_events_count = getattr(self.runner, 'handle_events_count', 0) + 1
+
+    trigger = TimeDeltaTrigger_(delta=datetime.timedelta(microseconds=1), filename=path.as_posix())
+    trigger_orm = Trigger.from_object(trigger)
+    trigger_orm.id = 1
+    session.add(trigger_orm)
+
+    dag = DagModel(dag_id='test-dag')
+    dag_run = DagRun(dag.dag_id, run_id='abc', run_type='none')
+    ti = TaskInstance(PythonOperator(task_id='dummy-task', python_callable=print), run_id=dag_run.run_id)
+    ti.dag_id = dag.dag_id
+    ti.trigger_id = 1
+    session.add(dag)
+    session.add(dag_run)
+    session.add(ti)
+
+    session.commit()
+
+    job = TriggererJob_()
+    job.runner = TriggerRunner_()
+    thread = Thread(target=job._execute)
+    thread.start()
+    try:
+        for _ in range(40):
+            time.sleep(0.1)
+            # ready to evaluate after 2 loops
+            if getattr(job.runner, 'loop_count', 0) >= 2:
+                break
+        else:
+            pytest.fail("did not observe 2 loops in the runner thread")
+    finally:
+        job.runner.stop = True
+        job.runner.join()
+        thread.join()
+    instances = path.read_text().splitlines()
+    assert len(instances) == 1
+
+
+@pytest.mark.skipif(sys.version_info.minor <= 6 and sys.version_info.major <= 3, reason="No triggerer on 3.6")
 def test_trigger_from_dead_triggerer(session):
     """
     Checks that the triggerer will correctly claim a Trigger that is assigned to a
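
The set arithmetic introduced in `triggerer_job.py` can be followed with a small stand-alone sketch. All values below are made up: integers stand in for trigger IDs, and the lists mimic the shape of the runner's queues only as far as the diff shows them (`events` and `to_create` hold tuples whose first element is the ID).

```python
# Hypothetical snapshot of a runner's state.
requested_trigger_ids = {1, 2, 3}      # what the database says should run
running_trigger_ids = {2, 4}           # currently in the runner's `triggers`
events = [(1, "event-payload")]        # trigger 1 fired; event not yet handled
to_create = [(3, "trigger-instance")]  # trigger 3 is queued but not started
to_cancel = []
failed_triggers = []

# Everything the runner already knows about, running or not.
known_trigger_ids = (
    running_trigger_ids
    .union(x[0] for x in events)
    .union(to_cancel)
    .union(x[0] for x in to_create)
    .union(failed_triggers)
)

new_before_fix = requested_trigger_ids - running_trigger_ids
new_after_fix = requested_trigger_ids - known_trigger_ids
cancel_trigger_ids = running_trigger_ids - requested_trigger_ids
```

Comparing only against the running set would queue triggers 1 and 3 a second time; comparing against the known set queues nothing. Cancellation still uses the running set, so trigger 4 is cancelled in both cases.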

[airflow] 15/31: Filter out default configs when overrides exist. (#21539)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f092ed5a3e7e9a1dcbafbb75a11e9614dd0302c6
Author: Xiao Yu <me...@xyu.io>
AuthorDate: Tue Mar 15 18:06:50 2022 +0000

    Filter out default configs when overrides exist. (#21539)
    
    * Filter out default configs when overrides exist.
    
    When sending configs to Airflow workers we materialize a temp config file. In #18772 a feature was added so that `_cmd` generated secrets are not written to the files in some cases instead favoring maintaining the raw `_cmd` settings. Unfortunately during materializing of the configs via `as_dict()` Airflow defaults are generated and materialized as well including defaults for the non `_cmd` versions of some settings. And due to Airflow setting precedence stating bare versions of sett [...]
    https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html
    
    This change checks `_cmd`, env, and secrets when materializing configs via `as_dict()` so that if the bare versions of the values are exactly the same as Airflow defaults and we have "hidden" / special versions of these configs that are trying to be set, we remove the bare versions so that the correct version can be used.
    
    Fixes: #20092
    Related to: #18772 #4050
    
    (cherry picked from commit e07bc63ec0e5b679c87de8e8d4cdff1cf4671146)
---
 airflow/configuration.py                 | 58 ++++++++++++++++++++++++++++++++
 docs/apache-airflow/howto/set-config.rst |  4 +++
 tests/core/test_configuration.py         | 46 +++++++++++++++++++++++++
 3 files changed, 108 insertions(+)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index e120b26..88587f3 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -581,6 +581,15 @@ class AirflowConfigParser(ConfigParser):
         """
         Returns the current configuration as an OrderedDict of OrderedDicts.
 
+        When materializing current configuration Airflow defaults are
+        materialized along with user set configs. If any of the `include_*`
+        options are False then the result of calling command or secret key
+        configs do not override Airflow defaults and instead are passed through.
+        In order to then avoid Airflow defaults from overwriting user set
+        command or secret key configs we filter out bare sensitive_config_values
+        that are set to Airflow defaults when command or secret key configs
+        produce different values.
+
         :param display_source: If False, the option value is returned. If True,
             a tuple of (option_value, source) is returned. Source is either
             'airflow.cfg', 'default', 'env var', or 'cmd'.
@@ -618,14 +627,21 @@ class AirflowConfigParser(ConfigParser):
         # add env vars and overwrite because they have priority
         if include_env:
             self._include_envs(config_sources, display_sensitive, display_source, raw)
+        else:
+            self._filter_by_source(config_sources, display_source, self._get_env_var_option)
 
         # add bash commands
         if include_cmds:
             self._include_commands(config_sources, display_sensitive, display_source, raw)
+        else:
+            self._filter_by_source(config_sources, display_source, self._get_cmd_option)
 
         # add config from secret backends
         if include_secret:
             self._include_secrets(config_sources, display_sensitive, display_source, raw)
+        else:
+            self._filter_by_source(config_sources, display_source, self._get_secret_option)
+
         return config_sources
 
     def _include_secrets(self, config_sources, display_sensitive, display_source, raw):
@@ -683,6 +699,48 @@ class AirflowConfigParser(ConfigParser):
                 key = key.lower()
             config_sources.setdefault(section, OrderedDict()).update({key: opt})
 
+    def _filter_by_source(self, config_sources, display_source, getter_func):
+        """
+        Deletes default configs from current configuration (an OrderedDict of
+        OrderedDicts) if it would conflict with special sensitive_config_values.
+
+        This is necessary because bare configs take precedence over the command
+        or secret key equivalents so if the current running config is
+        materialized with Airflow defaults they in turn override user set
+        command or secret key configs.
+
+        :param config_sources: The current configuration to operate on
+        :param display_source: If False, configuration options contain raw
+            values. If True, options are a tuple of (option_value, source).
+            Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
+        :param getter_func: A callback function that gets the user configured
+            override value for a particular sensitive_config_values config.
+        :rtype: None
+        :return: None, the given config_sources is filtered if necessary,
+            otherwise untouched.
+        """
+        for (section, key) in self.sensitive_config_values:
+            # Don't bother if we don't have section / key
+            if section not in config_sources or key not in config_sources[section]:
+                continue
+            # Check that there is something to override defaults
+            try:
+                getter_opt = getter_func(section, key)
+            except ValueError:
+                continue
+            if not getter_opt:
+                continue
+            # Check to see that there is a default value
+            if not self.airflow_defaults.has_option(section, key):
+                continue
+            # Check to see if bare setting is the same as defaults
+            if display_source:
+                opt, source = config_sources[section][key]
+            else:
+                opt = config_sources[section][key]
+            if opt == self.airflow_defaults.get(section, key):
+                del config_sources[section][key]
+
     @staticmethod
     def _replace_config_with_display_sources(config_sources, configs, display_source, raw):
         for (source_name, config) in configs:
diff --git a/docs/apache-airflow/howto/set-config.rst b/docs/apache-airflow/howto/set-config.rst
index 03cf0de..ad25c1d 100644
--- a/docs/apache-airflow/howto/set-config.rst
+++ b/docs/apache-airflow/howto/set-config.rst
@@ -100,6 +100,10 @@ The universal order of precedence for all configuration options is as follows:
 #. secret key in ``airflow.cfg``
 #. Airflow's built in defaults
 
+.. note::
+    For Airflow versions >= 2.2.1, < 2.3.0 Airflow's built in defaults took precedence
+    over command and secret key in ``airflow.cfg`` in some circumstances.
+
 You can check the current configuration with the ``airflow config list`` command.
 
 If you only want to see the value for one option, you can use ``airflow config get-value`` command as in
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index da1d736..a064c48 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
 import io
 import os
 import re
@@ -720,3 +721,48 @@ notacommand = OK
             "CRITICAL, FATAL, ERROR, WARN, WARNING, INFO, DEBUG."
         )
         assert message == exception
+
+    def test_as_dict_works_without_sensitive_cmds(self):
+        conf_materialize_cmds = conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
+        conf_maintain_cmds = conf.as_dict(display_sensitive=True, raw=True, include_cmds=False)
+
+        assert 'sql_alchemy_conn' in conf_materialize_cmds['core']
+        assert 'sql_alchemy_conn_cmd' not in conf_materialize_cmds['core']
+
+        assert 'sql_alchemy_conn' in conf_maintain_cmds['core']
+        assert 'sql_alchemy_conn_cmd' not in conf_maintain_cmds['core']
+
+        assert (
+            conf_materialize_cmds['core']['sql_alchemy_conn']
+            == conf_maintain_cmds['core']['sql_alchemy_conn']
+        )
+
+    def test_as_dict_respects_sensitive_cmds(self):
+        conf_conn = conf['core']['sql_alchemy_conn']
+        test_conf = copy.deepcopy(conf)
+        test_conf.read_string(
+            textwrap.dedent(
+                """
+                [core]
+                sql_alchemy_conn_cmd = echo -n my-super-secret-conn
+                """
+            )
+        )
+
+        conf_materialize_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
+        conf_maintain_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=False)
+
+        assert 'sql_alchemy_conn' in conf_materialize_cmds['core']
+        assert 'sql_alchemy_conn_cmd' not in conf_materialize_cmds['core']
+
+        if conf_conn == test_conf.airflow_defaults['core']['sql_alchemy_conn']:
+            assert conf_materialize_cmds['core']['sql_alchemy_conn'] == 'my-super-secret-conn'
+
+        assert 'sql_alchemy_conn_cmd' in conf_maintain_cmds['core']
+        assert conf_maintain_cmds['core']['sql_alchemy_conn_cmd'] == 'echo -n my-super-secret-conn'
+
+        if conf_conn == test_conf.airflow_defaults['core']['sql_alchemy_conn']:
+            assert 'sql_alchemy_conn' not in conf_maintain_cmds['core']
+        else:
+            assert 'sql_alchemy_conn' in conf_maintain_cmds['core']
+            assert conf_maintain_cmds['core']['sql_alchemy_conn'] == conf_conn
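
The filtering rule described in this commit can be illustrated outside Airflow with plain dictionaries. The following is a minimal sketch, not the code from `airflow/configuration.py`: the function `filter_defaults`, the `lookup` helper, and the sample values are hypothetical names chosen for illustration, and the real method also handles the `display_source` tuple form.

```python
from typing import Callable, Dict, Iterable, Optional, Tuple

Config = Dict[str, Dict[str, str]]


def filter_defaults(
    config: Config,
    defaults: Config,
    sensitive_keys: Iterable[Tuple[str, str]],
    get_override: Callable[[str, str], Optional[str]],
) -> Config:
    """Drop bare values that only repeat the default when an override exists.

    Hypothetical stand-in for the idea behind `_filter_by_source`.
    """
    for section, key in sensitive_keys:
        # Nothing to do if the bare key was never materialized.
        if key not in config.get(section, {}):
            continue
        # Nothing to protect if no command/secret/env override is configured.
        if not get_override(section, key):
            continue
        # Without a known default there is nothing to compare against.
        if key not in defaults.get(section, {}):
            continue
        # Remove the bare value only when it merely repeats the default.
        if config[section][key] == defaults[section][key]:
            del config[section][key]
    return config


# Illustrative data only; these are not Airflow's real defaults.
DEFAULTS = {"core": {"sql_alchemy_conn": "sqlite:///default.db"}}
OVERRIDES = {("core", "sql_alchemy_conn"): "echo -n my-super-secret-conn"}
SENSITIVE = [("core", "sql_alchemy_conn")]


def lookup(section: str, key: str) -> Optional[str]:
    """Return the configured override command for a key, if any."""
    return OVERRIDES.get((section, key))


# Case 1: the bare value equals the default, so it is filtered out.
materialized = {
    "core": {
        "sql_alchemy_conn": "sqlite:///default.db",
        "sql_alchemy_conn_cmd": "echo -n my-super-secret-conn",
    }
}
filtered = filter_defaults(materialized, DEFAULTS, SENSITIVE, lookup)

# Case 2: the user explicitly set a non-default bare value, so it is kept.
user_set = {
    "core": {
        "sql_alchemy_conn": "postgresql://user-set",
        "sql_alchemy_conn_cmd": "echo -n my-super-secret-conn",
    }
}
kept = filter_defaults(user_set, DEFAULTS, SENSITIVE, lookup)
```

In the first case only the `_cmd` entry survives, so a worker reading the materialized file would fall through to the command. In the second case the bare value still wins, which matches the documented precedence.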

[airflow] 04/31: Fix assignment of unassigned triggers (#21770)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 33b75ff1b9f1da68470688986e604173d0468aa1
Author: jkramer-ginkgo <68...@users.noreply.github.com>
AuthorDate: Sat Feb 26 14:25:15 2022 -0500

    Fix assignment of unassigned triggers (#21770)
    
    Previously, the query returned no alive triggerers which resulted
    in all triggers being assigned to the current triggerer. This works
    fine, despite the logic bug, in the case where there's a single
    triggerer. But with multiple triggerers, concurrent iterations of
    the TriggerJob loop would bounce trigger ownership to whichever
    loop ran last.
    
    Addresses https://github.com/apache/airflow/issues/21616
    
    (cherry picked from commit b26d4d8a290ce0104992ba28850113490c1ca445)
---
 airflow/models/trigger.py    | 10 ++++++---
 tests/models/test_trigger.py | 52 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 5749589..aa0d2b1 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -17,7 +17,7 @@
 import datetime
 from typing import Any, Dict, List, Optional
 
-from sqlalchemy import Column, Integer, String, func
+from sqlalchemy import Column, Integer, String, func, or_
 
 from airflow.models.base import Base
 from airflow.models.taskinstance import TaskInstance
@@ -175,7 +175,7 @@ class Trigger(Base):
         alive_triggerer_ids = [
             row[0]
             for row in session.query(BaseJob.id).filter(
-                BaseJob.end_date is None,
+                BaseJob.end_date.is_(None),
                 BaseJob.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30),
                 BaseJob.job_type == "TriggererJob",
             )
@@ -184,7 +184,11 @@ class Trigger(Base):
         # Find triggers who do NOT have an alive triggerer_id, and then assign
         # up to `capacity` of those to us.
         trigger_ids_query = (
-            session.query(cls.id).filter(cls.triggerer_id.notin_(alive_triggerer_ids)).limit(capacity).all()
+            session.query(cls.id)
+            # notin_ doesn't find NULL rows
+            .filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids)))
+            .limit(capacity)
+            .all()
         )
         session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
             {cls.triggerer_id: triggerer_id},
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index aacfa88..99cd71f 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -15,8 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
+
 import pytest
 
+from airflow.jobs.triggerer_job import TriggererJob
 from airflow.models import TaskInstance, Trigger
 from airflow.operators.dummy import DummyOperator
 from airflow.triggers.base import TriggerEvent
@@ -36,9 +39,11 @@ def session():
 def clear_db(session):
     session.query(TaskInstance).delete()
     session.query(Trigger).delete()
+    session.query(TriggererJob).delete()
     yield session
     session.query(TaskInstance).delete()
     session.query(Trigger).delete()
+    session.query(TriggererJob).delete()
     session.commit()
 
 
@@ -124,3 +129,50 @@ def test_submit_failure(session, create_task_instance):
     updated_task_instance = session.query(TaskInstance).one()
     assert updated_task_instance.state == State.SCHEDULED
     assert updated_task_instance.next_method == "__fail__"
+
+
+def test_assign_unassigned(session, create_task_instance):
+    """
+    Tests that unassigned triggers of all appropriate states are assigned.
+    """
+    finished_triggerer = TriggererJob(None, heartrate=10, state=State.SUCCESS)
+    finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1)
+    session.add(finished_triggerer)
+    assert not finished_triggerer.is_alive()
+    healthy_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
+    session.add(healthy_triggerer)
+    assert healthy_triggerer.is_alive()
+    new_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
+    session.add(new_triggerer)
+    assert new_triggerer.is_alive()
+    session.commit()
+    trigger_on_healthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_on_healthy_triggerer.id = 1
+    trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
+    trigger_on_killed_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_on_killed_triggerer.id = 2
+    trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
+    trigger_unassigned_to_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_unassigned_to_triggerer.id = 3
+    assert trigger_unassigned_to_triggerer.triggerer_id is None
+    session.add(trigger_on_healthy_triggerer)
+    session.add(trigger_on_killed_triggerer)
+    session.add(trigger_unassigned_to_triggerer)
+    session.commit()
+    assert session.query(Trigger).count() == 3
+    Trigger.assign_unassigned(new_triggerer.id, 100, session=session)
+    session.expire_all()
+    # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer
+    assert (
+        session.query(Trigger).filter(Trigger.id == trigger_on_killed_triggerer.id).one().triggerer_id
+        == new_triggerer.id
+    )
+    assert (
+        session.query(Trigger).filter(Trigger.id == trigger_unassigned_to_triggerer.id).one().triggerer_id
+        == new_triggerer.id
+    )
+    # Check that trigger on healthy triggerer still assigned to existing triggerer
+    assert (
+        session.query(Trigger).filter(Trigger.id == trigger_on_healthy_triggerer.id).one().triggerer_id
+        == healthy_triggerer.id
+    )
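
Two SQL details sit behind this fix. First, Python's `is` operator cannot be overloaded, so `BaseJob.end_date is None` is evaluated by Python to a plain boolean before SQLAlchemy ever sees it, whereas `.is_(None)` produces an `IS NULL` clause. Second, as the added code comment says, `NOT IN` does not match rows whose column is NULL. The sketch below demonstrates the second point with the standard-library `sqlite3` module; the table `trigger_demo` and its rows are made up for illustration and are not Airflow's schema.

```python
import sqlite3

# In-memory database with a hypothetical, simplified trigger table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trigger_demo (id INTEGER PRIMARY KEY, triggerer_id INTEGER)"
)
conn.executemany(
    "INSERT INTO trigger_demo (id, triggerer_id) VALUES (?, ?)",
    [
        (1, 10),    # assigned to an alive triggerer
        (2, 20),    # assigned to a dead triggerer
        (3, None),  # never assigned
    ],
)

alive_triggerer_ids = (10,)

# NOT IN alone: the NULL row evaluates to NULL (not true) and is dropped.
not_in_only = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM trigger_demo WHERE triggerer_id NOT IN (?) ORDER BY id",
        alive_triggerer_ids,
    )
]

# IS NULL OR NOT IN: the unassigned row is picked up as well.
with_null_check = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM trigger_demo "
        "WHERE triggerer_id IS NULL OR triggerer_id NOT IN (?) ORDER BY id",
        alive_triggerer_ids,
    )
]

conn.close()
```

Only the second query returns the unassigned trigger, which is the behaviour the `or_(... .is_(None), ... .notin_(...))` filter restores.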

[airflow] 09/31: Fix filesystem sensor for directories (#21729)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 467c7592bfbe5bd1654c815738fccc21bb9e33c5
Author: Mikhail Ilchenko <pr...@gmail.com>
AuthorDate: Tue Mar 1 12:18:34 2022 +0300

    Fix filesystem sensor for directories (#21729)
    
    Fix walking through wildcarded directory in `FileSensor.poke` method
    
    (cherry picked from commit 6b0ca646ec849af91fe8a10d3d5656cafa3ed4bd)
---
 airflow/sensors/filesystem.py    |  2 +-
 tests/sensors/test_filesystem.py | 36 ++++++++++++++++++++++++++++++++++--
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py
index 130be5c..1a4711c 100644
--- a/airflow/sensors/filesystem.py
+++ b/airflow/sensors/filesystem.py
@@ -65,7 +65,7 @@ class FileSensor(BaseSensorOperator):
                 self.log.info('Found File %s last modified: %s', str(path), str(mod_time))
                 return True
 
-            for _, _, files in os.walk(full_path):
+            for _, _, files in os.walk(path):
                 if len(files) > 0:
                     return True
         return False
diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py
index 4d23331..e696f1e 100644
--- a/tests/sensors/test_filesystem.py
+++ b/tests/sensors/test_filesystem.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import os.path
+import os
 import shutil
 import tempfile
 import unittest
@@ -131,6 +131,38 @@ class TestFileSensor(unittest.TestCase):
             task._hook = self.hook
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_wildcard_empty_directory(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir):
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # No files in dir
+                with pytest.raises(AirflowSensorTimeout):
+                    task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_wildcard_directory_with_files(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir) as subdir:
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # `touch` the file in subdir
+                open(os.path.join(subdir, 'file'), 'a').close()
+                task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
     def test_wildcared_directory(self):
         temp_dir = tempfile.mkdtemp()
         subdir = tempfile.mkdtemp(dir=temp_dir)
@@ -146,7 +178,7 @@ class TestFileSensor(unittest.TestCase):
         task._hook = self.hook
 
         try:
-            # `touch` the dir
+            # `touch` the file in subdir
             open(subdir + "/file", "a").close()
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         finally:
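
The one-line change can be reproduced with the standard library. This is a hedged sketch rather than the sensor's code: it assumes that `path` comes from expanding the wildcard pattern with `glob` (the hunk does not show that loop), and `has_files` with its `walk_pattern` flag is a hypothetical helper used only to contrast the two behaviours.

```python
import glob
import os
import tempfile


def has_files(pattern: str, walk_pattern: bool = False) -> bool:
    """Return True if the pattern matches a file or a directory containing files.

    With walk_pattern=True the literal pattern is walked (the old behaviour
    this commit replaces); otherwise each expanded match is walked.
    """
    for path in glob.glob(pattern):
        if os.path.isfile(path):
            return True
        target = pattern if walk_pattern else path
        # os.walk silently yields nothing for a path that does not exist,
        # which is what happens when the literal wildcard string is walked.
        for _, _, files in os.walk(target):
            if files:
                return True
    return False


with tempfile.TemporaryDirectory() as temp_dir:
    subdir = os.path.join(temp_dir, "subdir")
    os.mkdir(subdir)
    pattern = os.path.join(temp_dir, "*dir")

    # The matched directory is empty, so nothing is found.
    empty_result = has_files(pattern)

    # `touch` a file inside the matched directory.
    open(os.path.join(subdir, "file"), "a").close()

    fixed_result = has_files(pattern)                    # walks the match
    old_result = has_files(pattern, walk_pattern=True)   # walks the pattern
```

Walking the expanded match finds the file, while walking the unexpanded pattern finds nothing, which is the situation the two new tests cover.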

[airflow] 18/31: Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f4118c3474a497bd80c95bb20eee54252d64e93d
Author: Mateusz Nojek <ma...@gmail.com>
AuthorDate: Mon Feb 21 17:27:36 2022 +0100

    Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)
    
    (cherry picked from commit 4e959358ac4ef9554ff5d82cdc85ab7dc142a639)
---
 docs/apache-airflow/best-practices.rst |  67 +++++++++++++++++++++++++++++++++
 docs/apache-airflow/concepts/dags.rst  |   5 ++-
 docs/apache-airflow/concepts/tasks.rst |   6 +++
 docs/apache-airflow/dag-run.rst        |  27 +++++++++++--
 docs/apache-airflow/img/watcher.png    | Bin 0 -> 41592 bytes
 docs/spelling_wordlist.txt             |   1 +
 6 files changed, 101 insertions(+), 5 deletions(-)

diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 3b01b3e..15fcea1 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -326,6 +326,73 @@ each parameter by following the links):
 * :ref:`config:scheduler__parsing_processes`
 * :ref:`config:scheduler__file_parsing_sort_mode`
 
+Example of watcher pattern with trigger rules
+---------------------------------------------
+
+The watcher pattern is what we call a DAG with a task that is "watching" the states of the other tasks. Its primary purpose is to fail a DAG Run when any other task fails.
+The need came from the Airflow system tests, which are DAGs with different tasks (similar to a test containing steps).
+
+Normally, when any task fails, all other tasks are not executed and the whole DAG Run gets a failed status too. But when we use trigger rules, we can disrupt the normal flow of running tasks and the whole DAG may end up with a different status than we expect.
+For example, we can have a teardown task (with trigger rule set to ``"all_done"``) that will be executed regardless of the state of the other tasks (e.g. to clean up the resources). In such a situation, the DAG would always run this task and the DAG Run will get the status of this particular task, so we can potentially lose the information about failing tasks.
+If we want to ensure that the DAG with a teardown task fails if any task fails, we need to use the watcher pattern.
+The watcher task is a task that will always fail if triggered, but it needs to be triggered only if any other task fails. It needs to have a trigger rule set to ``"one_failed"`` and it also needs to be a downstream task for all other tasks in the DAG.
+Thanks to this, if every other task passes, the watcher will be skipped, but when something fails, the watcher task will be executed and fail, making the DAG Run fail too.
+
+.. note::
+
+    Be aware that trigger rules only rely on the direct upstream (parent) tasks, e.g. ``one_failed`` will ignore any failed (or ``upstream_failed``) tasks that are not a direct parent of the parameterized task.
+
+It's easier to grasp the concept with an example. Let's say that we have the following DAG:
+
+.. code-block:: python
+
+    from datetime import datetime
+    from airflow import DAG
+    from airflow.decorators import task
+    from airflow.exceptions import AirflowException
+    from airflow.operators.bash import BashOperator
+    from airflow.operators.python import PythonOperator
+
+
+    @task(trigger_rule="one_failed", retries=0)
+    def watcher():
+        raise AirflowException("Failing task because one or more upstream tasks failed.")
+
+
+    with DAG(
+        dag_id="watcher_example",
+        schedule_interval="@once",
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+    ) as dag:
+        failing_task = BashOperator(
+            task_id="failing_task", bash_command="exit 1", retries=0
+        )
+        passing_task = BashOperator(
+            task_id="passing_task", bash_command="echo passing_task"
+        )
+        teardown = BashOperator(
+            task_id="teardown", bash_command="echo teardown", trigger_rule="all_done"
+        )
+
+        failing_task >> passing_task >> teardown
+        list(dag.tasks) >> watcher()
+
+The visual representation of this DAG after execution looks like this:
+
+.. image:: /img/watcher.png
+
+We have several tasks that serve different purposes:
+
+- ``failing_task`` always fails,
+- ``passing_task`` always succeeds (if executed),
+- ``teardown`` is always triggered (regardless of the states of the other tasks) and it should always succeed,
+- ``watcher`` is a downstream task for each other task, i.e. it will be triggered when any task fails and thus fail the whole DAG Run, since it's a leaf task.
+
+It's important to note that without the ``watcher`` task, the whole DAG Run will get the ``success`` state, since the only failing task is not the leaf task, and the ``teardown`` task will finish with ``success``.
+If we want the ``watcher`` to monitor the state of all tasks, we need to make it dependent on all of them separately. Thanks to this, we can fail the DAG Run if any of the tasks fail. Note that the watcher task has a trigger rule set to ``"one_failed"``.
+On the other hand, without the ``teardown`` task, the ``watcher`` task will not be needed, because ``failing_task`` will propagate its ``failed`` state to the downstream task ``passing_task`` and the whole DAG Run will also get the ``failed`` status.
+
 .. _best_practices/reducing_dag_complexity:
 
 Reducing DAG complexity
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 32d21ce..b83b025 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -369,7 +369,7 @@ Note that if you are running the DAG at the very start of its life---specificall
 Trigger Rules
 ~~~~~~~~~~~~~
 
-By default, Airflow will wait for all upstream tasks for a task to be :ref:`successful <concepts:task-states>` before it runs that task.
+By default, Airflow will wait for all upstream tasks (direct parents) of a task to be :ref:`successful <concepts:task-states>` before it runs that task.
 
 However, this is just the default behaviour, and you can control it using the ``trigger_rule`` argument to a Task. The options for ``trigger_rule`` are:
 
@@ -383,6 +383,7 @@ However, this is just the default behaviour, and you can control it using the ``
 * ``none_skipped``: No upstream task is in a ``skipped`` state - that is, all upstream tasks are in a ``success``, ``failed``, or ``upstream_failed`` state
 * ``always``: No dependencies at all, run this task at any time
 
+
 You can also combine this with the :ref:`concepts:depends-on-past` functionality if you wish.
 
 .. note::
@@ -724,7 +725,7 @@ You can also prepare ``.airflowignore`` file for a subfolder in ``DAG_FOLDER`` a
 would only be applicable for that subfolder.
 
 DAG Dependencies
-================
+----------------
 
 *Added in Airflow 2.1*
 
diff --git a/docs/apache-airflow/concepts/tasks.rst b/docs/apache-airflow/concepts/tasks.rst
index f6c6619..41fa556 100644
--- a/docs/apache-airflow/concepts/tasks.rst
+++ b/docs/apache-airflow/concepts/tasks.rst
@@ -36,6 +36,12 @@ Relationships
 
 The key part of using Tasks is defining how they relate to each other - their *dependencies*, or as we say in Airflow, their *upstream* and *downstream* tasks. You declare your Tasks first, and then you declare their dependencies second.
 
+.. note::
+
+    We call the *upstream* task the one that is directly preceding the other task. We used to call it a parent task before.
+    Be aware that this concept does not describe the tasks that are higher in the task hierarchy (i.e. they are not direct parents of the task).
+    The same definition applies to a *downstream* task, which needs to be a direct child of the other task.
+
 There are two ways of declaring dependencies - using the ``>>`` and ``<<`` (bitshift) operators::
 
     first_task >> second_task >> [third_task, fourth_task]
diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 1a2bbe3..52d90e0 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -18,6 +18,30 @@
 DAG Runs
 =========
 A DAG Run is an object representing an instantiation of the DAG in time.
+Any time the DAG is executed, a DAG Run is created and all tasks inside it are executed. The status of the DAG Run depends on the states of its tasks.
+Each DAG Run is run separately from the others, meaning that you can have many runs of a DAG at the same time.
+
+.. _dag-run:dag-run-status:
+
+DAG Run Status
+''''''''''''''
+
+A DAG Run status is determined when the execution of the DAG is finished.
+The execution of the DAG depends on its containing tasks and their dependencies.
+The status is assigned to the DAG Run when all of the tasks are in one of the terminal states (i.e. if there is no possible transition to another state) like ``success``, ``failed`` or ``skipped``.
+The DAG Run is assigned its status based on the so-called "leaf nodes" or simply "leaves". Leaf nodes are the tasks with no children.
+
+There are two possible terminal states for the DAG Run:
+
+- ``success`` if all of the leaf node states are either ``success`` or ``skipped``,
+- ``failed`` if any of the leaf node states is either ``failed`` or ``upstream_failed``.
+
+.. note::
+    Be careful if some of your tasks have a specific `trigger rule <dags.html#trigger-rules>`_ defined.
+    These can lead to some unexpected behavior, e.g. if you have a leaf task with trigger rule ``"all_done"``, it will be executed regardless of the states of the rest of the tasks and if it succeeds, then the whole DAG Run will also be marked as ``success``, even if something failed in the middle.
+
+Cron Presets
+''''''''''''
 
 Each DAG may or may not have a schedule, which informs how DAG Runs are
 created. ``schedule_interval`` is defined as a DAG argument, which can be passed a
@@ -27,9 +51,6 @@ a ``str``, a ``datetime.timedelta`` object, or one of the following cron "preset
 .. tip::
     You can use an online editor for CRON expressions such as `Crontab guru <https://crontab.guru/>`_
 
-Cron Presets
-''''''''''''
-
 +----------------+----------------------------------------------------------------+-----------------+
 | preset         | meaning                                                        | cron            |
 +================+================================================================+=================+
diff --git a/docs/apache-airflow/img/watcher.png b/docs/apache-airflow/img/watcher.png
new file mode 100644
index 0000000..9e0ed2d
Binary files /dev/null and b/docs/apache-airflow/img/watcher.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index ed114b6..350b79d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1339,6 +1339,7 @@ taskflow
 taskinstance
 tblproperties
 tcp
+teardown
 templatable
 templateable
 templated
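
The leaf-node rule added to `dag-run.rst` in this commit can be written down in a few lines. This is a minimal sketch of the rule as the documentation states it, not Airflow's scheduler logic: `dag_run_state` is a hypothetical helper, and returning `"running"` while any leaf is not yet in a terminal state is an assumption made for the example.

```python
from typing import Iterable

# Leaf states that the documentation maps to each DAG Run outcome.
FAILED_LEAF_STATES = {"failed", "upstream_failed"}
SUCCESS_LEAF_STATES = {"success", "skipped"}


def dag_run_state(leaf_states: Iterable[str]) -> str:
    """Derive a DAG Run status from the states of its leaf tasks."""
    states = list(leaf_states)
    if any(state in FAILED_LEAF_STATES for state in states):
        return "failed"
    if all(state in SUCCESS_LEAF_STATES for state in states):
        return "success"
    # Assumption: anything else means the run has not finished yet.
    return "running"


# Without a watcher, `teardown` (trigger rule "all_done") is the only leaf
# and it succeeds, so the failure of `failing_task` is hidden.
without_watcher = dag_run_state(["success"])

# With the watcher downstream of every task, the watcher is the only leaf.
# It runs and fails because one of its direct parents failed.
with_watcher = dag_run_state(["failed"])

# If nothing fails, the watcher is skipped and the run still succeeds.
all_passing = dag_run_state(["skipped"])
```

This is why the watcher has to be a leaf: only leaf states feed into the DAG Run status.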

[airflow] 07/31: Correct a couple grammatical errors in docs (#21750)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 318a5edad40754d9b18bdbf97e142221d42c679a
Author: Zach McQuiston <37...@users.noreply.github.com>
AuthorDate: Wed Feb 23 13:48:06 2022 -0700

    Correct a couple grammatical errors in docs (#21750)
    
    Just reading through the docs as we implement Airflow on our end, saw a couple additions that could be made.
    
    (cherry picked from commit 3bb63d4cfbbb534298d32ec987f25a02c11fc4e6)
---
 docs/apache-airflow/plugins.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/plugins.rst b/docs/apache-airflow/plugins.rst
index 3543aef..f8a7e00 100644
--- a/docs/apache-airflow/plugins.rst
+++ b/docs/apache-airflow/plugins.rst
@@ -27,7 +27,7 @@ features to its core by simply dropping files in your
 The python modules in the ``plugins`` folder get imported, and **macros** and web **views**
 get integrated to Airflow's main collections and become available for use.
 
-To troubleshoot issue with plugins, you can use ``airflow plugins`` command.
+To troubleshoot issues with plugins, you can use the ``airflow plugins`` command.
 This command dumps information about loaded plugins.
 
 .. versionchanged:: 2.0

[airflow] 08/31: Fix graph autorefresh on page load (#21736)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 534140e9168d35a6722f5dc7c0094009c1683b3d
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Feb 22 11:41:39 2022 -0500

    Fix graph autorefresh on page load (#21736)
    
    * fix auto refresh check on page load
    
    * minor code cleanup
    
    * remove new line
    
    (cherry picked from commit b2c0a921c155e82d1140029e6495594061945025)
---
 airflow/www/static/js/graph.js | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 2d3b22e..615e238 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -57,6 +57,13 @@ const stateFocusMap = {
   deferred: false,
   no_status: false,
 };
+
+const checkRunState = () => {
+  const states = Object.values(taskInstances).map((ti) => ti.state);
+  return !states.some((state) => (
+    ['success', 'failed', 'upstream_failed', 'skipped', 'removed'].indexOf(state) === -1));
+};
+
 const taskTip = d3.tip()
   .attr('class', 'tooltip d3-tip')
   .html((toolTipHtml) => toolTipHtml);
@@ -362,13 +369,11 @@ function handleRefresh() {
         if (prevTis !== tis) {
         // eslint-disable-next-line no-global-assign
           taskInstances = JSON.parse(tis);
-          const states = Object.values(taskInstances).map((ti) => ti.state);
           updateNodesStates(taskInstances);
 
           // end refresh if all states are final
-          if (!states.some((state) => (
-            ['success', 'failed', 'upstream_failed', 'skipped', 'removed'].indexOf(state) === -1))
-          ) {
+          const isFinal = checkRunState();
+          if (isFinal) {
             $('#auto_refresh').prop('checked', false);
             clearInterval(refreshInterval);
           }
@@ -410,9 +415,9 @@ $('#auto_refresh').change(() => {
 });
 
 function initRefresh() {
-  if (localStorage.getItem('disableAutoRefresh')) {
-    $('#auto_refresh').prop('checked', false);
-  }
+  const isDisabled = localStorage.getItem('disableAutoRefresh');
+  const isFinal = checkRunState();
+  $('#auto_refresh').prop('checked', !(isDisabled || isFinal));
   startOrStopRefresh();
   d3.select('#refresh_button').on('click', () => handleRefresh());
 }
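
The double negation in `checkRunState` (`!states.some(... === -1)`) is equivalent to "every task instance is in a terminal state". A minimal Python sketch of that predicate and of the checkbox logic in `initRefresh` follows. The Python names `check_run_state` and `auto_refresh_checked` are illustrative only and do not exist in Airflow; the list of states is copied from the diff.

```python
# Terminal states as listed in the diff's checkRunState().
FINAL_STATES = {"success", "failed", "upstream_failed", "skipped", "removed"}


def check_run_state(task_instances):
    """Return True when every task instance is in a terminal state."""
    return all(ti["state"] in FINAL_STATES for ti in task_instances.values())


def auto_refresh_checked(is_disabled, task_instances):
    """Mirror initRefresh(): tick the box only if not disabled and not final."""
    return not (is_disabled or check_run_state(task_instances))
```

As in the JavaScript version, an empty collection of task instances counts as final, so auto-refresh would start unchecked in that case.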

[airflow] 21/31: Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 504e845f8e1542340a9b2ce89d3c57f215429037
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Jan 5 12:12:27 2022 -0800

    Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)
    
    The queue's purpose is to track triggers that need to be canceled. The language `to_delete` was a bit confusing because for one it does not actually delete them but cancel them.  The deletion work is actually in `cleanup_finished_triggers`.  It seems that this method will usually not do anything and it's only for cancelling triggers that are currently running but for whatever reason no longer should be.  E.g. when a task is killed and therefore the trigger is no longer needed, or some [...]
    
    (cherry picked from commit c20ad79b40ea2b213f6dca221221c6dbd55bd08f)
---
 airflow/jobs/triggerer_job.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index a47bc3b..25a4c79 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -205,7 +205,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
     to_create: Deque[Tuple[int, BaseTrigger]]
 
     # Inbound queue of deleted triggers
-    to_delete: Deque[int]
+    to_cancel: Deque[int]
 
     # Outbound queue of events
     events: Deque[Tuple[int, TriggerEvent]]
@@ -221,7 +221,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         self.triggers = {}
         self.trigger_cache = {}
         self.to_create = deque()
-        self.to_delete = deque()
+        self.to_cancel = deque()
         self.events = deque()
         self.failed_triggers = deque()
 
@@ -242,7 +242,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         while not self.stop:
             # Run core logic
             await self.create_triggers()
-            await self.delete_triggers()
+            await self.cancel_triggers()
             await self.cleanup_finished_triggers()
             # Sleep for a bit
             await asyncio.sleep(1)
@@ -270,13 +270,13 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
             await asyncio.sleep(0)
 
-    async def delete_triggers(self):
+    async def cancel_triggers(self):
         """
-        Drain the to_delete queue and ensure all triggers that are not in the
+        Drain the to_cancel queue and ensure all triggers that are not in the
         DB are cancelled, so the cleanup job deletes them.
         """
-        while self.to_delete:
-            trigger_id = self.to_delete.popleft()
+        while self.to_cancel:
+            trigger_id = self.to_cancel.popleft()
             if trigger_id in self.triggers:
                 # We only delete if it did not exit already
                 self.triggers[trigger_id]["task"].cancel()
@@ -384,7 +384,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         current_trigger_ids = set(self.triggers.keys())
         # Work out the two difference sets
         new_trigger_ids = requested_trigger_ids.difference(current_trigger_ids)
-        old_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
+        cancel_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
         # Bulk-fetch new trigger records
         new_triggers = Trigger.bulk_fetch(new_trigger_ids)
         # Add in new triggers
@@ -401,9 +401,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.failed_triggers.append(new_id)
                 continue
             self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
-        # Remove old triggers
-        for old_id in old_trigger_ids:
-            self.to_delete.append(old_id)
+        # Enqueue orphaned triggers for cancellation
+        for old_id in cancel_trigger_ids:
+            self.to_cancel.append(old_id)
 
     def get_trigger_by_classpath(self, classpath: str) -> Type[BaseTrigger]:
         """

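The renamed queue can be exercised in isolation. The following is a minimal sketch, not Airflow's implementation: `MiniRunner` and `demo` are hypothetical names, and only the drain-and-cancel loop shown in the diff is modelled. It illustrates why `to_cancel` is the more accurate name: draining the queue cancels the asyncio task but leaves the entry in `triggers`, which a separate cleanup step is expected to remove.

```python
import asyncio
from collections import deque


class MiniRunner:
    """Hypothetical stand-in for TriggerRunner; only the cancel path is modelled."""

    def __init__(self):
        self.triggers = {}        # trigger_id -> {"task": asyncio.Task}
        self.to_cancel = deque()  # inbound queue of trigger ids to cancel

    async def cancel_triggers(self):
        # Drain the queue; ids that are no longer tracked are skipped.
        while self.to_cancel:
            trigger_id = self.to_cancel.popleft()
            if trigger_id in self.triggers:
                self.triggers[trigger_id]["task"].cancel()


async def demo():
    runner = MiniRunner()
    runner.triggers[1] = {"task": asyncio.create_task(asyncio.sleep(3600))}
    runner.to_cancel.extend([1, 2])  # id 2 is unknown and is ignored
    await runner.cancel_triggers()
    # Await the task so the cancellation is fully processed.
    await asyncio.gather(runner.triggers[1]["task"], return_exceptions=True)
    task = runner.triggers[1]["task"]
    return task.cancelled(), len(runner.to_cancel), 1 in runner.triggers


result = asyncio.run(demo())
```

Here `result` reports that the task was cancelled, the queue is empty, and the trigger is still registered.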
[airflow] 12/31: Log exception in local executor (#21667)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b1e549a0043ca5ddf7463919e89df95a567c4ad4
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Mon Feb 21 05:05:01 2022 +0800

    Log exception in local executor (#21667)
    
    (cherry picked from commit a0fb0bbad312df06dd0a85453bd4f93ee2e01cbb)
---
 airflow/executors/local_executor.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 775b6ca..fc662fc 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -125,12 +125,12 @@ class LocalWorkerBase(Process, LoggingMixin):
             ret = 0
             return State.SUCCESS
         except Exception as e:
-            self.log.error("Failed to execute task %s.", str(e))
+            self.log.exception("Failed to execute task %s.", e)
+            return State.FAILED
         finally:
             Sentry.flush()
             logging.shutdown()
             os._exit(ret)
-            raise RuntimeError('unreachable -- keep mypy happy')
 
     @abstractmethod
     def do_work(self):
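
The switch from `log.error` to `log.exception` matters because `Logger.exception` logs at ERROR level and also appends the active traceback, whereas `Logger.error` records only the formatted message. A small self-contained sketch of the difference follows; the helper names `capture`, `with_error` and `with_exception` are made up for illustration and are not part of Airflow.

```python
import io
import logging


def capture(log_call):
    """Run log_call(logger, exc) inside an except block and return the log text."""
    stream = io.StringIO()
    logger = logging.getLogger(f"sketch.{log_call.__name__}")
    logger.propagate = False  # keep output out of the root logger
    handler = logging.StreamHandler(stream)
    logger.addHandler(handler)
    try:
        raise ValueError("boom")
    except Exception as e:
        log_call(logger, e)
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


def with_error(logger, e):
    # Old behaviour: message only.
    logger.error("Failed to execute task %s.", str(e))


def with_exception(logger, e):
    # New behaviour: message plus traceback of the exception being handled.
    logger.exception("Failed to execute task %s.", e)


error_text = capture(with_error)
exception_text = capture(with_exception)
```

Both strings start with the same message line; only `exception_text` additionally contains the `Traceback (most recent call last):` block. `Logger.exception` is meant to be called from within an exception handler, as it is in the diff.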