Posted to commits@airflow.apache.org by je...@apache.org on 2021/11/03 21:29:59 UTC

[airflow] branch v2-2-test updated (f3d06b1 -> 8151307)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard f3d06b1  Bugfix: Check next run exists before reading data interval (#19307)
     new 848ac3f  Bump version to 2.2.2
     new 8666bf0  Add explicit session parameter in PoolSlotsAvailableDep (#18875)
     new 46bf6b4  Fix typo in ``tutorial.rst`` (#18983)
     new 901901a  Use ``execution_date`` to check for existing ``DagRun`` for ``TriggerDagRunOperator`` (#18968)
     new 9a60e62  Add Note to SLA regarding schedule_interval (#19173)
     new 7a3212d  Fix Toggle Wrap on DAG code page (#19211)
     new d291f76  sqlite_default has been hard-coded to /tmp, use gettempdir instead, (#19255)
     new 7a14324  Fix hidden tooltip position (#19261)
     new 44caa7e  Fix MySQL db migration with default encoding/collation (#19268)
     new 06c1cea  Bugfix: Check next run exists before reading data interval (#19307)
     new f9b48bb  Switch default Python version to 3.7 (#19317)
     new a0934d1  Clarify dag-not-found error message (#19338)
     new fbb7fbd  Improve Kubernetes Executor docs (#19339)
     new ee532d9  Docs: Fix typo in ``dag-run.rst`` (#19340)
     new 34768a8  Fix message on "Mark as" confirmation page (#19363)
     new 9b01467  Only mark SchedulerJobs as failed, not any jobs (#19375)
     new 8151307  Fix downgrade for a DB Migration (#19390)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f3d06b1)
            \
             N -- N -- N   refs/heads/v2-2-test (8151307)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 README.md                                          |  14 +-
 airflow/api/common/experimental/trigger_dag.py     |   6 +-
 airflow/dag_processing/processor.py                |  25 ++--
 airflow/jobs/scheduler_job.py                      |   1 +
 .../7b2661a43ba3_taskinstance_keyed_to_dagrun.py   |   9 +-
 airflow/models/dagrun.py                           |  65 +++++---
 airflow/ti_deps/deps/pool_slots_available_dep.py   |   2 +-
 airflow/utils/cli.py                               |   6 +-
 airflow/utils/db.py                                |   3 +-
 airflow/www/static/css/main.css                    |   9 ++
 airflow/www/static/js/dag_code.js                  |  12 +-
 airflow/www/templates/airflow/_messages.html       |   4 +-
 airflow/www/templates/airflow/dags.html            |  10 +-
 airflow/www/templates/airflow/main.html            |   6 +-
 airflow/www/templates/appbuilder/flash.html        |   2 +-
 docs/apache-airflow/concepts/tasks.rst             |   4 +
 docs/apache-airflow/dag-run.rst                    |   4 +-
 docs/apache-airflow/executor/kubernetes.rst        | 166 +++++++++++++--------
 docs/apache-airflow/tutorial.rst                   |   2 +-
 docs/spelling_wordlist.txt                         |   1 +
 scripts/ci/libraries/_initialization.sh            |   2 +-
 setup.py                                           |   2 +-
 tests/api/common/experimental/test_trigger_dag.py  |   6 +-
 tests/jobs/test_local_task_job.py                  |   2 +-
 tests/jobs/test_scheduler_job.py                   |  31 ++++
 tests/models/test_dagrun.py                        |  23 +++
 26 files changed, 284 insertions(+), 133 deletions(-)

[airflow] 12/17: Clarify dag-not-found error message (#19338)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a0934d154a1754af76e7aafb9a932b866caab450
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Mon Nov 1 09:22:23 2021 -0700

    Clarify dag-not-found error message (#19338)
    
    In this context, what's really happening is that we can't find the DAG. From a user
    perspective, when you encounter this error, 'could not find the dag' is a more
    intuitive representation of the problem than 'could not find the dag_id'.
    
    (cherry picked from commit 0be26b47439de11935b8b8b246a4cad35fbb8659)
---
 airflow/utils/cli.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 6415676..db58b88 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -177,8 +177,7 @@ def get_dag_by_file_location(dag_id: str):
     dag_model = DagModel.get_current(dag_id)
     if dag_model is None:
         raise AirflowException(
-            'dag_id could not be found: {}. Either the dag did not exist or it failed to '
-            'parse.'.format(dag_id)
+            f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
         )
     dagbag = DagBag(dag_folder=dag_model.fileloc)
     return dagbag.dags[dag_id]
@@ -191,8 +190,7 @@ def get_dag(subdir: Optional[str], dag_id: str) -> "DAG":
     dagbag = DagBag(process_subdir(subdir))
     if dag_id not in dagbag.dags:
         raise AirflowException(
-            'dag_id could not be found: {}. Either the dag did not exist or it failed to '
-            'parse.'.format(dag_id)
+            f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
         )
     return dagbag.dags[dag_id]
 

[airflow] 14/17: Docs: Fix typo in ``dag-run.rst`` (#19340)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ee532d9f7f281cc63f63ffafb4f63a9d2a26e047
Author: axxe <ax...@users.noreply.github.com>
AuthorDate: Mon Nov 1 16:38:19 2021 +0100

    Docs: Fix typo in ``dag-run.rst`` (#19340)
    
    (cherry picked from commit d810eaf2d176282f57cd35a1fb97a0604a0cfd24)
---
 docs/apache-airflow/dag-run.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 662ae1b..d4d2b77 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -76,7 +76,7 @@ of a DAG run, for example, denotes the start of the data interval, not when the
 DAG is actually executed.
 
 Similarly, since the ``start_date`` argument for the DAG and its tasks points to
-the same logical date, it marks the start of *the DAG's fist data interval*, not
+the same logical date, it marks the start of *the DAG's first data interval*, not
 when tasks in the DAG will start running. In other words, a DAG run will only be
 scheduled one interval after ``start_date``.
 
@@ -151,7 +151,7 @@ if your DAG performs catchup internally.
 
 Backfill
 ---------
-There can be the case when you may want to run the dag for a specified historical period e.g.,
+There can be the case when you may want to run the DAG for a specified historical period e.g.,
 A data filling DAG is created with ``start_date`` **2019-11-21**, but another user requires the output data from a month ago i.e., **2019-10-21**.
 This process is known as Backfill.
 

[airflow] 17/17: Fix downgrade for a DB Migration (#19390)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 815130724f6dc78359153f3643088408be30e0cb
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Nov 3 18:30:03 2021 +0000

    Fix downgrade for a DB Migration (#19390)
    
    The downgrade was not working because of the issues fixed in this PR
    
    (cherry picked from commit a373ca347bb2e6308f1b91d2a6a0ae0cf1d39332)
---
 .../versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py           | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
index 91acf5e..dd27840 100644
--- a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
+++ b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
@@ -349,12 +349,12 @@ def downgrade():
         batch_op.drop_index('idx_task_reschedule_dag_task_run')
 
     with op.batch_alter_table('task_instance', schema=None) as batch_op:
+        batch_op.drop_constraint('task_instance_pkey', type_='primary')
         batch_op.alter_column('execution_date', existing_type=dt_type, existing_nullable=True, nullable=False)
         batch_op.alter_column(
             'dag_id', existing_type=string_id_col_type, existing_nullable=True, nullable=True
         )
 
-        batch_op.drop_constraint('task_instance_pkey', type_='primary')
         batch_op.create_primary_key('task_instance_pkey', ['dag_id', 'task_id', 'execution_date'])
 
         batch_op.drop_constraint('task_instance_dag_run_fkey', type_='foreignkey')
@@ -418,11 +418,11 @@ def downgrade():
         )
     else:
         with op.batch_alter_table('dag_run', schema=None) as batch_op:
-            batch_op.drop_index('dag_id_state', table_name='dag_run')
+            batch_op.drop_index('dag_id_state')
             batch_op.alter_column('run_id', existing_type=sa.VARCHAR(length=250), nullable=True)
             batch_op.alter_column('execution_date', existing_type=dt_type, nullable=True)
             batch_op.alter_column('dag_id', existing_type=sa.VARCHAR(length=250), nullable=True)
-            batch_op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False)
+            batch_op.create_index('dag_id_state', ['dag_id', 'state'], unique=False)
 
 
 def _multi_table_update(dialect_name, target, column):
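
For readability, here is the downgrade ordering the patch above restores, reconstructed as a plain (non-diff) sketch. The `sa.TIMESTAMP()` type is an illustrative stand-in for the migration's actual `dt_type`:

```python
# Reconstruction of the fixed ordering inside the batch_alter_table block:
# drop the old primary key first, then alter its columns, then recreate it.
# sa.TIMESTAMP() stands in for the migration's dt_type.
import sqlalchemy as sa
from alembic import op


def downgrade():
    with op.batch_alter_table('task_instance', schema=None) as batch_op:
        batch_op.drop_constraint('task_instance_pkey', type_='primary')
        batch_op.alter_column('execution_date', existing_type=sa.TIMESTAMP(), nullable=False)
        batch_op.create_primary_key('task_instance_pkey', ['dag_id', 'task_id', 'execution_date'])
```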

[airflow] 08/17: Fix hidden tooltip position (#19261)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7a1432400de5a6527669f3b3f6ba55de6170a7af
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Nov 2 11:09:17 2021 -0600

    Fix hidden tooltip position (#19261)
    
    Only apply a large z-index when the tooltip is supposed to be displayed.
    
    (cherry picked from commit 37767c1ba05845266668c84dec7f9af967139f42)
---
 airflow/www/static/css/main.css | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/airflow/www/static/css/main.css b/airflow/www/static/css/main.css
index baf3584..ad13ac6 100644
--- a/airflow/www/static/css/main.css
+++ b/airflow/www/static/css/main.css
@@ -454,3 +454,12 @@ label[for="timezone-other"],
   color: #262626;
   background-color: #f5f5f5;
 }
+
+.tooltip {
+  z-index: 0;
+}
+
+.tooltip.in,
+.tooltip.d3-tip {
+  z-index: 1070;
+}

[airflow] 09/17: Fix MySQL db migration with default encoding/collation (#19268)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 44caa7ee5a98dafa5879a2560d8a93bc49601f03
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Oct 28 12:08:04 2021 -0600

    Fix MySQL db migration with default encoding/collation (#19268)
    
    (cherry picked from commit e76a69b7b14140d0f822e49a3edf4dec6c2780ab)
---
 .../migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py   | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
index 4676f4a..91acf5e 100644
--- a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
+++ b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
@@ -198,6 +198,9 @@ def upgrade():
         if dialect_name == "mysql":
             # Mysql creates an index and a constraint -- we have to drop both
             batch_op.drop_index('task_reschedule_dag_task_date_fkey')
+            batch_op.alter_column(
+                'dag_id', existing_type=sa.String(length=ID_LEN), type_=string_id_col_type, nullable=False
+            )
         batch_op.drop_index('idx_task_reschedule_dag_task_date')
 
     # Then update the new column by selecting the right value from DagRun

[airflow] 06/17: Fix Toggle Wrap on DAG code page (#19211)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7a3212d03543e25a2ec2be17d321811229583cd6
Author: PApostol <50...@users.noreply.github.com>
AuthorDate: Wed Nov 3 14:19:31 2021 +0000

    Fix Toggle Wrap on DAG code page (#19211)
    
    Co-authored-by: Brent Bovenzi <br...@gmail.com>
    (cherry picked from commit a1632edac783878cb82d9099f4f973c9a10b0d0f)
---
 airflow/www/static/js/dag_code.js | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/airflow/www/static/js/dag_code.js b/airflow/www/static/js/dag_code.js
index 701e87e..d9112ee 100644
--- a/airflow/www/static/js/dag_code.js
+++ b/airflow/www/static/js/dag_code.js
@@ -17,13 +17,19 @@
  * under the License.
  */
 
-/* global $ */
+/* global window, $ */
 
 import getMetaValue from './meta_value';
 
+function toggleWrap() {
+  $('.code pre').toggleClass('wrap');
+}
+
 const isWrapped = getMetaValue('wrapped');
 
 // pygments generates the HTML so set wrap toggle via js
-if (isWrapped) {
-  $('.code pre').toggleClass('wrap');
+if (isWrapped === 'True') {
+  toggleWrap();
 }
+
+window.toggleWrap = toggleWrap;

[airflow] 01/17: Bump version to 2.2.2

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 848ac3f38047db4977581ef5984f30f0814eada8
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Wed Nov 3 14:39:33 2021 -0600

    Bump version to 2.2.2
---
 README.md | 14 +++++++-------
 setup.py  |  2 +-
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index b94a982..74c0524 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ Airflow is not a streaming solution, but it is often used to process real-time d
 
 Apache Airflow is tested with:
 
-|                      | Main version (dev)        | Stable version (2.2.1)   |
+|                      | Main version (dev)        | Stable version (2.2.2)   |
 | -------------------- | ------------------------- | ------------------------ |
 | Python               | 3.6, 3.7, 3.8, 3.9        | 3.6, 3.7, 3.8, 3.9       |
 | Kubernetes           | 1.18, 1.19, 1.20          | 1.18, 1.19, 1.20         |
@@ -144,15 +144,15 @@ them to the appropriate format and workflow that your tool requires.
 
 
 ```bash
-pip install 'apache-airflow==2.2.1' \
- --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.1/constraints-3.7.txt"
+pip install 'apache-airflow==2.2.2' \
+ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.2/constraints-3.7.txt"
 ```
 
 2. Installing with extras (i.e., postgres, google)
 
 ```bash
-pip install 'apache-airflow[postgres,google]==2.2.1' \
- --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.1/constraints-3.7.txt"
+pip install 'apache-airflow[postgres,google]==2.2.2' \
+ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.2/constraints-3.7.txt"
 ```
 
 For information on installing provider packages, check
@@ -254,7 +254,7 @@ Apache Airflow version life cycle:
 
 | Version | Current Patch/Minor | State     | First Release | Limited Support | EOL/Terminated |
 |---------|---------------------|-----------|---------------|-----------------|----------------|
-| 2       | 2.2.1               | Supported | Dec 17, 2020  | TBD             | TBD            |
+| 2       | 2.2.2               | Supported | Dec 17, 2020  | TBD             | TBD            |
 | 1.10    | 1.10.15             | EOL       | Aug 27, 2018  | Dec 17, 2020    | June 17, 2021  |
 | 1.9     | 1.9.0               | EOL       | Jan 03, 2018  | Aug 27, 2018    | Aug 27, 2018   |
 | 1.8     | 1.8.2               | EOL       | Mar 19, 2017  | Jan 03, 2018    | Jan 03, 2018   |
@@ -280,7 +280,7 @@ They are based on the official release schedule of Python and Kubernetes, nicely
 
 2. The "oldest" supported version of Python/Kubernetes is the default one. "Default" is only meaningful
    in terms of "smoke tests" in CI PRs, which are run using this default version and the default reference
-   image available. Currently `apache/airflow:latest` and `apache/airflow:2.2.1` images
+   image available. Currently `apache/airflow:latest` and `apache/airflow:2.2.2` images
    are both Python 3.6 images. However, the first MINOR/MAJOR release of Airflow release after 23.12.2021 will
    become Python 3.7 images.
 
diff --git a/setup.py b/setup.py
index 9a4b2e2..101815b 100644
--- a/setup.py
+++ b/setup.py
@@ -41,7 +41,7 @@ PY39 = sys.version_info >= (3, 9)
 
 logger = logging.getLogger(__name__)
 
-version = '2.2.1'
+version = '2.2.2'
 
 my_dir = dirname(__file__)
 

[airflow] 11/17: Switch default Python version to 3.7 (#19317)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f9b48bb3bd31127d8ba5f7d5a9118994db1567dd
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Oct 30 01:16:06 2021 +0100

    Switch default Python version to 3.7 (#19317)
    
    Continuation of https://github.com/apache/airflow/pull/18922
    
    This case was missed and is needed to have the correct image tagged for Dockerfiles
    
    (cherry picked from commit 98d906743689c4e0068db7a8b0d10f2486638a3b)
---
 scripts/ci/libraries/_initialization.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/ci/libraries/_initialization.sh b/scripts/ci/libraries/_initialization.sh
index 026795e..817727b 100644
--- a/scripts/ci/libraries/_initialization.sh
+++ b/scripts/ci/libraries/_initialization.sh
@@ -619,7 +619,7 @@ function initialization::initialize_common_environment() {
 
 function initialization::set_default_python_version_if_empty() {
     # default version of python used to tag the "main" and "latest" images in DockerHub
-    export DEFAULT_PYTHON_MAJOR_MINOR_VERSION=3.6
+    export DEFAULT_PYTHON_MAJOR_MINOR_VERSION=3.7
 
     # default python Major/Minor version
     export PYTHON_MAJOR_MINOR_VERSION=${PYTHON_MAJOR_MINOR_VERSION:=${DEFAULT_PYTHON_MAJOR_MINOR_VERSION}}

[airflow] 13/17: Improve Kubernetes Executor docs (#19339)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fbb7fbd30895eb6fac32a9a7dacbd904a9d348c9
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Nov 3 05:48:07 2021 -0700

    Improve Kubernetes Executor docs (#19339)
    
    (cherry picked from commit eace4102b68e4964b47f2d8c555f65ceaf0a3690)
---
 docs/apache-airflow/executor/kubernetes.rst | 166 +++++++++++++++++-----------
 docs/spelling_wordlist.txt                  |   1 +
 2 files changed, 102 insertions(+), 65 deletions(-)

diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst
index 00a4b5a..923cf22 100644
--- a/docs/apache-airflow/executor/kubernetes.rst
+++ b/docs/apache-airflow/executor/kubernetes.rst
@@ -21,75 +21,91 @@
 Kubernetes Executor
 ===================
 
-The kubernetes executor is introduced in Apache Airflow 1.10.0. The Kubernetes executor will create a new pod for every task instance.
+The Kubernetes executor runs each task instance in its own pod on a Kubernetes cluster.
 
-Example kubernetes files are available at ``scripts/in_container/kubernetes/app/{secrets,volumes,postgres}.yaml`` in the source distribution (please note that these examples are not ideal for production environments).
-The volumes are optional and depend on your configuration. There are two volumes available:
+KubernetesExecutor runs as a process in the Airflow Scheduler. The scheduler itself does
+not necessarily need to be running on Kubernetes, but does need access to a Kubernetes cluster.
 
-- **Dags**:
+KubernetesExecutor requires a non-sqlite database in the backend.
+
+When a DAG submits a task, the KubernetesExecutor requests a worker pod from the Kubernetes API. The worker pod then runs the task, reports the result, and terminates.
 
-  - By storing dags onto persistent disk, it will be made available to all workers
+.. image:: ../img/arch-diag-kubernetes.png
 
-  - Another option is to use ``git-sync``. Before starting the container, a git pull of the dags repository will be performed and used throughout the lifecycle of the pod
 
-- **Logs**:
+One example of an Airflow deployment running on a distributed set of five nodes in a Kubernetes cluster is shown below.
 
-  - By storing logs onto a persistent disk, the files are accessible by workers and the webserver. If you don't configure this, the logs will be lost after the worker pods shuts down
+.. image:: ../img/arch-diag-kubernetes2.png
 
-  - Another option is to use S3/GCS/etc to store logs
+Consistent with the regular Airflow architecture, the Workers need access to the DAG files to execute the tasks within those DAGs and interact with the Metadata repository. Also, configuration information specific to the Kubernetes Executor, such as the worker namespace and image information, needs to be specified in the Airflow Configuration file.
 
-To troubleshoot issue with KubernetesExecutor, you can use ``airflow kubernetes generate-dag-yaml`` command.
-This command generates the pods as they will be launched in Kubernetes and dumps them into yaml files for you to inspect.
+Additionally, the Kubernetes Executor enables specification of additional features on a per-task basis using the Executor config.
+
+.. @startuml
+.. Airflow_Scheduler -> Kubernetes: Request a new pod with command "airflow run..."
+.. Kubernetes -> Airflow_Worker: Create Airflow worker with command "airflow run..."
+.. Airflow_Worker -> Airflow_DB: Report task passing or failure to DB
+.. Airflow_Worker -> Kubernetes: Pod completes with state "Succeeded" and k8s records in ETCD
+.. Kubernetes -> Airflow_Scheduler: Airflow scheduler reads "Succeeded" from k8s watcher thread
+.. @enduml
+.. image:: ../img/k8s-happy-path.png
+
+Configuration
+-------------
 
 .. _concepts:pod_template_file:
 
 pod_template_file
-#################
+~~~~~~~~~~~~~~~~~
+
+To customize the pod used for k8s executor worker processes, you may create a pod template file. You must provide
+the path to the template file in the ``pod_template_file`` option in the ``kubernetes`` section of ``airflow.cfg``.
+
+Airflow has two strict requirements for pod template files: base image and pod name.
+
+Base image
+^^^^^^^^^^
+
+A ``pod_template_file`` must have a container named ``base`` at the ``spec.containers[0]`` position, and
+its ``image`` must be specified.
 
-As of Airflow 1.10.12, you can now use the ``pod_template_file`` option in the ``kubernetes`` section
-of the ``airflow.cfg`` file to form the basis of your KubernetesExecutor pods. This process is faster to execute
-and easier to modify.
+You are free to create sidecar containers after this required container, but Airflow assumes that the
+airflow worker container exists at the beginning of the container array, and assumes that the
+container is named ``base``.
 
-We include multiple examples of working pod operators below, but we would also like to explain a few necessary components
-if you want to customize your template files. As long as you have these components, every other element
-in the template is customizable.
+.. note::
 
-1. Airflow will overwrite the base container image and the pod name
+    Airflow may override the base container ``image``, e.g. through :ref:`pod_override <concepts:pod_override>`
+    configuration; but it must be present in the template file and must not be blank.
 
-There are two points where Airflow potentially overwrites the base image: in the ``airflow.cfg``
-or the ``pod_override`` (discussed below) setting. This value is overwritten to ensure that users do
-not need to update multiple template files every time they upgrade their docker image. The other field
-that Airflow overwrites is the ``pod.metadata.name`` field. This field has to be unique across all pods,
-so we generate these names dynamically before launch.
+Pod name
+^^^^^^^^
 
-It's important to note while Airflow overwrites these fields, they **can not be left blank**.
-If these fields do not exist, kubernetes can not load the yaml into a Kubernetes V1Pod.
+The pod's ``metadata.name`` must be set in the template file.  This field will *always* be set dynamically at
+pod launch to guarantee uniqueness across all pods. But again, it must be included in the template, and cannot
+be left blank.
 
-2. Each Airflow ``pod_template_file`` must have a container named "base" at the ``pod.spec.containers[0]`` position
 
-Airflow uses the ``pod_template_file`` by making certain assumptions about the structure of the template.
-When airflow creates the worker pod's command, it assumes that the airflow worker container part exists
-at the beginning of the container array. It then assumes that the container is named ``base``
-when it merges this pod with internal configs. You are more than welcome to create
-sidecar containers after this required container.
+Example pod templates
+~~~~~~~~~~~~~~~~~~~~~
 
 With these requirements in mind, here are some examples of basic ``pod_template_file`` YAML files.
 
-pod_template_file using the ``dag_in_image`` setting:
+Storing DAGs in the image:
 
 .. exampleinclude:: /../../airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
     :language: yaml
     :start-after: [START template_with_dags_in_image]
     :end-before: [END template_with_dags_in_image]
 
-``pod_template_file`` which stores DAGs in a ``persistentVolume``:
+Storing DAGs in a ``persistentVolume``:
 
 .. exampleinclude:: /../../airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
     :language: yaml
     :start-after: [START template_with_dags_in_volume]
     :end-before: [END template_with_dags_in_volume]
 
-``pod_template_file`` which pulls DAGs from git:
+Pulling DAGs from ``git``:
 
 .. exampleinclude:: /../../airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
     :language: yaml
@@ -99,7 +115,7 @@ pod_template_file using the ``dag_in_image`` setting:
 .. _concepts:pod_override:
 
 pod_override
-############
+~~~~~~~~~~~~
 
 When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis.
 To utilize this functionality, create a Kubernetes V1pod object and fill in your desired overrides.
@@ -135,49 +151,70 @@ Here is an example of a task with both features:
     :start-after: [START task_with_template]
     :end-before: [END task_with_template]
 
-KubernetesExecutor Architecture
-################################
+Managing dags and logs
+~~~~~~~~~~~~~~~~~~~~~~
 
-The KubernetesExecutor runs as a process in the Scheduler that only requires access to the Kubernetes API (it does *not* need to run inside of a Kubernetes cluster). The KubernetesExecutor requires a non-sqlite database in the backend, but there are no external brokers or persistent workers needed.
-For these reasons, we recommend the KubernetesExecutor for deployments have long periods of dormancy between DAG execution.
+Use of persistent volumes is optional and depends on your configuration.
 
-When a DAG submits a task, the KubernetesExecutor requests a worker pod from the Kubernetes API. The worker pod then runs the task, reports the result, and terminates.
+- **Dags**:
 
+To get the DAGs into the workers, you can:
 
-.. image:: ../img/arch-diag-kubernetes.png
+  - Include dags in the image.
+  - Use ``git-sync`` which, before starting the worker container, will run a ``git pull`` of the dags repository.
+  - Storing dags on a persistent volume, which can be mounted on all workers.
 
+- **Logs**:
 
-In contrast to the Celery Executor, the Kubernetes Executor does not require additional components such as Redis and Flower, but does require the Kubernetes infrastructure.
+To get task logs out of the workers, you can:
 
-One example of an Airflow deployment running on a distributed set of five nodes in a Kubernetes cluster is shown below.
+  - Use a persistent volume mounted on both the webserver and workers.
 
-.. image:: ../img/arch-diag-kubernetes2.png
+  - Enable remote logging.
 
-The Kubernetes Executor has an advantage over the Celery Executor in that Pods are only spun up when required for task execution compared to the Celery Executor where the workers are statically configured and are running all the time, regardless of workloads. However, this could be a disadvantage depending on the latency needs, since a task takes longer to start using the Kubernetes Executor, since it now includes the Pod startup time.
+.. note::
 
-Consistent with the regular Airflow architecture, the Workers need access to the DAG files to execute the tasks within those DAGs and interact with the Metadata repository. Also, configuration information specific to the Kubernetes Executor, such as the worker namespace and image information, needs to be specified in the Airflow Configuration file.
+    If you don't enable logging persistence, and if you have not enabled remote logging, logs will be lost after the worker pods shut down.
 
-Additionally, the Kubernetes Executor enables specification of additional features on a per-task basis using the Executor config.
 
+Comparison with CeleryExecutor
+------------------------------
 
+In contrast to CeleryExecutor, KubernetesExecutor does not require additional components such as Redis and Flower, but does require access to Kubernetes cluster.
 
-.. @startuml
-.. Airflow_Scheduler -> Kubernetes: Request a new pod with command "airflow run..."
-.. Kubernetes -> Airflow_Worker: Create Airflow worker with command "airflow run..."
-.. Airflow_Worker -> Airflow_DB: Report task passing or failure to DB
-.. Airflow_Worker -> Kubernetes: Pod completes with state "Succeeded" and k8s records in ETCD
-.. Kubernetes -> Airflow_Scheduler: Airflow scheduler reads "Succeeded" from k8s watcher thread
-.. @enduml
-.. image:: ../img/k8s-happy-path.png
+With KubernetesExecutor, each task runs in its own pod. The pod is created when the task is queued, and terminates when the task completes.
+Historically, in scenarios such as burstable workloads, this presented a resource utilization advantage over CeleryExecutor, where you needed
+a fixed number of long-running celery worker pods, whether or not there were tasks to run.
+
+However, the :doc:`official Apache Airflow Helm chart <helm-chart:index>` can automatically scale celery workers down to zero based on the number of tasks in the queue,
+so when using the official chart, this is no longer an advantage.
 
+With Celery workers you will tend to have less task latency because the worker pod is already up and running when the task is queued. On the
+other hand, because multiple tasks are running in the same pod, with Celery you may have to be more mindful about resource utilization
+in your task design, particularly memory consumption.
+
+One scenario where KubernetesExecutor can be helpful is if you have long-running tasks, because if you deploy while a task is running,
+the task will keep running until it completes (or times out, etc). But with CeleryExecutor, provided you have set a grace period, the
+task will only keep running up until the grace period has elapsed, at which time the task will be terminated.  Another scenario where
+KubernetesExecutor can work well is when your tasks are not very uniform with respect to resource requirements or images.
+
+Finally, note that it does not have to be either-or; with CeleryKubernetesExecutor, it is possible to use both CeleryExecutor and
+KubernetesExecutor simultaneously on the same cluster. CeleryKubernetesExecutor will look at a task's ``queue`` to determine
+whether to run on Celery or Kubernetes.  By default, tasks are sent to Celery workers, but if you want a task to run using KubernetesExecutor,
+you send it to the  ``kubernetes`` queue and it will run in its own pod.  And KubernetesPodOperator can be used
+to similar effect, no matter what executor you are using.
 
-***************
 Fault Tolerance
-***************
+---------------
+
+.. tip::
+
+    To troubleshoot issues with KubernetesExecutor, you can use ``airflow kubernetes generate-dag-yaml`` command.
+    This command generates the pods as they will be launched in Kubernetes and dumps them into yaml files for you to inspect.
+
 
-===========================
 Handling Worker Pod Crashes
-===========================
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 When dealing with distributed systems, we need a system that assumes that any component can crash at any moment for reasons ranging from OOM errors to node upgrades.
 
@@ -201,13 +238,12 @@ A Kubernetes watcher is a thread that can subscribe to every change that occurs
 By monitoring this stream, the KubernetesExecutor can discover that the worker crashed and correctly report the task as failed.
 
 
-=====================================================
 But What About Cases Where the Scheduler Pod Crashes?
-=====================================================
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-In cases of scheduler crashes, we can completely rebuild the state of the scheduler using the watcher's ``resourceVersion``.
+In cases of scheduler crashes, the scheduler will recover its state using the watcher's ``resourceVersion``.
 
-When monitoring the Kubernetes cluster's watcher thread, each event has a monotonically rising number called a resourceVersion.
-Every time the executor reads a resourceVersion, the executor stores the latest value in the backend database.
+When monitoring the Kubernetes cluster's watcher thread, each event has a monotonically rising number called a ``resourceVersion``.
+Every time the executor reads a ``resourceVersion``, the executor stores the latest value in the backend database.
 Because the resourceVersion is stored, the scheduler can restart and continue reading the watcher stream from where it left off.
 Since the tasks are run independently of the executor and report results directly to the database, scheduler failures will not lead to task failures or re-runs.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 39b88c7..c0efd04 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -505,6 +505,7 @@ bq
 bugfix
 bugfixes
 buildType
+burstable
 bytestring
 cacert
 callables
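
The rewritten page above describes `pod_override` as building a Kubernetes `V1Pod` object with the desired per-task overrides. As a rough illustration (not taken from the Airflow docs; the DAG id, task id, and memory request below are made up), a DAG file using it might look like this:

```python
# Hypothetical pod_override usage: ask the executor for more memory for one task.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from kubernetes.client import models as k8s

with DAG(dag_id="pod_override_sketch", start_date=datetime(2021, 11, 1), schedule_interval=None):
    BashOperator(
        task_id="memory_hungry_task",
        bash_command="echo hello",
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",  # must match the required "base" container
                            resources=k8s.V1ResourceRequirements(requests={"memory": "1Gi"}),
                        )
                    ]
                )
            )
        },
    )
```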

[airflow] 10/17: Bugfix: Check next run exists before reading data interval (#19307)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 06c1cea45bb0f94189b2f3488884d1b566b08fb2
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Nov 2 03:53:37 2021 +0800

    Bugfix: Check next run exists before reading data interval (#19307)
    
    Fix #19304, and also an issue with scheduling a DAG's first-ever run introduced in #18897. We could fix it outside this function, but if `next_dagrun` is None, the next run's data interval is supposed to be None in the first place, so checking inside this function just makes sense.
    
    closes https://github.com/apache/airflow/issues/19343
    closes https://github.com/apache/airflow/issues/19304
    
    (cherry picked from commit dc4dcaa9ccbec6a1b1ce84d5ee42322ce1fbb081)
---
 airflow/models/dag.py    | 13 ++++++++-----
 tests/models/test_dag.py | 37 +++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 55be0ec..d8fa1d0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -607,12 +607,12 @@ class DAG(LoggingMixin):
             return None
         return self.timetable._get_prev(timezone.coerce_datetime(dttm))
 
-    def get_next_data_interval(self, dag_model: "DagModel") -> DataInterval:
+    def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval]:
         """Get the data interval of the next scheduled run.
 
         For compatibility, this method infers the data interval from the DAG's
-        schedule if the run does not have an explicit one set, which is possible for
-        runs created prior to AIP-39.
+        schedule if the run does not have an explicit one set, which is possible
+        for runs created prior to AIP-39.
 
         This function is private to Airflow core and should not be depended as a
         part of the Python API.
@@ -621,11 +621,14 @@ class DAG(LoggingMixin):
         """
         if self.dag_id != dag_model.dag_id:
             raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {dag_model.dag_id}")
+        if dag_model.next_dagrun is None:  # Next run not scheduled.
+            return None
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
-        # Compatibility: runs scheduled before AIP-39 implementation don't have an
-        # explicit data interval. Try to infer from the logical date.
+        # Compatibility: A run was scheduled without an explicit data interval.
+        # This means the run was scheduled before AIP-39 implementation. Try to
+        # infer from the logical date.
         return self.infer_automated_data_interval(dag_model.next_dagrun)
 
     def get_run_data_interval(self, run: DagRun) -> DataInterval:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 5bea2b7..efdff89 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2234,3 +2234,40 @@ def test_iter_dagrun_infos_between_error(caplog):
         ),
     ]
     assert caplog.records[0].exc_info is not None, "should contain exception context"
+
+
+@pytest.mark.parametrize(
+    "logical_date, data_interval_start, data_interval_end, expected_data_interval",
+    [
+        pytest.param(None, None, None, None, id="no-next-run"),
+        pytest.param(
+            DEFAULT_DATE,
+            DEFAULT_DATE,
+            DEFAULT_DATE + timedelta(days=2),
+            DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=2)),
+            id="modern",
+        ),
+        pytest.param(
+            DEFAULT_DATE,
+            None,
+            None,
+            DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)),
+            id="legacy",
+        ),
+    ],
+)
+def test_get_next_data_interval(
+    logical_date,
+    data_interval_start,
+    data_interval_end,
+    expected_data_interval,
+):
+    dag = DAG(dag_id="test_get_next_data_interval", schedule_interval="@daily")
+    dag_model = DagModel(
+        dag_id="test_get_next_data_interval",
+        next_dagrun=logical_date,
+        next_dagrun_data_interval_start=data_interval_start,
+        next_dagrun_data_interval_end=data_interval_end,
+    )
+
+    assert dag.get_next_data_interval(dag_model) == expected_data_interval

[airflow] 03/17: Fix typo in ``tutorial.rst`` (#18983)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 46bf6b41e7d1388cf5bc8800a90dd5bd10decf3a
Author: gavinBeam <92...@users.noreply.github.com>
AuthorDate: Thu Oct 14 17:08:26 2021 +0100

    Fix typo in ``tutorial.rst`` (#18983)
    
    (cherry picked from commit 2ba722db4857db2881ee32c1b2e9330bc7163535)
---
 docs/apache-airflow/tutorial.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index c541d0e..443242a 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -489,7 +489,7 @@ Lets look at our DAG:
           cur = conn.cursor()
           with open("/usr/local/airflow/dags/files/employees.csv", "r") as file:
               cur.copy_from(
-                  f,
+                  file,
                   "Employees_temp",
                   columns=[
                       "Serial Number",

[airflow] 15/17: Fix message on "Mark as" confirmation page (#19363)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 34768a80950d91ff7174a9208ed3b359ef9feb74
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Nov 3 10:40:20 2021 +0000

    Fix message on "Mark as" confirmation page (#19363)
    
    In an earlier refactor I created a macro called `message` which
    "stomped" on the variable of the same name set in the view, meaning the
    page shows `<Macro message>` instead of the string we meant to set.
    
    This "fixes" it by using a less-likely-to-clash name for the macro (and
    fixing the typo in `dismissible` parameter.)
    
    (cherry picked from commit 270915384dbb5f78ff397bc3e94f8704ac2e5c4e)
---
 airflow/www/templates/airflow/_messages.html |  4 ++--
 airflow/www/templates/airflow/dags.html      | 10 +++++-----
 airflow/www/templates/airflow/main.html      |  6 +++---
 airflow/www/templates/appbuilder/flash.html  |  2 +-
 4 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/airflow/www/templates/airflow/_messages.html b/airflow/www/templates/airflow/_messages.html
index 56fa505..29a3a56 100644
--- a/airflow/www/templates/airflow/_messages.html
+++ b/airflow/www/templates/airflow/_messages.html
@@ -16,9 +16,9 @@
  specific language governing permissions and limitations
  under the License.
 #}
-{%- macro message(content, category='info', dismissable=true) -%}
+{%- macro show_message(content, category='info', dismissible=true) -%}
   <div class="alert alert-{{ category }}">
-    {%- if dismissable -%}
+    {%- if dismissible -%}
     <button type="button" class="close" data-dismiss="alert">&times;</button>
     {%- endif -%}
     {%- if caller is defined -%}
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 5d5d140..1f2f1d5 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -19,7 +19,7 @@
 
 {% extends base_template %}
 {% from 'appbuilder/loading_dots.html' import loading_dots %}
-{% from 'airflow/_messages.html' import message %}
+{% from 'airflow/_messages.html' import show_message %}
 
 {% block page_title %}
   {% if search_query %}"{{ search_query }}" - {% endif %}DAGs - {{ appbuilder.app_name }}
@@ -49,10 +49,10 @@
 
 {% block messages %}
   {% for m in dashboard_alerts %}
-    {{ message(m.message, m.category) }}
+    {{ show_message(m.message, m.category) }}
   {% endfor %}
   {% for original_table_name, moved_table_name in migration_moved_data_alerts %}
-    {% call message(category='error', dismissable=false) %}
+    {% call show_message(category='error', dismissible=false) %}
       Airflow found incompatible data in the <code>{{ original_table_name }}</code> table in the
       metadatabase, and has moved them to <code>{{ moved_table_name }}</code> during the database migration
       to upgrade. Please inspect the moved data to decide whether you need to keep them, and manually drop
@@ -61,14 +61,14 @@
   {% endfor %}
   {{ super() }}
   {% if sqlite_warning | default(true) %}
-    {% call message(category='warning', dismissable=false)  %}
+    {% call show_message(category='warning', dismissible=false)  %}
       Do not use <b>SQLite</b> as metadata DB in production &#8211; it should only be used for dev/testing
       We recommend using Postgres or MySQL.
       <a href={{ get_docs_url("howto/set-up-database.html") }}><b>Click here</b></a> for more information.
     {% endcall %}
   {% endif %}
   {% if sequential_executor_warning | default(false) %}
-    {% call message(category='warning', dismissable=false)  %}
+    {% call show_message(category='warning', dismissible=false)  %}
       Do not use <b>SequentialExecutor</b> in production.
       <a href={{ get_docs_url("executor/index.html") }}><b>Click here</b></a> for more information.
     {% endcall %}
diff --git a/airflow/www/templates/airflow/main.html b/airflow/www/templates/airflow/main.html
index f9190b4..d57f5f0 100644
--- a/airflow/www/templates/airflow/main.html
+++ b/airflow/www/templates/airflow/main.html
@@ -18,7 +18,7 @@
 #}
 
 {% extends 'appbuilder/baselayout.html' %}
-{% from 'airflow/_messages.html' import message %}
+{% from 'airflow/_messages.html' import show_message %}
 
 {% block page_title -%}
   {% if title is defined -%}
@@ -52,7 +52,7 @@
 {% block messages %}
   {% include 'appbuilder/flash.html' %}
   {% if scheduler_job is defined and (not scheduler_job or not scheduler_job.is_alive()) %}
-    {% call message(category='warning', dismissable=false) %}
+    {% call show_message(category='warning', dismissible=false) %}
       <p>The scheduler does not appear to be running.
       {% if scheduler_job %}
       Last heartbeat was received
@@ -67,7 +67,7 @@
     {% endcall %}
   {% endif %}
   {% if triggerer_job is defined and (not triggerer_job or not triggerer_job.is_alive()) %}
-    {% call message(category='warning', dismissable=false) %}
+    {% call show_message(category='warning', dismissible=false) %}
       <p>The triggerer does not appear to be running.
       {% if triggerer_job %}
       Last heartbeat was received
diff --git a/airflow/www/templates/appbuilder/flash.html b/airflow/www/templates/appbuilder/flash.html
index 0f97c9c..fa1da77 100644
--- a/airflow/www/templates/appbuilder/flash.html
+++ b/airflow/www/templates/appbuilder/flash.html
@@ -35,7 +35,7 @@
         {{ dag_import_errors.append((category, m)) if dag_import_errors.append((category, m)) != None else '' }}
       {% elif not (request.path == appbuilder.get_url_for_login and 'access is denied' in m.lower()) %}
       {# Don't show 'Access is Denied' alert if user is logged out and on the login page. #}
-        {{ message(m, category) }}
+        {{ show_message(m, category) }}
       {% endif %}
     {% endfor %}
   {% endif %}
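
As a standalone illustration of the clash described in the commit message (plain Jinja2, not the Airflow templates; the flash string is made up): importing a macro named `message` shadows the `message` variable the view passes in, and renaming the macro restores the expected output.

```python
# Standalone illustration of the macro/variable clash; the templates are
# simplified stand-ins for Airflow's _messages.html and the pages that use it.
from jinja2 import DictLoader, Environment

templates = {
    # Before: a macro named `message` shadows the view's `message` variable.
    "_messages_old.html": "{% macro message(content) %}<div>{{ content }}</div>{% endmacro %}",
    "page_broken.html": "{% from '_messages_old.html' import message %}{{ message }}",
    # After: the macro is renamed, so `{{ message }}` resolves to the view's string again.
    "_messages_new.html": "{% macro show_message(content) %}<div>{{ content }}</div>{% endmacro %}",
    "page_fixed.html": "{% from '_messages_new.html' import show_message %}{{ message }}",
}

env = Environment(loader=DictLoader(templates))
print(env.get_template("page_broken.html").render(message="Marked 3 tasks as success"))
# -> renders the Macro object, not the intended string
print(env.get_template("page_fixed.html").render(message="Marked 3 tasks as success"))
# -> Marked 3 tasks as success
```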

[airflow] 07/17: sqlite_default has been hard-coded to /tmp, use gettempdir instead, (#19255)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d291f76e39ad1441e26c045dbdabaa0900fefb9f
Author: Amit Ran <89...@users.noreply.github.com>
AuthorDate: Thu Oct 28 01:12:35 2021 +0300

    sqlite_default has been hard-coded to /tmp, use gettempdir instead, (#19255)
    
    respecting tempdir environment variables.
    See #19208, #19229.
    
    (cherry picked from commit bce888306f28d0c56c6a45c874a045fa304c90b7)
---
 airflow/utils/db.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 17522d3..2da0722 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -18,6 +18,7 @@
 import logging
 import os
 import time
+from tempfile import gettempdir
 from typing import Iterable
 
 from sqlalchemy import Table, exc, func, inspect, or_, text
@@ -508,7 +509,7 @@ def create_default_connections(session=None):
         Connection(
             conn_id="sqlite_default",
             conn_type="sqlite",
-            host="/tmp/sqlite_default.db",
+            host=os.path.join(gettempdir(), "sqlite_default.db"),
         ),
         session,
     )

[airflow] 04/17: Use ``execution_date`` to check for existing ``DagRun`` for ``TriggerDagRunOperator`` (#18968)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 901901a4d19e3fbe33b58881bf7e0b09e4982fed
Author: Gulshan Gill <gu...@gmail.com>
AuthorDate: Thu Nov 4 03:09:41 2021 +0800

    Use ``execution_date`` to check for existing ``DagRun`` for ``TriggerDagRunOperator`` (#18968)
    
    A small suggestion to change `DagRun.find` in `trigger_dag` to use `execution_date` as a parameter rather than `run_id`.
    
    I feel it is better to check by `execution_date` rather than `run_id`, since checking by `run_id` alone misses a scheduled run that already exists for the same `execution_date`, and creating a new run with that `execution_date` then throws the error below:
    
    ```
    sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key"
    ```
    
    There is a constraint in `dag_run` called `dag_run_dag_id_execution_date_key` which can be found [here](https://github.com/apache/airflow/blob/c4f5233cd10ae03ee69fba861c8a6fa64e1f8a71/airflow/models/dagrun.py#L103).
    
    (cherry picked from commit e54ee6e0d38ca469be6ba686e32ce7a3a34d03ca)
---
 airflow/api/common/experimental/trigger_dag.py    |  6 ++-
 airflow/models/dagrun.py                          | 65 +++++++++++++++++------
 tests/api/common/experimental/test_trigger_dag.py |  6 +--
 tests/models/test_dagrun.py                       | 23 ++++++++
 4 files changed, 78 insertions(+), 22 deletions(-)

diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 2e64f86..38a873c 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -68,10 +68,12 @@ def _trigger_dag(
             )
 
     run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
-    dag_run = DagRun.find(dag_id=dag_id, run_id=run_id)
+    dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)
 
     if dag_run:
-        raise DagRunAlreadyExists(f"Run id {run_id} already exists for dag id {dag_id}")
+        raise DagRunAlreadyExists(
+            f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"
+        )
 
     run_conf = None
     if conf:
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 800720c..8d2ab2a 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -285,12 +285,13 @@ class DagRun(Base, LoggingMixin):
             query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
         )
 
-    @staticmethod
+    @classmethod
     @provide_session
     def find(
+        cls,
         dag_id: Optional[Union[str, List[str]]] = None,
         run_id: Optional[str] = None,
-        execution_date: Optional[datetime] = None,
+        execution_date: Optional[Union[datetime, List[datetime]]] = None,
         state: Optional[DagRunState] = None,
         external_trigger: Optional[bool] = None,
         no_backfills: bool = False,
@@ -324,35 +325,65 @@ class DagRun(Base, LoggingMixin):
         :param execution_end_date: dag run that was executed until this date
         :type execution_end_date: datetime.datetime
         """
-        DR = DagRun
-
-        qry = session.query(DR)
+        qry = session.query(cls)
         dag_ids = [dag_id] if isinstance(dag_id, str) else dag_id
         if dag_ids:
-            qry = qry.filter(DR.dag_id.in_(dag_ids))
+            qry = qry.filter(cls.dag_id.in_(dag_ids))
         if run_id:
-            qry = qry.filter(DR.run_id == run_id)
+            qry = qry.filter(cls.run_id == run_id)
         if execution_date:
             if isinstance(execution_date, list):
-                qry = qry.filter(DR.execution_date.in_(execution_date))
+                qry = qry.filter(cls.execution_date.in_(execution_date))
             else:
-                qry = qry.filter(DR.execution_date == execution_date)
+                qry = qry.filter(cls.execution_date == execution_date)
         if execution_start_date and execution_end_date:
-            qry = qry.filter(DR.execution_date.between(execution_start_date, execution_end_date))
+            qry = qry.filter(cls.execution_date.between(execution_start_date, execution_end_date))
         elif execution_start_date:
-            qry = qry.filter(DR.execution_date >= execution_start_date)
+            qry = qry.filter(cls.execution_date >= execution_start_date)
         elif execution_end_date:
-            qry = qry.filter(DR.execution_date <= execution_end_date)
+            qry = qry.filter(cls.execution_date <= execution_end_date)
         if state:
-            qry = qry.filter(DR.state == state)
+            qry = qry.filter(cls.state == state)
         if external_trigger is not None:
-            qry = qry.filter(DR.external_trigger == external_trigger)
+            qry = qry.filter(cls.external_trigger == external_trigger)
         if run_type:
-            qry = qry.filter(DR.run_type == run_type)
+            qry = qry.filter(cls.run_type == run_type)
         if no_backfills:
-            qry = qry.filter(DR.run_type != DagRunType.BACKFILL_JOB)
+            qry = qry.filter(cls.run_type != DagRunType.BACKFILL_JOB)
+
+        return qry.order_by(cls.execution_date).all()
+
+    @classmethod
+    @provide_session
+    def find_duplicate(
+        cls,
+        dag_id: str,
+        run_id: str,
+        execution_date: datetime,
+        session: Session = None,
+    ) -> Optional['DagRun']:
+        """
+        Return an existing run for the DAG with a specific run_id or execution_date.
 
-        return qry.order_by(DR.execution_date).all()
+        *None* is returned if no such DAG run is found.
+
+        :param dag_id: the dag_id to find duplicates for
+        :type dag_id: str
+        :param run_id: defines the run id for this dag run
+        :type run_id: str
+        :param execution_date: the execution date
+        :type execution_date: datetime.datetime
+        :param session: database session
+        :type session: sqlalchemy.orm.session.Session
+        """
+        return (
+            session.query(cls)
+            .filter(
+                cls.dag_id == dag_id,
+                or_(cls.run_id == run_id, cls.execution_date == execution_date),
+            )
+            .one_or_none()
+        )
 
     @staticmethod
     def generate_run_id(run_type: DagRunType, execution_date: datetime) -> str:
diff --git a/tests/api/common/experimental/test_trigger_dag.py b/tests/api/common/experimental/test_trigger_dag.py
index cbca935..2f16446 100644
--- a/tests/api/common/experimental/test_trigger_dag.py
+++ b/tests/api/common/experimental/test_trigger_dag.py
@@ -49,7 +49,7 @@ class TestTriggerDag(unittest.TestCase):
         dag = DAG(dag_id)
         dag_bag_mock.dags = [dag_id]
         dag_bag_mock.get_dag.return_value = dag
-        dag_run_mock.find.return_value = DagRun()
+        dag_run_mock.find_duplicate.return_value = DagRun()
         with pytest.raises(AirflowException):
             _trigger_dag(dag_id, dag_bag_mock)
 
@@ -60,7 +60,7 @@ class TestTriggerDag(unittest.TestCase):
         dag_id = "trigger_dag"
         dag_bag_mock.dags = [dag_id]
         dag_bag_mock.get_dag.return_value = dag_mock
-        dag_run_mock.find.return_value = None
+        dag_run_mock.find_duplicate.return_value = None
         dag1 = mock.MagicMock(subdags=[])
         dag2 = mock.MagicMock(subdags=[])
         dag_mock.subdags = [dag1, dag2]
@@ -76,7 +76,7 @@ class TestTriggerDag(unittest.TestCase):
         dag_id = "trigger_dag"
         dag_bag_mock.dags = [dag_id]
         dag_bag_mock.get_dag.return_value = dag_mock
-        dag_run_mock.find.return_value = None
+        dag_run_mock.find_duplicate.return_value = None
         dag1 = mock.MagicMock(subdags=[])
         dag2 = mock.MagicMock(subdags=[dag1])
         dag_mock.subdags = [dag1, dag2]
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index c4ef287..00799be 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -142,6 +142,29 @@ class TestDagRun(unittest.TestCase):
         assert 0 == len(models.DagRun.find(dag_id=dag_id2, external_trigger=True))
         assert 1 == len(models.DagRun.find(dag_id=dag_id2, external_trigger=False))
 
+    def test_dagrun_find_duplicate(self):
+        session = settings.Session()
+        now = timezone.utcnow()
+
+        dag_id = "test_dagrun_find_duplicate"
+        dag_run = models.DagRun(
+            dag_id=dag_id,
+            run_id=dag_id,
+            run_type=DagRunType.MANUAL,
+            execution_date=now,
+            start_date=now,
+            state=State.RUNNING,
+            external_trigger=True,
+        )
+        session.add(dag_run)
+
+        session.commit()
+
+        assert models.DagRun.find_duplicate(dag_id=dag_id, run_id=dag_id, execution_date=now) is not None
+        assert models.DagRun.find_duplicate(dag_id=dag_id, run_id=dag_id, execution_date=None) is not None
+        assert models.DagRun.find_duplicate(dag_id=dag_id, run_id=None, execution_date=now) is not None
+        assert models.DagRun.find_duplicate(dag_id=dag_id, run_id=None, execution_date=None) is None
+
     def test_dagrun_success_when_all_skipped(self):
         """
         Tests that a DAG run succeeds when all tasks are skipped

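For reference, a minimal usage sketch of the new `find_duplicate` classmethod (hypothetical `example_dag` dag_id; assumes an initialized Airflow 2.2 metadata database), mirroring the duplicate-run guard that `trigger_dag` now performs:

```
# A minimal sketch, not Airflow source: guard against creating a run that
# collides with an existing one on run_id or execution_date.
from airflow.exceptions import DagRunAlreadyExists
from airflow.models import DagRun
from airflow.utils import timezone
from airflow.utils.types import DagRunType

execution_date = timezone.utcnow()
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)

# find_duplicate matches the dag_id plus either the same run_id OR the same
# execution_date, and returns None when no such run exists.
existing = DagRun.find_duplicate(
    dag_id="example_dag", run_id=run_id, execution_date=execution_date
)
if existing is not None:
    raise DagRunAlreadyExists(f"A DAG run already exists: {existing.run_id}")
```
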
[airflow] 16/17: Only mark SchedulerJobs as failed, not any jobs (#19375)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9b014673f379b910c22ca4902523a49c227115af
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Wed Nov 3 00:45:41 2021 -0600

    Only mark SchedulerJobs as failed, not any jobs (#19375)
    
    In `adopt_or_reset_orphaned_tasks`, we mark any SchedulerJobs that have not
    heartbeat within `scheduler_health_check_threshold` as failed; however, a
    missing condition allowed that timeout to apply to all jobs, not just
    SchedulerJobs. This is because the polymorphic identity is not included in
    the criteria generated by `update()`:
    https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update
    
    So if we have any running LocalTaskJobs that, for whatever reason, aren't
    heartbeating within `scheduler_health_check_threshold`, their state gets
    set to failed and they subsequently exit with a log line similar to:
    
        State of this instance has been externally set to scheduled. Terminating instance.
    
    Note that the state they are externally set to can differ (e.g. queued or
    up_for_retry), depending simply on how quickly the scheduler has progressed
    that task_instance again.
    
    (cherry picked from commit 38d329bd112e8be891f077b4e3300182930cf74d)
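
A minimal, standalone sketch of the `Query.update()` behaviour described above (plain SQLAlchemy single-table inheritance, not the Airflow models; assumes the SQLAlchemy 1.3 semantics linked in the message):

```
# Both Job classes share one table with a `job_type` discriminator, mirroring
# Airflow's BaseJob hierarchy. Under SQLAlchemy 1.3, Query.update() does NOT
# add the polymorphic-identity WHERE clause, so the explicit job_type filter
# added by this commit is what keeps LocalTaskJobs untouched.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class BaseJob(Base):
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    job_type = Column(String(30))
    state = Column(String(20))
    __mapper_args__ = {"polymorphic_on": job_type, "polymorphic_identity": "BaseJob"}


class SchedulerJob(BaseJob):
    __mapper_args__ = {"polymorphic_identity": "SchedulerJob"}


class LocalTaskJob(BaseJob):
    __mapper_args__ = {"polymorphic_identity": "LocalTaskJob"}


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([SchedulerJob(state="running"), LocalTaskJob(state="running")])
session.commit()

# Without an explicit job_type filter, this UPDATE hits BOTH rows, because
# only the filter() criteria make it into the emitted SQL.
session.query(SchedulerJob).filter(SchedulerJob.state == "running").update(
    {"state": "failed"}, synchronize_session=False
)
session.commit()
print([(j.job_type, j.state) for j in session.query(BaseJob)])
# Adding `SchedulerJob.job_type == "SchedulerJob"` to the filter, as the
# commit does, restricts the UPDATE to scheduler jobs only.
```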
---
 airflow/jobs/scheduler_job.py    |  1 +
 tests/jobs/test_scheduler_job.py | 31 +++++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2c7378d..2a230a7 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1127,6 +1127,7 @@ class SchedulerJob(BaseJob):
                     num_failed = (
                         session.query(SchedulerJob)
                         .filter(
+                            SchedulerJob.job_type == "SchedulerJob",
                             SchedulerJob.state == State.RUNNING,
                             SchedulerJob.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
                         )
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 4922617..5380960 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -37,6 +37,7 @@ from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow.jobs.backfill_job import BackfillJob
+from airflow.jobs.base_job import BaseJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
 from airflow.models.dagrun import DagRun
@@ -2445,6 +2446,36 @@ class TestSchedulerJob:
         if old_job.processor_agent:
             old_job.processor_agent.end()
 
+    def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog):
+        """Make sure we only set SchedulerJobs to failed, not all jobs"""
+        session = settings.Session()
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.state = State.RUNNING
+        self.scheduler_job.latest_heartbeat = timezone.utcnow()
+        session.add(self.scheduler_job)
+        session.flush()
+
+        old_job = SchedulerJob(subdir=os.devnull)
+        old_job.state = State.RUNNING
+        old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
+        session.add(old_job)
+        session.flush()
+
+        old_task_job = BaseJob()  # Imagine it's a LocalTaskJob, but this is easier to provision
+        old_task_job.state = State.RUNNING
+        old_task_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
+        session.add(old_task_job)
+        session.flush()
+
+        with caplog.at_level('INFO', logger='airflow.jobs.scheduler_job'):
+            self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
+        session.expire_all()
+
+        assert old_job.state == State.FAILED
+        assert old_task_job.state == State.RUNNING
+        assert 'Marked 1 SchedulerJob instances as failed' in caplog.messages
+
     def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker):
         """Test SLA Callbacks are not sent when check_slas is False"""
         dag_id = 'test_send_sla_callbacks_to_processor_sla_disabled'

[airflow] 05/17: Add Note to SLA regarding schedule_interval (#19173)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9a60e6244d497de37e002817b8b0f05bd5249c11
Author: john-jac <75...@users.noreply.github.com>
AuthorDate: Tue Nov 2 15:50:46 2021 -0700

    Add Note to SLA regarding schedule_interval (#19173)
    
    This documentation entry makes explicit the requirement that SLAs will only be triggered for tasks that are part of a scheduled DAG run.
    
    Manually triggering DAGs with a schedule_interval of None causes the error:
    ```
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/dag_processing/processor.py", line 411, in manage_slas
        while dttm < timezone.utcnow():
    TypeError: '<' not supported between instances of 'NoneType' and 'datetime.datetime'
    ```
    
    Manually triggering DAGs with a valid schedule_interval also does not produce tasks that can invoke an SLA miss. Only scheduled DAG runs will have their tasks checked for SLA misses.
    
    (cherry picked from commit b18b2e502509fd492f65e18871b4db20f9e6cc4d)
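
For illustration, a minimal sketch of a task-level SLA (hypothetical DAG, Airflow 2.2 style); per the note added below, the SLA is only evaluated for scheduled runs:

```
# A hypothetical DAG (not from the Airflow repo) showing a task-level SLA;
# the SLA check only applies to runs created by the scheduler, so manually
# triggering this DAG will not produce SLA misses.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="sla_example",            # hypothetical dag_id
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",      # a real schedule is required for SLAs to fire
    catchup=False,
) as dag:
    BashOperator(
        task_id="slow_task",
        bash_command="sleep 30",
        sla=timedelta(minutes=5),    # checked only for scheduled runs
    )
```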
---
 airflow/dag_processing/processor.py    | 25 +++++++++++++++----------
 docs/apache-airflow/concepts/tasks.rst |  4 ++++
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index dd7937f..c9a9d08 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -414,16 +414,21 @@ class DagFileProcessor(LoggingMixin):
 
             sla_misses = []
             next_info = dag.next_dagrun_info(dag.get_run_data_interval(ti.dag_run), restricted=False)
-            while next_info.logical_date < ts:
-                next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)
-                if next_info.logical_date + task.sla < ts:
-                    sla_miss = SlaMiss(
-                        task_id=ti.task_id,
-                        dag_id=ti.dag_id,
-                        execution_date=next_info.logical_date,
-                        timestamp=ts,
-                    )
-                    sla_misses.append(sla_miss)
+            if next_info is None:
+                self.log.info("Skipping SLA check for %s because task does not have scheduled date", ti)
+            else:
+                while next_info.logical_date < ts:
+                    next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)
+                    if next_info is None:
+                        break
+                    if next_info.logical_date + task.sla < ts:
+                        sla_miss = SlaMiss(
+                            task_id=ti.task_id,
+                            dag_id=ti.dag_id,
+                            execution_date=next_info.logical_date,
+                            timestamp=ts,
+                        )
+                        sla_misses.append(sla_miss)
             if sla_misses:
                 session.add_all(sla_misses)
         session.commit()
diff --git a/docs/apache-airflow/concepts/tasks.rst b/docs/apache-airflow/concepts/tasks.rst
index efc7090..0f30dd1 100644
--- a/docs/apache-airflow/concepts/tasks.rst
+++ b/docs/apache-airflow/concepts/tasks.rst
@@ -163,6 +163,10 @@ If you want to disable SLA checking entirely, you can set ``check_slas = False``
 
 To read more about configuring the emails, see :doc:`/howto/email-config`.
 
+.. note::
+
+    Only scheduled tasks will be checked against SLA. For example, manually triggered tasks will not invoke an SLA miss. For more information on ``schedule_interval`` values see :doc:`DAG Run </dag-run>`.
+
 .. _concepts:sla_miss_callback:
 
 sla_miss_callback

[airflow] 02/17: Add explicit session parameter in PoolSlotsAvailableDep (#18875)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8666bf0b7eeac887516884687636bbd900479fa0
Author: Aleksey Kirilishin <54...@users.noreply.github.com>
AuthorDate: Fri Oct 29 09:06:28 2021 +0300

    Add explicit session parameter in PoolSlotsAvailableDep (#18875)
    
    (cherry picked from commit e0aa36ead4bb703abf5702bb1c9b105d60c15b28)
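
For context, a standalone toy approximation (plain Python, not Airflow source) of the `provide_session` pattern this fix relies on: when the caller omits the session, the decorator has to create and finalize a fresh one, which is why explicitly passing `session=session` in the dep check saves round trips (see the lower query count in the test below):

```
import functools


def provide_session(func):
    """Toy approximation of airflow.utils.session.provide_session."""

    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            # Reuse the caller's session, which is what the fix above now does.
            return func(*args, session=session, **kwargs)
        session = object()  # stand-in for opening a brand-new DB session
        try:
            return func(*args, session=session, **kwargs)
        finally:
            pass  # stand-in for committing and closing that new session

    return wrapper


@provide_session
def open_slots(pool_name, session=None):
    return f"open_slots({pool_name!r}) ran with session {id(session):#x}"


shared_session = object()
print(open_slots("default_pool", session=shared_session))  # reuses caller's session
print(open_slots("default_pool"))  # decorator has to create its own session
```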
---
 airflow/ti_deps/deps/pool_slots_available_dep.py | 2 +-
 tests/jobs/test_local_task_job.py                | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/ti_deps/deps/pool_slots_available_dep.py b/airflow/ti_deps/deps/pool_slots_available_dep.py
index f8c461e..a37e62a 100644
--- a/airflow/ti_deps/deps/pool_slots_available_dep.py
+++ b/airflow/ti_deps/deps/pool_slots_available_dep.py
@@ -55,7 +55,7 @@ class PoolSlotsAvailableDep(BaseTIDep):
         else:
             # Controlled by UNIQUE key in slot_pool table,
             # only one result can be returned.
-            open_slots = pools[0].open_slots()
+            open_slots = pools[0].open_slots(session=session)
 
         if ti.state in EXECUTION_STATES:
             open_slots += ti.pool_slots
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 663ba43..f23a94f 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -878,5 +878,5 @@ def test_number_of_queries_single_loop(mock_get_task_runner, return_codes, dag_m
     ti.refresh_from_task(task)
 
     job = LocalTaskJob(task_instance=ti, executor=MockExecutor())
-    with assert_queries_count(20):
+    with assert_queries_count(18):
         job.run()