Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/17 21:38:51 UTC
[airflow] branch v2-2-test updated (c03b086 -> 56d82fc)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.
discard c03b086 Add note about Variable precedence with env vars (#21568)
discard 2aec9d8 Reorder migrations to include bugfix in 2.2.4 (#21598)
discard 045f6ce Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)
discard a520845 update tutorial_etl_dag notes (#21503)
discard 94c6134 Simplify trigger cancel button (#21591)
discard a0ea47e Add a session backend to store session data in the database (#21478)
discard 57abbac Show task status only for running dags or only for the last finished dag (#21352)
discard f27357f Use compat data interval shim in log handlers (#21289)
discard 7c93cf7 Fix postgres hook import pipeline tutorial (#21491)
discard 7aae75c Fix mismatch in generated run_id and logical date of DAG run (#18707)
discard b77fb10 Fix TriggerDagRunOperator extra link (#19410)
discard fc6b0b3 Add possibility to create user in the Remote User mode (#19963)
discard b1e3572 Avoid deadlock when rescheduling task (#21362)
discard 2ebda8e Fix docs link for smart sensor deprecation (#21394)
discard 891f51d Update example DAGs (#21372)
discard c904256 Update error docs to include before_send option (#21275)
discard 9e806aa Fix the incorrect scheduling time for the first run of dag (#21011)
discard 160d879 Update stat_name_handler documentation (#21298)
discard 00cee8a Docs: Fix task order in overview example (#21282)
discard b36d298 Update recipe for Google Cloud SDK (#21268)
discard e304067 Augment xcom docs (#20755)
discard 70050c1 Update version to 2.2.4 for things in that release (#21196)
discard 91d17f8 Fix Broken link in api.rst (#21165)
discard 5c32faf Limit SQLAlchemy to < 1.4.0 for 2.2.* line (#21235)
discard cd97ecc Fix Scheduler crash when executing task instances of missing DAG (#20349)
discard 4852b81 Actually fix tuple and bool checks for black 22.1.0 (#21221)
omit 30b0d98 bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
omit 762abfb Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
omit 0740c08 Avoid unintentional data loss when deleting DAGs (#20758)
omit c0fbf27 Deprecate some functions in the experimental API (#19931)
omit 928e095 Fix session usage in ``/rendered-k8s`` view (#21006)
omit d572161 Helper for provide_session-decorated functions (#20104)
omit 0cc934c Type-annotate SkipMixin and BaseXCom (#20011)
omit 5fa0e13 Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
omit c347d80 Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
omit f02ae31 Update `version_added` for `[email] from_email` (#21138)
omit 4633cf3 Improved instructions for custom image build with docker compose (#21052)
omit 243f44d Add back legacy .piprc customization for pip (#21124)
omit abf8c03 Update logging-tasks.rst (#21088)
omit 8eeb3ea name mismatch (#21055)
omit 84ef8d5 Update v1.yaml (#21024)
omit ba17fa3 Return to the same place when triggering a DAG (#20955)
new 31c66eb Update v1.yaml (#21024)
new a670f8c name mismatch (#21055)
new 9f6d6b9 Update logging-tasks.rst (#21088)
new 680c011 Add back legacy .piprc customization for pip (#21124)
new 5b51c41 Improved instructions for custom image build with docker compose (#21052)
new 9f7d292 Update `version_added` for `[email] from_email` (#21138)
new 07102e9 Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
new dda8f43 Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
new 016929f Type-annotate SkipMixin and BaseXCom (#20011)
new dda864d Helper for provide_session-decorated functions (#20104)
new daebc58 Fix session usage in ``/rendered-k8s`` view (#21006)
new 663bb54 Deprecate some functions in the experimental API (#19931)
new 4dc8b90 Avoid unintentional data loss when deleting DAGs (#20758)
new 6d8342e Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
new 55a4abb bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
new 0ba033d Actually fix tuple and bool checks for black 22.1.0 (#21221)
new 1b139a7 Fix Scheduler crash when executing task instances of missing DAG (#20349)
new 4ff0ab1 Limit SQLAlchemy to < 1.4.0 for 2.2.* line (#21235)
new ede6d8f Fix Broken link in api.rst (#21165)
new 2066812 Update version to 2.2.4 for things in that release (#21196)
new 8890087 Augment xcom docs (#20755)
new 4b3fa3a Update recipe for Google Cloud SDK (#21268)
new a519e53 Docs: Fix task order in overview example (#21282)
new 015c481 Update stat_name_handler documentation (#21298)
new 64e0c50 Fix the incorrect scheduling time for the first run of dag (#21011)
new 270516c Update error docs to include before_send option (#21275)
new 5c078cd Update example DAGs (#21372)
new f41ea34 Fix docs link for smart sensor deprecation (#21394)
new f2fe0df Avoid deadlock when rescheduling task (#21362)
new 9b03071 Add possibility to create user in the Remote User mode (#19963)
new 95eaef3 Fix TriggerDagRunOperator extra link (#19410)
new 1c23405 Fix mismatch in generated run_id and logical date of DAG run (#18707)
new efc2818 Fix postgres hook import pipeline tutorial (#21491)
new 79e9954 Use compat data interval shim in log handlers (#21289)
new f25a58e Show task status only for running dags or only for the last finished dag (#21352)
new 1c2909f Add a session backend to store session data in the database (#21478)
new 628aa1f Simplify trigger cancel button (#21591)
new dd0a3a3 update tutorial_etl_dag notes (#21503)
new 436f452 Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)
new 1cbad37 Reorder migrations to include bugfix in 2.2.4 (#21598)
new 7e80127 Add note about Variable precedence with env vars (#21568)
new 8cbf934 Adding missing login provider related methods from Flask-Appbuilder (#21294)
new 56d82fc added explaining concept of logical date in DAG run docs (#21433)
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (c03b086)
\
N -- N -- N refs/heads/v2-2-test (56d82fc)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 43 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
airflow/www/fab_security/manager.py | 15 +++++++++++++++
airflow/www/templates/airflow/dag.html | 4 ++--
docs/apache-airflow/concepts/dags.rst | 16 ++++++++++++++++
docs/apache-airflow/dag-run.rst | 2 ++
docs/apache-airflow/faq.rst | 7 +++++++
setup.cfg | 7 ++++++-
tests/www/views/test_views_tasks.py | 24 ++++--------------------
7 files changed, 52 insertions(+), 23 deletions(-)
[airflow] 18/43: Limit SQLAlchemy to < 1.4.0 for 2.2.* line (#21235)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4ff0ab16868d7e7b765a4ab1d285088cd1f162fe
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Jan 31 19:23:58 2022 +0100
Limit SQLAlchemy to < 1.4.0 for 2.2.* line (#21235)
The recent release of FAB 3.4.4 has unblocked us from upgrading
SQLAlchemy to the 1.4.* line. We have wanted to do this for quite
some time; however, upgrading to SQLAlchemy 1.4.* and letting our
users run it with 2.2.4 is a bit risky.
We are fixing the resulting "aftermath" in the main branch; as of
this commit two fixes are merged and an MsSQL problem remains. The
MsSQL problem does not affect 2.2.4, as MsSQL support only arrives
in 2.3.0, but the two other problems have shown that SQLAlchemy 1.4
has the potential to break things, and we may want to test it more
thoroughly before releasing 2.3.0.
The problems in question are #21205 and #21228. Both were only
test problems, but they indicate that there might be more hidden
issues involved.
To limit the risk, this PR limits SQLAlchemy for the 2.2.* line
to < 1.4.0. This allows upgrading FAB and related dependencies
without opening Airflow up to the SQLAlchemy 1.4 upgrade (yet).
---
setup.cfg | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/setup.cfg b/setup.cfg
index c3cce1c..7ab5c77 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -145,7 +145,7 @@ install_requires =
python3-openid~=3.2
rich>=9.2.0
setproctitle>=1.1.8, <2
- sqlalchemy>=1.3.18
+ sqlalchemy>=1.3.18, <1.4.0
sqlalchemy_jsonfield~=1.0
tabulate>=0.7.5, <0.9
tenacity>=6.2.0
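As a quick illustration of what the new specifier admits (a sketch with a hand-rolled version check, not how pip actually resolves requirements):

```python
# Toy check of the ">=1.3.18, <1.4.0" range from setup.cfg.
# Real resolvers use the `packaging` library; this naive tuple
# comparison only handles plain X.Y.Z version strings.
def satisfies(version: str) -> bool:
    v = tuple(int(part) for part in version.split("."))
    return (1, 3, 18) <= v < (1, 4, 0)

print(satisfies("1.3.24"))  # True: still on the 1.3 line
print(satisfies("1.4.0"))   # False: excluded by the new upper bound
```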
[airflow] 06/43: Update `version_added` for `[email] from_email` (#21138)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9f7d292769fd21c06ab0f222d7c75085d8aab288
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Jan 27 09:45:33 2022 -0700
Update `version_added` for `[email] from_email` (#21138)
(cherry picked from commit 362f397d7a3351c718b798a146f2f955a17b7074)
---
airflow/config_templates/config.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index a70854e..6941f03 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1357,7 +1357,7 @@
description: |
Email address that will be used as sender address.
It can either be raw email or the complete address in a format ``Sender Name <se...@email.com>``
- version_added: 2.3.0
+ version_added: 2.2.4
type: string
example: "Airflow <ai...@example.com>"
default: ~
[airflow] 05/43: Improved instructions for custom image build with docker compose (#21052)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5b51c41a39ee9238bc0fba0ace8c5ec27b9e8875
Author: Omer Ginosar <94...@users.noreply.github.com>
AuthorDate: Tue Jan 25 23:51:16 2022 +0200
Improved instructions for custom image build with docker compose (#21052)
* Create build.rst
* Update docs/docker-stack/build.rst
Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
* fix doc build
Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
Co-authored-by: eladkal <45...@users.noreply.github.com>
(cherry picked from commit 17b48e5baf09a86ea6e2036c864a882bb0c328e2)
---
docs/docker-stack/build.rst | 19 +++++++++++++++++--
docs/spelling_wordlist.txt | 1 +
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/docs/docker-stack/build.rst b/docs/docker-stack/build.rst
index b85bf1c..6b5dc47 100644
--- a/docs/docker-stack/build.rst
+++ b/docs/docker-stack/build.rst
@@ -81,8 +81,23 @@ In the simplest case building your image consists of those steps:
4) Once you build the image locally you have usually several options to make them available for your deployment:
-* For ``docker-compose`` deployment, that's all you need. The image is stored in docker engine cache
- and docker compose will use it from there.
+* For ``docker-compose`` deployment, if you've already built your image, and want to continue
+ building the image manually when needed with ``docker build``, you can edit the
+ docker-compose.yaml and replace the "apache/airflow:<version>" image with the
+ image you've just built ``my-image:0.0.1`` - it will be used from your local Docker
+ Engine cache. You can also simply set ``AIRFLOW_IMAGE_NAME`` variable to
+ point to your image and ``docker-compose`` will use it automatically without having
+ to modify the file.
+
+* Also for ``docker-compose`` deployment, you can delegate image building to the docker-compose.
+ To do that - open your ``docker-compose.yaml`` file and search for the phrase "In order to add custom dependencies".
+ Follow these instructions of commenting the "image" line and uncommenting the "build" line.
+ This is a standard docker-compose feature and you can read about it in
+ `Docker Compose build reference <https://docs.docker.com/compose/reference/build/>`_.
+ Run ``docker-compose build`` to build the images. Similarly as in the previous case, the
+ image is stored in Docker engine cache and Docker Compose will use it from there.
+ The ``docker-compose build`` command uses the same ``docker build`` command that
+ you can run manually under-the-hood.
* For some - development targeted - Kubernetes deployments you can load the images directly to
Kubernetes clusters. Clusters such as ``kind`` or ``minikube`` have dedicated ``load`` method to load the
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 64d839f..5d77e29 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1384,6 +1384,7 @@ uid
umask
un
unarchived
+uncommenting
undead
ungenerated
unicode
[airflow] 40/43: Reorder migrations to include bugfix in 2.2.4 (#21598)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1cbad378cb778fca879a522916c11d32d80ac84e
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 15 16:38:56 2022 -0700
Reorder migrations to include bugfix in 2.2.4 (#21598)
(cherry picked from commit 005cef042bc4184c24ad03c1b4ee40cdbaf96cb5)
---
.../versions/587bdf053233_adding_index_for_dag_id_in_job.py | 4 ++--
docs/apache-airflow/migrations-ref.rst | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py b/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
index 3532fe9..c643a62 100644
--- a/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
+++ b/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
@@ -19,7 +19,7 @@
"""adding index for dag_id in job
Revision ID: 587bdf053233
-Revises: f9da662e7089
+Revises: c381b21cb7e4
Create Date: 2021-12-14 10:20:12.482940
"""
@@ -28,7 +28,7 @@ from alembic import op
# revision identifiers, used by Alembic.
revision = '587bdf053233'
-down_revision = 'f9da662e7089'
+down_revision = 'c381b21cb7e4'
branch_labels = None
depends_on = None
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 0eac329..cdaa447 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``587bdf053233`` (head) | ``f9da662e7089`` | ``2.3.0`` | Add index for ``dag_id`` column in ``job`` table. |
+| ``587bdf053233`` (head) | ``c381b21cb7e4`` | ``2.2.4`` | Add index for ``dag_id`` column in ``job`` table. |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``c381b21cb7e4`` | ``be2bfac3da23`` | ``2.2.4`` | Create a ``session`` table to store web session data |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
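The reorder works because Alembic chains migrations through their ``down_revision`` pointers; a small sketch of walking that chain (revision IDs taken from the diff above, the walk itself is a simplified illustration, not Alembic's API):

```python
# Each migration names the revision it follows; re-pointing
# down_revision is all it takes to reorder the chain.
down_revision = {
    '587bdf053233': 'c381b21cb7e4',  # after this commit
    'c381b21cb7e4': 'be2bfac3da23',
}

def history(head: str) -> list:
    """Walk from head to the oldest known revision."""
    order = [head]
    while order[-1] in down_revision:
        order.append(down_revision[order[-1]])
    return order

print(history('587bdf053233'))
```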
[airflow] 16/43: Actually fix tuple and bool checks for black 22.1.0 (#21221)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0ba033daead44624847cbf26a5e19962575c94d0
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sun Jan 30 16:59:36 2022 +0100
Actually fix tuple and bool checks for black 22.1.0 (#21221)
The previous two fixes, #21215 and #21216, did not really fix the
problem introduced by Black 22.1.0 (they could not, as they were
wrong). This change was actually tested with the new Black and
should finally fix the problem.
(cherry picked from commit f9e20067e0ac593fd18ad068fcc56501c6a99f2b)
---
dev/provider_packages/prepare_provider_packages.py | 23 +++++++++++++++++-----
1 file changed, 18 insertions(+), 5 deletions(-)
diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py
index 7d0e1e5..8d920d6 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -1334,9 +1334,14 @@ def get_all_changes_for_package(
)
# Returns 66 in case of doc-only changes
sys.exit(66)
+ if len(changes) > len(changes_since_last_doc_only_check):
+ # if doc-only was released after previous release - use it as starting point
+ # but if before - stay with the releases from last tag.
+ changes = changes_since_last_doc_only_check
except subprocess.CalledProcessError:
# ignore when the commit mentioned as last doc-only change is obsolete
pass
+
console.print(f"[yellow]The provider {provider_package_id} has changes since last release[/]")
console.print()
console.print(
@@ -1697,16 +1702,16 @@ def black_mode():
config = parse_pyproject_toml(os.path.join(SOURCE_DIR_PATH, "pyproject.toml"))
target_versions = set(
- target_version_option_callback(None, None, config.get('target_version', [])),
+ target_version_option_callback(None, None, tuple(config.get('target_version', ()))),
)
return Mode(
target_versions=target_versions,
line_length=config.get('line_length', Mode.line_length),
- is_pyi=config.get('is_pyi', Mode.is_pyi),
- string_normalization=not config.get('skip_string_normalization', not Mode.string_normalization),
- experimental_string_processing=config.get(
- 'experimental_string_processing', Mode.experimental_string_processing
+ is_pyi=bool(config.get('is_pyi', Mode.is_pyi)),
+ string_normalization=not bool(config.get('skip_string_normalization', not Mode.string_normalization)),
+ experimental_string_processing=bool(
+ config.get('experimental_string_processing', Mode.experimental_string_processing)
),
)
@@ -2180,6 +2185,14 @@ KNOWN_DEPRECATED_DIRECT_IMPORTS: Set[str] = {
'This module is deprecated. Please use `airflow.providers.amazon.aws.operators.sagemaker`.',
'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.sagemaker`.',
'This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.emr`.',
+ 'This module is deprecated. Please use `airflow.providers.opsgenie.hooks.opsgenie`.',
+ 'This module is deprecated. Please use `airflow.providers.opsgenie.operators.opsgenie`.',
+ 'This module is deprecated. Please use `airflow.hooks.redshift_sql` '
+ 'or `airflow.hooks.redshift_cluster` as appropriate.',
+ 'This module is deprecated. Please use `airflow.providers.amazon.aws.operators.redshift_sql` or '
+ '`airflow.providers.amazon.aws.operators.redshift_cluster` as appropriate.',
+ 'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.redshift_cluster`.',
+ "This module is deprecated. Please use airflow.providers.amazon.aws.transfers.sql_to_s3`.",
}
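One plausible reason the explicit ``tuple()``/``bool()`` coercions matter (an assumption for illustration, not taken from the Black changelog): if the target config object is a frozen, hashable dataclass, an unhashable list read from ``pyproject.toml`` breaks where a tuple works. The ``FrozenMode`` class below is a hypothetical stand-in, not Black's actual ``Mode``:

```python
from dataclasses import dataclass

# Hypothetical stand-in for a hashable config object.
@dataclass(frozen=True)
class FrozenMode:
    target_versions: tuple = ()
    is_pyi: bool = False

raw = {"target_version": ["py37", "py38"], "is_pyi": False}

# Coercing to the expected types keeps the object hashable,
# mirroring the tuple()/bool() calls added in the patch.
mode = FrozenMode(
    target_versions=tuple(raw["target_version"]),
    is_pyi=bool(raw["is_pyi"]),
)
print(isinstance(hash(mode), int))  # a list field here would raise TypeError
```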
[airflow] 28/43: Fix docs link for smart sensor deprecation (#21394)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f41ea340769ac8d7d0eec10e4de68e446abac2e4
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Feb 7 08:55:20 2022 -0700
Fix docs link for smart sensor deprecation (#21394)
We are releasing the deprecation in version 2.2.4, not 2.3.0 as
originally planned.
(cherry picked from commit 3a780380d8f5d50ffc876c326e70ee0eee033c0d)
---
UPDATING.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/UPDATING.md b/UPDATING.md
index a75b2d6..2ed4aac 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -87,7 +87,7 @@ https://developers.google.com/style/inclusive-documentation
Smart sensors, an "early access" feature added in Airflow 2, are now deprecated and will be removed in Airflow 2.4.0. They have been superseded by Deferable Operators, added in Airflow 2.2.0.
-See [Migrating to Deferrable Operators](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/smart-sensors.html#migrating-to-deferrable-operators) for details on how to migrate.
+See [Migrating to Deferrable Operators](https://airflow.apache.org/docs/apache-airflow/2.2.4/concepts/smart-sensors.html#migrating-to-deferrable-operators) for details on how to migrate.
## Airflow 2.2.3
[airflow] 30/43: Add possibility to create user in the Remote User mode (#19963)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9b03071333cf8e40f5ee9b8aa030656df59eb83c
Author: Łukasz Wyszomirski <wy...@google.com>
AuthorDate: Fri Jan 28 06:18:05 2022 +0100
Add possibility to create user in the Remote User mode (#19963)
(cherry picked from commit cdd9ea66208e3d70d1cf2a34530ba69bc3c58a50)
---
airflow/www/views.py | 25 +++++++++++++++++++++++++
1 file changed, 25 insertions(+)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f2642a7..2ed2a67 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -4731,3 +4731,28 @@ class CustomUserOIDModelView(MultiResourceUserMixin, UserOIDModelView):
class CustomUserRemoteUserModelView(MultiResourceUserMixin, UserRemoteUserModelView):
"""Customize permission names for FAB's builtin UserRemoteUserModelView."""
+
+ _class_permission_name = permissions.RESOURCE_USER
+
+ class_permission_name_mapping = {
+ 'userinfoedit': permissions.RESOURCE_MY_PROFILE,
+ 'userinfo': permissions.RESOURCE_MY_PROFILE,
+ }
+
+ method_permission_name = {
+ 'add': 'create',
+ 'userinfo': 'read',
+ 'download': 'read',
+ 'show': 'read',
+ 'list': 'read',
+ 'edit': 'edit',
+ 'userinfoedit': 'edit',
+ 'delete': 'delete',
+ }
+
+ base_permissions = [
+ permissions.ACTION_CAN_CREATE,
+ permissions.ACTION_CAN_READ,
+ permissions.ACTION_CAN_EDIT,
+ permissions.ACTION_CAN_DELETE,
+ ]
[airflow] 38/43: update tutorial_etl_dag notes (#21503)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit dd0a3a3d768b8cb2118c0b8d89ed0af0b393d865
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Fri Feb 11 10:17:18 2022 +0200
update tutorial_etl_dag notes (#21503)
* update tutorial_etl_dag notes
(cherry picked from commit a42607a4b75586a396d6a56145ed048d127dd344)
---
airflow/example_dags/tutorial_etl_dag.py | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py
index 8dd0ea4..dd18449 100644
--- a/airflow/example_dags/tutorial_etl_dag.py
+++ b/airflow/example_dags/tutorial_etl_dag.py
@@ -19,9 +19,7 @@
"""
### ETL DAG Tutorial Documentation
-This ETL DAG is compatible with Airflow 1.10.x (specifically tested with 1.10.12) and is referenced
-as part of the documentation that goes along with the Airflow Functional DAG tutorial located
-[here](https://airflow.apache.org/tutorial_decorated_flows.html)
+This ETL DAG is demonstrating an Extract -> Transform -> Load pipeline
"""
# [START tutorial]
# [START import_module]
[airflow] 10/43: Helper for provide_session-decorated functions (#20104)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit dda864d585431c1c46c2705c40ed27ab9c43be72
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Dec 7 21:50:34 2021 +0800
Helper for provide_session-decorated functions (#20104)
* Helper for provide_session-decorated functions
* Apply NEW_SESSION trick on XCom
(cherry picked from commit a80ac1eecc0ea187de7984510b4ef6f981b97196)
---
airflow/models/xcom.py | 24 ++++++++++++------------
airflow/settings.py | 10 ++++++----
airflow/utils/session.py | 11 +++++++++--
3 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 4bb9689..5efaa0a 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -32,7 +32,7 @@ from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.utils import timezone
from airflow.utils.helpers import is_container
from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import provide_session
+from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
log = logging.getLogger(__name__)
@@ -90,7 +90,7 @@ class BaseXCom(Base, LoggingMixin):
dag_id: str,
task_id: str,
run_id: str,
- session: Optional[Session] = None,
+ session: Session = NEW_SESSION,
) -> None:
"""Store an XCom value.
@@ -116,7 +116,7 @@ class BaseXCom(Base, LoggingMixin):
task_id: str,
dag_id: str,
execution_date: datetime.datetime,
- session: Optional[Session] = None,
+ session: Session = NEW_SESSION,
) -> None:
""":sphinx-autoapi-skip:"""
@@ -129,7 +129,7 @@ class BaseXCom(Base, LoggingMixin):
task_id: str,
dag_id: str,
execution_date: Optional[datetime.datetime] = None,
- session: Session = None,
+ session: Session = NEW_SESSION,
*,
run_id: Optional[str] = None,
) -> None:
@@ -170,7 +170,7 @@ class BaseXCom(Base, LoggingMixin):
task_id: Optional[str] = None,
dag_id: Optional[str] = None,
include_prior_dates: bool = False,
- session: Optional[Session] = None,
+ session: Session = NEW_SESSION,
) -> Optional[Any]:
"""Retrieve an XCom value, optionally meeting certain criteria.
@@ -207,7 +207,7 @@ class BaseXCom(Base, LoggingMixin):
task_id: Optional[str] = None,
dag_id: Optional[str] = None,
include_prior_dates: bool = False,
- session: Optional[Session] = None,
+ session: Session = NEW_SESSION,
) -> Optional[Any]:
""":sphinx-autoapi-skip:"""
@@ -220,7 +220,7 @@ class BaseXCom(Base, LoggingMixin):
task_id: Optional[Union[str, Iterable[str]]] = None,
dag_id: Optional[Union[str, Iterable[str]]] = None,
include_prior_dates: bool = False,
- session: Session = None,
+ session: Session = NEW_SESSION,
*,
run_id: Optional[str] = None,
) -> Optional[Any]:
@@ -265,7 +265,7 @@ class BaseXCom(Base, LoggingMixin):
dag_ids: Union[str, Iterable[str], None] = None,
include_prior_dates: bool = False,
limit: Optional[int] = None,
- session: Optional[Session] = None,
+ session: Session = NEW_SESSION,
) -> Query:
"""Composes a query to get one or more XCom entries.
@@ -300,7 +300,7 @@ class BaseXCom(Base, LoggingMixin):
dag_ids: Union[str, Iterable[str], None] = None,
include_prior_dates: bool = False,
limit: Optional[int] = None,
- session: Optional[Session] = None,
+ session: Session = NEW_SESSION,
) -> Query:
""":sphinx-autoapi-skip:"""
@@ -314,7 +314,7 @@ class BaseXCom(Base, LoggingMixin):
dag_ids: Optional[Union[str, Iterable[str]]] = None,
include_prior_dates: bool = False,
limit: Optional[int] = None,
- session: Session = None,
+ session: Session = NEW_SESSION,
*,
run_id: Optional[str] = None,
) -> Query:
@@ -397,7 +397,7 @@ class BaseXCom(Base, LoggingMixin):
execution_date: pendulum.DateTime,
dag_id: str,
task_id: str,
- session: Optional[Session] = None,
+ session: Session = NEW_SESSION,
) -> None:
""":sphinx-autoapi-skip:"""
@@ -409,7 +409,7 @@ class BaseXCom(Base, LoggingMixin):
dag_id: Optional[str] = None,
task_id: Optional[str] = None,
run_id: Optional[str] = None,
- session: Session = None,
+ session: Session = NEW_SESSION,
) -> None:
""":sphinx-autoapi-skip:"""
# Given the historic order of this function (execution_date was first argument) to add a new optional
diff --git a/airflow/settings.py b/airflow/settings.py
index f9b97a2..139d6a4 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -22,7 +22,7 @@ import logging
import os
import sys
import warnings
-from typing import Optional
+from typing import TYPE_CHECKING, Callable, List, Optional
import pendulum
import sqlalchemy
@@ -37,6 +37,9 @@ from airflow.executors import executor_constants
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
+if TYPE_CHECKING:
+ from airflow.www.utils import UIAlert
+
log = logging.getLogger(__name__)
@@ -77,7 +80,7 @@ DONOT_MODIFY_HANDLERS: Optional[bool] = None
DAGS_FOLDER: str = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
engine: Optional[Engine] = None
-Session: Optional[SASession] = None
+Session: Callable[..., SASession]
# The JSON library to use for DAG Serialization and De-Serialization
json = json
@@ -563,8 +566,7 @@ MASK_SECRETS_IN_LOGS = False
# UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
# ]
#
-# DASHBOARD_UIALERTS: List["UIAlert"]
-DASHBOARD_UIALERTS = []
+DASHBOARD_UIALERTS: List["UIAlert"] = []
# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
diff --git a/airflow/utils/session.py b/airflow/utils/session.py
index 9636fc4..f0c3168 100644
--- a/airflow/utils/session.py
+++ b/airflow/utils/session.py
@@ -18,7 +18,7 @@
import contextlib
from functools import wraps
from inspect import signature
-from typing import Callable, Iterator, TypeVar
+from typing import Callable, Iterator, TypeVar, cast
from airflow import settings
@@ -26,7 +26,7 @@ from airflow import settings
@contextlib.contextmanager
def create_session() -> Iterator[settings.SASession]:
"""Contextmanager that will create and teardown a session."""
- session: settings.SASession = settings.Session()
+ session = settings.Session()
try:
yield session
session.commit()
@@ -105,3 +105,10 @@ def create_global_lock(session=None, pg_lock_id=1, lock_name='init', mysql_lock_
if dialect.name == 'mssql':
# TODO: make locking works for MSSQL
pass
+
+
+# A fake session to use in functions decorated by provide_session. This allows
+# the 'session' argument to be of type Session instead of Optional[Session],
+# making it easier to type hint the function body without dealing with the None
+# case that can never happen at runtime.
+NEW_SESSION: settings.SASession = cast(settings.SASession, None)
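The ``NEW_SESSION`` trick can be shown in miniature. The sketch below is a simplified stand-in for Airflow's ``provide_session`` (the real decorator also handles positional arguments and session commit/rollback/teardown), with a fake ``Session`` class in place of SQLAlchemy's:

```python
from functools import wraps
from typing import cast

class Session:
    """Hypothetical stand-in for a SQLAlchemy session."""
    def query(self) -> str:
        return "row"

# Typed as a real Session so annotated functions need no Optional,
# but never used at runtime: the decorator always replaces it.
NEW_SESSION: Session = cast(Session, None)

def provide_session(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is None:
            kwargs["session"] = Session()  # inject a real session
        return func(*args, **kwargs)
    return wrapper

@provide_session
def get_value(key: str, *, session: Session = NEW_SESSION) -> str:
    # No `if session is None` dance needed in the body.
    return session.query()

print(get_value("k"))  # prints "row"
```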
[airflow] 37/43: Simplify trigger cancel button (#21591)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 628aa1f99c865d97d0b1c7c76e630e43a7b8d319
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 15 11:00:26 2022 -0700
Simplify trigger cancel button (#21591)
Co-authored-by: Jed Cunningham <je...@apache.org>
(cherry picked from commit 65297673a318660fba76797e50d0c06804dfcafc)
---
airflow/www/templates/airflow/trigger.html | 2 +-
tests/www/views/test_views_trigger_dag.py | 11 +++++------
2 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/airflow/www/templates/airflow/trigger.html b/airflow/www/templates/airflow/trigger.html
index efc1650..2388d4e 100644
--- a/airflow/www/templates/airflow/trigger.html
+++ b/airflow/www/templates/airflow/trigger.html
@@ -63,7 +63,7 @@
</label>
</div>
<button type="submit" class="btn btn-primary">Trigger</button>
- <button type="button" class="btn" onclick="location.href = '{{ origin }}'; return false">Cancel</button>
+ <a class="btn" href="{{ origin }}">Cancel</a>
</form>
{% endblock %}
diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py
index f261438..2b43468 100644
--- a/tests/www/views/test_views_trigger_dag.py
+++ b/tests/www/views/test_views_trigger_dag.py
@@ -134,6 +134,10 @@ def test_trigger_dag_form(admin_client):
("http://google.com", "/home"),
("36539'%3balert(1)%2f%2f166", "/home"),
(
+ '"><script>alert(99)</script><a href="',
+ ""><script>alert(99)</script><a href="",
+ ),
+ (
"%2Ftree%3Fdag_id%3Dexample_bash_operator';alert(33)//",
"/home",
),
@@ -145,12 +149,7 @@ def test_trigger_dag_form_origin_url(admin_client, test_origin, expected_origin)
test_dag_id = "example_bash_operator"
resp = admin_client.get(f'trigger?dag_id={test_dag_id}&origin={test_origin}')
- check_content_in_response(
- '<button type="button" class="btn" onclick="location.href = \'{}\'; return false">'.format(
- expected_origin
- ),
- resp,
- )
+ check_content_in_response(f'<a class="btn" href="{expected_origin}">Cancel</a>', resp)
@pytest.mark.parametrize(
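The swap from an inline ``onclick`` handler to a plain anchor relies on template autoescaping to neutralize attacker-controlled ``origin`` values. A rough stand-alone sketch of the idea using the standard library's ``html.escape`` (Jinja2/MarkupSafe escape slightly differently, e.g. ``&#34;`` for quotes, but the effect is the same):

```python
from html import escape

# An XSS-style origin like the ones exercised in the tests above:
origin = '"><script>alert(99)</script><a href="'

# Escaping before interpolation keeps the injected markup inert.
rendered = f'<a class="btn" href="{escape(origin)}">Cancel</a>'
```

The rendered anchor contains only escaped entities, so the injected ``<script>`` tag never reaches the browser as live markup.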
[airflow] 23/43: Docs: Fix task order in overview example (#21282)
commit a519e53ddee5675402c091c1e99fe1643a968e87
Author: Lucia Kasman <38...@users.noreply.github.com>
AuthorDate: Thu Feb 3 15:10:27 2022 -0300
Docs: Fix task order in overview example (#21282)
(cherry picked from commit 1ba83c01b2b466ad5a76a453e5f6ee2884081e53)
---
docs/apache-airflow/concepts/overview.rst | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/docs/apache-airflow/concepts/overview.rst b/docs/apache-airflow/concepts/overview.rst
index fd862ea..567c3c8 100644
--- a/docs/apache-airflow/concepts/overview.rst
+++ b/docs/apache-airflow/concepts/overview.rst
@@ -65,12 +65,12 @@ Control Flow
:doc:`tasks` have dependencies declared on each other. You'll see this in a DAG either using the ``>>`` and ``<<`` operators::
first_task >> [second_task, third_task]
- third_task << fourth_task
+ fourth_task << third_task
Or, with the ``set_upstream`` and ``set_downstream`` methods::
first_task.set_downstream([second_task, third_task])
- third_task.set_upstream(fourth_task)
+ fourth_task.set_upstream(third_task)
These dependencies are what make up the "edges" of the graph, and how Airflow works out which order to run your tasks in. By default, a task will wait for all of its upstream tasks to succeed before it runs, but this can be customized using features like :ref:`Branching <concepts:branching>`, :ref:`LatestOnly <concepts:latest-only>`, and :ref:`Trigger Rules <concepts:trigger-rules>`.
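The corrected snippet says ``fourth_task`` runs after ``third_task``. How ``>>``/``<<`` map onto ``set_downstream``/``set_upstream`` can be sketched with a toy ``Task`` class (a stand-in for illustration, not Airflow's ``BaseOperator``):

```python
class Task:
    """Toy stand-in for an Airflow operator (assumption for this sketch)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # task_ids this task waits for
        self.downstream = set()  # task_ids that wait for this task

    @staticmethod
    def _as_list(other):
        return other if isinstance(other, list) else [other]

    def set_downstream(self, other):
        for t in self._as_list(other):
            self.downstream.add(t.task_id)
            t.upstream.add(self.task_id)

    def set_upstream(self, other):
        for t in self._as_list(other):
            self.upstream.add(t.task_id)
            t.downstream.add(self.task_id)

    def __rshift__(self, other):  # a >> b : b runs after a
        self.set_downstream(other)
        return other

    def __lshift__(self, other):  # a << b : a runs after b
        self.set_upstream(other)
        return other


first_task, second_task, third_task, fourth_task = (
    Task(i) for i in ("first", "second", "third", "fourth")
)
first_task >> [second_task, third_task]
fourth_task << third_task  # identical to: third_task >> fourth_task
```

Both spellings record the same edge, which is why the docs fix above keeps the operator form and the method form in sync.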
[airflow] 09/43: Type-annotate SkipMixin and BaseXCom (#20011)
commit 016929f782022791557d0ec1f316b338dc210566
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Dec 7 17:55:00 2021 +0800
Type-annotate SkipMixin and BaseXCom (#20011)
(cherry picked from commit 6dd0a0df7e6a2f025e9234bdbf97b41e9b8f6257)
---
airflow/models/skipmixin.py | 15 +-
airflow/models/xcom.py | 335 ++++++++++++++++++++++++++++++--------------
2 files changed, 232 insertions(+), 118 deletions(-)
diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index 5cd50a3..765a947 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -17,7 +17,7 @@
# under the License.
import warnings
-from typing import TYPE_CHECKING, Iterable, Union
+from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Union
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
@@ -26,6 +26,7 @@ from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State
if TYPE_CHECKING:
+ from pendulum import DateTime
from sqlalchemy import Session
from airflow.models import DagRun
@@ -66,9 +67,9 @@ class SkipMixin(LoggingMixin):
def skip(
self,
dag_run: "DagRun",
- execution_date: "timezone.DateTime",
- tasks: "Iterable[BaseOperator]",
- session: "Session" = None,
+ execution_date: "DateTime",
+ tasks: Sequence["BaseOperator"],
+ session: "Session",
):
"""
Sets tasks instances to skipped from the same dag run.
@@ -114,11 +115,7 @@ class SkipMixin(LoggingMixin):
session.commit()
# SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
- try:
- task_id = self.task_id
- except AttributeError:
- task_id = None
-
+ task_id: Optional[str] = getattr(self, "task_id", None)
if task_id is not None:
from airflow.models.xcom import XCom
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 99c2b9a..4bb9689 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -16,10 +16,11 @@
# specific language governing permissions and limitations
# under the License.
+import datetime
import json
import logging
import pickle
-from typing import Any, Iterable, Optional, Union
+from typing import TYPE_CHECKING, Any, Iterable, Optional, Type, Union, cast, overload
import pendulum
from sqlalchemy import Column, LargeBinary, String
@@ -79,14 +80,60 @@ class BaseXCom(Base, LoggingMixin):
def __repr__(self):
return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>'
+ @overload
@classmethod
- @provide_session
- def set(cls, key, value, task_id, dag_id, execution_date=None, run_id=None, session=None):
+ def set(
+ cls,
+ key: str,
+ value: Any,
+ *,
+ dag_id: str,
+ task_id: str,
+ run_id: str,
+ session: Optional[Session] = None,
+ ) -> None:
+ """Store an XCom value.
+
+ A deprecated form of this function accepts ``execution_date`` instead of
+ ``run_id``. The two arguments are mutually exclusive.
+
+ :param key: Key to store the XCom.
+ :param value: XCom value to store.
+ :param dag_id: DAG ID.
+ :param task_id: Task ID.
+ :param run_id: DAG run ID for the task.
+ :param session: Database session. If not given, a new session will be
+ created for this function.
+ :type session: sqlalchemy.orm.session.Session
"""
- Store an XCom value.
- :return: None
- """
+ @overload
+ @classmethod
+ def set(
+ cls,
+ key: str,
+ value: Any,
+ task_id: str,
+ dag_id: str,
+ execution_date: datetime.datetime,
+ session: Optional[Session] = None,
+ ) -> None:
+ """:sphinx-autoapi-skip:"""
+
+ @classmethod
+ @provide_session
+ def set(
+ cls,
+ key: str,
+ value: Any,
+ task_id: str,
+ dag_id: str,
+ execution_date: Optional[datetime.datetime] = None,
+ session: Session = None,
+ *,
+ run_id: Optional[str] = None,
+ ) -> None:
+ """:sphinx-autoapi-skip:"""
if not (execution_date is None) ^ (run_id is None):
raise ValueError("Exactly one of execution_date or run_id must be passed")
@@ -94,70 +141,95 @@ class BaseXCom(Base, LoggingMixin):
from airflow.models.dagrun import DagRun
dag_run = session.query(DagRun).filter_by(dag_id=dag_id, run_id=run_id).one()
-
execution_date = dag_run.execution_date
- value = XCom.serialize_value(value)
-
- # remove any duplicate XComs
+ # Remove duplicate XComs and insert a new one.
session.query(cls).filter(
- cls.key == key, cls.execution_date == execution_date, cls.task_id == task_id, cls.dag_id == dag_id
+ cls.key == key,
+ cls.execution_date == execution_date,
+ cls.task_id == task_id,
+ cls.dag_id == dag_id,
).delete()
-
+ new = cast(Any, cls)( # Work around Mypy complaining model not defining '__init__'.
+ key=key,
+ value=cls.serialize_value(value),
+ execution_date=execution_date,
+ task_id=task_id,
+ dag_id=dag_id,
+ )
+ session.add(new)
session.flush()
- # insert new XCom
- session.add(XCom(key=key, value=value, execution_date=execution_date, task_id=task_id, dag_id=dag_id))
+ @overload
+ @classmethod
+ def get_one(
+ cls,
+ *,
+ run_id: str,
+ key: Optional[str] = None,
+ task_id: Optional[str] = None,
+ dag_id: Optional[str] = None,
+ include_prior_dates: bool = False,
+ session: Optional[Session] = None,
+ ) -> Optional[Any]:
+ """Retrieve an XCom value, optionally meeting certain criteria.
+
+ This method returns "full" XCom values (i.e. uses ``deserialize_value``
+ from the XCom backend). Use :meth:`get_many` if you want the "shortened"
+ value via ``orm_deserialize_value``.
+
+ If there are no results, *None* is returned.
+
+ A deprecated form of this function accepts ``execution_date`` instead of
+ ``run_id``. The two arguments are mutually exclusive.
+
+ :param run_id: DAG run ID for the task.
+ :param key: A key for the XCom. If provided, only XCom with matching
+ keys will be returned. Pass *None* (default) to remove the filter.
+ :param task_id: Only XCom from task with matching ID will be pulled.
+ Pass *None* (default) to remove the filter.
+ :param dag_id: Only pull XCom from this DAG. If *None* (default), the
+ DAG of the calling task is used.
+ :param include_prior_dates: If *False* (default), only XCom from the
+ specified DAG run is returned. If *True*, the latest matching XCom is
+ returned regardless of the run it belongs to.
+ :param session: Database session. If not given, a new session will be
+ created for this function.
+ :type session: sqlalchemy.orm.session.Session
+ """
- session.flush()
+ @overload
+ @classmethod
+ def get_one(
+ cls,
+ execution_date: pendulum.DateTime,
+ key: Optional[str] = None,
+ task_id: Optional[str] = None,
+ dag_id: Optional[str] = None,
+ include_prior_dates: bool = False,
+ session: Optional[Session] = None,
+ ) -> Optional[Any]:
+ """:sphinx-autoapi-skip:"""
@classmethod
@provide_session
def get_one(
cls,
execution_date: Optional[pendulum.DateTime] = None,
- run_id: Optional[str] = None,
key: Optional[str] = None,
task_id: Optional[Union[str, Iterable[str]]] = None,
dag_id: Optional[Union[str, Iterable[str]]] = None,
include_prior_dates: bool = False,
session: Session = None,
+ *,
+ run_id: Optional[str] = None,
) -> Optional[Any]:
- """
- Retrieve an XCom value, optionally meeting certain criteria. Returns None
- of there are no results.
-
- ``run_id`` and ``execution_date`` are mutually exclusive.
-
- This method returns "full" XCom values (i.e. it uses ``deserialize_value`` from the XCom backend).
- Please use :meth:`get_many` if you want the "shortened" value via ``orm_deserialize_value``
-
- :param execution_date: Execution date for the task
- :type execution_date: pendulum.datetime
- :param run_id: Dag run id for the task
- :type run_id: str
- :param key: A key for the XCom. If provided, only XComs with matching
- keys will be returned. To remove the filter, pass key=None.
- :type key: str
- :param task_id: Only XComs from task with matching id will be
- pulled. Can pass None to remove the filter.
- :type task_id: str
- :param dag_id: If provided, only pulls XCom from this DAG.
- If None (default), the DAG of the calling task is used.
- :type dag_id: str
- :param include_prior_dates: If False, only XCom from the current
- execution_date are returned. If True, XCom from previous dates
- are returned as well.
- :type include_prior_dates: bool
- :param session: database session
- :type session: sqlalchemy.orm.session.Session
- """
+ """:sphinx-autoapi-skip:"""
if not (execution_date is None) ^ (run_id is None):
raise ValueError("Exactly one of execution_date or run_id must be passed")
- result = (
- cls.get_many(
- execution_date=execution_date,
+ if run_id is not None:
+ query = cls.get_many(
run_id=run_id,
key=key,
task_ids=task_id,
@@ -165,58 +237,88 @@ class BaseXCom(Base, LoggingMixin):
include_prior_dates=include_prior_dates,
session=session,
)
- .with_entities(cls.value)
- .first()
- )
+ elif execution_date is not None:
+ query = cls.get_many(
+ execution_date=execution_date,
+ key=key,
+ task_ids=task_id,
+ dag_ids=dag_id,
+ include_prior_dates=include_prior_dates,
+ session=session,
+ )
+ else:
+ raise RuntimeError("Should not happen?")
+
+ result = query.with_entities(cls.value).first()
if result:
return cls.deserialize_value(result)
return None
+ @overload
+ @classmethod
+ def get_many(
+ cls,
+ *,
+ run_id: str,
+ key: Optional[str] = None,
+ task_ids: Union[str, Iterable[str], None] = None,
+ dag_ids: Union[str, Iterable[str], None] = None,
+ include_prior_dates: bool = False,
+ limit: Optional[int] = None,
+ session: Optional[Session] = None,
+ ) -> Query:
+ """Composes a query to get one or more XCom entries.
+
+ This function returns an SQLAlchemy query of full XCom objects. If you
+ just want one stored value, use :meth:`get_one` instead.
+
+ A deprecated form of this function accepts ``execution_date`` instead of
+ ``run_id``. The two arguments are mutually exclusive.
+
+ :param run_id: DAG run ID for the task.
+ :param key: A key for the XComs. If provided, only XComs with matching
+ keys will be returned. Pass *None* (default) to remove the filter.
+ :param task_ids: Only XComs from task with matching IDs will be pulled.
+ Pass *None* (default) to remove the filter.
+ :param dag_id: Only pulls XComs from this DAG. If *None* (default), the
+ DAG of the calling task is used.
+ :param include_prior_dates: If *False* (default), only XComs from the
+ specified DAG run are returned. If *True*, all matching XComs are
+ returned regardless of the run it belongs to.
+ :param session: Database session. If not given, a new session will be
+ created for this function.
+ :type session: sqlalchemy.orm.session.Session
+ """
+
+ @overload
+ @classmethod
+ def get_many(
+ cls,
+ execution_date: pendulum.DateTime,
+ key: Optional[str] = None,
+ task_ids: Union[str, Iterable[str], None] = None,
+ dag_ids: Union[str, Iterable[str], None] = None,
+ include_prior_dates: bool = False,
+ limit: Optional[int] = None,
+ session: Optional[Session] = None,
+ ) -> Query:
+ """:sphinx-autoapi-skip:"""
+
@classmethod
@provide_session
def get_many(
cls,
execution_date: Optional[pendulum.DateTime] = None,
- run_id: Optional[str] = None,
key: Optional[str] = None,
task_ids: Optional[Union[str, Iterable[str]]] = None,
dag_ids: Optional[Union[str, Iterable[str]]] = None,
include_prior_dates: bool = False,
limit: Optional[int] = None,
session: Session = None,
+ *,
+ run_id: Optional[str] = None,
) -> Query:
- """
- Composes a query to get one or more values from the xcom table.
-
- ``run_id`` and ``execution_date`` are mutually exclusive.
-
- This function returns an SQLAlchemy query of full XCom objects. If you just want one stored value then
- use :meth:`get_one`.
-
- :param execution_date: Execution date for the task
- :type execution_date: pendulum.datetime
- :param run_id: Dag run id for the task
- :type run_id: str
- :param key: A key for the XCom. If provided, only XComs with matching
- keys will be returned. To remove the filter, pass key=None.
- :type key: str
- :param task_ids: Only XComs from tasks with matching ids will be
- pulled. Can pass None to remove the filter.
- :type task_ids: str or iterable of strings (representing task_ids)
- :param dag_ids: If provided, only pulls XComs from this DAG.
- If None (default), the DAG of the calling task is used.
- :type dag_ids: str
- :param include_prior_dates: If False, only XComs from the current
- execution_date are returned. If True, XComs from previous dates
- are returned as well.
- :type include_prior_dates: bool
- :param limit: If required, limit the number of returned objects.
- XCom objects can be quite big and you might want to limit the
- number of rows.
- :type limit: int
- :param session: database session
- :type session: sqlalchemy.orm.session.Session
- """
+ """:sphinx-autoapi-skip:"""
if not (execution_date is None) ^ (run_id is None):
raise ValueError("Exactly one of execution_date or run_id must be passed")
@@ -262,8 +364,8 @@ class BaseXCom(Base, LoggingMixin):
@classmethod
@provide_session
- def delete(cls, xcoms, session=None):
- """Delete Xcom"""
+ def delete(cls, xcoms: Union["XCom", Iterable["XCom"]], session: Session) -> None:
+ """Delete one or multiple XCom entries."""
if isinstance(xcoms, XCom):
xcoms = [xcoms]
for xcom in xcoms:
@@ -272,37 +374,49 @@ class BaseXCom(Base, LoggingMixin):
session.delete(xcom)
session.commit()
+ @overload
+ @classmethod
+ def clear(cls, *, dag_id: str, task_id: str, run_id: str, session: Optional[Session] = None) -> None:
+ """Clear all XCom data from the database for the given task instance.
+
+ A deprecated form of this function accepts ``execution_date`` instead of
+ ``run_id``. The two arguments are mutually exclusive.
+
+ :param dag_id: ID of DAG to clear the XCom for.
+ :param task_id: ID of task to clear the XCom for.
+ :param run_id: ID of DAG run to clear the XCom for.
+ :param session: Database session. If not given, a new session will be
+ created for this function.
+ :type session: sqlalchemy.orm.session.Session
+ """
+
+ @overload
+ @classmethod
+ def clear(
+ cls,
+ execution_date: pendulum.DateTime,
+ dag_id: str,
+ task_id: str,
+ session: Optional[Session] = None,
+ ) -> None:
+ """:sphinx-autoapi-skip:"""
+
@classmethod
@provide_session
def clear(
cls,
execution_date: Optional[pendulum.DateTime] = None,
- dag_id: str = None,
- task_id: str = None,
- run_id: str = None,
+ dag_id: Optional[str] = None,
+ task_id: Optional[str] = None,
+ run_id: Optional[str] = None,
session: Session = None,
) -> None:
- """
- Clears all XCom data from the database for the task instance
-
- ``run_id`` and ``execution_date`` are mutually exclusive.
-
- :param execution_date: Execution date for the task
- :type execution_date: pendulum.datetime or None
- :param dag_id: ID of DAG to clear the XCom for.
- :type dag_id: str
- :param task_id: Only XComs from task with matching id will be cleared.
- :type task_id: str
- :param run_id: Dag run id for the task
- :type run_id: str or None
- :param session: database session
- :type session: sqlalchemy.orm.session.Session
- """
+ """:sphinx-autoapi-skip:"""
# Given the historic order of this function (execution_date was first argument) to add a new optional
# param we need to add default values for everything :(
- if not dag_id:
+ if dag_id is None:
raise TypeError("clear() missing required argument: dag_id")
- if not task_id:
+ if task_id is None:
raise TypeError("clear() missing required argument: task_id")
if not (execution_date is None) ^ (run_id is None):
@@ -364,7 +478,7 @@ class BaseXCom(Base, LoggingMixin):
return BaseXCom.deserialize_value(self)
-def resolve_xcom_backend():
+def resolve_xcom_backend() -> Type[BaseXCom]:
"""Resolves custom XCom class"""
clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
if clazz:
@@ -376,4 +490,7 @@ def resolve_xcom_backend():
return BaseXCom
-XCom = resolve_xcom_backend()
+if TYPE_CHECKING:
+ XCom = BaseXCom # Hack to avoid Mypy "Variable 'XCom' is not valid as a type".
+else:
+ XCom = resolve_xcom_backend()
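The ``@overload`` stubs in this commit give type checkers and Sphinx the preferred keyword-only ``run_id`` form while the single runtime implementation still accepts the deprecated ``execution_date``. The pattern in isolation, with illustrative names that are not part of Airflow's API:

```python
import warnings
from typing import Optional, overload


@overload
def get_value(*, run_id: str) -> str:
    """Preferred form: identify the run by run_id."""


@overload
def get_value(execution_date: str) -> str:
    """Deprecated form, kept for backwards compatibility."""


def get_value(
    execution_date: Optional[str] = None, *, run_id: Optional[str] = None
) -> str:
    # Exactly one of the two identifiers must be supplied (the same XOR
    # check used in BaseXCom above).
    if not (execution_date is None) ^ (run_id is None):
        raise ValueError("Exactly one of execution_date or run_id must be passed")
    if execution_date is not None:
        warnings.warn("execution_date is deprecated; pass run_id", DeprecationWarning)
        return f"run for {execution_date}"
    return f"run {run_id}"
```

Only the final un-decorated definition exists at runtime; the ``@overload`` stubs exist purely for static analysis and documentation.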
[airflow] 17/43: Fix Scheduler crash when executing task instances of missing DAG (#20349)
commit 1b139a77d012971d147bffc74646923514db4b48
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jan 13 22:23:10 2022 +0100
Fix Scheduler crash when executing task instances of missing DAG (#20349)
When executing task instances, the scheduler did not check whether the DAG
still exists in the dagbag. This PR fixes that by failing scheduled task
instances whose DAG cannot be found in the serialized_dag table
Closes: #20099
(cherry picked from commit 98715760f72e5205c291293088b5e79636884491)
---
airflow/jobs/scheduler_job.py | 11 +++++++++++
tests/jobs/test_scheduler_job.py | 28 ++++++++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2fedf80..490d507 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -375,6 +375,17 @@ class SchedulerJob(BaseJob):
# Many dags don't have a task_concurrency, so where we can avoid loading the full
# serialized DAG the better.
serialized_dag = self.dagbag.get_dag(dag_id, session=session)
+ # If the dag is missing, fail the task and continue to the next task.
+ if not serialized_dag:
+ self.log.error(
+ "DAG '%s' for task instance %s not found in serialized_dag table",
+ dag_id,
+ task_instance,
+ )
+ session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
+ {TI.state: State.FAILED}, synchronize_session='fetch'
+ )
+ continue
if serialized_dag.has_task(task_instance.task_id):
task_concurrency_limit = serialized_dag.get_task(
task_instance.task_id
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a308029..7185720 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -610,6 +610,34 @@ class TestSchedulerJob:
session.rollback()
session.close()
+ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
+ """Check that task instances of missing DAGs are failed"""
+ dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag'
+ task_id_1 = 'dummy'
+ task_id_2 = 'dummydummy'
+
+ with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}):
+ DummyOperator(task_id=task_id_1)
+ DummyOperator(task_id=task_id_2)
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.dagbag = mock.MagicMock()
+ self.scheduler_job.dagbag.get_dag.return_value = None
+
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+ tis = dr.task_instances
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+ session.flush()
+ assert 0 == len(res)
+ tis = dr.get_task_instances(session=session)
+ assert len(tis) == 2
+ assert all(ti.state == State.FAILED for ti in tis)
+
def test_nonexistent_pool(self, dag_maker):
dag_id = 'SchedulerJobTest.test_nonexistent_pool'
with dag_maker(dag_id=dag_id, max_active_tasks=16):
[airflow] 26/43: Update error docs to include before_send option (#21275)
commit 270516cb1ac54bcf3ede888ddfffeb9154fa37d3
Author: Abhijeet Prasad <de...@gmail.com>
AuthorDate: Thu Feb 3 18:15:21 2022 -0500
Update error docs to include before_send option (#21275)
https://github.com/apache/airflow/pull/18261 added support for the `before_send` option when initializing the Sentry SDK in Airflow. This patch updates the documentation to reflect that change.
(cherry picked from commit b38391e2f91760e64576723c876341f532a6ee2d)
---
docs/apache-airflow/logging-monitoring/errors.rst | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/docs/apache-airflow/logging-monitoring/errors.rst b/docs/apache-airflow/logging-monitoring/errors.rst
index 9f4256a..30bb66d 100644
--- a/docs/apache-airflow/logging-monitoring/errors.rst
+++ b/docs/apache-airflow/logging-monitoring/errors.rst
@@ -42,8 +42,14 @@ Add your ``SENTRY_DSN`` to your configuration file e.g. ``airflow.cfg`` in ``[se
.. note::
If this value is not provided, the SDK will try to read it from the ``SENTRY_DSN`` environment variable.
-You can supply `additional configuration options <https://docs.sentry.io/platforms/python/configuration/options>`__ based on the Python platform via ``[sentry]`` section.
-Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``, ``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``.
+The ``before_send`` option can be used to modify or drop events before they are sent to Sentry. To set this option, provide a dotted path to a before_send function that the sentry SDK should be configured to use.
+
+.. code-block:: ini
+
+ [sentry]
+ before_send = path.to.my.sentry.before_send
+
+You can supply `additional configuration options <https://docs.sentry.io/platforms/python/configuration/options>`__ based on the Python platform via ``[sentry]`` section. Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``, ``ignore_errors``, ``before_breadcrumb``, ``transport``.
Tags
-----
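A ``before_send`` hook is a plain callable taking the Sentry SDK's ``(event, hint)`` pair and returning the (possibly modified) event, or ``None`` to drop it. A hypothetical example, where the event field accessed and the filtering rule are illustrative, not part of Airflow:

```python
def before_send(event, hint):
    """Drop noisy events, forward everything else unchanged."""
    message = event.get("logentry", {}).get("message", "")
    if "Task exited with return code 0" in message:
        return None  # returning None drops the event
    # Tag surviving events so the filter's effect is visible in Sentry.
    event.setdefault("tags", {})["filtered_by"] = "before_send"
    return event
```

With this saved at the configured dotted path (e.g. ``path.to.my.sentry.before_send``), the SDK calls it once per candidate event before transmission.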
[airflow] 24/43: Update stat_name_handler documentation (#21298)
commit 015c481f63825e6d5a7d044d970c563a6450040c
Author: Fran Sánchez <fj...@users.noreply.github.com>
AuthorDate: Thu Feb 3 18:12:08 2022 +0000
Update stat_name_handler documentation (#21298)
Previously stat_name_handler was under the scheduler section of the
configuration but it was moved to the metrics section since 2.0.0.
(cherry picked from commit 0ae31e9cb95e5061a23c2f397ab9716391c1a488)
---
docs/apache-airflow/logging-monitoring/metrics.rst | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/apache-airflow/logging-monitoring/metrics.rst b/docs/apache-airflow/logging-monitoring/metrics.rst
index c8fd182..bdbd2ec 100644
--- a/docs/apache-airflow/logging-monitoring/metrics.rst
+++ b/docs/apache-airflow/logging-monitoring/metrics.rst
@@ -50,7 +50,7 @@ the metrics that start with the elements of the list:
statsd_allow_list = scheduler,executor,dagrun
If you want to redirect metrics to different name, you can configure ``stat_name_handler`` option
-in ``[scheduler]`` section. It should point to a function that validates the statsd stat name, applies changes
+in ``[metrics]`` section. It should point to a function that validates the StatsD stat name, applies changes
to the stat name if necessary, and returns the transformed stat name. The function may looks as follow:
.. code-block:: python
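A ``stat_name_handler`` is a plain callable from stat name to stat name. A hypothetical sketch of one, where the validation rules and normalization are illustrative, not Airflow defaults:

```python
def my_stat_name_handler(stat_name: str) -> str:
    """Validate a StatsD stat name and return a normalized form."""
    if not stat_name or len(stat_name) > 250:
        raise ValueError(f"invalid stat name: {stat_name!r}")
    # Normalize: lowercase, and swap dashes for underscores so every
    # emitted metric follows one naming convention.
    return stat_name.lower().replace("-", "_")
```

Pointing ``stat_name_handler`` in ``[metrics]`` at such a function makes Airflow pass every stat name through it before emission.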
[airflow] 12/43: Deprecate some functions in the experimental API (#19931)
commit 663bb546e782748bdd315483ca2070a77046997a
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Dec 16 12:30:42 2021 +0100
Deprecate some functions in the experimental API (#19931)
This PR deprecates some functions in the experimental API.
Some of the deprecated functions are only used in the experimental REST API;
the ones still in general use are moved out of the experimental package.
(cherry picked from commit 6239ae91a4c8bfb05f053a61cb8386f2d63b8b3a)
---
airflow/api/client/local_client.py | 29 ++++--
.../api/common/{experimental => }/delete_dag.py | 3 +-
airflow/api/common/experimental/delete_dag.py | 70 ++-----------
airflow/api/common/experimental/get_code.py | 3 +
.../api/common/experimental/get_dag_run_state.py | 3 +
airflow/api/common/experimental/get_task.py | 3 +
.../api/common/experimental/get_task_instance.py | 3 +
airflow/api/common/experimental/pool.py | 6 ++
airflow/api/common/experimental/trigger_dag.py | 115 ++-------------------
.../api/common/{experimental => }/trigger_dag.py | 5 +-
airflow/api_connexion/endpoints/dag_endpoint.py | 7 +-
airflow/models/pool.py | 52 ++++++++--
airflow/operators/trigger_dagrun.py | 2 +-
airflow/utils/db.py | 15 +++
airflow/www/views.py | 2 +-
setup.cfg | 1 +
tests/api/client/test_local_client.py | 31 +++++-
.../common/{experimental => }/test_delete_dag.py | 2 +-
.../common/{experimental => }/test_trigger_dag.py | 8 +-
tests/models/test_pool.py | 71 +++++++++++++
20 files changed, 229 insertions(+), 202 deletions(-)
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 7ce0d16..c005067 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -18,8 +18,10 @@
"""Local client API"""
from airflow.api.client import api_client
-from airflow.api.common.experimental import delete_dag, pool, trigger_dag
+from airflow.api.common import delete_dag, trigger_dag
from airflow.api.common.experimental.get_lineage import get_lineage as get_lineage_api
+from airflow.exceptions import AirflowBadRequest, PoolNotFound
+from airflow.models.pool import Pool
class Client(api_client.Client):
@@ -36,19 +38,30 @@ class Client(api_client.Client):
return f"Removed {count} record(s)"
def get_pool(self, name):
- the_pool = pool.get_pool(name=name)
- return the_pool.pool, the_pool.slots, the_pool.description
+ pool = Pool.get_pool(pool_name=name)
+ if not pool:
+ raise PoolNotFound(f"Pool {name} not found")
+ return pool.pool, pool.slots, pool.description
def get_pools(self):
- return [(p.pool, p.slots, p.description) for p in pool.get_pools()]
+ return [(p.pool, p.slots, p.description) for p in Pool.get_pools()]
def create_pool(self, name, slots, description):
- the_pool = pool.create_pool(name=name, slots=slots, description=description)
- return the_pool.pool, the_pool.slots, the_pool.description
+ if not (name and name.strip()):
+ raise AirflowBadRequest("Pool name shouldn't be empty")
+ pool_name_length = Pool.pool.property.columns[0].type.length
+ if len(name) > pool_name_length:
+ raise AirflowBadRequest(f"pool name cannot be more than {pool_name_length} characters")
+ try:
+ slots = int(slots)
+ except ValueError:
+ raise AirflowBadRequest(f"Bad value for `slots`: {slots}")
+ pool = Pool.create_or_update_pool(name=name, slots=slots, description=description)
+ return pool.pool, pool.slots, pool.description
def delete_pool(self, name):
- the_pool = pool.delete_pool(name=name)
- return the_pool.pool, the_pool.slots, the_pool.description
+ pool = Pool.delete_pool(name=name)
+ return pool.pool, pool.slots, pool.description
def get_lineage(self, dag_id, execution_date):
lineage = get_lineage_api(dag_id=dag_id, execution_date=execution_date)
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/delete_dag.py
similarity index 97%
copy from airflow/api/common/experimental/delete_dag.py
copy to airflow/api/common/delete_dag.py
index 44e54e3..c448127 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -24,6 +24,7 @@ from airflow import models
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DagModel, TaskFail
from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.db import get_sqla_model_classes
from airflow.utils.session import provide_session
from airflow.utils.state import State
@@ -60,7 +61,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
count = 0
- for model in models.base.Base._decl_class_registry.values():
+ for model in get_sqla_model_classes():
if hasattr(model, "dag_id"):
if keep_records_in_log and model.__name__ == 'Log':
continue
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
index 44e54e3..36bf7dd 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -15,68 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Delete DAGs APIs."""
-import logging
+import warnings
-from sqlalchemy import or_
+from airflow.api.common.delete_dag import * # noqa
-from airflow import models
-from airflow.exceptions import AirflowException, DagNotFound
-from airflow.models import DagModel, TaskFail
-from airflow.models.serialized_dag import SerializedDagModel
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
-
-log = logging.getLogger(__name__)
-
-
-@provide_session
-def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> int:
- """
- :param dag_id: the dag_id of the DAG to delete
- :param keep_records_in_log: whether keep records of the given dag_id
- in the Log table in the backend database (for reasons like auditing).
- The default value is True.
- :param session: session used
- :return count of deleted dags
- """
- log.info("Deleting DAG: %s", dag_id)
- running_tis = (
- session.query(models.TaskInstance.state)
- .filter(models.TaskInstance.dag_id == dag_id)
- .filter(models.TaskInstance.state == State.RUNNING)
- .first()
- )
- if running_tis:
- raise AirflowException("TaskInstances still running")
- dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
- if dag is None:
- raise DagNotFound(f"Dag id {dag_id} not found")
-
- # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
- # There may be a lag, so explicitly removes serialized DAG here.
- if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
- SerializedDagModel.remove_dag(dag_id=dag_id, session=session)
-
- count = 0
-
- for model in models.base.Base._decl_class_registry.values():
- if hasattr(model, "dag_id"):
- if keep_records_in_log and model.__name__ == 'Log':
- continue
- cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
- count += session.query(model).filter(cond).delete(synchronize_session='fetch')
- if dag.is_subdag:
- parent_dag_id, task_id = dag_id.rsplit(".", 1)
- for model in TaskFail, models.TaskInstance:
- count += (
- session.query(model).filter(model.dag_id == parent_dag_id, model.task_id == task_id).delete()
- )
-
- # Delete entries in Import Errors table for a deleted DAG
- # This handles the case when the dag_id is changed in the file
- session.query(models.ImportError).filter(models.ImportError.filename == dag.fileloc).delete(
- synchronize_session='fetch'
- )
-
- return count
+warnings.warn(
+ "This module is deprecated. Please use `airflow.api.common.delete_dag` instead.",
+ DeprecationWarning,
+ stacklevel=2,
+)
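[editor's note] The rewritten `experimental/delete_dag.py` above is a module-level deprecation shim: a wildcard import re-exports the new module's names, then a warning fires once at import time. A self-contained sketch of that behavior (the `import_shim` function stands in for the module import; it is illustrative only):

```python
import warnings


def import_shim():
    # Stand-in for `import airflow.api.common.experimental.delete_dag`:
    # `from airflow.api.common.delete_dag import *` re-exports the new
    # names, then the module body emits the warning. stacklevel=2 points
    # the warning at the importing code, not at the shim itself.
    warnings.warn(
        "This module is deprecated. Please use `airflow.api.common.delete_dag` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return ["delete_dag"]  # names the wildcard import would re-export


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    exported = import_shim()
```

Existing `from airflow.api.common.experimental.delete_dag import delete_dag` call sites keep working, but now surface a `DeprecationWarning` (in practice Python caches modules, so the warning fires only on first import).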
diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py
index 79b0b9f..1a1fb62 100644
--- a/airflow/api/common/experimental/get_code.py
+++ b/airflow/api/common/experimental/get_code.py
@@ -16,11 +16,14 @@
# specific language governing permissions and limitations
# under the License.
"""Get code APIs."""
+from deprecated import deprecated
+
from airflow.api.common.experimental import check_and_get_dag
from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.models.dagcode import DagCode
+@deprecated(reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.3")
def get_code(dag_id: str) -> str:
"""Return python code of a given dag_id.
diff --git a/airflow/api/common/experimental/get_dag_run_state.py b/airflow/api/common/experimental/get_dag_run_state.py
index ca71a9a..b2dedd5 100644
--- a/airflow/api/common/experimental/get_dag_run_state.py
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -19,9 +19,12 @@
from datetime import datetime
from typing import Dict
+from deprecated import deprecated
+
from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
+@deprecated(reason="Use DagRun().get_state() instead", version="2.2.3")
def get_dag_run_state(dag_id: str, execution_date: datetime) -> Dict[str, str]:
"""Return the Dag Run state identified by the given dag_id and execution_date.
diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py
index 302ad64..fae5fd7 100644
--- a/airflow/api/common/experimental/get_task.py
+++ b/airflow/api/common/experimental/get_task.py
@@ -16,10 +16,13 @@
# specific language governing permissions and limitations
# under the License.
"""Task APIs.."""
+from deprecated import deprecated
+
from airflow.api.common.experimental import check_and_get_dag
from airflow.models import TaskInstance
+@deprecated(reason="Use DAG().get_task", version="2.2.3")
def get_task(dag_id: str, task_id: str) -> TaskInstance:
"""Return the task object identified by the given dag_id and task_id."""
dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py
index f3ca1cf2..137f8a3 100644
--- a/airflow/api/common/experimental/get_task_instance.py
+++ b/airflow/api/common/experimental/get_task_instance.py
@@ -18,11 +18,14 @@
"""Task Instance APIs."""
from datetime import datetime
+from deprecated import deprecated
+
from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
from airflow.exceptions import TaskInstanceNotFound
from airflow.models import TaskInstance
+@deprecated(version="2.2.3", reason="Use DagRun.get_task_instance instead")
def get_task_instance(dag_id: str, task_id: str, execution_date: datetime) -> TaskInstance:
"""Return the task instance identified by the given dag_id, task_id and execution_date."""
dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index 30950ea..0b9c3a5 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -16,11 +16,14 @@
# specific language governing permissions and limitations
# under the License.
"""Pool APIs."""
+from deprecated import deprecated
+
from airflow.exceptions import AirflowBadRequest, PoolNotFound
from airflow.models import Pool
from airflow.utils.session import provide_session
+@deprecated(reason="Use Pool.get_pool() instead", version="2.2.3")
@provide_session
def get_pool(name, session=None):
"""Get pool by a given name."""
@@ -34,12 +37,14 @@ def get_pool(name, session=None):
return pool
+@deprecated(reason="Use Pool.get_pools() instead", version="2.2.3")
@provide_session
def get_pools(session=None):
"""Get all pools."""
return session.query(Pool).all()
+@deprecated(reason="Use Pool.create_pool() instead", version="2.2.3")
@provide_session
def create_pool(name, slots, description, session=None):
"""Create a pool with a given parameters."""
@@ -70,6 +75,7 @@ def create_pool(name, slots, description, session=None):
return pool
+@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.3")
@provide_session
def delete_pool(name, session=None):
"""Delete pool by a given name."""
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 38a873c..d526312 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -15,114 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Triggering DAG runs APIs."""
-import json
-from datetime import datetime
-from typing import List, Optional, Union
-from airflow.exceptions import DagNotFound, DagRunAlreadyExists
-from airflow.models import DagBag, DagModel, DagRun
-from airflow.utils import timezone
-from airflow.utils.state import State
-from airflow.utils.types import DagRunType
+import warnings
+from airflow.api.common.trigger_dag import * # noqa
-def _trigger_dag(
- dag_id: str,
- dag_bag: DagBag,
- run_id: Optional[str] = None,
- conf: Optional[Union[dict, str]] = None,
- execution_date: Optional[datetime] = None,
- replace_microseconds: bool = True,
-) -> List[DagRun]:
- """Triggers DAG run.
-
- :param dag_id: DAG ID
- :param dag_bag: DAG Bag model
- :param run_id: ID of the dag_run
- :param conf: configuration
- :param execution_date: date of execution
- :param replace_microseconds: whether microseconds should be zeroed
- :return: list of triggered dags
- """
- dag = dag_bag.get_dag(dag_id) # prefetch dag if it is stored serialized
-
- if dag_id not in dag_bag.dags:
- raise DagNotFound(f"Dag id {dag_id} not found")
-
- execution_date = execution_date if execution_date else timezone.utcnow()
-
- if not timezone.is_localized(execution_date):
- raise ValueError("The execution_date should be localized")
-
- if replace_microseconds:
- execution_date = execution_date.replace(microsecond=0)
-
- if dag.default_args and 'start_date' in dag.default_args:
- min_dag_start_date = dag.default_args["start_date"]
- if min_dag_start_date and execution_date < min_dag_start_date:
- raise ValueError(
- "The execution_date [{}] should be >= start_date [{}] from DAG's default_args".format(
- execution_date.isoformat(), min_dag_start_date.isoformat()
- )
- )
-
- run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
- dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)
-
- if dag_run:
- raise DagRunAlreadyExists(
- f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"
- )
-
- run_conf = None
- if conf:
- run_conf = conf if isinstance(conf, dict) else json.loads(conf)
-
- dag_runs = []
- dags_to_run = [dag] + dag.subdags
- for _dag in dags_to_run:
- dag_run = _dag.create_dagrun(
- run_id=run_id,
- execution_date=execution_date,
- state=State.QUEUED,
- conf=run_conf,
- external_trigger=True,
- dag_hash=dag_bag.dags_hash.get(dag_id),
- )
- dag_runs.append(dag_run)
-
- return dag_runs
-
-
-def trigger_dag(
- dag_id: str,
- run_id: Optional[str] = None,
- conf: Optional[Union[dict, str]] = None,
- execution_date: Optional[datetime] = None,
- replace_microseconds: bool = True,
-) -> Optional[DagRun]:
- """Triggers execution of DAG specified by dag_id
-
- :param dag_id: DAG ID
- :param run_id: ID of the dag_run
- :param conf: configuration
- :param execution_date: date of execution
- :param replace_microseconds: whether microseconds should be zeroed
- :return: first dag run triggered - even if more than one Dag Runs were triggered or None
- """
- dag_model = DagModel.get_current(dag_id)
- if dag_model is None:
- raise DagNotFound(f"Dag id {dag_id} not found in DagModel")
-
- dagbag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
- triggers = _trigger_dag(
- dag_id=dag_id,
- dag_bag=dagbag,
- run_id=run_id,
- conf=conf,
- execution_date=execution_date,
- replace_microseconds=replace_microseconds,
- )
-
- return triggers[0] if triggers else None
+warnings.warn(
+ "This module is deprecated. Please use `airflow.api.common.trigger_dag` instead.",
+ DeprecationWarning,
+ stacklevel=2,
+)
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/trigger_dag.py
similarity index 95%
copy from airflow/api/common/experimental/trigger_dag.py
copy to airflow/api/common/trigger_dag.py
index 38a873c..70bbb78 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -62,9 +62,8 @@ def _trigger_dag(
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and execution_date < min_dag_start_date:
raise ValueError(
- "The execution_date [{}] should be >= start_date [{}] from DAG's default_args".format(
- execution_date.isoformat(), min_dag_start_date.isoformat()
- )
+ f"The execution_date [{execution_date.isoformat()}] should be >= start_date "
+ f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)
run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py
index c164fcc..286b191 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -110,13 +110,10 @@ def patch_dag(session, dag_id, update_mask=None):
@provide_session
def delete_dag(dag_id: str, session: Session):
"""Delete the specific DAG."""
- # TODO: This function is shared with the /delete endpoint used by the web
- # UI, so we're reusing it to simplify maintenance. Refactor the function to
- # another place when the experimental/legacy API is removed.
- from airflow.api.common.experimental import delete_dag
+ from airflow.api.common import delete_dag as delete_dag_module
try:
- delete_dag.delete_dag(dag_id, session=session)
+ delete_dag_module.delete_dag(dag_id, session=session)
except DagNotFound:
raise NotFound(f"Dag with id: '{dag_id}' not found")
except AirflowException:
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 6f217c4..8ae88aa 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -21,11 +21,11 @@ from typing import Dict, Iterable, Optional, Tuple
from sqlalchemy import Column, Integer, String, Text, func
from sqlalchemy.orm.session import Session
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, PoolNotFound
from airflow.models.base import Base
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.typing_compat import TypedDict
-from airflow.utils.session import provide_session
+from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import nowait, with_row_locks
from airflow.utils.state import State
@@ -57,7 +57,13 @@ class Pool(Base):
@staticmethod
@provide_session
- def get_pool(pool_name, session: Session = None):
+ def get_pools(session: Session = NEW_SESSION):
+ """Get all pools."""
+ return session.query(Pool).all()
+
+ @staticmethod
+ @provide_session
+ def get_pool(pool_name: str, session: Session = NEW_SESSION):
"""
Get the Pool with specific pool name from the Pools.
@@ -69,7 +75,7 @@ class Pool(Base):
@staticmethod
@provide_session
- def get_default_pool(session: Session = None):
+ def get_default_pool(session: Session = NEW_SESSION):
"""
Get the Pool of the default_pool from the Pools.
@@ -80,10 +86,44 @@ class Pool(Base):
@staticmethod
@provide_session
+ def create_or_update_pool(name: str, slots: int, description: str, session: Session = NEW_SESSION):
+ """Create a pool with given parameters or update it if it already exists."""
+ if not name:
+ return
+ pool = session.query(Pool).filter_by(pool=name).first()
+ if pool is None:
+ pool = Pool(pool=name, slots=slots, description=description)
+ session.add(pool)
+ else:
+ pool.slots = slots
+ pool.description = description
+
+ session.commit()
+
+ return pool
+
+ @staticmethod
+ @provide_session
+ def delete_pool(name: str, session: Session = NEW_SESSION):
+ """Delete pool by a given name."""
+ if name == Pool.DEFAULT_POOL_NAME:
+ raise AirflowException("default_pool cannot be deleted")
+
+ pool = session.query(Pool).filter_by(pool=name).first()
+ if pool is None:
+ raise PoolNotFound(f"Pool '{name}' doesn't exist")
+
+ session.delete(pool)
+ session.commit()
+
+ return pool
+
+ @staticmethod
+ @provide_session
def slots_stats(
*,
lock_rows: bool = False,
- session: Session = None,
+ session: Session = NEW_SESSION,
) -> Dict[str, PoolStats]:
"""
Get Pool stats (Number of Running, Queued, Open & Total tasks)
@@ -210,7 +250,7 @@ class Pool(Base):
)
@provide_session
- def open_slots(self, session: Session) -> float:
+ def open_slots(self, session: Session = NEW_SESSION) -> float:
"""
Get the number of slots open at the moment.
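[editor's note] The new `Pool.create_or_update_pool` is a query-then-insert-or-update ("upsert") flow. The same logic can be sketched against stdlib `sqlite3` — table layout and helper are illustrative, not Airflow's real ORM model:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE slot_pool ("
    "  id INTEGER PRIMARY KEY,"
    "  pool TEXT UNIQUE,"
    "  slots INTEGER,"
    "  description TEXT)"
)


def create_or_update_pool(name, slots, description):
    # Mirrors the ORM flow above: empty names are a no-op, an existing
    # row is updated in place, otherwise a new row is inserted.
    if not name:
        return None
    row = conn.execute("SELECT id FROM slot_pool WHERE pool = ?", (name,)).fetchone()
    if row is None:
        conn.execute(
            "INSERT INTO slot_pool (pool, slots, description) VALUES (?, ?, ?)",
            (name, slots, description),
        )
    else:
        conn.execute(
            "UPDATE slot_pool SET slots = ?, description = ? WHERE pool = ?",
            (slots, description, name),
        )
    conn.commit()
    return conn.execute(
        "SELECT pool, slots, description FROM slot_pool WHERE pool = ?", (name,)
    ).fetchone()


created = create_or_update_pool("etl", 5, "ETL tasks")
updated = create_or_update_pool("etl", 10, "ETL tasks")
```

Calling it twice with the same name yields one row whose slot count reflects the latest call, which is what lets the client API replace the old create-only `create_pool` without duplicate-name errors.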
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 1e6cb7f..421c796 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -21,7 +21,7 @@ import json
import time
from typing import Dict, List, Optional, Union
-from airflow.api.common.experimental.trigger_dag import trigger_dag
+from airflow.api.common.trigger_dag import trigger_dag
from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
from airflow.utils import timezone
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index f35d165..023f482 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -991,3 +991,18 @@ def check(session=None):
"""
session.execute('select 1 as is_alive;')
log.info("Connection successful.")
+
+
+def get_sqla_model_classes():
+ """
+ Get all SQLAlchemy class mappers.
+
+ SQLAlchemy < 1.4 does not support registry.mappers so we use
+ try/except to handle it.
+ """
+ from airflow.models.base import Base
+
+ try:
+ return [mapper.class_ for mapper in Base.registry.mappers]
+ except AttributeError:
+ return Base._decl_class_registry.values()
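[editor's note] The try/except in `get_sqla_model_classes` is a version-compatibility pattern: SQLAlchemy >= 1.4 exposes `Base.registry.mappers`, while older releases only have the private `_decl_class_registry`. The shape of the fallback can be shown with stand-in objects (no SQLAlchemy required; these classes only mimic the two attribute layouts):

```python
class Mapper:
    """Stand-in for a SQLAlchemy mapper, exposing .class_."""

    def __init__(self, cls):
        self.class_ = cls


class NewStyleBase:  # attribute shape of SQLAlchemy >= 1.4
    class registry:
        mappers = [Mapper(int), Mapper(str)]


class OldStyleBase:  # attribute shape of SQLAlchemy < 1.4
    _decl_class_registry = {"int": int, "str": str}


def get_sqla_model_classes(base):
    # Prefer the public >= 1.4 API; fall back to the old private
    # registry when `base.registry` does not exist.
    try:
        return [mapper.class_ for mapper in base.registry.mappers]
    except AttributeError:
        return list(base._decl_class_registry.values())
```

Centralizing this in `airflow/utils/db.py` lets both copies of `delete_dag` drop their direct dependence on the private `_decl_class_registry` attribute.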
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2182a17..f2642a7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1607,7 +1607,7 @@ class Airflow(AirflowBaseView):
@action_logging
def delete(self):
"""Deletes DAG."""
- from airflow.api.common.experimental import delete_dag
+ from airflow.api.common import delete_dag
from airflow.exceptions import DagNotFound
dag_id = request.values.get('dag_id')
diff --git a/setup.cfg b/setup.cfg
index b83ef9b..c3cce1c 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -95,6 +95,7 @@ install_requires =
croniter>=0.3.17
cryptography>=0.9.3
dataclasses;python_version<"3.7"
+ deprecated>=1.2.13
dill>=0.2.2, <0.4
# Sphinx RTD theme 0.5.2. introduced limitation to docutils to account for some docutils markup
# change:
diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py
index a2af8ca..9f574e4 100644
--- a/tests/api/client/test_local_client.py
+++ b/tests/api/client/test_local_client.py
@@ -17,6 +17,8 @@
# under the License.
import json
+import random
+import string
import unittest
from unittest.mock import ANY, patch
@@ -25,7 +27,7 @@ from freezegun import freeze_time
from airflow.api.client.local_client import Client
from airflow.example_dags import example_bash_operator
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowBadRequest, AirflowException, PoolNotFound
from airflow.models import DAG, DagBag, DagModel, DagRun, Pool
from airflow.utils import timezone
from airflow.utils.session import create_session
@@ -133,6 +135,10 @@ class TestLocalClient(unittest.TestCase):
pool = self.client.get_pool(name='foo')
assert pool == ('foo', 1, '')
+ def test_get_pool_non_existing_raises(self):
+ with pytest.raises(PoolNotFound):
+ self.client.get_pool(name='foo')
+
def test_get_pools(self):
self.client.create_pool(name='foo1', slots=1, description='')
self.client.create_pool(name='foo2', slots=2, description='')
@@ -145,6 +151,26 @@ class TestLocalClient(unittest.TestCase):
with create_session() as session:
assert session.query(Pool).count() == 2
+ def test_create_pool_bad_slots(self):
+ with pytest.raises(AirflowBadRequest, match="^Bad value for `slots`: foo$"):
+ self.client.create_pool(
+ name='foo',
+ slots='foo',
+ description='',
+ )
+
+ def test_create_pool_name_too_long(self):
+ long_name = ''.join(random.choices(string.ascii_lowercase, k=300))
+ pool_name_length = Pool.pool.property.columns[0].type.length
+ with pytest.raises(
+ AirflowBadRequest, match=f"^pool name cannot be more than {pool_name_length} characters"
+ ):
+ self.client.create_pool(
+ name=long_name,
+ slots=5,
+ description='',
+ )
+
def test_delete_pool(self):
self.client.create_pool(name='foo', slots=1, description='')
with create_session() as session:
@@ -152,3 +178,6 @@ class TestLocalClient(unittest.TestCase):
self.client.delete_pool(name='foo')
with create_session() as session:
assert session.query(Pool).count() == 1
+ for name in ('', ' '):
+ with pytest.raises(PoolNotFound, match=f"^Pool {name!r} doesn't exist$"):
+ Pool.delete_pool(name=name)
diff --git a/tests/api/common/experimental/test_delete_dag.py b/tests/api/common/test_delete_dag.py
similarity index 99%
rename from tests/api/common/experimental/test_delete_dag.py
rename to tests/api/common/test_delete_dag.py
index 5984cd2..0eb058a 100644
--- a/tests/api/common/experimental/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -20,7 +20,7 @@
import pytest
from airflow import models
-from airflow.api.common.experimental.delete_dag import delete_dag
+from airflow.api.common.delete_dag import delete_dag
from airflow.exceptions import AirflowException, DagNotFound
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
diff --git a/tests/api/common/experimental/test_trigger_dag.py b/tests/api/common/test_trigger_dag.py
similarity index 93%
rename from tests/api/common/experimental/test_trigger_dag.py
rename to tests/api/common/test_trigger_dag.py
index 2f16446..f79d413 100644
--- a/tests/api/common/experimental/test_trigger_dag.py
+++ b/tests/api/common/test_trigger_dag.py
@@ -22,7 +22,7 @@ from unittest import mock
import pytest
from parameterized import parameterized
-from airflow.api.common.experimental.trigger_dag import _trigger_dag
+from airflow.api.common.trigger_dag import _trigger_dag
from airflow.exceptions import AirflowException
from airflow.models import DAG, DagRun
from airflow.utils import timezone
@@ -42,7 +42,7 @@ class TestTriggerDag(unittest.TestCase):
with pytest.raises(AirflowException):
_trigger_dag('dag_not_found', dag_bag_mock)
- @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+ @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
@mock.patch('airflow.models.DagBag')
def test_trigger_dag_dag_run_exist(self, dag_bag_mock, dag_run_mock):
dag_id = "dag_run_exist"
@@ -54,7 +54,7 @@ class TestTriggerDag(unittest.TestCase):
_trigger_dag(dag_id, dag_bag_mock)
@mock.patch('airflow.models.DAG')
- @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+ @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
@mock.patch('airflow.models.DagBag')
def test_trigger_dag_include_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
dag_id = "trigger_dag"
@@ -70,7 +70,7 @@ class TestTriggerDag(unittest.TestCase):
assert 3 == len(triggers)
@mock.patch('airflow.models.DAG')
- @mock.patch('airflow.api.common.experimental.trigger_dag.DagRun', spec=DagRun)
+ @mock.patch('airflow.api.common.trigger_dag.DagRun', spec=DagRun)
@mock.patch('airflow.models.DagBag')
def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
dag_id = "trigger_dag"
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 00fe140..95e585e 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -16,11 +16,15 @@
# specific language governing permissions and limitations
# under the License.
+import pytest
+
from airflow import settings
+from airflow.exceptions import AirflowException, PoolNotFound
from airflow.models.pool import Pool
from airflow.models.taskinstance import TaskInstance as TI
from airflow.operators.dummy import DummyOperator
from airflow.utils import timezone
+from airflow.utils.session import create_session
from airflow.utils.state import State
from tests.test_utils.db import clear_db_dags, clear_db_pools, clear_db_runs, set_default_pool_slots
@@ -28,6 +32,10 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
class TestPool:
+
+ USER_POOL_COUNT = 2
+ TOTAL_POOL_COUNT = USER_POOL_COUNT + 1 # including default_pool
+
@staticmethod
def clean_db():
clear_db_dags()
@@ -36,6 +44,20 @@ class TestPool:
def setup_method(self):
self.clean_db()
+ self.pools = []
+
+ def add_pools(self):
+ self.pools = [Pool.get_default_pool()]
+ for i in range(self.USER_POOL_COUNT):
+ name = f'experimental_{i + 1}'
+ pool = Pool(
+ pool=name,
+ slots=i,
+ description=name,
+ )
+ self.pools.append(pool)
+ with create_session() as session:
+ session.add_all(self.pools)
def teardown_method(self):
self.clean_db()
@@ -149,3 +171,52 @@ class TestPool:
"running": 1,
}
} == Pool.slots_stats()
+
+ def test_get_pool(self):
+ self.add_pools()
+ pool = Pool.get_pool(pool_name=self.pools[0].pool)
+ assert pool.pool == self.pools[0].pool
+
+ def test_get_pool_non_existing(self):
+ self.add_pools()
+ assert not Pool.get_pool(pool_name='test')
+
+ def test_get_pool_bad_name(self):
+ for name in ('', ' '):
+ assert not Pool.get_pool(pool_name=name)
+
+ def test_get_pools(self):
+ self.add_pools()
+ pools = sorted(Pool.get_pools(), key=lambda p: p.pool)
+ assert pools[0].pool == self.pools[0].pool
+ assert pools[1].pool == self.pools[1].pool
+
+ def test_create_pool(self, session):
+ self.add_pools()
+ pool = Pool.create_or_update_pool(name='foo', slots=5, description='')
+ assert pool.pool == 'foo'
+ assert pool.slots == 5
+ assert pool.description == ''
+ assert session.query(Pool).count() == self.TOTAL_POOL_COUNT + 1
+
+ def test_create_pool_existing(self, session):
+ self.add_pools()
+ pool = Pool.create_or_update_pool(name=self.pools[0].pool, slots=5, description='')
+ assert pool.pool == self.pools[0].pool
+ assert pool.slots == 5
+ assert pool.description == ''
+ assert session.query(Pool).count() == self.TOTAL_POOL_COUNT
+
+ def test_delete_pool(self, session):
+ self.add_pools()
+ pool = Pool.delete_pool(name=self.pools[-1].pool)
+ assert pool.pool == self.pools[-1].pool
+ assert session.query(Pool).count() == self.TOTAL_POOL_COUNT - 1
+
+ def test_delete_pool_non_existing(self):
+ with pytest.raises(PoolNotFound, match="^Pool 'test' doesn't exist$"):
+ Pool.delete_pool(name='test')
+
+ def test_delete_default_pool_not_allowed(self):
+ with pytest.raises(AirflowException, match="^default_pool cannot be deleted$"):
+ Pool.delete_pool(Pool.DEFAULT_POOL_NAME)
[airflow] 36/43: Add a session backend to store session data in the database (#21478)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1c2909f8d69ade70803f10653e4845319ae99c0e
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 15 10:57:46 2022 -0700
Add a session backend to store session data in the database (#21478)
Co-authored-by: Jed Cunningham <je...@apache.org>
(cherry picked from commit da9d0863c7ff121c111a455708163b026943bdf1)
---
airflow/config_templates/config.yml | 7 +++
airflow/config_templates/default_airflow.cfg | 4 ++
.../c381b21cb7e4_add_session_table_to_db.py | 54 ++++++++++++++++++
airflow/utils/db.py | 3 +
airflow/www/app.py | 3 +-
airflow/www/extensions/init_session.py | 63 ++++++++++++---------
.../www/{extensions/init_session.py => session.py} | 29 ++++------
docs/apache-airflow/migrations-ref.rst | 4 +-
docs/spelling_wordlist.txt | 1 +
setup.cfg | 3 +
tests/api_connexion/conftest.py | 7 ++-
tests/api_connexion/test_security.py | 4 ++
tests/test_utils/decorators.py | 2 +-
tests/utils/test_db.py | 3 +
tests/www/views/conftest.py | 1 +
tests/www/views/test_session.py | 65 ++++++++++++++++++++++
16 files changed, 205 insertions(+), 48 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 6941f03..1e77041 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -999,6 +999,13 @@
type: string
example: ~
default: ""
+ - name: session_backend
+ description: |
+ The type of backend used to store web session data, can be 'database' or 'securecookie'
+ version_added: 2.2.4
+ type: string
+ example: securecookie
+ default: database
- name: web_server_master_timeout
description: |
Number of seconds the webserver waits before killing gunicorn master that doesn't respond
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 6a5449b..826eaf4 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -516,6 +516,10 @@ web_server_ssl_cert =
# provided SSL will be enabled. This does not change the web server port.
web_server_ssl_key =
+# The type of backend used to store web session data, can be 'database' or 'securecookie'
+# Example: session_backend = securecookie
+session_backend = database
+
# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
web_server_master_timeout = 120
diff --git a/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py b/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py
new file mode 100644
index 0000000..cc6b9ab
--- /dev/null
+++ b/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add session table to db
+
+Revision ID: c381b21cb7e4
+Revises: be2bfac3da23
+Create Date: 2022-01-25 13:56:35.069429
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'c381b21cb7e4'
+down_revision = 'be2bfac3da23'
+branch_labels = None
+depends_on = None
+
+TABLE_NAME = 'session'
+
+
+def upgrade():
+ """Apply add session table to db"""
+ op.create_table(
+ TABLE_NAME,
+ sa.Column('id', sa.Integer()),
+ sa.Column('session_id', sa.String(255)),
+ sa.Column('data', sa.LargeBinary()),
+ sa.Column('expiry', sa.DateTime()),
+ sa.PrimaryKeyConstraint('id'),
+ sa.UniqueConstraint('session_id'),
+ )
+
+
+def downgrade():
+ """Unapply add session table to db"""
+ op.drop_table(TABLE_NAME)
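[editor's note] The migration above creates the table the database session backend stores cookies' server-side data in. Its DDL boils down to the following (SQLite flavor shown via stdlib `sqlite3`; the column types approximate `sa.String(255)`, `sa.LargeBinary`, and `sa.DateTime`):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE session ("
    "  id INTEGER PRIMARY KEY,"
    "  session_id VARCHAR(255) UNIQUE,"
    "  data BLOB,"
    "  expiry TIMESTAMP)"
)
# One row per browser session: an opaque id, a pickled data blob,
# and an expiry timestamp the backend checks on each request.
conn.execute(
    "INSERT INTO session (session_id, data, expiry) VALUES (?, ?, ?)",
    ("abc123", b"\x80session-payload", "2022-02-17 00:00:00"),
)
row = conn.execute("SELECT session_id FROM session").fetchone()

# The UNIQUE constraint rejects a second row for the same session id.
try:
    conn.execute("INSERT INTO session (session_id) VALUES (?)", ("abc123",))
    duplicate_inserted = True
except sqlite3.IntegrityError:
    duplicate_inserted = False
```

The unique constraint on `session_id` is what lets the backend look up and overwrite a session by id rather than accumulating stale rows.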
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 023f482..c038d66 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -954,9 +954,12 @@ def drop_airflow_models(connection):
users.drop(settings.engine, checkfirst=True)
dag_stats = Table('dag_stats', Base.metadata)
dag_stats.drop(settings.engine, checkfirst=True)
+ session = Table('session', Base.metadata)
+ session.drop(settings.engine, checkfirst=True)
Base.metadata.drop_all(connection)
# we remove the Tables here so that if resetdb is run metadata does not keep the old tables.
+ Base.metadata.remove(session)
Base.metadata.remove(dag_stats)
Base.metadata.remove(users)
Base.metadata.remove(user)
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 2de041b..16780cb 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -36,7 +36,7 @@ from airflow.www.extensions.init_jinja_globals import init_jinja_globals
from airflow.www.extensions.init_manifest_files import configure_manifest_files
from airflow.www.extensions.init_robots import init_robots
from airflow.www.extensions.init_security import init_api_experimental_auth, init_xframe_protection
-from airflow.www.extensions.init_session import init_airflow_session_interface, init_permanent_session
+from airflow.www.extensions.init_session import init_airflow_session_interface
from airflow.www.extensions.init_views import (
init_api_connexion,
init_api_experimental,
@@ -135,7 +135,6 @@ def create_app(config=None, testing=False):
init_jinja_globals(flask_app)
init_xframe_protection(flask_app)
- init_permanent_session(flask_app)
init_airflow_session_interface(flask_app)
return flask_app
diff --git a/airflow/www/extensions/init_session.py b/airflow/www/extensions/init_session.py
index 06e0ba5..7a09de7 100644
--- a/airflow/www/extensions/init_session.py
+++ b/airflow/www/extensions/init_session.py
@@ -15,33 +15,46 @@
# specific language governing permissions and limitations
# under the License.
-from flask import request, session as flask_session
-from flask.sessions import SecureCookieSessionInterface
+from flask import session as builtin_flask_session
-
-class AirflowSessionInterface(SecureCookieSessionInterface):
- """
- Airflow cookie session interface.
- Modifications of sessions should be done here because
- the change here is global.
- """
-
- def save_session(self, *args, **kwargs):
- """Prevent creating session from REST API requests."""
- if request.blueprint == '/api/v1':
- return None
- return super().save_session(*args, **kwargs)
-
-
-def init_permanent_session(app):
- """Make session permanent to allows us to store data"""
-
- def make_session_permanent():
- flask_session.permanent = True
-
- app.before_request(make_session_permanent)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.www.session import AirflowDatabaseSessionInterface, AirflowSecureCookieSessionInterface
def init_airflow_session_interface(app):
"""Set airflow session interface"""
- app.session_interface = AirflowSessionInterface()
+ config = app.config.copy()
+ selected_backend = conf.get('webserver', 'SESSION_BACKEND')
+ # A bit of a misnomer - normally cookies expire whenever the browser is closed
+ # or when they hit their expiry datetime, whichever comes first. "Permanent"
+ # cookies only expire when they hit their expiry datetime, and can outlive
+ # the browser being closed.
+ permanent_cookie = config.get('SESSION_PERMANENT', True)
+
+ if selected_backend == 'securecookie':
+ app.session_interface = AirflowSecureCookieSessionInterface()
+ if permanent_cookie:
+
+ def make_session_permanent():
+ builtin_flask_session.permanent = True
+
+ app.before_request(make_session_permanent)
+ elif selected_backend == 'database':
+ app.session_interface = AirflowDatabaseSessionInterface(
+ app=app,
+ db=None,
+ permanent=permanent_cookie,
+ # Typically these would be configurable with Flask-Session,
+ # but we will set them explicitly instead as they don't make
+ # sense to have configurable in Airflow's use case
+ table='session',
+ key_prefix='',
+ use_signer=True,
+ )
+ else:
+ raise AirflowConfigException(
+ "Unrecognized session backend specified in "
+ f"web_server_session_backend: '{selected_backend}'. Please set "
+ "this to either 'database' or 'securecookie'."
+ )
diff --git a/airflow/www/extensions/init_session.py b/airflow/www/session.py
similarity index 59%
copy from airflow/www/extensions/init_session.py
copy to airflow/www/session.py
index 06e0ba5..4092565 100644
--- a/airflow/www/extensions/init_session.py
+++ b/airflow/www/session.py
@@ -15,33 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-from flask import request, session as flask_session
+from flask import request
from flask.sessions import SecureCookieSessionInterface
+from flask_session.sessions import SqlAlchemySessionInterface
-class AirflowSessionInterface(SecureCookieSessionInterface):
- """
- Airflow cookie session interface.
- Modifications of sessions should be done here because
- the change here is global.
- """
+class SesssionExemptMixin:
+ """Exempt certain blueprints/paths from autogenerated sessions"""
def save_session(self, *args, **kwargs):
- """Prevent creating session from REST API requests."""
+ """Prevent creating session from REST API and health requests."""
if request.blueprint == '/api/v1':
return None
+ if request.path == '/health':
+ return None
return super().save_session(*args, **kwargs)
-def init_permanent_session(app):
- """Make session permanent to allows us to store data"""
-
- def make_session_permanent():
- flask_session.permanent = True
-
- app.before_request(make_session_permanent)
+class AirflowDatabaseSessionInterface(SesssionExemptMixin, SqlAlchemySessionInterface):
+ """Session interface that exempts some routes and stores session data in the database"""
-def init_airflow_session_interface(app):
- """Set airflow session interface"""
- app.session_interface = AirflowSessionInterface()
+class AirflowSecureCookieSessionInterface(SesssionExemptMixin, SecureCookieSessionInterface):
+ """Session interface that exempts some routes and stores session data in a signed cookie"""
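The mixin above layers route exemption on top of any Flask session interface through a cooperative `super().save_session()` call, and works with both the cookie and database backends purely via method resolution order. A minimal, framework-free sketch of that pattern (class names and the `path` argument here are illustrative, not Flask's or Airflow's real API):

```python
class ExemptMixin:
    """Skip persisting a session for exempt paths; defer to the backend otherwise."""

    EXEMPT_PATHS = {'/health'}

    def save_session(self, path, session):
        if path in self.EXEMPT_PATHS:
            # No cookie or database row is ever written for these routes.
            return None
        return super().save_session(path, session)


class CookieBackend:
    """Stand-in for a concrete session interface that records what it saved."""

    def __init__(self):
        self.saved = []

    def save_session(self, path, session):
        self.saved.append((path, session))
        return 'cookie-set'


class ExemptCookieBackend(ExemptMixin, CookieBackend):
    """Mixin listed first, so its save_session runs before the backend's."""
```

The same mixin could then be combined with a different backend class without touching the exemption logic, which is exactly what the diff does for the cookie and database interfaces.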
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 016c624..8dc1a55 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``be2bfac3da23`` (head) | ``7b2661a43ba3`` | ``2.2.3`` | Add has_import_errors column to DagModel |
+| ``c381b21cb7e4`` (head) | ``be2bfac3da23`` | ``2.2.4`` | Create a ``session`` table to store web session data |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``be2bfac3da23`` | ``7b2661a43ba3`` | ``2.2.3`` | Add has_import_errors column to DagModel |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``7b2661a43ba3`` | ``142555e44c17`` | ``2.2.0`` | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id. |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 5d77e29..ed114b6 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1222,6 +1222,7 @@ sdk
secretRef
secretRefs
securable
+securecookie
securityManager
seealso
seedlist
diff --git a/setup.cfg b/setup.cfg
index 7ab5c77..8e36d06 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -107,6 +107,9 @@ install_requires =
flask-appbuilder>=3.3.4, <4.0.0
flask-caching>=1.5.0, <2.0.0
flask-login>=0.3, <0.5
+ # Strict upper-bound on the latest release of flask-session,
+ # as any schema changes will require a migration.
+ flask-session>=0.3.1, <=0.4.0
flask-wtf>=0.14.3, <0.15
graphviz>=0.12
gunicorn>=20.1.0
diff --git a/tests/api_connexion/conftest.py b/tests/api_connexion/conftest.py
index cc92733..9b37b52 100644
--- a/tests/api_connexion/conftest.py
+++ b/tests/api_connexion/conftest.py
@@ -25,7 +25,12 @@ from tests.test_utils.decorators import dont_initialize_flask_app_submodules
@pytest.fixture(scope="session")
def minimal_app_for_api():
@dont_initialize_flask_app_submodules(
- skip_all_except=["init_appbuilder", "init_api_experimental_auth", "init_api_connexion"]
+ skip_all_except=[
+ "init_appbuilder",
+ "init_api_experimental_auth",
+ "init_api_connexion",
+ "init_airflow_session_interface",
+ ]
)
def factory():
with conf_vars({("api", "auth_backend"): "tests.test_utils.remote_user_api_auth_backend"}):
diff --git a/tests/api_connexion/test_security.py b/tests/api_connexion/test_security.py
index 244a8a2..68f6d31 100644
--- a/tests/api_connexion/test_security.py
+++ b/tests/api_connexion/test_security.py
@@ -45,3 +45,7 @@ class TestSession:
def test_session_not_created_on_api_request(self):
self.client.get("api/v1/dags", environ_overrides={'REMOTE_USER': "test"})
assert all(cookie.name != "session" for cookie in self.client.cookie_jar)
+
+ def test_session_not_created_on_health_endpoint_request(self):
+ self.client.get("health")
+ assert all(cookie.name != "session" for cookie in self.client.cookie_jar)
diff --git a/tests/test_utils/decorators.py b/tests/test_utils/decorators.py
index d08d159..949df63 100644
--- a/tests/test_utils/decorators.py
+++ b/tests/test_utils/decorators.py
@@ -42,7 +42,7 @@ def dont_initialize_flask_app_submodules(_func=None, *, skip_all_except=None):
"sync_appbuilder_roles",
"init_jinja_globals",
"init_xframe_protection",
- "init_permanent_session",
+ "init_airflow_session_interface",
"init_appbuilder",
]
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 601dc6f..27fa67b 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -74,6 +74,9 @@ class TestDb(unittest.TestCase):
lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_usg'),
lambda t: (t[0] == 'remove_table' and t[1].name == 'MSreplication_options'),
lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_dev'),
+ # Ignore flask-session table/index
+ lambda t: (t[0] == 'remove_table' and t[1].name == 'session'),
+ lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'),
]
for ignore in ignores:
diff = [d for d in diff if not ignore(d)]
diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py
index 05fe1e4..f95a814 100644
--- a/tests/www/views/conftest.py
+++ b/tests/www/views/conftest.py
@@ -55,6 +55,7 @@ def app(examples_dag_bag):
"init_flash_views",
"init_jinja_globals",
"init_plugins",
+ "init_airflow_session_interface",
]
)
def factory():
diff --git a/tests/www/views/test_session.py b/tests/www/views/test_session.py
new file mode 100644
index 0000000..9fb6f36
--- /dev/null
+++ b/tests/www/views/test_session.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from airflow.exceptions import AirflowConfigException
+from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.decorators import dont_initialize_flask_app_submodules
+
+
+def test_session_cookie_created_on_login(user_client):
+ assert any(cookie.name == 'session' for cookie in user_client.cookie_jar)
+
+
+def test_session_inaccessible_after_logout(user_client):
+ session_cookie = next((cookie for cookie in user_client.cookie_jar if cookie.name == 'session'), None)
+ assert session_cookie is not None
+
+ resp = user_client.get('/logout/')
+ assert resp.status_code == 302
+
+ # Try to access /home with the session cookie from earlier
+ user_client.set_cookie('session', session_cookie.value)
+ resp = user_client.get('/home/')
+ assert resp.status_code == 302
+
+
+def test_invalid_session_backend_option():
+ @dont_initialize_flask_app_submodules(
+ skip_all_except=[
+ "init_api_connexion",
+ "init_appbuilder",
+ "init_appbuilder_links",
+ "init_appbuilder_views",
+ "init_flash_views",
+ "init_jinja_globals",
+ "init_plugins",
+ "init_airflow_session_interface",
+ ]
+ )
+ def poorly_configured_app_factory():
+ with conf_vars({("webserver", "session_backend"): "invalid_value_for_session_backend"}):
+ return app.create_app(testing=True)
+
+ expected_exc_regex = (
+ "^Unrecognized session backend specified in web_server_session_backend: "
+ r"'invalid_value_for_session_backend'\. Please set this to .+\.$"
+ )
+ with pytest.raises(AirflowConfigException, match=expected_exc_regex):
+ poorly_configured_app_factory()
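The backend selection in `init_airflow_session_interface` reduces to a small dispatch on a config string that fails loudly on unknown values. A dependency-free sketch of that pattern (the class and function names below are illustrative placeholders, and a plain `ValueError` stands in for `AirflowConfigException`):

```python
class SecureCookieBackend:
    """Stores session data client-side in a signed cookie."""


class DatabaseBackend:
    """Stores session data server-side in a 'session' table."""


def select_session_backend(selected_backend: str):
    # Map the config value to a backend object, raising on anything
    # unrecognized, mirroring the structure of the real initializer.
    if selected_backend == 'securecookie':
        return SecureCookieBackend()
    if selected_backend == 'database':
        return DatabaseBackend()
    raise ValueError(
        f"Unrecognized session backend: {selected_backend!r}. "
        "Please set this to either 'database' or 'securecookie'."
    )
```

The explicit `else`/raise branch is what the `test_invalid_session_backend_option` test above exercises end to end.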
[airflow] 35/43: Show task status only for running dags or only for the last finished dag (#21352)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f25a58eebeb9ad3283fca2daa6666811f1a036c6
Author: Aleksey Kirilishin <54...@users.noreply.github.com>
AuthorDate: Mon Feb 14 18:55:00 2022 +0300
Show task status only for running dags or only for the last finished dag (#21352)
* Show task status only for running dags or only for the last finished dag
* Brought the logic of getting task statistics into a separate function
(cherry picked from commit 28d7bde2750c38300e5cf70ba32be153b1a11f2c)
---
airflow/www/views.py | 64 ++++++++++++++++++++++++++++++++++---------
tests/www/views/test_views.py | 35 ++++++++++++++++++++++-
2 files changed, 85 insertions(+), 14 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2ed2a67..9ebe899 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -408,6 +408,31 @@ def dag_edges(dag):
return result
+def get_task_stats_from_query(qry):
+ """
+ Return a dict of the task quantity, grouped by dag id and task status.
+
+ :param qry: The data in the format (<dag id>, <task state>, <is dag running>, <task count>),
+ ordered by <dag id> and <is dag running>
+ """
+ data = {}
+ last_dag_id = None
+ has_running_dags = False
+ for dag_id, state, is_dag_running, count in qry:
+ if last_dag_id != dag_id:
+ last_dag_id = dag_id
+ has_running_dags = False
+ elif not is_dag_running and has_running_dags:
+ continue
+
+ if is_dag_running:
+ has_running_dags = True
+ if dag_id not in data:
+ data[dag_id] = {}
+ data[dag_id][state] = count
+ return data
+
+
######################################################################################
# Error handlers
######################################################################################
@@ -814,7 +839,9 @@ class Airflow(AirflowBaseView):
# Select all task_instances from active dag_runs.
running_task_instance_query_result = session.query(
- TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')
+ TaskInstance.dag_id.label('dag_id'),
+ TaskInstance.state.label('state'),
+ sqla.literal(True).label('is_dag_running'),
).join(
running_dag_run_query_result,
and_(
@@ -838,7 +865,11 @@ class Airflow(AirflowBaseView):
# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
last_task_instance_query_result = (
- session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state'))
+ session.query(
+ TaskInstance.dag_id.label('dag_id'),
+ TaskInstance.state.label('state'),
+ sqla.literal(False).label('is_dag_running'),
+ )
.join(TaskInstance.dag_run)
.join(
last_dag_run,
@@ -855,18 +886,25 @@ class Airflow(AirflowBaseView):
else:
final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti')
- qry = session.query(
- final_task_instance_query_result.c.dag_id,
- final_task_instance_query_result.c.state,
- sqla.func.count(),
- ).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state)
-
- data = {}
- for dag_id, state, count in qry:
- if dag_id not in data:
- data[dag_id] = {}
- data[dag_id][state] = count
+ qry = (
+ session.query(
+ final_task_instance_query_result.c.dag_id,
+ final_task_instance_query_result.c.state,
+ final_task_instance_query_result.c.is_dag_running,
+ sqla.func.count(),
+ )
+ .group_by(
+ final_task_instance_query_result.c.dag_id,
+ final_task_instance_query_result.c.state,
+ final_task_instance_query_result.c.is_dag_running,
+ )
+ .order_by(
+ final_task_instance_query_result.c.dag_id,
+ final_task_instance_query_result.c.is_dag_running.desc(),
+ )
+ )
+ data = get_task_stats_from_query(qry)
payload = {}
for dag_id in filter_dag_ids:
payload[dag_id] = []
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index b98c1bc..672d4a1 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -24,7 +24,13 @@ import pytest
from airflow.configuration import initialize_config
from airflow.plugins_manager import AirflowPlugin, EntryPointSource
from airflow.www import views
-from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration
+from airflow.www.views import (
+ get_key_paths,
+ get_safe_url,
+ get_task_stats_from_query,
+ get_value_from_path,
+ truncate_task_duration,
+)
from tests.test_utils.config import conf_vars
from tests.test_utils.mock_plugins import mock_plugin_manager
from tests.test_utils.www import check_content_in_response, check_content_not_in_response
@@ -333,3 +339,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type):
action_funcs = action_funcs - {"action_post"}
for action_function in action_funcs:
assert_decorator_used(cls, action_function, views.action_has_dag_edit_access)
+
+
+def test_get_task_stats_from_query():
+ query_data = [
+ ['dag1', 'queued', True, 1],
+ ['dag1', 'running', True, 2],
+ ['dag1', 'success', False, 3],
+ ['dag2', 'running', True, 4],
+ ['dag2', 'success', True, 5],
+ ['dag3', 'success', False, 6],
+ ]
+ expected_data = {
+ 'dag1': {
+ 'queued': 1,
+ 'running': 2,
+ },
+ 'dag2': {
+ 'running': 4,
+ 'success': 5,
+ },
+ 'dag3': {
+ 'success': 6,
+ },
+ }
+
+ data = get_task_stats_from_query(query_data)
+ assert data == expected_data
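The grouping rule tested above can be reproduced standalone. A sketch of the same one-pass algorithm, assuming rows arrive ordered by `dag_id` and then by `is_dag_running` descending (which the query's `ORDER BY` guarantees); the function name here is illustrative:

```python
def task_stats_from_rows(rows):
    """One-pass grouping of (dag_id, state, is_dag_running, count) rows.

    When a DAG has any rows from running runs, rows from finished runs
    are skipped, so each DAG reports stats for its running runs or,
    failing that, its last finished run -- the same rule implemented by
    get_task_stats_from_query in the diff above.
    """
    data = {}
    last_dag_id = None
    has_running_dags = False
    for dag_id, state, is_dag_running, count in rows:
        if dag_id != last_dag_id:
            # New DAG: reset the running-run flag.
            last_dag_id = dag_id
            has_running_dags = False
        elif has_running_dags and not is_dag_running:
            # This DAG already produced running-run rows; drop finished ones.
            continue
        if is_dag_running:
            has_running_dags = True
        data.setdefault(dag_id, {})[state] = count
    return data
```

Feeding it the `query_data` from the committed test yields the same `expected_data` dictionary.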
[airflow] 08/43: Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
commit dda8f4356525041c5200c42d00e5dc05fd79c54b
Author: SeonghwanLee <50...@users.noreply.github.com>
AuthorDate: Thu Jan 27 14:36:24 2022 +0900
Fix 'airflow dags backfill --reset-dagruns' errors when run twice (#21062)
Co-authored-by: uplsh <up...@linecorp.com>
(cherry picked from commit d97e2bac854f9891eb47f0c06c261e89723038ca)
---
airflow/cli/commands/dag_command.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index e04bc73..6e8e157 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -47,7 +47,7 @@ from airflow.utils.cli import (
)
from airflow.utils.dot_renderer import render_dag
from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import State
+from airflow.utils.state import DagRunState
@cli_utils.action_logging
@@ -105,7 +105,7 @@ def dag_backfill(args, dag=None):
end_date=args.end_date,
confirm_prompt=not args.yes,
include_subdags=True,
- dag_run_state=State.NONE,
+ dag_run_state=DagRunState.QUEUED,
)
try:
[airflow] 02/43: name mismatch (#21055)
commit a670f8c340b9e5a21f349c8dc5a7b9ff38579df9
Author: caxefaizan <63...@users.noreply.github.com>
AuthorDate: Mon Jan 24 16:17:23 2022 +0530
name mismatch (#21055)
(cherry picked from commit 4fb005ec122a1c0091db0083c2fe4305473abb49)
---
.../kubernetes/pod_template_file_examples/dags_in_volume_template.yaml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
index 389fe37..cc46149 100644
--- a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
+++ b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
@@ -63,7 +63,7 @@ spec:
fsGroup: 50000
serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
volumes:
- - name: dags
+ - name: airflow-dags
persistentVolumeClaim:
claimName: RELEASE-NAME-dags
- emptyDir: {}
[airflow] 15/43: bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
commit 55a4abbe1631f34325327d1494f1faaaa0c7e359
Author: Đặng Minh Dũng <du...@live.com>
AuthorDate: Wed Jan 5 14:42:57 2022 +0700
bugfix: deferred tasks does not cancel when DAG is marked fail (#20649)
(cherry picked from commit 64c0bd50155dfdb84671ac35d645b812fafa78a1)
---
airflow/api/common/experimental/mark_tasks.py | 121 ++++++++++++++++++--------
1 file changed, 85 insertions(+), 36 deletions(-)
diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 28e733d..4131cb5 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -17,23 +17,27 @@
# under the License.
"""Marks tasks APIs."""
-import datetime
-from typing import Iterable
+from datetime import datetime
+from typing import Generator, Iterable, List, Optional
-from sqlalchemy import or_
from sqlalchemy.orm import contains_eager
+from sqlalchemy.orm.session import Session as SASession
+from sqlalchemy.sql.expression import or_
+from airflow import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.operators.subdag import SubDagOperator
from airflow.utils import timezone
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import State, TaskInstanceState
from airflow.utils.types import DagRunType
-def _create_dagruns(dag, execution_dates, state, run_type):
+def _create_dagruns(
+ dag: DAG, execution_dates: List[datetime], state: TaskInstanceState, run_type: DagRunType
+) -> List[DagRun]:
"""
Infers from the dates which dag runs need to be created and does so.
@@ -63,15 +67,15 @@ def _create_dagruns(dag, execution_dates, state, run_type):
@provide_session
def set_state(
tasks: Iterable[BaseOperator],
- execution_date: datetime.datetime,
+ execution_date: datetime,
upstream: bool = False,
downstream: bool = False,
future: bool = False,
past: bool = False,
state: str = State.SUCCESS,
commit: bool = False,
- session=None,
-):
+ session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
"""
Set the state of a task instance and if needed its relatives. Can set state
for future tasks (calculated from execution_date) and retroactively
@@ -134,7 +138,9 @@ def set_state(
return tis_altered
-def all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates):
+def all_subdag_tasks_query(
+ sub_dag_run_ids: List[str], session: SASession, state: TaskInstanceState, confirmed_dates: List[datetime]
+):
"""Get *all* tasks of the sub dags"""
qry_sub_dag = (
session.query(TaskInstance)
@@ -144,7 +150,13 @@ def all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates):
return qry_sub_dag
-def get_all_dag_task_query(dag, session, state, task_ids, confirmed_dates):
+def get_all_dag_task_query(
+ dag: DAG,
+ session: SASession,
+ state: TaskInstanceState,
+ task_ids: List[str],
+ confirmed_dates: List[datetime],
+):
"""Get all tasks of the main dag that will be affected by a state change"""
qry_dag = (
session.query(TaskInstance)
@@ -160,7 +172,14 @@ def get_all_dag_task_query(dag, session, state, task_ids, confirmed_dates):
return qry_dag
-def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
+def get_subdag_runs(
+ dag: DAG,
+ session: SASession,
+ state: TaskInstanceState,
+ task_ids: List[str],
+ commit: bool,
+ confirmed_dates: List[datetime],
+) -> List[str]:
"""Go through subdag operators and create dag runs. We will only work
within the scope of the subdag. We won't propagate to the parent dag,
but we will propagate from parent to subdag.
@@ -181,7 +200,7 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
dag_runs = _create_dagruns(
current_task.subdag,
execution_dates=confirmed_dates,
- state=State.RUNNING,
+ state=TaskInstanceState.RUNNING,
run_type=DagRunType.BACKFILL_JOB,
)
@@ -192,7 +211,13 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
return sub_dag_ids
-def verify_dagruns(dag_runs, commit, state, session, current_task):
+def verify_dagruns(
+ dag_runs: List[DagRun],
+ commit: bool,
+ state: TaskInstanceState,
+ session: SASession,
+ current_task: BaseOperator,
+):
"""Verifies integrity of dag_runs.
:param dag_runs: dag runs to verify
@@ -210,7 +235,7 @@ def verify_dagruns(dag_runs, commit, state, session, current_task):
session.merge(dag_run)
-def verify_dag_run_integrity(dag, dates):
+def verify_dag_run_integrity(dag: DAG, dates: List[datetime]) -> List[datetime]:
"""
Verify the integrity of the dag runs in case a task was added or removed
set the confirmed execution dates as they might be different
@@ -225,7 +250,9 @@ def verify_dag_run_integrity(dag, dates):
return confirmed_dates
-def find_task_relatives(tasks, downstream, upstream):
+def find_task_relatives(
+ tasks: Iterable[BaseOperator], downstream: bool, upstream: bool
+) -> Generator[str, None, None]:
"""Yield task ids and optionally ancestor and descendant ids."""
for task in tasks:
yield task.task_id
@@ -237,7 +264,7 @@ def find_task_relatives(tasks, downstream, upstream):
yield relative.task_id
-def get_execution_dates(dag, execution_date, future, past):
+def get_execution_dates(dag: DAG, execution_date: datetime, future: bool, past: bool) -> List[datetime]:
"""Returns dates of DAG execution"""
latest_execution_date = dag.get_latest_execution_date()
if latest_execution_date is None:
@@ -266,7 +293,9 @@ def get_execution_dates(dag, execution_date, future, past):
@provide_session
-def _set_dag_run_state(dag_id, execution_date, state, session=None):
+def _set_dag_run_state(
+ dag_id: str, execution_date: datetime, state: TaskInstanceState, session: SASession = NEW_SESSION
+):
"""
Helper method that set dag run state in the DB.
@@ -279,7 +308,7 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date).one()
)
dag_run.state = state
- if state == State.RUNNING:
+ if state == TaskInstanceState.RUNNING:
dag_run.start_date = timezone.utcnow()
dag_run.end_date = None
else:
@@ -288,7 +317,12 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
@provide_session
-def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_success(
+ dag: Optional[DAG],
+ execution_date: Optional[datetime],
+ commit: bool = False,
+ session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date and its task instances
to success.
@@ -306,18 +340,27 @@ def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None
# Mark the dag run to success.
if commit:
- _set_dag_run_state(dag.dag_id, execution_date, State.SUCCESS, session)
+ _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.SUCCESS, session)
# Mark all task instances of the dag run to success.
for task in dag.tasks:
task.dag = dag
return set_state(
- tasks=dag.tasks, execution_date=execution_date, state=State.SUCCESS, commit=commit, session=session
+ tasks=dag.tasks,
+ execution_date=execution_date,
+ state=TaskInstanceState.SUCCESS,
+ commit=commit,
+ session=session,
)
@provide_session
-def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_failed(
+ dag: Optional[DAG],
+ execution_date: Optional[datetime],
+ commit: bool = False,
+ session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date and its running task instances
to failed.
@@ -335,18 +378,15 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None)
# Mark the dag run to failed.
if commit:
- _set_dag_run_state(dag.dag_id, execution_date, State.FAILED, session)
+ _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.FAILED, session)
- # Mark only RUNNING task instances.
+ # Mark only running task instances.
task_ids = [task.task_id for task in dag.tasks]
- tis = (
- session.query(TaskInstance)
- .filter(
- TaskInstance.dag_id == dag.dag_id,
- TaskInstance.execution_date == execution_date,
- TaskInstance.task_id.in_(task_ids),
- )
- .filter(TaskInstance.state == State.RUNNING)
+ tis = session.query(TaskInstance).filter(
+ TaskInstance.dag_id == dag.dag_id,
+ TaskInstance.execution_date == execution_date,
+ TaskInstance.task_id.in_(task_ids),
+ TaskInstance.state.in_(State.running),
)
task_ids_of_running_tis = [task_instance.task_id for task_instance in tis]
@@ -358,12 +398,21 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None)
tasks.append(task)
return set_state(
- tasks=tasks, execution_date=execution_date, state=State.FAILED, commit=commit, session=session
+ tasks=tasks,
+ execution_date=execution_date,
+ state=TaskInstanceState.FAILED,
+ commit=commit,
+ session=session,
)
@provide_session
-def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None):
+def set_dag_run_state_to_running(
+ dag: Optional[DAG],
+ execution_date: Optional[datetime],
+ commit: bool = False,
+ session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date to running.
@@ -380,7 +429,7 @@ def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None
# Mark the dag run to running.
if commit:
- _set_dag_run_state(dag.dag_id, execution_date, State.RUNNING, session)
+ _set_dag_run_state(dag.dag_id, execution_date, TaskInstanceState.RUNNING, session)
# To keep the return type consistent with the other similar functions.
return res
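The refactor above swaps generic `State` constants for the narrower `TaskInstanceState` enum without changing any comparisons against values stored in the database. That is safe because the state enums are str-backed, so members compare equal to the raw strings persisted in the `state` column. A minimal sketch of that behavior (abbreviated to three members; the real enum has more):

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    """Abbreviated sketch of a str-backed state enum."""

    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Because the enum inherits from str, a member compares equal to the raw
# string stored in the database, so filters such as
# `TaskInstance.state == TaskInstanceState.RUNNING` keep matching rows
# that were written as plain "running".
```

List membership checks against plain strings also keep working, which is why `TaskInstance.state.in_(State.running)` in the diff behaves the same with enum members as with bare strings.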
[airflow] 43/43: added explaining concept of logical date in DAG run docs (#21433)
commit 56d82fc3483f500d7a1da36019888849e8784c12
Author: Howard Yoo <32...@users.noreply.github.com>
AuthorDate: Thu Feb 17 14:01:58 2022 -0600
added explaining concept of logical date in DAG run docs (#21433)
(cherry picked from commit 752d53860e636ead2be7c3f2044b9b312ba86b95)
---
docs/apache-airflow/concepts/dags.rst | 16 ++++++++++++++++
docs/apache-airflow/dag-run.rst | 2 ++
docs/apache-airflow/faq.rst | 7 +++++++
3 files changed, 25 insertions(+)
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 3edaf35..e339abe 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -157,6 +157,8 @@ The ``schedule_interval`` argument takes any value that is a valid `Crontab <htt
For more information on ``schedule_interval`` values, see :doc:`DAG Run </dag-run>`.
If ``schedule_interval`` is not enough to express the DAG's schedule, see :doc:`Timetables </howto/timetable>`.
+ For more information on ``logical date``, see :ref:`data-interval` and
+ :ref:`faq:what-does-execution-date-mean`.
Every time you run a DAG, you are creating a new instance of that DAG which
Airflow calls a :doc:`DAG Run </dag-run>`. DAG Runs can run in parallel for the
@@ -177,6 +179,20 @@ In much the same way a DAG instantiates into a DAG Run every time it's run,
Tasks specified inside a DAG are also instantiated into
:ref:`Task Instances <concepts:task-instances>` along with it.
+A DAG run has a start date when it starts, and an end date when it ends.
+This period describes the time when the DAG actually 'ran.' Aside from the DAG
+run's start and end date, there is another date called the *logical date*
+(formerly known as the execution date), which describes the intended time at
+which a DAG run is scheduled or triggered. It is called *logical* because it
+is abstract, taking on different meanings depending on the context of the
+DAG run itself.
+
+For example, if a DAG run is manually triggered by the user, its logical date
+is the date and time at which the DAG run was triggered, and its value is
+equal to the DAG run's start date. However, when the DAG is scheduled
+automatically with a schedule interval in place, the logical date marks the
+start of the data interval, and the DAG run's start date is then the logical
+date plus the schedule interval.
DAG Assignment
--------------
diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 90bb404..62555b1 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -84,6 +84,8 @@ scheduled one interval after ``start_date``.
If ``schedule_interval`` is not enough to express your DAG's schedule,
logical date, or data interval, see :doc:`/concepts/timetable`.
+ For more information on ``logical date``, see :ref:`concepts:dag-run` and
+ :ref:`faq:what-does-execution-date-mean`.
Re-run DAG
''''''''''
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 857e685..7f72a0d 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -214,6 +214,8 @@ This allows for a backfill on tasks that have ``depends_on_past=True`` to
actually start. If this were not the case, the backfill just would not start.
+.. _faq:what-does-execution-date-mean:
+
What does ``execution_date`` mean?
----------------------------------
@@ -248,6 +250,11 @@ misunderstandings.
Note that ``ds`` (the YYYY-MM-DD form of ``data_interval_start``) refers to
*date* ***string***, not *date* ***start*** as may be confusing to some.
+.. tip::
+
+ For more information on ``logical date``, see :ref:`data-interval` and
+ :ref:`concepts:dag-run`.
+
How to create DAGs dynamically?
-------------------------------
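The relationship the new docs describe (manual run: logical date equals the start date; scheduled run: logical date marks the start of the data interval, one interval before the run starts) can be sketched in plain Python. The function names and the fixed daily interval below are illustrative, not Airflow's API:

```python
from datetime import datetime, timedelta

SCHEDULE_INTERVAL = timedelta(days=1)  # stand-in for a daily schedule

def scheduled_run_dates(data_interval_start: datetime):
    """Scheduled run: the logical date marks the start of the data
    interval; the run itself only starts once the interval has ended."""
    logical_date = data_interval_start
    run_start = data_interval_start + SCHEDULE_INTERVAL
    return logical_date, run_start

def manual_run_dates(trigger_time: datetime):
    """Manually triggered run: the logical date equals the trigger
    time, which is also the run's start date."""
    return trigger_time, trigger_time

logical, start = scheduled_run_dates(datetime(2022, 2, 16))
print(logical, "->", start)  # the logical date is one interval before the start
```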
[airflow] 25/43: Fix the incorrect scheduling time for the first run of dag (#21011)
commit 64e0c5024b3cb13d2fc53f42b8096c2ae3441553
Author: wano <55...@users.noreply.github.com>
AuthorDate: Mon Feb 7 02:02:57 2022 +0800
Fix the incorrect scheduling time for the first run of dag (#21011)
When catchup_by_default is set to false and the start_date in the DAG is
the previous day, the first scheduled time for this DAG may be incorrect.
Co-authored-by: wanlce <wh...@foxmail.com>
(cherry picked from commit 0bcca55f4881bacc3fbe86f69e71981f5552b398)
---
airflow/timetables/interval.py | 2 +-
tests/timetables/test_interval_timetable.py | 21 +++++++++++++++++++++
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index d669cb6..01fac3a 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -218,7 +218,7 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
raise AssertionError("next schedule shouldn't be earlier")
if earliest is None:
return new_start
- return max(new_start, earliest)
+ return max(new_start, self._align(earliest))
def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
# Get the last complete period before run_after, e.g. if a DAG run is
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
index 842cc1f2..fe09e0c 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -35,11 +35,32 @@ PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1)
PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END)
CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
+YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)
HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1))
+CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE)
+DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
+
+
+@pytest.mark.parametrize(
+ "last_automated_data_interval",
+ [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")],
+)
+@freezegun.freeze_time(CURRENT_TIME)
+def test_no_catchup_first_starts_at_current_time(
+ last_automated_data_interval: Optional[DataInterval],
+) -> None:
+ """If ``catchup=False`` and start_date is a day before"""
+ next_info = CRON_TIMETABLE.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval,
+ restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
+ )
+ expected_start = YESTERDAY + DELTA_FROM_MIDNIGHT
+ assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT)
+
@pytest.mark.parametrize(
"timetable",
[airflow] 41/43: Add note about Variable precedence with env vars (#21568)
commit 7e8012703cb7d79386b5c59e076a81dad60eabf3
Author: Madison Swain-Bowden <bo...@spu.edu>
AuthorDate: Tue Feb 15 13:56:00 2022 -0800
Add note about Variable precedence with env vars (#21568)
This PR updates some documentation regarding setting Airflow Variables using environment variables. Environment variables take precedence over variables defined in the UI/metastore based on this default search path list: https://github.dev/apache/airflow/blob/7864693e43c40fd8f0914c05f7e196a007d16d50/airflow/secrets/__init__.py#L29-L30
(cherry picked from commit 7a268cb3c9fc6bc03f2400c6632ff8dccf4e451e)
---
docs/apache-airflow/howto/variable.rst | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/docs/apache-airflow/howto/variable.rst b/docs/apache-airflow/howto/variable.rst
index 7cb9377..401dcb1 100644
--- a/docs/apache-airflow/howto/variable.rst
+++ b/docs/apache-airflow/howto/variable.rst
@@ -62,7 +62,8 @@ You can use them in your DAGs as:
Single underscores surround ``VAR``. This is in contrast with the way ``airflow.cfg``
parameters are stored, where double underscores surround the config section name.
Variables set using Environment Variables would not appear in the Airflow UI but you will
- be able to use them in your DAG file.
+ be able to use them in your DAG file. Variables set using Environment Variables will also
+ take precedence over variables defined in the Airflow UI.
Securing Variables
------------------
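The precedence documented above can be sketched as a minimal lookup that mimics the default search path: environment variables first, then the metastore. The ``AIRFLOW_VAR_<KEY>`` naming follows the docs; the ``get_variable`` helper and in-memory metastore dict are illustrative only:

```python
import os

def get_variable(key: str, metastore: dict):
    """Look up a Variable: environment variables win over the metastore."""
    env_value = os.environ.get(f"AIRFLOW_VAR_{key.upper()}")
    if env_value is not None:
        return env_value           # set via environment: takes precedence
    return metastore.get(key)      # otherwise fall back to the UI/metastore

metastore = {"foo": "from-ui"}
os.environ["AIRFLOW_VAR_FOO"] = "from-env"
print(get_variable("foo", metastore))  # from-env
```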
[airflow] 14/43: Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
commit 6d8342e78c6b6546845a7d2d5ba0a761af73d5f0
Author: hubert-pietron <94...@users.noreply.github.com>
AuthorDate: Thu Jan 27 06:20:17 2022 +0100
Removed duplicated dag_run join in Dag.get_task_instances() (#20591)
Co-authored-by: hubert-pietron <hu...@gmail.com>
(cherry picked from commit 960f573615b5357677c10bd9f7ec11811a0355c6)
---
airflow/models/dag.py | 1 -
1 file changed, 1 deletion(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2a08d26..477e597 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1343,7 +1343,6 @@ class DAG(LoggingMixin):
as_pk_tuple=False,
session=session,
)
- .join(TaskInstance.dag_run)
.order_by(DagRun.execution_date)
.all()
)
[airflow] 39/43: Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)
Posted by je...@apache.org.
commit 436f452ab8e32bfd5997e9650d1cfc490a41b0e4
Author: Kush <36...@users.noreply.github.com>
AuthorDate: Thu Dec 30 15:56:24 2021 +0530
Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)
Fixes #20249
(cherry picked from commit ac9f29da200c208bb52d412186c5a1b936eb0b5a)
---
airflow/jobs/base_job.py | 1 +
.../587bdf053233_adding_index_for_dag_id_in_job.py | 43 ++++++++++++++++++++++
docs/apache-airflow/migrations-ref.rst | 4 +-
3 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 745f248..174e4d5 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -71,6 +71,7 @@ class BaseJob(Base, LoggingMixin):
__table_args__ = (
Index('job_type_heart', job_type, latest_heartbeat),
Index('idx_job_state_heartbeat', state, latest_heartbeat),
+ Index('idx_job_dag_id', dag_id),
)
task_instances_enqueued = relationship(
diff --git a/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py b/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
new file mode 100644
index 0000000..3532fe9
--- /dev/null
+++ b/airflow/migrations/versions/587bdf053233_adding_index_for_dag_id_in_job.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""adding index for dag_id in job
+
+Revision ID: 587bdf053233
+Revises: f9da662e7089
+Create Date: 2021-12-14 10:20:12.482940
+
+"""
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '587bdf053233'
+down_revision = 'f9da662e7089'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply adding index for dag_id in job"""
+ op.create_index('idx_job_dag_id', 'job', ['dag_id'], unique=False)
+
+
+def downgrade():
+ """Unapply adding index for dag_id in job"""
+ op.drop_index('idx_job_dag_id', table_name='job')
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 8dc1a55..0eac329 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``c381b21cb7e4`` (head) | ``be2bfac3da23`` | ``2.2.4`` | Create a ``session`` table to store web session data |
+| ``587bdf053233`` (head) | ``f9da662e7089`` | ``2.3.0`` | Add index for ``dag_id`` column in ``job`` table. |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``c381b21cb7e4`` | ``be2bfac3da23`` | ``2.2.4`` | Create a ``session`` table to store web session data |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``be2bfac3da23`` | ``7b2661a43ba3`` | ``2.2.3`` | Add has_import_errors column to DagModel |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
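The effect of the new index can be demonstrated with any SQL engine. Below, a throwaway SQLite table stands in for the real ``job`` table, and the query plan confirms that filtering on ``dag_id`` becomes an index search rather than a full table scan:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT)")
# Mirrors the migration: op.create_index('idx_job_dag_id', 'job', ['dag_id'])
conn.execute("CREATE INDEX idx_job_dag_id ON job (dag_id)")

plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM job WHERE dag_id = ?", ("example_dag",)
).fetchall()
print(plan)  # the plan detail names idx_job_dag_id, i.e. an index search
```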
[airflow] 13/43: Avoid unintentional data loss when deleting DAGs (#20758)
commit 4dc8b909bedd04094be3079c3f7384ea044ec011
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Mon Jan 10 11:55:51 2022 -0800
Avoid unintentional data loss when deleting DAGs (#20758)
(cherry picked from commit 5980d2b05eee484256c634d5efae9410265c65e9)
---
airflow/api/common/delete_dag.py | 18 +++++++++++++++---
tests/api/common/test_delete_dag.py | 14 ++++++++++++++
2 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index c448127..5e0afa8 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -18,7 +18,7 @@
"""Delete DAGs APIs."""
import logging
-from sqlalchemy import or_
+from sqlalchemy import and_, or_
from airflow import models
from airflow.exceptions import AirflowException, DagNotFound
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
if dag is None:
raise DagNotFound(f"Dag id {dag_id} not found")
+ # deleting a DAG should also delete all of its subdags
+ dags_to_delete_query = session.query(DagModel.dag_id).filter(
+ or_(
+ DagModel.dag_id == dag_id,
+ and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
+ )
+ )
+ dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]
+
# Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
# There may be a lag, so explicitly removes serialized DAG here.
if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
@@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
if hasattr(model, "dag_id"):
if keep_records_in_log and model.__name__ == 'Log':
continue
- cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
- count += session.query(model).filter(cond).delete(synchronize_session='fetch')
+ count += (
+ session.query(model)
+ .filter(model.dag_id.in_(dags_to_delete))
+ .delete(synchronize_session='fetch')
+ )
if dag.is_subdag:
parent_dag_id, task_id = dag_id.rsplit(".", 1)
for model in TaskFail, models.TaskInstance:
diff --git a/tests/api/common/test_delete_dag.py b/tests/api/common/test_delete_dag.py
index 0eb058a..d9dc0b0 100644
--- a/tests/api/common/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -162,3 +162,17 @@ class TestDeleteDAGSuccessfulDelete:
self.check_dag_models_exists()
delete_dag(dag_id=self.key, keep_records_in_log=False)
self.check_dag_models_removed(expect_logs=0)
+
+ def test_delete_dag_preserves_other_dags(self):
+
+ self.setup_dag_models()
+
+ with create_session() as session:
+ session.add(DM(dag_id=self.key + ".other_dag", fileloc=self.dag_file_path))
+ session.add(DM(dag_id=self.key + ".subdag", fileloc=self.dag_file_path, is_subdag=True))
+
+ delete_dag(self.key)
+
+ with create_session() as session:
+ assert session.query(DM).filter(DM.dag_id == self.key + ".other_dag").count() == 1
+ assert session.query(DM).filter(DM.dag_id.like(self.key + "%")).count() == 1
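The selection logic of the fix, in plain Python: delete the DAG itself and its true subdags, but keep unrelated DAGs whose ids merely share the prefix (which the old LIKE-only filter would have swept up). Names below are illustrative, not Airflow's models:

```python
def dags_to_delete(dag_id: str, dag_models: list) -> list:
    """Return the dag_ids that delete_dag should remove."""
    return sorted(
        d["dag_id"]
        for d in dag_models
        if d["dag_id"] == dag_id
        or (d["dag_id"].startswith(dag_id + ".") and d["is_subdag"])
    )

rows = [
    {"dag_id": "etl", "is_subdag": False},
    {"dag_id": "etl.section", "is_subdag": True},     # real subdag: deleted
    {"dag_id": "etl.other_dag", "is_subdag": False},  # regular DAG: kept
]
print(dags_to_delete("etl", rows))  # ['etl', 'etl.section']
```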
[airflow] 29/43: Avoid deadlock when rescheduling task (#21362)
commit f2fe0df6b3caa86a4315322264fad077f03b32e6
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Feb 7 20:12:05 2022 +0100
Avoid deadlock when rescheduling task (#21362)
The scheduler job performs scheduling after locking the "scheduled"
DagRun row for writing. This should prevent the DagRun and related
task instances from being modified by another scheduler or
"mini-scheduler" run after the task is completed.
However, there is apparently one more case where the DagRun is
locked by "Task" processes - namely when a task throws
AirflowRescheduleException. In this case a new "TaskReschedule"
entity is inserted into the database, which also takes a lock
on the DagRun (because TaskReschedule has a "DagRun" relationship).
This PR modifies the handling of AirflowRescheduleException to obtain the
very same DagRun lock before it attempts to insert the TaskReschedule
entity.
It seems that TaskReschedule is the only entity with this relationship,
so likely all the mysterious SchedulerJob deadlock cases we
experienced might be explained (and fixed) by this one.
It is likely that this one:
* Fixes: #16982
* Fixes: #19957
(cherry picked from commit 6d110b565a505505351d1ff19592626fb24e4516)
---
airflow/models/taskinstance.py | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index ec34156..2dcc923 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -93,7 +93,7 @@ from airflow.utils.operator_helpers import context_to_airflow_vars
from airflow.utils.platform import getuser
from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import create_session, provide_session
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
from airflow.utils.state import DagRunState, State
from airflow.utils.timeout import timeout
@@ -1657,11 +1657,24 @@ class TaskInstance(Base, LoggingMixin):
# Don't record reschedule request in test mode
if test_mode:
return
+
+ from airflow.models.dagrun import DagRun # Avoid circular import
+
self.refresh_from_db(session)
self.end_date = timezone.utcnow()
self.set_duration()
+ # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+ # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+ with_row_locks(
+ session.query(DagRun).filter_by(
+ dag_id=self.dag_id,
+ run_id=self.run_id,
+ ),
+ session=session,
+ ).one()
+
# Log reschedule request
session.add(
TaskReschedule(
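The underlying idea of the patch - every code path must acquire the DagRun lock before any lock it implies - is the classic lock-ordering cure for deadlock. A toy version with two ``threading`` locks (no Airflow involved, names illustrative):

```python
import threading

dagrun_lock = threading.Lock()       # stands in for the DagRun row lock
reschedule_lock = threading.Lock()   # stands in for the TaskReschedule insert

def reschedule_task(results):
    # The fix: take the DagRun lock FIRST, matching the scheduler's order,
    # so neither side can hold one lock while waiting for the other.
    with dagrun_lock:
        with reschedule_lock:
            results.append("task rescheduled")

def scheduler_pass(results):
    with dagrun_lock:  # the scheduler already locks the DagRun row first
        with reschedule_lock:
            results.append("scheduler ran")

results = []
threads = [threading.Thread(target=f, args=(results,))
           for f in (reschedule_task, scheduler_pass)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # both complete; with opposite lock orders they could deadlock
```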
[airflow] 11/43: Fix session usage in ``/rendered-k8s`` view (#21006)
commit daebc586d0aaaddaea4658734c9292dece150c6a
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Fri Jan 21 21:44:40 2022 +0800
Fix session usage in ``/rendered-k8s`` view (#21006)
We can't commit the session too early because later functions need that
session to fetch related objects.
Fix #20534.
(cherry picked from commit a665f48b606065977e0d3952bc74635ce11726d1)
---
airflow/www/views.py | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9b868f3..2182a17 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -84,7 +84,7 @@ from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from sqlalchemy import Date, and_, desc, func, inspect, union_all
from sqlalchemy.exc import IntegrityError
-from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import Session, joinedload
from wtforms import SelectField, validators
from wtforms.validators import InputRequired
@@ -116,7 +116,7 @@ from airflow.utils.docs import get_doc_url_for_provider, get_docs_url
from airflow.utils.helpers import alchemy_to_dict
from airflow.utils.log import secrets_masker
from airflow.utils.log.log_reader import TaskLogReader
-from airflow.utils.session import create_session, provide_session
+from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import State
from airflow.utils.strings import to_boolean
from airflow.version import version
@@ -1124,7 +1124,8 @@ class Airflow(AirflowBaseView):
]
)
@action_logging
- def rendered_k8s(self):
+ @provide_session
+ def rendered_k8s(self, session: Session = NEW_SESSION):
"""Get rendered k8s yaml."""
if not settings.IS_K8S_OR_K8SCELERY_EXECUTOR:
abort(404)
@@ -1135,14 +1136,15 @@ class Airflow(AirflowBaseView):
form = DateTimeForm(data={'execution_date': dttm})
root = request.args.get('root', '')
logging.info("Retrieving rendered templates.")
- dag = current_app.dag_bag.get_dag(dag_id)
+
+ dag: DAG = current_app.dag_bag.get_dag(dag_id)
task = dag.get_task(task_id)
- dag_run = dag.get_dagrun(execution_date=dttm)
- ti = dag_run.get_task_instance(task_id=task.task_id)
+ dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+ ti = dag_run.get_task_instance(task_id=task.task_id, session=session)
pod_spec = None
try:
- pod_spec = ti.get_rendered_k8s_spec()
+ pod_spec = ti.get_rendered_k8s_spec(session=session)
except AirflowException as e:
msg = "Error rendering Kubernetes POD Spec: " + escape(e)
if e.__cause__:
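The pattern behind this fix - thread one session through every lookup instead of letting each call open (and commit) its own - is what Airflow's ``@provide_session`` decorator enables. A stripped-down sketch of the idea, with a plain object standing in for a SQLAlchemy session:

```python
import functools

def provide_session(func):
    """Supply a session keyword argument when the caller did not pass one."""
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is None:
            session = object()  # stand-in for creating a real session
        return func(*args, session=session, **kwargs)
    return wrapper

@provide_session
def rendered_view(session=None):
    # Every lookup inside the view receives this same session, so related
    # objects stay attached rather than being lost by an early commit.
    return session

shared = object()
print(rendered_view(session=shared) is shared)  # True: explicit session reused
```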
[airflow] 07/43: Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
commit 07102e96dfb3c9794882f562548b37738ee4a37a
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Thu Jan 27 06:47:10 2022 +0800
Do not set `TaskInstance.max_tries` in `refresh_from_task` (#21018)
(cherry picked from commit e3832a77a3e0d374dfdbe14f34a941d22c9c459d)
---
airflow/models/taskinstance.py | 4 +++-
tests/models/test_taskinstance.py | 6 ++++++
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 281d067..ec34156 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -447,6 +447,7 @@ class TaskInstance(Base, LoggingMixin):
self.run_id = run_id
self.try_number = 0
+ self.max_tries = self.task.retries
self.unixname = getuser()
if state:
self.state = state
@@ -775,7 +776,8 @@ class TaskInstance(Base, LoggingMixin):
self.pool_slots = task.pool_slots
self.priority_weight = task.priority_weight_total
self.run_as_user = task.run_as_user
- self.max_tries = task.retries
+ # Do not set max_tries to task.retries here because max_tries is a cumulative
+ # value that needs to be stored in the db.
self.executor_config = task.executor_config
self.operator = task.task_type
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index d111371..4fec49f 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2143,6 +2143,12 @@ def test_refresh_from_task(pool_override):
assert ti.executor_config == task.executor_config
assert ti.operator == DummyOperator.__name__
+ # Test that refresh_from_task does not reset ti.max_tries
+ expected_max_tries = task.retries + 10
+ ti.max_tries = expected_max_tries
+ ti.refresh_from_task(task)
+ assert ti.max_tries == expected_max_tries
+
class TestRunRawTaskQueriesCount:
"""
[airflow] 22/43: Update recipe for Google Cloud SDK (#21268)
commit 4b3fa3a99a90eff00b244a62b52a2d6c8e25d285
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Thu Feb 3 19:20:12 2022 +0100
Update recipe for Google Cloud SDK (#21268)
(cherry picked from commit 874a22ee9b77f8f100736558723ceaf2d04b446b)
---
docs/docker-stack/docker-images-recipes/gcloud.Dockerfile | 1 +
1 file changed, 1 insertion(+)
diff --git a/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile b/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile
index b1589e1..48f7c2d 100644
--- a/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile
+++ b/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile
@@ -36,6 +36,7 @@ RUN DOWNLOAD_URL="https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/goo
--additional-components alpha beta kubectl \
--quiet \
&& rm -rf "${TMP_DIR}" \
+ && rm -rf "${GCLOUD_HOME}/.install/.backup/" \
&& gcloud --version
USER ${AIRFLOW_UID}
[airflow] 04/43: Add back legacy .piprc customization for pip (#21124)
commit 680c011f3050d2858e5d648b68e59185a213709c
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Wed Jan 26 18:04:19 2022 +0100
Add back legacy .piprc customization for pip (#21124)
This change restores backwards compatibility for using .piprc
to customize the Airflow image. Some older versions of pip used .piprc
(even though documentation about it is difficult to find now) and we
used to support this option. With #20445, we changed to use the
(fully documented) ``pip.conf`` option; however, if someone used
.piprc before to customize their image, that change would break it.
This PR also brings back the .piprc option in the image (even if
it is not really clear whether current and future versions of pip
will support it).
(cherry picked from commit d5a9edf25723396d17fd10bb980fb99ccac618bb)
---
Dockerfile | 3 +++
docs/docker-stack/build.rst | 10 +++++++++-
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/Dockerfile b/Dockerfile
index f880ec5..aadf896 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -212,6 +212,9 @@ USER airflow
RUN if [[ -f /docker-context-files/pip.conf ]]; then \
mkdir -p ${AIRFLOW_USER_HOME_DIR}/.config/pip; \
cp /docker-context-files/pip.conf "${AIRFLOW_USER_HOME_DIR}/.config/pip/pip.conf"; \
+ fi; \
+ if [[ -f /docker-context-files/.piprc ]]; then \
+ cp /docker-context-files/.piprc "${AIRFLOW_USER_HOME_DIR}/.piprc"; \
fi
ENV AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION} \
diff --git a/docs/docker-stack/build.rst b/docs/docker-stack/build.rst
index 2702c66..b85bf1c 100644
--- a/docs/docker-stack/build.rst
+++ b/docs/docker-stack/build.rst
@@ -522,13 +522,21 @@ described below but here is an example of rather complex command to customize th
based on example in `this comment <https://github.com/apache/airflow/issues/8605#issuecomment-690065621>`_:
In case you need to use your custom PyPI package indexes, you can also customize PYPI sources used during
-image build by adding a ``docker-context-files``/``pip.conf`` file when building the image.
+image build by adding a ``docker-context-files/pip.conf`` file when building the image.
This ``pip.conf`` will not be committed to the repository (it is added to ``.gitignore``) and it will not be
present in the final production image. It is added and used only in the build segment of the image.
Therefore this ``pip.conf`` file can safely contain list of package indexes you want to use,
usernames and passwords used for authentication. More details about ``pip.conf`` file can be found in the
`pip configuration <https://pip.pypa.io/en/stable/topics/configuration/>`_.
+If you used ``.piprc`` before (some older versions of ``pip`` used it for customization), you can put it
+in the ``docker-context-files/.piprc`` file and it will be automatically copied to the ``HOME`` directory
+of the ``airflow`` user.
+
+Note that those customizations are only available in the ``build`` segment of the Airflow image; they
+are not present in the ``final`` image. If you wish to extend the final image and add a custom ``.piprc`` and
+``pip.conf``, you should add them in your own Dockerfile used to extend the Airflow image.
+
Such customizations are independent of the way how airflow is installed.
.. note::
[airflow] 03/43: Update logging-tasks.rst (#21088)
commit 9f6d6b9a13d808c0faff899f79a7bbcaf78fc5aa
Author: caxefaizan <63...@users.noreply.github.com>
AuthorDate: Wed Jan 26 04:15:58 2022 +0530
Update logging-tasks.rst (#21088)
(cherry picked from commit 156284650f20bad131f26b91061e207e2e39253e)
---
docs/apache-airflow/logging-monitoring/logging-tasks.rst | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/apache-airflow/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/logging-monitoring/logging-tasks.rst
index 13cb248..e64905f 100644
--- a/docs/apache-airflow/logging-monitoring/logging-tasks.rst
+++ b/docs/apache-airflow/logging-monitoring/logging-tasks.rst
@@ -122,7 +122,7 @@ Serving logs from workers
Most task handlers send logs upon completion of a task. In order to view logs in real time, Airflow automatically starts an HTTP server to serve the logs in the following cases:
-- If ``SchedulerExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
+- If ``SequentialExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
- If ``CeleryExecutor`` is used, then when ``airflow worker`` is running.
The server is running on the port specified by ``worker_log_server_port`` option in ``[logging]`` section. By default, it is ``8793``.
[airflow] 01/43: Update v1.yaml (#21024)
commit 31c66eb9d7722ba751ecee3540b350cb685891f0
Author: Ilia Lazebnik <Il...@gmail.com>
AuthorDate: Sun Jan 23 21:29:52 2022 +0200
Update v1.yaml (#21024)
(cherry picked from commit 2af0f700857cbf7401d930ff24cdff273b501beb)
---
airflow/api_connexion/openapi/v1.yaml | 2 --
1 file changed, 2 deletions(-)
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 3669c66..e7553a3 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2161,8 +2161,6 @@ components:
The value of this field can be set only when creating the object. If you try to modify the
field of an existing object, the request fails with an BAD_REQUEST error.
- required:
- - dag_id
UpdateDagRunState:
type: object
[airflow] 20/43: Update version to 2.2.4 for things in that release (#21196)
commit 2066812960018ce0d7ba774dd2f9fe5c0d8b52a4
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 1 12:05:04 2022 -0700
Update version to 2.2.4 for things in that release (#21196)
(cherry picked from commit 093702e9f579ee028a103cdc9acf0e6acccd6d79)
---
airflow/api/common/experimental/get_code.py | 2 +-
airflow/api/common/experimental/get_dag_run_state.py | 2 +-
airflow/api/common/experimental/get_task.py | 2 +-
airflow/api/common/experimental/get_task_instance.py | 2 +-
airflow/api/common/experimental/pool.py | 8 ++++----
5 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py
index 1a1fb62..d4232b1 100644
--- a/airflow/api/common/experimental/get_code.py
+++ b/airflow/api/common/experimental/get_code.py
@@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.models.dagcode import DagCode
-@deprecated(reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.3")
+@deprecated(reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.4")
def get_code(dag_id: str) -> str:
"""Return python code of a given dag_id.
diff --git a/airflow/api/common/experimental/get_dag_run_state.py b/airflow/api/common/experimental/get_dag_run_state.py
index b2dedd5..7201186 100644
--- a/airflow/api/common/experimental/get_dag_run_state.py
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -24,7 +24,7 @@ from deprecated import deprecated
from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
-@deprecated(reason="Use DagRun().get_state() instead", version="2.2.3")
+@deprecated(reason="Use DagRun().get_state() instead", version="2.2.4")
def get_dag_run_state(dag_id: str, execution_date: datetime) -> Dict[str, str]:
"""Return the Dag Run state identified by the given dag_id and execution_date.
diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py
index fae5fd7..4589cc6 100644
--- a/airflow/api/common/experimental/get_task.py
+++ b/airflow/api/common/experimental/get_task.py
@@ -22,7 +22,7 @@ from airflow.api.common.experimental import check_and_get_dag
from airflow.models import TaskInstance
-@deprecated(reason="Use DAG().get_task", version="2.2.3")
+@deprecated(reason="Use DAG().get_task", version="2.2.4")
def get_task(dag_id: str, task_id: str) -> TaskInstance:
"""Return the task object identified by the given dag_id and task_id."""
dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py
index 137f8a3..7361efd 100644
--- a/airflow/api/common/experimental/get_task_instance.py
+++ b/airflow/api/common/experimental/get_task_instance.py
@@ -25,7 +25,7 @@ from airflow.exceptions import TaskInstanceNotFound
from airflow.models import TaskInstance
-@deprecated(version="2.2.3", reason="Use DagRun.get_task_instance instead")
+@deprecated(version="2.2.4", reason="Use DagRun.get_task_instance instead")
def get_task_instance(dag_id: str, task_id: str, execution_date: datetime) -> TaskInstance:
"""Return the task instance identified by the given dag_id, task_id and execution_date."""
dag = check_and_get_dag(dag_id, task_id)
diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index 0b9c3a5..fe4f161 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -23,7 +23,7 @@ from airflow.models import Pool
from airflow.utils.session import provide_session
-@deprecated(reason="Use Pool.get_pool() instead", version="2.2.3")
+@deprecated(reason="Use Pool.get_pool() instead", version="2.2.4")
@provide_session
def get_pool(name, session=None):
"""Get pool by a given name."""
@@ -37,14 +37,14 @@ def get_pool(name, session=None):
return pool
-@deprecated(reason="Use Pool.get_pools() instead", version="2.2.3")
+@deprecated(reason="Use Pool.get_pools() instead", version="2.2.4")
@provide_session
def get_pools(session=None):
"""Get all pools."""
return session.query(Pool).all()
-@deprecated(reason="Use Pool.create_pool() instead", version="2.2.3")
+@deprecated(reason="Use Pool.create_pool() instead", version="2.2.4")
@provide_session
def create_pool(name, slots, description, session=None):
"""Create a pool with a given parameters."""
@@ -75,7 +75,7 @@ def create_pool(name, slots, description, session=None):
return pool
-@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.3")
+@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.4")
@provide_session
def delete_pool(name, session=None):
"""Delete pool by a given name."""
[airflow] 27/43: Update example DAGs (#21372)
commit 5c078cda332d42baba72c985532e0060d30a31ef
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Feb 7 11:24:31 2022 -0700
Update example DAGs (#21372)
(cherry picked from commit 7a38ec2ad3b3bd6fda5e1ee9fe9e644ccb8b4c12)
---
.../example_dags/example_passing_params_via_test_command.py | 11 ++++++-----
airflow/example_dags/tutorial.py | 2 --
docs/apache-airflow/tutorial.rst | 9 ++-------
tests/cli/commands/test_task_command.py | 1 -
4 files changed, 8 insertions(+), 15 deletions(-)
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index e3f04c4..d4781af 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -68,17 +68,18 @@ with DAG(
) as dag:
run_this = my_py_command(params={"miff": "agg"})
- my_templated_command = dedent(
+ my_command = dedent(
+ """
+ echo "'foo' was passed in via Airflow CLI Test command with value '$FOO'"
+ echo "'miff' was passed in via BashOperator with value '$MIFF'"
"""
- echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
- echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
- """
)
also_run_this = BashOperator(
task_id='also_run_this',
- bash_command=my_templated_command,
+ bash_command=my_command,
params={"miff": "agg"},
+ env={"FOO": "{{ params.foo }}", "MIFF": "{{ params.miff }}"},
)
env_var_test_task = print_env_vars()
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index 1049772..ff2bd2f 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -109,7 +109,6 @@ with DAG(
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
- echo "{{ params.my_param }}"
{% endfor %}
"""
)
@@ -118,7 +117,6 @@ with DAG(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
- params={'my_param': 'Parameter I passed in'},
)
# [END jinja_template]
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 1c32e78..0034b2c 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -151,13 +151,8 @@ stamp").
:end-before: [END jinja_template]
Notice that the ``templated_command`` contains code logic in ``{% %}`` blocks,
-references parameters like ``{{ ds }}``, calls a function as in
-``{{ macros.ds_add(ds, 7)}}``, and references a user-defined parameter
-in ``{{ params.my_param }}``.
-
-The ``params`` hook in ``BaseOperator`` allows you to pass a dictionary of
-parameters and/or objects to your templates. Please take the time
-to understand how the parameter ``my_param`` makes it through to the template.
+references parameters like ``{{ ds }}``, and calls a function as in
+``{{ macros.ds_add(ds, 7)}}``.
Files can also be passed to the ``bash_command`` argument, like
``bash_command='templated_command.sh'``, where the file location is relative to
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 201af16..76c6cdb 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -263,7 +263,6 @@ class TestCliTasks(unittest.TestCase):
assert 'echo "2016-01-01"' in output
assert 'echo "2016-01-08"' in output
- assert 'echo "Parameter I passed in"' in output
def test_cli_run_when_pickle_and_dag_cli_method_selected(self):
"""
[airflow] 34/43: Use compat data interval shim in log handlers (#21289)
commit 79e995480822fd68c715c6ab5d83357721fa2d55
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Sat Feb 12 11:40:29 2022 +0800
Use compat data interval shim in log handlers (#21289)
(cherry picked from commit 44bd211b19dcb75eeb53ced5bea2cf0c80654b1a)
---
.../providers/elasticsearch/log/es_task_handler.py | 27 ++++++++++++-----
airflow/utils/log/file_task_handler.py | 35 +++++++++++++++++-----
2 files changed, 46 insertions(+), 16 deletions(-)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cd08971..b591aef 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -101,15 +101,25 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
self.context_set = False
def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
- dag_run = ti.dag_run
+ dag_run = ti.get_dagrun()
+ try:
+ data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+ except AttributeError: # ti.task is not always set.
+ data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
if self.json_format:
- data_interval_start = self._clean_date(dag_run.data_interval_start)
- data_interval_end = self._clean_date(dag_run.data_interval_end)
+ data_interval_start = self._clean_date(data_interval[0])
+ data_interval_end = self._clean_date(data_interval[1])
execution_date = self._clean_date(dag_run.execution_date)
else:
- data_interval_start = dag_run.data_interval_start.isoformat()
- data_interval_end = dag_run.data_interval_end.isoformat()
+ if data_interval[0]:
+ data_interval_start = data_interval[0].isoformat()
+ else:
+ data_interval_start = ""
+ if data_interval[1]:
+ data_interval_end = data_interval[1].isoformat()
+ else:
+ data_interval_end = ""
execution_date = dag_run.execution_date.isoformat()
return self.log_id_template.format(
@@ -123,14 +133,15 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
)
@staticmethod
- def _clean_date(value: datetime) -> str:
+ def _clean_date(value: Optional[datetime]) -> str:
"""
Clean up a date value so that it is safe to query in elasticsearch
by removing reserved characters.
- # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
- :param execution_date: execution date of the dag run.
+ https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
"""
+ if value is None:
+ return ""
return value.strftime("%Y_%m_%dT%H_%M_%S_%f")
def _group_logs_by_host(self, logs):
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6e57c67..e13b8d4 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -18,8 +18,9 @@
"""File logging handler for tasks."""
import logging
import os
+from datetime import datetime
from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Tuple
import httpx
from itsdangerous import TimedJSONWebSignatureSerializer
@@ -82,13 +83,31 @@ class FileTaskHandler(logging.Handler):
context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
context["try_number"] = try_number
return render_template_to_string(self.filename_jinja_template, context)
-
- return self.filename_template.format(
- dag_id=ti.dag_id,
- task_id=ti.task_id,
- execution_date=ti.get_dagrun().logical_date.isoformat(),
- try_number=try_number,
- )
+ elif self.filename_template:
+ dag_run = ti.get_dagrun()
+ try:
+ data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+ except AttributeError: # ti.task is not always set.
+ data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
+ if data_interval[0]:
+ data_interval_start = data_interval[0].isoformat()
+ else:
+ data_interval_start = ""
+ if data_interval[1]:
+ data_interval_end = data_interval[1].isoformat()
+ else:
+ data_interval_end = ""
+ return self.filename_template.format(
+ dag_id=ti.dag_id,
+ task_id=ti.task_id,
+ run_id=ti.run_id,
+ data_interval_start=data_interval_start,
+ data_interval_end=data_interval_end,
+ execution_date=ti.get_dagrun().logical_date.isoformat(),
+ try_number=try_number,
+ )
+ else:
+ raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
def _read_grouped_logs(self):
return False
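The same try/except shim appears in both handlers above; a self-contained sketch of the pattern, using stand-in objects rather than real Airflow models:

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def compat_data_interval(ti, dag_run):
    # Prefer the interval computed by the DAG's timetable; fall back to the
    # values stored on the DagRun row when ti.task (or its dag) is not set.
    try:
        return ti.task.dag.get_run_data_interval(dag_run)
    except AttributeError:  # ti.task is not always set
        return (dag_run.data_interval_start, dag_run.data_interval_end)


start = datetime(2022, 1, 1, tzinfo=timezone.utc)
end = datetime(2022, 1, 2, tzinfo=timezone.utc)
run = SimpleNamespace(data_interval_start=start, data_interval_end=end)

# AttributeError path: no task attached to the task instance
ti_without_task = SimpleNamespace(task=None)
interval = compat_data_interval(ti_without_task, run)
```

Either element of the returned tuple may be ``None`` (pre-2.2 runs), which is why both handlers guard the ``isoformat()`` calls afterwards.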
[airflow] 42/43: Adding missing login provider related methods from Flask-Appbuilder (#21294)
commit 8cbf9340ec020810b505d0ccf197435eb0e8a704
Author: Pankaj Singh <aa...@gmail.com>
AuthorDate: Fri Feb 18 02:25:22 2022 +0530
Adding missing login provider related methods from Flask-Appbuilder (#21294)
(cherry picked from commit 38894e8013b5c38468e912164f80282e3b579993)
---
airflow/www/fab_security/manager.py | 15 +++++++++++++++
setup.cfg | 7 ++++++-
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/airflow/www/fab_security/manager.py b/airflow/www/fab_security/manager.py
index e340c17..f5385a6 100644
--- a/airflow/www/fab_security/manager.py
+++ b/airflow/www/fab_security/manager.py
@@ -187,6 +187,7 @@ class BaseSecurityManager:
# Role Mapping
app.config.setdefault("AUTH_ROLES_MAPPING", {})
app.config.setdefault("AUTH_ROLES_SYNC_AT_LOGIN", False)
+ app.config.setdefault("AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS", False)
# LDAP Config
if self.auth_type == AUTH_LDAP:
@@ -293,11 +294,21 @@ class BaseSecurityManager:
return _roles
@property
+ def auth_type_provider_name(self):
+ provider_to_auth_type = {AUTH_DB: "db", AUTH_LDAP: "ldap"}
+ return provider_to_auth_type.get(self.auth_type)
+
+ @property
def get_url_for_registeruser(self):
"""Gets the URL for Register User"""
return url_for(f"{self.registeruser_view.endpoint}.{self.registeruser_view.default_view}")
@property
+ def get_user_datamodel(self):
+ """Gets the User data model"""
+ return self.user_view.datamodel
+
+ @property
def get_register_user_datamodel(self):
"""Gets the Register User data model"""
return self.registerusermodelview.datamodel
@@ -308,6 +319,10 @@ class BaseSecurityManager:
return self._builtin_roles
@property
+ def api_login_allow_multiple_providers(self):
+ return self.appbuilder.get_app.config["AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS"]
+
+ @property
def auth_type(self):
"""Get the auth type"""
return self.appbuilder.get_app.config["AUTH_TYPE"]
diff --git a/setup.cfg b/setup.cfg
index 8e36d06..12bdeae 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -104,7 +104,12 @@ install_requires =
# https://github.com/readthedocs/sphinx_rtd_theme/issues/1115
docutils<0.17
flask>=1.1.0, <2.0
- flask-appbuilder>=3.3.4, <4.0.0
+ # We are tightly coupled with FAB version because we vendored in part of FAB code related to security manager
+ # This is done as part of preparation to removing FAB as dependency, but we are not ready for it yet
+ # Every time we update FAB version here, please make sure that you review the classes and models in
+ # `airflow/www/fab_security` with their upstream counterparts. In particular, make sure any breaking changes,
+ # for example any new methods, are accounted for.
+ flask-appbuilder==3.4.4
flask-caching>=1.5.0, <2.0.0
flask-login>=0.3, <0.5
# Strict upper-bound on the latest release of flask-session,
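The new ``AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS`` option is registered with ``setdefault``, so a value the deployment already configured always wins; a minimal illustration of that pattern:

```python
# setdefault only fills in a key that is absent, which is why new
# FAB-style options can be registered without clobbering operator config.
config = {"AUTH_TYPE": "DB"}
config.setdefault("AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS", False)
config.setdefault("AUTH_TYPE", "LDAP")  # no effect: key already present
```

This is also why the tightened ``flask-appbuilder==3.4.4`` pin matters: the vendored security manager must register the same defaults its upstream counterpart expects.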
[airflow] 33/43: Fix postgres hook import pipeline tutorial (#21491)
commit efc281829c6d9458ae83e026afcc753fe935ba75
Author: KevinYanesG <75...@users.noreply.github.com>
AuthorDate: Thu Feb 10 15:38:53 2022 +0100
Fix postgres hook import pipeline tutorial (#21491)
(cherry picked from commit a2abf663157aea14525e1a55eb9735ba659ae8d6)
---
docs/apache-airflow/tutorial.rst | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 0034b2c..a8f76bc 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -413,7 +413,7 @@ Let's break this down into 2 steps: get data & merge data:
import requests
from airflow.decorators import task
- from airflow.hooks.postgres import PostgresHook
+ from airflow.providers.postgres.hooks.postgres import PostgresHook
@task
@@ -478,7 +478,7 @@ Lets look at our DAG:
import requests
from airflow.decorators import dag, task
- from airflow.hooks.postgres import PostgresHook
+ from airflow.providers.postgres.hooks.postgres import PostgresHook
@dag(
[airflow] 31/43: Fix TriggerDagRunOperator extra link (#19410)
commit 95eaef37f621e8abaa30bb145866f1863471fdd6
Author: Niko <on...@amazon.com>
AuthorDate: Thu Dec 9 05:46:59 2021 -0800
Fix TriggerDagRunOperator extra link (#19410)
The extra link provided by the operator was previously using the
execution date of the triggering dag, not the triggered dag. Store the
execution date of the triggered dag in xcom so that it can be read back
later within the webserver when the link is being created.
(cherry picked from commit 820e836c4a2e45239279d4d71e1db9434022fec5)
---
airflow/operators/trigger_dagrun.py | 19 ++++++++++++-
tests/operators/test_trigger_dagrun.py | 49 +++++++++++++++++++++++++++-------
2 files changed, 57 insertions(+), 11 deletions(-)
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 421c796..a346db1 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -24,11 +24,15 @@ from typing import Dict, List, Optional, Union
from airflow.api.common.trigger_dag import trigger_dag
from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
+from airflow.models.xcom import XCom
from airflow.utils import timezone
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.state import State
from airflow.utils.types import DagRunType
+XCOM_EXECUTION_DATE_ISO = "trigger_execution_date_iso"
+XCOM_RUN_ID = "trigger_run_id"
+
class TriggerDagRunLink(BaseOperatorLink):
"""
@@ -39,7 +43,13 @@ class TriggerDagRunLink(BaseOperatorLink):
name = 'Triggered DAG'
def get_link(self, operator, dttm):
- query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()}
+ # Fetch the correct execution date for the triggerED dag which is
+ # stored in xcom during execution of the triggerING task.
+ trigger_execution_date_iso = XCom.get_one(
+ execution_date=dttm, key=XCOM_EXECUTION_DATE_ISO, task_id=operator.task_id, dag_id=operator.dag_id
+ )
+
+ query = {"dag_id": operator.trigger_dag_id, "base_date": trigger_execution_date_iso}
return build_airflow_url_with_query(query)
@@ -140,6 +150,7 @@ class TriggerDagRunOperator(BaseOperator):
execution_date=self.execution_date,
replace_microseconds=False,
)
+
except DagRunAlreadyExists as e:
if self.reset_dag_run:
self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
@@ -157,6 +168,12 @@ class TriggerDagRunOperator(BaseOperator):
else:
raise e
+ # Store the execution date from the dag run (either created or found above) to
+ # be used when creating the extra link on the webserver.
+ ti = context['task_instance']
+ ti.xcom_push(key=XCOM_EXECUTION_DATE_ISO, value=dag_run.execution_date.isoformat())
+ ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
+
if self.wait_for_completion:
# wait for dag to complete
while True:
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 9ff8735..1934c4d 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -19,7 +19,7 @@
import pathlib
import tempfile
from datetime import datetime
-from unittest import TestCase
+from unittest import TestCase, mock
import pytest
@@ -76,6 +76,25 @@ class TestDagRunOperator(TestCase):
pathlib.Path(self._tmpfile).unlink()
+ @mock.patch('airflow.operators.trigger_dagrun.build_airflow_url_with_query')
+ def assert_extra_link(self, triggering_exec_date, triggered_dag_run, triggering_task, mock_build_url):
+ """
+ Asserts whether the correct extra links url will be created.
+
+ Specifically it tests whether the correct dag id and date are passed to
+ the method which constructs the final url.
+ Note: We can't run that method to generate the url itself because the Flask app context
+ isn't available within the test logic, so it is mocked here.
+ """
+ triggering_task.get_extra_links(triggering_exec_date, 'Triggered DAG')
+ assert mock_build_url.called
+ args, _ = mock_build_url.call_args
+ expected_args = {
+ 'dag_id': triggered_dag_run.dag_id,
+ 'base_date': triggered_dag_run.execution_date.isoformat(),
+ }
+ assert expected_args in args
+
def test_trigger_dagrun(self):
"""Test TriggerDagRunOperator."""
task = TriggerDagRunOperator(task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, dag=self.dag)
@@ -84,7 +103,9 @@ class TestDagRunOperator(TestCase):
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
assert len(dagruns) == 1
- assert dagruns[0].external_trigger
+ triggered_dag_run = dagruns[0]
+ assert triggered_dag_run.external_trigger
+ self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
def test_trigger_dagrun_custom_run_id(self):
task = TriggerDagRunOperator(
@@ -114,8 +135,10 @@ class TestDagRunOperator(TestCase):
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
assert len(dagruns) == 1
- assert dagruns[0].external_trigger
- assert dagruns[0].execution_date == utc_now
+ triggered_dag_run = dagruns[0]
+ assert triggered_dag_run.external_trigger
+ assert triggered_dag_run.execution_date == utc_now
+ self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
def test_trigger_dagrun_twice(self):
"""Test TriggerDagRunOperator with custom execution_date."""
@@ -140,12 +163,14 @@ class TestDagRunOperator(TestCase):
)
session.add(dag_run)
session.commit()
- task.execute(None)
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
assert len(dagruns) == 1
- assert dagruns[0].external_trigger
- assert dagruns[0].execution_date == utc_now
+ triggered_dag_run = dagruns[0]
+ assert triggered_dag_run.external_trigger
+ assert triggered_dag_run.execution_date == utc_now
+ self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
def test_trigger_dagrun_with_templated_execution_date(self):
"""Test TriggerDagRunOperator with templated execution_date."""
@@ -160,8 +185,10 @@ class TestDagRunOperator(TestCase):
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
assert len(dagruns) == 1
- assert dagruns[0].external_trigger
- assert dagruns[0].execution_date == DEFAULT_DATE
+ triggered_dag_run = dagruns[0]
+ assert triggered_dag_run.external_trigger
+ assert triggered_dag_run.execution_date == DEFAULT_DATE
+ self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
def test_trigger_dagrun_operator_conf(self):
"""Test passing conf to the triggered DagRun."""
@@ -288,7 +315,9 @@ class TestDagRunOperator(TestCase):
.all()
)
assert len(dagruns) == 2
- assert dagruns[1].state == State.QUEUED
+ triggered_dag_run = dagruns[1]
+ assert triggered_dag_run.state == State.QUEUED
+ self.assert_extra_link(execution_date, triggered_dag_run, task)
def test_trigger_dagrun_triggering_itself_with_execution_date(self):
"""Test TriggerDagRunOperator that triggers itself with execution date,
[airflow] 32/43: Fix mismatch in generated run_id and logical date of DAG run (#18707)
commit 1c2340558b96cde92390bf1b4dc9483236675e18
Author: David Caron <dc...@gmail.com>
AuthorDate: Thu Feb 3 21:14:19 2022 -0500
Fix mismatch in generated run_id and logical date of DAG run (#18707)
Co-authored-by: Tzu-ping Chung <tp...@astronomer.io>
Co-authored-by: Jed Cunningham <je...@apache.org>
(cherry picked from commit 1f08d281632670aef1de8dfc62c9f63aeec18760)
---
airflow/operators/trigger_dagrun.py | 20 +++++++++-----------
tests/operators/test_trigger_dagrun.py | 25 ++++++++++++-------------
2 files changed, 21 insertions(+), 24 deletions(-)
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index a346db1..7dae196 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -115,13 +115,13 @@ class TriggerDagRunOperator(BaseOperator):
self.allowed_states = allowed_states or [State.SUCCESS]
self.failed_states = failed_states or [State.FAILED]
- if not isinstance(execution_date, (str, datetime.datetime, type(None))):
+ if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)):
raise TypeError(
"Expected str or datetime.datetime type for execution_date."
"Got {}".format(type(execution_date))
)
- self.execution_date: Optional[datetime.datetime] = execution_date # type: ignore
+ self.execution_date = execution_date
try:
json.dumps(self.conf)
@@ -130,30 +130,28 @@ class TriggerDagRunOperator(BaseOperator):
def execute(self, context: Dict):
if isinstance(self.execution_date, datetime.datetime):
- execution_date = self.execution_date
+ parsed_execution_date = self.execution_date
elif isinstance(self.execution_date, str):
- execution_date = timezone.parse(self.execution_date)
- self.execution_date = execution_date
+ parsed_execution_date = timezone.parse(self.execution_date)
else:
- execution_date = timezone.utcnow()
+ parsed_execution_date = timezone.utcnow()
if self.trigger_run_id:
run_id = self.trigger_run_id
else:
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
-
+ run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
try:
dag_run = trigger_dag(
dag_id=self.trigger_dag_id,
run_id=run_id,
conf=self.conf,
- execution_date=self.execution_date,
+ execution_date=parsed_execution_date,
replace_microseconds=False,
)
except DagRunAlreadyExists as e:
if self.reset_dag_run:
- self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
+ self.log.info("Clearing %s on %s", self.trigger_dag_id, parsed_execution_date)
# Get target dag object and call clear()
@@ -163,7 +161,7 @@ class TriggerDagRunOperator(BaseOperator):
dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
dag = dag_bag.get_dag(self.trigger_dag_id)
- dag.clear(start_date=self.execution_date, end_date=self.execution_date)
+ dag.clear(start_date=parsed_execution_date, end_date=parsed_execution_date)
dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
else:
raise e
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 1934c4d..180781e 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -30,6 +30,7 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
+from airflow.utils.types import DagRunType
DEFAULT_DATE = datetime(2019, 1, 1, tzinfo=timezone.utc)
TEST_DAG_ID = "testdag"
@@ -101,11 +102,10 @@ class TestDagRunOperator(TestCase):
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
with create_session() as session:
- dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
- assert len(dagruns) == 1
- triggered_dag_run = dagruns[0]
- assert triggered_dag_run.external_trigger
- self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
+ dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
+ assert dagrun.external_trigger
+ assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.execution_date)
+ self.assert_extra_link(DEFAULT_DATE, dagrun, task)
def test_trigger_dagrun_custom_run_id(self):
task = TriggerDagRunOperator(
@@ -123,22 +123,21 @@ class TestDagRunOperator(TestCase):
def test_trigger_dagrun_with_execution_date(self):
"""Test TriggerDagRunOperator with custom execution_date."""
- utc_now = timezone.utcnow()
+ custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
task = TriggerDagRunOperator(
task_id="test_trigger_dagrun_with_execution_date",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=utc_now,
+ execution_date=custom_execution_date,
dag=self.dag,
)
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
with create_session() as session:
- dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
- assert len(dagruns) == 1
- triggered_dag_run = dagruns[0]
- assert triggered_dag_run.external_trigger
- assert triggered_dag_run.execution_date == utc_now
- self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
+ dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
+ assert dagrun.external_trigger
+ assert dagrun.execution_date == custom_execution_date
+ assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date)
+ self.assert_extra_link(DEFAULT_DATE, dagrun, task)
def test_trigger_dagrun_twice(self):
"""Test TriggerDagRunOperator with custom execution_date."""
[airflow] 21/43: Augment xcom docs (#20755)
commit 88900872e642903eff59c82f319d5c137ff5d5db
Author: Lewis John McGibbney <le...@gmail.com>
AuthorDate: Thu Feb 3 08:50:25 2022 -0800
Augment xcom docs (#20755)
(cherry picked from commit 40d3a76a9bce2360b951f2e990cba571c5f51a76)
---
docs/apache-airflow/concepts/xcoms.rst | 40 +++++++++++++++++++++++++++++++---
1 file changed, 37 insertions(+), 3 deletions(-)
diff --git a/docs/apache-airflow/concepts/xcoms.rst b/docs/apache-airflow/concepts/xcoms.rst
index eb11ff7..57b9e54 100644
--- a/docs/apache-airflow/concepts/xcoms.rst
+++ b/docs/apache-airflow/concepts/xcoms.rst
@@ -42,8 +42,8 @@ XComs are a relative of :doc:`variables`, with the main difference being that XC
Note: If the first task run is not succeeded then on every retry task XComs will be cleared to make the task run idempotent.
-Custom Backends
----------------
+Custom XCom Backends
+--------------------
The XCom system has interchangeable backends, and you can set which backend is being used via the ``xcom_backend`` configuration option.
@@ -51,4 +51,38 @@ If you want to implement your own backend, you should subclass :class:`~airflow.
There is also an ``orm_deserialize_value`` method that is called whenever the XCom objects are rendered for UI or reporting purposes; if you have large or expensive-to-retrieve values in your XComs, you should override this method to avoid calling that code (and instead return a lighter, incomplete representation) so the UI remains responsive.
-You can also override the ``clear`` method and use it when clearing results for given dags and tasks. This allows the custom XCom backend process the data lifecycle easier.
+You can also override the ``clear`` method and use it when clearing results for given dags and tasks. This allows the custom XCom backend to process the data lifecycle easier.
+
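The serialization hooks a custom backend typically overrides can be sketched without a live Airflow install. The following standalone mock mirrors the method names on ``BaseXCom`` (``serialize_value``, ``deserialize_value``, ``orm_deserialize_value``), but the class itself and its behavior are hypothetical; a real backend would subclass ``airflow.models.xcom.BaseXCom`` instead:

```python
import json


# Standalone sketch of the methods a custom XCom backend usually overrides.
# In a real deployment this would subclass airflow.models.xcom.BaseXCom;
# here it is a plain class so the example runs anywhere.
class JsonXComBackendSketch:
    @staticmethod
    def serialize_value(value):
        # Store values as JSON text instead of the default serialization.
        return json.dumps(value).encode("utf-8")

    @staticmethod
    def deserialize_value(stored):
        return json.loads(stored.decode("utf-8"))

    @staticmethod
    def orm_deserialize_value(stored):
        # Cheap representation for the UI: avoid materializing large payloads.
        return f"<{len(stored)} bytes of JSON>"


payload = {"rows": [1, 2, 3]}
stored = JsonXComBackendSketch.serialize_value(payload)
print(JsonXComBackendSketch.deserialize_value(stored))      # → {'rows': [1, 2, 3]}
print(JsonXComBackendSketch.orm_deserialize_value(stored))  # short UI-friendly summary
```

The ``orm_deserialize_value`` override is the piece that keeps the UI responsive: it returns a small summary string rather than round-tripping the full value.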
+Working with Custom XCom Backends in Containers
+-----------------------------------------------
+
+Depending on where Airflow is deployed (local, Docker, K8s, etc.), it can be useful to confirm that a custom XCom backend is actually being initialized. For example, the complexity of a container environment can make it harder to determine whether your backend is being loaded correctly during deployment. The following guidance can help you build confidence in your custom XCom implementation.
+
+Firstly, if you can exec into a terminal in the container then you should be able to do:
+
+.. code-block:: python
+
+    from airflow.models.xcom import XCom
+
+    print(XCom.__name__)
+
+which will print the actual class that is being used.
+
+You can also examine Airflow's configuration:
+
+.. code-block:: python
+
+    from airflow.settings import conf
+
+    print(conf.get("core", "xcom_backend"))
+
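Under the hood, the dotted-path string returned by ``conf.get("core", "xcom_backend")`` is resolved to a class by importing the module and fetching the attribute. A minimal standalone sketch of that resolution step follows; the helper name ``resolve_backend`` is hypothetical (Airflow uses its own import utility), and a stdlib class stands in for a real backend so the example runs anywhere:

```python
import importlib


def resolve_backend(path: str):
    # Minimal stand-in for dotted-path class resolution: split "pkg.mod.Class"
    # into module and attribute, import the module, and fetch the class.
    module_name, _, class_name = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Resolving a stdlib class the same way a configured xcom_backend string would be.
cls = resolve_backend("collections.OrderedDict")
print(cls.__name__)  # → OrderedDict
```

If the configured string cannot be resolved this way (typo, missing package, class not on the path), backend loading fails at startup, which is exactly the failure mode described for containers below.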
+Working with Custom Backends in K8s via Helm
+--------------------------------------------
+
+Running custom XCom backends in K8s will introduce even more complexity to your Airflow deployment. Put simply, sometimes things go wrong, and these failures can be difficult to debug.
+
+For example, if you define a custom XCom backend in the Chart ``values.yaml`` (via the ``xcom_backend`` configuration) and Airflow fails to load the class, the entire Chart deployment will fail with each pod container attempting to restart time and time again.
+
+When deploying in K8s, your custom XCom backend needs to reside in a ``config`` directory; otherwise it cannot be located during Chart deployment.
+
+An observed problem is that it is very difficult to acquire logs from the failing container, because there is only a small window of availability during which the trace can be obtained. The only way to determine the root cause is to be fortunate enough to query the container logs at the right time; until you do, the Helm chart keeps failing to deploy with no clear diagnosis.
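Given that narrow window, one practical way to catch the trace is to ask Kubernetes for the logs of the *previous* (terminated) container instance as soon as a pod restarts. This is an ops sketch, not part of the documentation change above; the namespace ``airflow`` and the pod name are hypothetical and depend on your Chart values:

```shell
# List pods and spot the crash-looping one (names depend on your release).
kubectl get pods -n airflow

# --previous fetches logs from the last terminated container instance,
# which is where the import-failure trace for a bad xcom_backend would appear.
kubectl logs -n airflow airflow-scheduler-0 --previous
```

Because ``--previous`` reads the prior container's logs rather than the live one's, it sidesteps the race against the restart loop.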
[airflow] 19/43: Fix Broken link in api.rst (#21165)
Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ede6d8f1d5aa7d34368df6e03c0890fba3f20b9f
Author: fritz-astronomer <80...@users.noreply.github.com>
AuthorDate: Tue Feb 1 14:03:46 2022 -0500
Fix Broken link in api.rst (#21165)
Co-authored-by: eladkal <45...@users.noreply.github.com>
(cherry picked from commit 817bec0417b291326dfd760bd85439b3ba0a728d)
---
docs/apache-airflow/security/api.rst | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/docs/apache-airflow/security/api.rst b/docs/apache-airflow/security/api.rst
index d7de78a..48a5b5a 100644
--- a/docs/apache-airflow/security/api.rst
+++ b/docs/apache-airflow/security/api.rst
@@ -80,8 +80,7 @@ principal exists in the keytab file.
Basic authentication
''''''''''''''''''''
-`Basic username password authentication <https://tools.ietf.org/html/rfc7617
-https://en.wikipedia.org/wiki/Basic_access_authentication>`_ is currently
+`Basic username password authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_ is currently
supported for the API. This works for users created through LDAP login or
within Airflow Metadata DB using password.