Posted to commits@airflow.apache.org by je...@apache.org on 2022/01/21 00:06:52 UTC

[airflow] branch v2-2-test updated (06c82e1 -> 713a807)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 06c82e1  Add changelog for 2.2.3rc2
     new 027e1d1  Bump version to 2.2.4
     new fd5558f  Fixing ses email backend (#18042)
     new d2ae684  Enhance `multiple_outputs` inference of dict typing (#19608)
     new b05722e  Correctly send timing metrics when using dogstatsd (fix schedule_delay metric) (#19973)
     new 03be5a1  Update upgrading.rst with detailed code example of how to resolve post-upgrade warning (#19993)
     new 94865f9  Adds retry on taskinstance retrieval lock (#20030)
     new 1285215  Docs for multiple pool slots (#20257)
     new c7256dc  Doc: Fix incorrect filename references (#20277)
     new d6466ee  fix(dag-dependencies): fix arrow styling (#20303)
     new 1a7f943  Add docs about ``.airflowignore`` (#20311)
     new 614cd3c  Fix typo (#20314)
     new c836e71  Bugfix: Deepcopying Kubernetes Secrets attributes causing issues (#20318)
     new a25d7ce  Un-ignore DeprecationWarning (#20322)
     new 2ee6351  Fix grammar mistakes (#20341)
     new f1a2e50  Correct typo (#20345)
     new 581fcfd  Remove unnecessary logging in experimental API (#20356)
     new 330c365  Fix typo in docs (#20371)
     new b19dfdb  fix(standalone): Remove hardcoded Webserver port (#20429)
     new b43882c  Avoid calling DAG.following_schedule() for TaskInstance.get_template_context() (#20486)
     new 3692007  20496 fix port standalone mode (#20505)
     new cb6891b  Doc: Update Supported column for 1.10.x series (#20592)
     new 9912cf1  Docs: Changed macros to correct classes and modules (#20637)
     new 713a807  Docs: Clarify ``sentry_on`` value is not quoted with example (#20639)

The 23 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .pre-commit-config.yaml                            |  1 +
 README.md                                          | 14 ++--
 airflow/cli/commands/standalone_command.py         | 10 ++-
 airflow/config_templates/config.yml                |  8 +++
 airflow/config_templates/default_airflow.cfg       |  5 ++
 airflow/decorators/base.py                         | 27 ++++---
 airflow/decorators/python.py                       | 16 ++---
 airflow/decorators/python_virtualenv.py            | 10 ++-
 airflow/kubernetes/secret.py                       | 16 +++--
 airflow/models/taskinstance.py                     | 71 +++++++++---------
 airflow/operators/datetime.py                      |  2 +-
 airflow/operators/python.py                        | 26 ++++---
 airflow/operators/weekday.py                       |  2 +-
 airflow/providers/amazon/aws/utils/emailer.py      |  3 +-
 airflow/providers/http/operators/http.py           | 10 +--
 airflow/providers/http/sensors/http.py             |  7 +-
 airflow/sensors/external_task.py                   | 24 +++----
 airflow/sensors/weekday.py                         |  2 +-
 airflow/stats.py                                   |  9 ++-
 airflow/utils/context.py                           | 33 +++++++++
 airflow/utils/context.pyi                          |  6 +-
 airflow/utils/email.py                             | 10 ++-
 airflow/utils/helpers.py                           |  2 +-
 .../log/task_handler_with_custom_formatter.py      |  4 +-
 airflow/utils/operator_helpers.py                  | 84 +++++++++++++++++-----
 airflow/www/api/experimental/endpoints.py          |  8 +--
 airflow/www/static/js/dag_dependencies.js          |  5 +-
 docs/apache-airflow/concepts/dags.rst              | 29 ++++++++
 docs/apache-airflow/concepts/pools.rst             | 42 ++++++++++-
 docs/apache-airflow/concepts/tasks.rst             |  4 +-
 .../howto/create-custom-decorator.rst              |  2 +-
 docs/apache-airflow/howto/email-config.rst         |  7 ++
 docs/apache-airflow/installation/index.rst         |  2 +-
 .../installation/supported-versions.rst            |  4 +-
 docs/apache-airflow/installation/upgrading.rst     | 15 ++++
 docs/apache-airflow/logging-monitoring/errors.rst  |  5 +-
 docs/apache-airflow/modules_management.rst         | 10 +--
 docs/apache-airflow/start/local.rst                |  1 +
 docs/apache-airflow/templates-ref.rst              |  4 +-
 .../extending/add-apt-packages/Dockerfile          |  2 +-
 .../add-build-essential-extend/Dockerfile          |  2 +-
 .../extending/add-providers/Dockerfile             |  2 +-
 .../extending/add-pypi-packages/Dockerfile         |  2 +-
 .../extending/embedding-dags/Dockerfile            |  2 +-
 .../extending/writable-directory/Dockerfile        |  2 +-
 .../restricted/restricted_environments.sh          |  4 +-
 docs/docker-stack/entrypoint.rst                   | 16 ++---
 scripts/ci/kubernetes/ci_run_kubernetes_tests.sh   |  7 +-
 scripts/in_container/entrypoint_ci.sh              |  2 -
 setup.py                                           |  2 +-
 tests/cli/commands/test_task_command.py            |  2 +
 tests/conftest.py                                  | 12 +++-
 tests/core/test_core.py                            | 21 +++---
 tests/core/test_stats.py                           |  5 ++
 tests/decorators/test_python.py                    | 23 ++++--
 tests/models/test_taskinstance.py                  | 25 +++++++
 tests/operators/test_email.py                      |  2 +-
 tests/operators/test_python.py                     |  9 ++-
 tests/operators/test_trigger_dagrun.py             |  2 +-
 tests/providers/amazon/aws/utils/test_emailer.py   | 42 +++++------
 tests/providers/http/sensors/test_http.py          |  4 +-
 tests/sensors/test_external_task_sensor.py         |  8 +--
 tests/utils/test_email.py                          | 14 ++++
 tests/utils/test_log_handlers.py                   |  6 +-
 64 files changed, 524 insertions(+), 234 deletions(-)

[airflow] 07/23: Docs for multiple pool slots (#20257)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 128521548614294e267fe134ffb8a8418fee0376
Author: PApostol <50...@users.noreply.github.com>
AuthorDate: Mon Dec 13 11:54:11 2021 +0000

    Docs for multiple pool slots (#20257)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 7c4bed095e15a5ecb3320aa9e68a468d67832a70)
---
 docs/apache-airflow/concepts/pools.rst | 42 ++++++++++++++++++++++++++++++++--
 1 file changed, 40 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/concepts/pools.rst b/docs/apache-airflow/concepts/pools.rst
index f6c97d7..5395ccf 100644
--- a/docs/apache-airflow/concepts/pools.rst
+++ b/docs/apache-airflow/concepts/pools.rst
@@ -39,13 +39,51 @@ Tasks can then be associated with one of the existing pools by using the ``pool`
 
 
 Tasks will be scheduled as usual while the slots fill up. The number of slots occupied by a task can be configured by
-``pool_slots``. Once capacity is reached, runnable tasks get queued and their state will show as such in the UI.
+``pool_slots`` (see section below). Once capacity is reached, runnable tasks get queued and their state will show as such in the UI.
 As slots free up, queued tasks start running based on the :ref:`concepts:priority-weight` of the task and its
 descendants.
 
-Note that if tasks are not given a pool, they are assigned to a default pool ``default_pool``.  ``default_pool`` is
+Note that if tasks are not given a pool, they are assigned to a default pool ``default_pool``, which is
 initialized with 128 slots and can be modified through the UI or CLI (but cannot be removed).
 
+Using multiple pool slots
+-------------------------
+
+Airflow tasks will each occupy a single pool slot by default, but they can be configured to occupy more with the ``pool_slots`` argument if required.
+This is particularly useful when several tasks that belong to the same pool don't carry the same "computational weight".
+
+For instance, consider a pool with 2 slots, ``Pool(pool='maintenance', slots=2)``, and the following tasks:
+
+.. code-block:: python
+
+    BashOperator(
+        task_id="heavy_task",
+        bash_command="bash backup_data.sh",
+        pool_slots=2,
+        pool="maintenance",
+    )
+
+    BashOperator(
+        task_id="light_task1",
+        bash_command="bash check_files.sh",
+        pool_slots=1,
+        pool="maintenance",
+    )
+
+    BashOperator(
+        task_id="light_task2",
+        bash_command="bash remove_files.sh",
+        pool_slots=1,
+        pool="maintenance",
+    )
+
+Since the heavy task is configured to use 2 pool slots, it depletes the pool when running. Therefore, any of the light tasks must queue and wait
+for the heavy task to complete before they are executed. Here, in terms of resource usage, the heavy task is equivalent to two light tasks running concurrently.
+
+This implementation can prevent overwhelming system resources, which (in this example) could occur when a heavy and a light task are running concurrently.
+On the other hand, both light tasks can run concurrently since they only occupy one pool slot each, while the heavy task would have to wait for two pool
+slots to become available before getting executed.
+
 .. warning::
 
     Pools and SubDAGs do not interact as you might first expect. SubDAGs will *not* honor any pool you set on them at
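
A minimal sketch, not part of the commit above, of creating the ``maintenance``
pool used in the example; it assumes a configured Airflow 2.2 metadata database,
and the UI or the ``airflow pools`` CLI would achieve the same thing:

    # Create the 'maintenance' pool with 2 slots so that heavy_task
    # (pool_slots=2) occupies it exclusively while running.
    from airflow.models import Pool
    from airflow.utils.session import create_session

    with create_session() as session:
        if not session.query(Pool).filter(Pool.pool == "maintenance").first():
            session.add(Pool(pool="maintenance", slots=2))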

[airflow] 08/23: Doc: Fix incorrect filename references (#20277)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c7256dcac9af524d4f058e60cb2779db8a1ff5dd
Author: Roberto Li <14...@users.noreply.github.com>
AuthorDate: Thu Dec 16 01:11:02 2021 +0100

    Doc: Fix incorrect filename references (#20277)
    
    Minor typo corrections. I changed the filenames in the example folder structure instead of the later references to be consistent with the other examples in the documentation.
    
    (cherry picked from commit d11087c22ef509831379fa6730496f3a4d4c9eed)
---
 docs/apache-airflow/modules_management.rst | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/apache-airflow/modules_management.rst b/docs/apache-airflow/modules_management.rst
index 8488a58..4bc16f6 100644
--- a/docs/apache-airflow/modules_management.rst
+++ b/docs/apache-airflow/modules_management.rst
@@ -103,8 +103,8 @@ This is an example structure that you might have in your ``dags`` folder:
                  |
                  | my_custom_dags
                                  | __init__.py
-                                 | my_dag_1.py
-                                 | my_dag_2.py
+                                 | my_dag1.py
+                                 | my_dag2.py
                                  | base_dag.py
 
 In the case above, there are the ways you could import the python files:
@@ -123,7 +123,7 @@ shared code in the other folders, not the actual DAGs).
 
 In the example above the dags are only in ``my_custom_dags`` folder, the ``common_package`` should not be
 scanned by scheduler when searching for DAGS, so we should ignore ``common_package`` folder. You also
-want to ignore the ``base_dag`` if you keep a base DAG there that ``my_dag1.py`` and ``my_dag1.py`` derives
+want to ignore the ``base_dag.py`` if you keep a base DAG there that ``my_dag1.py`` and ``my_dag2.py`` derives
 from. Your ``.airflowignore`` should look then like this:
 
 .. code-block:: none
@@ -186,7 +186,7 @@ You should import such shared dag using full path (starting from the directory w
 
 The relative imports are counter-intuitive, and depending on how you start your python code, they can behave
 differently. In Airflow the same DAG file might be parsed in different contexts (by schedulers, by workers
-or during tests) and in those cases, relatives imports might behave differently. Always use full
+or during tests) and in those cases, relative imports might behave differently. Always use full
 python package paths when you import anything in Airflow DAGs, this will save you a lot of troubles.
 You can read more about relative import caveats in
 `this Stack Overflow thread <https://stackoverflow.com/q/16981921/516701>`_.

[airflow] 20/23: 20496 fix port standalone mode (#20505)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 36920077f790f4202238f1bf27608714f0794f25
Author: Kanthi <su...@gmail.com>
AuthorDate: Tue Dec 28 16:15:31 2021 -0500

    20496 fix port standalone mode (#20505)
    
    (cherry picked from commit f743e46c5a4fdd0b76fea2d07729b744644fc416)
---
 airflow/cli/commands/standalone_command.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py
index 37d38cb..82a082e 100644
--- a/airflow/cli/commands/standalone_command.py
+++ b/airflow/cli/commands/standalone_command.py
@@ -81,6 +81,8 @@ class StandaloneCommand:
             command=["triggerer"],
             env=env,
         )
+
+        self.web_server_port = conf.getint('webserver', 'WEB_SERVER_PORT', fallback=8080)
         # Run subcommand threads
         for command in self.subcommands.values():
             command.start()
@@ -206,7 +208,11 @@ class StandaloneCommand:
         Detects when all Airflow components are ready to serve.
         For now, it's simply time-based.
         """
-        return self.port_open(8080) and self.job_running(SchedulerJob) and self.job_running(TriggererJob)
+        return (
+            self.port_open(self.web_server_port)
+            and self.job_running(SchedulerJob)
+            and self.job_running(TriggererJob)
+        )
 
     def port_open(self, port):
         """

[airflow] 19/23: Avoid calling DAG.following_schedule() for TaskInstance.get_template_context() (#20486)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b43882ca1a8f26a582f13a5e4443e9d95c8842f5
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Dec 29 03:12:21 2021 +0800

    Avoid calling DAG.following_schedule() for TaskInstance.get_template_context() (#20486)
    
    This can use a more modern mechanism since get_template_context() has
    enough context (namely, the current data interval).
    
    (cherry picked from commit 9e315ff7caec7fd3d4c0dfe8b89ee2a1c7b5fe3a)
---
 airflow/models/taskinstance.py    | 25 +++++++++++++------------
 tests/conftest.py                 | 12 +++++++++++-
 tests/models/test_taskinstance.py | 25 +++++++++++++++++++++++++
 3 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 716167c..281d067 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1839,14 +1839,14 @@ class TaskInstance(Base, LoggingMixin):
 
         @cache
         def get_yesterday_ds() -> str:
-            return (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
+            return (logical_date - timedelta(1)).strftime('%Y-%m-%d')
 
         def get_yesterday_ds_nodash() -> str:
             return get_yesterday_ds().replace('-', '')
 
         @cache
         def get_tomorrow_ds() -> str:
-            return (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
+            return (logical_date + timedelta(1)).strftime('%Y-%m-%d')
 
         def get_tomorrow_ds_nodash() -> str:
             return get_tomorrow_ds().replace('-', '')
@@ -1854,18 +1854,15 @@ class TaskInstance(Base, LoggingMixin):
         @cache
         def get_next_execution_date() -> Optional[pendulum.DateTime]:
             # For manually triggered dagruns that aren't run on a schedule,
-            # next/previous execution dates don't make sense, and should be set
+            # the "next" execution date doesn't make sense, and should be set
             # to execution date for consistency with how execution_date is set
             # for manually triggered tasks, i.e. triggered_date == execution_date.
             if dag_run.external_trigger:
-                next_execution_date = dag_run.execution_date
-            else:
-                with warnings.catch_warnings():
-                    warnings.simplefilter("ignore", DeprecationWarning)
-                    next_execution_date = dag.following_schedule(self.execution_date)
-            if next_execution_date is None:
+                return logical_date
+            next_info = dag.next_dagrun_info(data_interval, restricted=False)
+            if next_info is None:
                 return None
-            return timezone.coerce_datetime(next_execution_date)
+            return timezone.coerce_datetime(next_info.logical_date)
 
         def get_next_ds() -> Optional[str]:
             execution_date = get_next_execution_date()
@@ -1881,11 +1878,15 @@ class TaskInstance(Base, LoggingMixin):
 
         @cache
         def get_prev_execution_date():
+            # For manually triggered dagruns that aren't run on a schedule,
+            # the "previous" execution date doesn't make sense, and should be set
+            # to execution date for consistency with how execution_date is set
+            # for manually triggered tasks, i.e. triggered_date == execution_date.
             if dag_run.external_trigger:
-                return timezone.coerce_datetime(self.execution_date)
+                return logical_date
             with warnings.catch_warnings():
                 warnings.simplefilter("ignore", DeprecationWarning)
-                return dag.previous_schedule(self.execution_date)
+                return dag.previous_schedule(logical_date)
 
         @cache
         def get_prev_ds() -> Optional[str]:
diff --git a/tests/conftest.py b/tests/conftest.py
index f7248d1..9e72d37 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -709,7 +709,15 @@ def create_task_instance(dag_maker, create_dummy_dag):
     Uses ``create_dummy_dag`` to create the dag structure.
     """
 
-    def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_type=None, **kwargs):
+    def maker(
+        execution_date=None,
+        dagrun_state=None,
+        state=None,
+        run_id=None,
+        run_type=None,
+        data_interval=None,
+        **kwargs,
+    ):
         if execution_date is None:
             from airflow.utils import timezone
 
@@ -721,6 +729,8 @@ def create_task_instance(dag_maker, create_dummy_dag):
             dagrun_kwargs["run_id"] = run_id
         if run_type is not None:
             dagrun_kwargs["run_type"] = run_type
+        if data_interval is not None:
+            dagrun_kwargs["data_interval"] = data_interval
         dagrun = dag_maker.create_dagrun(**dagrun_kwargs)
         (ti,) = dagrun.task_instances
         ti.state = state
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 8458ea9..d111371 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -30,6 +30,7 @@ import pytest
 from freezegun import freeze_time
 
 from airflow import models, settings
+from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
 from airflow.exceptions import (
     AirflowException,
     AirflowFailException,
@@ -1630,6 +1631,30 @@ class TestTaskInstance:
         with pytest.raises(KeyError):
             ti.task.render_template('{{ var.json.get("missing_variable") }}', context)
 
+    def test_tempalte_with_custom_timetable_deprecated_context(self, create_task_instance):
+        ti = create_task_instance(
+            start_date=DEFAULT_DATE,
+            timetable=AfterWorkdayTimetable(),
+            run_type=DagRunType.SCHEDULED,
+            execution_date=timezone.datetime(2021, 9, 6),
+            data_interval=(timezone.datetime(2021, 9, 6), timezone.datetime(2021, 9, 7)),
+        )
+        context = ti.get_template_context()
+        with pytest.deprecated_call():
+            assert context["execution_date"] == pendulum.DateTime(2021, 9, 6, tzinfo=timezone.TIMEZONE)
+        with pytest.deprecated_call():
+            assert context["next_ds"] == "2021-09-07"
+        with pytest.deprecated_call():
+            assert context["next_ds_nodash"] == "20210907"
+        with pytest.deprecated_call():
+            assert context["next_execution_date"] == pendulum.DateTime(2021, 9, 7, tzinfo=timezone.TIMEZONE)
+        with pytest.deprecated_call():
+            assert context["prev_ds"] is None, "Does not make sense for custom timetable"
+        with pytest.deprecated_call():
+            assert context["prev_ds_nodash"] is None, "Does not make sense for custom timetable"
+        with pytest.deprecated_call():
+            assert context["prev_execution_date"] is None, "Does not make sense for custom timetable"
+
     def test_execute_callback(self, create_task_instance):
         called = False
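
The commit above replaces ``DAG.following_schedule()`` with a lookup driven by the
run's data interval. A minimal standalone sketch of that mechanism, assuming a
``dag`` object and a data interval taken from an existing DagRun:

    # Ask the timetable for the run that follows the current data interval and
    # return its logical date; None means there is no further scheduled run.
    from airflow.utils import timezone

    def next_logical_date(dag, data_interval):
        next_info = dag.next_dagrun_info(data_interval, restricted=False)
        if next_info is None:
            return None
        return timezone.coerce_datetime(next_info.logical_date)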
 

[airflow] 17/23: Fix typo in docs (#20371)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 330c3657922cbabc8ca096fc0ad29f2f5bf42a99
Author: Daniel van der Ende <da...@gmail.com>
AuthorDate: Fri Dec 17 13:38:19 2021 +0100

    Fix typo in docs (#20371)
    
    Minor typo in the task decorator documentation
    
    (cherry picked from commit 7f6ab06d218973900ead79b74b7c9dca4734ee06)
---
 docs/apache-airflow/howto/create-custom-decorator.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/howto/create-custom-decorator.rst b/docs/apache-airflow/howto/create-custom-decorator.rst
index b6a9a7b..ebb6c15 100644
--- a/docs/apache-airflow/howto/create-custom-decorator.rst
+++ b/docs/apache-airflow/howto/create-custom-decorator.rst
@@ -53,7 +53,7 @@ tasks. The steps to create and register ``@task.foo`` are:
 
 3. Register your new decorator in get_provider_info of your provider
 
-    Finally, add a key-value ``task-decortor`` to the dict returned from the provider entrypoint. This should be
+    Finally, add a key-value ``task-decorators`` to the dict returned from the provider entrypoint. This should be
     a list with each item containing ``name`` and ``class-name`` keys. When Airflow starts, the
     ``ProviderManager`` class will automatically import this value and ``task.foo`` will work as a new decorator!
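
A hedged sketch of the provider entrypoint described in the hunk above; the package
and class names below are placeholders, and only the ``task-decorators`` structure
(a list of ``name``/``class-name`` items) comes from the documentation:

    # Hypothetical get_provider_info() returning the ``task-decorators`` key;
    # "airflow_provider_foo" and "foo_task" are made-up names.
    def get_provider_info():
        return {
            "package-name": "airflow-provider-foo",
            "name": "Foo",
            "task-decorators": [
                {
                    "name": "foo",
                    "class-name": "airflow_provider_foo.decorators.foo_task",
                },
            ],
        }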
 

[airflow] 18/23: fix(standalone): Remove hardcoded Webserver port (#20429)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b19dfdbc946254cd570b14e032ae9b63042d9339
Author: Jonas Strassel <in...@jonas-strassel.de>
AuthorDate: Tue Dec 21 16:17:00 2021 +0100

    fix(standalone): Remove hardcoded Webserver port (#20429)
    
    Port 8080 is the default port for the webserver (https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html?highlight=webserver#webserver). By setting it here again explicitly, we prevent users from overriding it with AIRFLOW__WEBSERVER__WEB_SERVER_PORT. Removing it is arguably not a breaking change, since the port will still default to 8080.
    
    (cherry picked from commit 9d36b1fdac16d8db8907d4b792fdbe13a6e80f7e)
---
 airflow/cli/commands/standalone_command.py | 2 +-
 docs/apache-airflow/start/local.rst        | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py
index 41c1684..37d38cb 100644
--- a/airflow/cli/commands/standalone_command.py
+++ b/airflow/cli/commands/standalone_command.py
@@ -72,7 +72,7 @@ class StandaloneCommand:
         self.subcommands["webserver"] = SubCommand(
             self,
             name="webserver",
-            command=["webserver", "--port", "8080"],
+            command=["webserver"],
             env=env,
         )
         self.subcommands["triggerer"] = SubCommand(
diff --git a/docs/apache-airflow/start/local.rst b/docs/apache-airflow/start/local.rst
index b80a933..6441306 100644
--- a/docs/apache-airflow/start/local.rst
+++ b/docs/apache-airflow/start/local.rst
@@ -62,6 +62,7 @@ constraint files to enable reproducible installation, so using ``pip`` and const
 
 Upon running these commands, Airflow will create the ``$AIRFLOW_HOME`` folder
 and create the "airflow.cfg" file with defaults that will get you going fast.
+You can override defaults using environment variables, see :doc:`/configurations-ref`.
 You can inspect the file either in ``$AIRFLOW_HOME/airflow.cfg``, or through the UI in
 the ``Admin->Configuration`` menu. The PID file for the webserver will be stored
 in ``$AIRFLOW_HOME/airflow-webserver.pid`` or in ``/run/airflow/webserver.pid``
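
The line added above points readers to the configuration reference for
environment-variable overrides. A minimal sketch of the ``AIRFLOW__<SECTION>__<KEY>``
convention mentioned in the related commit message, assuming a standard install:

    # Environment variables take precedence over airflow.cfg, so this changes
    # the webserver port without editing the config file.
    import os

    os.environ["AIRFLOW__WEBSERVER__WEB_SERVER_PORT"] = "8081"

    from airflow.configuration import conf

    assert conf.getint("webserver", "WEB_SERVER_PORT") == 8081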

[airflow] 21/23: Doc: Update Supported column for 1.10.x series (#20592)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cb6891b9cf929633e805b458043e836015b22b33
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu Dec 30 17:16:03 2021 +0000

    Doc: Update Supported column for 1.10.x series (#20592)
    
    1.10.x is EOL
    
    (cherry picked from commit dcd4c492f0b869b2e7dd80756da5695036d70758)
---
 docs/apache-airflow/installation/supported-versions.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/installation/supported-versions.rst b/docs/apache-airflow/installation/supported-versions.rst
index 2e1ee97..c4a2a26 100644
--- a/docs/apache-airflow/installation/supported-versions.rst
+++ b/docs/apache-airflow/installation/supported-versions.rst
@@ -26,9 +26,9 @@ Apache Airflow version life cycle:
 +---------+-----------------+---------------+-----------------+----------------+
 | Version | State           | First Release | Limited Support | EOL/Terminated |
 +---------+-----------------+---------------+-----------------+----------------+
-| 2       | Supported       | Dec 17, 2020  | Dec 2021        | TBD            |
+| 2       | Supported       | Dec 17, 2020  | TBD             | TBD            |
 +---------+-----------------+---------------+-----------------+----------------+
-| 1.10    | Limited Support | Aug 27, 2018  | Dec 17, 2020    | June 2021      |
+| 1.10    | EOL             | Aug 27, 2018  | Dec 17, 2020    | June 2021      |
 +---------+-----------------+---------------+-----------------+----------------+
 | 1.9     | EOL             | Jan 03, 2018  | Aug 27, 2018    | Aug 2018       |
 +---------+-----------------+---------------+-----------------+----------------+

[airflow] 06/23: Adds retry on taskinstance retrieval lock (#20030)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 94865f9c6b780ab80bc78f6287752c426e769c60
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Dec 7 16:05:47 2021 +0100

    Adds retry on taskinstance retrieval lock (#20030)
    
    Fixes: #19832
    
    Co-authored-by: Jaroslaw Potiuk <ja...@Jaroslaws-MacBook-Pro.local>
    (cherry picked from commit 78c815e22b67e442982b53f41d7d899723d5de9f)
---
 airflow/models/taskinstance.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6e9862e..f37cada 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -91,6 +91,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
 from airflow.utils.operator_helpers import context_to_airflow_vars
 from airflow.utils.platform import getuser
+from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
 from airflow.utils.state import DagRunState, State
@@ -723,7 +724,9 @@ class TaskInstance(Base, LoggingMixin):
         )
 
         if lock_for_update:
-            ti: Optional[TaskInstance] = qry.with_for_update().first()
+            for attempt in run_with_db_retries(logger=self.log):
+                with attempt:
+                    ti: Optional[TaskInstance] = qry.with_for_update().first()
         else:
             ti = qry.first()
         if ti:
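
The hunk above wraps the row-locking query in Airflow's DB retry helper. A minimal
sketch of the pattern, assuming ``qry`` is a SQLAlchemy query like the one built in
the surrounding method:

    # Each loop iteration retries the ``with attempt:`` body (via tenacity)
    # when a transient database error is raised by the locking query.
    import logging

    from airflow.utils.retries import run_with_db_retries

    log = logging.getLogger(__name__)

    def first_row_locked(qry):
        for attempt in run_with_db_retries(logger=log):
            with attempt:
                return qry.with_for_update().first()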

[airflow] 11/23: Fix typo (#20314)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 614cd3c7eed52711001a16b7afba02678c081635
Author: Tanguy <35...@users.noreply.github.com>
AuthorDate: Wed Dec 15 13:15:24 2021 +0100

    Fix typo (#20314)
    
    "build" should be "built".
    
    (cherry picked from commit de36616e1e3b578d9a5b6727daf7a32fe15c4c32)
---
 docs/apache-airflow/installation/index.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/installation/index.rst b/docs/apache-airflow/installation/index.rst
index e2f23ad..dfb18f5 100644
--- a/docs/apache-airflow/installation/index.rst
+++ b/docs/apache-airflow/installation/index.rst
@@ -155,7 +155,7 @@ This installation method is useful when you are familiar with Container/Docker s
 running Airflow components in isolation from other software running on the same physical or virtual machines with easy
 maintenance of dependencies.
 
-The images are build by Apache Airflow release managers and they use officially released packages from PyPI
+The images are built by Apache Airflow release managers and they use officially released packages from PyPI
 and official constraint files- same that are used for installing Airflow from PyPI.
 
 **Intended users**

[airflow] 13/23: Un-ignore DeprecationWarning (#20322)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a25d7cef7f10be25b2446abe641c0b5822e9d9dc
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Dec 21 18:00:46 2021 +0800

    Un-ignore DeprecationWarning (#20322)
    
    (cherry picked from commit 9876e19273cd56dc53d3a4e287db43acbfa65c4b)
---
 airflow/models/taskinstance.py                     | 41 +++++------
 airflow/operators/datetime.py                      |  2 +-
 airflow/operators/python.py                        | 26 ++++---
 airflow/operators/weekday.py                       |  2 +-
 airflow/providers/http/operators/http.py           | 10 +--
 airflow/providers/http/sensors/http.py             |  7 +-
 airflow/sensors/external_task.py                   | 24 +++----
 airflow/sensors/weekday.py                         |  2 +-
 airflow/utils/context.py                           | 33 +++++++++
 airflow/utils/context.pyi                          |  6 +-
 airflow/utils/helpers.py                           |  2 +-
 .../log/task_handler_with_custom_formatter.py      |  4 +-
 airflow/utils/operator_helpers.py                  | 84 +++++++++++++++++-----
 scripts/ci/kubernetes/ci_run_kubernetes_tests.sh   |  7 +-
 scripts/in_container/entrypoint_ci.sh              |  2 -
 tests/cli/commands/test_task_command.py            |  2 +
 tests/core/test_core.py                            | 21 +++---
 tests/operators/test_email.py                      |  2 +-
 tests/operators/test_python.py                     |  9 ++-
 tests/operators/test_trigger_dagrun.py             |  2 +-
 tests/providers/http/sensors/test_http.py          |  4 +-
 tests/sensors/test_external_task_sensor.py         |  8 +--
 tests/utils/test_log_handlers.py                   |  6 +-
 23 files changed, 195 insertions(+), 111 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index f37cada..716167c 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -86,7 +86,7 @@ from airflow.typing_compat import Literal
 from airflow.utils import timezone
 from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor
 from airflow.utils.email import send_email
-from airflow.utils.helpers import is_container
+from airflow.utils.helpers import is_container, render_template_to_string
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
 from airflow.utils.operator_helpers import context_to_airflow_vars
@@ -2016,7 +2016,7 @@ class TaskInstance(Base, LoggingMixin):
         sanitized_pod = ApiClient().sanitize_for_serialization(pod)
         return sanitized_pod
 
-    def get_email_subject_content(self, exception):
+    def get_email_subject_content(self, exception: BaseException) -> Tuple[str, str, str]:
         """Get the email subject content for exceptions."""
         # For a ti from DB (without ti.task), return the default value
         # Reuse it for smart sensor to send default email alert
@@ -2043,18 +2043,18 @@ class TaskInstance(Base, LoggingMixin):
             'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
         )
 
+        # This function is called after changing the state from State.RUNNING,
+        # so we need to subtract 1 from self.try_number here.
+        current_try_number = self.try_number - 1
+        additional_context = {
+            "exception": exception,
+            "exception_html": exception_html,
+            "try_number": current_try_number,
+            "max_tries": self.max_tries,
+        }
+
         if use_default:
-            jinja_context = {'ti': self}
-            # This function is called after changing the state
-            # from State.RUNNING so need to subtract 1 from self.try_number.
-            jinja_context.update(
-                dict(
-                    exception=exception,
-                    exception_html=exception_html,
-                    try_number=self.try_number - 1,
-                    max_tries=self.max_tries,
-                )
-            )
+            jinja_context = {"ti": self, **additional_context}
             jinja_env = jinja2.Environment(
                 loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True
             )
@@ -2064,24 +2064,15 @@ class TaskInstance(Base, LoggingMixin):
 
         else:
             jinja_context = self.get_template_context()
-
-            jinja_context.update(
-                dict(
-                    exception=exception,
-                    exception_html=exception_html,
-                    try_number=self.try_number - 1,
-                    max_tries=self.max_tries,
-                )
-            )
-
+            jinja_context.update(additional_context)
             jinja_env = self.task.get_template_env()
 
-            def render(key, content):
+            def render(key: str, content: str) -> str:
                 if conf.has_option('email', key):
                     path = conf.get('email', key)
                     with open(path) as f:
                         content = f.read()
-                return jinja_env.from_string(content).render(**jinja_context)
+                return render_template_to_string(jinja_env.from_string(content), jinja_context)
 
             subject = render('subject_template', default_subject)
             html_content = render('html_content_template', default_html_content)
diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py
index 6b1acf7..15d4300 100644
--- a/airflow/operators/datetime.py
+++ b/airflow/operators/datetime.py
@@ -72,7 +72,7 @@ class BranchDateTimeOperator(BaseBranchOperator):
 
     def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
         if self.use_task_execution_date is True:
-            now = timezone.make_naive(context["execution_date"], self.dag.timezone)
+            now = timezone.make_naive(context["logical_date"], self.dag.timezone)
         else:
             now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
 
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 5b552b8..8e51536 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -24,7 +24,7 @@ import types
 import warnings
 from tempfile import TemporaryDirectory
 from textwrap import dedent
-from typing import Callable, Dict, Iterable, List, Optional, Union
+from typing import Any, Callable, Collection, Dict, Iterable, List, Mapping, Optional, Union
 
 import dill
 
@@ -33,7 +33,7 @@ from airflow.models import BaseOperator
 from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskinstance import _CURRENT_CONTEXT
 from airflow.utils.context import Context
-from airflow.utils.operator_helpers import determine_kwargs
+from airflow.utils.operator_helpers import KeywordParameters
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
 
@@ -142,8 +142,8 @@ class PythonOperator(BaseOperator):
         self,
         *,
         python_callable: Callable,
-        op_args: Optional[List] = None,
-        op_kwargs: Optional[Dict] = None,
+        op_args: Optional[Collection[Any]] = None,
+        op_kwargs: Optional[Mapping[str, Any]] = None,
         templates_dict: Optional[Dict] = None,
         templates_exts: Optional[List[str]] = None,
         **kwargs,
@@ -159,7 +159,7 @@ class PythonOperator(BaseOperator):
         if not callable(python_callable):
             raise AirflowException('`python_callable` param must be callable')
         self.python_callable = python_callable
-        self.op_args = op_args or []
+        self.op_args = op_args or ()
         self.op_kwargs = op_kwargs or {}
         self.templates_dict = templates_dict
         if templates_exts:
@@ -169,12 +169,15 @@ class PythonOperator(BaseOperator):
         context.update(self.op_kwargs)
         context['templates_dict'] = self.templates_dict
 
-        self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context)
+        self.op_kwargs = self.determine_kwargs(context)
 
         return_value = self.execute_callable()
         self.log.info("Done. Returned value was: %s", return_value)
         return return_value
 
+    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
+        return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking()
+
     def execute_callable(self):
         """
         Calls the python callable with the given arguments.
@@ -241,11 +244,11 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
 
         self.log.info('Skipping downstream tasks...')
 
-        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
+        downstream_tasks = context["task"].get_flat_relatives(upstream=False)
         self.log.debug("Downstream task_ids %s", downstream_tasks)
 
         if downstream_tasks:
-            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
+            self.skip(context["dag_run"], context["logical_date"], downstream_tasks)
 
         self.log.info("Done.")
 
@@ -345,8 +348,8 @@ class PythonVirtualenvOperator(PythonOperator):
         python_version: Optional[Union[str, int, float]] = None,
         use_dill: bool = False,
         system_site_packages: bool = True,
-        op_args: Optional[List] = None,
-        op_kwargs: Optional[Dict] = None,
+        op_args: Optional[Collection[Any]] = None,
+        op_kwargs: Optional[Mapping[str, Any]] = None,
         string_args: Optional[Iterable[str]] = None,
         templates_dict: Optional[Dict] = None,
         templates_exts: Optional[List[str]] = None,
@@ -392,6 +395,9 @@ class PythonVirtualenvOperator(PythonOperator):
         serializable_context = context.copy_only(serializable_keys)
         return super().execute(context=serializable_context)
 
+    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
+        return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing()
+
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py
index e1167a5..2e4e656 100644
--- a/airflow/operators/weekday.py
+++ b/airflow/operators/weekday.py
@@ -67,7 +67,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
 
     def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
         if self.use_task_execution_day:
-            now = context["execution_date"]
+            now = context["logical_date"]
         else:
             now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
 
diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py
index b629518..d36ceb2 100644
--- a/airflow/providers/http/operators/http.py
+++ b/airflow/providers/http/operators/http.py
@@ -104,7 +104,7 @@ class SimpleHttpOperator(BaseOperator):
             raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
 
     def execute(self, context: Dict[str, Any]) -> Any:
-        from airflow.utils.operator_helpers import make_kwargs_callable
+        from airflow.utils.operator_helpers import determine_kwargs
 
         http = HttpHook(self.method, http_conn_id=self.http_conn_id, auth_type=self.auth_type)
 
@@ -114,10 +114,10 @@ class SimpleHttpOperator(BaseOperator):
         if self.log_response:
             self.log.info(response.text)
         if self.response_check:
-            kwargs_callable = make_kwargs_callable(self.response_check)
-            if not kwargs_callable(response, **context):
+            kwargs = determine_kwargs(self.response_check, [response], context)
+            if not self.response_check(response, **kwargs):
                 raise AirflowException("Response check returned False.")
         if self.response_filter:
-            kwargs_callable = make_kwargs_callable(self.response_filter)
-            return kwargs_callable(response, **context)
+            kwargs = determine_kwargs(self.response_filter, [response], context)
+            return self.response_filter(response, **kwargs)
         return response.text
diff --git a/airflow/providers/http/sensors/http.py b/airflow/providers/http/sensors/http.py
index 6ef55ea..e052c01 100644
--- a/airflow/providers/http/sensors/http.py
+++ b/airflow/providers/http/sensors/http.py
@@ -96,7 +96,7 @@ class HttpSensor(BaseSensorOperator):
         self.hook = HttpHook(method=method, http_conn_id=http_conn_id)
 
     def poke(self, context: Dict[Any, Any]) -> bool:
-        from airflow.utils.operator_helpers import make_kwargs_callable
+        from airflow.utils.operator_helpers import determine_kwargs
 
         self.log.info('Poking: %s', self.endpoint)
         try:
@@ -107,9 +107,8 @@ class HttpSensor(BaseSensorOperator):
                 extra_options=self.extra_options,
             )
             if self.response_check:
-                kwargs_callable = make_kwargs_callable(self.response_check)
-                return kwargs_callable(response, **context)
-
+                kwargs = determine_kwargs(self.response_check, [response], context)
+                return self.response_check(response, **kwargs)
         except AirflowException as exc:
             if str(exc).startswith("404"):
                 return False
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index c451001..32336d3 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -47,7 +47,7 @@ class ExternalTaskSensorLink(BaseOperatorLink):
 class ExternalTaskSensor(BaseSensorOperator):
     """
     Waits for a different DAG or a task in a different DAG to complete for a
-    specific execution_date
+    specific logical date.
 
     :param external_dag_id: The dag_id that contains the task you want to
         wait for
@@ -65,14 +65,14 @@ class ExternalTaskSensor(BaseSensorOperator):
     :param failed_states: Iterable of failed or dis-allowed states, default is ``None``
     :type failed_states: Iterable
     :param execution_delta: time difference with the previous execution to
-        look at, the default is the same execution_date as the current task or DAG.
+        look at, the default is the same logical date as the current task or DAG.
         For yesterday, use [positive!] datetime.timedelta(days=1). Either
         execution_delta or execution_date_fn can be passed to
         ExternalTaskSensor, but not both.
     :type execution_delta: Optional[datetime.timedelta]
-    :param execution_date_fn: function that receives the current execution date as the first
+    :param execution_date_fn: function that receives the current execution's logical date as the first
         positional argument and optionally any number of keyword arguments available in the
-        context dictionary, and returns the desired execution dates to query.
+        context dictionary, and returns the desired logical dates to query.
         Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor,
         but not both.
     :type execution_date_fn: Optional[Callable]
@@ -157,11 +157,11 @@ class ExternalTaskSensor(BaseSensorOperator):
     @provide_session
     def poke(self, context, session=None):
         if self.execution_delta:
-            dttm = context['execution_date'] - self.execution_delta
+            dttm = context['logical_date'] - self.execution_delta
         elif self.execution_date_fn:
             dttm = self._handle_execution_date_fn(context=context)
         else:
-            dttm = context['execution_date']
+            dttm = context['logical_date']
 
         dttm_filter = dttm if isinstance(dttm, list) else [dttm]
         serialized_dttm_filter = ','.join(dt.isoformat() for dt in dttm_filter)
@@ -260,14 +260,14 @@ class ExternalTaskSensor(BaseSensorOperator):
         """
         from airflow.utils.operator_helpers import make_kwargs_callable
 
-        # Remove "execution_date" because it is already a mandatory positional argument
-        execution_date = context["execution_date"]
-        kwargs = {k: v for k, v in context.items() if k != "execution_date"}
+        # Remove "logical_date" because it is already a mandatory positional argument
+        logical_date = context["logical_date"]
+        kwargs = {k: v for k, v in context.items() if k not in {"execution_date", "logical_date"}}
         # Add "context" in the kwargs for backward compatibility (because context used to be
         # an acceptable argument of execution_date_fn)
         kwargs["context"] = context
         kwargs_callable = make_kwargs_callable(self.execution_date_fn)
-        return kwargs_callable(execution_date, **kwargs)
+        return kwargs_callable(logical_date, **kwargs)
 
 
 class ExternalTaskMarker(DummyOperator):
@@ -281,7 +281,7 @@ class ExternalTaskMarker(DummyOperator):
     :type external_dag_id: str
     :param external_task_id: The task_id of the dependent task that needs to be cleared.
     :type external_task_id: str
-    :param execution_date: The execution_date of the dependent task that needs to be cleared.
+    :param execution_date: The logical date of the dependent task execution that needs to be cleared.
     :type execution_date: str or datetime.datetime
     :param recursion_depth: The maximum level of transitive dependencies allowed. Default is 10.
         This is mostly used for preventing cyclic dependencies. It is fine to increase
@@ -300,7 +300,7 @@ class ExternalTaskMarker(DummyOperator):
         *,
         external_dag_id: str,
         external_task_id: str,
-        execution_date: Optional[Union[str, datetime.datetime]] = "{{ execution_date.isoformat() }}",
+        execution_date: Optional[Union[str, datetime.datetime]] = "{{ logical_date.isoformat() }}",
         recursion_depth: int = 10,
         **kwargs,
     ):
diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py
index 03e3221..741e166 100644
--- a/airflow/sensors/weekday.py
+++ b/airflow/sensors/weekday.py
@@ -84,6 +84,6 @@ class DayOfWeekSensor(BaseSensorOperator):
             WeekDay(timezone.utcnow().isoweekday()).name,
         )
         if self.use_task_execution_day:
-            return context['execution_date'].isoweekday() in self._week_day_num
+            return context['logical_date'].isoweekday() in self._week_day_num
         else:
             return timezone.utcnow().isoweekday() in self._week_day_num
diff --git a/airflow/utils/context.py b/airflow/utils/context.py
index 61f9319..d8eee04 100644
--- a/airflow/utils/context.py
+++ b/airflow/utils/context.py
@@ -20,6 +20,7 @@
 
 import contextlib
 import copy
+import functools
 import warnings
 from typing import (
     AbstractSet,
@@ -28,12 +29,15 @@ from typing import (
     Dict,
     Iterator,
     List,
+    Mapping,
     MutableMapping,
     Optional,
     Tuple,
     ValuesView,
 )
 
+import lazy_object_proxy
+
 _NOT_SET: Any = object()
 
 
@@ -194,3 +198,32 @@ class Context(MutableMapping[str, Any]):
         new = type(self)({k: v for k, v in self._context.items() if k in keys})
         new._deprecation_replacements = self._deprecation_replacements.copy()
         return new
+
+
+def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]:
+    """Create a mapping that wraps deprecated entries in a lazy object proxy.
+
+    This further delays deprecation warning to until when the entry is actually
+    used, instead of when it's accessed in the context. The result is useful for
+    passing into a callable with ``**kwargs``, which would unpack the mapping
+    too eagerly otherwise.
+
+    This is implemented as a free function because the ``Context`` type is
+    "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom
+    functions.
+
+    :meta private:
+    """
+
+    def _deprecated_proxy_factory(k: str, v: Any) -> Any:
+        replacements = source._deprecation_replacements[k]
+        warnings.warn(_create_deprecation_warning(k, replacements))
+        return v
+
+    def _create_value(k: str, v: Any) -> Any:
+        if k not in source._deprecation_replacements:
+            return v
+        factory = functools.partial(_deprecated_proxy_factory, k, v)
+        return lazy_object_proxy.Proxy(factory)
+
+    return {k: _create_value(k, v) for k, v in source._context.items()}
diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi
index 0921d79..44b152c 100644
--- a/airflow/utils/context.pyi
+++ b/airflow/utils/context.pyi
@@ -25,7 +25,7 @@
 # undefined attribute errors from Mypy. Hopefully there will be a mechanism to
 # declare "these are defined, but don't error if others are accessed" someday.
 
-from typing import Any, Optional
+from typing import Any, Mapping, Optional
 
 from pendulum import DateTime
 
@@ -80,3 +80,7 @@ class Context(TypedDict, total=False):
     var: _VariableAccessors
     yesterday_ds: str
     yesterday_ds_nodash: str
+
+class AirflowContextDeprecationWarning(DeprecationWarning): ...
+
+def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: ...
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index c5f9f27..2215c4c 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -167,7 +167,7 @@ def render_log_filename(ti: "TaskInstance", try_number, filename_template) -> st
     if filename_jinja_template:
         jinja_context = ti.get_template_context()
         jinja_context['try_number'] = try_number
-        return filename_jinja_template.render(**jinja_context)
+        return render_template_to_string(filename_jinja_template, jinja_context)
 
     return filename_template.format(
         dag_id=ti.dag_id,
diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py b/airflow/utils/log/task_handler_with_custom_formatter.py
index 5034d00..b7b431b 100644
--- a/airflow/utils/log/task_handler_with_custom_formatter.py
+++ b/airflow/utils/log/task_handler_with_custom_formatter.py
@@ -20,7 +20,7 @@ import logging
 from logging import StreamHandler
 
 from airflow.configuration import conf
-from airflow.utils.helpers import parse_template_string
+from airflow.utils.helpers import parse_template_string, render_template_to_string
 
 
 class TaskHandlerWithCustomFormatter(StreamHandler):
@@ -52,6 +52,6 @@ class TaskHandlerWithCustomFormatter(StreamHandler):
     def _render_prefix(self, ti):
         if self.prefix_jinja_template:
             jinja_context = ti.get_template_context()
-            return self.prefix_jinja_template.render(**jinja_context)
+            return render_template_to_string(self.prefix_jinja_template, jinja_context)
         logging.warning("'task_log_prefix_template' is in invalid format, ignoring the variable value")
         return ""
diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py
index 8c5125b..05c050c 100644
--- a/airflow/utils/operator_helpers.py
+++ b/airflow/utils/operator_helpers.py
@@ -17,7 +17,9 @@
 # under the License.
 #
 from datetime import datetime
-from typing import Callable, Dict, List, Mapping, Tuple, Union
+from typing import Any, Callable, Collection, Mapping
+
+from airflow.utils.context import Context, lazy_mapping_from_context
 
 AIRFLOW_VAR_NAME_FORMAT_MAPPING = {
     'AIRFLOW_CONTEXT_DAG_ID': {'default': 'airflow.ctx.dag_id', 'env_var_format': 'AIRFLOW_CTX_DAG_ID'},
@@ -88,7 +90,67 @@ def context_to_airflow_vars(context, in_env_var_format=False):
     return params
 
 
-def determine_kwargs(func: Callable, args: Union[Tuple, List], kwargs: Mapping) -> Dict:
+class KeywordParameters:
+    """Wrapper representing ``**kwargs`` to a callable.
+
+    The actual ``kwargs`` can be obtained by calling either ``unpacking()`` or
+    ``serializing()``. They behave almost the same and are only different if
+    the containing ``kwargs`` is an Airflow Context object, and the calling
+    function uses ``**kwargs`` in the argument list.
+
+    In this particular case, ``unpacking()`` uses ``lazy-object-proxy`` to
+    prevent the Context from emitting deprecation warnings too eagerly when it's
+    unpacked by ``**``. ``serializing()`` does not do this, and will allow the
+    warnings to be emitted eagerly, which is useful when you want to dump the
+    content and use it somewhere else without needing ``lazy-object-proxy``.
+    """
+
+    def __init__(self, kwargs: Mapping[str, Any], *, wildcard: bool) -> None:
+        self._kwargs = kwargs
+        self._wildcard = wildcard
+
+    @classmethod
+    def determine(
+        cls,
+        func: Callable[..., Any],
+        args: Collection[Any],
+        kwargs: Mapping[str, Any],
+    ) -> "KeywordParameters":
+        import inspect
+        import itertools
+
+        signature = inspect.signature(func)
+        has_wildcard_kwargs = any(p.kind == p.VAR_KEYWORD for p in signature.parameters.values())
+
+        for name in itertools.islice(signature.parameters.keys(), len(args)):
+            # Check if args conflict with names in kwargs.
+            if name in kwargs:
+                raise ValueError(f"The key {name!r} in args is a part of kwargs and therefore reserved.")
+
+        if has_wildcard_kwargs:
+            # If the callable has a **kwargs argument, it's ready to accept all the kwargs.
+            return cls(kwargs, wildcard=True)
+
+        # If the callable has no **kwargs argument, it only wants the arguments it requested.
+        kwargs = {key: kwargs[key] for key in signature.parameters if key in kwargs}
+        return cls(kwargs, wildcard=False)
+
+    def unpacking(self) -> Mapping[str, Any]:
+        """Dump the kwargs mapping to unpack with ``**`` in a function call."""
+        if self._wildcard and isinstance(self._kwargs, Context):
+            return lazy_mapping_from_context(self._kwargs)
+        return self._kwargs
+
+    def serializing(self) -> Mapping[str, Any]:
+        """Dump the kwargs mapping for serialization purposes."""
+        return self._kwargs
+
+
+def determine_kwargs(
+    func: Callable[..., Any],
+    args: Collection[Any],
+    kwargs: Mapping[str, Any],
+) -> Mapping[str, Any]:
     """
     Inspect the signature of a given callable to determine which arguments in kwargs need
     to be passed to the callable.
@@ -99,23 +161,7 @@ def determine_kwargs(func: Callable, args: Union[Tuple, List], kwargs: Mapping)
     :param kwargs: The keyword arguments that need to be filtered before passing to the callable.
     :return: A dictionary which contains the keyword arguments that are compatible with the callable.
     """
-    import inspect
-    import itertools
-
-    signature = inspect.signature(func)
-    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in signature.parameters.values())
-
-    for name in itertools.islice(signature.parameters.keys(), len(args)):
-        # Check if args conflict with names in kwargs
-        if name in kwargs:
-            raise ValueError(f"The key {name} in args is part of kwargs and therefore reserved.")
-
-    if has_kwargs:
-        # If the callable has a **kwargs argument, it's ready to accept all the kwargs.
-        return kwargs
-
-    # If the callable has no **kwargs argument, it only wants the arguments it requested.
-    return {key: kwargs[key] for key in signature.parameters if key in kwargs}
+    return KeywordParameters.determine(func, args, kwargs).unpacking()
 
 
 def make_kwargs_callable(func: Callable) -> Callable:
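
As background for the refactor above, the filtering that ``determine_kwargs`` performs (now delegated to ``KeywordParameters``) can be sketched in a few lines of plain Python; the callable and values below are illustrative only, and the sketch deliberately leaves out the Context/lazy-proxy handling:

    import inspect

    def filter_kwargs(func, args, kwargs):
        # Simplified sketch of the logic above: keep only the keyword arguments
        # the callable can actually accept, unless it declares **kwargs.
        sig = inspect.signature(func)
        for name in list(sig.parameters)[: len(args)]:
            if name in kwargs:
                raise ValueError(f"The key {name!r} in args is a part of kwargs and therefore reserved.")
        if any(p.kind == p.VAR_KEYWORD for p in sig.parameters.values()):
            return dict(kwargs)
        return {key: kwargs[key] for key in sig.parameters if key in kwargs}

    def my_callable(x, ds):  # hypothetical task callable
        return f"{x} on {ds}"

    print(filter_kwargs(my_callable, (1,), {"ds": "2022-01-01", "ti": "dropped"}))
    # {'ds': '2022-01-01'} -- 'ti' is filtered out because my_callable does not accept it
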
diff --git a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh
index a97f692..e586c30 100755
--- a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh
+++ b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh
@@ -52,10 +52,7 @@ function parse_tests_to_run() {
         else
             tests_to_run=("${@}")
         fi
-        pytest_args=(
-            "--pythonwarnings=ignore::DeprecationWarning"
-            "--pythonwarnings=ignore::PendingDeprecationWarning"
-        )
+        pytest_args=()
     else
         tests_to_run=("kubernetes_tests")
         pytest_args=(
@@ -64,8 +61,6 @@ function parse_tests_to_run() {
             "--durations=100"
             "--color=yes"
             "--maxfail=50"
-            "--pythonwarnings=ignore::DeprecationWarning"
-            "--pythonwarnings=ignore::PendingDeprecationWarning"
             )
 
     fi
diff --git a/scripts/in_container/entrypoint_ci.sh b/scripts/in_container/entrypoint_ci.sh
index 29f5210..5d7aca0 100755
--- a/scripts/in_container/entrypoint_ci.sh
+++ b/scripts/in_container/entrypoint_ci.sh
@@ -209,8 +209,6 @@ EXTRA_PYTEST_ARGS=(
     "--cov-report=xml:/files/coverage-${TEST_TYPE}-${BACKEND}.xml"
     "--color=yes"
     "--maxfail=50"
-    "--pythonwarnings=ignore::DeprecationWarning"
-    "--pythonwarnings=ignore::PendingDeprecationWarning"
     "--junitxml=${RESULT_LOG_FILE}"
     # timeouts in seconds for individual tests
     "--timeouts-order"
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 7d246c7..201af16 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -84,6 +84,7 @@ class TestCliTasks(unittest.TestCase):
         args = self.parser.parse_args(['tasks', 'list', 'example_bash_operator', '--tree'])
         task_command.task_list(args)
 
+    @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
     def test_test(self):
         """Test the `airflow test` command"""
         args = self.parser.parse_args(
@@ -96,6 +97,7 @@ class TestCliTasks(unittest.TestCase):
         # Check that prints, and log messages, are shown
         assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue()
 
+    @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
     def test_test_with_existing_dag_run(self):
         """Test the `airflow test` command"""
         task_id = 'print_the_context'
diff --git a/tests/core/test_core.py b/tests/core/test_core.py
index cae311d..02162e9 100644
--- a/tests/core/test_core.py
+++ b/tests/core/test_core.py
@@ -218,7 +218,7 @@ class TestCore:
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_python_op(self, dag_maker):
-        def test_py_op(templates_dict, ds, **kwargs):
+        def test_py_op(templates_dict, ds):
             if not templates_dict['ds'] == ds:
                 raise Exception("failure")
 
@@ -246,10 +246,6 @@ class TestCore:
         assert context['ds'] == '2015-01-01'
         assert context['ds_nodash'] == '20150101'
 
-        # next_ds is 2015-01-02 as the dag schedule is daily.
-        assert context['next_ds'] == '2015-01-02'
-        assert context['next_ds_nodash'] == '20150102'
-
         assert context['ts'] == '2015-01-01T00:00:00+00:00'
         assert context['ts_nodash'] == '20150101T000000'
         assert context['ts_nodash_with_tz'] == '20150101T000000+0000'
@@ -259,6 +255,8 @@ class TestCore:
 
         # Test deprecated fields.
         expected_deprecated_fields = [
+            ("next_ds", "2015-01-02"),
+            ("next_ds_nodash", "20150102"),
             ("prev_ds", "2014-12-31"),
             ("prev_ds_nodash", "20141231"),
             ("yesterday_ds", "2014-12-31"),
@@ -267,14 +265,17 @@ class TestCore:
             ("tomorrow_ds_nodash", "20150102"),
         ]
         for key, expected_value in expected_deprecated_fields:
-            message = (
+            message_beginning = (
                 f"Accessing {key!r} from the template is deprecated and "
                 f"will be removed in a future version."
             )
             with pytest.deprecated_call() as recorder:
                 value = str(context[key])  # Simulate template evaluation to trigger warning.
             assert value == expected_value
-            assert [str(m.message) for m in recorder] == [message]
+
+            recorded_message = [str(m.message) for m in recorder]
+            assert len(recorded_message) == 1
+            assert recorded_message[0].startswith(message_beginning)
 
     def test_bad_trigger_rule(self, dag_maker):
         with pytest.raises(AirflowException):
@@ -338,8 +339,10 @@ class TestCore:
         context = ti.get_template_context()
 
         # next_ds should be the execution date for manually triggered runs
-        assert context['next_ds'] == execution_ds
-        assert context['next_ds_nodash'] == execution_ds_nodash
+        with pytest.deprecated_call():
+            assert context['next_ds'] == execution_ds
+        with pytest.deprecated_call():
+            assert context['next_ds_nodash'] == execution_ds_nodash
 
     def test_dag_params_and_task_params(self, dag_maker):
         # This test case guards how params of DAG and Operator work together.
diff --git a/tests/operators/test_email.py b/tests/operators/test_email.py
index 5419796..ba2acda 100644
--- a/tests/operators/test_email.py
+++ b/tests/operators/test_email.py
@@ -50,7 +50,7 @@ class TestEmailOperator(unittest.TestCase):
             html_content='The quick brown fox jumps over the lazy dog',
             task_id='task',
             dag=self.dag,
-            files=["/tmp/Report-A-{{ execution_date.strftime('%Y-%m-%d') }}.csv"],
+            files=["/tmp/Report-A-{{ ds }}.csv"],
             **kwargs,
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 172468b..ac34468 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -19,6 +19,7 @@ import copy
 import logging
 import sys
 import unittest.mock
+import warnings
 from collections import namedtuple
 from datetime import date, datetime, timedelta
 from subprocess import CalledProcessError
@@ -39,6 +40,7 @@ from airflow.operators.python import (
     get_current_context,
 )
 from airflow.utils import timezone
+from airflow.utils.context import AirflowContextDeprecationWarning
 from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
@@ -850,6 +852,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
     # This tests might take longer than default 60 seconds as it is serializing a lot of
     # context using dill (which is slow apparently).
     @pytest.mark.execution_timeout(120)
+    @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
     def test_airflow_context(self):
         def f(
             # basic
@@ -890,6 +893,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
 
         self._run_as_operator(f, use_dill=True, system_site_packages=True, requirements=None)
 
+    @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
     def test_pendulum_context(self):
         def f(
             # basic
@@ -923,6 +927,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
 
         self._run_as_operator(f, use_dill=True, system_site_packages=False, requirements=['pendulum'])
 
+    @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
     def test_base_context(self):
         def f(
             # basic
@@ -1026,7 +1031,9 @@ class MyContextAssertOperator(BaseOperator):
 
 def get_all_the_context(**context):
     current_context = get_current_context()
-    assert context == current_context._context
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore", AirflowContextDeprecationWarning)
+        assert context == current_context._context
 
 
 @pytest.fixture()
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index ea61687..9ff8735 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -152,7 +152,7 @@ class TestDagRunOperator(TestCase):
         task = TriggerDagRunOperator(
             task_id="test_trigger_dagrun_with_str_execution_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date="{{ execution_date }}",
+            execution_date="{{ logical_date }}",
             dag=self.dag,
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
diff --git a/tests/providers/http/sensors/test_http.py b/tests/providers/http/sensors/test_http.py
index 3fc61bb..dc3b41f 100644
--- a/tests/providers/http/sensors/test_http.py
+++ b/tests/providers/http/sensors/test_http.py
@@ -125,8 +125,8 @@ class TestHttpSensor:
         response.status_code = 200
         mock_session_send.return_value = response
 
-        def resp_check(_, execution_date):
-            if execution_date == DEFAULT_DATE:
+        def resp_check(_, logical_date):
+            if logical_date == DEFAULT_DATE:
                 return True
             raise AirflowException('AirflowException raised here!')
 
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index d1e150b..28018b9 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -174,7 +174,7 @@ class TestExternalTaskSensor(unittest.TestCase):
 
     def test_external_task_sensor_fn_multiple_execution_dates(self):
         bash_command_code = """
-{% set s=execution_date.time().second %}
+{% set s=logical_date.time().second %}
 echo "second is {{ s }}"
 if [[ $(( {{ s }} % 60 )) == 1 ]]
     then
@@ -292,7 +292,7 @@ exit 0
         self.test_time_sensor()
 
         def my_func(dt, context):
-            assert context['execution_date'] == dt
+            assert context['logical_date'] == dt
             return dt + timedelta(0)
 
         op1 = ExternalTaskSensor(
@@ -541,7 +541,7 @@ def dag_bag_parent_child():
             task_id="task_1",
             external_dag_id=dag_0.dag_id,
             external_task_id=task_0.task_id,
-            execution_date_fn=lambda execution_date: day_1 if execution_date == day_1 else [],
+            execution_date_fn=lambda logical_date: day_1 if logical_date == day_1 else [],
             mode='reschedule',
         )
 
@@ -884,7 +884,7 @@ def dag_bag_head_tail():
             task_id="tail",
             external_dag_id=dag.dag_id,
             external_task_id=head.task_id,
-            execution_date="{{ tomorrow_ds_nodash }}",
+            execution_date="{{ macros.ds_add(ds, 1) }}",
         )
         head >> body >> tail
 
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 4503dd8..78166a8 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -62,7 +62,7 @@ class TestFileTaskLogHandler:
         assert handler.name == FILE_TASK_HANDLER
 
     def test_file_task_handler_when_ti_value_is_invalid(self):
-        def task_callable(ti, **kwargs):
+        def task_callable(ti):
             ti.log.info("test")
 
         dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE)
@@ -114,7 +114,7 @@ class TestFileTaskLogHandler:
         os.remove(log_filename)
 
     def test_file_task_handler(self):
-        def task_callable(ti, **kwargs):
+        def task_callable(ti):
             ti.log.info("test")
 
         dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE)
@@ -168,7 +168,7 @@ class TestFileTaskLogHandler:
         os.remove(log_filename)
 
     def test_file_task_handler_running(self):
-        def task_callable(ti, **kwargs):
+        def task_callable(ti):
             ti.log.info("test")
 
         dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE)

[airflow] 10/23: Add docs about ``.airflowignore`` (#20311)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1a7f94389274a689590c718d1e4ae800b50bbfa9
Author: Matt Rixman <58...@users.noreply.github.com>
AuthorDate: Wed Dec 15 08:13:19 2021 -0700

    Add docs about ``.airflowignore`` (#20311)
    
    This was deleted in an [earlier refactor](https://github.com/apache/airflow/pull/15444/files) (see `concepts.rst`). This PR brings it back. I added it under the "DAGs" section because, even though it's file-based and not DAG-based, excluding files that define DAGs is the most likely use case for this feature.
    
    (cherry picked from commit 6eac2e0807a8be5f39178f079db28ebcd2f83621)
---
 docs/apache-airflow/concepts/dags.rst | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index b871895..83d1cbd 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -616,6 +616,35 @@ Note that packaged DAGs come with some caveats:
 
 In general, if you have a complex set of compiled dependencies and modules, you are likely better off using the Python ``virtualenv`` system and installing the necessary packages on your target systems with ``pip``.
 
+``.airflowignore``
+------------------
+
+A ``.airflowignore`` file specifies the directories or files in ``DAG_FOLDER``
+or ``PLUGINS_FOLDER`` that Airflow should intentionally ignore.
+Each line in ``.airflowignore`` specifies a regular expression pattern,
+and directories or files whose names (not DAG id) match any of the patterns
+would be ignored (under the hood, ``Pattern.search()`` is used to match the pattern).
+Overall it works like a ``.gitignore`` file.
+Use the ``#`` character to indicate a comment; all characters
+on a line following a ``#`` will be ignored.
+
+The ``.airflowignore`` file should be put in your ``DAG_FOLDER``.
+For example, you can prepare a ``.airflowignore`` file with the following content:
+
+.. code-block::
+
+    project_a
+    tenant_[\d]
+
+Then files like ``project_a_dag_1.py``, ``TESTING_project_a.py``, ``tenant_1.py``,
+``project_a/dag_1.py``, and ``tenant_1/dag_1.py`` in your ``DAG_FOLDER`` would be ignored
+(if a directory's name matches any of the patterns, this directory and all its subfolders
+will not be scanned by Airflow at all, which improves the efficiency of DAG discovery).
+
+The scope of a ``.airflowignore`` file is the directory it is in plus all its subfolders.
+You can also prepare a ``.airflowignore`` file for a subfolder in ``DAG_FOLDER``, and it
+will then apply only to that subfolder.
+
 DAG Dependencies
 ================
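
As a rough illustration of the matching behaviour the new section describes (each line of ``.airflowignore`` is a regular expression checked with ``Pattern.search()`` against file and directory names), here is a small standalone sketch using the patterns and file names from the documentation example:

    import re

    patterns = [re.compile(p) for p in ["project_a", r"tenant_[\d]"]]
    files = ["project_a_dag_1.py", "TESTING_project_a.py", "tenant_1.py", "my_dag.py"]

    for name in files:
        ignored = any(p.search(name) for p in patterns)
        print(name, "-> ignored" if ignored else "-> kept")
    # Only my_dag.py is kept; the others contain a match for one of the patterns
    # somewhere in their name, which is all Pattern.search() requires.
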
 

[airflow] 09/23: fix(dag-dependencies): fix arrow styling (#20303)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d6466ee8b4ea7bbd20adc47283efdca3ffaa97cb
Author: jon-fearer <fe...@gmail.com>
AuthorDate: Tue Dec 14 20:50:00 2021 -0700

    fix(dag-dependencies): fix arrow styling (#20303)
    
    (cherry picked from commit 28045696dd3ea7207b1162c2343ba142e1f75e5d)
---
 airflow/www/static/js/dag_dependencies.js | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/airflow/www/static/js/dag_dependencies.js b/airflow/www/static/js/dag_dependencies.js
index 02f83f6..4e34228 100644
--- a/airflow/www/static/js/dag_dependencies.js
+++ b/airflow/www/static/js/dag_dependencies.js
@@ -200,7 +200,10 @@ const renderGraph = () => {
 
   // Set edges
   edges.forEach((edge) => {
-    g.setEdge(edge.u, edge.v);
+    g.setEdge(edge.u, edge.v, {
+      curve: d3.curveBasis,
+      arrowheadClass: 'arrowhead',
+    });
   });
 
   innerSvg.call(render, g);

[airflow] 22/23: Docs: Changed macros to correct classes and modules (#20637)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9912cf1de91e9e39ba53adc08168b575ce8b0300
Author: rustikk <tu...@gmail.com>
AuthorDate: Mon Jan 3 20:39:51 2022 -0700

    Docs: Changed macros to correct classes and modules (#20637)
    
    closes: #20545
    Fixed the docs for the time and random macros, as the references to what they are were incorrect.
    
    (cherry picked from commit 8b2299b284ac15900f54bf8c84976cc01f4d597c)
---
 docs/apache-airflow/templates-ref.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst
index 28aba68..f3c4a46 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -166,9 +166,9 @@ Variable                            Description
 ``macros.datetime``                 The standard lib's :class:`datetime.datetime`
 ``macros.timedelta``                The standard lib's :class:`datetime.timedelta`
 ``macros.dateutil``                 A reference to the ``dateutil`` package
-``macros.time``                     The standard lib's :class:`datetime.time`
+``macros.time``                     The standard lib's :mod:`time`
 ``macros.uuid``                     The standard lib's :mod:`uuid`
-``macros.random``                   The standard lib's :mod:`random`
+``macros.random``                   The standard lib's :class:`random.random`
 =================================   ==============================================
 
 Some airflow specific macros are also defined:
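
A quick way to see why the table needed correcting is to compare the macros against the standard library directly; this is a minimal sanity check, assuming an Airflow 2.2 environment:

    import random
    import time

    from airflow import macros

    # macros.time is the standard lib *module*, not datetime.time ...
    assert macros.time is time
    # ... and macros.random is the random.random *function*, not the random module.
    assert macros.random is random.random
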

[airflow] 04/23: Correctly send timing metrics when using dogstatsd (fix schedule_delay metric) (#19973)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b05722e8b98963f65cd5f615d4390bb6377ba660
Author: Lutz Ostkamp <35...@users.noreply.github.com>
AuthorDate: Wed Dec 15 11:42:14 2021 +0100

    Correctly send timing metrics when using dogstatsd (fix schedule_delay metric) (#19973)
    
    (cherry picked from commit 5d405d9cda0b88909e6b726769381044477f4678)
---
 airflow/stats.py         | 9 ++++++---
 tests/core/test_stats.py | 5 +++++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/airflow/stats.py b/airflow/stats.py
index 0a7004d..d1c4da6 100644
--- a/airflow/stats.py
+++ b/airflow/stats.py
@@ -16,13 +16,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
 import logging
 import socket
 import string
 import textwrap
 import time
 from functools import wraps
-from typing import TYPE_CHECKING, Callable, Optional, TypeVar, cast
+from typing import TYPE_CHECKING, Callable, List, Optional, TypeVar, Union, cast
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
@@ -65,7 +66,7 @@ class StatsLogger(Protocol):
         """Gauge stat"""
 
     @classmethod
-    def timing(cls, stat: str, dt) -> None:
+    def timing(cls, stat: str, dt: Union[float, datetime.timedelta]) -> None:
         """Stats timing"""
 
     @classmethod
@@ -331,10 +332,12 @@ class SafeDogStatsdLogger:
         return None
 
     @validate_stat
-    def timing(self, stat, dt, tags=None):
+    def timing(self, stat, dt: Union[float, datetime.timedelta], tags: Optional[List[str]] = None):
         """Stats timing"""
         if self.allow_list_validator.test(stat):
             tags = tags or []
+            if isinstance(dt, datetime.timedelta):
+                dt = dt.total_seconds()
             return self.dogstatsd.timing(metric=stat, value=dt, tags=tags)
         return None
 
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index 83169e2..c401a2f 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -181,9 +181,14 @@ class TestDogStats(unittest.TestCase):
         self.dogstatsd_client.timed.assert_not_called()
 
     def test_timing(self):
+        import datetime
+
         self.dogstatsd.timing("dummy_timer", 123)
         self.dogstatsd_client.timing.assert_called_once_with(metric='dummy_timer', value=123, tags=[])
 
+        self.dogstatsd.timing("dummy_timer", datetime.timedelta(seconds=123))
+        self.dogstatsd_client.timing.assert_called_with(metric='dummy_timer', value=123.0, tags=[])
+
     def test_gauge(self):
         self.dogstatsd.gauge("dummy", 123)
         self.dogstatsd_client.gauge.assert_called_once_with(metric='dummy', sample_rate=1, value=123, tags=[])
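
The upshot of the change is that ``Stats.timing`` can now be handed either a plain number or a ``datetime.timedelta`` (as the scheduler does for the ``schedule_delay`` metric mentioned in the commit title); a minimal sketch, reusing the illustrative metric name from the test above:

    import datetime

    from airflow.stats import Stats

    # With the dogstatsd backend, the timedelta is converted to total seconds
    # before being forwarded, so both calls report the same value.
    Stats.timing("dummy_timer", 123)
    Stats.timing("dummy_timer", datetime.timedelta(seconds=123))
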

[airflow] 12/23: Bugfix: Deepcopying Kubernetes Secrets attributes causing issues (#20318)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c836e71b2f39e069ae880eb7a2eacfb420672fd6
Author: Bas Harenslak <Ba...@users.noreply.github.com>
AuthorDate: Thu Dec 16 00:54:31 2021 +0100

    Bugfix: Deepcopying Kubernetes Secrets attributes causing issues (#20318)
    
    Encountered a nasty bug where somebody basically implemented their own KubernetesPodSensor, which failed after more than one attempt when using mode="poke" + a volume + a secret.
    
    The root cause turned out to be in `secret.attach_to_pod()`. Here, a volume and volume mount are created to mount the secret. A deepcopy() is made of the given Pod spec. To avoid appending to None, there is this line: `cp_pod.spec.volumes = pod.spec.volumes or []`. When a volume is set on the Pod spec, this creates a reference to the original Pod spec's volumes, which in turn was a reference to `self.volumes`. As a result, each secret resulted in a volume added to `self.volumes`, [...]
    
    This PR references the deepcopied object instead, and creates a new list if pod.spec.volumes is None.
    
    Co-authored-by: Bas Harenslak <ba...@astronomer.io>
    (cherry picked from commit 2409760694b668213a111712bb1162884c23dd2d)
---
 airflow/kubernetes/secret.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/airflow/kubernetes/secret.py b/airflow/kubernetes/secret.py
index 20ed27b..1ca2611 100644
--- a/airflow/kubernetes/secret.py
+++ b/airflow/kubernetes/secret.py
@@ -91,20 +91,28 @@ class Secret(K8SModel):
     def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod:
         """Attaches to pod"""
         cp_pod = copy.deepcopy(pod)
+
         if self.deploy_type == 'volume':
             volume, volume_mount = self.to_volume_secret()
-            cp_pod.spec.volumes = pod.spec.volumes or []
+            if cp_pod.spec.volumes is None:
+                cp_pod.spec.volumes = []
             cp_pod.spec.volumes.append(volume)
-            cp_pod.spec.containers[0].volume_mounts = pod.spec.containers[0].volume_mounts or []
+            if cp_pod.spec.containers[0].volume_mounts is None:
+                cp_pod.spec.containers[0].volume_mounts = []
             cp_pod.spec.containers[0].volume_mounts.append(volume_mount)
+
         if self.deploy_type == 'env' and self.key is not None:
             env = self.to_env_secret()
-            cp_pod.spec.containers[0].env = cp_pod.spec.containers[0].env or []
+            if cp_pod.spec.containers[0].env is None:
+                cp_pod.spec.containers[0].env = []
             cp_pod.spec.containers[0].env.append(env)
+
         if self.deploy_type == 'env' and self.key is None:
             env_from = self.to_env_from_secret()
-            cp_pod.spec.containers[0].env_from = cp_pod.spec.containers[0].env_from or []
+            if cp_pod.spec.containers[0].env_from is None:
+                cp_pod.spec.containers[0].env_from = []
             cp_pod.spec.containers[0].env_from.append(env_from)
+
         return cp_pod
 
     def __eq__(self, other):
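
The aliasing pitfall described in the commit message is easy to reproduce without any Kubernetes objects; this toy sketch (not Airflow code) shows why assigning ``pod.spec.volumes or []`` onto the copy kept mutating the original, while the fixed pattern does not:

    import copy

    class Spec:
        def __init__(self, volumes=None):
            self.volumes = volumes

    original = Spec(volumes=["existing-volume"])

    # Buggy pattern: the deepcopied attribute is discarded and the copy ends up
    # sharing the *original* list, so appends leak back into the original.
    cp = copy.deepcopy(original)
    cp.volumes = original.volumes or []
    cp.volumes.append("secret-volume")
    print(original.volumes)  # ['existing-volume', 'secret-volume']  <- mutated!

    # Fixed pattern: only create a new list when the copied attribute is None.
    original = Spec(volumes=["existing-volume"])
    cp = copy.deepcopy(original)
    if cp.volumes is None:
        cp.volumes = []
    cp.volumes.append("secret-volume")
    print(original.volumes)  # ['existing-volume']  <- untouched
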

[airflow] 16/23: Remove unnecessary logging in experimental API (#20356)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 581fcfdd58705a44c24cacbe364786b0422a0d1d
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Dec 17 20:47:49 2021 +0000

    Remove unnecessary logging in experimental API (#20356)
    
    The `execution_date` does not need to be passed to the log. We send enough details to the API user in the response.
    
    (cherry picked from commit 790bc784435646c043d8def7096917a4ce0a62f7)
---
 airflow/www/api/experimental/endpoints.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 91528e9..30c2728 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -103,11 +103,11 @@ def trigger_dag(dag_id):
         try:
             execution_date = timezone.parse(execution_date)
         except ValueError:
+            log.error("Given execution date could not be identified as a date.")
             error_message = (
                 'Given execution date, {}, could not be identified '
                 'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(execution_date)
             )
-            log.error(error_message)
             response = jsonify({'error': error_message})
             response.status_code = 400
 
@@ -253,11 +253,11 @@ def task_instance_info(dag_id, execution_date, task_id):
     try:
         execution_date = timezone.parse(execution_date)
     except ValueError:
+        log.error("Given execution date could not be identified as a date.")
         error_message = (
             'Given execution date, {}, could not be identified '
             'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(execution_date)
         )
-        log.error(error_message)
         response = jsonify({'error': error_message})
         response.status_code = 400
 
@@ -289,11 +289,11 @@ def dag_run_status(dag_id, execution_date):
     try:
         execution_date = timezone.parse(execution_date)
     except ValueError:
+        log.error("Given execution date could not be identified as a date.")
         error_message = (
             'Given execution date, {}, could not be identified '
             'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(execution_date)
         )
-        log.error(error_message)
         response = jsonify({'error': error_message})
         response.status_code = 400
 
@@ -402,11 +402,11 @@ def get_lineage(dag_id: str, execution_date: str):
     try:
         execution_dt = timezone.parse(execution_date)
     except ValueError:
+        log.error("Given execution date could not be identified as a date.")
         error_message = (
             'Given execution date, {}, could not be identified '
             'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(execution_date)
         )
-        log.error(error_message)
         response = jsonify({'error': error_message})
         response.status_code = 400
 

[airflow] 05/23: Update upgrading.rst with detailed code example of how to resolve post-upgrade warning (#19993)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 03be5a186520d78afb4ac15368594d0e0c7c4468
Author: adaezebestow <87...@users.noreply.github.com>
AuthorDate: Sun Dec 19 12:26:04 2021 -0500

    Update upgrading.rst with detailed code example of how to resolve post-upgrade warning (#19993)
    
    (cherry picked from commit 4ac35d723b73d02875d56bf000aafd2235ef0f4a)
---
 docs/apache-airflow/installation/upgrading.rst | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/docs/apache-airflow/installation/upgrading.rst b/docs/apache-airflow/installation/upgrading.rst
index cd29e32..929c604 100644
--- a/docs/apache-airflow/installation/upgrading.rst
+++ b/docs/apache-airflow/installation/upgrading.rst
@@ -79,6 +79,21 @@ table or rename it or move it to another database using those tools. If you don'
 can use the ``airflow db shell`` command - this will drop you in the db shell tool for your database and you
 will be able to both inspect and delete the table.
 
+How to drop the table using Kubernetes:
+
+
+1. Exec into any of the Airflow pods - webserver or scheduler: ``kubectl exec -it <your-webserver-pod> python``
+
+2. Run the following commands in the python shell:
+
+ .. code-block:: python
+
+     from airflow.settings import Session
+
+     session = Session()
+     session.execute("DROP TABLE _airflow_moved__2_2__task_instance")
+     session.commit()
+
 Please replace ``<table>`` in the examples with the actual table name as printed in the warning message.
 
 Inspecting a table:

[airflow] 15/23: Correct typo (#20345)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f1a2e5024cdfeac4cbe80437991d9c44a8ac7850
Author: Luís Miranda <lu...@gmail.com>
AuthorDate: Thu Dec 16 15:58:52 2021 +0000

    Correct typo (#20345)
    
    (cherry picked from commit c4d2e16197c5f49493c142bfd9b754ea3c816f48)
---
 docs/apache-airflow/modules_management.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/modules_management.rst b/docs/apache-airflow/modules_management.rst
index 4bc16f6..00e031c 100644
--- a/docs/apache-airflow/modules_management.rst
+++ b/docs/apache-airflow/modules_management.rst
@@ -107,7 +107,7 @@ This is an example structure that you might have in your ``dags`` folder:
                                  | my_dag2.py
                                  | base_dag.py
 
-In the case above, there are the ways you could import the python files:
+In the case above, these are the ways you could import the python files:
 
 .. code-block:: python
 

[airflow] 03/23: Enhance `multiple_outputs` inference of dict typing (#19608)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d2ae684a09c85fc557b24e2fb4421df7da79a9b0
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Mon Jan 10 11:14:22 2022 -0500

    Enhance `multiple_outputs` inference of dict typing (#19608)
    
    (cherry picked from commit 4198550bba474e7942705a4c6df2ad916fb76561)
---
 .pre-commit-config.yaml                 |  1 +
 airflow/decorators/base.py              | 27 +++++++++++++++++----------
 airflow/decorators/python.py            | 16 ++++++----------
 airflow/decorators/python_virtualenv.py | 10 ++++------
 tests/decorators/test_python.py         | 23 ++++++++++++++++++-----
 5 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0003071..08d60ab 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -195,6 +195,7 @@ repos:
           - "4"
         files: ^chart/values\.schema\.json$|^chart/values_schema\.schema\.json$
         pass_filenames: true
+  # TODO: Bump to Python 3.7 when support for Python 3.6 is dropped in Airflow 2.3.
   - repo: https://github.com/asottile/pyupgrade
     rev: v2.29.0
     hooks:
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 229a114..cd76839 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -18,6 +18,7 @@
 import functools
 import inspect
 import re
+import sys
 from inspect import signature
 from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
 
@@ -91,9 +92,8 @@ class DecoratedOperator(BaseOperator):
     :param op_args: a list of positional arguments that will get unpacked when
         calling your callable (templated)
     :type op_args: list
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     :param kwargs_to_upstream: For certain operators, we might need to upstream certain arguments
         that would otherwise be absorbed by the DecoratedOperator (for example python_callable for the
@@ -189,10 +189,8 @@ def task_decorator_factory(
 
     :param python_callable: Function to decorate
     :type python_callable: Optional[Callable]
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-        with index as key. Dict will unroll to xcom values with keys as XCom keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     :param decorated_operator_class: The operator that executes the logic needed to run the python function in
         the correct environment
@@ -201,10 +199,19 @@ def task_decorator_factory(
     """
     # try to infer from  type annotation
     if python_callable and multiple_outputs is None:
-        sig = signature(python_callable).return_annotation
-        ttype = getattr(sig, "__origin__", None)
+        return_type = signature(python_callable).return_annotation
+
+        # If the return type annotation is already the builtins ``dict`` type, use it for the inference.
+        if return_type == dict:
+            ttype = return_type
+        # Checking if Python 3.6, ``__origin__`` attribute does not exist until 3.7; need to use ``__extra__``
+        # TODO: Remove check when support for Python 3.6 is dropped in Airflow 2.3.
+        elif sys.version_info < (3, 7):
+            ttype = getattr(return_type, "__extra__", None)
+        else:
+            ttype = getattr(return_type, "__origin__", None)
 
-        multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, Dict)
+        multiple_outputs = return_type != inspect.Signature.empty and ttype in (dict, Dict)
 
     def wrapper(f: T):
         """
diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py
index 7dc6c1b..2411761 100644
--- a/airflow/decorators/python.py
+++ b/airflow/decorators/python.py
@@ -33,9 +33,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
     :param op_args: a list of positional arguments that will get unpacked when
         calling your callable (templated)
     :type op_args: list
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     """
 
@@ -85,9 +84,8 @@ class PythonDecoratorMixin:
 
         :param python_callable: Function to decorate
         :type python_callable: Optional[Callable]
-        :param multiple_outputs: if set, function return value will be
-            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+            multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
             Defaults to False.
         :type multiple_outputs: bool
         """
@@ -109,10 +107,8 @@ def python_task(
 
     :param python_callable: Function to decorate
     :type python_callable: Optional[Callable]
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-        with index as key. Dict will unroll to xcom values with keys as XCom keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     """
     return task_decorator_factory(
diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py
index 8024e5a..d412344 100644
--- a/airflow/decorators/python_virtualenv.py
+++ b/airflow/decorators/python_virtualenv.py
@@ -36,9 +36,8 @@ class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOper
     :param op_args: a list of positional arguments that will get unpacked when
         calling your callable (templated)
     :type op_args: list
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     """
 
@@ -88,9 +87,8 @@ class PythonVirtualenvDecoratorMixin:
 
         :param python_callable: Function to decorate
         :type python_callable: Optional[Callable]
-        :param multiple_outputs: if set, function return value will be
-            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+            multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
             Defaults to False.
         :type multiple_outputs: bool
         """
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 8782999..798d877 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -15,12 +15,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import sys
 import unittest.mock
 from collections import namedtuple
 from datetime import date, timedelta
 from typing import Dict, Tuple
 
 import pytest
+from parameterized import parameterized
 
 from airflow.decorators import task as task_decorator
 from airflow.exceptions import AirflowException
@@ -112,13 +114,24 @@ class TestAirflowTaskDecorator(TestPythonBase):
         with pytest.raises(AirflowException):
             task_decorator(not_callable, dag=self.dag)
 
-    def test_infer_multiple_outputs_using_typing(self):
-        @task_decorator
-        def identity_dict(x: int, y: int) -> Dict[str, int]:
-            return {"x": x, "y": y}
+    @parameterized.expand([["dict"], ["dict[str, int]"], ["Dict"], ["Dict[str, int]"]])
+    def test_infer_multiple_outputs_using_dict_typing(self, test_return_annotation):
+        if sys.version_info < (3, 9) and test_return_annotation == "dict[str, int]":
+            self.skipTest("dict[...] not a supported typing prior to Python 3.9")
+
+        @task_decorator
+        def identity_dict(x: int, y: int) -> eval(test_return_annotation):
+            return {"x": x, "y": y}
+
+        assert identity_dict(5, 5).operator.multiple_outputs is True
+
+        @task_decorator
+        def identity_dict_stringified(x: int, y: int) -> test_return_annotation:
+            return {"x": x, "y": y}
 
-        assert identity_dict(5, 5).operator.multiple_outputs is True
+        assert identity_dict_stringified(5, 5).operator.multiple_outputs is True
 
+    def test_infer_multiple_outputs_using_other_typing(self):
         @task_decorator
         def identity_tuple(x: int, y: int) -> Tuple[int, int]:
             return x, y
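
In practice, the improved inference means any dict-style return annotation now enables ``multiple_outputs`` without setting it explicitly; a short hedged sketch (the task bodies are made up):

    from typing import Dict

    from airflow.decorators import task

    @task
    def coords() -> Dict[str, int]:
        return {"x": 1, "y": 2}

    @task
    def coords_builtin() -> dict:  # the builtin ``dict`` annotation now works too
        return {"x": 1, "y": 2}

    # Both tasks are inferred as multiple_outputs=True, so when they run in a DAG
    # each dictionary key is pushed as its own XCom value.
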

[airflow] 01/23: Bump version to 2.2.4

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 027e1d1e114ab92410bb3e1a8c2edfa9a9b5134b
Author: Jed Cunningham <je...@apache.org>
AuthorDate: Thu Jan 20 13:55:04 2022 -0700

    Bump version to 2.2.4
---
 README.md                                                | 14 +++++++-------
 .../extending/add-apt-packages/Dockerfile                |  2 +-
 .../extending/add-build-essential-extend/Dockerfile      |  2 +-
 .../docker-examples/extending/add-providers/Dockerfile   |  2 +-
 .../extending/add-pypi-packages/Dockerfile               |  2 +-
 .../docker-examples/extending/embedding-dags/Dockerfile  |  2 +-
 .../extending/writable-directory/Dockerfile              |  2 +-
 .../restricted/restricted_environments.sh                |  4 ++--
 docs/docker-stack/entrypoint.rst                         | 16 ++++++++--------
 setup.py                                                 |  2 +-
 10 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/README.md b/README.md
index cc25821..2534c94 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ Airflow is not a streaming solution, but it is often used to process real-time d
 
 Apache Airflow is tested with:
 
-|                      | Main version (dev)        | Stable version (2.2.3)   |
+|                      | Main version (dev)        | Stable version (2.2.4)   |
 | -------------------- | ------------------------- | ------------------------ |
 | Python               | 3.6, 3.7, 3.8, 3.9        | 3.6, 3.7, 3.8, 3.9       |
 | Kubernetes           | 1.18, 1.19, 1.20          | 1.18, 1.19, 1.20         |
@@ -153,15 +153,15 @@ them to the appropriate format and workflow that your tool requires.
 
 
 ```bash
-pip install 'apache-airflow==2.2.3' \
- --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.7.txt"
+pip install 'apache-airflow==2.2.4' \
+ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.4/constraints-3.7.txt"
 ```
 
 2. Installing with extras (i.e., postgres, google)
 
 ```bash
-pip install 'apache-airflow[postgres,google]==2.2.3' \
- --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.7.txt"
+pip install 'apache-airflow[postgres,google]==2.2.4' \
+ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.4/constraints-3.7.txt"
 ```
 
 For information on installing provider packages, check
@@ -263,7 +263,7 @@ Apache Airflow version life cycle:
 
 | Version | Current Patch/Minor | State     | First Release | Limited Support | EOL/Terminated |
 |---------|---------------------|-----------|---------------|-----------------|----------------|
-| 2       | 2.2.3               | Supported | Dec 17, 2020  | TBD             | TBD            |
+| 2       | 2.2.4               | Supported | Dec 17, 2020  | TBD             | TBD            |
 | 1.10    | 1.10.15             | EOL       | Aug 27, 2018  | Dec 17, 2020    | June 17, 2021  |
 | 1.9     | 1.9.0               | EOL       | Jan 03, 2018  | Aug 27, 2018    | Aug 27, 2018   |
 | 1.8     | 1.8.2               | EOL       | Mar 19, 2017  | Jan 03, 2018    | Jan 03, 2018   |
@@ -290,7 +290,7 @@ They are based on the official release schedule of Python and Kubernetes, nicely
 2. The "oldest" supported version of Python/Kubernetes is the default one until we decide to switch to
    later version. "Default" is only meaningful in terms of "smoke tests" in CI PRs, which are run using this
    default version and the default reference image available. Currently `apache/airflow:latest`
-   and `apache/airflow:2.2.3` images are Python 3.7 images as we are preparing for 23.12.2021 when will
+   and `apache/airflow:2.2.4` images are Python 3.7 images as we are preparing for 23.12.2021 when will
    Python 3.6 reaches end of life.
 
 3. We support a new version of Python/Kubernetes in main after they are officially released, as soon as we
diff --git a/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile b/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
index de55cd6..18d5461 100644
--- a/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
@@ -15,7 +15,7 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.3
+FROM apache/airflow:2.2.4
 USER root
 RUN apt-get update \
   && apt-get install -y --no-install-recommends \
diff --git a/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile b/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
index 220b917..b5d5cd1 100644
--- a/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
@@ -15,7 +15,7 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.3
+FROM apache/airflow:2.2.4
 USER root
 RUN apt-get update \
   && apt-get install -y --no-install-recommends \
diff --git a/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile b/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
index bb17c3a..1786f2e 100644
--- a/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
@@ -15,6 +15,6 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.3
+FROM apache/airflow:2.2.4
 RUN pip install --no-cache-dir apache-airflow-providers-docker==2.1.0
 # [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile b/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
index b487d6e..feaf714 100644
--- a/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
@@ -15,6 +15,6 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.3
+FROM apache/airflow:2.2.4
 RUN pip install --no-cache-dir lxml
 # [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile b/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
index e5562ef..9342fae 100644
--- a/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
@@ -15,7 +15,7 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.3
+FROM apache/airflow:2.2.4
 
 COPY --chown=airflow:root test_dag.py /opt/airflow/dags
 
diff --git a/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile b/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
index 42f1c06..ffcb8ad 100644
--- a/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
@@ -15,7 +15,7 @@
 
 # This is an example Dockerfile. It is not intended for PRODUCTION use
 # [START Dockerfile]
-FROM apache/airflow:2.2.3
+FROM apache/airflow:2.2.4
 RUN umask 0002; \
     mkdir -p ~/writeable-directory
 # [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/restricted/restricted_environments.sh b/docs/docker-stack/docker-examples/restricted/restricted_environments.sh
index 4eefc69..3a87f43 100755
--- a/docs/docker-stack/docker-examples/restricted/restricted_environments.sh
+++ b/docs/docker-stack/docker-examples/restricted/restricted_environments.sh
@@ -25,7 +25,7 @@ cd "${AIRFLOW_SOURCES}"
 rm docker-context-files/*.whl docker-context-files/*.tar.gz docker-context-files/*.txt || true
 
 curl -Lo "docker-context-files/constraints-3.7.txt" \
-    https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.7.txt
+    https://raw.githubusercontent.com/apache/airflow/constraints-2.2.4/constraints-3.7.txt
 
 # For Airflow pre 2.1 you need to use PIP 20.2.4 to install/download Airflow packages.
 pip install pip==20.2.4
@@ -39,7 +39,7 @@ pip download --dest docker-context-files \
 docker build . \
     --build-arg PYTHON_BASE_IMAGE="python:3.7-slim-buster" \
     --build-arg AIRFLOW_INSTALLATION_METHOD="apache-airflow" \
-    --build-arg AIRFLOW_VERSION="2.2.3" \
+    --build-arg AIRFLOW_VERSION="2.2.4" \
     --build-arg INSTALL_MYSQL_CLIENT="false" \
     --build-arg INSTALL_MSSQL_CLIENT="false" \
     --build-arg AIRFLOW_PRE_CACHED_PIP_PACKAGES="false" \
diff --git a/docs/docker-stack/entrypoint.rst b/docs/docker-stack/entrypoint.rst
index e63a230..542e7e5 100644
--- a/docs/docker-stack/entrypoint.rst
+++ b/docs/docker-stack/entrypoint.rst
@@ -132,7 +132,7 @@ if you specify extra arguments. For example:
 
 .. code-block:: bash
 
-  docker run -it apache/airflow:2.2.3-python3.6 bash -c "ls -la"
+  docker run -it apache/airflow:2.2.4-python3.6 bash -c "ls -la"
   total 16
   drwxr-xr-x 4 airflow root 4096 Jun  5 18:12 .
   drwxr-xr-x 1 root    root 4096 Jun  5 18:12 ..
@@ -144,7 +144,7 @@ you pass extra parameters. For example:
 
 .. code-block:: bash
 
-  > docker run -it apache/airflow:2.2.3-python3.6 python -c "print('test')"
+  > docker run -it apache/airflow:2.2.4-python3.6 python -c "print('test')"
   test
 
 If first argument equals to "airflow" - the rest of the arguments is treated as an airflow command
@@ -152,13 +152,13 @@ to execute. Example:
 
 .. code-block:: bash
 
-   docker run -it apache/airflow:2.2.3-python3.6 airflow webserver
+   docker run -it apache/airflow:2.2.4-python3.6 airflow webserver
 
 If there are any other arguments - they are simply passed to the "airflow" command
 
 .. code-block:: bash
 
-  > docker run -it apache/airflow:2.2.3-python3.6 help
+  > docker run -it apache/airflow:2.2.4-python3.6 help
     usage: airflow [-h] GROUP_OR_COMMAND ...
 
     positional arguments:
@@ -258,7 +258,7 @@ And then you can run this script by running the command:
 
 .. code-block:: bash
 
-  docker run -it apache/airflow:2.2.3-python3.6 bash -c "/my_after_entrypoint_script.sh"
+  docker run -it apache/airflow:2.2.4-python3.6 bash -c "/my_after_entrypoint_script.sh"
 
 
 Signal propagation
@@ -363,7 +363,7 @@ database and creating an ``admin/admin`` Admin user with the following command:
     --env "_AIRFLOW_DB_UPGRADE=true" \
     --env "_AIRFLOW_WWW_USER_CREATE=true" \
     --env "_AIRFLOW_WWW_USER_PASSWORD=admin" \
-      apache/airflow:2.2.3-python3.8 webserver
+      apache/airflow:2.2.4-python3.8 webserver
 
 
 .. code-block:: bash
@@ -372,7 +372,7 @@ database and creating an ``admin/admin`` Admin user with the following command:
     --env "_AIRFLOW_DB_UPGRADE=true" \
     --env "_AIRFLOW_WWW_USER_CREATE=true" \
     --env "_AIRFLOW_WWW_USER_PASSWORD_CMD=echo admin" \
-      apache/airflow:2.2.3-python3.8 webserver
+      apache/airflow:2.2.4-python3.8 webserver
 
 The commands above perform initialization of the SQLite database, create admin user with admin password
 and Admin role. They also forward local port ``8080`` to the webserver port and finally start the webserver.
@@ -412,6 +412,6 @@ Example:
     --env "_AIRFLOW_DB_UPGRADE=true" \
     --env "_AIRFLOW_WWW_USER_CREATE=true" \
     --env "_AIRFLOW_WWW_USER_PASSWORD_CMD=echo admin" \
-      apache/airflow:2.2.3-python3.8 webserver
+      apache/airflow:2.2.4-python3.8 webserver
 
 This method is only available starting from Docker image of Airflow 2.1.1 and above.
diff --git a/setup.py b/setup.py
index 90a2037..d1ac695 100644
--- a/setup.py
+++ b/setup.py
@@ -41,7 +41,7 @@ PY39 = sys.version_info >= (3, 9)
 
 logger = logging.getLogger(__name__)
 
-version = '2.2.3'
+version = '2.2.4'
 
 my_dir = dirname(__file__)
 

[airflow] 02/23: Fixing ses email backend (#18042)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fd5558fe9eb4c82f1aa49244deb75a6d500334bc
Author: ignaski <53...@users.noreply.github.com>
AuthorDate: Fri Oct 29 12:10:49 2021 +0300

    Fixing ses email backend (#18042)
    
    (cherry picked from commit 1543dc28f4a2f1631dfaedd948e646a181ccf7ee)
---
 airflow/config_templates/config.yml              |  8 +++++
 airflow/config_templates/default_airflow.cfg     |  5 +++
 airflow/providers/amazon/aws/utils/emailer.py    |  3 +-
 airflow/utils/email.py                           | 10 ++++--
 docs/apache-airflow/howto/email-config.rst       |  7 ++++
 tests/providers/amazon/aws/utils/test_emailer.py | 42 +++++++++++++-----------
 tests/utils/test_email.py                        | 14 ++++++++
 7 files changed, 66 insertions(+), 23 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 6fa38d7..a70854e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1353,6 +1353,14 @@
       example: "/path/to/my_html_content_template_file"
       default: ~
       see_also: ":doc:`Email Configuration </howto/email-config>`"
+    - name: from_email
+      description: |
+        Email address that will be used as sender address.
+        It can be either a raw email address or the complete address in the format ``Sender Name <se...@email.com>``
+      version_added: 2.3.0
+      type: string
+      example: "Airflow <ai...@example.com>"
+      default: ~
 
 - name: smtp
   description: |
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7124d95..6a5449b 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -681,6 +681,11 @@ default_email_on_failure = True
 # Example: html_content_template = /path/to/my_html_content_template_file
 # html_content_template =
 
+# Email address that will be used as sender address.
+# It can either be raw email or the complete address in a format ``Sender Name <se...@email.com>``
+# Example: from_email = Airflow <ai...@example.com>
+# from_email =
+
 [smtp]
 
 # If you want airflow to send emails on retries, failure, and you want to use
diff --git a/airflow/providers/amazon/aws/utils/emailer.py b/airflow/providers/amazon/aws/utils/emailer.py
index d098892..fc34835 100644
--- a/airflow/providers/amazon/aws/utils/emailer.py
+++ b/airflow/providers/amazon/aws/utils/emailer.py
@@ -23,6 +23,7 @@ from airflow.providers.amazon.aws.hooks.ses import SESHook
 
 
 def send_email(
+    from_email: str,
     to: Union[List[str], str],
     subject: str,
     html_content: str,
@@ -37,7 +38,7 @@ def send_email(
     """Email backend for SES."""
     hook = SESHook(aws_conn_id=conn_id)
     hook.send_email(
-        mail_from=None,
+        mail_from=from_email,
         to=to,
         subject=subject,
         html_content=html_content,
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 7d17027..50f2415 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -49,6 +49,8 @@ def send_email(
     """Send email using backend specified in EMAIL_BACKEND."""
     backend = conf.getimport('email', 'EMAIL_BACKEND')
     backend_conn_id = conn_id or conf.get("email", "EMAIL_CONN_ID")
+    from_email = conf.get('email', 'from_email', fallback=None)
+
     to_list = get_email_address_list(to)
     to_comma_separated = ", ".join(to_list)
 
@@ -63,6 +65,7 @@ def send_email(
         mime_subtype=mime_subtype,
         mime_charset=mime_charset,
         conn_id=backend_conn_id,
+        from_email=from_email,
         **kwargs,
     )
 
@@ -78,6 +81,7 @@ def send_email_smtp(
     mime_subtype: str = 'mixed',
     mime_charset: str = 'utf-8',
     conn_id: str = "smtp_default",
+    from_email: str = None,
     **kwargs,
 ):
     """
@@ -87,8 +91,10 @@ def send_email_smtp(
     """
     smtp_mail_from = conf.get('smtp', 'SMTP_MAIL_FROM')
 
+    mail_from = smtp_mail_from or from_email
+
     msg, recipients = build_mime_message(
-        mail_from=smtp_mail_from,
+        mail_from=mail_from,
         to=to,
         subject=subject,
         html_content=html_content,
@@ -99,7 +105,7 @@ def send_email_smtp(
         mime_charset=mime_charset,
     )
 
-    send_mime_email(e_from=smtp_mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun)
+    send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun)
 
 
 def build_mime_message(
diff --git a/docs/apache-airflow/howto/email-config.rst b/docs/apache-airflow/howto/email-config.rst
index 67e26a7..af7b3cf 100644
--- a/docs/apache-airflow/howto/email-config.rst
+++ b/docs/apache-airflow/howto/email-config.rst
@@ -29,6 +29,8 @@ in the ``[email]`` section.
   subject_template = /path/to/my_subject_template_file
   html_content_template = /path/to/my_html_content_template_file
 
+You can configure sender's email address by setting ``from_email`` in the ``[email]`` section.
+
 To configure SMTP settings, checkout the :ref:`SMTP <config:smtp>` section in the standard configuration.
 If you do not want to store the SMTP credentials in the config or in the environment variables, you can create a
 connection called ``smtp_default`` of ``Email`` type, or choose a custom connection name and set the ``email_conn_id`` with it's name in
@@ -91,6 +93,9 @@ or
    name and set it in ``email_conn_id`` of  'Email' type. Only login and password
    are used from the connection.
 
+4. Configure sender's email address and name either by exporting the environment variables ``SENDGRID_MAIL_FROM`` and ``SENDGRID_MAIL_SENDER`` or
+   in your ``airflow.cfg`` by setting ``from_email`` in the ``[email]`` section.
+
 .. _email-configuration-ses:
 
 Send email using AWS SES
@@ -116,3 +121,5 @@ Follow the steps below to enable it:
 
 3. Create a connection called ``aws_default``, or choose a custom connection
    name and set it in ``email_conn_id``. The type of connection should be ``Amazon Web Services``.
+
+4. Configure sender's email address in your ``airflow.cfg`` by setting ``from_email`` in the ``[email]`` section.
diff --git a/tests/providers/amazon/aws/utils/test_emailer.py b/tests/providers/amazon/aws/utils/test_emailer.py
index 3d99573..bcbbd4e 100644
--- a/tests/providers/amazon/aws/utils/test_emailer.py
+++ b/tests/providers/amazon/aws/utils/test_emailer.py
@@ -16,27 +16,29 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
-from unittest import mock
+from unittest import TestCase, mock
 
 from airflow.providers.amazon.aws.utils.emailer import send_email
 
 
-@mock.patch("airflow.providers.amazon.aws.utils.emailer.SESHook")
-def test_send_email(mock_hook):
-    send_email(
-        to="to@test.com",
-        subject="subject",
-        html_content="content",
-    )
-    mock_hook.return_value.send_email.assert_called_once_with(
-        mail_from=None,
-        to="to@test.com",
-        subject="subject",
-        html_content="content",
-        bcc=None,
-        cc=None,
-        files=None,
-        mime_charset="utf-8",
-        mime_subtype="mixed",
-    )
+class TestSendEmailSes(TestCase):
+    @mock.patch("airflow.providers.amazon.aws.utils.emailer.SESHook")
+    def test_send_ses_email(self, mock_hook):
+        send_email(
+            from_email="From Test <fr...@test.com>",
+            to="to@test.com",
+            subject="subject",
+            html_content="content",
+        )
+
+        mock_hook.return_value.send_email.assert_called_once_with(
+            mail_from="From Test <fr...@test.com>",
+            to="to@test.com",
+            subject="subject",
+            html_content="content",
+            bcc=None,
+            cc=None,
+            files=None,
+            mime_charset="utf-8",
+            mime_subtype="mixed",
+        )
diff --git a/tests/utils/test_email.py b/tests/utils/test_email.py
index 28d4328..b458bbd 100644
--- a/tests/utils/test_email.py
+++ b/tests/utils/test_email.py
@@ -99,9 +99,23 @@ class TestEmail(unittest.TestCase):
             mime_charset='utf-8',
             mime_subtype='mixed',
             conn_id='smtp_default',
+            from_email=None,
         )
         assert not mock_send_email.called
 
+    @mock.patch('airflow.utils.email.send_email_smtp')
+    @conf_vars(
+        {
+            ('email', 'email_backend'): 'tests.utils.test_email.send_email_test',
+            ('email', 'from_email'): 'from@test.com',
+        }
+    )
+    def test_custom_backend_sender(self, mock_send_email_smtp):
+        utils.email.send_email('to', 'subject', 'content')
+        _, call_kwargs = send_email_test.call_args
+        assert call_kwargs['from_email'] == 'from@test.com'
+        assert not mock_send_email_smtp.called
+
     def test_build_mime_message(self):
         mail_from = 'from@example.com'
         mail_to = 'to@example.com'

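Note: with this change the SES backend no longer hardcodes ``mail_from=None``; the sender is read from the new ``[email] from_email`` option and passed through as the first argument. A minimal sketch of calling the patched backend directly, mirroring the test above (the addresses and connection id are placeholders, not taken from the commit):

.. code-block:: python

    # Minimal sketch; addresses and conn_id are illustrative placeholders.
    from airflow.providers.amazon.aws.utils.emailer import send_email

    send_email(
        from_email="Airflow <noreply@example.com>",  # normally supplied from [email] from_email
        to="alerts@example.com",
        subject="Task failed",
        html_content="<p>See the task log for details.</p>",
        conn_id="aws_default",
    )
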
[airflow] 14/23: Fix grammar mistakes (#20341)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2ee63511f05622874ca81a540df354bc8dc379cc
Author: Tanguy <35...@users.noreply.github.com>
AuthorDate: Thu Dec 16 16:47:03 2021 +0100

    Fix grammar mistakes (#20341)
    
    (cherry picked from commit 0163495d7193aee7be86cebcc4116b279d460004)
---
 docs/apache-airflow/concepts/tasks.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/concepts/tasks.rst b/docs/apache-airflow/concepts/tasks.rst
index 23c7713..f6c6619 100644
--- a/docs/apache-airflow/concepts/tasks.rst
+++ b/docs/apache-airflow/concepts/tasks.rst
@@ -47,7 +47,7 @@ Or the more explicit ``set_upstream`` and ``set_downstream`` methods::
 
 These both do exactly the same thing, but in general we recommend you use the bitshift operators, as they are easier to read in most cases.
 
-By default, a Task will run when all of its upstream (parent) tasks have succeeded, but there are many ways of modifying this behaviour to add branching, only wait for some upstream tasks, or change behaviour based on where the current run is in history. For more, see :ref:`concepts:control-flow`.
+By default, a Task will run when all of its upstream (parent) tasks have succeeded, but there are many ways of modifying this behaviour to add branching, to only wait for some upstream tasks, or to change behaviour based on where the current run is in history. For more, see :ref:`concepts:control-flow`.
 
 Tasks don't pass information to each other by default, and run entirely independently. If you want to pass information from one Task to another, you should use :doc:`xcoms`.
 
@@ -153,7 +153,7 @@ If you merely want to be notified if a task runs over but still let it run to co
 SLAs
 ----
 
-An SLA, or a Service Level Agreement, is an expectation for the maximum time a Task should take. If a task takes longer than this to run, then it visible in the "SLA Misses" part of the user interface, as well going out in an email of all tasks that missed their SLA.
+An SLA, or a Service Level Agreement, is an expectation for the maximum time a Task should take. If a task takes longer than this to run, it is then visible in the "SLA Misses" part of the user interface, as well as going out in an email of all tasks that missed their SLA.
 
 Tasks over their SLA are not cancelled, though - they are allowed to run to completion. If you want to cancel a task after a certain runtime is reached, you want :ref:`concepts:timeouts` instead.
 

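Note: the wording fixed above is easiest to see in a tiny DAG. A minimal, illustrative sketch of the dependency styles and the ``sla`` attribute the amended text refers to (task ids, dates and the SLA value are placeholders, not from the commit):

.. code-block:: python

    # Minimal illustrative DAG; ids, dates and the SLA value are placeholders.
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    with DAG("deps_example", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
        first = DummyOperator(task_id="first")
        second = DummyOperator(task_id="second", sla=timedelta(hours=1))

        first >> second                    # bitshift style recommended by the doc
        # first.set_downstream(second)     # equivalent explicit method
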
[airflow] 23/23: Docs: Clarify ``sentry_on`` value is not quoted with example (#20639)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 713a807f0f7b948387945e763cab8a7646fd35a8
Author: Alan Ma <al...@gmail.com>
AuthorDate: Wed Jan 5 00:51:00 2022 -0800

    Docs: Clarify ``sentry_on`` value is not quoted with example (#20639)
    
    Clarify the value for ``sentry_on`` is not quoted by providing an example.
    
    (cherry picked from commit e8b5ab9efe93f826a2b521b5d1cac0404354c3b4)
---
 docs/apache-airflow/logging-monitoring/errors.rst | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/logging-monitoring/errors.rst b/docs/apache-airflow/logging-monitoring/errors.rst
index 7a5df12..9f4256a 100644
--- a/docs/apache-airflow/logging-monitoring/errors.rst
+++ b/docs/apache-airflow/logging-monitoring/errors.rst
@@ -29,13 +29,14 @@ First you must install sentry requirement:
 
    pip install 'apache-airflow[sentry]'
 
-After that, you need to enable the integration by set ``sentry_on`` option in ``[sentry]`` section to ``"True"``.
+After that, you need to enable the integration by set ``sentry_on`` option in ``[sentry]`` section to ``True``.
 
-Add your ``SENTRY_DSN`` to your configuration file e.g. ``airflow.cfg`` in ``[sentry]`` section. Its template resembles the following: ``'{PROTOCOL}://{PUBLIC_KEY}@{HOST}/{PROJECT_ID}'``
+Add your ``SENTRY_DSN`` to your configuration file e.g. ``airflow.cfg`` in ``[sentry]`` section. Its template resembles the following: ``{PROTOCOL}://{PUBLIC_KEY}@{HOST}/{PROJECT_ID}``
 
 .. code-block:: ini
 
     [sentry]
+    sentry_on = True
     sentry_dsn = http://foo@sentry.io/123
 
 .. note::