Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/17 21:38:51 UTC

[airflow] branch v2-2-test updated (c03b086 -> 56d82fc)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard c03b086  Add note about Variable precedence with env vars (#21568)
 discard 2aec9d8  Reorder migrations to include bugfix in 2.2.4 (#21598)
 discard 045f6ce  Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)
 discard a520845  update tutorial_etl_dag notes (#21503)
 discard 94c6134  Simplify trigger cancel button (#21591)
 discard a0ea47e  Add a session backend to store session data in the database (#21478)
 discard 57abbac  Show task status only for running dags or only for the last finished dag (#21352)
 discard f27357f  Use compat data interval shim in log handlers (#21289)
 discard 7c93cf7  Fix postgres hook import pipeline tutorial (#21491)
 discard 7aae75c  Fix mismatch in generated run_id and logical date of DAG run (#18707)
 discard b77fb10  Fix TriggerDagRunOperator extra link (#19410)
 discard fc6b0b3  Add possibility to create user in the Remote User mode (#19963)
 discard b1e3572  Avoid deadlock when rescheduling task (#21362)
 discard 2ebda8e  Fix docs link for smart sensor deprecation (#21394)
 discard 891f51d  Update example DAGs (#21372)
 discard c904256  Update error docs to include before_send option (#21275)
 discard 9e806aa  Fix the incorrect scheduling time for the first run of dag (#21011)
 discard 160d879  Update stat_name_handler documentation (#21298)
 discard 00cee8a  Docs: Fix task order in overview example (#21282)
 discard b36d298  Update recipe for Google Cloud SDK (#21268)
 discard e304067  Augment xcom docs (#20755)
 discard 70050c1  Update version to 2.2.4 for things in that release (#21196)
 discard 91d17f8  Fix Broken link in api.rst (#21165)
 discard 5c32faf  Limit SQLAlchemy to < 1.4.0 for 2.2.* line (#21235)
 discard cd97ecc  Fix Scheduler crash when executing task instances of missing DAG (#20349)
 discard 4852b81  Actually fix tuple and bool checks for black 22.1.0 (#21221)
    omit 30b0d98  bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
    omit 762abfb  Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
    omit 0740c08  Avoid unintentional data loss when deleting DAGs (#20758)
    omit c0fbf27  Deprecate some functions in the experimental API (#19931)
    omit 928e095  Fix session usage in ``/rendered-k8s`` view (#21006)
    omit d572161  Helper for provide_session-decorated functions (#20104)
    omit 0cc934c  Type-annotate SkipMixin and BaseXCom (#20011)
    omit 5fa0e13  Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
    omit c347d80  Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
    omit f02ae31  Update `version_added` for `[email] from_email` (#21138)
    omit 4633cf3  Improved instructions for custom image build with docker compose (#21052)
    omit 243f44d  Add back legacy .piprc customization for pip (#21124)
    omit abf8c03  Update logging-tasks.rst (#21088)
    omit 8eeb3ea  name mismatch (#21055)
    omit 84ef8d5  Update v1.yaml (#21024)
    omit ba17fa3  Return to the same place when triggering a DAG (#20955)
     new 31c66eb  Update v1.yaml (#21024)
     new a670f8c  name mismatch (#21055)
     new 9f6d6b9  Update logging-tasks.rst (#21088)
     new 680c011  Add back legacy .piprc customization for pip (#21124)
     new 5b51c41  Improved instructions for custom image build with docker compose (#21052)
     new 9f7d292  Update `version_added` for `[email] from_email` (#21138)
     new 07102e9  Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
     new dda8f43  Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
     new 016929f  Type-annotate SkipMixin and BaseXCom (#20011)
     new dda864d  Helper for provide_session-decorated functions (#20104)
     new daebc58  Fix session usage in ``/rendered-k8s`` view (#21006)
     new 663bb54  Deprecate some functions in the experimental API (#19931)
     new 4dc8b90  Avoid unintentional data loss when deleting DAGs (#20758)
     new 6d8342e  Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
     new 55a4abb  bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
     new 0ba033d  Actually fix tuple and bool checks for black 22.1.0 (#21221)
     new 1b139a7  Fix Scheduler crash when executing task instances of missing DAG (#20349)
     new 4ff0ab1  Limit SQLAlchemy to < 1.4.0 for 2.2.* line (#21235)
      new ede6d8f  Fix Broken link in api.rst (#21165)
     new 2066812  Update version to 2.2.4 for things in that release (#21196)
     new 8890087  Augment xcom docs (#20755)
     new 4b3fa3a  Update recipe for Google Cloud SDK (#21268)
     new a519e53  Docs: Fix task order in overview example (#21282)
     new 015c481  Update stat_name_handler documentation (#21298)
     new 64e0c50  Fix the incorrect scheduling time for the first run of dag (#21011)
     new 270516c  Update error docs to include before_send option (#21275)
     new 5c078cd  Update example DAGs (#21372)
     new f41ea34  Fix docs link for smart sensor deprecation (#21394)
     new f2fe0df  Avoid deadlock when rescheduling task (#21362)
     new 9b03071  Add possibility to create user in the Remote User mode (#19963)
     new 95eaef3  Fix TriggerDagRunOperator extra link (#19410)
     new 1c23405  Fix mismatch in generated run_id and logical date of DAG run (#18707)
     new efc2818  Fix postgres hook import pipeline tutorial (#21491)
     new 79e9954  Use compat data interval shim in log handlers (#21289)
     new f25a58e  Show task status only for running dags or only for the last finished dag (#21352)
     new 1c2909f  Add a session backend to store session data in the database (#21478)
     new 628aa1f  Simplify trigger cancel button (#21591)
     new dd0a3a3  update tutorial_etl_dag notes (#21503)
     new 436f452  Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)
     new 1cbad37  Reorder migrations to include bugfix in 2.2.4 (#21598)
     new 7e80127  Add note about Variable precedence with env vars (#21568)
     new 8cbf934  Adding missing login provider related methods from Flask-Appbuilder (#21294)
     new 56d82fc  added explaining concept of logical date in DAG run docs (#21433)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c03b086)
            \
             N -- N -- N   refs/heads/v2-2-test (56d82fc)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 43 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
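
If you want to reproduce the list of "N" revisions locally, the short sketch below is an
editorial illustration and not part of this automated notification. It assumes a local clone
of the airflow repository in which both tips quoted above are still reachable (the old tip
c03b086 may already be pruned, since discarded revisions are gone), with Python 3 and git on PATH.

# Minimal sketch, assuming a local clone with both branch tips still available.
# OLD_TIP and NEW_TIP are the tips quoted in this email; everything else is illustrative.
import subprocess

OLD_TIP = "c03b086"  # previous tip of v2-2-test (history rewritten away)
NEW_TIP = "56d82fc"  # current tip of v2-2-test


def git(*args: str) -> str:
    """Run a git command in the current repository and return its stripped stdout."""
    result = subprocess.run(["git", *args], check=True, capture_output=True, text=True)
    return result.stdout.strip()


# B is the common base of the old and new histories.
base = git("merge-base", OLD_TIP, NEW_TIP)

# The "N" revisions are those reachable from the new tip but not from B.
new_revisions = git("rev-list", "--oneline", f"{base}..{NEW_TIP}").splitlines()
print(f"{len(new_revisions)} new revisions since common base {base[:7]}")
for line in new_revisions:
    print(line)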


Summary of changes:
 airflow/www/fab_security/manager.py    | 15 +++++++++++++++
 airflow/www/templates/airflow/dag.html |  4 ++--
 docs/apache-airflow/concepts/dags.rst  | 16 ++++++++++++++++
 docs/apache-airflow/dag-run.rst        |  2 ++
 docs/apache-airflow/faq.rst            |  7 +++++++
 setup.cfg                              |  7 ++++++-
 tests/www/views/test_views_tasks.py    | 24 ++++--------------------
 7 files changed, 52 insertions(+), 23 deletions(-)

[airflow] 18/43: Limit SQLAlchemy to < 1.4.0 for 2.2.* line (#21235)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4ff0ab16868d7e7b765a4ab1d285088cd1f162fe
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Jan 31 19:23:58 2022 +0100

    Limit SQLAlchemy to < 1.4.0 for 2.2.* line (#21235)
    
    The recent release of FAB 3.4.4 has unblocked us from upgrading
    SQLAlchemy to the 1.4.* line. We have wanted to do this for quite
    some time, but upgrading to SQLAlchemy 1.4.* and allowing our
    users to use it with 2.2.4 is a bit risky.

    We are fixing the resulting "aftermath" in the main branch; as of
    this commit two fixes have been merged and an MsSQL problem
    remains. The MsSQL problem does not affect 2.2.4, as MsSQL support
    will only be available starting from 2.3.0, but the two other
    problems have shown that SQLAlchemy 1.4 has the potential to break
    things, and we might want to test it more thoroughly before
    releasing 2.3.0.

    The problems in question are #21205 and #21228. Both were only
    test problems, but they indicate that there might be more hidden
    issues involved.

    In order to limit risk, this PR proposes to limit SQLAlchemy
    for 2.2.* to < 1.4.0. This allows upgrading FAB and related
    dependencies without opening Airflow up to an upgrade to
    SQLAlchemy 1.4 (yet).
---
 setup.cfg | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.cfg b/setup.cfg
index c3cce1c..7ab5c77 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -145,7 +145,7 @@ install_requires =
     python3-openid~=3.2
     rich>=9.2.0
     setproctitle>=1.1.8, <2
-    sqlalchemy>=1.3.18
+    sqlalchemy>=1.3.18, <1.4.0
     sqlalchemy_jsonfield~=1.0
     tabulate>=0.7.5, <0.9
     tenacity>=6.2.0
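
As an editorial aside, the effect of the pin can be confirmed at runtime in an installed
environment. This is a minimal sketch, not part of the commit, and assumes the third-party
``packaging`` library is available:

# Minimal sketch (not part of the commit): assert the installed SQLAlchemy
# falls inside the range now pinned in setup.cfg for the 2.2.* line.
import sqlalchemy
from packaging.version import Version

installed = Version(sqlalchemy.__version__)
assert Version("1.3.18") <= installed < Version("1.4.0"), (
    f"SQLAlchemy {installed} is outside the >=1.3.18,<1.4.0 range pinned for Airflow 2.2.*"
)
print(f"SQLAlchemy {installed} satisfies the 2.2.* pin")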

[airflow] 06/43: Update `version_added` for `[email] from_email` (#21138)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9f7d292769fd21c06ab0f222d7c75085d8aab288
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Jan 27 09:45:33 2022 -0700

    Update `version_added` for `[email] from_email` (#21138)
    
    (cherry picked from commit 362f397d7a3351c718b798a146f2f955a17b7074)
---
 airflow/config_templates/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index a70854e..6941f03 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1357,7 +1357,7 @@
       description: |
         Email address that will be used as sender address.
         It can either be raw email or the complete address in a format ``Sender Name <se...@email.com>``
-      version_added: 2.3.0
+      version_added: 2.2.4
       type: string
       example: "Airflow <ai...@example.com>"
       default: ~

[airflow] 05/43: Improved instructions for custom image build with docker compose (#21052)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5b51c41a39ee9238bc0fba0ace8c5ec27b9e8875
Author: Omer Ginosar <94...@users.noreply.github.com>
AuthorDate: Tue Jan 25 23:51:16 2022 +0200

    Improved instructions for custom image build with docker compose (#21052)
    
    * Create build.rst
    
    * Update docs/docker-stack/build.rst
    
    Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
    
    * fix doc build
    
    Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
    Co-authored-by: eladkal <45...@users.noreply.github.com>
    (cherry picked from commit 17b48e5baf09a86ea6e2036c864a882bb0c328e2)
---
 docs/docker-stack/build.rst | 19 +++++++++++++++++--
 docs/spelling_wordlist.txt  |  1 +
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/docs/docker-stack/build.rst b/docs/docker-stack/build.rst
index b85bf1c..6b5dc47 100644
--- a/docs/docker-stack/build.rst
+++ b/docs/docker-stack/build.rst
@@ -81,8 +81,23 @@ In the simplest case building your image consists of those steps:
 
 4) Once you build the image locally you have usually several options to make them available for your deployment:
 
-* For ``docker-compose`` deployment, that's all you need. The image is stored in docker engine cache
-  and docker compose will use it from there.
+* For ``docker-compose`` deployment, if you've already built your image, and want to continue
+  building the image manually when needed with ``docker build``, you can edit the
+  docker-compose.yaml and replace the "apache/airflow:<version>" image with the
+  image you've just built ``my-image:0.0.1`` - it will be used from your local Docker
+  Engine cache. You can also simply set ``AIRFLOW_IMAGE_NAME`` variable to
+  point to your image and ``docker-compose`` will use it automatically without having
+  to modify the file.
+
+* Also for ``docker-compose`` deployment, you can delegate image building to the docker-compose.
+  To do that - open your ``docker-compose.yaml`` file and search for the phrase "In order to add custom dependencies".
+  Follow these instructions of commenting the "image" line and uncommenting the "build" line.
+  This is a standard docker-compose feature and you can read about it in
+  `Docker Compose build reference <https://docs.docker.com/compose/reference/build/>`_.
+  Run ``docker-compose build`` to build the images. Similarly as in the previous case, the
+  image is stored in Docker engine cache and Docker Compose will use it from there.
+  The ``docker-compose build`` command uses the same ``docker build`` command that
+  you can run manually under-the-hood.
 
 * For some - development targeted - Kubernetes deployments you can load the images directly to
   Kubernetes clusters. Clusters such as ``kind`` or ``minikube`` have dedicated ``load`` method to load the
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 64d839f..5d77e29 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1384,6 +1384,7 @@ uid
 umask
 un
 unarchived
+uncommenting
 undead
 ungenerated
 unicode

[airflow] 40/43: Reorder migrations to include bugfix in 2.2.4 (#21598)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1cbad378cb778fca879a522916c11d32d80ac84e
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 15 16:38:56 2022 -0700

    Reorder migrations to include bugfix in 2.2.4 (#21598)
    
    (cherry picked from commit 005cef042bc4184c24ad03c1b4ee40cdbaf96cb5)
---
 .../versions/587bdf053233_adding_index_for_dag_id_in_job.py           | 4 ++--
 docs/apache-airflow/migrations-ref.rst                                | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py b/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
index 3532fe9..c643a62 100644
--- a/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
+++ b/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
@@ -19,7 +19,7 @@
 """adding index for dag_id in job
 
 Revision ID: 587bdf053233
-Revises: f9da662e7089
+Revises: c381b21cb7e4
 Create Date: 2021-12-14 10:20:12.482940
 
 """
@@ -28,7 +28,7 @@ from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = '587bdf053233'
-down_revision = 'f9da662e7089'
+down_revision = 'c381b21cb7e4'
 branch_labels = None
 depends_on = None
 
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 0eac329..cdaa447 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``587bdf053233`` (head)        | ``f9da662e7089`` | ``2.3.0``       | Add index for ``dag_id`` column in ``job`` table.                                     |
+| ``587bdf053233`` (head)        | ``c381b21cb7e4`` | ``2.2.4``       | Add index for ``dag_id`` column in ``job`` table.                                     |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``c381b21cb7e4``               | ``be2bfac3da23`` | ``2.2.4``       | Create a ``session`` table to store web session data                                  |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
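
Editorial note: Alembic determines migration order solely by following each revision's
``down_revision`` pointer, so re-pointing the parent as above is enough to move the index
migration into the 2.2.4 chain. The condensed sketch below uses the real revision IDs from
the diff, but the module layout is simplified for illustration:

# Condensed illustration of the chain after this commit; real migration files
# also contain upgrade()/downgrade() functions that are omitted here.
# 587bdf053233 (add dag_id index on job) -> c381b21cb7e4 (add session table) -> be2bfac3da23
revision = '587bdf053233'
down_revision = 'c381b21cb7e4'  # previously pointed at 'f9da662e7089'
branch_labels = None
depends_on = None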

[airflow] 16/43: Actually fix tuple and bool checks for black 22.1.0 (#21221)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0ba033daead44624847cbf26a5e19962575c94d0
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sun Jan 30 16:59:36 2022 +0100

    Actually fix tuple and bool checks for black 22.1.0 (#21221)
    
    The previous two fixes in #21215 and #21216 did not really fix the
    problem introduced by Black 22.1.0 (they could not, as they were
    wrong). This change was actually tested with the new Black and
    should finally fix it.
    
    (cherry picked from commit f9e20067e0ac593fd18ad068fcc56501c6a99f2b)
---
 dev/provider_packages/prepare_provider_packages.py | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py
index 7d0e1e5..8d920d6 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -1334,9 +1334,14 @@ def get_all_changes_for_package(
                         )
                         # Returns 66 in case of doc-only changes
                         sys.exit(66)
+                    if len(changes) > len(changes_since_last_doc_only_check):
+                        # if doc-only was released after previous release - use it as starting point
+                        # but if before - stay with the releases from last tag.
+                        changes = changes_since_last_doc_only_check
                 except subprocess.CalledProcessError:
                     # ignore when the commit mentioned as last doc-only change is obsolete
                     pass
+
             console.print(f"[yellow]The provider {provider_package_id} has changes since last release[/]")
             console.print()
             console.print(
@@ -1697,16 +1702,16 @@ def black_mode():
     config = parse_pyproject_toml(os.path.join(SOURCE_DIR_PATH, "pyproject.toml"))
 
     target_versions = set(
-        target_version_option_callback(None, None, config.get('target_version', [])),
+        target_version_option_callback(None, None, tuple(config.get('target_version', ()))),
     )
 
     return Mode(
         target_versions=target_versions,
         line_length=config.get('line_length', Mode.line_length),
-        is_pyi=config.get('is_pyi', Mode.is_pyi),
-        string_normalization=not config.get('skip_string_normalization', not Mode.string_normalization),
-        experimental_string_processing=config.get(
-            'experimental_string_processing', Mode.experimental_string_processing
+        is_pyi=bool(config.get('is_pyi', Mode.is_pyi)),
+        string_normalization=not bool(config.get('skip_string_normalization', not Mode.string_normalization)),
+        experimental_string_processing=bool(
+            config.get('experimental_string_processing', Mode.experimental_string_processing)
         ),
     )
 
@@ -2180,6 +2185,14 @@ KNOWN_DEPRECATED_DIRECT_IMPORTS: Set[str] = {
     'This module is deprecated. Please use `airflow.providers.amazon.aws.operators.sagemaker`.',
     'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.sagemaker`.',
     'This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.emr`.',
+    'This module is deprecated. Please use `airflow.providers.opsgenie.hooks.opsgenie`.',
+    'This module is deprecated. Please use `airflow.providers.opsgenie.operators.opsgenie`.',
+    'This module is deprecated. Please use `airflow.hooks.redshift_sql` '
+    'or `airflow.hooks.redshift_cluster` as appropriate.',
+    'This module is deprecated. Please use `airflow.providers.amazon.aws.operators.redshift_sql` or '
+    '`airflow.providers.amazon.aws.operators.redshift_cluster` as appropriate.',
+    'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.redshift_cluster`.',
+    "This module is deprecated. Please use airflow.providers.amazon.aws.transfers.sql_to_s3`.",
 }
 
 

[airflow] 28/43: Fix docs link for smart sensor deprecation (#21394)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f41ea340769ac8d7d0eec10e4de68e446abac2e4
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Feb 7 08:55:20 2022 -0700

    Fix docs link for smart sensor deprecation (#21394)
    
    We are releasing the deprecation in version 2.2.4, not 2.3.0 as
    originally planned.
    
    (cherry picked from commit 3a780380d8f5d50ffc876c326e70ee0eee033c0d)
---
 UPDATING.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/UPDATING.md b/UPDATING.md
index a75b2d6..2ed4aac 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -87,7 +87,7 @@ https://developers.google.com/style/inclusive-documentation
 
 Smart sensors, an "early access" feature added in Airflow 2, are now deprecated and will be removed in Airflow 2.4.0. They have been superseded by Deferable Operators, added in Airflow 2.2.0.
 
-See [Migrating to Deferrable Operators](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/smart-sensors.html#migrating-to-deferrable-operators) for details on how to migrate.
+See [Migrating to Deferrable Operators](https://airflow.apache.org/docs/apache-airflow/2.2.4/concepts/smart-sensors.html#migrating-to-deferrable-operators) for details on how to migrate.
 
 ## Airflow 2.2.3
 

[airflow] 30/43: Add possibility to create user in the Remote User mode (#19963)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9b03071333cf8e40f5ee9b8aa030656df59eb83c
Author: Łukasz Wyszomirski <wy...@google.com>
AuthorDate: Fri Jan 28 06:18:05 2022 +0100

    Add possibility to create user in the Remote User mode (#19963)
    
    (cherry picked from commit cdd9ea66208e3d70d1cf2a34530ba69bc3c58a50)
---
 airflow/www/views.py | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index f2642a7..2ed2a67 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -4731,3 +4731,28 @@ class CustomUserOIDModelView(MultiResourceUserMixin, UserOIDModelView):
 
 class CustomUserRemoteUserModelView(MultiResourceUserMixin, UserRemoteUserModelView):
     """Customize permission names for FAB's builtin UserRemoteUserModelView."""
+
+    _class_permission_name = permissions.RESOURCE_USER
+
+    class_permission_name_mapping = {
+        'userinfoedit': permissions.RESOURCE_MY_PROFILE,
+        'userinfo': permissions.RESOURCE_MY_PROFILE,
+    }
+
+    method_permission_name = {
+        'add': 'create',
+        'userinfo': 'read',
+        'download': 'read',
+        'show': 'read',
+        'list': 'read',
+        'edit': 'edit',
+        'userinfoedit': 'edit',
+        'delete': 'delete',
+    }
+
+    base_permissions = [
+        permissions.ACTION_CAN_CREATE,
+        permissions.ACTION_CAN_READ,
+        permissions.ACTION_CAN_EDIT,
+        permissions.ACTION_CAN_DELETE,
+    ]

[airflow] 38/43: update tutorial_etl_dag notes (#21503)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dd0a3a3d768b8cb2118c0b8d89ed0af0b393d865
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Fri Feb 11 10:17:18 2022 +0200

    update tutorial_etl_dag notes (#21503)
    
    * update tutorial_etl_dag notes
    
    (cherry picked from commit a42607a4b75586a396d6a56145ed048d127dd344)
---
 airflow/example_dags/tutorial_etl_dag.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py
index 8dd0ea4..dd18449 100644
--- a/airflow/example_dags/tutorial_etl_dag.py
+++ b/airflow/example_dags/tutorial_etl_dag.py
@@ -19,9 +19,7 @@
 
 """
 ### ETL DAG Tutorial Documentation
-This ETL DAG is compatible with Airflow 1.10.x (specifically tested with 1.10.12) and is referenced
-as part of the documentation that goes along with the Airflow Functional DAG tutorial located
-[here](https://airflow.apache.org/tutorial_decorated_flows.html)
+This ETL DAG is demonstrating an Extract -> Transform -> Load pipeline
 """
 # [START tutorial]
 # [START import_module]

[airflow] 10/43: Helper for provide_session-decorated functions (#20104)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dda864d585431c1c46c2705c40ed27ab9c43be72
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Dec 7 21:50:34 2021 +0800

    Helper for provide_session-decorated functions (#20104)
    
    * Helper for provide_session-decorated functions
    
    * Apply NEW_SESSION trick on XCom
    
    (cherry picked from commit a80ac1eecc0ea187de7984510b4ef6f981b97196)
---
 airflow/models/xcom.py   | 24 ++++++++++++------------
 airflow/settings.py      | 10 ++++++----
 airflow/utils/session.py | 11 +++++++++--
 3 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 4bb9689..5efaa0a 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -32,7 +32,7 @@ from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
 from airflow.utils import timezone
 from airflow.utils.helpers import is_container
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import provide_session
+from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
 log = logging.getLogger(__name__)
@@ -90,7 +90,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_id: str,
         task_id: str,
         run_id: str,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> None:
         """Store an XCom value.
 
@@ -116,7 +116,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: str,
         dag_id: str,
         execution_date: datetime.datetime,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> None:
         """:sphinx-autoapi-skip:"""
 
@@ -129,7 +129,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: str,
         dag_id: str,
         execution_date: Optional[datetime.datetime] = None,
-        session: Session = None,
+        session: Session = NEW_SESSION,
         *,
         run_id: Optional[str] = None,
     ) -> None:
@@ -170,7 +170,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: Optional[str] = None,
         dag_id: Optional[str] = None,
         include_prior_dates: bool = False,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> Optional[Any]:
         """Retrieve an XCom value, optionally meeting certain criteria.
 
@@ -207,7 +207,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: Optional[str] = None,
         dag_id: Optional[str] = None,
         include_prior_dates: bool = False,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> Optional[Any]:
         """:sphinx-autoapi-skip:"""
 
@@ -220,7 +220,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: Optional[Union[str, Iterable[str]]] = None,
         dag_id: Optional[Union[str, Iterable[str]]] = None,
         include_prior_dates: bool = False,
-        session: Session = None,
+        session: Session = NEW_SESSION,
         *,
         run_id: Optional[str] = None,
     ) -> Optional[Any]:
@@ -265,7 +265,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_ids: Union[str, Iterable[str], None] = None,
         include_prior_dates: bool = False,
         limit: Optional[int] = None,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> Query:
         """Composes a query to get one or more XCom entries.
 
@@ -300,7 +300,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_ids: Union[str, Iterable[str], None] = None,
         include_prior_dates: bool = False,
         limit: Optional[int] = None,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> Query:
         """:sphinx-autoapi-skip:"""
 
@@ -314,7 +314,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_ids: Optional[Union[str, Iterable[str]]] = None,
         include_prior_dates: bool = False,
         limit: Optional[int] = None,
-        session: Session = None,
+        session: Session = NEW_SESSION,
         *,
         run_id: Optional[str] = None,
     ) -> Query:
@@ -397,7 +397,7 @@ class BaseXCom(Base, LoggingMixin):
         execution_date: pendulum.DateTime,
         dag_id: str,
         task_id: str,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> None:
         """:sphinx-autoapi-skip:"""
 
@@ -409,7 +409,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_id: Optional[str] = None,
         task_id: Optional[str] = None,
         run_id: Optional[str] = None,
-        session: Session = None,
+        session: Session = NEW_SESSION,
     ) -> None:
         """:sphinx-autoapi-skip:"""
         # Given the historic order of this function (execution_date was first argument) to add a new optional
diff --git a/airflow/settings.py b/airflow/settings.py
index f9b97a2..139d6a4 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -22,7 +22,7 @@ import logging
 import os
 import sys
 import warnings
-from typing import Optional
+from typing import TYPE_CHECKING, Callable, List, Optional
 
 import pendulum
 import sqlalchemy
@@ -37,6 +37,9 @@ from airflow.executors import executor_constants
 from airflow.logging_config import configure_logging
 from airflow.utils.orm_event_handlers import setup_event_handlers
 
+if TYPE_CHECKING:
+    from airflow.www.utils import UIAlert
+
 log = logging.getLogger(__name__)
 
 
@@ -77,7 +80,7 @@ DONOT_MODIFY_HANDLERS: Optional[bool] = None
 DAGS_FOLDER: str = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
 engine: Optional[Engine] = None
-Session: Optional[SASession] = None
+Session: Callable[..., SASession]
 
 # The JSON library to use for DAG Serialization and De-Serialization
 json = json
@@ -563,8 +566,7 @@ MASK_SECRETS_IN_LOGS = False
 #       UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
 #   ]
 #
-# DASHBOARD_UIALERTS: List["UIAlert"]
-DASHBOARD_UIALERTS = []
+DASHBOARD_UIALERTS: List["UIAlert"] = []
 
 # Prefix used to identify tables holding data moved during migration.
 AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
diff --git a/airflow/utils/session.py b/airflow/utils/session.py
index 9636fc4..f0c3168 100644
--- a/airflow/utils/session.py
+++ b/airflow/utils/session.py
@@ -18,7 +18,7 @@
 import contextlib
 from functools import wraps
 from inspect import signature
-from typing import Callable, Iterator, TypeVar
+from typing import Callable, Iterator, TypeVar, cast
 
 from airflow import settings
 
@@ -26,7 +26,7 @@ from airflow import settings
 @contextlib.contextmanager
 def create_session() -> Iterator[settings.SASession]:
     """Contextmanager that will create and teardown a session."""
-    session: settings.SASession = settings.Session()
+    session = settings.Session()
     try:
         yield session
         session.commit()
@@ -105,3 +105,10 @@ def create_global_lock(session=None, pg_lock_id=1, lock_name='init', mysql_lock_
         if dialect.name == 'mssql':
             # TODO: make locking works for MSSQL
             pass
+
+
+# A fake session to use in functions decorated by provide_session. This allows
+# the 'session' argument to be of type Session instead of Optional[Session],
+# making it easier to type hint the function body without dealing with the None
+# case that can never happen at runtime.
+NEW_SESSION: settings.SASession = cast(settings.SASession, None)
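
To illustrate what the new ``NEW_SESSION`` default enables, here is a hedged sketch of a
``provide_session``-decorated function. The function itself is hypothetical and not part of
this commit; it only shows that the ``session`` parameter can now be annotated as a plain
``Session`` instead of ``Optional[Session]``:

# Hypothetical usage sketch of the NEW_SESSION helper added above.
from sqlalchemy.orm import Session

from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
def count_task_instances(dag_id: str, session: Session = NEW_SESSION) -> int:
    """Return the number of task instances for a DAG (illustrative only)."""
    from airflow.models.taskinstance import TaskInstance

    # At runtime the decorator replaces NEW_SESSION (which is really None)
    # with a real session, so the body never has to handle the None case.
    return session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).count()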

[airflow] 37/43: Simplify trigger cancel button (#21591)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 628aa1f99c865d97d0b1c7c76e630e43a7b8d319
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 15 11:00:26 2022 -0700

    Simplify trigger cancel button (#21591)
    
    Co-authored-by: Jed Cunningham <je...@apache.org>
    (cherry picked from commit 65297673a318660fba76797e50d0c06804dfcafc)
---
 airflow/www/templates/airflow/trigger.html |  2 +-
 tests/www/views/test_views_trigger_dag.py  | 11 +++++------
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/airflow/www/templates/airflow/trigger.html b/airflow/www/templates/airflow/trigger.html
index efc1650..2388d4e 100644
--- a/airflow/www/templates/airflow/trigger.html
+++ b/airflow/www/templates/airflow/trigger.html
@@ -63,7 +63,7 @@
       </label>
     </div>
     <button type="submit" class="btn btn-primary">Trigger</button>
-    <button type="button" class="btn" onclick="location.href = '{{ origin }}'; return false">Cancel</button>
+    <a class="btn" href="{{ origin }}">Cancel</a>
   </form>
 {% endblock %}
 
diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py
index f261438..2b43468 100644
--- a/tests/www/views/test_views_trigger_dag.py
+++ b/tests/www/views/test_views_trigger_dag.py
@@ -134,6 +134,10 @@ def test_trigger_dag_form(admin_client):
         ("http://google.com", "/home"),
         ("36539'%3balert(1)%2f%2f166", "/home"),
         (
+            '"><script>alert(99)</script><a href="',
+            "&#34;&gt;&lt;script&gt;alert(99)&lt;/script&gt;&lt;a href=&#34;",
+        ),
+        (
             "%2Ftree%3Fdag_id%3Dexample_bash_operator';alert(33)//",
             "/home",
         ),
@@ -145,12 +149,7 @@ def test_trigger_dag_form_origin_url(admin_client, test_origin, expected_origin)
     test_dag_id = "example_bash_operator"
 
     resp = admin_client.get(f'trigger?dag_id={test_dag_id}&origin={test_origin}')
-    check_content_in_response(
-        '<button type="button" class="btn" onclick="location.href = \'{}\'; return false">'.format(
-            expected_origin
-        ),
-        resp,
-    )
+    check_content_in_response(f'<a class="btn" href="{expected_origin}">Cancel</a>', resp)
 
 
 @pytest.mark.parametrize(

[airflow] 23/43: Docs: Fix task order in overview example (#21282)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a519e53ddee5675402c091c1e99fe1643a968e87
Author: Lucia Kasman <38...@users.noreply.github.com>
AuthorDate: Thu Feb 3 15:10:27 2022 -0300

    Docs: Fix task order in overview example (#21282)
    
    (cherry picked from commit 1ba83c01b2b466ad5a76a453e5f6ee2884081e53)
---
 docs/apache-airflow/concepts/overview.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/concepts/overview.rst b/docs/apache-airflow/concepts/overview.rst
index fd862ea..567c3c8 100644
--- a/docs/apache-airflow/concepts/overview.rst
+++ b/docs/apache-airflow/concepts/overview.rst
@@ -65,12 +65,12 @@ Control Flow
 :doc:`tasks` have dependencies declared on each other. You'll see this in a DAG either using the ``>>`` and ``<<`` operators::
 
     first_task >> [second_task, third_task]
-    third_task << fourth_task
+    fourth_task << third_task
 
 Or, with the ``set_upstream`` and ``set_downstream`` methods::
 
     first_task.set_downstream([second_task, third_task])
-    third_task.set_upstream(fourth_task)
+    fourth_task.set_upstream(third_task)
 
 These dependencies are what make up the "edges" of the graph, and how Airflow works out which order to run your tasks in. By default, a task will wait for all of its upstream tasks to succeed before it runs, but this can be customized using features like :ref:`Branching <concepts:branching>`, :ref:`LatestOnly <concepts:latest-only>`, and :ref:`Trigger Rules <concepts:trigger-rules>`.
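
For readers who want to try the corrected snippet, here is a minimal runnable sketch; the DAG
and task IDs are illustrative, and ``DummyOperator`` is used because this documentation
targets the Airflow 2.2 line:

# Illustrative sketch only (not part of the commit): the corrected dependency
# pattern inside a complete DAG definition.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id="overview_example", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    first_task = DummyOperator(task_id="first_task")
    second_task = DummyOperator(task_id="second_task")
    third_task = DummyOperator(task_id="third_task")
    fourth_task = DummyOperator(task_id="fourth_task")

    # first_task fans out to second_task and third_task; fourth_task then
    # depends on third_task, matching the corrected documentation example.
    first_task >> [second_task, third_task]
    fourth_task << third_task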
 

[airflow] 09/43: Type-annotate SkipMixin and BaseXCom (#20011)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 016929f782022791557d0ec1f316b338dc210566
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Dec 7 17:55:00 2021 +0800

    Type-annotate SkipMixin and BaseXCom (#20011)
    
    (cherry picked from commit 6dd0a0df7e6a2f025e9234bdbf97b41e9b8f6257)
---
 airflow/models/skipmixin.py |  15 +-
 airflow/models/xcom.py      | 335 ++++++++++++++++++++++++++++++--------------
 2 files changed, 232 insertions(+), 118 deletions(-)

diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index 5cd50a3..765a947 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -17,7 +17,7 @@
 # under the License.
 
 import warnings
-from typing import TYPE_CHECKING, Iterable, Union
+from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Union
 
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
@@ -26,6 +26,7 @@ from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
 
 if TYPE_CHECKING:
+    from pendulum import DateTime
     from sqlalchemy import Session
 
     from airflow.models import DagRun
@@ -66,9 +67,9 @@ class SkipMixin(LoggingMixin):
     def skip(
         self,
         dag_run: "DagRun",
-        execution_date: "timezone.DateTime",
-        tasks: "Iterable[BaseOperator]",
-        session: "Session" = None,
+        execution_date: "DateTime",
+        tasks: Sequence["BaseOperator"],
+        session: "Session",
     ):
         """
         Sets tasks instances to skipped from the same dag run.
@@ -114,11 +115,7 @@ class SkipMixin(LoggingMixin):
         session.commit()
 
         # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
-        try:
-            task_id = self.task_id
-        except AttributeError:
-            task_id = None
-
+        task_id: Optional[str] = getattr(self, "task_id", None)
         if task_id is not None:
             from airflow.models.xcom import XCom
 
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 99c2b9a..4bb9689 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -16,10 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
 import json
 import logging
 import pickle
-from typing import Any, Iterable, Optional, Union
+from typing import TYPE_CHECKING, Any, Iterable, Optional, Type, Union, cast, overload
 
 import pendulum
 from sqlalchemy import Column, LargeBinary, String
@@ -79,14 +80,60 @@ class BaseXCom(Base, LoggingMixin):
     def __repr__(self):
         return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>'
 
+    @overload
     @classmethod
-    @provide_session
-    def set(cls, key, value, task_id, dag_id, execution_date=None, run_id=None, session=None):
+    def set(
+        cls,
+        key: str,
+        value: Any,
+        *,
+        dag_id: str,
+        task_id: str,
+        run_id: str,
+        session: Optional[Session] = None,
+    ) -> None:
+        """Store an XCom value.
+
+        A deprecated form of this function accepts ``execution_date`` instead of
+        ``run_id``. The two arguments are mutually exclusive.
+
+        :param key: Key to store the XCom.
+        :param value: XCom value to store.
+        :param dag_id: DAG ID.
+        :param task_id: Task ID.
+        :param run_id: DAG run ID for the task.
+        :param session: Database session. If not given, a new session will be
+            created for this function.
+        :type session: sqlalchemy.orm.session.Session
         """
-        Store an XCom value.
 
-        :return: None
-        """
+    @overload
+    @classmethod
+    def set(
+        cls,
+        key: str,
+        value: Any,
+        task_id: str,
+        dag_id: str,
+        execution_date: datetime.datetime,
+        session: Optional[Session] = None,
+    ) -> None:
+        """:sphinx-autoapi-skip:"""
+
+    @classmethod
+    @provide_session
+    def set(
+        cls,
+        key: str,
+        value: Any,
+        task_id: str,
+        dag_id: str,
+        execution_date: Optional[datetime.datetime] = None,
+        session: Session = None,
+        *,
+        run_id: Optional[str] = None,
+    ) -> None:
+        """:sphinx-autoapi-skip:"""
         if not (execution_date is None) ^ (run_id is None):
             raise ValueError("Exactly one of execution_date or run_id must be passed")
 
@@ -94,70 +141,95 @@ class BaseXCom(Base, LoggingMixin):
             from airflow.models.dagrun import DagRun
 
             dag_run = session.query(DagRun).filter_by(dag_id=dag_id, run_id=run_id).one()
-
             execution_date = dag_run.execution_date
 
-        value = XCom.serialize_value(value)
-
-        # remove any duplicate XComs
+        # Remove duplicate XComs and insert a new one.
         session.query(cls).filter(
-            cls.key == key, cls.execution_date == execution_date, cls.task_id == task_id, cls.dag_id == dag_id
+            cls.key == key,
+            cls.execution_date == execution_date,
+            cls.task_id == task_id,
+            cls.dag_id == dag_id,
         ).delete()
-
+        new = cast(Any, cls)(  # Work around Mypy complaining model not defining '__init__'.
+            key=key,
+            value=cls.serialize_value(value),
+            execution_date=execution_date,
+            task_id=task_id,
+            dag_id=dag_id,
+        )
+        session.add(new)
         session.flush()
 
-        # insert new XCom
-        session.add(XCom(key=key, value=value, execution_date=execution_date, task_id=task_id, dag_id=dag_id))
+    @overload
+    @classmethod
+    def get_one(
+        cls,
+        *,
+        run_id: str,
+        key: Optional[str] = None,
+        task_id: Optional[str] = None,
+        dag_id: Optional[str] = None,
+        include_prior_dates: bool = False,
+        session: Optional[Session] = None,
+    ) -> Optional[Any]:
+        """Retrieve an XCom value, optionally meeting certain criteria.
+
+        This method returns "full" XCom values (i.e. uses ``deserialize_value``
+        from the XCom backend). Use :meth:`get_many` if you want the "shortened"
+        value via ``orm_deserialize_value``.
+
+        If there are no results, *None* is returned.
+
+        A deprecated form of this function accepts ``execution_date`` instead of
+        ``run_id``. The two arguments are mutually exclusive.
+
+        :param run_id: DAG run ID for the task.
+        :param key: A key for the XCom. If provided, only XCom with matching
+            keys will be returned. Pass *None* (default) to remove the filter.
+        :param task_id: Only XCom from task with matching ID will be pulled.
+            Pass *None* (default) to remove the filter.
+        :param dag_id: Only pull XCom from this DAG. If *None* (default), the
+            DAG of the calling task is used.
+        :param include_prior_dates: If *False* (default), only XCom from the
+            specified DAG run is returned. If *True*, the latest matching XCom is
+            returned regardless of the run it belongs to.
+        :param session: Database session. If not given, a new session will be
+            created for this function.
+        :type session: sqlalchemy.orm.session.Session
+        """
 
-        session.flush()
+    @overload
+    @classmethod
+    def get_one(
+        cls,
+        execution_date: pendulum.DateTime,
+        key: Optional[str] = None,
+        task_id: Optional[str] = None,
+        dag_id: Optional[str] = None,
+        include_prior_dates: bool = False,
+        session: Optional[Session] = None,
+    ) -> Optional[Any]:
+        """:sphinx-autoapi-skip:"""
 
     @classmethod
     @provide_session
     def get_one(
         cls,
         execution_date: Optional[pendulum.DateTime] = None,
-        run_id: Optional[str] = None,
         key: Optional[str] = None,
         task_id: Optional[Union[str, Iterable[str]]] = None,
         dag_id: Optional[Union[str, Iterable[str]]] = None,
         include_prior_dates: bool = False,
         session: Session = None,
+        *,
+        run_id: Optional[str] = None,
     ) -> Optional[Any]:
-        """
-        Retrieve an XCom value, optionally meeting certain criteria. Returns None
-        of there are no results.
-
-        ``run_id`` and ``execution_date`` are mutually exclusive.
-
-        This method returns "full" XCom values (i.e. it uses ``deserialize_value`` from the XCom backend).
-        Please use :meth:`get_many` if you want the "shortened" value via ``orm_deserialize_value``
-
-        :param execution_date: Execution date for the task
-        :type execution_date: pendulum.datetime
-        :param run_id: Dag run id for the task
-        :type run_id: str
-        :param key: A key for the XCom. If provided, only XComs with matching
-            keys will be returned. To remove the filter, pass key=None.
-        :type key: str
-        :param task_id: Only XComs from task with matching id will be
-            pulled. Can pass None to remove the filter.
-        :type task_id: str
-        :param dag_id: If provided, only pulls XCom from this DAG.
-            If None (default), the DAG of the calling task is used.
-        :type dag_id: str
-        :param include_prior_dates: If False, only XCom from the current
-            execution_date are returned. If True, XCom from previous dates
-            are returned as well.
-        :type include_prior_dates: bool
-        :param session: database session
-        :type session: sqlalchemy.orm.session.Session
-        """
+        """:sphinx-autoapi-skip:"""
         if not (execution_date is None) ^ (run_id is None):
             raise ValueError("Exactly one of execution_date or run_id must be passed")
 
-        result = (
-            cls.get_many(
-                execution_date=execution_date,
+        if run_id is not None:
+            query = cls.get_many(
                 run_id=run_id,
                 key=key,
                 task_ids=task_id,
@@ -165,58 +237,88 @@ class BaseXCom(Base, LoggingMixin):
                 include_prior_dates=include_prior_dates,
                 session=session,
             )
-            .with_entities(cls.value)
-            .first()
-        )
+        elif execution_date is not None:
+            query = cls.get_many(
+                execution_date=execution_date,
+                key=key,
+                task_ids=task_id,
+                dag_ids=dag_id,
+                include_prior_dates=include_prior_dates,
+                session=session,
+            )
+        else:
+            raise RuntimeError("Should not happen?")
+
+        result = query.with_entities(cls.value).first()
         if result:
             return cls.deserialize_value(result)
         return None
 
+    @overload
+    @classmethod
+    def get_many(
+        cls,
+        *,
+        run_id: str,
+        key: Optional[str] = None,
+        task_ids: Union[str, Iterable[str], None] = None,
+        dag_ids: Union[str, Iterable[str], None] = None,
+        include_prior_dates: bool = False,
+        limit: Optional[int] = None,
+        session: Optional[Session] = None,
+    ) -> Query:
+        """Composes a query to get one or more XCom entries.
+
+        This function returns an SQLAlchemy query of full XCom objects. If you
+        just want one stored value, use :meth:`get_one` instead.
+
+        A deprecated form of this function accepts ``execution_date`` instead of
+        ``run_id``. The two arguments are mutually exclusive.
+
+        :param run_id: DAG run ID for the task.
+        :param key: A key for the XComs. If provided, only XComs with matching
+            keys will be returned. Pass *None* (default) to remove the filter.
+        :param task_ids: Only XComs from task with matching IDs will be pulled.
+            Pass *None* (default) to remove the filter.
+        :param dag_id: Only pulls XComs from this DAG. If *None* (default), the
+            DAG of the calling task is used.
+        :param include_prior_dates: If *False* (default), only XComs from the
+            specified DAG run are returned. If *True*, all matching XComs are
+            returned regardless of the run it belongs to.
+        :param session: Database session. If not given, a new session will be
+            created for this function.
+        :type session: sqlalchemy.orm.session.Session
+        """
+
+    @overload
+    @classmethod
+    def get_many(
+        cls,
+        execution_date: pendulum.DateTime,
+        key: Optional[str] = None,
+        task_ids: Union[str, Iterable[str], None] = None,
+        dag_ids: Union[str, Iterable[str], None] = None,
+        include_prior_dates: bool = False,
+        limit: Optional[int] = None,
+        session: Optional[Session] = None,
+    ) -> Query:
+        """:sphinx-autoapi-skip:"""
+
     @classmethod
     @provide_session
     def get_many(
         cls,
         execution_date: Optional[pendulum.DateTime] = None,
-        run_id: Optional[str] = None,
         key: Optional[str] = None,
         task_ids: Optional[Union[str, Iterable[str]]] = None,
         dag_ids: Optional[Union[str, Iterable[str]]] = None,
         include_prior_dates: bool = False,
         limit: Optional[int] = None,
         session: Session = None,
+        *,
+        run_id: Optional[str] = None,
     ) -> Query:
-        """
-        Composes a query to get one or more values from the xcom table.
-
-        ``run_id`` and ``execution_date`` are mutually exclusive.
-
-        This function returns an SQLAlchemy query of full XCom objects. If you just want one stored value then
-        use :meth:`get_one`.
-
-        :param execution_date: Execution date for the task
-        :type execution_date: pendulum.datetime
-        :param run_id: Dag run id for the task
-        :type run_id: str
-        :param key: A key for the XCom. If provided, only XComs with matching
-            keys will be returned. To remove the filter, pass key=None.
-        :type key: str
-        :param task_ids: Only XComs from tasks with matching ids will be
-            pulled. Can pass None to remove the filter.
-        :type task_ids: str or iterable of strings (representing task_ids)
-        :param dag_ids: If provided, only pulls XComs from this DAG.
-            If None (default), the DAG of the calling task is used.
-        :type dag_ids: str
-        :param include_prior_dates: If False, only XComs from the current
-            execution_date are returned. If True, XComs from previous dates
-            are returned as well.
-        :type include_prior_dates: bool
-        :param limit: If required, limit the number of returned objects.
-            XCom objects can be quite big and you might want to limit the
-            number of rows.
-        :type limit: int
-        :param session: database session
-        :type session: sqlalchemy.orm.session.Session
-        """
+        """:sphinx-autoapi-skip:"""
         if not (execution_date is None) ^ (run_id is None):
             raise ValueError("Exactly one of execution_date or run_id must be passed")
 
@@ -262,8 +364,8 @@ class BaseXCom(Base, LoggingMixin):
 
     @classmethod
     @provide_session
-    def delete(cls, xcoms, session=None):
-        """Delete Xcom"""
+    def delete(cls, xcoms: Union["XCom", Iterable["XCom"]], session: Session) -> None:
+        """Delete one or multiple XCom entries."""
         if isinstance(xcoms, XCom):
             xcoms = [xcoms]
         for xcom in xcoms:
@@ -272,37 +374,49 @@ class BaseXCom(Base, LoggingMixin):
             session.delete(xcom)
         session.commit()
 
+    @overload
+    @classmethod
+    def clear(cls, *, dag_id: str, task_id: str, run_id: str, session: Optional[Session] = None) -> None:
+        """Clear all XCom data from the database for the given task instance.
+
+        A deprecated form of this function accepts ``execution_date`` instead of
+        ``run_id``. The two arguments are mutually exclusive.
+
+        :param dag_id: ID of DAG to clear the XCom for.
+        :param task_id: ID of task to clear the XCom for.
+        :param run_id: ID of DAG run to clear the XCom for.
+        :param session: Database session. If not given, a new session will be
+            created for this function.
+        :type session: sqlalchemy.orm.session.Session
+        """
+
+    @overload
+    @classmethod
+    def clear(
+        cls,
+        execution_date: pendulum.DateTime,
+        dag_id: str,
+        task_id: str,
+        session: Optional[Session] = None,
+    ) -> None:
+        """:sphinx-autoapi-skip:"""
+
     @classmethod
     @provide_session
     def clear(
         cls,
         execution_date: Optional[pendulum.DateTime] = None,
-        dag_id: str = None,
-        task_id: str = None,
-        run_id: str = None,
+        dag_id: Optional[str] = None,
+        task_id: Optional[str] = None,
+        run_id: Optional[str] = None,
         session: Session = None,
     ) -> None:
-        """
-        Clears all XCom data from the database for the task instance
-
-        ``run_id`` and ``execution_date`` are mutually exclusive.
-
-        :param execution_date: Execution date for the task
-        :type execution_date: pendulum.datetime or None
-        :param dag_id: ID of DAG to clear the XCom for.
-        :type dag_id: str
-        :param task_id: Only XComs from task with matching id will be cleared.
-        :type task_id: str
-        :param run_id: Dag run id for the task
-        :type run_id: str or None
-        :param session: database session
-        :type session: sqlalchemy.orm.session.Session
-        """
+        """:sphinx-autoapi-skip:"""
         # Given the historic order of this function (execution_date was first argument) to add a new optional
         # param we need to add default values for everything :(
-        if not dag_id:
+        if dag_id is None:
             raise TypeError("clear() missing required argument: dag_id")
-        if not task_id:
+        if task_id is None:
             raise TypeError("clear() missing required argument: task_id")
 
         if not (execution_date is None) ^ (run_id is None):
@@ -364,7 +478,7 @@ class BaseXCom(Base, LoggingMixin):
         return BaseXCom.deserialize_value(self)
 
 
-def resolve_xcom_backend():
+def resolve_xcom_backend() -> Type[BaseXCom]:
     """Resolves custom XCom class"""
     clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
     if clazz:
@@ -376,4 +490,7 @@ def resolve_xcom_backend():
     return BaseXCom
 
 
-XCom = resolve_xcom_backend()
+if TYPE_CHECKING:
+    XCom = BaseXCom  # Hack to avoid Mypy "Variable 'XCom' is not valid as a type".
+else:
+    XCom = resolve_xcom_backend()
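
As a rough usage sketch of the annotated ``clear`` signature above, using the non-deprecated ``run_id`` form (the DAG, task, and run identifiers are illustrative, not taken from the commit):

    from airflow.models.xcom import XCom

    # Clear every XCom entry written by one task instance, identified by run_id.
    # @provide_session supplies a database session when none is passed explicitly.
    XCom.clear(
        dag_id="example_dag",
        task_id="example_task",
        run_id="manual__2022-01-01T00:00:00+00:00",
    )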

[airflow] 17/43: Fix Scheduler crash when executing task instances of missing DAG (#20349)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1b139a77d012971d147bffc74646923514db4b48
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jan 13 22:23:10 2022 +0100

    Fix Scheduler crash when executing task instances of missing DAG (#20349)
    
    When executing task instances, we do not check whether the DAG is missing from
    the dagbag. This PR fixes that by ignoring task instances whose DAG cannot be
    found in the serialized DAG table.
    
    Closes: #20099
    (cherry picked from commit 98715760f72e5205c291293088b5e79636884491)
---
 airflow/jobs/scheduler_job.py    | 11 +++++++++++
 tests/jobs/test_scheduler_job.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2fedf80..490d507 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -375,6 +375,17 @@ class SchedulerJob(BaseJob):
                     # Many dags don't have a task_concurrency, so where we can avoid loading the full
                     # serialized DAG the better.
                     serialized_dag = self.dagbag.get_dag(dag_id, session=session)
+                    # If the dag is missing, fail the task and continue to the next task.
+                    if not serialized_dag:
+                        self.log.error(
+                            "DAG '%s' for task instance %s not found in serialized_dag table",
+                            dag_id,
+                            task_instance,
+                        )
+                        session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
+                            {TI.state: State.FAILED}, synchronize_session='fetch'
+                        )
+                        continue
                     if serialized_dag.has_task(task_instance.task_id):
                         task_concurrency_limit = serialized_dag.get_task(
                             task_instance.task_id
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a308029..7185720 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -610,6 +610,34 @@ class TestSchedulerJob:
         session.rollback()
         session.close()
 
+    def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
+        """Check that task instances of missing DAGs are failed"""
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag'
+        task_id_1 = 'dummy'
+        task_id_2 = 'dummydummy'
+
+        with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}):
+            DummyOperator(task_id=task_id_1)
+            DummyOperator(task_id=task_id_2)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag = mock.MagicMock()
+        self.scheduler_job.dagbag.get_dag.return_value = None
+
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        tis = dr.task_instances
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        assert 0 == len(res)
+        tis = dr.get_task_instances(session=session)
+        assert len(tis) == 2
+        assert all(ti.state == State.FAILED for ti in tis)
+
     def test_nonexistent_pool(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_nonexistent_pool'
         with dag_maker(dag_id=dag_id, max_active_tasks=16):

[airflow] 26/43: Update error docs to include before_send option (#21275)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 270516cb1ac54bcf3ede888ddfffeb9154fa37d3
Author: Abhijeet Prasad <de...@gmail.com>
AuthorDate: Thu Feb 3 18:15:21 2022 -0500

    Update error docs to include before_send option (#21275)
    
    https://github.com/apache/airflow/pull/18261 added support for the `before_send` option when initializing the Sentry SDK in Airflow. This patch updates the documentation to reflect that change.
    (cherry picked from commit b38391e2f91760e64576723c876341f532a6ee2d)
---
 docs/apache-airflow/logging-monitoring/errors.rst | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/logging-monitoring/errors.rst b/docs/apache-airflow/logging-monitoring/errors.rst
index 9f4256a..30bb66d 100644
--- a/docs/apache-airflow/logging-monitoring/errors.rst
+++ b/docs/apache-airflow/logging-monitoring/errors.rst
@@ -42,8 +42,14 @@ Add your ``SENTRY_DSN`` to your configuration file e.g. ``airflow.cfg`` in ``[se
 .. note::
     If this value is not provided, the SDK will try to read it from the ``SENTRY_DSN`` environment variable.
 
-You can supply `additional configuration options <https://docs.sentry.io/platforms/python/configuration/options>`__ based on the Python platform via ``[sentry]`` section.
-Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``, ``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``.
+The ``before_send`` option can be used to modify or drop events before they are sent to Sentry. To set this option, provide a dotted path to a ``before_send`` function that the Sentry SDK should be configured to use.
+
+.. code-block:: ini
+
+    [sentry]
+    before_send = path.to.my.sentry.before_send
+
+You can supply `additional configuration options <https://docs.sentry.io/platforms/python/configuration/options>`__ based on the Python platform via ``[sentry]`` section. Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``, ``ignore_errors``, ``before_breadcrumb``, ``transport``.
 
 Tags
 -----
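
A rough sketch of what such a ``before_send`` callable could look like (the filtering rule and logger name are illustrative; only the dotted-path configuration above comes from the commit):

    def before_send(event, hint):
        # Drop events emitted by a particularly noisy logger; send everything else unchanged.
        if event.get("logger") == "airflow.some.noisy.module":
            return None
        return event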

[airflow] 24/43: Update stat_name_handler documentation (#21298)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 015c481f63825e6d5a7d044d970c563a6450040c
Author: Fran Sánchez <fj...@users.noreply.github.com>
AuthorDate: Thu Feb 3 18:12:08 2022 +0000

    Update stat_name_handler documentation (#21298)
    
    Previously, stat_name_handler was documented under the scheduler section of the
    configuration, but the option was moved to the metrics section in 2.0.0.
    
    (cherry picked from commit 0ae31e9cb95e5061a23c2f397ab9716391c1a488)
---
 docs/apache-airflow/logging-monitoring/metrics.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/logging-monitoring/metrics.rst b/docs/apache-airflow/logging-monitoring/metrics.rst
index c8fd182..bdbd2ec 100644
--- a/docs/apache-airflow/logging-monitoring/metrics.rst
+++ b/docs/apache-airflow/logging-monitoring/metrics.rst
@@ -50,7 +50,7 @@ the metrics that start with the elements of the list:
     statsd_allow_list = scheduler,executor,dagrun
 
 If you want to redirect metrics to a different name, you can configure the ``stat_name_handler`` option
-in ``[scheduler]`` section.  It should point to a function that validates the statsd stat name, applies changes
+in ``[metrics]`` section.  It should point to a function that validates the StatsD stat name, applies changes
 to the stat name if necessary, and returns the transformed stat name. The function may look as follows:
 
 .. code-block:: python
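
The Python example following that directive sits outside the diff context; a minimal sketch of such a handler, assuming it only normalizes names (the function name and transformation rules are illustrative):

    def my_stat_name_handler(stat_name: str) -> str:
        # Validate or transform the proposed StatsD name and return the name to emit.
        return stat_name.lower().replace("-", "_")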

[airflow] 12/43: Deprecate some functions in the experimental API (#19931)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 663bb546e782748bdd315483ca2070a77046997a
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Dec 16 12:30:42 2021 +0100

    Deprecate some functions in the experimental API (#19931)
    
    This PR deprecates some functions in the experimental API.
    Some of the deprecated functions are only used in the experimental REST API,
    while others that are still valid are moved out of the experimental package.
    
    (cherry picked from commit 6239ae91a4c8bfb05f053a61cb8386f2d63b8b3a)
---
 airflow/api/client/local_client.py                 |  29 ++++--
 .../api/common/{experimental => }/delete_dag.py    |   3 +-
 airflow/api/common/experimental/delete_dag.py      |  70 ++-----------
 airflow/api/common/experimental/get_code.py        |   3 +
 .../api/common/experimental/get_dag_run_state.py   |   3 +
 airflow/api/common/experimental/get_task.py        |   3 +
 .../api/common/experimental/get_task_instance.py   |   3 +
 airflow/api/common/experimental/pool.py            |   6 ++
 airflow/api/common/experimental/trigger_dag.py     | 115 ++-------------------
 .../api/common/{experimental => }/trigger_dag.py   |   5 +-
 airflow/api_connexion/endpoints/dag_endpoint.py    |   7 +-
 airflow/models/pool.py                             |  52 ++++++++--
 airflow/operators/trigger_dagrun.py                |   2 +-
 airflow/utils/db.py                                |  15 +++
 airflow/www/views.py                               |   2 +-
 setup.cfg                                          |   1 +
 tests/api/client/test_local_client.py              |  31 +++++-
 .../common/{experimental => }/test_delete_dag.py   |   2 +-
 .../common/{experimental => }/test_trigger_dag.py  |   8 +-
 tests/models/test_pool.py                          |  71 +++++++++++++
 20 files changed, 229 insertions(+), 202 deletions(-)

diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 7ce0d16..c005067 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -18,8 +18,10 @@
 """Local client API"""
 
 from airflow.api.client import api_client
-from airflow.api.common.experimental import delete_dag, pool, trigger_dag
+from airflow.api.common import delete_dag, trigger_dag
 from airflow.api.common.experimental.get_lineage import get_lineage as get_lineage_api
+from airflow.exceptions import AirflowBadRequest, PoolNotFound
+from airflow.models.pool import Pool
 
 
 class Client(api_client.Client):
@@ -36,19 +38,30 @@ class Client(api_client.Client):
         return f"Removed {count} record(s)"
 
     def get_pool(self, name):
-        the_pool = pool.get_pool(name=name)
-        return the_pool.pool, the_pool.slots, the_pool.description
+        pool = Pool.get_pool(pool_name=name)
+        if not pool:
+            raise PoolNotFound(f"Pool {name} not found")
+        return pool.pool, pool.slots, pool.description
 
     def get_pools(self):
-        return [(p.pool, p.slots, p.description) for p in pool.get_pools()]
+        return [(p.pool, p.slots, p.description) for p in Pool.get_pools()]
 
     def create_pool(self, name, slots, description):
-        the_pool = pool.create_pool(name=name, slots=slots, description=description)
-        return the_pool.pool, the_pool.slots, the_pool.description
+        if not (name and name.strip()):
+            raise AirflowBadRequest("Pool name shouldn't be empty")
+        pool_name_length = Pool.pool.property.columns[0].type.length
+        if len(name) > pool_name_length:
+            raise AirflowBadRequest(f"pool name cannot be more than {pool_name_length} characters")
+        try:
+            slots = int(slots)
+        except ValueError:
+            raise AirflowBadRequest(f"Bad value for `slots`: {slots}")
+        pool = Pool.create_or_update_pool(name=name, slots=slots, description=description)
+        return pool.pool, pool.slots, pool.description
 
     def delete_pool(self, name):
-        the_pool = pool.delete_pool(name=name)
-        return the_pool.pool, the_pool.slots, the_pool.description
+        pool = Pool.delete_pool(name=name)
+        return pool.pool, pool.slots, pool.description
 
     def get_lineage(self, dag_id, execution_date):
         lineage = get_lineage_api(dag_id=dag_id, execution_date=execution_date)
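
As a hedged usage sketch of the reworked pool methods on the local client (the constructor arguments, pool name, and slot count are illustrative; invalid input now raises ``AirflowBadRequest`` and missing pools raise ``PoolNotFound``):

    from airflow.api.client.local_client import Client

    client = Client(None, None)
    client.create_pool(name="etl", slots=3, description="ETL tasks")
    print(client.get_pool(name="etl"))  # ('etl', 3, 'ETL tasks')
    client.delete_pool(name="etl")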
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/delete_dag.py
similarity index 97%
copy from airflow/api/common/experimental/delete_dag.py
copy to airflow/api/common/delete_dag.py
index 44e54e3..c448127 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -24,6 +24,7 @@ from airflow import models
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models import DagModel, TaskFail
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.db import get_sqla_model_classes
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
@@ -60,7 +61,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
 
     count = 0
 
-    for model in models.base.Base._decl_class_registry.values():
+    for model in get_sqla_model_classes():
         if hasattr(model, "dag_id"):
             if keep_records_in_log and model.__name__ == 'Log':
                 continue
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
index 44e54e3..36bf7dd 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -15,68 +15,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Delete DAGs APIs."""
-import logging
+import warnings
 
-from sqlalchemy import or_
+from airflow.api.common.delete_dag import *  # noqa
 
-from airflow import models
-from airflow.exceptions import AirflowException, DagNotFound
-from airflow.models import DagModel, TaskFail
-from airflow.models.serialized_dag import SerializedDagModel
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
-
-log = logging.getLogger(__name__)
-
-
-@provide_session
-def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> int:
-    """
-    :param dag_id: the dag_id of the DAG to delete
-    :param keep_records_in_log: whether keep records of the given dag_id
-        in the Log table in the backend database (for reasons like auditing).
-        The default value is True.
-    :param session: session used
-    :return count of deleted dags
-    """
-    log.info("Deleting DAG: %s", dag_id)
-    running_tis = (
-        session.query(models.TaskInstance.state)
-        .filter(models.TaskInstance.dag_id == dag_id)
-        .filter(models.TaskInstance.state == State.RUNNING)
-        .first()
-    )
-    if running_tis:
-        raise AirflowException("TaskInstances still running")
-    dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
-    if dag is None:
-        raise DagNotFound(f"Dag id {dag_id} not found")
-
-    # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
-    # There may be a lag, so explicitly removes serialized DAG here.
-    if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
-        SerializedDagModel.remove_dag(dag_id=dag_id, session=session)
-
-    count = 0
-
-    for model in models.base.Base._decl_class_registry.values():
-        if hasattr(model, "dag_id"):
-            if keep_records_in_log and model.__name__ == 'Log':
-                continue
-            cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
-            count += session.query(model).filter(cond).delete(synchronize_session='fetch')
-    if dag.is_subdag:
-        parent_dag_id, task_id = dag_id.rsplit(".", 1)
-        for model in TaskFail, models.TaskInstance:
-            count += (
-                session.query(model).filter(model.dag_id == parent_dag_id, model.task_id == task_id).delete()
-            )
-
-    # Delete entries in Import Errors table for a deleted DAG
-    # This handles the case when the dag_id is changed in the file
-    session.query(models.ImportError).filter(models.ImportError.filename == dag.fileloc).delete(
-        synchronize_session='fetch'
-    )
-
-    return count
+warnings.warn(
+    "This module is deprecated. Please use `airflow.api.common.delete_dag` instead.",
+    DeprecationWarning,
+    stacklevel=2,
+)
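
Existing callers keep working through the re-export above but now receive a DeprecationWarning; the implied import migration, sketched below for both relocated modules:

    # New import locations introduced by this commit; the old
    # airflow.api.common.experimental.* paths still work but warn.
    from airflow.api.common.delete_dag import delete_dag
    from airflow.api.common.trigger_dag import trigger_dag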
diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py
index 79b0b9f..1a1fb62 100644
--- a/airflow/api/common/experimental/get_code.py
+++ b/airflow/api/common/experimental/get_code.py
@@ -16,11 +16,14 @@
 # specific language governing permissions and limitations
 # under the License.
 """Get code APIs."""
+from deprecated import deprecated
+
 from airflow.api.common.experimental import check_and_get_dag
 from airflow.exceptions import AirflowException, DagCodeNotFound
 from airflow.models.dagcode import DagCode
 
 
+@deprecated(reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.3")
 def get_code(dag_id: str) -> str:
     """Return python code of a given dag_id.
 
diff --git a/airflow/api/common/experimental/get_dag_run_state.py b/airflow/api/common/experimental/get_dag_run_state.py
index ca71a9a..b2dedd5 100644
--- a/airflow/api/common/experimental/get_dag_run_state.py
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -19,9 +19,12 @@
 from datetime import datetime
 from typing import Dict
 
+from deprecated import deprecated
+
 from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
 
 
+@deprecated(reason="Use DagRun().get_state() instead", version="2.2.3")
 def get_dag_run_state(dag_id: str, execution_date: datetime) -> Dict[str, str]:
     """Return the Dag Run state identified by the given dag_id and execution_date.
 
diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py
index 302ad64..fae5fd7 100644
--- a/airflow/api/common/experimental/get_task.py
+++ b/airflow/api/common/experimental/get_task.py
@@ -16,10 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 """Task APIs.."""
+from deprecated import deprecated
+
 from airflow.api.common.experimental import check_and_get_dag
 from airflow.models import TaskInstance
 
 
+@deprecated(reason="Use DAG().get_task", version="2.2.3")
 def get_task(dag_id: str, task_id: str) -> TaskInstance:
     """Return the task object identified by the given dag_id and task_id."""
     dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py
index f3ca1cf2..137f8a3 100644
--- a/airflow/api/common/experimental/get_task_instance.py
+++ b/airflow/api/common/experimental/get_task_instance.py
@@ -18,11 +18,14 @@
 """Task Instance APIs."""
 from datetime import datetime
 
+from deprecated import deprecated
+
 from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
 from airflow.exceptions import TaskInstanceNotFound
 from airflow.models import TaskInstance
 
 
+@deprecated(version="2.2.3", reason="Use DagRun.get_task_instance instead")
 def get_task_instance(dag_id: str, task_id: str, execution_date: datetime) -> TaskInstance:
     """Return the task instance identified by the given dag_id, task_id and execution_date."""
     dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index 30950ea..0b9c3a5 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -16,11 +16,14 @@
 # specific language governing permissions and limitations
 # under the License.
 """Pool APIs."""
+from deprecated import deprecated
+
 from airflow.exceptions import AirflowBadRequest, PoolNotFound
 from airflow.models import Pool
 from airflow.utils.session import provide_session
 
 
+@deprecated(reason="Use Pool.get_pool() instead", version="2.2.3")
 @provide_session
 def get_pool(name, session=None):
     """Get pool by a given name."""
@@ -34,12 +37,14 @@ def get_pool(name, session=None):
     return pool
 
 
+@deprecated(reason="Use Pool.get_pools() instead", version="2.2.3")
 @provide_session
 def get_pools(session=None):
     """Get all pools."""
     return session.query(Pool).all()
 
 
+@deprecated(reason="Use Pool.create_pool() instead", version="2.2.3")
 @provide_session
 def create_pool(name, slots, description, session=None):
     """Create a pool with a given parameters."""
@@ -70,6 +75,7 @@ def create_pool(name, slots, description, session=None):
     return pool
 
 
+@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.3")
 @provide_session
 def delete_pool(name, session=None):
     """Delete pool by a given name."""
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 38a873c..d526312 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -15,114 +15,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Triggering DAG runs APIs."""
-import json
-from datetime import datetime
-from typing import List, Optional, Union
 
-from airflow.exceptions import DagNotFound, DagRunAlreadyExists
-from airflow.models import DagBag, DagModel, DagRun
-from airflow.utils import timezone
-from airflow.utils.state import State
-from airflow.utils.types import DagRunType
+import warnings
 
+from airflow.api.common.trigger_dag import *  # noqa
 
-def _trigger_dag(
-    dag_id: str,
-    dag_bag: DagBag,
-    run_id: Optional[str] = None,
-    conf: Optional[Union[dict, str]] = None,
-    execution_date: Optional[datetime] = None,
-    replace_microseconds: bool = True,
-) -> List[DagRun]:
-    """Triggers DAG run.
-
-    :param dag_id: DAG ID
-    :param dag_bag: DAG Bag model
-    :param run_id: ID of the dag_run
-    :param conf: configuration
-    :param execution_date: date of execution
-    :param replace_microseconds: whether microseconds should be zeroed
-    :return: list of triggered dags
-    """
-    dag = dag_bag.get_dag(dag_id)  # prefetch dag if it is stored serialized
-
-    if dag_id not in dag_bag.dags:
-        raise DagNotFound(f"Dag id {dag_id} not found")
-
-    execution_date = execution_date if execution_date else timezone.utcnow()
-
-    if not timezone.is_localized(execution_date):
-        raise ValueError("The execution_date should be localized")
-
-    if replace_microseconds:
-        execution_date = execution_date.replace(microsecond=0)
-
-    if dag.default_args and 'start_date' in dag.default_args:
-        min_dag_start_date = dag.default_args["start_date"]
-        if min_dag_start_date and execution_date < min_dag_start_date:
-            raise ValueError(
-                "The execution_date [{}] should be >= start_date [{}] from DAG's default_args".format(
-                    execution_date.isoformat(), min_dag_start_date.isoformat()
-                )
-            )
-
-    run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
-    dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)
-
-    if dag_run:
-        raise DagRunAlreadyExists(
-            f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"
-        )
-
-    run_conf = None
-    if conf:
-        run_conf = conf if isinstance(conf, dict) else json.loads(conf)
-
-    dag_runs = []
-    dags_to_run = [dag] + dag.subdags
-    for _dag in dags_to_run:
-        dag_run = _dag.create_dagrun(
-            run_id=run_id,
-            execution_date=execution_date,
-            state=State.QUEUED,
-            conf=run_conf,
-            external_trigger=True,
-            dag_hash=dag_bag.dags_hash.get(dag_id),
-        )
-        dag_runs.append(dag_run)
-
-    return dag_runs
-
-
-def trigger_dag(
-    dag_id: str,
-    run_id: Optional[str] = None,
-    conf: Optional[Union[dict, str]] = None,
-    execution_date: Optional[datetime] = None,
-    replace_microseconds: bool = True,
-) -> Optional[DagRun]:
-    """Triggers execution of DAG specified by dag_id
-
-    :param dag_id: DAG ID
-    :param run_id: ID of the dag_run
-    :param conf: configuration
-    :param execution_date: date of execution
-    :param replace_microseconds: whether microseconds should be zeroed
-    :return: first dag run triggered - even if more than one Dag Runs were triggered or None
-    """
-    dag_model = DagModel.get_current(dag_id)
-    if dag_model is None:
-        raise DagNotFound(f"Dag id {dag_id} not found in DagModel")
-
-    dagbag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
-    triggers = _trigger_dag(
-        dag_id=dag_id,
-        dag_bag=dagbag,
-        run_id=run_id,
-        conf=conf,
-        execution_date=execution_date,
-        replace_microseconds=replace_microseconds,
-    )
-
-    return triggers[0] if triggers else None
+warnings.warn(
+    "This module is deprecated. Please use `airflow.api.common.trigger_dag` instead.",
+    DeprecationWarning,
+    stacklevel=2,
+)
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/trigger_dag.py
similarity index 95%
copy from airflow/api/common/experimental/trigger_dag.py
copy to airflow/api/common/trigger_dag.py
index 38a873c..70bbb78 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -62,9 +62,8 @@ def _trigger_dag(
         min_dag_start_date = dag.default_args["start_date"]
         if min_dag_start_date and execution_date < min_dag_start_date:
             raise ValueError(
-                "The execution_date [{}] should be >= start_date [{}] from DAG's default_args".format(
-                    execution_date.isoformat(), min_dag_start_date.isoformat()
-                )
+                f"The execution_date [{execution_date.isoformat()}] should be >= start_date "
+                f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
             )
 
     run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py
index c164fcc..286b191 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -110,13 +110,10 @@ def patch_dag(session, dag_id, update_mask=None):
 @provide_session
 def delete_dag(dag_id: str, session: Session):
     """Delete the specific DAG."""
-    # TODO: This function is shared with the /delete endpoint used by the web
-    # UI, so we're reusing it to simplify maintenance. Refactor the function to
-    # another place when the experimental/legacy API is removed.
-    from airflow.api.common.experimental import delete_dag
+    from airflow.api.common import delete_dag as delete_dag_module
 
     try:
-        delete_dag.delete_dag(dag_id, session=session)
+        delete_dag_module.delete_dag(dag_id, session=session)
     except DagNotFound:
         raise NotFound(f"Dag with id: '{dag_id}' not found")
     except AirflowException:
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 6f217c4..8ae88aa 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -21,11 +21,11 @@ from typing import Dict, Iterable, Optional, Tuple
 from sqlalchemy import Column, Integer, String, Text, func
 from sqlalchemy.orm.session import Session
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, PoolNotFound
 from airflow.models.base import Base
 from airflow.ti_deps.dependencies_states import EXECUTION_STATES
 from airflow.typing_compat import TypedDict
-from airflow.utils.session import provide_session
+from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import nowait, with_row_locks
 from airflow.utils.state import State
 
@@ -57,7 +57,13 @@ class Pool(Base):
 
     @staticmethod
     @provide_session
-    def get_pool(pool_name, session: Session = None):
+    def get_pools(session: Session = NEW_SESSION):
+        """Get all pools."""
+        return session.query(Pool).all()
+
+    @staticmethod
+    @provide_session
+    def get_pool(pool_name: str, session: Session = NEW_SESSION):
         """
         Get the Pool with specific pool name from the Pools.
 
@@ -69,7 +75,7 @@ class Pool(Base):
 
     @staticmethod
     @provide_session
-    def get_default_pool(session: Session = None):
+    def get_default_pool(session: Session = NEW_SESSION):
         """
         Get the Pool of the default_pool from the Pools.
 
@@ -80,10 +86,44 @@ class Pool(Base):
 
     @staticmethod
     @provide_session
+    def create_or_update_pool(name: str, slots: int, description: str, session: Session = NEW_SESSION):
+        """Create a pool with given parameters or update it if it already exists."""
+        if not name:
+            return
+        pool = session.query(Pool).filter_by(pool=name).first()
+        if pool is None:
+            pool = Pool(pool=name, slots=slots, description=description)
+            session.add(pool)
+        else:
+            pool.slots = slots
+            pool.description = description
+
+        session.commit()
+
+        return pool
+
+    @staticmethod
+    @provide_session
+    def delete_pool(name: str, session: Session = NEW_SESSION):
+        """Delete pool by a given name."""
+        if name == Pool.DEFAULT_POOL_NAME:
+            raise AirflowException("default_pool cannot be deleted")
+
+        pool = session.query(Pool).filter_by(pool=name).first()
+        if pool is None:
+            raise PoolNotFound(f"Pool '{name}' doesn't exist")
+
+        session.delete(pool)
+        session.commit()
+
+        return pool
+
+    @staticmethod
+    @provide_session
     def slots_stats(
         *,
         lock_rows: bool = False,
-        session: Session = None,
+        session: Session = NEW_SESSION,
     ) -> Dict[str, PoolStats]:
         """
         Get Pool stats (Number of Running, Queued, Open & Total tasks)
@@ -210,7 +250,7 @@ class Pool(Base):
         )
 
     @provide_session
-    def open_slots(self, session: Session) -> float:
+    def open_slots(self, session: Session = NEW_SESSION) -> float:
         """
         Get the number of slots open at the moment.
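
A brief usage sketch of the Pool helpers added in this file (the pool name and slot count are made up; ``@provide_session`` supplies a session when none is passed):

    from airflow.models.pool import Pool

    pool = Pool.create_or_update_pool(name="reporting", slots=4, description="reporting tasks")
    assert pool.slots == 4
    Pool.delete_pool(name="reporting")  # raises PoolNotFound if the pool does not exist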
 
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 1e6cb7f..421c796 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -21,7 +21,7 @@ import json
 import time
 from typing import Dict, List, Optional, Union
 
-from airflow.api.common.experimental.trigger_dag import trigger_dag
+from airflow.api.common.trigger_dag import trigger_dag
 from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
 from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
 from airflow.utils import timezone
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index f35d165..023f482 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -991,3 +991,18 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+def get_sqla_model_classes():
+    """
+    Get all SQLAlchemy class mappers.
+
+    SQLAlchemy < 1.4 does not support registry.mappers so we use
+    try/except to handle it.
+    """
+    from airflow.models.base import Base
+
+    try:
+        return [mapper.class_ for mapper in Base.registry.mappers]
+    except AttributeError:
+        return Base._decl_class_registry.values()
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2182a17..f2642a7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1607,7 +1607,7 @@ class Airflow(AirflowBaseView):
     @action_logging
     def delete(self):
         """Deletes DAG."""
-        from airflow.api.common.experimental import delete_dag
+        from airflow.api.common import delete_dag
         from airflow.exceptions import DagNotFound
 
         dag_id = request.values.get('dag_id')
diff --git a/setup.cfg b/setup.cfg
index b83ef9b..c3cce1c 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -95,6 +95,7 @@ install_requires =
     croniter>=0.3.17
     cryptography>=0.9.3
     dataclasses;python_version<"3.7"
+    deprecated>=1.2.13
     dill>=0.2.2, <0.4
     # Sphinx RTD theme 0.5.2. introduced limitation to docutils to account for some docutils markup
     # change:
diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py
index a2af8ca..9f574e4 100644
--- a/tests/api/client/test_local_client.py
+++ b/tests/api/client/test_local_client.py
@@ -17,6 +17,8 @@
 # under the License.
 
 import json
+import random
+import string
 import unittest
 from unittest.mock import ANY, patch
 
@@ -25,7 +27,7 @@ from freezegun import freeze_time
 
 from airflow.api.client.local_client import Client
 from airflow.example_dags import example_bash_operator
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowBadRequest, AirflowException, PoolNotFound
 from airflow.models import DAG, DagBag, DagModel, DagRun, Pool
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -133,6 +135,10 @@ class TestLocalClient(unittest.TestCase):
         pool = self.client.get_pool(name='foo')
         assert pool == ('foo', 1, '')
 
+    def test_get_pool_non_existing_raises(self):
+        with pytest.raises(PoolNotFound):
+            self.client.get_pool(name='foo')
+
     def test_get_pools(self):
         self.client.create_pool(name='foo1', slots=1, description='')
         self.client.create_pool(name='foo2', slots=2, description='')
@@ -145,6 +151,26 @@ class TestLocalClient(unittest.TestCase):
         with create_session() as session:
             assert session.query(Pool).count() == 2
 
+    def test_create_pool_bad_slots(self):
+        with pytest.raises(AirflowBadRequest, match="^Bad value for `slots`: foo$"):
+            self.client.create_pool(
+                name='foo',
+                slots='foo',
+                description='',
+            )
+
+    def test_create_pool_name_too_long(self):
+        long_name = ''.join(random.choices(string.ascii_lowercase, k=300))
+        pool_name_length = Pool.pool.property.columns[0].type.length
+        with pytest.raises(
+            AirflowBadRequest, match=f"^pool name cannot be more than {pool_name_length} characters"
+        ):
+            self.client.create_pool(
+                name=long_name,
+                slots=5,
+                description='',
+            )
+
     def test_delete_pool(self):
         self.client.create_pool(name='foo', slots=1, description='')
         with create_session() as session:
@@ -152,3 +178,6 @@ class TestLocalClient(unittest.TestCase):
         self.client.delete_pool(name='foo')
         with create_session() as session:
             assert session.query(Pool).count() == 1
+        for name in ('', '    '):
+            with pytest.raises(PoolNotFound, match=f"^Pool {name!r} doesn't exist$"):
+                Pool.delete_pool(name=name)
diff --git a/tests/api/common/experimental/test_delete_dag.py b/tests/api/common/test_delete_dag.py
similarity index 99%
rename from tests/api/common/experimental/test_delete_dag.py
rename to tests/api/common/test_delete_dag.py
index 5984cd2..0eb058a 100644
--- a/tests/api/common/experimental/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -20,7 +20,7 @@
 import pytest
 
 from airflow import models
-from airflow.api.common.experimental.delete_dag import delete_dag
+from airflow.api.common.delete_dag import delete_dag
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.dates import days_ago
diff --git a/tests/api/common/experimental/test_trigger_dag.py b/tests/api/common/test_trigger_dag.py
similarity index 93%
rename from tests/api/common/experimental/test_trigger_dag.py
rename to tests/api/common/test_trigger_dag.py
index 2f16446..f79d413 100644
--- a/tests/api/common/experimental/test_trigger_dag.py
+++ b/tests/api/common/test_trigger_dag.py
@@ -22,7 +22,7 @@ from unittest import mock
 import pytest
 from parameterized import parameterized
 
-from airflow.api.common.experimental.trigger_dag import _trigger_dag
+from airflow.api.common.trigger_dag import _trigger_dag
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun
 from airflow.utils import timezone
@@ -42,7 +42,7 @@ class TestTriggerDag(unittest.TestCase):
         with pytest.raises(AirflowException):
             _trigger_dag('dag_not_found', dag_bag_mock)
 
-    @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+    @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_dag_run_exist(self, dag_bag_mock, dag_run_mock):
         dag_id = "dag_run_exist"
@@ -54,7 +54,7 @@ class TestTriggerDag(unittest.TestCase):
             _trigger_dag(dag_id, dag_bag_mock)
 
     @mock.patch('airflow.models.DAG')
-    @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+    @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_include_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
         dag_id = "trigger_dag"
@@ -70,7 +70,7 @@ class TestTriggerDag(unittest.TestCase):
         assert 3 == len(triggers)
 
     @mock.patch('airflow.models.DAG')
-    @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+    @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
         dag_id = "trigger_dag"
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 00fe140..95e585e 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -16,11 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import pytest
+
 from airflow import settings
+from airflow.exceptions import AirflowException, PoolNotFound
 from airflow.models.pool import Pool
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.operators.dummy import DummyOperator
 from airflow.utils import timezone
+from airflow.utils.session import create_session
 from airflow.utils.state import State
 from tests.test_utils.db import clear_db_dags, clear_db_pools, clear_db_runs, set_default_pool_slots
 
@@ -28,6 +32,10 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
 class TestPool:
+
+    USER_POOL_COUNT = 2
+    TOTAL_POOL_COUNT = USER_POOL_COUNT + 1  # including default_pool
+
     @staticmethod
     def clean_db():
         clear_db_dags()
@@ -36,6 +44,20 @@ class TestPool:
 
     def setup_method(self):
         self.clean_db()
+        self.pools = []
+
+    def add_pools(self):
+        self.pools = [Pool.get_default_pool()]
+        for i in range(self.USER_POOL_COUNT):
+            name = f'experimental_{i + 1}'
+            pool = Pool(
+                pool=name,
+                slots=i,
+                description=name,
+            )
+            self.pools.append(pool)
+        with create_session() as session:
+            session.add_all(self.pools)
 
     def teardown_method(self):
         self.clean_db()
@@ -149,3 +171,52 @@ class TestPool:
                 "running": 1,
             }
         } == Pool.slots_stats()
+
+    def test_get_pool(self):
+        self.add_pools()
+        pool = Pool.get_pool(pool_name=self.pools[0].pool)
+        assert pool.pool == self.pools[0].pool
+
+    def test_get_pool_non_existing(self):
+        self.add_pools()
+        assert not Pool.get_pool(pool_name='test')
+
+    def test_get_pool_bad_name(self):
+        for name in ('', '    '):
+            assert not Pool.get_pool(pool_name=name)
+
+    def test_get_pools(self):
+        self.add_pools()
+        pools = sorted(Pool.get_pools(), key=lambda p: p.pool)
+        assert pools[0].pool == self.pools[0].pool
+        assert pools[1].pool == self.pools[1].pool
+
+    def test_create_pool(self, session):
+        self.add_pools()
+        pool = Pool.create_or_update_pool(name='foo', slots=5, description='')
+        assert pool.pool == 'foo'
+        assert pool.slots == 5
+        assert pool.description == ''
+        assert session.query(Pool).count() == self.TOTAL_POOL_COUNT + 1
+
+    def test_create_pool_existing(self, session):
+        self.add_pools()
+        pool = Pool.create_or_update_pool(name=self.pools[0].pool, slots=5, description='')
+        assert pool.pool == self.pools[0].pool
+        assert pool.slots == 5
+        assert pool.description == ''
+        assert session.query(Pool).count() == self.TOTAL_POOL_COUNT
+
+    def test_delete_pool(self, session):
+        self.add_pools()
+        pool = Pool.delete_pool(name=self.pools[-1].pool)
+        assert pool.pool == self.pools[-1].pool
+        assert session.query(Pool).count() == self.TOTAL_POOL_COUNT - 1
+
+    def test_delete_pool_non_existing(self):
+        with pytest.raises(PoolNotFound, match="^Pool 'test' doesn't exist$"):
+            Pool.delete_pool(name='test')
+
+    def test_delete_default_pool_not_allowed(self):
+        with pytest.raises(AirflowException, match="^default_pool cannot be deleted$"):
+            Pool.delete_pool(Pool.DEFAULT_POOL_NAME)

[airflow] 36/43: Add a session backend to store session data in the database (#21478)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1c2909f8d69ade70803f10653e4845319ae99c0e
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 15 10:57:46 2022 -0700

    Add a session backend to store session data in the database (#21478)
    
    Co-authored-by: Jed Cunningham <je...@apache.org>
    (cherry picked from commit da9d0863c7ff121c111a455708163b026943bdf1)
---
 airflow/config_templates/config.yml                |  7 +++
 airflow/config_templates/default_airflow.cfg       |  4 ++
 .../c381b21cb7e4_add_session_table_to_db.py        | 54 ++++++++++++++++++
 airflow/utils/db.py                                |  3 +
 airflow/www/app.py                                 |  3 +-
 airflow/www/extensions/init_session.py             | 63 ++++++++++++---------
 .../www/{extensions/init_session.py => session.py} | 29 ++++------
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 docs/spelling_wordlist.txt                         |  1 +
 setup.cfg                                          |  3 +
 tests/api_connexion/conftest.py                    |  7 ++-
 tests/api_connexion/test_security.py               |  4 ++
 tests/test_utils/decorators.py                     |  2 +-
 tests/utils/test_db.py                             |  3 +
 tests/www/views/conftest.py                        |  1 +
 tests/www/views/test_session.py                    | 65 ++++++++++++++++++++++
 16 files changed, 205 insertions(+), 48 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 6941f03..1e77041 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -999,6 +999,13 @@
       type: string
       example: ~
       default: ""
+    - name: session_backend
+      description: |
+        The type of backend used to store web session data; can be 'database' or 'securecookie'
+      version_added: 2.2.4
+      type: string
+      example: securecookie
+      default: database
     - name: web_server_master_timeout
       description: |
         Number of seconds the webserver waits before killing gunicorn master that doesn't respond
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 6a5449b..826eaf4 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -516,6 +516,10 @@ web_server_ssl_cert =
 # provided SSL will be enabled. This does not change the web server port.
 web_server_ssl_key =
 
+# The type of backend used to store web session data; can be 'database' or 'securecookie'
+# Example: session_backend = securecookie
+session_backend = database
+
 # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
 web_server_master_timeout = 120
 
diff --git a/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py b/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py
new file mode 100644
index 0000000..cc6b9ab
--- /dev/null
+++ b/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add session table to db
+
+Revision ID: c381b21cb7e4
+Revises: be2bfac3da23
+Create Date: 2022-01-25 13:56:35.069429
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'c381b21cb7e4'
+down_revision = 'be2bfac3da23'
+branch_labels = None
+depends_on = None
+
+TABLE_NAME = 'session'
+
+
+def upgrade():
+    """Apply add session table to db"""
+    op.create_table(
+        TABLE_NAME,
+        sa.Column('id', sa.Integer()),
+        sa.Column('session_id', sa.String(255)),
+        sa.Column('data', sa.LargeBinary()),
+        sa.Column('expiry', sa.DateTime()),
+        sa.PrimaryKeyConstraint('id'),
+        sa.UniqueConstraint('session_id'),
+    )
+
+
+def downgrade():
+    """Unapply add session table to db"""
+    op.drop_table(TABLE_NAME)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 023f482..c038d66 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -954,9 +954,12 @@ def drop_airflow_models(connection):
     users.drop(settings.engine, checkfirst=True)
     dag_stats = Table('dag_stats', Base.metadata)
     dag_stats.drop(settings.engine, checkfirst=True)
+    session = Table('session', Base.metadata)
+    session.drop(settings.engine, checkfirst=True)
 
     Base.metadata.drop_all(connection)
     # we remove the Tables here so that if resetdb is run metadata does not keep the old tables.
+    Base.metadata.remove(session)
     Base.metadata.remove(dag_stats)
     Base.metadata.remove(users)
     Base.metadata.remove(user)
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 2de041b..16780cb 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -36,7 +36,7 @@ from airflow.www.extensions.init_jinja_globals import init_jinja_globals
 from airflow.www.extensions.init_manifest_files import configure_manifest_files
 from airflow.www.extensions.init_robots import init_robots
 from airflow.www.extensions.init_security import init_api_experimental_auth, init_xframe_protection
-from airflow.www.extensions.init_session import init_airflow_session_interface, init_permanent_session
+from airflow.www.extensions.init_session import init_airflow_session_interface
 from airflow.www.extensions.init_views import (
     init_api_connexion,
     init_api_experimental,
@@ -135,7 +135,6 @@ def create_app(config=None, testing=False):
 
         init_jinja_globals(flask_app)
         init_xframe_protection(flask_app)
-        init_permanent_session(flask_app)
         init_airflow_session_interface(flask_app)
     return flask_app
 
diff --git a/airflow/www/extensions/init_session.py b/airflow/www/extensions/init_session.py
index 06e0ba5..7a09de7 100644
--- a/airflow/www/extensions/init_session.py
+++ b/airflow/www/extensions/init_session.py
@@ -15,33 +15,46 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from flask import request, session as flask_session
-from flask.sessions import SecureCookieSessionInterface
+from flask import session as builtin_flask_session
 
-
-class AirflowSessionInterface(SecureCookieSessionInterface):
-    """
-    Airflow cookie session interface.
-    Modifications of sessions should be done here because
-    the change here is global.
-    """
-
-    def save_session(self, *args, **kwargs):
-        """Prevent creating session from REST API requests."""
-        if request.blueprint == '/api/v1':
-            return None
-        return super().save_session(*args, **kwargs)
-
-
-def init_permanent_session(app):
-    """Make session permanent to allows us to store data"""
-
-    def make_session_permanent():
-        flask_session.permanent = True
-
-    app.before_request(make_session_permanent)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.www.session import AirflowDatabaseSessionInterface, AirflowSecureCookieSessionInterface
 
 
 def init_airflow_session_interface(app):
     """Set airflow session interface"""
-    app.session_interface = AirflowSessionInterface()
+    config = app.config.copy()
+    selected_backend = conf.get('webserver', 'SESSION_BACKEND')
+    # A bit of a misnomer - normally cookies expire whenever the browser is closed
+    # or when they hit their expiry datetime, whichever comes first. "Permanent"
+    # cookies only expire when they hit their expiry datetime, and can outlive
+    # the browser being closed.
+    permanent_cookie = config.get('SESSION_PERMANENT', True)
+
+    if selected_backend == 'securecookie':
+        app.session_interface = AirflowSecureCookieSessionInterface()
+        if permanent_cookie:
+
+            def make_session_permanent():
+                builtin_flask_session.permanent = True
+
+            app.before_request(make_session_permanent)
+    elif selected_backend == 'database':
+        app.session_interface = AirflowDatabaseSessionInterface(
+            app=app,
+            db=None,
+            permanent=permanent_cookie,
+            # Typically these would be configurable with Flask-Session,
+            # but we will set them explicitly instead as they don't make
+            # sense to have configurable in Airflow's use case
+            table='session',
+            key_prefix='',
+            use_signer=True,
+        )
+    else:
+        raise AirflowConfigException(
+            "Unrecognized session backend specified in "
+            f"web_server_session_backend: '{selected_backend}'. Please set "
+            "this to either 'database' or 'securecookie'."
+        )
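
A minimal sketch of reading the new option programmatically, mirroring the ``conf.get`` call above (assumes a configured Airflow environment; the variable name is illustrative):

    from airflow.configuration import conf

    # 'database' (the default) or 'securecookie'
    selected_backend = conf.get('webserver', 'session_backend')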
diff --git a/airflow/www/extensions/init_session.py b/airflow/www/session.py
similarity index 59%
copy from airflow/www/extensions/init_session.py
copy to airflow/www/session.py
index 06e0ba5..4092565 100644
--- a/airflow/www/extensions/init_session.py
+++ b/airflow/www/session.py
@@ -15,33 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from flask import request, session as flask_session
+from flask import request
 from flask.sessions import SecureCookieSessionInterface
+from flask_session.sessions import SqlAlchemySessionInterface
 
 
-class AirflowSessionInterface(SecureCookieSessionInterface):
-    """
-    Airflow cookie session interface.
-    Modifications of sessions should be done here because
-    the change here is global.
-    """
+class SessionExemptMixin:
+    """Exempt certain blueprints/paths from autogenerated sessions"""
 
     def save_session(self, *args, **kwargs):
-        """Prevent creating session from REST API requests."""
+        """Prevent creating session from REST API and health requests."""
         if request.blueprint == '/api/v1':
             return None
+        if request.path == '/health':
+            return None
         return super().save_session(*args, **kwargs)
 
 
-def init_permanent_session(app):
-    """Make session permanent to allows us to store data"""
-
-    def make_session_permanent():
-        flask_session.permanent = True
-
-    app.before_request(make_session_permanent)
+class AirflowDatabaseSessionInterface(SessionExemptMixin, SqlAlchemySessionInterface):
+    """Session interface that exempts some routes and stores session data in the database"""
 
 
-def init_airflow_session_interface(app):
-    """Set airflow session interface"""
-    app.session_interface = AirflowSessionInterface()
+class AirflowSecureCookieSessionInterface(SessionExemptMixin, SecureCookieSessionInterface):
+    """Session interface that exempts some routes and stores session data in a signed cookie"""
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 016c624..8dc1a55 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |
+| ``c381b21cb7e4`` (head)        | ``be2bfac3da23`` | ``2.2.4``       | Create a ``session`` table to store web session data                                  |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``be2bfac3da23``               | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``7b2661a43ba3``               | ``142555e44c17`` | ``2.2.0``       | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id.  |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 5d77e29..ed114b6 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1222,6 +1222,7 @@ sdk
 secretRef
 secretRefs
 securable
+securecookie
 securityManager
 seealso
 seedlist
diff --git a/setup.cfg b/setup.cfg
index 7ab5c77..8e36d06 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -107,6 +107,9 @@ install_requires =
     flask-appbuilder>=3.3.4, <4.0.0
     flask-caching>=1.5.0, <2.0.0
     flask-login>=0.3, <0.5
+    # Strict upper-bound on the latest release of flask-session,
+    # as any schema changes will require a migration.
+    flask-session>=0.3.1, <=0.4.0
     flask-wtf>=0.14.3, <0.15
     graphviz>=0.12
     gunicorn>=20.1.0
diff --git a/tests/api_connexion/conftest.py b/tests/api_connexion/conftest.py
index cc92733..9b37b52 100644
--- a/tests/api_connexion/conftest.py
+++ b/tests/api_connexion/conftest.py
@@ -25,7 +25,12 @@ from tests.test_utils.decorators import dont_initialize_flask_app_submodules
 @pytest.fixture(scope="session")
 def minimal_app_for_api():
     @dont_initialize_flask_app_submodules(
-        skip_all_except=["init_appbuilder", "init_api_experimental_auth", "init_api_connexion"]
+        skip_all_except=[
+            "init_appbuilder",
+            "init_api_experimental_auth",
+            "init_api_connexion",
+            "init_airflow_session_interface",
+        ]
     )
     def factory():
         with conf_vars({("api", "auth_backend"): "tests.test_utils.remote_user_api_auth_backend"}):
diff --git a/tests/api_connexion/test_security.py b/tests/api_connexion/test_security.py
index 244a8a2..68f6d31 100644
--- a/tests/api_connexion/test_security.py
+++ b/tests/api_connexion/test_security.py
@@ -45,3 +45,7 @@ class TestSession:
     def test_session_not_created_on_api_request(self):
         self.client.get("api/v1/dags", environ_overrides={'REMOTE_USER': "test"})
         assert all(cookie.name != "session" for cookie in self.client.cookie_jar)
+
+    def test_session_not_created_on_health_endpoint_request(self):
+        self.client.get("health")
+        assert all(cookie.name != "session" for cookie in self.client.cookie_jar)
diff --git a/tests/test_utils/decorators.py b/tests/test_utils/decorators.py
index d08d159..949df63 100644
--- a/tests/test_utils/decorators.py
+++ b/tests/test_utils/decorators.py
@@ -42,7 +42,7 @@ def dont_initialize_flask_app_submodules(_func=None, *, skip_all_except=None):
             "sync_appbuilder_roles",
             "init_jinja_globals",
             "init_xframe_protection",
-            "init_permanent_session",
+            "init_airflow_session_interface",
             "init_appbuilder",
         ]
 
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 601dc6f..27fa67b 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -74,6 +74,9 @@ class TestDb(unittest.TestCase):
             lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_usg'),
             lambda t: (t[0] == 'remove_table' and t[1].name == 'MSreplication_options'),
             lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_dev'),
+            # Ignore flask-session table/index
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'session'),
+            lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'),
         ]
         for ignore in ignores:
             diff = [d for d in diff if not ignore(d)]
diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py
index 05fe1e4..f95a814 100644
--- a/tests/www/views/conftest.py
+++ b/tests/www/views/conftest.py
@@ -55,6 +55,7 @@ def app(examples_dag_bag):
             "init_flash_views",
             "init_jinja_globals",
             "init_plugins",
+            "init_airflow_session_interface",
         ]
     )
     def factory():
diff --git a/tests/www/views/test_session.py b/tests/www/views/test_session.py
new file mode 100644
index 0000000..9fb6f36
--- /dev/null
+++ b/tests/www/views/test_session.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from airflow.exceptions import AirflowConfigException
+from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.decorators import dont_initialize_flask_app_submodules
+
+
+def test_session_cookie_created_on_login(user_client):
+    assert any(cookie.name == 'session' for cookie in user_client.cookie_jar)
+
+
+def test_session_inaccessible_after_logout(user_client):
+    session_cookie = next((cookie for cookie in user_client.cookie_jar if cookie.name == 'session'), None)
+    assert session_cookie is not None
+
+    resp = user_client.get('/logout/')
+    assert resp.status_code == 302
+
+    # Try to access /home with the session cookie from earlier
+    user_client.set_cookie('session', session_cookie.value)
+    user_client.get('/home/')
+    assert resp.status_code == 302
+
+
+def test_invalid_session_backend_option():
+    @dont_initialize_flask_app_submodules(
+        skip_all_except=[
+            "init_api_connexion",
+            "init_appbuilder",
+            "init_appbuilder_links",
+            "init_appbuilder_views",
+            "init_flash_views",
+            "init_jinja_globals",
+            "init_plugins",
+            "init_airflow_session_interface",
+        ]
+    )
+    def poorly_configured_app_factory():
+        with conf_vars({("webserver", "session_backend"): "invalid_value_for_session_backend"}):
+            return app.create_app(testing=True)
+
+    expected_exc_regex = (
+        "^Unrecognized session backend specified in web_server_session_backend: "
+        r"'invalid_value_for_session_backend'\. Please set this to .+\.$"
+    )
+    with pytest.raises(AirflowConfigException, match=expected_exc_regex):
+        poorly_configured_app_factory()
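
For orientation, here is a minimal sketch of how a factory such as ``init_airflow_session_interface`` could choose between the two interfaces defined above, based on the ``[webserver] session_backend`` option that the new test exercises. This is an illustrative assumption, not the code from this commit (which is not reproduced in this message), and the constructor arguments are only indicative:

    # Illustrative sketch only; assumes it lives in the same module as the
    # AirflowDatabaseSessionInterface / AirflowSecureCookieSessionInterface classes.
    from airflow.configuration import conf
    from airflow.exceptions import AirflowConfigException


    def init_airflow_session_interface(app):
        """Set the Flask session interface based on [webserver] session_backend."""
        backend = conf.get('webserver', 'session_backend', fallback='database')
        if backend == 'database':
            # Session data goes into the new ``session`` table.
            app.session_interface = AirflowDatabaseSessionInterface(
                app=app, db=None, table='session', key_prefix=''
            )
        elif backend == 'securecookie':
            # Session data is kept client-side in a signed cookie.
            app.session_interface = AirflowSecureCookieSessionInterface()
        else:
            raise AirflowConfigException(
                "Unrecognized session backend specified in "
                f"web_server_session_backend: {backend!r}. "
                "Please set this to 'database' or 'securecookie'."
            )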

[airflow] 35/43: Show task status only for running dags or only for the last finished dag (#21352)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f25a58eebeb9ad3283fca2daa6666811f1a036c6
Author: Aleksey Kirilishin <54...@users.noreply.github.com>
AuthorDate: Mon Feb 14 18:55:00 2022 +0300

    Show task status only for running dags or only for the last finished dag (#21352)
    
    * Show task status only for running dags or only for the last finished dag
    
    * Brought the logic of getting task statistics into a separate function
    
    (cherry picked from commit 28d7bde2750c38300e5cf70ba32be153b1a11f2c)
---
 airflow/www/views.py          | 64 ++++++++++++++++++++++++++++++++++---------
 tests/www/views/test_views.py | 35 ++++++++++++++++++++++-
 2 files changed, 85 insertions(+), 14 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2ed2a67..9ebe899 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -408,6 +408,31 @@ def dag_edges(dag):
     return result
 
 
+def get_task_stats_from_query(qry):
+    """
+    Return a dict of the task quantity, grouped by dag id and task status.
+
+    :param qry: The data in the format (<dag id>, <task state>, <is dag running>, <task count>),
+        ordered by <dag id> and <is dag running>
+    """
+    data = {}
+    last_dag_id = None
+    has_running_dags = False
+    for dag_id, state, is_dag_running, count in qry:
+        if last_dag_id != dag_id:
+            last_dag_id = dag_id
+            has_running_dags = False
+        elif not is_dag_running and has_running_dags:
+            continue
+
+        if is_dag_running:
+            has_running_dags = True
+        if dag_id not in data:
+            data[dag_id] = {}
+        data[dag_id][state] = count
+    return data
+
+
 ######################################################################################
 #                                    Error handlers
 ######################################################################################
@@ -814,7 +839,9 @@ class Airflow(AirflowBaseView):
 
         # Select all task_instances from active dag_runs.
         running_task_instance_query_result = session.query(
-            TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')
+            TaskInstance.dag_id.label('dag_id'),
+            TaskInstance.state.label('state'),
+            sqla.literal(True).label('is_dag_running'),
         ).join(
             running_dag_run_query_result,
             and_(
@@ -838,7 +865,11 @@ class Airflow(AirflowBaseView):
             # Select all task_instances from active dag_runs.
             # If no dag_run is active, return task instances from most recent dag_run.
             last_task_instance_query_result = (
-                session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state'))
+                session.query(
+                    TaskInstance.dag_id.label('dag_id'),
+                    TaskInstance.state.label('state'),
+                    sqla.literal(False).label('is_dag_running'),
+                )
                 .join(TaskInstance.dag_run)
                 .join(
                     last_dag_run,
@@ -855,18 +886,25 @@ class Airflow(AirflowBaseView):
         else:
             final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti')
 
-        qry = session.query(
-            final_task_instance_query_result.c.dag_id,
-            final_task_instance_query_result.c.state,
-            sqla.func.count(),
-        ).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state)
-
-        data = {}
-        for dag_id, state, count in qry:
-            if dag_id not in data:
-                data[dag_id] = {}
-            data[dag_id][state] = count
+        qry = (
+            session.query(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.state,
+                final_task_instance_query_result.c.is_dag_running,
+                sqla.func.count(),
+            )
+            .group_by(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.state,
+                final_task_instance_query_result.c.is_dag_running,
+            )
+            .order_by(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.is_dag_running.desc(),
+            )
+        )
 
+        data = get_task_stats_from_query(qry)
         payload = {}
         for dag_id in filter_dag_ids:
             payload[dag_id] = []
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index b98c1bc..672d4a1 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -24,7 +24,13 @@ import pytest
 from airflow.configuration import initialize_config
 from airflow.plugins_manager import AirflowPlugin, EntryPointSource
 from airflow.www import views
-from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration
+from airflow.www.views import (
+    get_key_paths,
+    get_safe_url,
+    get_task_stats_from_query,
+    get_value_from_path,
+    truncate_task_duration,
+)
 from tests.test_utils.config import conf_vars
 from tests.test_utils.mock_plugins import mock_plugin_manager
 from tests.test_utils.www import check_content_in_response, check_content_not_in_response
@@ -333,3 +339,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type):
     action_funcs = action_funcs - {"action_post"}
     for action_function in action_funcs:
         assert_decorator_used(cls, action_function, views.action_has_dag_edit_access)
+
+
+def test_get_task_stats_from_query():
+    query_data = [
+        ['dag1', 'queued', True, 1],
+        ['dag1', 'running', True, 2],
+        ['dag1', 'success', False, 3],
+        ['dag2', 'running', True, 4],
+        ['dag2', 'success', True, 5],
+        ['dag3', 'success', False, 6],
+    ]
+    expected_data = {
+        'dag1': {
+            'queued': 1,
+            'running': 2,
+        },
+        'dag2': {
+            'running': 4,
+            'success': 5,
+        },
+        'dag3': {
+            'success': 6,
+        },
+    }
+
+    data = get_task_stats_from_query(query_data)
+    assert data == expected_data

[airflow] 08/43: Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dda8f4356525041c5200c42d00e5dc05fd79c54b
Author: SeonghwanLee <50...@users.noreply.github.com>
AuthorDate: Thu Jan 27 14:36:24 2022 +0900

    Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
    
    Co-authored-by: uplsh <up...@linecorp.com>
    (cherry picked from commit d97e2bac854f9891eb47f0c06c261e89723038ca)
---
 airflow/cli/commands/dag_command.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index e04bc73..6e8e157 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -47,7 +47,7 @@ from airflow.utils.cli import (
 )
 from airflow.utils.dot_renderer import render_dag
 from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import State
+from airflow.utils.state import DagRunState
 
 
 @cli_utils.action_logging
@@ -105,7 +105,7 @@ def dag_backfill(args, dag=None):
                 end_date=args.end_date,
                 confirm_prompt=not args.yes,
                 include_subdags=True,
-                dag_run_state=State.NONE,
+                dag_run_state=DagRunState.QUEUED,
             )
 
         try:

[airflow] 02/43: name mismatch (#21055)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a670f8c340b9e5a21f349c8dc5a7b9ff38579df9
Author: caxefaizan <63...@users.noreply.github.com>
AuthorDate: Mon Jan 24 16:17:23 2022 +0530

    name mismatch (#21055)
    
    (cherry picked from commit 4fb005ec122a1c0091db0083c2fe4305473abb49)
---
 .../kubernetes/pod_template_file_examples/dags_in_volume_template.yaml  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
index 389fe37..cc46149 100644
--- a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
+++ b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
@@ -63,7 +63,7 @@ spec:
     fsGroup: 50000
   serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
   volumes:
-    - name: dags
+    - name: airflow-dags
       persistentVolumeClaim:
         claimName: RELEASE-NAME-dags
     - emptyDir: {}

[airflow] 15/43: bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 55a4abbe1631f34325327d1494f1faaaa0c7e359
Author: Đặng Minh Dũng <du...@live.com>
AuthorDate: Wed Jan 5 14:42:57 2022 +0700

    bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
    
    (cherry picked from commit 64c0bd50155dfdb84671ac35d645b812fafa78a1)
---
 airflow/api/common/experimental/mark_tasks.py | 121 ++++++++++++++++++--------
 1 file changed, 85 insertions(+), 36 deletions(-)

diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 28e733d..4131cb5 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -17,23 +17,27 @@
 # under the License.
 """Marks tasks APIs."""
 
-import datetime
-from typing import Iterable
+from datetime import datetime
+from typing import Generator, Iterable, List, Optional
 
-from sqlalchemy import or_
 from sqlalchemy.orm import contains_eager
+from sqlalchemy.orm.session import Session as SASession
+from sqlalchemy.sql.expression import or_
 
+from airflow import DAG
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.operators.subdag import SubDagOperator
 from airflow.utils import timezone
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.types import DagRunType
 
 
-def _create_dagruns(dag, execution_dates, state, run_type):
+def _create_dagruns(
+    dag: DAG, execution_dates: List[datetime], state: TaskInstanceState, run_type: DagRunType
+) -> List[DagRun]:
     """
     Infers from the dates which dag runs need to be created and does so.
 
@@ -63,15 +67,15 @@ def _create_dagruns(dag, execution_dates, state, run_type):
 @provide_session
 def set_state(
     tasks: Iterable[BaseOperator],
-    execution_date: datetime.datetime,
+    execution_date: datetime,
     upstream: bool = False,
     downstream: bool = False,
     future: bool = False,
     past: bool = False,
     state: str = State.SUCCESS,
     commit: bool = False,
-    session=None,
-):
+    session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
     """
     Set the state of a task instance and if needed its relatives. Can set state
     for future tasks (calculated from execution_date) and retroactively
@@ -134,7 +138,9 @@ def set_state(
     return tis_altered
 
 
-def all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates):
+def all_subdag_tasks_query(
+    sub_dag_run_ids: List[str], session: SASession, state: TaskInstanceState, confirmed_dates: List[datetime]
+):
     """Get *all* tasks of the sub dags"""
     qry_sub_dag = (
         session.query(TaskInstance)
@@ -144,7 +150,13 @@ def all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates):
     return qry_sub_dag
 
 
-def get_all_dag_task_query(dag, session, state, task_ids, confirmed_dates):
+def get_all_dag_task_query(
+    dag: DAG,
+    session: SASession,
+    state: TaskInstanceState,
+    task_ids: List[str],
+    confirmed_dates: List[datetime],
+):
     """Get all tasks of the main dag that will be affected by a state change"""
     qry_dag = (
         session.query(TaskInstance)
@@ -160,7 +172,14 @@ def get_all_dag_task_query(dag, session, state, task_ids, confirmed_dates):
     return qry_dag
 
 
-def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
+def get_subdag_runs(
+    dag: DAG,
+    session: SASession,
+    state: TaskInstanceState,
+    task_ids: List[str],
+    commit: bool,
+    confirmed_dates: List[datetime],
+) -> List[str]:
     """Go through subdag operators and create dag runs. We will only work
     within the scope of the subdag. We won't propagate to the parent dag,
     but we will propagate from parent to subdag.
@@ -181,7 +200,7 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
                 dag_runs = _create_dagruns(
                     current_task.subdag,
                     execution_dates=confirmed_dates,
-                    state=State.RUNNING,
+                    state=TaskInstanceState.RUNNING,
                     run_type=DagRunType.BACKFILL_JOB,
                 )
 
@@ -192,7 +211,13 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
     return sub_dag_ids
 
 
-def verify_dagruns(dag_runs, commit, state, session, current_task):
+def verify_dagruns(
+    dag_runs: List[DagRun],
+    commit: bool,
+    state: TaskInstanceState,
+    session: SASession,
+    current_task: BaseOperator,
+):
     """Verifies integrity of dag_runs.
 
     :param dag_runs: dag runs to verify
@@ -210,7 +235,7 @@ def verify_dagruns(dag_runs, commit, state, session, current_task):
             session.merge(dag_run)
 
 
-def verify_dag_run_integrity(dag, dates):
+def verify_dag_run_integrity(dag: DAG, dates: List[datetime]) -> List[datetime]:
     """
     Verify the integrity of the dag runs in case a task was added or removed
     set the confirmed execution dates as they might be different
@@ -225,7 +250,9 @@ def verify_dag_run_integrity(dag, dates):
     return confirmed_dates
 
 
-def find_task_relatives(tasks, downstream, upstream):
+def find_task_relatives(
+    tasks: Iterable[BaseOperator], downstream: bool, upstream: bool
+) -> Generator[str, None, None]:
     """Yield task ids and optionally ancestor and descendant ids."""
     for task in tasks:
         yield task.task_id
@@ -237,7 +264,7 @@ def find_task_relatives(tasks, downstream, upstream):
                 yield relative.task_id
 
 
-def get_execution_dates(dag, execution_date, future, past):
+def get_execution_dates(dag: DAG, execution_date: datetime, future: bool, past: bool) -> List[datetime]:
     """Returns dates of DAG execution"""
     latest_execution_date = dag.get_latest_execution_date()
     if latest_execution_date is None:
@@ -266,7 +293,9 @@ def get_execution_dates(dag, execution_date, future, past):
 
 
 @provide_session
-def _set_dag_run_state(dag_id, execution_date, state, session=None):
+def _set_dag_run_state(
+    dag_id: str, execution_date: datetime, state: TaskInstanceState, session: SASession = NEW_SESSION
+):
     """
     Helper method that set dag run state in the DB.
 
@@ -279,7 +308,7 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
         session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date).one()
     )
     dag_run.state = state
-    if state == State.RUNNING:
+    if state == TaskInstanceState.RUNNING:
         dag_run.start_date = timezone.utcnow()
         dag_run.end_date = None
     else:
@@ -288,7 +317,12 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
 
 
 @provide_session
-def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_success(
+    dag: Optional[DAG],
+    execution_date: Optional[datetime],
+    commit: bool = False,
+    session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
     """
     Set the dag run for a specific execution date and its task instances
     to success.
@@ -306,18 +340,27 @@ def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None
 
     # Mark the dag run to success.
     if commit:
-        _set_dag_run_state(dag.dag_id, execution_date, State.SUCCESS, session)
+        _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.SUCCESS, session)
 
     # Mark all task instances of the dag run to success.
     for task in dag.tasks:
         task.dag = dag
     return set_state(
-        tasks=dag.tasks, execution_date=execution_date, state=State.SUCCESS, commit=commit, session=session
+        tasks=dag.tasks,
+        execution_date=execution_date,
+        state=TaskInstanceState.SUCCESS,
+        commit=commit,
+        session=session,
     )
 
 
 @provide_session
-def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_failed(
+    dag: Optional[DAG],
+    execution_date: Optional[datetime],
+    commit: bool = False,
+    session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
     """
     Set the dag run for a specific execution date and its running task instances
     to failed.
@@ -335,18 +378,15 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None)
 
     # Mark the dag run to failed.
     if commit:
-        _set_dag_run_state(dag.dag_id, execution_date, State.FAILED, session)
+        _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.FAILED, session)
 
-    # Mark only RUNNING task instances.
+    # Mark only running task instances.
     task_ids = [task.task_id for task in dag.tasks]
-    tis = (
-        session.query(TaskInstance)
-        .filter(
-            TaskInstance.dag_id == dag.dag_id,
-            TaskInstance.execution_date == execution_date,
-            TaskInstance.task_id.in_(task_ids),
-        )
-        .filter(TaskInstance.state == State.RUNNING)
+    tis = session.query(TaskInstance).filter(
+        TaskInstance.dag_id == dag.dag_id,
+        TaskInstance.execution_date == execution_date,
+        TaskInstance.task_id.in_(task_ids),
+        TaskInstance.state.in_(State.running),
     )
     task_ids_of_running_tis = [task_instance.task_id for task_instance in tis]
 
@@ -358,12 +398,21 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None)
         tasks.append(task)
 
     return set_state(
-        tasks=tasks, execution_date=execution_date, state=State.FAILED, commit=commit, session=session
+        tasks=tasks,
+        execution_date=execution_date,
+        state=TaskInstanceState.FAILED,
+        commit=commit,
+        session=session,
     )
 
 
 @provide_session
-def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_running(
+    dag: Optional[DAG],
+    execution_date: Optional[datetime],
+    commit: bool = False,
+    session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
     """
     Set the dag run for a specific execution date to running.
 
@@ -380,7 +429,7 @@ def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None
 
     # Mark the dag run to running.
     if commit:
-        _set_dag_run_state(dag.dag_id, execution_date, State.RUNNING, session)
+        _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.RUNNING, session)
 
     # To keep the return type consistent with the other similar functions.
     return res

[airflow] 43/43: added explaining concept of logical date in DAG run docs (#21433)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 56d82fc3483f500d7a1da36019888849e8784c12
Author: Howard Yoo <32...@users.noreply.github.com>
AuthorDate: Thu Feb 17 14:01:58 2022 -0600

    added explaining concept of logical date in DAG run docs (#21433)
    
    (cherry picked from commit 752d53860e636ead2be7c3f2044b9b312ba86b95)
---
 docs/apache-airflow/concepts/dags.rst | 16 ++++++++++++++++
 docs/apache-airflow/dag-run.rst       |  2 ++
 docs/apache-airflow/faq.rst           |  7 +++++++
 3 files changed, 25 insertions(+)

diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 3edaf35..e339abe 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -157,6 +157,8 @@ The ``schedule_interval`` argument takes any value that is a valid `Crontab <htt
     For more information on ``schedule_interval`` values, see :doc:`DAG Run </dag-run>`.
 
     If ``schedule_interval`` is not enough to express the DAG's schedule, see :doc:`Timetables </howto/timetable>`.
+    For more information on ``logical date``, see :ref:`data-interval` and
+    :ref:`faq:what-does-execution-date-mean`.
 
 Every time you run a DAG, you are creating a new instance of that DAG which
 Airflow calls a :doc:`DAG Run </dag-run>`. DAG Runs can run in parallel for the
@@ -177,6 +179,20 @@ In much the same way a DAG instantiates into a DAG Run every time it's run,
 Tasks specified inside a DAG are also instantiated into
 :ref:`Task Instances <concepts:task-instances>` along with it.
 
+A DAG run has a start date when it starts and an end date when it ends.
+This period describes the time when the DAG actually 'ran'. Aside from the DAG
+run's start and end date, there is another date called the *logical date*
+(formerly known as the execution date), which describes the intended time a
+DAG run is scheduled or triggered. It is called *logical* because it is
+an abstract date whose meaning depends on the context of the DAG run
+itself.
+
+For example, if a DAG run is manually triggered by the user, its logical date is the
+date and time at which the DAG run was triggered, and the value is equal
+to the DAG run's start date. However, when the DAG is scheduled automatically,
+with a schedule interval in place, the logical date indicates the time
+at which the data interval starts, and the DAG run's start
+date is then the logical date plus the schedule interval.
 
 DAG Assignment
 --------------
diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 90bb404..62555b1 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -84,6 +84,8 @@ scheduled one interval after ``start_date``.
 
     If ``schedule_interval`` is not enough to express your DAG's schedule,
     logical date, or data interval, see :doc:`/concepts/timetable`.
+    For more information on ``logical date``, see :ref:`concepts:dag-run` and
+    :ref:`faq:what-does-execution-date-mean`
 
 Re-run DAG
 ''''''''''
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 857e685..7f72a0d 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -214,6 +214,8 @@ This allows for a backfill on tasks that have ``depends_on_past=True`` to
 actually start. If this were not the case, the backfill just would not start.
 
 
+.. _faq:what-does-execution-date-mean:
+
 What does ``execution_date`` mean?
 ----------------------------------
 
@@ -248,6 +250,11 @@ misunderstandings.
 Note that ``ds`` (the YYYY-MM-DD form of ``data_interval_start``) refers to
 *date* ***string***, not *date* ***start*** as may be confusing to some.
 
+.. tip::
+
+    For more information on ``logical date``, see :ref:`data-interval` and
+    :ref:`concepts:dag-run`.
+
 
 How to create DAGs dynamically?
 -------------------------------
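
To make the logical date relationship added in this commit concrete, here is a small worked example (the dates and schedule are assumptions for illustration, not taken from the docs being changed):

    import pendulum

    # Assume a DAG scheduled "@daily" (midnight UTC).
    logical_date = pendulum.datetime(2022, 1, 1, tz="UTC")  # start of the data interval
    data_interval_end = logical_date.add(days=1)            # 2022-01-02 00:00 UTC

    # An automatically scheduled run is created only after its data interval has
    # ended, so its start date falls on or after data_interval_end, i.e. roughly
    # logical_date + schedule interval.
    print(logical_date, data_interval_end)

    # A manually triggered run instead gets a logical date equal to (roughly)
    # the moment it was triggered, matching its start date.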

[airflow] 25/43: Fix the incorrect scheduling time for the first run of dag (#21011)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 64e0c5024b3cb13d2fc53f42b8096c2ae3441553
Author: wano <55...@users.noreply.github.com>
AuthorDate: Mon Feb 7 02:02:57 2022 +0800

    Fix the incorrect scheduling time for the first run of dag (#21011)
    
    When catchup_by_default is set to false and the DAG's start_date is the
    previous day, the first scheduled time for this DAG may be incorrect.
    
    Co-authored-by: wanlce <wh...@foxmail.com>
    (cherry picked from commit 0bcca55f4881bacc3fbe86f69e71981f5552b398)
---
 airflow/timetables/interval.py              |  2 +-
 tests/timetables/test_interval_timetable.py | 21 +++++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index d669cb6..01fac3a 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -218,7 +218,7 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
             raise AssertionError("next schedule shouldn't be earlier")
         if earliest is None:
             return new_start
-        return max(new_start, earliest)
+        return max(new_start, self._align(earliest))
 
     def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
         # Get the last complete period before run_after, e.g. if a DAG run is
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
index 842cc1f2..fe09e0c 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -35,11 +35,32 @@ PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1)
 PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END)
 
 CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
+YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)
 
 HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
 HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
 HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1))
 
+CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE)
+DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
+
+
+@pytest.mark.parametrize(
+    "last_automated_data_interval",
+    [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")],
+)
+@freezegun.freeze_time(CURRENT_TIME)
+def test_no_catchup_first_starts_at_current_time(
+    last_automated_data_interval: Optional[DataInterval],
+) -> None:
+    """If ``catchup=False`` and start_date is a day before"""
+    next_info = CRON_TIMETABLE.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
+    )
+    expected_start = YESTERDAY + DELTA_FROM_MIDNIGHT
+    assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT)
+
 
 @pytest.mark.parametrize(
     "timetable",

[airflow] 41/43: Add note about Variable precedence with env vars (#21568)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7e8012703cb7d79386b5c59e076a81dad60eabf3
Author: Madison Swain-Bowden <bo...@spu.edu>
AuthorDate: Tue Feb 15 13:56:00 2022 -0800

    Add note about Variable precedence with env vars (#21568)
    
    This PR updates some documentation regarding setting Airflow Variables using environment variables. Environment variables take precedence over variables defined in the UI/metastore based on this default search path list: https://github.dev/apache/airflow/blob/7864693e43c40fd8f0914c05f7e196a007d16d50/airflow/secrets/__init__.py#L29-L30
    
    (cherry picked from commit 7a268cb3c9fc6bc03f2400c6632ff8dccf4e451e)
---
 docs/apache-airflow/howto/variable.rst | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docs/apache-airflow/howto/variable.rst b/docs/apache-airflow/howto/variable.rst
index 7cb9377..401dcb1 100644
--- a/docs/apache-airflow/howto/variable.rst
+++ b/docs/apache-airflow/howto/variable.rst
@@ -62,7 +62,8 @@ You can use them in your DAGs as:
     Single underscores surround ``VAR``.  This is in contrast with the way ``airflow.cfg``
     parameters are stored, where double underscores surround the config section name.
     Variables set using Environment Variables would not appear in the Airflow UI but you will
-    be able to use them in your DAG file.
+    be able to use them in your DAG file. Variables set using Environment Variables will also
+    take precedence over variables defined in the Airflow UI.
 
 Securing Variables
 ------------------
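
As a minimal illustration of the precedence documented above (the variable name is made up):

    # Shell: export AIRFLOW_VAR_DATA_BUCKET='s3://example-bucket'
    # Note the single underscores around VAR.
    from airflow.models import Variable

    # Returns the value from the environment variable, even if a Variable with
    # the key "data_bucket" also exists in the metadata database / UI.
    bucket = Variable.get("data_bucket")
    print(bucket)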

[airflow] 14/43: Removed duplicated dag_run join in Dag.get_task_instances() (#20591)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6d8342e78c6b6546845a7d2d5ba0a761af73d5f0
Author: hubert-pietron <94...@users.noreply.github.com>
AuthorDate: Thu Jan 27 06:20:17 2022 +0100

    Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
    
    Co-authored-by: hubert-pietron <hu...@gmail.com>
    (cherry picked from commit 960f573615b5357677c10bd9f7ec11811a0355c6)
---
 airflow/models/dag.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2a08d26..477e597 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1343,7 +1343,6 @@ class DAG(LoggingMixin):
                 as_pk_tuple=False,
                 session=session,
             )
-            .join(TaskInstance.dag_run)
             .order_by(DagRun.execution_date)
             .all()
         )

[airflow] 39/43: Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 436f452ab8e32bfd5997e9650d1cfc490a41b0e4
Author: Kush <36...@users.noreply.github.com>
AuthorDate: Thu Dec 30 15:56:24 2021 +0530

    Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)
    
    Fixes #20249
    
    (cherry picked from commit ac9f29da200c208bb52d412186c5a1b936eb0b5a)
---
 airflow/jobs/base_job.py                           |  1 +
 .../587bdf053233_adding_index_for_dag_id_in_job.py | 43 ++++++++++++++++++++++
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 3 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 745f248..174e4d5 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -71,6 +71,7 @@ class BaseJob(Base, LoggingMixin):
     __table_args__ = (
         Index('job_type_heart', job_type, latest_heartbeat),
         Index('idx_job_state_heartbeat', state, latest_heartbeat),
+        Index('idx_job_dag_id', dag_id),
     )
 
     task_instances_enqueued = relationship(
diff --git a/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py b/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
new file mode 100644
index 0000000..3532fe9
--- /dev/null
+++ b/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""adding index for dag_id in job
+
+Revision ID: 587bdf053233
+Revises: f9da662e7089
+Create Date: 2021-12-14 10:20:12.482940
+
+"""
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '587bdf053233'
+down_revision = 'f9da662e7089'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply adding index for dag_id in job"""
+    op.create_index('idx_job_dag_id', 'job', ['dag_id'], unique=False)
+
+
+def downgrade():
+    """Unapply adding index for dag_id in job"""
+    op.drop_index('idx_job_dag_id', table_name='job')
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 8dc1a55..0eac329 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``c381b21cb7e4`` (head)        | ``be2bfac3da23`` | ``2.2.4``       | Create a ``session`` table to store web session data                                  |
+| ``587bdf053233`` (head)        | ``f9da662e7089`` | ``2.3.0``       | Add index for ``dag_id`` column in ``job`` table.                                     |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``c381b21cb7e4``               | ``be2bfac3da23`` | ``2.2.4``       | Create a ``session`` table to store web session data                                  |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``be2bfac3da23``               | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+

[airflow] 13/43: Avoid unintentional data loss when deleting DAGs (#20758)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4dc8b909bedd04094be3079c3f7384ea044ec011
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Mon Jan 10 11:55:51 2022 -0800

    Avoid unintentional data loss when deleting DAGs (#20758)
    
    (cherry picked from commit 5980d2b05eee484256c634d5efae9410265c65e9)
---
 airflow/api/common/delete_dag.py    | 18 +++++++++++++++---
 tests/api/common/test_delete_dag.py | 14 ++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index c448127..5e0afa8 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -18,7 +18,7 @@
 """Delete DAGs APIs."""
 import logging
 
-from sqlalchemy import or_
+from sqlalchemy import and_, or_
 
 from airflow import models
 from airflow.exceptions import AirflowException, DagNotFound
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also delete all of its subdags
+    dags_to_delete_query = session.query(DagModel.dag_id).filter(
+        or_(
+            DagModel.dag_id == dag_id,
+            and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
+        )
+    )
+    dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]
+
     # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
     # There may be a lag, so explicitly removes serialized DAG here.
     if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
@@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
         if hasattr(model, "dag_id"):
             if keep_records_in_log and model.__name__ == 'Log':
                 continue
-            cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
-            count += session.query(model).filter(cond).delete(synchronize_session='fetch')
+            count += (
+                session.query(model)
+                .filter(model.dag_id.in_(dags_to_delete))
+                .delete(synchronize_session='fetch')
+            )
     if dag.is_subdag:
         parent_dag_id, task_id = dag_id.rsplit(".", 1)
         for model in TaskFail, models.TaskInstance:
diff --git a/tests/api/common/test_delete_dag.py b/tests/api/common/test_delete_dag.py
index 0eb058a..d9dc0b0 100644
--- a/tests/api/common/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -162,3 +162,17 @@ class TestDeleteDAGSuccessfulDelete:
         self.check_dag_models_exists()
         delete_dag(dag_id=self.key, keep_records_in_log=False)
         self.check_dag_models_removed(expect_logs=0)
+
+    def test_delete_dag_preserves_other_dags(self):
+
+        self.setup_dag_models()
+
+        with create_session() as session:
+            session.add(DM(dag_id=self.key + ".other_dag", fileloc=self.dag_file_path))
+            session.add(DM(dag_id=self.key + ".subdag", fileloc=self.dag_file_path, is_subdag=True))
+
+        delete_dag(self.key)
+
+        with create_session() as session:
+            assert session.query(DM).filter(DM.dag_id == self.key + ".other_dag").count() == 1
+            assert session.query(DM).filter(DM.dag_id.like(self.key + "%")).count() == 1
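
The new test above captures the scenario that motivated this change: a plain prefix ``LIKE`` also matches top-level DAGs whose ids merely start with ``<dag_id>.``. A rough sketch of the difference, with made-up dag ids:

    # Hypothetical dag ids; only "sales.extract_subdag" is a real subdag.
    dag_ids = ["sales", "sales.other_dag", "sales.extract_subdag"]
    is_subdag = {"sales.other_dag": False, "sales.extract_subdag": True}

    # Old behaviour: anything matching the prefix was deleted along with "sales".
    naive = [d for d in dag_ids if d == "sales" or d.startswith("sales.")]
    # -> ['sales', 'sales.other_dag', 'sales.extract_subdag']

    # New behaviour: prefix matches are kept only when is_subdag is set.
    safe = [d for d in dag_ids if d == "sales" or (d.startswith("sales.") and is_subdag.get(d))]
    # -> ['sales', 'sales.extract_subdag']
    print(naive, safe)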

[airflow] 29/43: Avoid deadlock when rescheduling task (#21362)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f2fe0df6b3caa86a4315322264fad077f03b32e6
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Feb 7 20:12:05 2022 +0100

    Avoid deadlock when rescheduling task (#21362)
    
    The scheduler job performs scheduling after locking the "scheduled"
    DagRun row for writing. This should prevent another scheduler, or the
    "mini-scheduler" run after a task completes, from modifying the DagRun
    and its related task instances.

    However, there is apparently one more case where the DagRun gets
    locked by "Task" processes - namely when a task throws
    AirflowRescheduleException. In this case a new "TaskReschedule"
    entity is inserted into the database, and it also takes a lock
    on the DagRun (because TaskReschedule has a "DagRun" relationship).

    This PR modifies the handling of AirflowRescheduleException to obtain
    the very same DagRun lock before it attempts to insert the
    TaskReschedule entity.

    TaskReschedule seems to be the only entity with this relationship,
    so all the mysterious SchedulerJob deadlock cases we experienced
    might be explained (and fixed) by this one.
    
    It is likely that this one:
    
    * Fixes: #16982
    * Fixes: #19957
    
    (cherry picked from commit 6d110b565a505505351d1ff19592626fb24e4516)
---
 airflow/models/taskinstance.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index ec34156..2dcc923 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -93,7 +93,7 @@ from airflow.utils.operator_helpers import context_to_airflow_vars
 from airflow.utils.platform import getuser
 from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import create_session, provide_session
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
 from airflow.utils.state import DagRunState, State
 from airflow.utils.timeout import timeout
 
@@ -1657,11 +1657,24 @@ class TaskInstance(Base, LoggingMixin):
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()
+
         # Log reschedule request
         session.add(
             TaskReschedule(

[airflow] 11/43: Fix session usage in ``/rendered-k8s`` view (#21006)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit daebc586d0aaaddaea4658734c9292dece150c6a
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Fri Jan 21 21:44:40 2022 +0800

    Fix session usage in ``/rendered-k8s`` view (#21006)
    
    We can't commit the session too early because later functions need that
    session to fetch related objects.
    
    Fix #20534.
    
    (cherry picked from commit a665f48b606065977e0d3952bc74635ce11726d1)
---
 airflow/www/views.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9b868f3..2182a17 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -84,7 +84,7 @@ from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
 from sqlalchemy import Date, and_, desc, func, inspect, union_all
 from sqlalchemy.exc import IntegrityError
-from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import Session, joinedload
 from wtforms import SelectField, validators
 from wtforms.validators import InputRequired
 
@@ -116,7 +116,7 @@ from airflow.utils.docs import get_doc_url_for_provider, get_docs_url
 from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.log import secrets_masker
 from airflow.utils.log.log_reader import TaskLogReader
-from airflow.utils.session import create_session, provide_session
+from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import State
 from airflow.utils.strings import to_boolean
 from airflow.version import version
@@ -1124,7 +1124,8 @@ class Airflow(AirflowBaseView):
         ]
     )
     @action_logging
-    def rendered_k8s(self):
+    @provide_session
+    def rendered_k8s(self, session: Session = NEW_SESSION):
         """Get rendered k8s yaml."""
         if not settings.IS_K8S_OR_K8SCELERY_EXECUTOR:
             abort(404)
@@ -1135,14 +1136,15 @@ class Airflow(AirflowBaseView):
         form = DateTimeForm(data={'execution_date': dttm})
         root = request.args.get('root', '')
         logging.info("Retrieving rendered templates.")
-        dag = current_app.dag_bag.get_dag(dag_id)
+
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         task = dag.get_task(task_id)
-        dag_run = dag.get_dagrun(execution_date=dttm)
-        ti = dag_run.get_task_instance(task_id=task.task_id)
+        dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+        ti = dag_run.get_task_instance(task_id=task.task_id, session=session)
 
         pod_spec = None
         try:
-            pod_spec = ti.get_rendered_k8s_spec()
+            pod_spec = ti.get_rendered_k8s_spec(session=session)
         except AirflowException as e:
             msg = "Error rendering Kubernetes POD Spec: " + escape(e)
             if e.__cause__:

[airflow] 07/43: Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 07102e96dfb3c9794882f562548b37738ee4a37a
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Thu Jan 27 06:47:10 2022 +0800

    Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
    
    (cherry picked from commit e3832a77a3e0d374dfdbe14f34a941d22c9c459d)
---
 airflow/models/taskinstance.py    | 4 +++-
 tests/models/test_taskinstance.py | 6 ++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 281d067..ec34156 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -447,6 +447,7 @@ class TaskInstance(Base, LoggingMixin):
         self.run_id = run_id
 
         self.try_number = 0
+        self.max_tries = self.task.retries
         self.unixname = getuser()
         if state:
             self.state = state
@@ -775,7 +776,8 @@ class TaskInstance(Base, LoggingMixin):
         self.pool_slots = task.pool_slots
         self.priority_weight = task.priority_weight_total
         self.run_as_user = task.run_as_user
-        self.max_tries = task.retries
+        # Do not set max_tries to task.retries here because max_tries is a cumulative
+        # value that needs to be stored in the db.
         self.executor_config = task.executor_config
         self.operator = task.task_type
 
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index d111371..4fec49f 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2143,6 +2143,12 @@ def test_refresh_from_task(pool_override):
     assert ti.executor_config == task.executor_config
     assert ti.operator == DummyOperator.__name__
 
+    # Test that refresh_from_task does not reset ti.max_tries
+    expected_max_tries = task.retries + 10
+    ti.max_tries = expected_max_tries
+    ti.refresh_from_task(task)
+    assert ti.max_tries == expected_max_tries
+
 
 class TestRunRawTaskQueriesCount:
     """

[airflow] 22/43: Update recipe for Google Cloud SDK (#21268)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4b3fa3a99a90eff00b244a62b52a2d6c8e25d285
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Thu Feb 3 19:20:12 2022 +0100

    Update recipe for Google Cloud SDK (#21268)
    
    (cherry picked from commit 874a22ee9b77f8f100736558723ceaf2d04b446b)
---
 docs/docker-stack/docker-images-recipes/gcloud.Dockerfile | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile b/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile
index b1589e1..48f7c2d 100644
--- a/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile
+++ b/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile
@@ -36,6 +36,7 @@ RUN DOWNLOAD_URL="https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/goo
        --additional-components alpha beta kubectl \
        --quiet \
     && rm -rf "${TMP_DIR}" \
+    && rm -rf "${GCLOUD_HOME}/.install/.backup/" \
     && gcloud --version
 
 USER ${AIRFLOW_UID}

[airflow] 04/43: Add back legacy .piprc customization for pip (#21124)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 680c011f3050d2858e5d648b68e59185a213709c
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Wed Jan 26 18:04:19 2022 +0100

    Add back legacy .piprc customization for pip (#21124)
    
    This change brings back backwards compatibility for using .piprc
    to customize the Airflow image. Some older versions of pip used .piprc
    (even though documentation about it is difficult to find now) and we
    used to support this option. With #20445, we switched to the
    (fully documented) ``pip.conf`` option; however, if someone used
    .piprc before to customize their image, that change would break it.

    The PR also brings back the .piprc option to the image (even if
    it is not really clear whether current and future versions of pip
    will support it).
    
    (cherry picked from commit d5a9edf25723396d17fd10bb980fb99ccac618bb)
---
 Dockerfile                  |  3 +++
 docs/docker-stack/build.rst | 10 +++++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index f880ec5..aadf896 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -212,6 +212,9 @@ USER airflow
 RUN if [[ -f /docker-context-files/pip.conf ]]; then \
         mkdir -p ${AIRFLOW_USER_HOME_DIR}/.config/pip; \
         cp /docker-context-files/pip.conf "${AIRFLOW_USER_HOME_DIR}/.config/pip/pip.conf"; \
+    fi; \
+    if [[ -f /docker-context-files/.piprc ]]; then \
+        cp /docker-context-files/.piprc "${AIRFLOW_USER_HOME_DIR}/.piprc"; \
     fi
 
 ENV AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION} \
diff --git a/docs/docker-stack/build.rst b/docs/docker-stack/build.rst
index 2702c66..b85bf1c 100644
--- a/docs/docker-stack/build.rst
+++ b/docs/docker-stack/build.rst
@@ -522,13 +522,21 @@ described below but here is an example of rather complex command to customize th
 based on example in `this comment <https://github.com/apache/airflow/issues/8605#issuecomment-690065621>`_:
 
 In case you need to use your custom PyPI package indexes, you can also customize PYPI sources used during
-image build by adding a ``docker-context-files``/``pip.conf`` file when building the image.
+image build by adding a ``docker-context-files/pip.conf`` file when building the image.
 This ``pip.conf`` will not be committed to the repository (it is added to ``.gitignore``) and it will not be
 present in the final production image. It is added and used only in the build segment of the image.
 Therefore this ``pip.conf`` file can safely contain list of package indexes you want to use,
 usernames and passwords used for authentication. More details about ``pip.conf`` file can be found in the
 `pip configuration <https://pip.pypa.io/en/stable/topics/configuration/>`_.
 
+If you used ``.piprc`` before (some older versions of ``pip`` used it for customization), you can put it
+in the ``docker-context-files/.piprc`` file and it will be automatically copied to the ``HOME`` directory
+of the ``airflow`` user.
+
+Note that those customizations are only available in the ``build`` segment of the Airflow image and they
+are not present in the ``final`` image. If you wish to extend the final image and add custom ``.piprc`` and
+``pip.conf``, you should add them in your own Dockerfile used to extend the Airflow image.
+
 Such customizations are independent of the way how airflow is installed.
 
 .. note::

[airflow] 03/43: Update logging-tasks.rst (#21088)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9f6d6b9a13d808c0faff899f79a7bbcaf78fc5aa
Author: caxefaizan <63...@users.noreply.github.com>
AuthorDate: Wed Jan 26 04:15:58 2022 +0530

    Update logging-tasks.rst (#21088)
    
    (cherry picked from commit 156284650f20bad131f26b91061e207e2e39253e)
---
 docs/apache-airflow/logging-monitoring/logging-tasks.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/logging-monitoring/logging-tasks.rst
index 13cb248..e64905f 100644
--- a/docs/apache-airflow/logging-monitoring/logging-tasks.rst
+++ b/docs/apache-airflow/logging-monitoring/logging-tasks.rst
@@ -122,7 +122,7 @@ Serving logs from workers
 
 Most task handlers send logs upon completion of a task. In order to view logs in real time, Airflow automatically starts an HTTP server to serve the logs in the following cases:
 
-- If ``SchedulerExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
+- If ``SequentialExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
 - If ``CeleryExecutor`` is used, then when ``airflow worker`` is running.
 
 The server is running on the port specified by ``worker_log_server_port`` option in ``[logging]`` section. By default, it is ``8793``.
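
A quick way to double-check the effective port is to read it back from the Airflow
configuration (a minimal sketch; the section and option names are the ones mentioned in the
paragraph above):

    from airflow.configuration import conf

    # [logging] worker_log_server_port, 8793 by default
    port = conf.getint("logging", "worker_log_server_port")
    print(f"Worker logs are served on port {port}")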

[airflow] 01/43: Update v1.yaml (#21024)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 31c66eb9d7722ba751ecee3540b350cb685891f0
Author: Ilia Lazebnik <Il...@gmail.com>
AuthorDate: Sun Jan 23 21:29:52 2022 +0200

    Update v1.yaml (#21024)
    
    (cherry picked from commit 2af0f700857cbf7401d930ff24cdff273b501beb)
---
 airflow/api_connexion/openapi/v1.yaml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 3669c66..e7553a3 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2161,8 +2161,6 @@ components:
 
             The value of this field can be set only when creating the object. If you try to modify the
             field of an existing object, the request fails with an BAD_REQUEST error.
-      required:
-        - dag_id
 
     UpdateDagRunState:
       type: object

[airflow] 20/43: Update version to 2.2.4 for things in that release (#21196)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2066812960018ce0d7ba774dd2f9fe5c0d8b52a4
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 1 12:05:04 2022 -0700

    Update version to 2.2.4 for things in that release (#21196)
    
    (cherry picked from commit 093702e9f579ee028a103cdc9acf0e6acccd6d79)
---
 airflow/api/common/experimental/get_code.py          | 2 +-
 airflow/api/common/experimental/get_dag_run_state.py | 2 +-
 airflow/api/common/experimental/get_task.py          | 2 +-
 airflow/api/common/experimental/get_task_instance.py | 2 +-
 airflow/api/common/experimental/pool.py              | 8 ++++----
 5 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py
index 1a1fb62..d4232b1 100644
--- a/airflow/api/common/experimental/get_code.py
+++ b/airflow/api/common/experimental/get_code.py
@@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException, DagCodeNotFound
 from airflow.models.dagcode import DagCode
 
 
-@deprecated(reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.3")
+@deprecated(reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.4")
 def get_code(dag_id: str) -> str:
     """Return python code of a given dag_id.
 
diff --git a/airflow/api/common/experimental/get_dag_run_state.py b/airflow/api/common/experimental/get_dag_run_state.py
index b2dedd5..7201186 100644
--- a/airflow/api/common/experimental/get_dag_run_state.py
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -24,7 +24,7 @@ from deprecated import deprecated
 from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
 
 
-@deprecated(reason="Use DagRun().get_state() instead", version="2.2.3")
+@deprecated(reason="Use DagRun().get_state() instead", version="2.2.4")
 def get_dag_run_state(dag_id: str, execution_date: datetime) -> Dict[str, str]:
     """Return the Dag Run state identified by the given dag_id and execution_date.
 
diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py
index fae5fd7..4589cc6 100644
--- a/airflow/api/common/experimental/get_task.py
+++ b/airflow/api/common/experimental/get_task.py
@@ -22,7 +22,7 @@ from airflow.api.common.experimental import check_and_get_dag
 from airflow.models import TaskInstance
 
 
-@deprecated(reason="Use DAG().get_task", version="2.2.3")
+@deprecated(reason="Use DAG().get_task", version="2.2.4")
 def get_task(dag_id: str, task_id: str) -> TaskInstance:
     """Return the task object identified by the given dag_id and task_id."""
     dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py
index 137f8a3..7361efd 100644
--- a/airflow/api/common/experimental/get_task_instance.py
+++ b/airflow/api/common/experimental/get_task_instance.py
@@ -25,7 +25,7 @@ from airflow.exceptions import TaskInstanceNotFound
 from airflow.models import TaskInstance
 
 
-@deprecated(version="2.2.3", reason="Use DagRun.get_task_instance instead")
+@deprecated(version="2.2.4", reason="Use DagRun.get_task_instance instead")
 def get_task_instance(dag_id: str, task_id: str, execution_date: datetime) -> TaskInstance:
     """Return the task instance identified by the given dag_id, task_id and execution_date."""
     dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index 0b9c3a5..fe4f161 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -23,7 +23,7 @@ from airflow.models import Pool
 from airflow.utils.session import provide_session
 
 
-@deprecated(reason="Use Pool.get_pool() instead", version="2.2.3")
+@deprecated(reason="Use Pool.get_pool() instead", version="2.2.4")
 @provide_session
 def get_pool(name, session=None):
     """Get pool by a given name."""
@@ -37,14 +37,14 @@ def get_pool(name, session=None):
     return pool
 
 
-@deprecated(reason="Use Pool.get_pools() instead", version="2.2.3")
+@deprecated(reason="Use Pool.get_pools() instead", version="2.2.4")
 @provide_session
 def get_pools(session=None):
     """Get all pools."""
     return session.query(Pool).all()
 
 
-@deprecated(reason="Use Pool.create_pool() instead", version="2.2.3")
+@deprecated(reason="Use Pool.create_pool() instead", version="2.2.4")
 @provide_session
 def create_pool(name, slots, description, session=None):
     """Create a pool with a given parameters."""
@@ -75,7 +75,7 @@ def create_pool(name, slots, description, session=None):
     return pool
 
 
-@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.3")
+@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.4")
 @provide_session
 def delete_pool(name, session=None):
     """Delete pool by a given name."""

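The version bump matters because the ``deprecated`` package includes it in the warning it
emits. A minimal sketch of what callers of these experimental helpers see (the function body
here is a stand-in, not the real implementation):

    import warnings

    from deprecated import deprecated


    @deprecated(reason="Use Pool.get_pool() instead", version="2.2.4")
    def get_pool(name):
        return name


    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        get_pool("default_pool")

    print(caught[0].message)  # the message mentions the reason and "2.2.4"
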
[airflow] 27/43: Update example DAGs (#21372)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5c078cda332d42baba72c985532e0060d30a31ef
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Feb 7 11:24:31 2022 -0700

    Update example DAGs (#21372)
    
    (cherry picked from commit 7a38ec2ad3b3bd6fda5e1ee9fe9e644ccb8b4c12)
---
 .../example_dags/example_passing_params_via_test_command.py   | 11 ++++++-----
 airflow/example_dags/tutorial.py                              |  2 --
 docs/apache-airflow/tutorial.rst                              |  9 ++-------
 tests/cli/commands/test_task_command.py                       |  1 -
 4 files changed, 8 insertions(+), 15 deletions(-)

diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index e3f04c4..d4781af 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -68,17 +68,18 @@ with DAG(
 ) as dag:
     run_this = my_py_command(params={"miff": "agg"})
 
-    my_templated_command = dedent(
+    my_command = dedent(
+        """
+        echo "'foo' was passed in via Airflow CLI Test command with value '$FOO'"
+        echo "'miff' was passed in via BashOperator with value '$MIFF'"
         """
-        echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
-        echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
-    """
     )
 
     also_run_this = BashOperator(
         task_id='also_run_this',
-        bash_command=my_templated_command,
+        bash_command=my_command,
         params={"miff": "agg"},
+        env={"FOO": "{{ params.foo }}", "MIFF": "{{ params.miff }}"},
     )
 
     env_var_test_task = print_env_vars()
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index 1049772..ff2bd2f 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -109,7 +109,6 @@ with DAG(
     {% for i in range(5) %}
         echo "{{ ds }}"
         echo "{{ macros.ds_add(ds, 7)}}"
-        echo "{{ params.my_param }}"
     {% endfor %}
     """
     )
@@ -118,7 +117,6 @@ with DAG(
         task_id='templated',
         depends_on_past=False,
         bash_command=templated_command,
-        params={'my_param': 'Parameter I passed in'},
     )
     # [END jinja_template]
 
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 1c32e78..0034b2c 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -151,13 +151,8 @@ stamp").
     :end-before: [END jinja_template]
 
 Notice that the ``templated_command`` contains code logic in ``{% %}`` blocks,
-references parameters like ``{{ ds }}``, calls a function as in
-``{{ macros.ds_add(ds, 7)}}``, and references a user-defined parameter
-in ``{{ params.my_param }}``.
-
-The ``params`` hook in ``BaseOperator`` allows you to pass a dictionary of
-parameters and/or objects to your templates. Please take the time
-to understand how the parameter ``my_param`` makes it through to the template.
+references parameters like ``{{ ds }}``, and calls a function as in
+``{{ macros.ds_add(ds, 7)}}``.
 
 Files can also be passed to the ``bash_command`` argument, like
 ``bash_command='templated_command.sh'``, where the file location is relative to
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 201af16..76c6cdb 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -263,7 +263,6 @@ class TestCliTasks(unittest.TestCase):
 
         assert 'echo "2016-01-01"' in output
         assert 'echo "2016-01-08"' in output
-        assert 'echo "Parameter I passed in"' in output
 
     def test_cli_run_when_pickle_and_dag_cli_method_selected(self):
         """

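The updated example passes templated values to the shell through ``env`` instead of
interpolating them into the command string. A standalone sketch of the same pattern (DAG
setup omitted; task id and params are illustrative):

    from airflow.operators.bash import BashOperator

    also_run_this = BashOperator(
        task_id="also_run_this",
        bash_command='echo "miff was passed in with value $MIFF"',
        params={"miff": "agg"},
        env={"MIFF": "{{ params.miff }}"},  # rendered by Airflow before the command runs
    )
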
[airflow] 34/43: Use compat data interval shim in log handlers (#21289)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 79e995480822fd68c715c6ab5d83357721fa2d55
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Sat Feb 12 11:40:29 2022 +0800

    Use compat data interval shim in log handlers (#21289)
    
    (cherry picked from commit 44bd211b19dcb75eeb53ced5bea2cf0c80654b1a)
---
 .../providers/elasticsearch/log/es_task_handler.py | 27 ++++++++++++-----
 airflow/utils/log/file_task_handler.py             | 35 +++++++++++++++++-----
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cd08971..b591aef 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -101,15 +101,25 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         self.context_set = False
 
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
-        dag_run = ti.dag_run
+        dag_run = ti.get_dagrun()
+        try:
+            data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+        except AttributeError:  # ti.task is not always set.
+            data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
 
         if self.json_format:
-            data_interval_start = self._clean_date(dag_run.data_interval_start)
-            data_interval_end = self._clean_date(dag_run.data_interval_end)
+            data_interval_start = self._clean_date(data_interval[0])
+            data_interval_end = self._clean_date(data_interval[1])
             execution_date = self._clean_date(dag_run.execution_date)
         else:
-            data_interval_start = dag_run.data_interval_start.isoformat()
-            data_interval_end = dag_run.data_interval_end.isoformat()
+            if data_interval[0]:
+                data_interval_start = data_interval[0].isoformat()
+            else:
+                data_interval_start = ""
+            if data_interval[1]:
+                data_interval_end = data_interval[1].isoformat()
+            else:
+                data_interval_end = ""
             execution_date = dag_run.execution_date.isoformat()
 
         return self.log_id_template.format(
@@ -123,14 +133,15 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         )
 
     @staticmethod
-    def _clean_date(value: datetime) -> str:
+    def _clean_date(value: Optional[datetime]) -> str:
         """
         Clean up a date value so that it is safe to query in elasticsearch
         by removing reserved characters.
-        # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
 
-        :param execution_date: execution date of the dag run.
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
         """
+        if value is None:
+            return ""
         return value.strftime("%Y_%m_%dT%H_%M_%S_%f")
 
     def _group_logs_by_host(self, logs):
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6e57c67..e13b8d4 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -18,8 +18,9 @@
 """File logging handler for tasks."""
 import logging
 import os
+from datetime import datetime
 from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Tuple
 
 import httpx
 from itsdangerous import TimedJSONWebSignatureSerializer
@@ -82,13 +83,31 @@ class FileTaskHandler(logging.Handler):
                 context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
             context["try_number"] = try_number
             return render_template_to_string(self.filename_jinja_template, context)
-
-        return self.filename_template.format(
-            dag_id=ti.dag_id,
-            task_id=ti.task_id,
-            execution_date=ti.get_dagrun().logical_date.isoformat(),
-            try_number=try_number,
-        )
+        elif self.filename_template:
+            dag_run = ti.get_dagrun()
+            try:
+                data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+            except AttributeError:  # ti.task is not always set.
+                data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
+            if data_interval[0]:
+                data_interval_start = data_interval[0].isoformat()
+            else:
+                data_interval_start = ""
+            if data_interval[1]:
+                data_interval_end = data_interval[1].isoformat()
+            else:
+                data_interval_end = ""
+            return self.filename_template.format(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                run_id=ti.run_id,
+                data_interval_start=data_interval_start,
+                data_interval_end=data_interval_end,
+                execution_date=ti.get_dagrun().logical_date.isoformat(),
+                try_number=try_number,
+            )
+        else:
+            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
 
     def _read_grouped_logs(self):
         return False
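
The fallback used in both handlers is the same; condensed into a helper it looks roughly
like this (a sketch of the pattern, not the committed code):

    from datetime import datetime
    from typing import Optional, Tuple


    def get_compat_data_interval(ti, dag_run) -> Tuple[Optional[datetime], Optional[datetime]]:
        """Prefer the DAG's computed data interval, falling back to the DagRun columns."""
        try:
            return ti.task.dag.get_run_data_interval(dag_run)
        except AttributeError:  # ti.task is not always set
            return (dag_run.data_interval_start, dag_run.data_interval_end)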

[airflow] 42/43: Adding missing login provider related methods from Flask-Appbuilder (#21294)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8cbf9340ec020810b505d0ccf197435eb0e8a704
Author: Pankaj Singh <aa...@gmail.com>
AuthorDate: Fri Feb 18 02:25:22 2022 +0530

    Adding missing login provider related methods from Flask-Appbuilder (#21294)
    
    (cherry picked from commit 38894e8013b5c38468e912164f80282e3b579993)
---
 airflow/www/fab_security/manager.py | 15 +++++++++++++++
 setup.cfg                           |  7 ++++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/airflow/www/fab_security/manager.py b/airflow/www/fab_security/manager.py
index e340c17..f5385a6 100644
--- a/airflow/www/fab_security/manager.py
+++ b/airflow/www/fab_security/manager.py
@@ -187,6 +187,7 @@ class BaseSecurityManager:
         # Role Mapping
         app.config.setdefault("AUTH_ROLES_MAPPING", {})
         app.config.setdefault("AUTH_ROLES_SYNC_AT_LOGIN", False)
+        app.config.setdefault("AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS", False)
 
         # LDAP Config
         if self.auth_type == AUTH_LDAP:
@@ -293,11 +294,21 @@ class BaseSecurityManager:
         return _roles
 
     @property
+    def auth_type_provider_name(self):
+        provider_to_auth_type = {AUTH_DB: "db", AUTH_LDAP: "ldap"}
+        return provider_to_auth_type.get(self.auth_type)
+
+    @property
     def get_url_for_registeruser(self):
         """Gets the URL for Register User"""
         return url_for(f"{self.registeruser_view.endpoint}.{self.registeruser_view.default_view}")
 
     @property
+    def get_user_datamodel(self):
+        """Gets the User data model"""
+        return self.user_view.datamodel
+
+    @property
     def get_register_user_datamodel(self):
         """Gets the Register User data model"""
         return self.registerusermodelview.datamodel
@@ -308,6 +319,10 @@ class BaseSecurityManager:
         return self._builtin_roles
 
     @property
+    def api_login_allow_multiple_providers(self):
+        return self.appbuilder.get_app.config["AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS"]
+
+    @property
     def auth_type(self):
         """Get the auth type"""
         return self.appbuilder.get_app.config["AUTH_TYPE"]
diff --git a/setup.cfg b/setup.cfg
index 8e36d06..12bdeae 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -104,7 +104,12 @@ install_requires =
     #      https://github.com/readthedocs/sphinx_rtd_theme/issues/1115
     docutils<0.17
     flask>=1.1.0, <2.0
-    flask-appbuilder>=3.3.4, <4.0.0
+    # We are tightly coupled with the FAB version because we vendored in part of the FAB code related to the security manager
+    # This is done as part of preparation for removing FAB as a dependency, but we are not ready for it yet
+    # Every time we update FAB version here, please make sure that you review the classes and models in
+    # `airflow/www/fab_security` with their upstream counterparts. In particular, make sure any breaking changes,
+    # for example any new methods, are accounted for.
+    flask-appbuilder==3.4.4
     flask-caching>=1.5.0, <2.0.0
     flask-login>=0.3, <0.5
     # Strict upper-bound on the latest release of flask-session,
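
The new flag defaults to ``False``; if you need it, it can be switched on from the
webserver configuration. A minimal sketch (whether to enable it depends on your auth setup,
and the file name is the usual default location):

    # webserver_config.py
    AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS = True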

[airflow] 33/43: Fix postgres hook import pipeline tutorial (#21491)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit efc281829c6d9458ae83e026afcc753fe935ba75
Author: KevinYanesG <75...@users.noreply.github.com>
AuthorDate: Thu Feb 10 15:38:53 2022 +0100

    Fix postgres hook import pipeline tutorial (#21491)
    
    (cherry picked from commit a2abf663157aea14525e1a55eb9735ba659ae8d6)
---
 docs/apache-airflow/tutorial.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 0034b2c..a8f76bc 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -413,7 +413,7 @@ Let's break this down into 2 steps: get data & merge data:
 
   import requests
   from airflow.decorators import task
-  from airflow.hooks.postgres import PostgresHook
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
 
 
   @task
@@ -478,7 +478,7 @@ Lets look at our DAG:
 
   import requests
   from airflow.decorators import dag, task
-  from airflow.hooks.postgres import PostgresHook
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
 
 
   @dag(
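
For reference, a minimal usage sketch with the corrected import path (the connection id is a
placeholder that must exist in your environment):

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")  # placeholder connection id
    rows = hook.get_records("SELECT 1")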

[airflow] 31/43: Fix TriggerDagRunOperator extra link (#19410)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 95eaef37f621e8abaa30bb145866f1863471fdd6
Author: Niko <on...@amazon.com>
AuthorDate: Thu Dec 9 05:46:59 2021 -0800

    Fix TriggerDagRunOperator extra link (#19410)
    
    The extra link provided by the operator was previously using the
    execution date of the triggering dag, not the triggered dag. Store the
    execution date of the triggered dag in xcom so that it can be read back
    later within the webserver when the link is being created.
    
    (cherry picked from commit 820e836c4a2e45239279d4d71e1db9434022fec5)
---
 airflow/operators/trigger_dagrun.py    | 19 ++++++++++++-
 tests/operators/test_trigger_dagrun.py | 49 +++++++++++++++++++++++++++-------
 2 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 421c796..a346db1 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -24,11 +24,15 @@ from typing import Dict, List, Optional, Union
 from airflow.api.common.trigger_dag import trigger_dag
 from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
 from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
+from airflow.models.xcom import XCom
 from airflow.utils import timezone
 from airflow.utils.helpers import build_airflow_url_with_query
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 
+XCOM_EXECUTION_DATE_ISO = "trigger_execution_date_iso"
+XCOM_RUN_ID = "trigger_run_id"
+
 
 class TriggerDagRunLink(BaseOperatorLink):
     """
@@ -39,7 +43,13 @@ class TriggerDagRunLink(BaseOperatorLink):
     name = 'Triggered DAG'
 
     def get_link(self, operator, dttm):
-        query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()}
+        # Fetch the correct execution date for the triggerED dag which is
+        # stored in xcom during execution of the triggerING task.
+        trigger_execution_date_iso = XCom.get_one(
+            execution_date=dttm, key=XCOM_EXECUTION_DATE_ISO, task_id=operator.task_id, dag_id=operator.dag_id
+        )
+
+        query = {"dag_id": operator.trigger_dag_id, "base_date": trigger_execution_date_iso}
         return build_airflow_url_with_query(query)
 
 
@@ -140,6 +150,7 @@ class TriggerDagRunOperator(BaseOperator):
                 execution_date=self.execution_date,
                 replace_microseconds=False,
             )
+
         except DagRunAlreadyExists as e:
             if self.reset_dag_run:
                 self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
@@ -157,6 +168,12 @@ class TriggerDagRunOperator(BaseOperator):
             else:
                 raise e
 
+        # Store the execution date from the dag run (either created or found above) to
+        # be used when creating the extra link on the webserver.
+        ti = context['task_instance']
+        ti.xcom_push(key=XCOM_EXECUTION_DATE_ISO, value=dag_run.execution_date.isoformat())
+        ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
+
         if self.wait_for_completion:
             # wait for dag to complete
             while True:
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 9ff8735..1934c4d 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -19,7 +19,7 @@
 import pathlib
 import tempfile
 from datetime import datetime
-from unittest import TestCase
+from unittest import TestCase, mock
 
 import pytest
 
@@ -76,6 +76,25 @@ class TestDagRunOperator(TestCase):
 
         pathlib.Path(self._tmpfile).unlink()
 
+    @mock.patch('airflow.operators.trigger_dagrun.build_airflow_url_with_query')
+    def assert_extra_link(self, triggering_exec_date, triggered_dag_run, triggering_task, mock_build_url):
+        """
+        Asserts whether the correct extra links url will be created.
+
+        Specifically it tests whether the correct dag id and date are passed to
+        the method which constructs the final url.
+        Note: We can't run that method to generate the url itself because the Flask app context
+        isn't available within the test logic, so it is mocked here.
+        """
+        triggering_task.get_extra_links(triggering_exec_date, 'Triggered DAG')
+        assert mock_build_url.called
+        args, _ = mock_build_url.call_args
+        expected_args = {
+            'dag_id': triggered_dag_run.dag_id,
+            'base_date': triggered_dag_run.execution_date.isoformat(),
+        }
+        assert expected_args in args
+
     def test_trigger_dagrun(self):
         """Test TriggerDagRunOperator."""
         task = TriggerDagRunOperator(task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, dag=self.dag)
@@ -84,7 +103,9 @@ class TestDagRunOperator(TestCase):
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
-            assert dagruns[0].external_trigger
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
 
     def test_trigger_dagrun_custom_run_id(self):
         task = TriggerDagRunOperator(
@@ -114,8 +135,10 @@ class TestDagRunOperator(TestCase):
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
-            assert dagruns[0].external_trigger
-            assert dagruns[0].execution_date == utc_now
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            assert triggered_dag_run.execution_date == utc_now
+            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
 
     def test_trigger_dagrun_twice(self):
         """Test TriggerDagRunOperator with custom execution_date."""
@@ -140,12 +163,14 @@ class TestDagRunOperator(TestCase):
             )
             session.add(dag_run)
             session.commit()
-            task.execute(None)
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
             dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
-            assert dagruns[0].external_trigger
-            assert dagruns[0].execution_date == utc_now
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            assert triggered_dag_run.execution_date == utc_now
+            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
 
     def test_trigger_dagrun_with_templated_execution_date(self):
         """Test TriggerDagRunOperator with templated execution_date."""
@@ -160,8 +185,10 @@ class TestDagRunOperator(TestCase):
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
-            assert dagruns[0].external_trigger
-            assert dagruns[0].execution_date == DEFAULT_DATE
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            assert triggered_dag_run.execution_date == DEFAULT_DATE
+            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
 
     def test_trigger_dagrun_operator_conf(self):
         """Test passing conf to the triggered DagRun."""
@@ -288,7 +315,9 @@ class TestDagRunOperator(TestCase):
                 .all()
             )
             assert len(dagruns) == 2
-            assert dagruns[1].state == State.QUEUED
+            triggered_dag_run = dagruns[1]
+            assert triggered_dag_run.state == State.QUEUED
+            self.assert_extra_link(execution_date, triggered_dag_run, task)
 
     def test_trigger_dagrun_triggering_itself_with_execution_date(self):
         """Test TriggerDagRunOperator that triggers itself with execution date,

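A minimal usage sketch of the operator after this change (dag and task ids are placeholders);
the XCom values pushed in execute() are what the "Triggered DAG" extra link reads back:

    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    trigger = TriggerDagRunOperator(
        task_id="trigger_downstream",
        trigger_dag_id="downstream_dag",  # placeholder: the DAG to trigger
        wait_for_completion=False,
    )
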
[airflow] 32/43: Fix mismatch in generated run_id and logical date of DAG run (#18707)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1c2340558b96cde92390bf1b4dc9483236675e18
Author: David Caron <dc...@gmail.com>
AuthorDate: Thu Feb 3 21:14:19 2022 -0500

    Fix mismatch in generated run_id and logical date of DAG run (#18707)
    
    Co-authored-by: Tzu-ping Chung <tp...@astronomer.io>
    Co-authored-by: Jed Cunningham <je...@apache.org>
    (cherry picked from commit 1f08d281632670aef1de8dfc62c9f63aeec18760)
---
 airflow/operators/trigger_dagrun.py    | 20 +++++++++-----------
 tests/operators/test_trigger_dagrun.py | 25 ++++++++++++-------------
 2 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index a346db1..7dae196 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -115,13 +115,13 @@ class TriggerDagRunOperator(BaseOperator):
         self.allowed_states = allowed_states or [State.SUCCESS]
         self.failed_states = failed_states or [State.FAILED]
 
-        if not isinstance(execution_date, (str, datetime.datetime, type(None))):
+        if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)):
             raise TypeError(
                 "Expected str or datetime.datetime type for execution_date."
                 "Got {}".format(type(execution_date))
             )
 
-        self.execution_date: Optional[datetime.datetime] = execution_date  # type: ignore
+        self.execution_date = execution_date
 
         try:
             json.dumps(self.conf)
@@ -130,30 +130,28 @@ class TriggerDagRunOperator(BaseOperator):
 
     def execute(self, context: Dict):
         if isinstance(self.execution_date, datetime.datetime):
-            execution_date = self.execution_date
+            parsed_execution_date = self.execution_date
         elif isinstance(self.execution_date, str):
-            execution_date = timezone.parse(self.execution_date)
-            self.execution_date = execution_date
+            parsed_execution_date = timezone.parse(self.execution_date)
         else:
-            execution_date = timezone.utcnow()
+            parsed_execution_date = timezone.utcnow()
 
         if self.trigger_run_id:
             run_id = self.trigger_run_id
         else:
-            run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
-
+            run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
         try:
             dag_run = trigger_dag(
                 dag_id=self.trigger_dag_id,
                 run_id=run_id,
                 conf=self.conf,
-                execution_date=self.execution_date,
+                execution_date=parsed_execution_date,
                 replace_microseconds=False,
             )
 
         except DagRunAlreadyExists as e:
             if self.reset_dag_run:
-                self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
+                self.log.info("Clearing %s on %s", self.trigger_dag_id, parsed_execution_date)
 
                 # Get target dag object and call clear()
 
@@ -163,7 +161,7 @@ class TriggerDagRunOperator(BaseOperator):
 
                 dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
                 dag = dag_bag.get_dag(self.trigger_dag_id)
-                dag.clear(start_date=self.execution_date, end_date=self.execution_date)
+                dag.clear(start_date=parsed_execution_date, end_date=parsed_execution_date)
                 dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
             else:
                 raise e
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 1934c4d..180781e 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -30,6 +30,7 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
+from airflow.utils.types import DagRunType
 
 DEFAULT_DATE = datetime(2019, 1, 1, tzinfo=timezone.utc)
 TEST_DAG_ID = "testdag"
@@ -101,11 +102,10 @@ class TestDagRunOperator(TestCase):
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         with create_session() as session:
-            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
-            assert len(dagruns) == 1
-            triggered_dag_run = dagruns[0]
-            assert triggered_dag_run.external_trigger
-            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
+            dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
+            assert dagrun.external_trigger
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.execution_date)
+            self.assert_extra_link(DEFAULT_DATE, dagrun, task)
 
     def test_trigger_dagrun_custom_run_id(self):
         task = TriggerDagRunOperator(
@@ -123,22 +123,21 @@ class TestDagRunOperator(TestCase):
 
     def test_trigger_dagrun_with_execution_date(self):
         """Test TriggerDagRunOperator with custom execution_date."""
-        utc_now = timezone.utcnow()
+        custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
         task = TriggerDagRunOperator(
             task_id="test_trigger_dagrun_with_execution_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=utc_now,
+            execution_date=custom_execution_date,
             dag=self.dag,
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         with create_session() as session:
-            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
-            assert len(dagruns) == 1
-            triggered_dag_run = dagruns[0]
-            assert triggered_dag_run.external_trigger
-            assert triggered_dag_run.execution_date == utc_now
-            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
+            dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
+            assert dagrun.external_trigger
+            assert dagrun.execution_date == custom_execution_date
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date)
+            self.assert_extra_link(DEFAULT_DATE, dagrun, task)
 
     def test_trigger_dagrun_twice(self):
         """Test TriggerDagRunOperator with custom execution_date."""

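The invariant the fix enforces, sketched standalone: the generated run_id embeds the same
logical date the DAG run is created with (the date value below is illustrative):

    from airflow.models import DagRun
    from airflow.utils import timezone
    from airflow.utils.types import DagRunType

    logical_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
    run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date)
    print(run_id)  # contains the same logical date used to create the run
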
[airflow] 21/43: Augment xcom docs (#20755)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 88900872e642903eff59c82f319d5c137ff5d5db
Author: Lewis John McGibbney <le...@gmail.com>
AuthorDate: Thu Feb 3 08:50:25 2022 -0800

    Augment xcom docs (#20755)
    
    (cherry picked from commit 40d3a76a9bce2360b951f2e990cba571c5f51a76)
---
 docs/apache-airflow/concepts/xcoms.rst | 40 +++++++++++++++++++++++++++++++---
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/docs/apache-airflow/concepts/xcoms.rst b/docs/apache-airflow/concepts/xcoms.rst
index eb11ff7..57b9e54 100644
--- a/docs/apache-airflow/concepts/xcoms.rst
+++ b/docs/apache-airflow/concepts/xcoms.rst
@@ -42,8 +42,8 @@ XComs are a relative of :doc:`variables`, with the main difference being that XC
 
   Note: If the first task run is not succeeded then on every retry task XComs will be cleared to make the task run idempotent.
 
-Custom Backends
----------------
+Custom XCom Backends
+--------------------
 
 The XCom system has interchangeable backends, and you can set which backend is being used via the ``xcom_backend`` configuration option.
 
@@ -51,4 +51,38 @@ If you want to implement your own backend, you should subclass :class:`~airflow.
 
 There is also an ``orm_deserialize_value`` method that is called whenever the XCom objects are rendered for UI or reporting purposes; if you have large or expensive-to-retrieve values in your XComs, you should override this method to avoid calling that code (and instead return a lighter, incomplete representation) so the UI remains responsive.
 
-You can also override the ``clear`` method and use it when clearing results for given dags and tasks. This allows the custom XCom backend process the data lifecycle easier.
+You can also override the ``clear`` method and use it when clearing results for given dags and tasks. This allows the custom XCom backend to handle the data lifecycle more easily.
+
+Working with Custom XCom Backends in Containers
+-----------------------------------------------
+
+Depending on where Airflow is deployed, i.e. local, Docker, K8s, etc., it can be useful to be sure that a custom XCom backend is actually being initialized. For example, the complexity of the container environment can make it more difficult to determine whether your backend is being loaded correctly during container deployment. Luckily, the following guidance can be used to build confidence in your custom XCom implementation.
+
+Firstly, if you can exec into a terminal in the container then you should be able to do:
+
+.. code-block:: python
+
+    from airflow.models.xcom import XCom
+
+    print(XCom.__name__)
+
+which will print the actual class that is being used.
+
+You can also examine Airflow's configuration:
+
+.. code-block:: python
+
+    from airflow.settings import conf
+
+    conf.get("core", "xcom_backend")
+
+Working with Custom Backends in K8s via Helm
+--------------------------------------------
+
+Running custom XCom backends in K8s will introduce even more complexity to your Airflow deployment. Put simply, sometimes things go wrong in ways that can be difficult to debug.
+
+For example, if you define a custom XCom backend in the Chart ``values.yaml`` (via the ``xcom_backend`` configuration) and Airflow fails to load the class, the entire Chart deployment will fail with each pod container attempting to restart time and time again.
+
+When deploying in K8s, your custom XCom backend needs to reside in a ``config`` directory, otherwise it cannot be located during Chart deployment.
+
+An observed problem is that it is very difficult to acquire logs from the container because there is a very small window of availability where the trace can be obtained. The only way you can determine the root cause is if you are fortunate enough to query and acquire the container logs at the right time. This in turn prevents the entire Helm chart from deploying successfully.

[airflow] 19/43: Fix broken link in api.rst (#21165)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ede6d8f1d5aa7d34368df6e03c0890fba3f20b9f
Author: fritz-astronomer <80...@users.noreply.github.com>
AuthorDate: Tue Feb 1 14:03:46 2022 -0500

    Fix broken link in api.rst (#21165)
    
    Co-authored-by: eladkal <45...@users.noreply.github.com>
    (cherry picked from commit 817bec0417b291326dfd760bd85439b3ba0a728d)
---
 docs/apache-airflow/security/api.rst | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/apache-airflow/security/api.rst b/docs/apache-airflow/security/api.rst
index d7de78a..48a5b5a 100644
--- a/docs/apache-airflow/security/api.rst
+++ b/docs/apache-airflow/security/api.rst
@@ -80,8 +80,7 @@ principal exists in the keytab file.
 Basic authentication
 ''''''''''''''''''''
 
-`Basic username password authentication <https://tools.ietf.org/html/rfc7617
-https://en.wikipedia.org/wiki/Basic_access_authentication>`_ is currently
+`Basic username password authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_ is currently
 supported for the API. This works for users created through LDAP login or
 within Airflow Metadata DB using password.
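
For reference, a minimal sketch of calling the stable REST API with basic authentication
(the URL, username and password are placeholders for your own deployment):

    import requests

    resp = requests.get(
        "http://localhost:8080/api/v1/dags",  # placeholder webserver URL
        auth=("airflow_username", "airflow_password"),
    )
    print(resp.status_code)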