Posted to commits@airflow.apache.org by je...@apache.org on 2022/01/28 21:25:13 UTC

[airflow] branch v2-2-test updated (5ad965d -> ec60dd7)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 5ad965d  Avoid unintentional data loss when deleting DAGs (#20758)
 discard 2785e19  Deprecate some functions in the experimental API (#19931)
 discard 2a934b3  Fix session usage in ``/rendered-k8s`` view (#21006)
 discard 91ec7b2  Helper for provide_session-decorated functions (#20104)
 discard 25c515e  Type-annotate SkipMixin and BaseXCom (#20011)
 discard b7438f7  Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
 discard 753556f  Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
 discard 4dff21d  Update `version_added` for `[email] from_email` (#21138)
 discard 000145b  Improved instructions for custom image build with docker compose (#21052)
 discard fb9a45e  Add back legacy .piprc customization for pip (#21124)
 discard 46f7508  Update logging-tasks.rst (#21088)
 discard 53d89b8  name mismatch (#21055)
 discard 40960c2  Update v1.yaml (#21024)
 discard 512a10a  Return to the same place when triggering a DAG (#20955)
 discard e671b92  Add downgrade to some FAB migrations (#20874)
     new ba17fa3  Return to the same place when triggering a DAG (#20955)
     new 84ef8d5  Update v1.yaml (#21024)
     new 8eeb3ea  name mismatch (#21055)
     new abf8c03  Update logging-tasks.rst (#21088)
     new 243f44d  Add back legacy .piprc customization for pip (#21124)
     new 4633cf3  Improved instructions for custom image build with docker compose (#21052)
     new f02ae31  Update `version_added` for `[email] from_email` (#21138)
     new c347d80  Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
     new 5fa0e13  Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
     new 0cc934c  Type-annotate SkipMixin and BaseXCom (#20011)
     new d572161  Helper for provide_session-decorated functions (#20104)
     new 928e095  Fix session usage in ``/rendered-k8s`` view (#21006)
     new c0fbf27  Deprecate some functions in the experimental API (#19931)
     new 0740c08  Avoid unintentional data loss when deleting DAGs (#20758)
     new 762abfb  Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
     new 30b0d98  bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
     new ec60dd7  Handle stuck queued tasks in Celery for db backend(#19769)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (5ad965d)
            \
             N -- N -- N   refs/heads/v2-2-test (ec60dd7)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/api/common/experimental/mark_tasks.py      | 121 +++++++++----
 airflow/config_templates/config.yml                |   7 +
 airflow/config_templates/default_airflow.cfg       |   3 +
 airflow/executors/celery_executor.py               |  52 ++++++
 .../2c6edca13270_resource_based_permissions.py     |  29 ---
 .../849da589634d_prefix_dag_permissions.py         |  70 +-------
 ...ad25_resource_based_permissions_for_default_.py |  29 ---
 airflow/models/dag.py                              |   1 -
 tests/executors/test_celery_executor.py            | 197 +++++++++++++++++++--
 9 files changed, 328 insertions(+), 181 deletions(-)

[airflow] 04/17: Update logging-tasks.rst (#21088)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit abf8c03da3d17618c2c3dd3c205e84769786530e
Author: caxefaizan <63...@users.noreply.github.com>
AuthorDate: Wed Jan 26 04:15:58 2022 +0530

    Update logging-tasks.rst (#21088)
    
    (cherry picked from commit 156284650f20bad131f26b91061e207e2e39253e)
---
 docs/apache-airflow/logging-monitoring/logging-tasks.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/logging-monitoring/logging-tasks.rst
index 13cb248..e64905f 100644
--- a/docs/apache-airflow/logging-monitoring/logging-tasks.rst
+++ b/docs/apache-airflow/logging-monitoring/logging-tasks.rst
@@ -122,7 +122,7 @@ Serving logs from workers
 
 Most task handlers send logs upon completion of a task. In order to view logs in real time, Airflow automatically starts an HTTP server to serve the logs in the following cases:
 
-- If ``SchedulerExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
+- If ``SequentialExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
 - If ``CeleryExecutor`` is used, then when ``airflow worker`` is running.
 
 The server is running on the port specified by ``worker_log_server_port`` option in ``[logging]`` section. By default, it is ``8793``.
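
As a rough, hedged illustration of the log-serving behaviour described above: a task log can be fetched straight from a worker's HTTP log server. The worker hostname, the ``/log/`` route, and the relative path (shaped like the default ``log_filename_template``) are assumptions for illustration, not taken from this change.

    import requests

    worker_host = "worker-1.example.com"  # hypothetical worker hostname
    port = 8793                           # default worker_log_server_port
    # Relative path following the default template:
    # <dag_id>/<task_id>/<execution_date>/<try_number>.log
    log_path = "example_dag/example_task/2022-01-01T00:00:00+00:00/1.log"

    resp = requests.get(f"http://{worker_host}:{port}/log/{log_path}", timeout=10)
    print(resp.text)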

[airflow] 11/17: Helper for provide_session-decorated functions (#20104)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d5721619bc970e42abdc474362fcf9c151bc33a8
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Dec 7 21:50:34 2021 +0800

    Helper for provide_session-decorated functions (#20104)
    
    * Helper for provide_session-decorated functions
    
    * Apply NEW_SESSION trick on XCom
    
    (cherry picked from commit a80ac1eecc0ea187de7984510b4ef6f981b97196)
---
 airflow/models/xcom.py   | 24 ++++++++++++------------
 airflow/settings.py      | 10 ++++++----
 airflow/utils/session.py | 11 +++++++++--
 3 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 4bb9689..5efaa0a 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -32,7 +32,7 @@ from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
 from airflow.utils import timezone
 from airflow.utils.helpers import is_container
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import provide_session
+from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
 log = logging.getLogger(__name__)
@@ -90,7 +90,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_id: str,
         task_id: str,
         run_id: str,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> None:
         """Store an XCom value.
 
@@ -116,7 +116,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: str,
         dag_id: str,
         execution_date: datetime.datetime,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> None:
         """:sphinx-autoapi-skip:"""
 
@@ -129,7 +129,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: str,
         dag_id: str,
         execution_date: Optional[datetime.datetime] = None,
-        session: Session = None,
+        session: Session = NEW_SESSION,
         *,
         run_id: Optional[str] = None,
     ) -> None:
@@ -170,7 +170,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: Optional[str] = None,
         dag_id: Optional[str] = None,
         include_prior_dates: bool = False,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> Optional[Any]:
         """Retrieve an XCom value, optionally meeting certain criteria.
 
@@ -207,7 +207,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: Optional[str] = None,
         dag_id: Optional[str] = None,
         include_prior_dates: bool = False,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> Optional[Any]:
         """:sphinx-autoapi-skip:"""
 
@@ -220,7 +220,7 @@ class BaseXCom(Base, LoggingMixin):
         task_id: Optional[Union[str, Iterable[str]]] = None,
         dag_id: Optional[Union[str, Iterable[str]]] = None,
         include_prior_dates: bool = False,
-        session: Session = None,
+        session: Session = NEW_SESSION,
         *,
         run_id: Optional[str] = None,
     ) -> Optional[Any]:
@@ -265,7 +265,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_ids: Union[str, Iterable[str], None] = None,
         include_prior_dates: bool = False,
         limit: Optional[int] = None,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> Query:
         """Composes a query to get one or more XCom entries.
 
@@ -300,7 +300,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_ids: Union[str, Iterable[str], None] = None,
         include_prior_dates: bool = False,
         limit: Optional[int] = None,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> Query:
         """:sphinx-autoapi-skip:"""
 
@@ -314,7 +314,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_ids: Optional[Union[str, Iterable[str]]] = None,
         include_prior_dates: bool = False,
         limit: Optional[int] = None,
-        session: Session = None,
+        session: Session = NEW_SESSION,
         *,
         run_id: Optional[str] = None,
     ) -> Query:
@@ -397,7 +397,7 @@ class BaseXCom(Base, LoggingMixin):
         execution_date: pendulum.DateTime,
         dag_id: str,
         task_id: str,
-        session: Optional[Session] = None,
+        session: Session = NEW_SESSION,
     ) -> None:
         """:sphinx-autoapi-skip:"""
 
@@ -409,7 +409,7 @@ class BaseXCom(Base, LoggingMixin):
         dag_id: Optional[str] = None,
         task_id: Optional[str] = None,
         run_id: Optional[str] = None,
-        session: Session = None,
+        session: Session = NEW_SESSION,
     ) -> None:
         """:sphinx-autoapi-skip:"""
         # Given the historic order of this function (execution_date was first argument) to add a new optional
diff --git a/airflow/settings.py b/airflow/settings.py
index f9b97a2..139d6a4 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -22,7 +22,7 @@ import logging
 import os
 import sys
 import warnings
-from typing import Optional
+from typing import TYPE_CHECKING, Callable, List, Optional
 
 import pendulum
 import sqlalchemy
@@ -37,6 +37,9 @@ from airflow.executors import executor_constants
 from airflow.logging_config import configure_logging
 from airflow.utils.orm_event_handlers import setup_event_handlers
 
+if TYPE_CHECKING:
+    from airflow.www.utils import UIAlert
+
 log = logging.getLogger(__name__)
 
 
@@ -77,7 +80,7 @@ DONOT_MODIFY_HANDLERS: Optional[bool] = None
 DAGS_FOLDER: str = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
 engine: Optional[Engine] = None
-Session: Optional[SASession] = None
+Session: Callable[..., SASession]
 
 # The JSON library to use for DAG Serialization and De-Serialization
 json = json
@@ -563,8 +566,7 @@ MASK_SECRETS_IN_LOGS = False
 #       UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
 #   ]
 #
-# DASHBOARD_UIALERTS: List["UIAlert"]
-DASHBOARD_UIALERTS = []
+DASHBOARD_UIALERTS: List["UIAlert"] = []
 
 # Prefix used to identify tables holding data moved during migration.
 AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
diff --git a/airflow/utils/session.py b/airflow/utils/session.py
index 9636fc4..f0c3168 100644
--- a/airflow/utils/session.py
+++ b/airflow/utils/session.py
@@ -18,7 +18,7 @@
 import contextlib
 from functools import wraps
 from inspect import signature
-from typing import Callable, Iterator, TypeVar
+from typing import Callable, Iterator, TypeVar, cast
 
 from airflow import settings
 
@@ -26,7 +26,7 @@ from airflow import settings
 @contextlib.contextmanager
 def create_session() -> Iterator[settings.SASession]:
     """Contextmanager that will create and teardown a session."""
-    session: settings.SASession = settings.Session()
+    session = settings.Session()
     try:
         yield session
         session.commit()
@@ -105,3 +105,10 @@ def create_global_lock(session=None, pg_lock_id=1, lock_name='init', mysql_lock_
         if dialect.name == 'mssql':
             # TODO: make locking works for MSSQL
             pass
+
+
+# A fake session to use in functions decorated by provide_session. This allows
+# the 'session' argument to be of type Session instead of Optional[Session],
+# making it easier to type hint the function body without dealing with the None
+# case that can never happen at runtime.
+NEW_SESSION: settings.SASession = cast(settings.SASession, None)
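
To make the helper above concrete, here is a minimal sketch of a provide_session-decorated function that uses NEW_SESSION as its default, so the body can treat ``session`` as a plain ``Session``; the function name and query are illustrative only, not part of this change.

    from sqlalchemy.orm import Session

    from airflow.models import TaskInstance
    from airflow.utils.session import NEW_SESSION, provide_session
    from airflow.utils.state import State

    @provide_session
    def count_running_tis(dag_id: str, session: Session = NEW_SESSION) -> int:
        # provide_session injects a real session at call time, so the None
        # sentinel behind NEW_SESSION never reaches the function body.
        return (
            session.query(TaskInstance)
            .filter(TaskInstance.dag_id == dag_id, TaskInstance.state == State.RUNNING)
            .count()
        )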

[airflow] 12/17: Fix session usage in ``/rendered-k8s`` view (#21006)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 928e09525823ce48a83e79791cce200aa3d1d639
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Fri Jan 21 21:44:40 2022 +0800

    Fix session usage in ``/rendered-k8s`` view (#21006)
    
    We can't commit the session too early because later functions need that
    session to fetch related objects.
    
    Fix #20534.
    
    (cherry picked from commit a665f48b606065977e0d3952bc74635ce11726d1)
---
 airflow/www/views.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9b868f3..2182a17 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -84,7 +84,7 @@ from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
 from sqlalchemy import Date, and_, desc, func, inspect, union_all
 from sqlalchemy.exc import IntegrityError
-from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import Session, joinedload
 from wtforms import SelectField, validators
 from wtforms.validators import InputRequired
 
@@ -116,7 +116,7 @@ from airflow.utils.docs import get_doc_url_for_provider, get_docs_url
 from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.log import secrets_masker
 from airflow.utils.log.log_reader import TaskLogReader
-from airflow.utils.session import create_session, provide_session
+from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import State
 from airflow.utils.strings import to_boolean
 from airflow.version import version
@@ -1124,7 +1124,8 @@ class Airflow(AirflowBaseView):
         ]
     )
     @action_logging
-    def rendered_k8s(self):
+    @provide_session
+    def rendered_k8s(self, session: Session = NEW_SESSION):
         """Get rendered k8s yaml."""
         if not settings.IS_K8S_OR_K8SCELERY_EXECUTOR:
             abort(404)
@@ -1135,14 +1136,15 @@ class Airflow(AirflowBaseView):
         form = DateTimeForm(data={'execution_date': dttm})
         root = request.args.get('root', '')
         logging.info("Retrieving rendered templates.")
-        dag = current_app.dag_bag.get_dag(dag_id)
+
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         task = dag.get_task(task_id)
-        dag_run = dag.get_dagrun(execution_date=dttm)
-        ti = dag_run.get_task_instance(task_id=task.task_id)
+        dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+        ti = dag_run.get_task_instance(task_id=task.task_id, session=session)
 
         pod_spec = None
         try:
-            pod_spec = ti.get_rendered_k8s_spec()
+            pod_spec = ti.get_rendered_k8s_spec(session=session)
         except AirflowException as e:
             msg = "Error rendering Kubernetes POD Spec: " + escape(e)
             if e.__cause__:

[airflow] 06/17: Improved instructions for custom image build with docker compose (#21052)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4633cf3f6f2189c9c90c91d563e44bbbaaec32ba
Author: Omer Ginosar <94...@users.noreply.github.com>
AuthorDate: Tue Jan 25 23:51:16 2022 +0200

    Improved instructions for custom image build with docker compose (#21052)
    
    * Create build.rst
    
    * Update docs/docker-stack/build.rst
    
    Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
    
    * fix doc build
    
    Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
    Co-authored-by: eladkal <45...@users.noreply.github.com>
    (cherry picked from commit 17b48e5baf09a86ea6e2036c864a882bb0c328e2)
---
 docs/docker-stack/build.rst | 19 +++++++++++++++++--
 docs/spelling_wordlist.txt  |  1 +
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/docs/docker-stack/build.rst b/docs/docker-stack/build.rst
index b85bf1c..6b5dc47 100644
--- a/docs/docker-stack/build.rst
+++ b/docs/docker-stack/build.rst
@@ -81,8 +81,23 @@ In the simplest case building your image consists of those steps:
 
 4) Once you build the image locally you have usually several options to make them available for your deployment:
 
-* For ``docker-compose`` deployment, that's all you need. The image is stored in docker engine cache
-  and docker compose will use it from there.
+* For ``docker-compose`` deployment, if you've already built your image, and want to continue
+  building the image manually when needed with ``docker build``, you can edit the
+  docker-compose.yaml and replace the "apache/airflow:<version>" image with the
+  image you've just built ``my-image:0.0.1`` - it will be used from your local Docker
+  Engine cache. You can also simply set ``AIRFLOW_IMAGE_NAME`` variable to
+  point to your image and ``docker-compose`` will use it automatically without having
+  to modify the file.
+
+* Also for ``docker-compose`` deployment, you can delegate image building to the docker-compose.
+  To do that - open your ``docker-compose.yaml`` file and search for the phrase "In order to add custom dependencies".
+  Follow these instructions of commenting the "image" line and uncommenting the "build" line.
+  This is a standard docker-compose feature and you can read about it in
+  `Docker Compose build reference <https://docs.docker.com/compose/reference/build/>`_.
+  Run ``docker-compose build`` to build the images. Similarly as in the previous case, the
+  image is stored in Docker engine cache and Docker Compose will use it from there.
+  The ``docker-compose build`` command uses the same ``docker build`` command that
+  you can run manually under-the-hood.
 
 * For some - development targeted - Kubernetes deployments you can load the images directly to
   Kubernetes clusters. Clusters such as ``kind`` or ``minikube`` have dedicated ``load`` method to load the
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 64d839f..5d77e29 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1384,6 +1384,7 @@ uid
 umask
 un
 unarchived
+uncommenting
 undead
 ungenerated
 unicode

[airflow] 14/17: Avoid unintentional data loss when deleting DAGs (#20758)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0740c08da7ef5b8d98bbd09f473724816ab1e61c
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Mon Jan 10 11:55:51 2022 -0800

    Avoid unintentional data loss when deleting DAGs (#20758)
    
    (cherry picked from commit 5980d2b05eee484256c634d5efae9410265c65e9)
---
 airflow/api/common/delete_dag.py    | 18 +++++++++++++++---
 tests/api/common/test_delete_dag.py | 14 ++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index c448127..5e0afa8 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -18,7 +18,7 @@
 """Delete DAGs APIs."""
 import logging
 
-from sqlalchemy import or_
+from sqlalchemy import and_, or_
 
 from airflow import models
 from airflow.exceptions import AirflowException, DagNotFound
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also delete all of its subdags
+    dags_to_delete_query = session.query(DagModel.dag_id).filter(
+        or_(
+            DagModel.dag_id == dag_id,
+            and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
+        )
+    )
+    dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]
+
     # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
     # There may be a lag, so explicitly removes serialized DAG here.
     if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
@@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
         if hasattr(model, "dag_id"):
             if keep_records_in_log and model.__name__ == 'Log':
                 continue
-            cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
-            count += session.query(model).filter(cond).delete(synchronize_session='fetch')
+            count += (
+                session.query(model)
+                .filter(model.dag_id.in_(dags_to_delete))
+                .delete(synchronize_session='fetch')
+            )
     if dag.is_subdag:
         parent_dag_id, task_id = dag_id.rsplit(".", 1)
         for model in TaskFail, models.TaskInstance:
diff --git a/tests/api/common/test_delete_dag.py b/tests/api/common/test_delete_dag.py
index 0eb058a..d9dc0b0 100644
--- a/tests/api/common/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -162,3 +162,17 @@ class TestDeleteDAGSuccessfulDelete:
         self.check_dag_models_exists()
         delete_dag(dag_id=self.key, keep_records_in_log=False)
         self.check_dag_models_removed(expect_logs=0)
+
+    def test_delete_dag_preserves_other_dags(self):
+
+        self.setup_dag_models()
+
+        with create_session() as session:
+            session.add(DM(dag_id=self.key + ".other_dag", fileloc=self.dag_file_path))
+            session.add(DM(dag_id=self.key + ".subdag", fileloc=self.dag_file_path, is_subdag=True))
+
+        delete_dag(self.key)
+
+        with create_session() as session:
+            assert session.query(DM).filter(DM.dag_id == self.key + ".other_dag").count() == 1
+            assert session.query(DM).filter(DM.dag_id.like(self.key + "%")).count() == 1
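
A hedged usage sketch of the behaviour exercised by the test above (the DAG IDs are placeholders): deleting a DAG removes that DAG and its subdags, while an unrelated DAG whose ID merely shares the dotted prefix is preserved.

    from airflow.api.common.delete_dag import delete_dag

    # Removes metadata rows for "example_dag" and for subdags such as
    # "example_dag.section_1" (is_subdag=True); a separate DAG registered as
    # "example_dag.other_dag" that is not a subdag is left untouched.
    deleted_rows = delete_dag("example_dag", keep_records_in_log=True)
    print(f"Deleted {deleted_rows} metadata rows")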

[airflow] 02/17: Update v1.yaml (#21024)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 84ef8d52f2186c12204650b9f8aa9c38e8e2f810
Author: Ilia Lazebnik <Il...@gmail.com>
AuthorDate: Sun Jan 23 21:29:52 2022 +0200

    Update v1.yaml (#21024)
    
    (cherry picked from commit 2af0f700857cbf7401d930ff24cdff273b501beb)
---
 airflow/api_connexion/openapi/v1.yaml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 3669c66..e7553a3 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2161,8 +2161,6 @@ components:
 
             The value of this field can be set only when creating the object. If you try to modify the
             field of an existing object, the request fails with an BAD_REQUEST error.
-      required:
-        - dag_id
 
     UpdateDagRunState:
       type: object

[airflow] 15/17: Removed duplicated dag_run join in Dag.get_task_instances() (#20591)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 762abfbc690d4cf9e15792b0d99fac16571b24a8
Author: hubert-pietron <94...@users.noreply.github.com>
AuthorDate: Thu Jan 27 06:20:17 2022 +0100

    Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
    
    Co-authored-by: hubert-pietron <hu...@gmail.com>
    (cherry picked from commit 960f573615b5357677c10bd9f7ec11811a0355c6)
---
 airflow/models/dag.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2a08d26..477e597 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1343,7 +1343,6 @@ class DAG(LoggingMixin):
                 as_pk_tuple=False,
                 session=session,
             )
-            .join(TaskInstance.dag_run)
             .order_by(DagRun.execution_date)
             .all()
         )

[airflow] 17/17: Handle stuck queued tasks in Celery for db backend(#19769)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ec60dd799d65bdb80b83893db7df215d98342dde
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Jan 14 09:55:15 2022 +0100

    Handle stuck queued tasks in Celery for db backend(#19769)
    
    Move the state of stuck queued tasks in Celery to Scheduled so that
    the Scheduler can queue them again. Only applies to DatabaseBackend
    
    (cherry picked from commit 14ee831c7ad767e31a3aeccf3edbc519b3b8c923)
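
Before the full diff, a hedged distillation of the rescheduling rule described above; the helper name and arguments are illustrative, not the executor's actual API.

    from datetime import timedelta

    from airflow.utils import timezone
    from airflow.utils.state import State

    def is_stuck_in_queue(ti, celery_task_ids, tracked_keys,
                          adoption_timeout=timedelta(seconds=600)):
        """True if a QUEUED TaskInstance should be moved back to SCHEDULED."""
        too_old = ti.queued_dttm < timezone.utcnow() - adoption_timeout
        still_tracked = ti.key in tracked_keys          # executor.queued_tasks / running
        known_to_celery = ti.external_executor_id in celery_task_ids
        return ti.state == State.QUEUED and too_old and not still_tracked and not known_to_celery
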
---
 airflow/config_templates/config.yml          |   7 +
 airflow/config_templates/default_airflow.cfg |   3 +
 airflow/executors/celery_executor.py         |  52 +++++++
 tests/executors/test_celery_executor.py      | 197 ++++++++++++++++++++++++---
 4 files changed, 242 insertions(+), 17 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 6941f03..e061568 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1663,6 +1663,13 @@
       type: string
       example: ~
       default: "False"
+    - name: stuck_queued_task_check_interval
+      description: |
+        How often to check for stuck queued task (in seconds)
+      version_added: 2.3.0
+      type: integer
+      example: ~
+      default: "300"
 - name: celery_broker_transport_options
   description: |
     This section is for specifying options which can be passed to the
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 6a5449b..4024922 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -830,6 +830,9 @@ task_publish_max_retries = 3
 # Worker initialisation check to validate Metadata Database connection
 worker_precheck = False
 
+# How often to check for stuck queued task (in seconds)
+stuck_queued_task_check_interval = 300
+
 [celery_broker_transport_options]
 
 # This section is for specifying options which can be passed to the
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index f257b0c..8daced6 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -40,6 +40,7 @@ from celery.backends.database import DatabaseBackend, Task as TaskDb, session_cl
 from celery.result import AsyncResult
 from celery.signals import import_modules as celery_import_modules
 from setproctitle import setproctitle
+from sqlalchemy.orm.session import Session
 
 import airflow.settings as settings
 from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
@@ -50,6 +51,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
+from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.timezone import utcnow
@@ -231,6 +233,10 @@ class CeleryExecutor(BaseExecutor):
         self.task_adoption_timeout = datetime.timedelta(
             seconds=conf.getint('celery', 'task_adoption_timeout', fallback=600)
         )
+        self.stuck_tasks_last_check_time: int = time.time()
+        self.stuck_queued_task_check_interval = conf.getint(
+            'celery', 'stuck_queued_task_check_interval', fallback=300
+        )
         self.task_publish_retries: Dict[TaskInstanceKey, int] = OrderedDict()
         self.task_publish_max_retries = conf.getint('celery', 'task_publish_max_retries', fallback=3)
 
@@ -335,6 +341,8 @@ class CeleryExecutor(BaseExecutor):
 
         if self.adopted_task_timeouts:
             self._check_for_stalled_adopted_tasks()
+        if time.time() - self.stuck_tasks_last_check_time > self.stuck_queued_task_check_interval:
+            self._clear_stuck_queued_tasks()
 
     def _check_for_stalled_adopted_tasks(self):
         """
@@ -375,6 +383,50 @@ class CeleryExecutor(BaseExecutor):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session: Session = NEW_SESSION) -> None:
+        """
+        Tasks can get stuck in queued state in DB while still not in
+        worker. This happens when the worker is autoscaled down and
+        the task is queued but has not been picked up by any worker prior to the scaling.
+
+        In such situation, we update the task instance state to scheduled so that
+        it can be queued again. We chose to use task_adoption_timeout to decide when
+        a queued task is considered stuck and should be reschelduled.
+        """
+        if not isinstance(app.backend, DatabaseBackend):
+            # We only want to do this for database backends where
+            # this case has been spotted
+            return
+        # We use this instead of using bulk_state_fetcher because we
+        # may not have the stuck task in self.tasks and we don't want
+        # to clear task in self.tasks too
+        session_ = app.backend.ResultSession()
+        task_cls = getattr(app.backend, "task_cls", TaskDb)
+        with session_cleanup(session_):
+            celery_task_ids = [t.task_id for t in session_.query(task_cls.task_id).all()]
+        self.log.debug("Checking for stuck queued tasks")
+
+        max_allowed_time = utcnow() - self.task_adoption_timeout
+
+        for task in session.query(TaskInstance).filter(
+            TaskInstance.state == State.QUEUED, TaskInstance.queued_dttm < max_allowed_time
+        ):
+            if task.key in self.queued_tasks or task.key in self.running:
+                continue
+
+            if task.external_executor_id in celery_task_ids:
+                # The task is still running in the worker
+                continue
+
+            self.log.info(
+                'TaskInstance: %s found in queued state for more than %s seconds, rescheduling',
+                task,
+                self.task_adoption_timeout.total_seconds(),
+            )
+            task.state = State.SCHEDULED
+            session.merge(task)
+
     def debug_dump(self) -> None:
         """Called in response to SIGUSR2 by the scheduler"""
         super().debug_dump()
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index db63b18..5632f7d 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -17,10 +17,13 @@
 # under the License.
 import contextlib
 import json
+import logging
 import os
 import signal
 import sys
+import time
 import unittest
+from collections import namedtuple
 from datetime import datetime, timedelta
 from unittest import mock
 
@@ -32,6 +35,7 @@ from celery.backends.base import BaseBackend, BaseKeyValueStoreBackend
 from celery.backends.database import DatabaseBackend
 from celery.contrib.testing.worker import start_worker
 from celery.result import AsyncResult
+from freezegun import freeze_time
 from kombu.asynchronous import set_event_loop
 from parameterized import parameterized
 
@@ -94,12 +98,12 @@ def _prepare_app(broker_url=None, execute=None):
             set_event_loop(None)
 
 
-class TestCeleryExecutor(unittest.TestCase):
-    def setUp(self) -> None:
+class TestCeleryExecutor:
+    def setup_method(self) -> None:
         db.clear_db_runs()
         db.clear_db_jobs()
 
-    def tearDown(self) -> None:
+    def teardown_method(self) -> None:
         db.clear_db_runs()
         db.clear_db_jobs()
 
@@ -196,10 +200,10 @@ class TestCeleryExecutor(unittest.TestCase):
     @pytest.mark.integration("redis")
     @pytest.mark.integration("rabbitmq")
     @pytest.mark.backend("mysql", "postgres")
-    def test_retry_on_error_sending_task(self):
+    def test_retry_on_error_sending_task(self, caplog):
         """Test that Airflow retries publishing tasks to Celery Broker at least 3 times"""
 
-        with _prepare_app(), self.assertLogs(celery_executor.log) as cm, mock.patch.object(
+        with _prepare_app(), caplog.at_level(logging.INFO), mock.patch.object(
             # Mock `with timeout()` to _instantly_ fail.
             celery_executor.timeout,
             "__enter__",
@@ -227,28 +231,19 @@ class TestCeleryExecutor(unittest.TestCase):
             assert dict(executor.task_publish_retries) == {key: 2}
             assert 1 == len(executor.queued_tasks), "Task should remain in queue"
             assert executor.event_buffer == {}
-            assert (
-                "INFO:airflow.executors.celery_executor.CeleryExecutor:"
-                f"[Try 1 of 3] Task Timeout Error for Task: ({key})." in cm.output
-            )
+            assert f"[Try 1 of 3] Task Timeout Error for Task: ({key})." in caplog.text
 
             executor.heartbeat()
             assert dict(executor.task_publish_retries) == {key: 3}
             assert 1 == len(executor.queued_tasks), "Task should remain in queue"
             assert executor.event_buffer == {}
-            assert (
-                "INFO:airflow.executors.celery_executor.CeleryExecutor:"
-                f"[Try 2 of 3] Task Timeout Error for Task: ({key})." in cm.output
-            )
+            assert f"[Try 2 of 3] Task Timeout Error for Task: ({key})." in caplog.text
 
             executor.heartbeat()
             assert dict(executor.task_publish_retries) == {key: 4}
             assert 1 == len(executor.queued_tasks), "Task should remain in queue"
             assert executor.event_buffer == {}
-            assert (
-                "INFO:airflow.executors.celery_executor.CeleryExecutor:"
-                f"[Try 3 of 3] Task Timeout Error for Task: ({key})." in cm.output
-            )
+            assert f"[Try 3 of 3] Task Timeout Error for Task: ({key})." in caplog.text
 
             executor.heartbeat()
             assert dict(executor.task_publish_retries) == {}
@@ -411,6 +406,174 @@ class TestCeleryExecutor(unittest.TestCase):
         assert executor.running == {key_2}
         assert executor.adopted_task_timeouts == {key_2: queued_dttm_2 + executor.task_adoption_timeout}
 
+    @pytest.mark.backend("mysql", "postgres")
+    @pytest.mark.parametrize(
+        "state, queued_dttm, executor_id",
+        [
+            (State.SCHEDULED, timezone.utcnow() - timedelta(days=2), '231'),
+            (State.QUEUED, timezone.utcnow(), '231'),
+            (State.QUEUED, timezone.utcnow(), None),
+        ],
+    )
+    def test_stuck_queued_tasks_are_cleared(
+        self, state, queued_dttm, executor_id, session, dag_maker, create_dummy_dag, create_task_instance
+    ):
+        """Test that clear_stuck_queued_tasks works"""
+        ti = create_task_instance(state=State.QUEUED)
+        ti.queued_dttm = queued_dttm
+        ti.external_executor_id = executor_id
+        session.merge(ti)
+        session.flush()
+        executor = celery_executor.CeleryExecutor()
+        executor._clear_stuck_queued_tasks()
+        session.flush()
+        ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+        assert ti.state == state
+
+    @pytest.mark.backend("mysql", "postgres")
+    def test_task_in_queued_tasks_dict_are_not_cleared(
+        self, session, dag_maker, create_dummy_dag, create_task_instance
+    ):
+        """Test that clear_stuck_queued_tasks doesn't clear tasks in executor.queued_tasks"""
+        ti = create_task_instance(state=State.QUEUED)
+        ti.queued_dttm = timezone.utcnow() - timedelta(days=2)
+        ti.external_executor_id = '231'
+        session.merge(ti)
+        session.flush()
+        executor = celery_executor.CeleryExecutor()
+        executor.queued_tasks = {ti.key: AsyncResult("231")}
+        executor._clear_stuck_queued_tasks()
+        session.flush()
+        ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+        assert executor.queued_tasks == {ti.key: AsyncResult("231")}
+        assert ti.state == State.QUEUED
+
+    @pytest.mark.backend("mysql", "postgres")
+    def test_task_in_running_dict_are_not_cleared(
+        self, session, dag_maker, create_dummy_dag, create_task_instance
+    ):
+        """Test that clear_stuck_queued_tasks doesn't clear tasks in executor.running"""
+        ti = create_task_instance(state=State.QUEUED)
+        ti.queued_dttm = timezone.utcnow() - timedelta(days=2)
+        ti.external_executor_id = '231'
+        session.merge(ti)
+        session.flush()
+        executor = celery_executor.CeleryExecutor()
+        executor.running = {ti.key: AsyncResult("231")}
+        executor._clear_stuck_queued_tasks()
+        session.flush()
+        ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+        assert executor.running == {ti.key: AsyncResult("231")}
+        assert ti.state == State.QUEUED
+
+    @pytest.mark.backend("mysql", "postgres")
+    def test_only_database_result_backend_supports_clearing_queued_task(
+        self, session, dag_maker, create_dummy_dag, create_task_instance
+    ):
+        with _prepare_app():
+            mock_backend = BaseKeyValueStoreBackend(app=celery_executor.app)
+            with mock.patch('airflow.executors.celery_executor.Celery.backend', mock_backend):
+                ti = create_task_instance(state=State.QUEUED)
+                ti.queued_dttm = timezone.utcnow() - timedelta(days=2)
+                ti.external_executor_id = '231'
+                session.merge(ti)
+                session.flush()
+                executor = celery_executor.CeleryExecutor()
+                executor.tasks = {ti.key: AsyncResult("231")}
+                executor._clear_stuck_queued_tasks()
+                session.flush()
+                ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+                # Not cleared
+                assert ti.state == State.QUEUED
+                assert executor.tasks == {ti.key: AsyncResult("231")}
+
+    @mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
+    @pytest.mark.backend("mysql", "postgres")
+    @freeze_time("2020-01-01")
+    @pytest.mark.parametrize(
+        "state",
+        [
+            (State.SCHEDULED),
+            (State.QUEUED),
+        ],
+    )
+    def test_the_check_interval_to_clear_stuck_queued_task_is_correct(
+        self,
+        mock_result_session,
+        state,
+        session,
+        dag_maker,
+        create_dummy_dag,
+        create_task_instance,
+    ):
+        with _prepare_app():
+            mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://")
+            with mock.patch('airflow.executors.celery_executor.Celery.backend', mock_backend):
+                mock_session = mock_backend.ResultSession.return_value
+                mock_session.query.return_value.all.return_value = [
+                    mock.MagicMock(**{"to_dict.return_value": {"status": "SUCCESS", "task_id": "123"}})
+                ]
+                if state == State.SCHEDULED:
+                    last_check_time = time.time() - 302  # should clear ti state
+                else:
+                    last_check_time = time.time() - 298  # should not clear ti state
+
+                ti = create_task_instance(state=State.QUEUED)
+                ti.queued_dttm = timezone.utcnow() - timedelta(days=2)
+                ti.external_executor_id = '231'
+                session.merge(ti)
+                session.flush()
+                executor = celery_executor.CeleryExecutor()
+                executor.tasks = {ti.key: AsyncResult("231")}
+                executor.stuck_tasks_last_check_time = last_check_time
+                executor.sync()
+                session.flush()
+                ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+                assert ti.state == state
+
+    @mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
+    @pytest.mark.backend("mysql", "postgres")
+    @freeze_time("2020-01-01")
+    @pytest.mark.parametrize(
+        "task_id, state",
+        [
+            ('231', State.QUEUED),
+            ('111', State.SCHEDULED),
+        ],
+    )
+    def test_the_check_interval_to_clear_stuck_queued_task_is_correct_for_db_query(
+        self,
+        mock_result_session,
+        task_id,
+        state,
+        session,
+        dag_maker,
+        create_dummy_dag,
+        create_task_instance,
+    ):
+        """Here we test that task are not cleared if found in celery database"""
+        result_obj = namedtuple('Result', ['status', 'task_id'])
+        with _prepare_app():
+            mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://")
+            with mock.patch('airflow.executors.celery_executor.Celery.backend', mock_backend):
+                mock_session = mock_backend.ResultSession.return_value
+                mock_session.query.return_value.all.return_value = [result_obj("SUCCESS", task_id)]
+
+                last_check_time = time.time() - 302  # should clear ti state
+
+                ti = create_task_instance(state=State.QUEUED)
+                ti.queued_dttm = timezone.utcnow() - timedelta(days=2)
+                ti.external_executor_id = '231'
+                session.merge(ti)
+                session.flush()
+                executor = celery_executor.CeleryExecutor()
+                executor.tasks = {ti.key: AsyncResult("231")}
+                executor.stuck_tasks_last_check_time = last_check_time
+                executor.sync()
+                session.flush()
+                ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+                assert ti.state == state
+
 
 def test_operation_timeout_config():
     assert celery_executor.OPERATION_TIMEOUT == 1

[airflow] 10/17: Type-annotate SkipMixin and BaseXCom (#20011)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0cc934ce12c36cbcf2572dc50675b9da77859eb9
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Dec 7 17:55:00 2021 +0800

    Type-annotate SkipMixin and BaseXCom (#20011)
    
    (cherry picked from commit 6dd0a0df7e6a2f025e9234bdbf97b41e9b8f6257)
---
 airflow/models/skipmixin.py |  15 +-
 airflow/models/xcom.py      | 335 ++++++++++++++++++++++++++++++--------------
 2 files changed, 232 insertions(+), 118 deletions(-)

diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index 5cd50a3..765a947 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -17,7 +17,7 @@
 # under the License.
 
 import warnings
-from typing import TYPE_CHECKING, Iterable, Union
+from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Union
 
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
@@ -26,6 +26,7 @@ from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
 
 if TYPE_CHECKING:
+    from pendulum import DateTime
     from sqlalchemy import Session
 
     from airflow.models import DagRun
@@ -66,9 +67,9 @@ class SkipMixin(LoggingMixin):
     def skip(
         self,
         dag_run: "DagRun",
-        execution_date: "timezone.DateTime",
-        tasks: "Iterable[BaseOperator]",
-        session: "Session" = None,
+        execution_date: "DateTime",
+        tasks: Sequence["BaseOperator"],
+        session: "Session",
     ):
         """
         Sets tasks instances to skipped from the same dag run.
@@ -114,11 +115,7 @@ class SkipMixin(LoggingMixin):
         session.commit()
 
         # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
-        try:
-            task_id = self.task_id
-        except AttributeError:
-            task_id = None
-
+        task_id: Optional[str] = getattr(self, "task_id", None)
         if task_id is not None:
             from airflow.models.xcom import XCom
 
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 99c2b9a..4bb9689 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -16,10 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
 import json
 import logging
 import pickle
-from typing import Any, Iterable, Optional, Union
+from typing import TYPE_CHECKING, Any, Iterable, Optional, Type, Union, cast, overload
 
 import pendulum
 from sqlalchemy import Column, LargeBinary, String
@@ -79,14 +80,60 @@ class BaseXCom(Base, LoggingMixin):
     def __repr__(self):
         return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>'
 
+    @overload
     @classmethod
-    @provide_session
-    def set(cls, key, value, task_id, dag_id, execution_date=None, run_id=None, session=None):
+    def set(
+        cls,
+        key: str,
+        value: Any,
+        *,
+        dag_id: str,
+        task_id: str,
+        run_id: str,
+        session: Optional[Session] = None,
+    ) -> None:
+        """Store an XCom value.
+
+        A deprecated form of this function accepts ``execution_date`` instead of
+        ``run_id``. The two arguments are mutually exclusive.
+
+        :param key: Key to store the XCom.
+        :param value: XCom value to store.
+        :param dag_id: DAG ID.
+        :param task_id: Task ID.
+        :param run_id: DAG run ID for the task.
+        :param session: Database session. If not given, a new session will be
+            created for this function.
+        :type session: sqlalchemy.orm.session.Session
         """
-        Store an XCom value.
 
-        :return: None
-        """
+    @overload
+    @classmethod
+    def set(
+        cls,
+        key: str,
+        value: Any,
+        task_id: str,
+        dag_id: str,
+        execution_date: datetime.datetime,
+        session: Optional[Session] = None,
+    ) -> None:
+        """:sphinx-autoapi-skip:"""
+
+    @classmethod
+    @provide_session
+    def set(
+        cls,
+        key: str,
+        value: Any,
+        task_id: str,
+        dag_id: str,
+        execution_date: Optional[datetime.datetime] = None,
+        session: Session = None,
+        *,
+        run_id: Optional[str] = None,
+    ) -> None:
+        """:sphinx-autoapi-skip:"""
         if not (execution_date is None) ^ (run_id is None):
             raise ValueError("Exactly one of execution_date or run_id must be passed")
 
@@ -94,70 +141,95 @@ class BaseXCom(Base, LoggingMixin):
             from airflow.models.dagrun import DagRun
 
             dag_run = session.query(DagRun).filter_by(dag_id=dag_id, run_id=run_id).one()
-
             execution_date = dag_run.execution_date
 
-        value = XCom.serialize_value(value)
-
-        # remove any duplicate XComs
+        # Remove duplicate XComs and insert a new one.
         session.query(cls).filter(
-            cls.key == key, cls.execution_date == execution_date, cls.task_id == task_id, cls.dag_id == dag_id
+            cls.key == key,
+            cls.execution_date == execution_date,
+            cls.task_id == task_id,
+            cls.dag_id == dag_id,
         ).delete()
-
+        new = cast(Any, cls)(  # Work around Mypy complaining model not defining '__init__'.
+            key=key,
+            value=cls.serialize_value(value),
+            execution_date=execution_date,
+            task_id=task_id,
+            dag_id=dag_id,
+        )
+        session.add(new)
         session.flush()
 
-        # insert new XCom
-        session.add(XCom(key=key, value=value, execution_date=execution_date, task_id=task_id, dag_id=dag_id))
+    @overload
+    @classmethod
+    def get_one(
+        cls,
+        *,
+        run_id: str,
+        key: Optional[str] = None,
+        task_id: Optional[str] = None,
+        dag_id: Optional[str] = None,
+        include_prior_dates: bool = False,
+        session: Optional[Session] = None,
+    ) -> Optional[Any]:
+        """Retrieve an XCom value, optionally meeting certain criteria.
+
+        This method returns "full" XCom values (i.e. uses ``deserialize_value``
+        from the XCom backend). Use :meth:`get_many` if you want the "shortened"
+        value via ``orm_deserialize_value``.
+
+        If there are no results, *None* is returned.
+
+        A deprecated form of this function accepts ``execution_date`` instead of
+        ``run_id``. The two arguments are mutually exclusive.
+
+        :param run_id: DAG run ID for the task.
+        :param key: A key for the XCom. If provided, only XCom with matching
+            keys will be returned. Pass *None* (default) to remove the filter.
+        :param task_id: Only XCom from task with matching ID will be pulled.
+            Pass *None* (default) to remove the filter.
+        :param dag_id: Only pull XCom from this DAG. If *None* (default), the
+            DAG of the calling task is used.
+        :param include_prior_dates: If *False* (default), only XCom from the
+            specified DAG run is returned. If *True*, the latest matching XCom is
+            returned regardless of the run it belongs to.
+        :param session: Database session. If not given, a new session will be
+            created for this function.
+        :type session: sqlalchemy.orm.session.Session
+        """
 
-        session.flush()
+    @overload
+    @classmethod
+    def get_one(
+        cls,
+        execution_date: pendulum.DateTime,
+        key: Optional[str] = None,
+        task_id: Optional[str] = None,
+        dag_id: Optional[str] = None,
+        include_prior_dates: bool = False,
+        session: Optional[Session] = None,
+    ) -> Optional[Any]:
+        """:sphinx-autoapi-skip:"""
 
     @classmethod
     @provide_session
     def get_one(
         cls,
         execution_date: Optional[pendulum.DateTime] = None,
-        run_id: Optional[str] = None,
         key: Optional[str] = None,
         task_id: Optional[Union[str, Iterable[str]]] = None,
         dag_id: Optional[Union[str, Iterable[str]]] = None,
         include_prior_dates: bool = False,
         session: Session = None,
+        *,
+        run_id: Optional[str] = None,
     ) -> Optional[Any]:
-        """
-        Retrieve an XCom value, optionally meeting certain criteria. Returns None
-        of there are no results.
-
-        ``run_id`` and ``execution_date`` are mutually exclusive.
-
-        This method returns "full" XCom values (i.e. it uses ``deserialize_value`` from the XCom backend).
-        Please use :meth:`get_many` if you want the "shortened" value via ``orm_deserialize_value``
-
-        :param execution_date: Execution date for the task
-        :type execution_date: pendulum.datetime
-        :param run_id: Dag run id for the task
-        :type run_id: str
-        :param key: A key for the XCom. If provided, only XComs with matching
-            keys will be returned. To remove the filter, pass key=None.
-        :type key: str
-        :param task_id: Only XComs from task with matching id will be
-            pulled. Can pass None to remove the filter.
-        :type task_id: str
-        :param dag_id: If provided, only pulls XCom from this DAG.
-            If None (default), the DAG of the calling task is used.
-        :type dag_id: str
-        :param include_prior_dates: If False, only XCom from the current
-            execution_date are returned. If True, XCom from previous dates
-            are returned as well.
-        :type include_prior_dates: bool
-        :param session: database session
-        :type session: sqlalchemy.orm.session.Session
-        """
+        """:sphinx-autoapi-skip:"""
         if not (execution_date is None) ^ (run_id is None):
             raise ValueError("Exactly one of execution_date or run_id must be passed")
 
-        result = (
-            cls.get_many(
-                execution_date=execution_date,
+        if run_id is not None:
+            query = cls.get_many(
                 run_id=run_id,
                 key=key,
                 task_ids=task_id,
@@ -165,58 +237,88 @@ class BaseXCom(Base, LoggingMixin):
                 include_prior_dates=include_prior_dates,
                 session=session,
             )
-            .with_entities(cls.value)
-            .first()
-        )
+        elif execution_date is not None:
+            query = cls.get_many(
+                execution_date=execution_date,
+                key=key,
+                task_ids=task_id,
+                dag_ids=dag_id,
+                include_prior_dates=include_prior_dates,
+                session=session,
+            )
+        else:
+            raise RuntimeError("Should not happen?")
+
+        result = query.with_entities(cls.value).first()
         if result:
             return cls.deserialize_value(result)
         return None
 
+    @overload
+    @classmethod
+    def get_many(
+        cls,
+        *,
+        run_id: str,
+        key: Optional[str] = None,
+        task_ids: Union[str, Iterable[str], None] = None,
+        dag_ids: Union[str, Iterable[str], None] = None,
+        include_prior_dates: bool = False,
+        limit: Optional[int] = None,
+        session: Optional[Session] = None,
+    ) -> Query:
+        """Composes a query to get one or more XCom entries.
+
+        This function returns an SQLAlchemy query of full XCom objects. If you
+        just want one stored value, use :meth:`get_one` instead.
+
+        A deprecated form of this function accepts ``execution_date`` instead of
+        ``run_id``. The two arguments are mutually exclusive.
+
+        :param run_id: DAG run ID for the task.
+        :param key: A key for the XComs. If provided, only XComs with matching
+            keys will be returned. Pass *None* (default) to remove the filter.
+        :param task_ids: Only XComs from task with matching IDs will be pulled.
+            Pass *None* (default) to remove the filter.
+        :param dag_id: Only pulls XComs from this DAG. If *None* (default), the
+            DAG of the calling task is used.
+        :param include_prior_dates: If *False* (default), only XComs from the
+            specified DAG run are returned. If *True*, all matching XComs are
+            returned regardless of the run it belongs to.
+        :param session: Database session. If not given, a new session will be
+            created for this function.
+        :type session: sqlalchemy.orm.session.Session
+        """
+
+    @overload
+    @classmethod
+    def get_many(
+        cls,
+        execution_date: pendulum.DateTime,
+        key: Optional[str] = None,
+        task_ids: Union[str, Iterable[str], None] = None,
+        dag_ids: Union[str, Iterable[str], None] = None,
+        include_prior_dates: bool = False,
+        limit: Optional[int] = None,
+        session: Optional[Session] = None,
+    ) -> Query:
+        """:sphinx-autoapi-skip:"""
+
     @classmethod
     @provide_session
     def get_many(
         cls,
         execution_date: Optional[pendulum.DateTime] = None,
-        run_id: Optional[str] = None,
         key: Optional[str] = None,
         task_ids: Optional[Union[str, Iterable[str]]] = None,
         dag_ids: Optional[Union[str, Iterable[str]]] = None,
         include_prior_dates: bool = False,
         limit: Optional[int] = None,
         session: Session = None,
+        *,
+        run_id: Optional[str] = None,
     ) -> Query:
-        """
-        Composes a query to get one or more values from the xcom table.
-
-        ``run_id`` and ``execution_date`` are mutually exclusive.
-
-        This function returns an SQLAlchemy query of full XCom objects. If you just want one stored value then
-        use :meth:`get_one`.
-
-        :param execution_date: Execution date for the task
-        :type execution_date: pendulum.datetime
-        :param run_id: Dag run id for the task
-        :type run_id: str
-        :param key: A key for the XCom. If provided, only XComs with matching
-            keys will be returned. To remove the filter, pass key=None.
-        :type key: str
-        :param task_ids: Only XComs from tasks with matching ids will be
-            pulled. Can pass None to remove the filter.
-        :type task_ids: str or iterable of strings (representing task_ids)
-        :param dag_ids: If provided, only pulls XComs from this DAG.
-            If None (default), the DAG of the calling task is used.
-        :type dag_ids: str
-        :param include_prior_dates: If False, only XComs from the current
-            execution_date are returned. If True, XComs from previous dates
-            are returned as well.
-        :type include_prior_dates: bool
-        :param limit: If required, limit the number of returned objects.
-            XCom objects can be quite big and you might want to limit the
-            number of rows.
-        :type limit: int
-        :param session: database session
-        :type session: sqlalchemy.orm.session.Session
-        """
+        """:sphinx-autoapi-skip:"""
         if not (execution_date is None) ^ (run_id is None):
             raise ValueError("Exactly one of execution_date or run_id must be passed")
 
@@ -262,8 +364,8 @@ class BaseXCom(Base, LoggingMixin):
 
     @classmethod
     @provide_session
-    def delete(cls, xcoms, session=None):
-        """Delete Xcom"""
+    def delete(cls, xcoms: Union["XCom", Iterable["XCom"]], session: Session) -> None:
+        """Delete one or multiple XCom entries."""
         if isinstance(xcoms, XCom):
             xcoms = [xcoms]
         for xcom in xcoms:
@@ -272,37 +374,49 @@ class BaseXCom(Base, LoggingMixin):
             session.delete(xcom)
         session.commit()
 
+    @overload
+    @classmethod
+    def clear(cls, *, dag_id: str, task_id: str, run_id: str, session: Optional[Session] = None) -> None:
+        """Clear all XCom data from the database for the given task instance.
+
+        A deprecated form of this function accepts ``execution_date`` instead of
+        ``run_id``. The two arguments are mutually exclusive.
+
+        :param dag_id: ID of DAG to clear the XCom for.
+        :param task_id: ID of task to clear the XCom for.
+        :param run_id: ID of DAG run to clear the XCom for.
+        :param session: Database session. If not given, a new session will be
+            created for this function.
+        :type session: sqlalchemy.orm.session.Session
+        """
+
+    @overload
+    @classmethod
+    def clear(
+        cls,
+        execution_date: pendulum.DateTime,
+        dag_id: str,
+        task_id: str,
+        session: Optional[Session] = None,
+    ) -> None:
+        """:sphinx-autoapi-skip:"""
+
     @classmethod
     @provide_session
     def clear(
         cls,
         execution_date: Optional[pendulum.DateTime] = None,
-        dag_id: str = None,
-        task_id: str = None,
-        run_id: str = None,
+        dag_id: Optional[str] = None,
+        task_id: Optional[str] = None,
+        run_id: Optional[str] = None,
         session: Session = None,
     ) -> None:
-        """
-        Clears all XCom data from the database for the task instance
-
-        ``run_id`` and ``execution_date`` are mutually exclusive.
-
-        :param execution_date: Execution date for the task
-        :type execution_date: pendulum.datetime or None
-        :param dag_id: ID of DAG to clear the XCom for.
-        :type dag_id: str
-        :param task_id: Only XComs from task with matching id will be cleared.
-        :type task_id: str
-        :param run_id: Dag run id for the task
-        :type run_id: str or None
-        :param session: database session
-        :type session: sqlalchemy.orm.session.Session
-        """
+        """:sphinx-autoapi-skip:"""
         # Given the historic order of this function (execution_date was first argument) to add a new optional
         # param we need to add default values for everything :(
-        if not dag_id:
+        if dag_id is None:
             raise TypeError("clear() missing required argument: dag_id")
-        if not task_id:
+        if task_id is None:
             raise TypeError("clear() missing required argument: task_id")
 
         if not (execution_date is None) ^ (run_id is None):
@@ -364,7 +478,7 @@ class BaseXCom(Base, LoggingMixin):
         return BaseXCom.deserialize_value(self)
 
 
-def resolve_xcom_backend():
+def resolve_xcom_backend() -> Type[BaseXCom]:
     """Resolves custom XCom class"""
     clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
     if clazz:
@@ -376,4 +490,7 @@ def resolve_xcom_backend():
     return BaseXCom
 
 
-XCom = resolve_xcom_backend()
+if TYPE_CHECKING:
+    XCom = BaseXCom  # Hack to avoid Mypy "Variable 'XCom' is not valid as a type".
+else:
+    XCom = resolve_xcom_backend()
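 
As a hedged sketch (not taken from this commit), the backend hook above can be exercised by pointing ``[core] xcom_backend`` at a custom subclass of ``BaseXCom``; the class name and module path below are hypothetical, and the ``run_id`` shown for ``get_one`` is only illustrative:

    import json
    from typing import Any

    from airflow.models.xcom import BaseXCom


    class JsonXCom(BaseXCom):
        """Hypothetical backend that stores every value as UTF-8 encoded JSON."""

        @staticmethod
        def serialize_value(value: Any):
            # The returned bytes end up in the XCom value column via BaseXCom.set().
            return json.dumps(value).encode("utf-8")

        @staticmethod
        def deserialize_value(result: "BaseXCom") -> Any:
            # result is the stored XCom row; its value attribute holds the raw bytes.
            return json.loads(result.value)


    # Enabled via airflow.cfg (module path is made up):
    #   [core]
    #   xcom_backend = my_company.xcom_backends.JsonXCom
    #
    # With the run_id-based lookups added above, a stored value is read back with e.g.:
    #   XCom.get_one(run_id="manual__2022-01-01T00:00:00+00:00", key="return_value",
    #                task_id="extract", dag_id="my_dag")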

[airflow] 13/17: Deprecate some functions in the experimental API (#19931)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c0fbf272c4344599647601e3169f90bb53874e6a
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Dec 16 12:30:42 2021 +0100

    Deprecate some functions in the experimental API (#19931)
    
    This PR deprecates some functions in the experimental API.
    Some of the deprecated functions are only used in the experimental REST API,
    while others that are still valid are moved out of the experimental package.
    
    (cherry picked from commit 6239ae91a4c8bfb05f053a61cb8386f2d63b8b3a)
---
 airflow/api/client/local_client.py                 |  29 ++++--
 .../api/common/{experimental => }/delete_dag.py    |   3 +-
 airflow/api/common/experimental/delete_dag.py      |  70 ++-----------
 airflow/api/common/experimental/get_code.py        |   3 +
 .../api/common/experimental/get_dag_run_state.py   |   3 +
 airflow/api/common/experimental/get_task.py        |   3 +
 .../api/common/experimental/get_task_instance.py   |   3 +
 airflow/api/common/experimental/pool.py            |   6 ++
 airflow/api/common/experimental/trigger_dag.py     | 115 ++-------------------
 .../api/common/{experimental => }/trigger_dag.py   |   5 +-
 airflow/api_connexion/endpoints/dag_endpoint.py    |   7 +-
 airflow/models/pool.py                             |  52 ++++++++--
 airflow/operators/trigger_dagrun.py                |   2 +-
 airflow/utils/db.py                                |  15 +++
 airflow/www/views.py                               |   2 +-
 setup.cfg                                          |   1 +
 tests/api/client/test_local_client.py              |  31 +++++-
 .../common/{experimental => }/test_delete_dag.py   |   2 +-
 .../common/{experimental => }/test_trigger_dag.py  |   8 +-
 tests/models/test_pool.py                          |  71 +++++++++++++
 20 files changed, 229 insertions(+), 202 deletions(-)

diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 7ce0d16..c005067 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -18,8 +18,10 @@
 """Local client API"""
 
 from airflow.api.client import api_client
-from airflow.api.common.experimental import delete_dag, pool, trigger_dag
+from airflow.api.common import delete_dag, trigger_dag
 from airflow.api.common.experimental.get_lineage import get_lineage as get_lineage_api
+from airflow.exceptions import AirflowBadRequest, PoolNotFound
+from airflow.models.pool import Pool
 
 
 class Client(api_client.Client):
@@ -36,19 +38,30 @@ class Client(api_client.Client):
         return f"Removed {count} record(s)"
 
     def get_pool(self, name):
-        the_pool = pool.get_pool(name=name)
-        return the_pool.pool, the_pool.slots, the_pool.description
+        pool = Pool.get_pool(pool_name=name)
+        if not pool:
+            raise PoolNotFound(f"Pool {name} not found")
+        return pool.pool, pool.slots, pool.description
 
     def get_pools(self):
-        return [(p.pool, p.slots, p.description) for p in pool.get_pools()]
+        return [(p.pool, p.slots, p.description) for p in Pool.get_pools()]
 
     def create_pool(self, name, slots, description):
-        the_pool = pool.create_pool(name=name, slots=slots, description=description)
-        return the_pool.pool, the_pool.slots, the_pool.description
+        if not (name and name.strip()):
+            raise AirflowBadRequest("Pool name shouldn't be empty")
+        pool_name_length = Pool.pool.property.columns[0].type.length
+        if len(name) > pool_name_length:
+            raise AirflowBadRequest(f"pool name cannot be more than {pool_name_length} characters")
+        try:
+            slots = int(slots)
+        except ValueError:
+            raise AirflowBadRequest(f"Bad value for `slots`: {slots}")
+        pool = Pool.create_or_update_pool(name=name, slots=slots, description=description)
+        return pool.pool, pool.slots, pool.description
 
     def delete_pool(self, name):
-        the_pool = pool.delete_pool(name=name)
-        return the_pool.pool, the_pool.slots, the_pool.description
+        pool = Pool.delete_pool(name=name)
+        return pool.pool, pool.slots, pool.description
 
     def get_lineage(self, dag_id, execution_date):
         lineage = get_lineage_api(dag_id=dag_id, execution_date=execution_date)
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/delete_dag.py
similarity index 97%
copy from airflow/api/common/experimental/delete_dag.py
copy to airflow/api/common/delete_dag.py
index 44e54e3..c448127 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -24,6 +24,7 @@ from airflow import models
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models import DagModel, TaskFail
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.db import get_sqla_model_classes
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
@@ -60,7 +61,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
 
     count = 0
 
-    for model in models.base.Base._decl_class_registry.values():
+    for model in get_sqla_model_classes():
         if hasattr(model, "dag_id"):
             if keep_records_in_log and model.__name__ == 'Log':
                 continue
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
index 44e54e3..36bf7dd 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -15,68 +15,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Delete DAGs APIs."""
-import logging
+import warnings
 
-from sqlalchemy import or_
+from airflow.api.common.delete_dag import *  # noqa
 
-from airflow import models
-from airflow.exceptions import AirflowException, DagNotFound
-from airflow.models import DagModel, TaskFail
-from airflow.models.serialized_dag import SerializedDagModel
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
-
-log = logging.getLogger(__name__)
-
-
-@provide_session
-def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> int:
-    """
-    :param dag_id: the dag_id of the DAG to delete
-    :param keep_records_in_log: whether keep records of the given dag_id
-        in the Log table in the backend database (for reasons like auditing).
-        The default value is True.
-    :param session: session used
-    :return count of deleted dags
-    """
-    log.info("Deleting DAG: %s", dag_id)
-    running_tis = (
-        session.query(models.TaskInstance.state)
-        .filter(models.TaskInstance.dag_id == dag_id)
-        .filter(models.TaskInstance.state == State.RUNNING)
-        .first()
-    )
-    if running_tis:
-        raise AirflowException("TaskInstances still running")
-    dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
-    if dag is None:
-        raise DagNotFound(f"Dag id {dag_id} not found")
-
-    # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
-    # There may be a lag, so explicitly removes serialized DAG here.
-    if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
-        SerializedDagModel.remove_dag(dag_id=dag_id, session=session)
-
-    count = 0
-
-    for model in models.base.Base._decl_class_registry.values():
-        if hasattr(model, "dag_id"):
-            if keep_records_in_log and model.__name__ == 'Log':
-                continue
-            cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
-            count += session.query(model).filter(cond).delete(synchronize_session='fetch')
-    if dag.is_subdag:
-        parent_dag_id, task_id = dag_id.rsplit(".", 1)
-        for model in TaskFail, models.TaskInstance:
-            count += (
-                session.query(model).filter(model.dag_id == parent_dag_id, model.task_id == task_id).delete()
-            )
-
-    # Delete entries in Import Errors table for a deleted DAG
-    # This handles the case when the dag_id is changed in the file
-    session.query(models.ImportError).filter(models.ImportError.filename == dag.fileloc).delete(
-        synchronize_session='fetch'
-    )
-
-    return count
+warnings.warn(
+    "This module is deprecated. Please use `airflow.api.common.delete_dag` instead.",
+    DeprecationWarning,
+    stacklevel=2,
+)
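 
As a hedged aside (not part of the diff), the shim above keeps existing imports working while new code moves to the relocated modules; the DAG ids and ``conf`` payload below are purely illustrative:

    # Old module path: still importable, but emits a DeprecationWarning on first import.
    import airflow.api.common.experimental.delete_dag  # noqa: F401

    # Preferred imports after this change:
    from airflow.api.common.delete_dag import delete_dag
    from airflow.api.common.trigger_dag import trigger_dag

    run = trigger_dag(dag_id="example_bash_operator", conf={"note": "manually triggered"})
    count = delete_dag(dag_id="obsolete_dag", keep_records_in_log=True)
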
diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py
index 79b0b9f..1a1fb62 100644
--- a/airflow/api/common/experimental/get_code.py
+++ b/airflow/api/common/experimental/get_code.py
@@ -16,11 +16,14 @@
 # specific language governing permissions and limitations
 # under the License.
 """Get code APIs."""
+from deprecated import deprecated
+
 from airflow.api.common.experimental import check_and_get_dag
 from airflow.exceptions import AirflowException, DagCodeNotFound
 from airflow.models.dagcode import DagCode
 
 
+@deprecated(reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.3")
 def get_code(dag_id: str) -> str:
     """Return python code of a given dag_id.
 
diff --git a/airflow/api/common/experimental/get_dag_run_state.py b/airflow/api/common/experimental/get_dag_run_state.py
index ca71a9a..b2dedd5 100644
--- a/airflow/api/common/experimental/get_dag_run_state.py
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -19,9 +19,12 @@
 from datetime import datetime
 from typing import Dict
 
+from deprecated import deprecated
+
 from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
 
 
+@deprecated(reason="Use DagRun().get_state() instead", version="2.2.3")
 def get_dag_run_state(dag_id: str, execution_date: datetime) -> Dict[str, str]:
     """Return the Dag Run state identified by the given dag_id and execution_date.
 
diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py
index 302ad64..fae5fd7 100644
--- a/airflow/api/common/experimental/get_task.py
+++ b/airflow/api/common/experimental/get_task.py
@@ -16,10 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 """Task APIs.."""
+from deprecated import deprecated
+
 from airflow.api.common.experimental import check_and_get_dag
 from airflow.models import TaskInstance
 
 
+@deprecated(reason="Use DAG().get_task", version="2.2.3")
 def get_task(dag_id: str, task_id: str) -> TaskInstance:
     """Return the task object identified by the given dag_id and task_id."""
     dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py
index f3ca1cf2..137f8a3 100644
--- a/airflow/api/common/experimental/get_task_instance.py
+++ b/airflow/api/common/experimental/get_task_instance.py
@@ -18,11 +18,14 @@
 """Task Instance APIs."""
 from datetime import datetime
 
+from deprecated import deprecated
+
 from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
 from airflow.exceptions import TaskInstanceNotFound
 from airflow.models import TaskInstance
 
 
+@deprecated(version="2.2.3", reason="Use DagRun.get_task_instance instead")
 def get_task_instance(dag_id: str, task_id: str, execution_date: datetime) -> TaskInstance:
     """Return the task instance identified by the given dag_id, task_id and execution_date."""
     dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index 30950ea..0b9c3a5 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -16,11 +16,14 @@
 # specific language governing permissions and limitations
 # under the License.
 """Pool APIs."""
+from deprecated import deprecated
+
 from airflow.exceptions import AirflowBadRequest, PoolNotFound
 from airflow.models import Pool
 from airflow.utils.session import provide_session
 
 
+@deprecated(reason="Use Pool.get_pool() instead", version="2.2.3")
 @provide_session
 def get_pool(name, session=None):
     """Get pool by a given name."""
@@ -34,12 +37,14 @@ def get_pool(name, session=None):
     return pool
 
 
+@deprecated(reason="Use Pool.get_pools() instead", version="2.2.3")
 @provide_session
 def get_pools(session=None):
     """Get all pools."""
     return session.query(Pool).all()
 
 
+@deprecated(reason="Use Pool.create_pool() instead", version="2.2.3")
 @provide_session
 def create_pool(name, slots, description, session=None):
     """Create a pool with a given parameters."""
@@ -70,6 +75,7 @@ def create_pool(name, slots, description, session=None):
     return pool
 
 
+@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.3")
 @provide_session
 def delete_pool(name, session=None):
     """Delete pool by a given name."""
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 38a873c..d526312 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -15,114 +15,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Triggering DAG runs APIs."""
-import json
-from datetime import datetime
-from typing import List, Optional, Union
 
-from airflow.exceptions import DagNotFound, DagRunAlreadyExists
-from airflow.models import DagBag, DagModel, DagRun
-from airflow.utils import timezone
-from airflow.utils.state import State
-from airflow.utils.types import DagRunType
+import warnings
 
+from airflow.api.common.trigger_dag import *  # noqa
 
-def _trigger_dag(
-    dag_id: str,
-    dag_bag: DagBag,
-    run_id: Optional[str] = None,
-    conf: Optional[Union[dict, str]] = None,
-    execution_date: Optional[datetime] = None,
-    replace_microseconds: bool = True,
-) -> List[DagRun]:
-    """Triggers DAG run.
-
-    :param dag_id: DAG ID
-    :param dag_bag: DAG Bag model
-    :param run_id: ID of the dag_run
-    :param conf: configuration
-    :param execution_date: date of execution
-    :param replace_microseconds: whether microseconds should be zeroed
-    :return: list of triggered dags
-    """
-    dag = dag_bag.get_dag(dag_id)  # prefetch dag if it is stored serialized
-
-    if dag_id not in dag_bag.dags:
-        raise DagNotFound(f"Dag id {dag_id} not found")
-
-    execution_date = execution_date if execution_date else timezone.utcnow()
-
-    if not timezone.is_localized(execution_date):
-        raise ValueError("The execution_date should be localized")
-
-    if replace_microseconds:
-        execution_date = execution_date.replace(microsecond=0)
-
-    if dag.default_args and 'start_date' in dag.default_args:
-        min_dag_start_date = dag.default_args["start_date"]
-        if min_dag_start_date and execution_date < min_dag_start_date:
-            raise ValueError(
-                "The execution_date [{}] should be >= start_date [{}] from DAG's default_args".format(
-                    execution_date.isoformat(), min_dag_start_date.isoformat()
-                )
-            )
-
-    run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
-    dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)
-
-    if dag_run:
-        raise DagRunAlreadyExists(
-            f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"
-        )
-
-    run_conf = None
-    if conf:
-        run_conf = conf if isinstance(conf, dict) else json.loads(conf)
-
-    dag_runs = []
-    dags_to_run = [dag] + dag.subdags
-    for _dag in dags_to_run:
-        dag_run = _dag.create_dagrun(
-            run_id=run_id,
-            execution_date=execution_date,
-            state=State.QUEUED,
-            conf=run_conf,
-            external_trigger=True,
-            dag_hash=dag_bag.dags_hash.get(dag_id),
-        )
-        dag_runs.append(dag_run)
-
-    return dag_runs
-
-
-def trigger_dag(
-    dag_id: str,
-    run_id: Optional[str] = None,
-    conf: Optional[Union[dict, str]] = None,
-    execution_date: Optional[datetime] = None,
-    replace_microseconds: bool = True,
-) -> Optional[DagRun]:
-    """Triggers execution of DAG specified by dag_id
-
-    :param dag_id: DAG ID
-    :param run_id: ID of the dag_run
-    :param conf: configuration
-    :param execution_date: date of execution
-    :param replace_microseconds: whether microseconds should be zeroed
-    :return: first dag run triggered - even if more than one Dag Runs were triggered or None
-    """
-    dag_model = DagModel.get_current(dag_id)
-    if dag_model is None:
-        raise DagNotFound(f"Dag id {dag_id} not found in DagModel")
-
-    dagbag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
-    triggers = _trigger_dag(
-        dag_id=dag_id,
-        dag_bag=dagbag,
-        run_id=run_id,
-        conf=conf,
-        execution_date=execution_date,
-        replace_microseconds=replace_microseconds,
-    )
-
-    return triggers[0] if triggers else None
+warnings.warn(
+    "This module is deprecated. Please use `airflow.api.common.trigger_dag` instead.",
+    DeprecationWarning,
+    stacklevel=2,
+)
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/trigger_dag.py
similarity index 95%
copy from airflow/api/common/experimental/trigger_dag.py
copy to airflow/api/common/trigger_dag.py
index 38a873c..70bbb78 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -62,9 +62,8 @@ def _trigger_dag(
         min_dag_start_date = dag.default_args["start_date"]
         if min_dag_start_date and execution_date < min_dag_start_date:
             raise ValueError(
-                "The execution_date [{}] should be >= start_date [{}] from DAG's default_args".format(
-                    execution_date.isoformat(), min_dag_start_date.isoformat()
-                )
+                f"The execution_date [{execution_date.isoformat()}] should be >= start_date "
+                f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
             )
 
     run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py
index c164fcc..286b191 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -110,13 +110,10 @@ def patch_dag(session, dag_id, update_mask=None):
 @provide_session
 def delete_dag(dag_id: str, session: Session):
     """Delete the specific DAG."""
-    # TODO: This function is shared with the /delete endpoint used by the web
-    # UI, so we're reusing it to simplify maintenance. Refactor the function to
-    # another place when the experimental/legacy API is removed.
-    from airflow.api.common.experimental import delete_dag
+    from airflow.api.common import delete_dag as delete_dag_module
 
     try:
-        delete_dag.delete_dag(dag_id, session=session)
+        delete_dag_module.delete_dag(dag_id, session=session)
     except DagNotFound:
         raise NotFound(f"Dag with id: '{dag_id}' not found")
     except AirflowException:
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 6f217c4..8ae88aa 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -21,11 +21,11 @@ from typing import Dict, Iterable, Optional, Tuple
 from sqlalchemy import Column, Integer, String, Text, func
 from sqlalchemy.orm.session import Session
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, PoolNotFound
 from airflow.models.base import Base
 from airflow.ti_deps.dependencies_states import EXECUTION_STATES
 from airflow.typing_compat import TypedDict
-from airflow.utils.session import provide_session
+from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import nowait, with_row_locks
 from airflow.utils.state import State
 
@@ -57,7 +57,13 @@ class Pool(Base):
 
     @staticmethod
     @provide_session
-    def get_pool(pool_name, session: Session = None):
+    def get_pools(session: Session = NEW_SESSION):
+        """Get all pools."""
+        return session.query(Pool).all()
+
+    @staticmethod
+    @provide_session
+    def get_pool(pool_name: str, session: Session = NEW_SESSION):
         """
         Get the Pool with specific pool name from the Pools.
 
@@ -69,7 +75,7 @@ class Pool(Base):
 
     @staticmethod
     @provide_session
-    def get_default_pool(session: Session = None):
+    def get_default_pool(session: Session = NEW_SESSION):
         """
         Get the Pool of the default_pool from the Pools.
 
@@ -80,10 +86,44 @@ class Pool(Base):
 
     @staticmethod
     @provide_session
+    def create_or_update_pool(name: str, slots: int, description: str, session: Session = NEW_SESSION):
+        """Create a pool with given parameters or update it if it already exists."""
+        if not name:
+            return
+        pool = session.query(Pool).filter_by(pool=name).first()
+        if pool is None:
+            pool = Pool(pool=name, slots=slots, description=description)
+            session.add(pool)
+        else:
+            pool.slots = slots
+            pool.description = description
+
+        session.commit()
+
+        return pool
+
+    @staticmethod
+    @provide_session
+    def delete_pool(name: str, session: Session = NEW_SESSION):
+        """Delete pool by a given name."""
+        if name == Pool.DEFAULT_POOL_NAME:
+            raise AirflowException("default_pool cannot be deleted")
+
+        pool = session.query(Pool).filter_by(pool=name).first()
+        if pool is None:
+            raise PoolNotFound(f"Pool '{name}' doesn't exist")
+
+        session.delete(pool)
+        session.commit()
+
+        return pool
+
+    @staticmethod
+    @provide_session
     def slots_stats(
         *,
         lock_rows: bool = False,
-        session: Session = None,
+        session: Session = NEW_SESSION,
     ) -> Dict[str, PoolStats]:
         """
         Get Pool stats (Number of Running, Queued, Open & Total tasks)
@@ -210,7 +250,7 @@ class Pool(Base):
         )
 
     @provide_session
-    def open_slots(self, session: Session) -> float:
+    def open_slots(self, session: Session = NEW_SESSION) -> float:
         """
         Get the number of slots open at the moment.
 
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 1e6cb7f..421c796 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -21,7 +21,7 @@ import json
 import time
 from typing import Dict, List, Optional, Union
 
-from airflow.api.common.experimental.trigger_dag import trigger_dag
+from airflow.api.common.trigger_dag import trigger_dag
 from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
 from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
 from airflow.utils import timezone
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index f35d165..023f482 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -991,3 +991,18 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+def get_sqla_model_classes():
+    """
+    Get all SQLAlchemy class mappers.
+
+    SQLAlchemy < 1.4 does not support registry.mappers so we use
+    try/except to handle it.
+    """
+    from airflow.models.base import Base
+
+    try:
+        return [mapper.class_ for mapper in Base.registry.mappers]
+    except AttributeError:
+        return Base._decl_class_registry.values()
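 
The helper above feeds the deletion loop in ``delete_dag``; a hedged sketch of that pattern (no new API is assumed beyond what the diff introduces):

    from airflow.utils.db import get_sqla_model_classes

    # Models that carry a dag_id column are the ones delete_dag() iterates over.
    dag_bound_models = [m for m in get_sqla_model_classes() if hasattr(m, "dag_id")]
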
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2182a17..f2642a7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1607,7 +1607,7 @@ class Airflow(AirflowBaseView):
     @action_logging
     def delete(self):
         """Deletes DAG."""
-        from airflow.api.common.experimental import delete_dag
+        from airflow.api.common import delete_dag
         from airflow.exceptions import DagNotFound
 
         dag_id = request.values.get('dag_id')
diff --git a/setup.cfg b/setup.cfg
index b83ef9b..c3cce1c 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -95,6 +95,7 @@ install_requires =
     croniter>=0.3.17
     cryptography>=0.9.3
     dataclasses;python_version<"3.7"
+    deprecated>=1.2.13
     dill>=0.2.2, <0.4
     # Sphinx RTD theme 0.5.2. introduced limitation to docutils to account for some docutils markup
     # change:
diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py
index a2af8ca..9f574e4 100644
--- a/tests/api/client/test_local_client.py
+++ b/tests/api/client/test_local_client.py
@@ -17,6 +17,8 @@
 # under the License.
 
 import json
+import random
+import string
 import unittest
 from unittest.mock import ANY, patch
 
@@ -25,7 +27,7 @@ from freezegun import freeze_time
 
 from airflow.api.client.local_client import Client
 from airflow.example_dags import example_bash_operator
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowBadRequest, AirflowException, PoolNotFound
 from airflow.models import DAG, DagBag, DagModel, DagRun, Pool
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -133,6 +135,10 @@ class TestLocalClient(unittest.TestCase):
         pool = self.client.get_pool(name='foo')
         assert pool == ('foo', 1, '')
 
+    def test_get_pool_non_existing_raises(self):
+        with pytest.raises(PoolNotFound):
+            self.client.get_pool(name='foo')
+
     def test_get_pools(self):
         self.client.create_pool(name='foo1', slots=1, description='')
         self.client.create_pool(name='foo2', slots=2, description='')
@@ -145,6 +151,26 @@ class TestLocalClient(unittest.TestCase):
         with create_session() as session:
             assert session.query(Pool).count() == 2
 
+    def test_create_pool_bad_slots(self):
+        with pytest.raises(AirflowBadRequest, match="^Bad value for `slots`: foo$"):
+            self.client.create_pool(
+                name='foo',
+                slots='foo',
+                description='',
+            )
+
+    def test_create_pool_name_too_long(self):
+        long_name = ''.join(random.choices(string.ascii_lowercase, k=300))
+        pool_name_length = Pool.pool.property.columns[0].type.length
+        with pytest.raises(
+            AirflowBadRequest, match=f"^pool name cannot be more than {pool_name_length} characters"
+        ):
+            self.client.create_pool(
+                name=long_name,
+                slots=5,
+                description='',
+            )
+
     def test_delete_pool(self):
         self.client.create_pool(name='foo', slots=1, description='')
         with create_session() as session:
@@ -152,3 +178,6 @@ class TestLocalClient(unittest.TestCase):
         self.client.delete_pool(name='foo')
         with create_session() as session:
             assert session.query(Pool).count() == 1
+        for name in ('', '    '):
+            with pytest.raises(PoolNotFound, match=f"^Pool {name!r} doesn't exist$"):
+                Pool.delete_pool(name=name)
diff --git a/tests/api/common/experimental/test_delete_dag.py b/tests/api/common/test_delete_dag.py
similarity index 99%
rename from tests/api/common/experimental/test_delete_dag.py
rename to tests/api/common/test_delete_dag.py
index 5984cd2..0eb058a 100644
--- a/tests/api/common/experimental/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -20,7 +20,7 @@
 import pytest
 
 from airflow import models
-from airflow.api.common.experimental.delete_dag import delete_dag
+from airflow.api.common.delete_dag import delete_dag
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.dates import days_ago
diff --git a/tests/api/common/experimental/test_trigger_dag.py b/tests/api/common/test_trigger_dag.py
similarity index 93%
rename from tests/api/common/experimental/test_trigger_dag.py
rename to tests/api/common/test_trigger_dag.py
index 2f16446..f79d413 100644
--- a/tests/api/common/experimental/test_trigger_dag.py
+++ b/tests/api/common/test_trigger_dag.py
@@ -22,7 +22,7 @@ from unittest import mock
 import pytest
 from parameterized import parameterized
 
-from airflow.api.common.experimental.trigger_dag import _trigger_dag
+from airflow.api.common.trigger_dag import _trigger_dag
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun
 from airflow.utils import timezone
@@ -42,7 +42,7 @@ class TestTriggerDag(unittest.TestCase):
         with pytest.raises(AirflowException):
             _trigger_dag('dag_not_found', dag_bag_mock)
 
-    @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+    @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_dag_run_exist(self, dag_bag_mock, dag_run_mock):
         dag_id = "dag_run_exist"
@@ -54,7 +54,7 @@ class TestTriggerDag(unittest.TestCase):
             _trigger_dag(dag_id, dag_bag_mock)
 
     @mock.patch('airflow.models.DAG')
-    @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+    @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_include_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
         dag_id = "trigger_dag"
@@ -70,7 +70,7 @@ class TestTriggerDag(unittest.TestCase):
         assert 3 == len(triggers)
 
     @mock.patch('airflow.models.DAG')
-    @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+    @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
         dag_id = "trigger_dag"
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 00fe140..95e585e 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -16,11 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import pytest
+
 from airflow import settings
+from airflow.exceptions import AirflowException, PoolNotFound
 from airflow.models.pool import Pool
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.operators.dummy import DummyOperator
 from airflow.utils import timezone
+from airflow.utils.session import create_session
 from airflow.utils.state import State
 from tests.test_utils.db import clear_db_dags, clear_db_pools, clear_db_runs, set_default_pool_slots
 
@@ -28,6 +32,10 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
 class TestPool:
+
+    USER_POOL_COUNT = 2
+    TOTAL_POOL_COUNT = USER_POOL_COUNT + 1  # including default_pool
+
     @staticmethod
     def clean_db():
         clear_db_dags()
@@ -36,6 +44,20 @@ class TestPool:
 
     def setup_method(self):
         self.clean_db()
+        self.pools = []
+
+    def add_pools(self):
+        self.pools = [Pool.get_default_pool()]
+        for i in range(self.USER_POOL_COUNT):
+            name = f'experimental_{i + 1}'
+            pool = Pool(
+                pool=name,
+                slots=i,
+                description=name,
+            )
+            self.pools.append(pool)
+        with create_session() as session:
+            session.add_all(self.pools)
 
     def teardown_method(self):
         self.clean_db()
@@ -149,3 +171,52 @@ class TestPool:
                 "running": 1,
             }
         } == Pool.slots_stats()
+
+    def test_get_pool(self):
+        self.add_pools()
+        pool = Pool.get_pool(pool_name=self.pools[0].pool)
+        assert pool.pool == self.pools[0].pool
+
+    def test_get_pool_non_existing(self):
+        self.add_pools()
+        assert not Pool.get_pool(pool_name='test')
+
+    def test_get_pool_bad_name(self):
+        for name in ('', '    '):
+            assert not Pool.get_pool(pool_name=name)
+
+    def test_get_pools(self):
+        self.add_pools()
+        pools = sorted(Pool.get_pools(), key=lambda p: p.pool)
+        assert pools[0].pool == self.pools[0].pool
+        assert pools[1].pool == self.pools[1].pool
+
+    def test_create_pool(self, session):
+        self.add_pools()
+        pool = Pool.create_or_update_pool(name='foo', slots=5, description='')
+        assert pool.pool == 'foo'
+        assert pool.slots == 5
+        assert pool.description == ''
+        assert session.query(Pool).count() == self.TOTAL_POOL_COUNT + 1
+
+    def test_create_pool_existing(self, session):
+        self.add_pools()
+        pool = Pool.create_or_update_pool(name=self.pools[0].pool, slots=5, description='')
+        assert pool.pool == self.pools[0].pool
+        assert pool.slots == 5
+        assert pool.description == ''
+        assert session.query(Pool).count() == self.TOTAL_POOL_COUNT
+
+    def test_delete_pool(self, session):
+        self.add_pools()
+        pool = Pool.delete_pool(name=self.pools[-1].pool)
+        assert pool.pool == self.pools[-1].pool
+        assert session.query(Pool).count() == self.TOTAL_POOL_COUNT - 1
+
+    def test_delete_pool_non_existing(self):
+        with pytest.raises(PoolNotFound, match="^Pool 'test' doesn't exist$"):
+            Pool.delete_pool(name='test')
+
+    def test_delete_default_pool_not_allowed(self):
+        with pytest.raises(AirflowException, match="^default_pool cannot be deleted$"):
+            Pool.delete_pool(Pool.DEFAULT_POOL_NAME)
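 
The tests above exercise the new ``Pool`` classmethods directly; as a hedged usage sketch outside the test suite (the pool name and slot count are made up, and every method is ``@provide_session``, so no session needs to be passed):

    from airflow.models.pool import Pool

    pool = Pool.create_or_update_pool(name="backfill", slots=8, description="backfill jobs")
    print(Pool.get_pool(pool_name="backfill").open_slots())
    print([p.pool for p in Pool.get_pools()])
    Pool.delete_pool(name="backfill")  # deleting default_pool would raise AirflowException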

[airflow] 05/17: Add back legacy .piprc customization for pip (#21124)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 243f44dcce87a3c8ffc86925fb1eb6aa7a22e1c1
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Wed Jan 26 18:04:19 2022 +0100

    Add back legacy .piprc customization for pip (#21124)
    
    This change restores backwards compatibility for using .piprc
    to customize the Airflow image. Some older versions of pip used .piprc
    (even though documentation about it is hard to find now) and we
    used to support this option. With #20445, we switched to the
    (fully documented) ``pip.conf`` option; however, if someone had used
    .piprc before to customize their image, that change would break it.
    
    The PR therefore also brings the .piprc option back to the image (even if
    it is not really clear whether current and future versions of pip
    will support it).
    
    (cherry picked from commit d5a9edf25723396d17fd10bb980fb99ccac618bb)
---
 Dockerfile                  |  3 +++
 docs/docker-stack/build.rst | 10 +++++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index f880ec5..aadf896 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -212,6 +212,9 @@ USER airflow
 RUN if [[ -f /docker-context-files/pip.conf ]]; then \
         mkdir -p ${AIRFLOW_USER_HOME_DIR}/.config/pip; \
         cp /docker-context-files/pip.conf "${AIRFLOW_USER_HOME_DIR}/.config/pip/pip.conf"; \
+    fi; \
+    if [[ -f /docker-context-files/.piprc ]]; then \
+        cp /docker-context-files/.piprc "${AIRFLOW_USER_HOME_DIR}/.piprc"; \
     fi
 
 ENV AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION} \
diff --git a/docs/docker-stack/build.rst b/docs/docker-stack/build.rst
index 2702c66..b85bf1c 100644
--- a/docs/docker-stack/build.rst
+++ b/docs/docker-stack/build.rst
@@ -522,13 +522,21 @@ described below but here is an example of rather complex command to customize th
 based on example in `this comment <https://github.com/apache/airflow/issues/8605#issuecomment-690065621>`_:
 
 In case you need to use your custom PyPI package indexes, you can also customize PYPI sources used during
-image build by adding a ``docker-context-files``/``pip.conf`` file when building the image.
+image build by adding a ``docker-context-files/pip.conf`` file when building the image.
 This ``pip.conf`` will not be committed to the repository (it is added to ``.gitignore``) and it will not be
 present in the final production image. It is added and used only in the build segment of the image.
 Therefore this ``pip.conf`` file can safely contain list of package indexes you want to use,
 usernames and passwords used for authentication. More details about ``pip.conf`` file can be found in the
 `pip configuration <https://pip.pypa.io/en/stable/topics/configuration/>`_.
 
+If you used ``.piprc`` before (some older versions of ``pip`` used it for customization), you can put it
+in the ``docker-context-files/.piprc`` file and it will be automatically copied to the ``HOME`` directory
+of the ``airflow`` user.
+
+Note that those customizations are only available in the ``build`` segment of the Airflow image and they
+are not present in the ``final`` image. If you wish to extend the final image and add a custom ``.piprc`` or
+``pip.conf``, you should add them in your own Dockerfile used to extend the Airflow image.
+
 Such customizations are independent of the way how airflow is installed.
 
 .. note::

[airflow] 01/17: Return to the same place when triggering a DAG (#20955)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ba17fa34358971a78493d73f62a5992e65cbbf2f
Author: Mark Norman Francis <no...@201created.com>
AuthorDate: Thu Jan 20 08:15:37 2022 +0000

    Return to the same place when triggering a DAG (#20955)
    
    (cherry picked from commit 928dafe6c495bbf3e03d14473753fce915134a46)
---
 airflow/www/templates/airflow/dag.html |  4 ++--
 tests/www/views/test_views_tasks.py    | 24 ++++++++++++++++++++----
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index a88e3d43..d69f42f 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -148,11 +148,11 @@
                   <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
                   <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
                   <input type="hidden" name="unpause" value="True">
-                  <input type="hidden" name="origin" value="{{ url_for('Airflow.' + dag.default_view, dag_id=dag.dag_id) }}">
+                  <input type="hidden" name="origin" value="{{ url_for(request.endpoint, dag_id=dag.dag_id) }}">
                   <button type="submit" class="dropdown-form-btn">Trigger DAG</button>
                 </form>
               </li>
-              <li><a href="{{ url_for('Airflow.trigger', dag_id=dag.dag_id, origin=url_for('Airflow.' + dag.default_view, dag_id=dag.dag_id)) }}">Trigger DAG w/ config</a></li>
+              <li><a href="{{ url_for('Airflow.trigger', dag_id=dag.dag_id, origin=url_for(request.endpoint, dag_id=dag.dag_id)) }}">Trigger DAG w/ config</a></li>
             </ul>
           </div>
           <a href="{{ url_for('Airflow.delete', dag_id=dag.dag_id) }}" title="Delete&nbsp;DAG" class="btn btn-default btn-icon-only{{ ' disabled' if not dag.can_delete }}"
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index e4336c5..ed0466e 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -278,7 +278,7 @@ def test_escape_in_tree_view(app, admin_client, test_str, expected_text):
     check_content_in_response(expected_text, resp)
 
 
-def test_dag_details_trigger_origin_tree_view(app, admin_client):
+def test_tree_trigger_origin_tree_view(app, admin_client):
     app.dag_bag.get_dag('test_tree_view').create_dagrun(
         run_type=DagRunType.SCHEDULED,
         execution_date=DEFAULT_DATE,
@@ -286,14 +286,30 @@ def test_dag_details_trigger_origin_tree_view(app, admin_client):
         state=State.RUNNING,
     )
 
-    url = 'dag_details?dag_id=test_tree_view'
+    url = 'tree?dag_id=test_tree_view'
     resp = admin_client.get(url, follow_redirects=True)
     params = {'dag_id': 'test_tree_view', 'origin': '/tree?dag_id=test_tree_view'}
     href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}"
     check_content_in_response(href, resp)
 
 
-def test_dag_details_trigger_origin_graph_view(app, admin_client):
+def test_graph_trigger_origin_graph_view(app, admin_client):
+    app.dag_bag.get_dag('test_tree_view').create_dagrun(
+        run_type=DagRunType.SCHEDULED,
+        execution_date=DEFAULT_DATE,
+        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+        start_date=timezone.utcnow(),
+        state=State.RUNNING,
+    )
+
+    url = 'graph?dag_id=test_tree_view'
+    resp = admin_client.get(url, follow_redirects=True)
+    params = {'dag_id': 'test_tree_view', 'origin': '/graph?dag_id=test_tree_view'}
+    href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}"
+    check_content_in_response(href, resp)
+
+
+def test_dag_details_trigger_origin_dag_details_view(app, admin_client):
     app.dag_bag.get_dag('test_graph_view').create_dagrun(
         run_type=DagRunType.SCHEDULED,
         execution_date=DEFAULT_DATE,
@@ -303,7 +319,7 @@ def test_dag_details_trigger_origin_graph_view(app, admin_client):
 
     url = 'dag_details?dag_id=test_graph_view'
     resp = admin_client.get(url, follow_redirects=True)
-    params = {'dag_id': 'test_graph_view', 'origin': '/graph?dag_id=test_graph_view'}
+    params = {'dag_id': 'test_graph_view', 'origin': '/dag_details?dag_id=test_graph_view'}
     href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}"
     check_content_in_response(href, resp)
 

[airflow] 09/17: Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5fa0e1394e18000bf1ddb4948a92d5f64282e9bc
Author: SeonghwanLee <50...@users.noreply.github.com>
AuthorDate: Thu Jan 27 14:36:24 2022 +0900

    Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
    
    Co-authored-by: uplsh <up...@linecorp.com>
    (cherry picked from commit d97e2bac854f9891eb47f0c06c261e89723038ca)
---
 airflow/cli/commands/dag_command.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index e04bc73..6e8e157 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -47,7 +47,7 @@ from airflow.utils.cli import (
 )
 from airflow.utils.dot_renderer import render_dag
 from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import State
+from airflow.utils.state import DagRunState
 
 
 @cli_utils.action_logging
@@ -105,7 +105,7 @@ def dag_backfill(args, dag=None):
                 end_date=args.end_date,
                 confirm_prompt=not args.yes,
                 include_subdags=True,
-                dag_run_state=State.NONE,
+                dag_run_state=DagRunState.QUEUED,
             )
 
         try:
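 
In effect, re-running the same backfill now resets the existing runs to a queued, re-runnable state instead of clearing their state to ``None``; a hedged sketch of the equivalent ``DAG.clear`` call (the DAG id and dates are placeholders):

    import pendulum

    from airflow.models import DagBag
    from airflow.utils.state import DagRunState

    dag = DagBag().get_dag("example_bash_operator")
    dag.clear(
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        end_date=pendulum.datetime(2022, 1, 7, tz="UTC"),
        confirm_prompt=False,
        include_subdags=True,
        dag_run_state=DagRunState.QUEUED,  # previously State.NONE, which broke a second run
    )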

[airflow] 16/17: bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 30b0d98d2a31136cb64dd383e0c8ab4a8010d319
Author: Đặng Minh Dũng <du...@live.com>
AuthorDate: Wed Jan 5 14:42:57 2022 +0700

    bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
    
    (cherry picked from commit 64c0bd50155dfdb84671ac35d645b812fafa78a1)
---
 airflow/api/common/experimental/mark_tasks.py | 121 ++++++++++++++++++--------
 1 file changed, 85 insertions(+), 36 deletions(-)

diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 28e733d..4131cb5 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -17,23 +17,27 @@
 # under the License.
 """Marks tasks APIs."""
 
-import datetime
-from typing import Iterable
+from datetime import datetime
+from typing import Generator, Iterable, List, Optional
 
-from sqlalchemy import or_
 from sqlalchemy.orm import contains_eager
+from sqlalchemy.orm.session import Session as SASession
+from sqlalchemy.sql.expression import or_
 
+from airflow import DAG
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.operators.subdag import SubDagOperator
 from airflow.utils import timezone
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.types import DagRunType
 
 
-def _create_dagruns(dag, execution_dates, state, run_type):
+def _create_dagruns(
+    dag: DAG, execution_dates: List[datetime], state: TaskInstanceState, run_type: DagRunType
+) -> List[DagRun]:
     """
     Infers from the dates which dag runs need to be created and does so.
 
@@ -63,15 +67,15 @@ def _create_dagruns(dag, execution_dates, state, run_type):
 @provide_session
 def set_state(
     tasks: Iterable[BaseOperator],
-    execution_date: datetime.datetime,
+    execution_date: datetime,
     upstream: bool = False,
     downstream: bool = False,
     future: bool = False,
     past: bool = False,
     state: str = State.SUCCESS,
     commit: bool = False,
-    session=None,
-):
+    session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
     """
     Set the state of a task instance and if needed its relatives. Can set state
     for future tasks (calculated from execution_date) and retroactively
@@ -134,7 +138,9 @@ def set_state(
     return tis_altered
 
 
-def all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates):
+def all_subdag_tasks_query(
+    sub_dag_run_ids: List[str], session: SASession, state: TaskInstanceState, confirmed_dates: List[datetime]
+):
     """Get *all* tasks of the sub dags"""
     qry_sub_dag = (
         session.query(TaskInstance)
@@ -144,7 +150,13 @@ def all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates):
     return qry_sub_dag
 
 
-def get_all_dag_task_query(dag, session, state, task_ids, confirmed_dates):
+def get_all_dag_task_query(
+    dag: DAG,
+    session: SASession,
+    state: TaskInstanceState,
+    task_ids: List[str],
+    confirmed_dates: List[datetime],
+):
     """Get all tasks of the main dag that will be affected by a state change"""
     qry_dag = (
         session.query(TaskInstance)
@@ -160,7 +172,14 @@ def get_all_dag_task_query(dag, session, state, task_ids, confirmed_dates):
     return qry_dag
 
 
-def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
+def get_subdag_runs(
+    dag: DAG,
+    session: SASession,
+    state: TaskInstanceState,
+    task_ids: List[str],
+    commit: bool,
+    confirmed_dates: List[datetime],
+) -> List[str]:
     """Go through subdag operators and create dag runs. We will only work
     within the scope of the subdag. We won't propagate to the parent dag,
     but we will propagate from parent to subdag.
@@ -181,7 +200,7 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
                 dag_runs = _create_dagruns(
                     current_task.subdag,
                     execution_dates=confirmed_dates,
-                    state=State.RUNNING,
+                    state=TaskInstanceState.RUNNING,
                     run_type=DagRunType.BACKFILL_JOB,
                 )
 
@@ -192,7 +211,13 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
     return sub_dag_ids
 
 
-def verify_dagruns(dag_runs, commit, state, session, current_task):
+def verify_dagruns(
+    dag_runs: List[DagRun],
+    commit: bool,
+    state: TaskInstanceState,
+    session: SASession,
+    current_task: BaseOperator,
+):
     """Verifies integrity of dag_runs.
 
     :param dag_runs: dag runs to verify
@@ -210,7 +235,7 @@ def verify_dagruns(dag_runs, commit, state, session, current_task):
             session.merge(dag_run)
 
 
-def verify_dag_run_integrity(dag, dates):
+def verify_dag_run_integrity(dag: DAG, dates: List[datetime]) -> List[datetime]:
     """
     Verify the integrity of the dag runs in case a task was added or removed
     set the confirmed execution dates as they might be different
@@ -225,7 +250,9 @@ def verify_dag_run_integrity(dag, dates):
     return confirmed_dates
 
 
-def find_task_relatives(tasks, downstream, upstream):
+def find_task_relatives(
+    tasks: Iterable[BaseOperator], downstream: bool, upstream: bool
+) -> Generator[str, None, None]:
     """Yield task ids and optionally ancestor and descendant ids."""
     for task in tasks:
         yield task.task_id
@@ -237,7 +264,7 @@ def find_task_relatives(tasks, downstream, upstream):
                 yield relative.task_id
 
 
-def get_execution_dates(dag, execution_date, future, past):
+def get_execution_dates(dag: DAG, execution_date: datetime, future: bool, past: bool) -> List[datetime]:
     """Returns dates of DAG execution"""
     latest_execution_date = dag.get_latest_execution_date()
     if latest_execution_date is None:
@@ -266,7 +293,9 @@ def get_execution_dates(dag, execution_date, future, past):
 
 
 @provide_session
-def _set_dag_run_state(dag_id, execution_date, state, session=None):
+def _set_dag_run_state(
+    dag_id: str, execution_date: datetime, state: TaskInstanceState, session: SASession = NEW_SESSION
+):
     """
     Helper method that set dag run state in the DB.
 
@@ -279,7 +308,7 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
         session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date).one()
     )
     dag_run.state = state
-    if state == State.RUNNING:
+    if state == TaskInstanceState.RUNNING:
         dag_run.start_date = timezone.utcnow()
         dag_run.end_date = None
     else:
@@ -288,7 +317,12 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
 
 
 @provide_session
-def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_success(
+    dag: Optional[DAG],
+    execution_date: Optional[datetime],
+    commit: bool = False,
+    session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
     """
     Set the dag run for a specific execution date and its task instances
     to success.
@@ -306,18 +340,27 @@ def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None
 
     # Mark the dag run to success.
     if commit:
-        _set_dag_run_state(dag.dag_id, execution_date, State.SUCCESS, session)
+        _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.SUCCESS, session)
 
     # Mark all task instances of the dag run to success.
     for task in dag.tasks:
         task.dag = dag
     return set_state(
-        tasks=dag.tasks, execution_date=execution_date, state=State.SUCCESS, commit=commit, session=session
+        tasks=dag.tasks,
+        execution_date=execution_date,
+        state=TaskInstanceState.SUCCESS,
+        commit=commit,
+        session=session,
     )
 
 
 @provide_session
-def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_failed(
+    dag: Optional[DAG],
+    execution_date: Optional[datetime],
+    commit: bool = False,
+    session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
     """
     Set the dag run for a specific execution date and its running task instances
     to failed.
@@ -335,18 +378,15 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None)
 
     # Mark the dag run to failed.
     if commit:
-        _set_dag_run_state(dag.dag_id, execution_date, State.FAILED, session)
+        _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.FAILED, session)
 
-    # Mark only RUNNING task instances.
+    # Mark only running task instances.
     task_ids = [task.task_id for task in dag.tasks]
-    tis = (
-        session.query(TaskInstance)
-        .filter(
-            TaskInstance.dag_id == dag.dag_id,
-            TaskInstance.execution_date == execution_date,
-            TaskInstance.task_id.in_(task_ids),
-        )
-        .filter(TaskInstance.state == State.RUNNING)
+    tis = session.query(TaskInstance).filter(
+        TaskInstance.dag_id == dag.dag_id,
+        TaskInstance.execution_date == execution_date,
+        TaskInstance.task_id.in_(task_ids),
+        TaskInstance.state.in_(State.running),
     )
     task_ids_of_running_tis = [task_instance.task_id for task_instance in tis]
 
@@ -358,12 +398,21 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None)
         tasks.append(task)
 
     return set_state(
-        tasks=tasks, execution_date=execution_date, state=State.FAILED, commit=commit, session=session
+        tasks=tasks,
+        execution_date=execution_date,
+        state=TaskInstanceState.FAILED,
+        commit=commit,
+        session=session,
     )
 
 
 @provide_session
-def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_running(
+    dag: Optional[DAG],
+    execution_date: Optional[datetime],
+    commit: bool = False,
+    session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
     """
     Set the dag run for a specific execution date to running.
 
@@ -380,7 +429,7 @@ def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None
 
     # Mark the dag run to running.
     if commit:
-        _set_dag_run_state(dag.dag_id, execution_date, State.RUNNING, session)
+        _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.RUNNING, session)
 
     # To keep the return type consistent with the other similar functions.
     return res
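
A minimal usage sketch (not part of the patch) showing how the now-annotated helpers might be
driven. It assumes a local Airflow 2.2 environment with a matching DAG run already in the
metadata DB; the import path, DAG id and date are illustrative assumptions only:

    from datetime import datetime, timezone

    from airflow.api.common.experimental.mark_tasks import set_dag_run_state_to_failed
    from airflow.models import DagBag

    dag = DagBag().get_dag("example_dag")  # hypothetical DAG id, parsed from the dags folder
    execution_date = datetime(2022, 1, 1, tzinfo=timezone.utc)

    # With commit=False nothing is written to the metadata DB; the helper only
    # returns the TaskInstance rows that would be set to failed.
    would_change = set_dag_run_state_to_failed(dag=dag, execution_date=execution_date, commit=False)
    print(f"{len(would_change)} task instances would be marked failed")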

[airflow] 08/17: Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c347d807d9eb73eeedb12d777b10a0fdf6b1a6e8
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Thu Jan 27 06:47:10 2022 +0800

    Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
    
    (cherry picked from commit e3832a77a3e0d374dfdbe14f34a941d22c9c459d)
---
 airflow/models/taskinstance.py    | 4 +++-
 tests/models/test_taskinstance.py | 6 ++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 281d067..ec34156 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -447,6 +447,7 @@ class TaskInstance(Base, LoggingMixin):
         self.run_id = run_id
 
         self.try_number = 0
+        self.max_tries = self.task.retries
         self.unixname = getuser()
         if state:
             self.state = state
@@ -775,7 +776,8 @@ class TaskInstance(Base, LoggingMixin):
         self.pool_slots = task.pool_slots
         self.priority_weight = task.priority_weight_total
         self.run_as_user = task.run_as_user
-        self.max_tries = task.retries
+        # Do not set max_tries to task.retries here because max_tries is a cumulative
+        # value that needs to be stored in the db.
         self.executor_config = task.executor_config
         self.operator = task.task_type
 
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index d111371..4fec49f 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2143,6 +2143,12 @@ def test_refresh_from_task(pool_override):
     assert ti.executor_config == task.executor_config
     assert ti.operator == DummyOperator.__name__
 
+    # Test that refresh_from_task does not reset ti.max_tries
+    expected_max_tries = task.retries + 10
+    ti.max_tries = expected_max_tries
+    ti.refresh_from_task(task)
+    assert ti.max_tries == expected_max_tries
+
 
 class TestRunRawTaskQueriesCount:
     """

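For context on the change above: `max_tries` is cumulative, because clearing a task instance
raises it so the task can retry again, and `refresh_from_task` must not undo that. A minimal
sketch of the guarded behaviour, assuming a local Airflow 2.2 install (the DAG id, task id and
run_id below are made up):

    import datetime

    from airflow import DAG
    from airflow.models import TaskInstance
    from airflow.operators.dummy import DummyOperator

    with DAG("example_dag", start_date=datetime.datetime(2022, 1, 1)):
        task = DummyOperator(task_id="t1", retries=3)

    ti = TaskInstance(task, run_id="manual__2022-01-01")
    ti.max_tries = task.retries + 10           # e.g. accumulated by clearing the task several times
    ti.refresh_from_task(task)
    assert ti.max_tries == task.retries + 10   # refresh_from_task no longer resets the cumulative value
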
[airflow] 07/17: Update `version_added` for `[email] from_email` (#21138)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f02ae31d125f6f247ad4e20f8e60560087739aa9
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Jan 27 09:45:33 2022 -0700

    Update `version_added` for `[email] from_email` (#21138)
    
    (cherry picked from commit 362f397d7a3351c718b798a146f2f955a17b7074)
---
 airflow/config_templates/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index a70854e..6941f03 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1357,7 +1357,7 @@
       description: |
         Email address that will be used as sender address.
         It can either be raw email or the complete address in a format ``Sender Name <se...@email.com>``
-      version_added: 2.3.0
+      version_added: 2.2.4
       type: string
       example: "Airflow <ai...@example.com>"
       default: ~
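
For reference, a hedged sketch (not part of the patch) of reading the option programmatically
on an install where it is set; the value shown is purely hypothetical:

    # airflow.cfg (hypothetical):
    #
    #   [email]
    #   from_email = Example Sender <sender@example.com>

    from airflow.configuration import conf

    # Returns None when the option is unset, since its default is ~ (null).
    from_email = conf.get("email", "from_email", fallback=None)
    print(from_email)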

[airflow] 03/17: name mismatch (#21055)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8eeb3ea35fa21698aa049f23c68ba495c8cc2508
Author: caxefaizan <63...@users.noreply.github.com>
AuthorDate: Mon Jan 24 16:17:23 2022 +0530

    name mismatch (#21055)
    
    (cherry picked from commit 4fb005ec122a1c0091db0083c2fe4305473abb49)
---
 .../kubernetes/pod_template_file_examples/dags_in_volume_template.yaml  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
index 389fe37..cc46149 100644
--- a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
+++ b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
@@ -63,7 +63,7 @@ spec:
     fsGroup: 50000
   serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
   volumes:
-    - name: dags
+    - name: airflow-dags
       persistentVolumeClaim:
         claimName: RELEASE-NAME-dags
     - emptyDir: {}