Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/05 14:47:19 UTC

[airflow] branch v2-3-test updated (679bd6afaf -> 774f8bc67c)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a change to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


    from 679bd6afaf Add cache_ok flag to sqlalchemy TypeDecorators. (#24499)
     new 40f16612f9 Fix StatD timing metric units (#21106)
     new 7b548b0e69 ExternalTaskSensor respects soft_fail if the external task enters a failed_state (#23647)
     new 6ca7f3a317 Fix doc description of [core] parallelism config setting (#23768)
     new 4081a65b77 Remove special serde logic for mapped op_kwargs (#23860)
     new 46d26c76af Drop Python 3.6 compatibility objects/modules (#24048)
     new 4284d03e98 Handle occasional deadlocks in trigger with retries (#24071)
     new 065cd19de4 Unify return_code interface for task runner (#24093)
     new 49fc732db3 scheduleinterval nullable true added in openapi (#24253)
     new d3366fcdde Fix bugs in URI constructor for MySQL connection (#24320)
     new b9fb473eb3 DebugExecutor use ti.run() instead of ti._run_raw_task (#24357)
     new 7a11046ca1 Mask secrets in stdout for 'airflow tasks test' (#24362)
     new 2b9f6d0512 Add missing types to FSHook (#24470)
     new 465834bfbe Fix typo (#24568)
     new 51c58c0c68 Move fallible ti.task.dag assignment back inside try/except block (#24533) (#24592)
     new 691ceab953 Fix timestamp defaults for sensorinstance (#24638)
     new 774f8bc67c Update the release note

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 RELEASE_NOTES.rst                                  |  15 +++
 airflow/api_connexion/openapi/v1.yaml              |   1 +
 airflow/cli/commands/task_command.py               |  10 +-
 airflow/compat/asyncio.py                          |  28 ------
 airflow/config_templates/config.yml                |   7 +-
 airflow/config_templates/default_airflow.cfg       |   7 +-
 airflow/dag_processing/manager.py                  |   6 +-
 airflow/decorators/base.py                         |  10 +-
 airflow/executors/debug_executor.py                |   2 +-
 airflow/hooks/filesystem.py                        |   4 +-
 airflow/jobs/triggerer_job.py                      |   5 +-
 .../versions/0001_1_5_0_current_schema.py          |   2 +-
 .../0065_2_0_0_update_schema_for_smart_sensor.py   |   4 +-
 .../0103_2_3_0_add_callback_request_table.py       |   2 +-
 airflow/models/connection.py                       |  10 +-
 airflow/models/dag.py                              |   6 +-
 airflow/models/mappedoperator.py                   |   1 +
 airflow/models/sensorinstance.py                   |   4 +-
 airflow/models/taskmixin.py                        |   2 +-
 airflow/models/trigger.py                          |   9 +-
 .../providers/elasticsearch/log/es_task_handler.py |   7 +-
 airflow/sensors/external_task.py                   |  29 +++++-
 airflow/sensors/smart_sensor.py                    |   9 +-
 airflow/serialization/serialized_objects.py        |  29 +-----
 airflow/task/task_runner/base_task_runner.py       |   2 +-
 airflow/task/task_runner/cgroup_task_runner.py     |   3 +-
 airflow/typing_compat.py                           |   9 --
 airflow/utils/log/file_task_handler.py             |  10 +-
 airflow/utils/log/secrets_masker.py                |  42 +++++---
 tests/cli/commands/test_connection_command.py      |   6 +-
 tests/cli/commands/test_task_command.py            |  40 +++++---
 tests/dag_processing/test_manager.py               |   6 +-
 .../dags/{test_missing_owner.py => test_sensor.py} |  20 ++--
 tests/executors/test_debug_executor.py             |   4 +-
 tests/hooks/test_dbapi.py                          | 106 +++++++++++++++++++++
 tests/jobs/test_backfill_job.py                    |  58 +++++++----
 tests/secrets/test_local_filesystem.py             |   4 +-
 tests/sensors/test_external_task_sensor.py         |  57 +++++++++--
 tests/sensors/test_smart_sensor_operator.py        |  23 +++++
 tests/serialization/test_dag_serialization.py      |  29 +++---
 tests/triggers/test_temporal.py                    |   5 +-
 tests/utils/log/test_secrets_masker.py             |  23 ++++-
 42 files changed, 443 insertions(+), 213 deletions(-)
 delete mode 100644 airflow/compat/asyncio.py
 copy tests/dags/{test_missing_owner.py => test_sensor.py} (66%)


[airflow] 02/16: ExternalTaskSensor respects soft_fail if the external task enters a failed_state (#23647)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7b548b0e6942c11288ec4cc30682362aad93d7ba
Author: Andrew Gibbs <gi...@andrew.gibbs.io>
AuthorDate: Fri Jun 24 14:50:13 2022 +0100

    ExternalTaskSensor respects soft_fail if the external task enters a failed_state (#23647)
    
    * Respecting soft_fail in ExternalTaskSensor when the upstream tasks are in the failed state (#19754)
    
    - Changed sensor behaviour as described above to respect soft_fail
    - Added tests of new soft_fail behaviour (#19754)
    - Added newsfragment and improved sensor docstring
    
    (cherry picked from commit 1b345981f6e8e910b3542ec53829e39e6c9b6dba)
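
A minimal sketch of the new behaviour in use (the DAG and task ids are
illustrative; the sensor arguments mirror the tests in this commit):

    import datetime

    from airflow import DAG
    from airflow.sensors.external_task import ExternalTaskSensor
    from airflow.utils.state import State

    with DAG(
        dag_id="downstream_dag",  # illustrative DAG id
        start_date=datetime.datetime(2022, 1, 1),
        schedule_interval="@once",
    ) as dag:
        # With soft_fail=True the sensor now raises AirflowSkipException
        # instead of AirflowException when the external task enters one of
        # the failed_states, so its own task instance ends up SKIPPED.
        wait_for_upstream = ExternalTaskSensor(
            task_id="wait_for_upstream",
            external_dag_id="upstream_dag",    # illustrative
            external_task_id="upstream_task",  # illustrative
            failed_states=[State.FAILED],
            soft_fail=True,
        )
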
---
 airflow/sensors/external_task.py           | 29 ++++++++++++++-
 newsfragments/23647.bugfix.rst             |  1 +
 tests/sensors/test_external_task_sensor.py | 57 +++++++++++++++++++++++++-----
 3 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 30c27c7214..bd66c8da29 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection, FrozenSet, Iterable
 
 from sqlalchemy import func
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowSkipException
 from airflow.models.baseoperator import BaseOperatorLink
 from airflow.models.dag import DagModel
 from airflow.models.dagbag import DagBag
@@ -55,6 +55,24 @@ class ExternalTaskSensor(BaseSensorOperator):
     Waits for a different DAG or a task in a different DAG to complete for a
     specific logical date.
 
+    By default the ExternalTaskSensor will wait for the external task to
+    succeed, at which point it will also succeed. However, by default it will
+    *not* fail if the external task fails, but will continue to check the status
+    until the sensor times out (thus giving you time to retry the external task
+    without also having to clear the sensor).
+
+    It is possible to alter the default behavior by setting states which
+    cause the sensor to fail, e.g. by setting ``allowed_states=[State.FAILED]``
+    and ``failed_states=[State.SUCCESS]`` you will flip the behaviour to get a
+    sensor which goes green when the external task *fails* and immediately goes
+    red if the external task *succeeds*!
+
+    Note that ``soft_fail`` is respected when examining the failed_states. Thus
+    if the external task enters a failed state and ``soft_fail == True`` the
+    sensor will _skip_ rather than fail. As a result, setting ``soft_fail=True``
+    and ``failed_states=[State.SKIPPED]`` will result in the sensor skipping if
+    the external task skips.
+
     :param external_dag_id: The dag_id that contains the task you want to
         wait for
     :param external_task_id: The task_id that contains the task you want to
@@ -182,11 +200,20 @@ class ExternalTaskSensor(BaseSensorOperator):
 
         if count_failed == len(dttm_filter):
             if self.external_task_ids:
+                if self.soft_fail:
+                    raise AirflowSkipException(
+                        f'Some of the external tasks {self.external_task_ids} '
+                        f'in DAG {self.external_dag_id} failed. Skipping due to soft_fail.'
+                    )
                 raise AirflowException(
                     f'Some of the external tasks {self.external_task_ids} '
                     f'in DAG {self.external_dag_id} failed.'
                 )
             else:
+                if self.soft_fail:
+                    raise AirflowSkipException(
+                        f'The external DAG {self.external_dag_id} failed. Skipping due to soft_fail.'
+                    )
                 raise AirflowException(f'The external DAG {self.external_dag_id} failed.')
 
         return count_allowed == len(dttm_filter)
diff --git a/newsfragments/23647.bugfix.rst b/newsfragments/23647.bugfix.rst
new file mode 100644
index 0000000000..d12c1d7046
--- /dev/null
+++ b/newsfragments/23647.bugfix.rst
@@ -0,0 +1 @@
+``ExternalTaskSensor`` now supports the ``soft_fail`` flag to skip if external task or DAG enters a failed state.
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 8725d76081..8a2a571160 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -134,6 +134,28 @@ class TestExternalTaskSensor(unittest.TestCase):
                 "unit_test_dag failed."
             )
 
+    def test_external_task_sensor_soft_fail_failed_states_as_skipped(self, session=None):
+        self.test_time_sensor()
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            allowed_states=[State.FAILED],
+            failed_states=[State.SUCCESS],
+            soft_fail=True,
+            dag=self.dag,
+        )
+
+        # when
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # then
+        session = settings.Session()
+        TI = TaskInstance
+        task_instances: list[TI] = session.query(TI).filter(TI.task_id == op.task_id).all()
+        assert len(task_instances) == 1, "Unexpected number of task instances"
+        assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"
+
     def test_external_task_sensor_external_task_id_param(self):
         """Test external_task_ids is set properly when external_task_id is passed as a template"""
         self.test_time_sensor()
@@ -141,10 +163,7 @@ class TestExternalTaskSensor(unittest.TestCase):
             task_id='test_external_task_sensor_check',
             external_dag_id='{{ params.dag_id }}',
             external_task_id='{{ params.task_id }}',
-            params={
-                'dag_id': TEST_DAG_ID,
-                'task_id': TEST_TASK_ID,
-            },
+            params={'dag_id': TEST_DAG_ID, 'task_id': TEST_TASK_ID},
             dag=self.dag,
         )
 
@@ -162,10 +181,7 @@ class TestExternalTaskSensor(unittest.TestCase):
             task_id='test_external_task_sensor_check',
             external_dag_id='{{ params.dag_id }}',
             external_task_ids=['{{ params.task_id }}'],
-            params={
-                'dag_id': TEST_DAG_ID,
-                'task_id': TEST_TASK_ID,
-            },
+            params={'dag_id': TEST_DAG_ID, 'task_id': TEST_TASK_ID},
             dag=self.dag,
         )
 
@@ -214,6 +230,31 @@ class TestExternalTaskSensor(unittest.TestCase):
         )
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_external_dag_sensor_soft_fail_as_skipped(self):
+        other_dag = DAG('other_dag', default_args=self.args, end_date=DEFAULT_DATE, schedule_interval='@once')
+        other_dag.create_dagrun(
+            run_id='test', start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS
+        )
+        op = ExternalTaskSensor(
+            task_id='test_external_dag_sensor_check',
+            external_dag_id='other_dag',
+            external_task_id=None,
+            allowed_states=[State.FAILED],
+            failed_states=[State.SUCCESS],
+            soft_fail=True,
+            dag=self.dag,
+        )
+
+        # when
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # then
+        session = settings.Session()
+        TI = TaskInstance
+        task_instances: list[TI] = session.query(TI).filter(TI.task_id == op.task_id).all()
+        assert len(task_instances) == 1, "Unexpected number of task instances"
+        assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"
+
     def test_external_task_sensor_fn_multiple_execution_dates(self):
         bash_command_code = """
 {% set s=logical_date.time().second %}


[airflow] 03/16: Fix doc description of [core] parallelism config setting (#23768)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6ca7f3a3178e99c7c2742a8d32430b4899a7d3fa
Author: Chris Redekop <32...@users.noreply.github.com>
AuthorDate: Wed May 18 01:08:33 2022 -0600

    Fix doc description of [core] parallelism config setting (#23768)
    
    (cherry picked from commit 23037503b51cd8ea2acd2214116fc1693eb98cb4)
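
A rough sketch of what the clarified wording means in practice (the
scheduler count is illustrative; reading the value uses the standard
configuration API):

    from airflow.configuration import conf

    # [core] parallelism caps running task instances *per scheduler*.
    parallelism = conf.getint("core", "parallelism")  # default: 32

    # Approximate cluster-wide ceiling on task instances in the running
    # state, per the corrected description below.
    scheduler_count = 2  # illustrative
    max_running = parallelism * scheduler_count
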
---
 airflow/config_templates/config.yml          | 7 ++++---
 airflow/config_templates/default_airflow.cfg | 7 ++++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 9da14d48aa..f7409373c2 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -62,9 +62,10 @@
       default: "SequentialExecutor"
     - name: parallelism
       description: |
-        This defines the maximum number of task instances that can run concurrently in Airflow
-        regardless of scheduler count and worker count. Generally, this value is reflective of
-        the number of task instances with the running state in the metadata database.
+        This defines the maximum number of task instances that can run concurrently per scheduler in
+        Airflow, regardless of the worker count. Generally this value, multiplied by the number of
+        schedulers in your cluster, is the maximum number of task instances with the running
+        state in the metadata database.
       version_added: ~
       type: string
       example: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index f58b688a6b..6591f82dc0 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -53,9 +53,10 @@ default_timezone = utc
 # full import path to the class when using a custom executor.
 executor = SequentialExecutor
 
-# This defines the maximum number of task instances that can run concurrently in Airflow
-# regardless of scheduler count and worker count. Generally, this value is reflective of
-# the number of task instances with the running state in the metadata database.
+# This defines the maximum number of task instances that can run concurrently per scheduler in
+# Airflow, regardless of the worker count. Generally this value, multiplied by the number of
+# schedulers in your cluster, is the maximum number of task instances with the running
+# state in the metadata database.
 parallelism = 32
 
 # The maximum number of task instances allowed to run concurrently in each DAG. To calculate


[airflow] 16/16: Update the release note

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 774f8bc67caff858b60d2d567f62aeef9132eb66
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 5 15:44:34 2022 +0100

    Update the release note
---
 RELEASE_NOTES.rst              | 15 +++++++++++++++
 newsfragments/23647.bugfix.rst |  1 -
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst
index ed9ea874af..2598d8264f 100644
--- a/RELEASE_NOTES.rst
+++ b/RELEASE_NOTES.rst
@@ -64,6 +64,19 @@ Here is the list of breaking changes in dependencies that comes together with FA
 Bug Fixes
 ^^^^^^^^^
 
+- Fix timestamp defaults for ``sensorinstance`` (#24638)
+- Move fallible ``ti.task.dag`` assignment back inside ``try/except`` block (#24533) (#24592)
+- Add missing types to ``FSHook`` (#24470)
+- Mask secrets in ``stdout`` for ``airflow tasks test`` (#24362)
+- ``DebugExecutor`` use ``ti.run()`` instead of ``ti._run_raw_task`` (#24357)
+- Fix bugs in ``URI`` constructor for ``MySQL`` connection (#24320)
+- Missing ``scheduleinterval`` nullable true added in ``openapi`` (#24253)
+- Unify ``return_code`` interface for task runner (#24093)
+- Handle occasional deadlocks in trigger with retries (#24071)
+- Remove special serde logic for mapped ``op_kwargs`` (#23860)
+- ``ExternalTaskSensor`` respects ``soft_fail`` if the external task enters a ``failed_state`` (#23647)
+- Fix ``StatD`` timing metric units (#21106)
+- Add ``cache_ok`` flag to sqlalchemy TypeDecorators. (#24499)
 - Allow for ``LOGGING_LEVEL=DEBUG`` (#23360)
 - Fix grid date ticks (#24738)
 - Debounce status highlighting in Grid view (#24710)
@@ -121,6 +134,8 @@ Doc only changes
 Misc/Internal
 ^^^^^^^^^^^^^
 
+- Drop Python 3.6 compatibility objects/modules (#24048)
+- Remove upper-binding for SQLAlchemy (#24819)
 - Remove internet explorer support (#24495)
 - Removing magic status code numbers from ``api_connexion`` (#24050)
 - Upgrade FAB to ``4.1.2`` (#24619)
diff --git a/newsfragments/23647.bugfix.rst b/newsfragments/23647.bugfix.rst
deleted file mode 100644
index d12c1d7046..0000000000
--- a/newsfragments/23647.bugfix.rst
+++ /dev/null
@@ -1 +0,0 @@
-``ExternalTaskSensor`` now supports the ``soft_fail`` flag to skip if external task or DAG enters a failed state.


[airflow] 04/16: Remove special serde logic for mapped op_kwargs (#23860)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4081a65b77c5105d2b3d892d5f25adc3bdb3b450
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Wed Jun 22 15:48:50 2022 +0800

    Remove special serde logic for mapped op_kwargs (#23860)
    
    Co-authored-by: Daniel Standish <15...@users.noreply.github.com>
    (cherry picked from commit 5877f45d65d5aa864941efebd2040661b6f89cb1)
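
A rough illustration of how the serialized form changes, taken from the
updated test expectations in this commit (surrounding operator fields are
omitted; these are plain dict literals, not a full serializer run):

    # Before: op_kwargs under 'mapped_kwargs' were special-cased and flattened.
    before = {
        'mapped_kwargs': {
            'bash_command': [1, 2, {"__type": "dict", "__var": {'a': 'b'}}],
        },
    }

    # After: the whole expansion-kwargs dict goes through the generic
    # encoder, so it keeps the usual __type/__var envelope.
    after = {
        'mapped_kwargs': {
            "__type": "dict",
            "__var": {'bash_command': [1, 2, {"__type": "dict", "__var": {'a': 'b'}}]},
        },
    }
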
---
 airflow/decorators/base.py                    | 10 +--------
 airflow/models/mappedoperator.py              |  1 +
 airflow/serialization/serialized_objects.py   | 29 +++++----------------------
 tests/serialization/test_dag_serialization.py | 29 +++++++++++++++------------
 4 files changed, 23 insertions(+), 46 deletions(-)

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 1b14cd0668..92cf0691e4 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -39,7 +39,7 @@ from typing import (
 import attr
 import typing_extensions
 
-from airflow.compat.functools import cache, cached_property
+from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
 from airflow.models.abstractoperator import DEFAULT_RETRIES, DEFAULT_RETRY_DELAY
 from airflow.models.baseoperator import (
@@ -420,14 +420,6 @@ class DecoratedMappedOperator(MappedOperator):
     def __hash__(self):
         return id(self)
 
-    @classmethod
-    @cache
-    def get_serialized_fields(cls):
-        # The magic super() doesn't work here, so we use the explicit form.
-        # Not using super(..., cls) to work around pyupgrade bug.
-        sup = super(DecoratedMappedOperator, DecoratedMappedOperator)
-        return sup.get_serialized_fields() | {"mapped_op_kwargs"}
-
     def __attrs_post_init__(self):
         # The magic super() doesn't work here, so we use the explicit form.
         # Not using super(..., self) to work around pyupgrade bug.
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 663ceeece1..6b202d2cc6 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -323,6 +323,7 @@ class MappedOperator(AbstractOperator):
             "dag",
             "deps",
             "is_mapped",
+            "mapped_kwargs",  # This is needed to be able to accept XComArg.
             "subdag",
             "task_group",
             "upstream_task_ids",
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 3e674b2f8d..dd3dc4404e 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -16,7 +16,7 @@
 # under the License.
 
 """Serialized DAG and BaseOperator"""
-import contextlib
+
 import datetime
 import enum
 import logging
@@ -592,6 +592,9 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
     def serialize_mapped_operator(cls, op: MappedOperator) -> Dict[str, Any]:
         serialized_op = cls._serialize_node(op, include_deps=op.deps is MappedOperator.deps_for(BaseOperator))
 
+        # Handle mapped_kwargs and mapped_op_kwargs.
+        serialized_op[op._expansion_kwargs_attr] = cls._serialize(op._get_expansion_kwargs())
+
         # Simplify partial_kwargs by comparing it to the most barebone object.
         # Remove all entries that are simply default values.
         serialized_partial = serialized_op["partial_kwargs"]
@@ -603,20 +606,6 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
             if v == default:
                 del serialized_partial[k]
 
-        # Simplify op_kwargs format. It must be a dict, so we flatten it.
-        with contextlib.suppress(KeyError):
-            op_kwargs = serialized_op["mapped_kwargs"]["op_kwargs"]
-            assert op_kwargs[Encoding.TYPE] == DAT.DICT
-            serialized_op["mapped_kwargs"]["op_kwargs"] = op_kwargs[Encoding.VAR]
-        with contextlib.suppress(KeyError):
-            op_kwargs = serialized_op["partial_kwargs"]["op_kwargs"]
-            assert op_kwargs[Encoding.TYPE] == DAT.DICT
-            serialized_op["partial_kwargs"]["op_kwargs"] = op_kwargs[Encoding.VAR]
-        with contextlib.suppress(KeyError):
-            op_kwargs = serialized_op["mapped_op_kwargs"]
-            assert op_kwargs[Encoding.TYPE] == DAT.DICT
-            serialized_op["mapped_op_kwargs"] = op_kwargs[Encoding.VAR]
-
         serialized_op["_is_mapped"] = True
         return serialized_op
 
@@ -752,15 +741,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
                 v = cls._deserialize_deps(v)
             elif k == "params":
                 v = cls._deserialize_params_dict(v)
-            elif k in ("mapped_kwargs", "partial_kwargs"):
-                if "op_kwargs" not in v:
-                    op_kwargs: Optional[dict] = None
-                else:
-                    op_kwargs = {arg: cls._deserialize(value) for arg, value in v.pop("op_kwargs").items()}
-                v = {arg: cls._deserialize(value) for arg, value in v.items()}
-                if op_kwargs is not None:
-                    v["op_kwargs"] = op_kwargs
-            elif k == "mapped_op_kwargs":
+            elif k == "partial_kwargs":
                 v = {arg: cls._deserialize(value) for arg, value in v.items()}
             elif k in cls._decorated_fields or k not in op.get_serialized_fields():
                 v = cls._deserialize(v)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index fe9fc7c7e5..7d6a43e933 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1688,18 +1688,13 @@ def test_mapped_operator_serde():
         '_task_type': 'BashOperator',
         'downstream_task_ids': [],
         'mapped_kwargs': {
-            'bash_command': [
-                1,
-                2,
-                {"__type": "dict", "__var": {'a': 'b'}},
-            ]
+            "__type": "dict",
+            "__var": {'bash_command': [1, 2, {"__type": "dict", "__var": {'a': 'b'}}]},
         },
         'partial_kwargs': {
             'executor_config': {
                 '__type': 'dict',
-                '__var': {
-                    'dict': {"__type": "dict", "__var": {'sub': 'value'}},
-                },
+                '__var': {'dict': {"__type": "dict", "__var": {'sub': 'value'}}},
             },
         },
         'task_id': 'a',
@@ -1744,7 +1739,10 @@ def test_mapped_operator_xcomarg_serde():
         '_task_module': 'tests.test_utils.mock_operators',
         '_task_type': 'MockOperator',
         'downstream_task_ids': [],
-        'mapped_kwargs': {'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'return_value'}}},
+        'mapped_kwargs': {
+            "__type": "dict",
+            "__var": {'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'return_value'}}},
+        },
         'partial_kwargs': {},
         'task_id': 'task_2',
         'template_fields': ['arg1', 'arg2'],
@@ -1825,13 +1823,18 @@ def test_mapped_decorator_serde():
         'downstream_task_ids': [],
         'partial_kwargs': {
             'op_args': [],
-            'op_kwargs': {'arg1': [1, 2, {"__type": "dict", "__var": {'a': 'b'}}]},
+            'op_kwargs': {
+                '__type': 'dict',
+                '__var': {'arg1': [1, 2, {"__type": "dict", "__var": {'a': 'b'}}]},
+            },
             'retry_delay': {'__type': 'timedelta', '__var': 30.0},
         },
-        'mapped_kwargs': {},
         'mapped_op_kwargs': {
-            'arg2': {"__type": "dict", "__var": {'a': 1, 'b': 2}},
-            'arg3': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'return_value'}},
+            "__type": "dict",
+            "__var": {
+                'arg2': {"__type": "dict", "__var": {'a': 1, 'b': 2}},
+                'arg3': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'return_value'}},
+            },
         },
         'operator_extra_links': [],
         'ui_color': '#ffefeb',


[airflow] 01/16: Fix StatD timing metric units (#21106)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 40f16612f9f7a8bbac609397abf236895dba58c2
Author: viktorvia <86...@users.noreply.github.com>
AuthorDate: Wed Jun 1 07:40:36 2022 +0300

    Fix StatD timing metric units (#21106)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    Co-authored-by: Tzu-ping Chung <tp...@astronomer.io>
    (cherry picked from commit 1507ca48d7c211799129ce7956c11f4c45fee5bc)
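
A minimal sketch of the corrected call pattern (the metric name comes from
the test below; the duration value is illustrative):

    from datetime import timedelta

    from airflow.stats import Stats

    # Timing metrics are now emitted as timedelta objects instead of raw
    # float seconds, so the statsd client reports the intended unit.
    Stats.timing("dag_processing.last_duration.temp_dag", timedelta(seconds=1.5))
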
---
 airflow/dag_processing/manager.py           |  6 +++---
 airflow/sensors/smart_sensor.py             |  9 +++++----
 tests/dag_processing/test_manager.py        |  6 ++++--
 tests/sensors/test_smart_sensor_operator.py | 23 +++++++++++++++++++++++
 4 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 0f900c63f0..cbbc2bfdaf 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -74,7 +74,7 @@ class DagFileStat(NamedTuple):
     num_dags: int
     import_errors: int
     last_finish_time: Optional[datetime]
-    last_duration: Optional[float]
+    last_duration: Optional[timedelta]
     run_count: int
 
 
@@ -834,7 +834,7 @@ class DagFileProcessorManager(LoggingMixin):
         :rtype: float
         """
         stat = self._file_stats.get(file_path)
-        return stat.last_duration if stat else None
+        return stat.last_duration.total_seconds() if stat and stat.last_duration else None
 
     def get_last_dag_count(self, file_path):
         """
@@ -927,7 +927,7 @@ class DagFileProcessorManager(LoggingMixin):
             count_import_errors = -1
             num_dags = 0
 
-        last_duration = (last_finish_time - processor.start_time).total_seconds()
+        last_duration = last_finish_time - processor.start_time
         stat = DagFileStat(
             num_dags=num_dags,
             import_errors=count_import_errors,
diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py
index a45eb10d1c..bc22ab9c54 100644
--- a/airflow/sensors/smart_sensor.py
+++ b/airflow/sensors/smart_sensor.py
@@ -738,16 +738,17 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
             for sensor_work in self.sensor_works:
                 self._execute_sensor_work(sensor_work)
 
-            duration = (timezone.utcnow() - poke_start_time).total_seconds()
+            duration = timezone.utcnow() - poke_start_time
+            duration_seconds = duration.total_seconds()
 
-            self.log.info("Taking %s to execute %s tasks.", duration, len(self.sensor_works))
+            self.log.info("Taking %s seconds to execute %s tasks.", duration_seconds, len(self.sensor_works))
 
             Stats.timing("smart_sensor_operator.loop_duration", duration)
             Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works))
             self._emit_loop_stats()
 
-            if duration < self.poke_interval:
-                sleep(self.poke_interval - duration)
+            if duration_seconds < self.poke_interval:
+                sleep(self.poke_interval - duration_seconds)
             if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                 self.log.info("Time is out for smart sensor.")
                 return
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 6a65116d51..ed1b194a7b 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -426,7 +426,7 @@ class TestDagFileProcessorManager:
         # let's say the DAG was just parsed 2 seconds before the Freezed time
         last_finish_time = freezed_base_time - timedelta(seconds=10)
         manager._file_stats = {
-            "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
+            "file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
         }
         with freeze_time(freezed_base_time):
             manager.set_file_paths(dag_files)
@@ -695,7 +695,9 @@ class TestDagFileProcessorManager:
         child_pipe.close()
         parent_pipe.close()
 
-        statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime)
+        statsd_timing_mock.assert_called_with(
+            'dag_processing.last_duration.temp_dag', timedelta(seconds=last_runtime)
+        )
 
     def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
         """Test DagFileProcessorManager._refresh_dag_dir method"""
diff --git a/tests/sensors/test_smart_sensor_operator.py b/tests/sensors/test_smart_sensor_operator.py
index 7c875a06f4..22c03918ca 100644
--- a/tests/sensors/test_smart_sensor_operator.py
+++ b/tests/sensors/test_smart_sensor_operator.py
@@ -20,6 +20,7 @@ import logging
 import os
 import time
 import unittest
+from unittest import mock
 from unittest.mock import Mock
 
 from freezegun import freeze_time
@@ -310,3 +311,25 @@ class SmartSensorTest(unittest.TestCase):
         assert sensor_instance is not None
         assert sensor_instance.state == State.SENSING
         assert sensor_instance.operator == "DummySensor"
+
+    @mock.patch('airflow.sensors.smart_sensor.Stats.timing')
+    @mock.patch('airflow.sensors.smart_sensor.timezone.utcnow')
+    def test_send_sensor_timing(self, timezone_utcnow_mock, statsd_timing_mock):
+        initial_time = timezone.datetime(2022, 1, 5, 0, 0, 0)
+        timezone_utcnow_mock.return_value = initial_time
+        self._make_sensor_dag_run()
+        smart = self._make_smart_operator(0)
+        smart.timeout = 0
+        duration = datetime.timedelta(seconds=3)
+        timezone_utcnow_mock.side_effect = [
+            # started_at
+            initial_time,
+            # poke_start_time
+            initial_time,
+            # duration
+            initial_time + duration,
+            # timeout check
+            initial_time + duration,
+        ]
+        smart.execute(None)
+        statsd_timing_mock.assert_called_with('smart_sensor_operator.loop_duration', duration)


[airflow] 11/16: Mask secrets in stdout for 'airflow tasks test' (#24362)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7a11046ca1efff9292ff9ea67ad03655bf316990
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Mon Jun 13 04:51:07 2022 +0800

    Mask secrets in stdout for 'airflow tasks test' (#24362)
    
    A stdout redirector is implemented to intercept everything written to
    stdout and redact any secrets in it with the secrets masker. This
    redirector is applied to the 'airflow.task' logger.
    
    Co-authored-by: Alex Kennedy <al...@astronomer.io>
    (cherry picked from commit 3007159c2468f8e74476cc17573e03655ab168fa)
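
A minimal sketch of the redirector in use, mirroring the RedactedIO
docstring and tests in this commit (the password value is illustrative, and
the snippet assumes Airflow's default logging config, which installs the
SecretsMasker filter on the 'airflow.task' logger):

    import contextlib

    from airflow.utils.log.secrets_masker import RedactedIO, mask_secret

    mask_secret("somepassword1234!")  # register the secret with the masker

    # Anything printed inside the context manager is passed through redact()
    # before it reaches the real stdout, so the secret shows up as "***".
    with contextlib.redirect_stdout(RedactedIO()):
        print("somepassword1234!")
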
---
 airflow/cli/commands/task_command.py    | 10 +++++----
 airflow/utils/log/secrets_masker.py     | 38 ++++++++++++++++++++++---------
 tests/cli/commands/test_task_command.py | 40 ++++++++++++++++++++++-----------
 tests/utils/log/test_secrets_masker.py  | 23 ++++++++++++++++++-
 4 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 2b743b91fe..a789779342 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -52,6 +52,7 @@ from airflow.utils.cli import (
 )
 from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
+from airflow.utils.log.secrets_masker import RedactedIO
 from airflow.utils.net import get_hostname
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
@@ -539,10 +540,11 @@ def task_test(args, dag=None):
     ti, dr_created = _get_ti(task, args.execution_date_or_run_id, args.map_index, create_if_necessary="db")
 
     try:
-        if args.dry_run:
-            ti.dry_run()
-        else:
-            ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+        with redirect_stdout(RedactedIO()):
+            if args.dry_run:
+                ti.dry_run()
+            else:
+                ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
     except Exception:
         if args.post_mortem:
             debugger = _guess_debugger()
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index de038be48b..bde5141719 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -18,18 +18,17 @@
 import collections
 import logging
 import re
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union
+import sys
+from typing import Any, Dict, Iterable, List, Optional, Set, TextIO, Tuple, TypeVar, Union
 
 from airflow import settings
 from airflow.compat.functools import cache, cached_property
 
-if TYPE_CHECKING:
-    RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]]
-
+Redactable = TypeVar("Redactable", str, Dict[Any, Any], Tuple[Any, ...], List[Any])
+Redacted = Union[Redactable, str]
 
 log = logging.getLogger(__name__)
 
-
 DEFAULT_SENSITIVE_FIELDS = frozenset(
     {
         'access_token',
@@ -91,14 +90,13 @@ def mask_secret(secret: Union[str, dict, Iterable], name: Optional[str] = None)
     _secrets_masker().add_mask(secret, name)
 
 
-def redact(value: "RedactableItem", name: Optional[str] = None) -> "RedactableItem":
+def redact(value: Redactable, name: Optional[str] = None) -> Redacted:
     """Redact any secrets found in ``value``."""
     return _secrets_masker().redact(value, name)
 
 
 @cache
 def _secrets_masker() -> "SecretsMasker":
-
     for flt in logging.getLogger('airflow.task').filters:
         if isinstance(flt, SecretsMasker):
             return flt
@@ -177,7 +175,7 @@ class SecretsMasker(logging.Filter):
 
         return True
 
-    def _redact_all(self, item: "RedactableItem", depth: int) -> "RedactableItem":
+    def _redact_all(self, item: Redactable, depth: int) -> Redacted:
         if depth > self.MAX_RECURSION_DEPTH or isinstance(item, str):
             return '***'
         if isinstance(item, dict):
@@ -190,7 +188,7 @@ class SecretsMasker(logging.Filter):
         else:
             return item
 
-    def _redact(self, item: "RedactableItem", name: Optional[str], depth: int) -> "RedactableItem":
+    def _redact(self, item: Redactable, name: Optional[str], depth: int) -> Redacted:
         # Avoid spending too much effort on redacting on deeply nested
         # structures. This also avoid infinite recursion if a structure has
         # reference to self.
@@ -231,7 +229,7 @@ class SecretsMasker(logging.Filter):
             )
             return item
 
-    def redact(self, item: "RedactableItem", name: Optional[str] = None) -> "RedactableItem":
+    def redact(self, item: Redactable, name: Optional[str] = None) -> Redacted:
         """Redact an any secrets found in ``item``, if it is a string.
 
         If ``name`` is given, and it's a "sensitive" name (see
@@ -258,3 +256,23 @@ class SecretsMasker(logging.Filter):
         elif isinstance(secret, collections.abc.Iterable):
             for v in secret:
                 self.add_mask(v, name)
+
+
+class RedactedIO(TextIO):
+    """IO class that redacts values going into stdout.
+
+    Expected usage::
+
+        with contextlib.redirect_stdout(RedactedIO()):
+            ...  # Writes to stdout will be redacted.
+    """
+
+    def __init__(self):
+        self.target = sys.stdout
+
+    def write(self, s: str) -> int:
+        s = redact(s)
+        return self.target.write(s)
+
+    def flush(self) -> None:
+        return self.target.flush()
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index deaf474200..8df5a17492 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -57,7 +57,7 @@ def reset(dag_id):
 
 
 # TODO: Check if tests needs side effects - locally there's missing DAG
-class TestCliTasks(unittest.TestCase):
+class TestCliTasks:
     run_id = 'TEST_RUN_ID'
     dag_id = 'example_python_operator'
     parser: ArgumentParser
@@ -66,7 +66,7 @@ class TestCliTasks(unittest.TestCase):
     dag_run: DagRun
 
     @classmethod
-    def setUpClass(cls):
+    def setup_class(cls):
         cls.dagbag = DagBag(include_examples=True)
         cls.parser = cli_parser.get_parser()
         clear_db_runs()
@@ -77,7 +77,7 @@ class TestCliTasks(unittest.TestCase):
         )
 
     @classmethod
-    def tearDownClass(cls) -> None:
+    def teardown_class(cls) -> None:
         clear_db_runs()
 
     def test_cli_list_tasks(self):
@@ -102,20 +102,34 @@ class TestCliTasks(unittest.TestCase):
         assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue()
 
     @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
-    def test_test_with_existing_dag_run(self):
+    def test_test_with_existing_dag_run(self, caplog):
         """Test the `airflow test` command"""
         task_id = 'print_the_context'
-
         args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()])
+        with caplog.at_level("INFO", logger="airflow.task"):
+            task_command.task_test(args)
+        assert f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in caplog.text
+
+    @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
+    def test_test_filters_secrets(self, capsys):
+        """Test ``airflow test`` does not print secrets to stdout.
+
+        Output should be filtered by SecretsMasker.
+        """
+        password = "somepassword1234!"
+        logging.getLogger("airflow.task").filters[0].add_mask(password)
+        args = self.parser.parse_args(
+            ["tasks", "test", "example_python_operator", "print_the_context", "2018-01-01"],
+        )
 
-        with self.assertLogs('airflow.task', level='INFO') as cm:
+        with mock.patch("airflow.models.TaskInstance.run", new=lambda *_, **__: print(password)):
             task_command.task_test(args)
-            assert any(
-                [
-                    f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in log
-                    for log in cm.output
-                ]
-            )
+        assert capsys.readouterr().out.endswith("***\n")
+
+        not_password = "!4321drowssapemos"
+        with mock.patch("airflow.models.TaskInstance.run", new=lambda *_, **__: print(not_password)):
+            task_command.task_test(args)
+        assert capsys.readouterr().out.endswith(f"{not_password}\n")
 
     @mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
     def test_run_with_existing_dag_run_id(self, mock_local_job):
@@ -164,7 +178,7 @@ class TestCliTasks(unittest.TestCase):
             task0_id,
             run_id,
         ]
-        with self.assertRaises(DagRunNotFound):
+        with pytest.raises(DagRunNotFound):
             task_command.task_run(self.parser.parse_args(args0), dag=dag)
 
     def test_cli_test_with_params(self):
diff --git a/tests/utils/log/test_secrets_masker.py b/tests/utils/log/test_secrets_masker.py
index edac9f49b7..ee0bc2eb98 100644
--- a/tests/utils/log/test_secrets_masker.py
+++ b/tests/utils/log/test_secrets_masker.py
@@ -14,6 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import contextlib
 import inspect
 import logging
 import logging.config
@@ -23,7 +24,7 @@ import textwrap
 import pytest
 
 from airflow import settings
-from airflow.utils.log.secrets_masker import SecretsMasker, should_hide_value_for_key
+from airflow.utils.log.secrets_masker import RedactedIO, SecretsMasker, should_hide_value_for_key
 from tests.test_utils.config import conf_vars
 
 settings.MASK_SECRETS_IN_LOGS = True
@@ -341,3 +342,23 @@ class ShortExcFormatter(logging.Formatter):
 def lineno():
     """Returns the current line number in our program."""
     return inspect.currentframe().f_back.f_lineno
+
+
+class TestRedactedIO:
+    def test_redacts_from_print(self, capsys):
+        # Without redacting, password is printed.
+        print(p)
+        stdout = capsys.readouterr().out
+        assert stdout == f"{p}\n"
+        assert "***" not in stdout
+
+        # With context manager, password is redacted.
+        with contextlib.redirect_stdout(RedactedIO()):
+            print(p)
+        stdout = capsys.readouterr().out
+        assert stdout == "***\n"
+
+    def test_write(self, capsys):
+        RedactedIO().write(p)
+        stdout = capsys.readouterr().out
+        assert stdout == "***"


[airflow] 10/16: DebugExecutor use ti.run() instead of ti._run_raw_task (#24357)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b9fb473eb3eb1dffe76e07dd9e96ca72688cb1e5
Author: Niko <on...@amazon.com>
AuthorDate: Mon Jun 13 00:02:56 2022 -0700

    DebugExecutor use ti.run() instead of ti._run_raw_task (#24357)
    
    The DebugExecutor previously executed tasks by calling the "private"
    ti._run_raw_task(...) method instead of ti.run(...). But the latter
    contains the logic to increase task instance try_numbers when running,
    thus tasks executed with the DebugExecutor were never getting their
    try_numbers increased and for rescheduled tasks this led to off-by-one
    errors (as the logic to reduce the try_number for the reschedule was
    still working while the increase was not).
    
    (cherry picked from commit da7b22be2986fc3217ac4d7fa6c3831d87ccff87)
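
Roughly, the fix swaps the private call for the public API; a simplified
sketch of the changed method (error handling trimmed, written as a
standalone function for illustration):

    from airflow.utils.state import State

    def _run_task_sketch(executor, ti):
        """Simplified sketch of DebugExecutor._run_task after this change."""
        key = ti.key
        params = executor.tasks_params.pop(ti.key, {})
        # ti.run() (unlike the private ti._run_raw_task()) goes through the
        # logic that increments the task instance's try_number, keeping
        # rescheduled tasks from ending up off by one.
        ti.run(job_id=ti.job_id, **params)
        executor.change_state(key, State.SUCCESS)
        ti._run_finished_callback()
        return True
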
---
 airflow/executors/debug_executor.py    |  2 +-
 tests/dags/test_sensor.py              | 33 +++++++++++++++++++
 tests/executors/test_debug_executor.py |  4 +--
 tests/jobs/test_backfill_job.py        | 58 ++++++++++++++++++++++------------
 4 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py
index 865186dd18..6c8a13b345 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -76,7 +76,7 @@ class DebugExecutor(BaseExecutor):
         key = ti.key
         try:
             params = self.tasks_params.pop(ti.key, {})
-            ti._run_raw_task(job_id=ti.job_id, **params)
+            ti.run(job_id=ti.job_id, **params)
             self.change_state(key, State.SUCCESS)
             ti._run_finished_callback()
             return True
diff --git a/tests/dags/test_sensor.py b/tests/dags/test_sensor.py
new file mode 100644
index 0000000000..760300ef00
--- /dev/null
+++ b/tests/dags/test_sensor.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.sensors.date_time import DateTimeSensor
+from airflow.utils import timezone
+
+with DAG(
+    dag_id='test_sensor', start_date=datetime.datetime(2022, 1, 1), catchup=False, schedule_interval='@once'
+) as dag:
+
+    @task
+    def get_date():
+        return str(timezone.utcnow() + datetime.timedelta(seconds=3))
+
+    DateTimeSensor(task_id='dts', target_time=str(get_date()), poke_interval=1, mode='reschedule')
diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py
index dbce6aca40..371fbc5213 100644
--- a/tests/executors/test_debug_executor.py
+++ b/tests/executors/test_debug_executor.py
@@ -49,7 +49,7 @@ class TestDebugExecutor:
         succeeded = executor._run_task(task_instance_mock)
 
         assert succeeded
-        task_instance_mock._run_raw_task.assert_called_once_with(job_id=job_id)
+        task_instance_mock.run.assert_called_once_with(job_id=job_id)
 
     def test_queue_task_instance(self):
         key = "ti_key"
@@ -100,7 +100,7 @@ class TestDebugExecutor:
         ti1 = MagicMock(key="t1")
         ti2 = MagicMock(key="t2")
 
-        ti1._run_raw_task.side_effect = Exception
+        ti1.run.side_effect = Exception
 
         executor.tasks_to_run = [ti1, ti2]
 
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index e5ab189f35..5222f9cdbe 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1309,10 +1309,18 @@ class TestBackfillJob:
 
         ti_status = BackfillJob._DagRunTaskStatus()
 
-        # test for success
-        ti.set_state(State.SUCCESS, session)
-        ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status, session=session)
+        # Test for success
+        # The in-memory task key in ti_status.running contains a try_number
+        # that is always one behind the DB. The _update_counters method however uses
+        # a reduced_key to handle this. To test this, we mark the task as running in-memory
+        # and then increase the try number as it would be before the raw task is executed.
+        # When updating the counters the reduced_key will be used which will match what's
+        # in the in-memory ti_status.running map. This is the same for skipped, failed
+        # and retry states.
+        ti_status.running[ti.key] = ti  # Task is queued and marked as running
+        ti._try_number += 1  # Try number is increased during ti.run()
+        ti.set_state(State.SUCCESS, session)  # Task finishes with success state
+        job._update_counters(ti_status=ti_status, session=session)  # Update counters
         assert len(ti_status.running) == 0
         assert len(ti_status.succeeded) == 1
         assert len(ti_status.skipped) == 0
@@ -1321,9 +1329,10 @@ class TestBackfillJob:
 
         ti_status.succeeded.clear()
 
-        # test for skipped
-        ti.set_state(State.SKIPPED, session)
+        # Test for skipped
         ti_status.running[ti.key] = ti
+        ti._try_number += 1
+        ti.set_state(State.SKIPPED, session)
         job._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
         assert len(ti_status.succeeded) == 0
@@ -1333,9 +1342,10 @@ class TestBackfillJob:
 
         ti_status.skipped.clear()
 
-        # test for failed
-        ti.set_state(State.FAILED, session)
+        # Test for failed
         ti_status.running[ti.key] = ti
+        ti._try_number += 1
+        ti.set_state(State.FAILED, session)
         job._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
         assert len(ti_status.succeeded) == 0
@@ -1345,9 +1355,10 @@ class TestBackfillJob:
 
         ti_status.failed.clear()
 
-        # test for retry
-        ti.set_state(State.UP_FOR_RETRY, session)
+        # Test for retry
         ti_status.running[ti.key] = ti
+        ti._try_number += 1
+        ti.set_state(State.UP_FOR_RETRY, session)
         job._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
         assert len(ti_status.succeeded) == 0
@@ -1357,13 +1368,18 @@ class TestBackfillJob:
 
         ti_status.to_run.clear()
 
-        # test for reschedule
-        # For rescheduled state, tests that reduced_key is not
-        # used by upping try_number.
-        ti._try_number = 2
-        ti.set_state(State.UP_FOR_RESCHEDULE, session)
-        assert ti.try_number == 3  # see ti.try_number property in taskinstance module
-        ti_status.running[ti.key] = ti
+        # Test for reschedule
+        # Logic in taskinstance reduces the try number for a task that's been
+        # rescheduled (which makes sense because it's the _same_ try, but it's
+        # just being rescheduled to a later time). This now makes the in-memory
+        # and DB representation of the task try_number the _same_, which is unlike
+        # the above cases. But this is okay because the reduced_key is NOT used for
+        # the rescheduled case in _update_counters, for this exact reason.
+        ti_status.running[ti.key] = ti  # Task queued and marked as running
+        # Note: Both the increase and decrease are kept here for context
+        ti._try_number += 1  # Try number is increased during ti.run()
+        ti._try_number -= 1  # Task is being rescheduled, decrement try_number
+        ti.set_state(State.UP_FOR_RESCHEDULE, session)  # Task finishes with reschedule state
         job._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
         assert len(ti_status.succeeded) == 0
@@ -1580,10 +1596,10 @@ class TestBackfillJob:
 
     @pytest.mark.long_running
     @pytest.mark.parametrize("executor_name", ["SequentialExecutor", "DebugExecutor"])
-    @pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow"])
-    def test_mapped_dag(self, dag_id, executor_name, session):
+    @pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow", "test_sensor"])
+    def test_backfilling_dags(self, dag_id, executor_name, session):
         """
-        End-to-end test of a simple mapped dag.
+        End-to-end test for backfilling dags with various executors.
 
         We test with multiple executors as they have different "execution environments" -- for instance
         DebugExecutor runs a lot more in the same process than other Executors.
@@ -1595,7 +1611,7 @@ class TestBackfillJob:
         self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
         dag = self.dagbag.get_dag(dag_id)
 
-        when = datetime.datetime(2022, 1, 1)
+        when = timezone.datetime(2022, 1, 1)
 
         job = BackfillJob(
             dag=dag,


[airflow] 05/16: Drop Python 3.6 compatibility objects/modules (#24048)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 46d26c76af43f0e5acfe299176c906414f69bcbd
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Wed Jun 1 08:15:56 2022 +0300

    Drop Python 3.6 compatibility objects/modules (#24048)
    
    (cherry picked from commit 1dccaad46b901189c1928cef8419f1ea1160d550)
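
The dropped shims reduce to the standard-library equivalents shown in the
diff below; a minimal runnable sketch of the replacements (the pattern and
coroutine are illustrative):

    import asyncio
    import re
    from typing import Optional

    # re.Pattern replaces airflow.typing_compat.RePatternType (3.7+ only).
    replacer: Optional[re.Pattern] = re.compile(r"password")

    async def main():
        # asyncio.create_task replaces airflow.compat.asyncio.create_task.
        task = asyncio.create_task(asyncio.sleep(0))
        await task

    asyncio.run(main())
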
---
 airflow/compat/asyncio.py           | 28 ----------------------------
 airflow/jobs/triggerer_job.py       |  5 ++---
 airflow/models/dag.py               |  6 +++---
 airflow/typing_compat.py            |  9 ---------
 airflow/utils/log/secrets_masker.py |  4 +---
 tests/triggers/test_temporal.py     |  5 ++---
 6 files changed, 8 insertions(+), 49 deletions(-)

diff --git a/airflow/compat/asyncio.py b/airflow/compat/asyncio.py
deleted file mode 100644
index a999ce604d..0000000000
--- a/airflow/compat/asyncio.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-try:
-    from asyncio import create_task
-except ImportError:
-    # create_task is not present in Python 3.6. Once Airflow is at 3.7+, we can
-    # remove this helper.
-    def create_task(*args, **kwargs):  # type: ignore
-        """A version of create_task that always errors."""
-        raise RuntimeError("Airflow's async functionality is only available on Python 3.7+")
-
-
-__all__ = ["create_task"]
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index 2c0a0bd5bf..ac7d22a6b1 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -26,7 +26,6 @@ from typing import Deque, Dict, Set, Tuple, Type
 
 from sqlalchemy import func
 
-from airflow.compat.asyncio import create_task
 from airflow.configuration import conf
 from airflow.jobs.base_job import BaseJob
 from airflow.models.trigger import Trigger
@@ -236,7 +235,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         The loop in here runs trigger addition/deletion/cleanup. Actual
         triggers run in their own separate coroutines.
         """
-        watchdog = create_task(self.block_watchdog())
+        watchdog = asyncio.create_task(self.block_watchdog())
         last_status = time.time()
         while not self.stop:
             # Run core logic
@@ -263,7 +262,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             trigger_id, trigger_instance = self.to_create.popleft()
             if trigger_id not in self.triggers:
                 self.triggers[trigger_id] = {
-                    "task": create_task(self.run_trigger(trigger_id, trigger_instance)),
+                    "task": asyncio.create_task(self.run_trigger(trigger_id, trigger_instance)),
                     "name": f"{trigger_instance!r} (ID {trigger_id})",
                     "events": 0,
                 }
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 823287dcb1..4370f36c3a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -78,7 +78,7 @@ from airflow.stats import Stats
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
 from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
 from airflow.timetables.simple import NullTimetable, OnceTimetable
-from airflow.typing_compat import Literal, RePatternType
+from airflow.typing_compat import Literal
 from airflow.utils import timezone
 from airflow.utils.dag_cycle_tester import check_cycle
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
@@ -1998,7 +1998,7 @@ class DAG(LoggingMixin):
 
     def partial_subset(
         self,
-        task_ids_or_regex: Union[str, RePatternType, Iterable[str]],
+        task_ids_or_regex: Union[str, re.Pattern, Iterable[str]],
         include_downstream=False,
         include_upstream=True,
         include_direct_upstream=False,
@@ -2026,7 +2026,7 @@ class DAG(LoggingMixin):
         memo = {id(self.task_dict): None, id(self._task_group): None}
         dag = copy.deepcopy(self, memo)  # type: ignore
 
-        if isinstance(task_ids_or_regex, (str, RePatternType)):
+        if isinstance(task_ids_or_regex, (str, re.Pattern)):
             matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)]
         else:
             matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]
diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py
index 9f1185f76c..163889b8a2 100644
--- a/airflow/typing_compat.py
+++ b/airflow/typing_compat.py
@@ -28,12 +28,3 @@ try:
     from typing import Literal, Protocol, TypedDict, runtime_checkable  # type: ignore
 except ImportError:
     from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable  # type: ignore # noqa
-
-
-# Before Py 3.7, there is no re.Pattern class
-try:
-    from re import Pattern as RePatternType  # type: ignore
-except ImportError:
-    import re
-
-    RePatternType = type(re.compile('', 0))  # type: ignore
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index c4c1c390b2..de038be48b 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -24,8 +24,6 @@ from airflow import settings
 from airflow.compat.functools import cache, cached_property
 
 if TYPE_CHECKING:
-    from airflow.typing_compat import RePatternType
-
     RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]]
 
 
@@ -115,7 +113,7 @@ def _secrets_masker() -> "SecretsMasker":
 class SecretsMasker(logging.Filter):
     """Redact secrets from logs"""
 
-    replacer: Optional["RePatternType"] = None
+    replacer: Optional[re.Pattern] = None
     patterns: Set[str]
 
     ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py
index 8f2f222d71..1f22694fae 100644
--- a/tests/triggers/test_temporal.py
+++ b/tests/triggers/test_temporal.py
@@ -21,7 +21,6 @@ import datetime
 import pendulum
 import pytest
 
-from airflow.compat.asyncio import create_task
 from airflow.triggers.base import TriggerEvent
 from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.utils import timezone
@@ -72,7 +71,7 @@ async def test_datetime_trigger_timing():
 
     # Create a task that runs the trigger for a short time then cancels it
     trigger = DateTimeTrigger(future_moment)
-    trigger_task = create_task(trigger.run().__anext__())
+    trigger_task = asyncio.create_task(trigger.run().__anext__())
     await asyncio.sleep(0.5)
 
     # It should not have produced a result
@@ -81,7 +80,7 @@ async def test_datetime_trigger_timing():
 
     # Now, make one waiting for an event in the past and do it again
     trigger = DateTimeTrigger(past_moment)
-    trigger_task = create_task(trigger.run().__anext__())
+    trigger_task = asyncio.create_task(trigger.run().__anext__())
     await asyncio.sleep(0.5)
 
     assert trigger_task.done() is True
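
With Python 3.6 support gone, the `airflow.compat.asyncio.create_task` shim is deleted and call sites use `asyncio.create_task` directly, which has been in the standard library since Python 3.7. A minimal standalone sketch of that stdlib call (not Airflow code):

```python
import asyncio


async def fetch_value() -> int:
    await asyncio.sleep(0.1)
    return 42


async def main() -> None:
    # asyncio.create_task schedules the coroutine on the running event loop
    # and returns a Task that can be awaited or cancelled later.
    task = asyncio.create_task(fetch_value())
    print(await task)  # 42


asyncio.run(main())
```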


[airflow] 08/16: scheduleinterval nullable true added in openapi (#24253)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 49fc732db365f9aee7f23717e4600fbf36d08453
Author: Bowrna <ma...@gmail.com>
AuthorDate: Tue Jun 7 16:51:50 2022 +0530

    scheduleinterval nullable true added in openapi (#24253)
    
    (cherry picked from commit 7e56bf662915cd58849626d7a029a4ba70cdda4d)
---
 airflow/api_connexion/openapi/v1.yaml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index d19c11aeba..553be4fa23 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3564,6 +3564,7 @@ components:
       description: |
         Schedule interval. Defines how often DAG runs, this object gets added to your latest task instance's
         execution_date to figure out the next schedule.
+      nullable: true
       readOnly: true
       anyOf:
         - $ref: '#/components/schemas/TimeDelta'


[airflow] 09/16: Fix bugs in URI constructor for MySQL connection (#24320)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d3366fcdde01b7702d534ccaca44035d39a53518
Author: Maksim <ma...@gmail.com>
AuthorDate: Mon Jun 20 00:25:59 2022 +0300

    Fix bugs in URI constructor for MySQL connection (#24320)
    
    * Fix bugs in URI constructor for MySQL connection
    
    * Update unit tests
    
    (cherry picked from commit ea54faf290cc15a4ace50c23fe9ab2fa9593059f)
---
 airflow/models/connection.py                  |  10 +--
 tests/cli/commands/test_connection_command.py |   6 +-
 tests/hooks/test_dbapi.py                     | 106 ++++++++++++++++++++++++++
 tests/secrets/test_local_filesystem.py        |   4 +-
 4 files changed, 116 insertions(+), 10 deletions(-)

diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 8134f372ca..4935023292 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -231,10 +231,10 @@ class Connection(Base, LoggingMixin):
             host_block += quote(self.host, safe='')
 
         if self.port:
-            if host_block > '':
-                host_block += f':{self.port}'
-            else:
+            if host_block == '' and authority_block == '':
                 host_block += f'@:{self.port}'
+            else:
+                host_block += f':{self.port}'
 
         if self.schema:
             host_block += f"/{quote(self.schema, safe='')}"
@@ -247,9 +247,9 @@ class Connection(Base, LoggingMixin):
             except TypeError:
                 query = None
             if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
-                uri += '?' + query
+                uri += ('?' if self.schema else '/?') + query
             else:
-                uri += '?' + urlencode({self.EXTRA_KEY: self.extra})
+                uri += ('?' if self.schema else '/?') + urlencode({self.EXTRA_KEY: self.extra})
 
         return uri
 
diff --git a/tests/cli/commands/test_connection_command.py b/tests/cli/commands/test_connection_command.py
index 621be5916a..eb2d575d02 100644
--- a/tests/cli/commands/test_connection_command.py
+++ b/tests/cli/commands/test_connection_command.py
@@ -242,14 +242,14 @@ class TestCliExportConnections:
                 'uri',
                 [
                     "airflow_db=mysql://root:plainpassword@mysql/airflow",
-                    "druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql",
+                    "druid_broker_default=druid://druid-broker:8082/?endpoint=druid%2Fv2%2Fsql",
                 ],
             ),
             (
                 None,  # tests that default is URI
                 [
                     "airflow_db=mysql://root:plainpassword@mysql/airflow",
-                    "druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql",
+                    "druid_broker_default=druid://druid-broker:8082/?endpoint=druid%2Fv2%2Fsql",
                 ],
             ),
             (
@@ -287,7 +287,7 @@ class TestCliExportConnections:
         connection_command.connections_export(args)
         expected_connections = [
             "airflow_db=mysql://root:plainpassword@mysql/airflow",
-            "druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql",
+            "druid_broker_default=druid://druid-broker:8082/?endpoint=druid%2Fv2%2Fsql",
         ]
 
         assert output_filepath.read_text().splitlines() == expected_connections
diff --git a/tests/hooks/test_dbapi.py b/tests/hooks/test_dbapi.py
index fd2bbd9132..a17c24aedb 100644
--- a/tests/hooks/test_dbapi.py
+++ b/tests/hooks/test_dbapi.py
@@ -17,6 +17,7 @@
 # under the License.
 #
 
+import json
 import unittest
 from unittest import mock
 
@@ -235,6 +236,111 @@ class TestDbApiHook(unittest.TestCase):
         )
         assert "conn-type://host:1/schema" == self.db_hook.get_uri()
 
+    def test_get_uri_extra(self):
+        self.db_hook.get_connection = mock.MagicMock(
+            return_value=Connection(
+                conn_type="conn-type",
+                host="host",
+                login='login',
+                password='password',
+                extra=json.dumps({'charset': 'utf-8'}),
+            )
+        )
+        assert self.db_hook.get_uri() == "conn-type://login:password@host/?charset=utf-8"
+
+    def test_get_uri_extra_with_schema(self):
+        self.db_hook.get_connection = mock.MagicMock(
+            return_value=Connection(
+                conn_type="conn-type",
+                host="host",
+                login='login',
+                password='password',
+                schema="schema",
+                extra=json.dumps({'charset': 'utf-8'}),
+            )
+        )
+        assert self.db_hook.get_uri() == "conn-type://login:password@host/schema?charset=utf-8"
+
+    def test_get_uri_extra_with_port(self):
+        self.db_hook.get_connection = mock.MagicMock(
+            return_value=Connection(
+                conn_type="conn-type",
+                host="host",
+                login='login',
+                password='password',
+                port=3306,
+                extra=json.dumps({'charset': 'utf-8'}),
+            )
+        )
+        assert self.db_hook.get_uri() == "conn-type://login:password@host:3306/?charset=utf-8"
+
+    def test_get_uri_extra_with_port_and_empty_host(self):
+        self.db_hook.get_connection = mock.MagicMock(
+            return_value=Connection(
+                conn_type="conn-type",
+                login='login',
+                password='password',
+                port=3306,
+                extra=json.dumps({'charset': 'utf-8'}),
+            )
+        )
+        assert self.db_hook.get_uri() == "conn-type://login:password@:3306/?charset=utf-8"
+
+    def test_get_uri_extra_with_port_and_schema(self):
+        self.db_hook.get_connection = mock.MagicMock(
+            return_value=Connection(
+                conn_type="conn-type",
+                host="host",
+                login='login',
+                password='password',
+                schema="schema",
+                port=3306,
+                extra=json.dumps({'charset': 'utf-8'}),
+            )
+        )
+        assert self.db_hook.get_uri() == "conn-type://login:password@host:3306/schema?charset=utf-8"
+
+    def test_get_uri_without_password(self):
+        self.db_hook.get_connection = mock.MagicMock(
+            return_value=Connection(
+                conn_type="conn-type",
+                host="host",
+                login='login',
+                password=None,
+                schema="schema",
+                port=3306,
+                extra=json.dumps({'charset': 'utf-8'}),
+            )
+        )
+        assert self.db_hook.get_uri() == "conn-type://login@host:3306/schema?charset=utf-8"
+
+    def test_get_uri_without_auth(self):
+        self.db_hook.get_connection = mock.MagicMock(
+            return_value=Connection(
+                conn_type="conn-type",
+                host="host",
+                login=None,
+                password=None,
+                schema="schema",
+                port=3306,
+                extra=json.dumps({'charset': 'utf-8'}),
+            )
+        )
+        assert self.db_hook.get_uri() == "conn-type://host:3306/schema?charset=utf-8"
+
+    def test_get_uri_without_auth_and_empty_host(self):
+        self.db_hook.get_connection = mock.MagicMock(
+            return_value=Connection(
+                conn_type="conn-type",
+                login=None,
+                password=None,
+                schema="schema",
+                port=3306,
+                extra=json.dumps({'charset': 'utf-8'}),
+            )
+        )
+        assert self.db_hook.get_uri() == "conn-type://@:3306/schema?charset=utf-8"
+
     def test_run_log(self):
         statement = 'SQL'
         self.db_hook.run(statement)
diff --git a/tests/secrets/test_local_filesystem.py b/tests/secrets/test_local_filesystem.py
index a7bbd824f0..82ef7d469f 100644
--- a/tests/secrets/test_local_filesystem.py
+++ b/tests/secrets/test_local_filesystem.py
@@ -157,8 +157,8 @@ class TestLoadConnection(unittest.TestCase):
     @parameterized.expand(
         (
             (
-                "CONN_ID=mysql://host_1?param1=val1&param2=val2",
-                {"CONN_ID": "mysql://host_1?param1=val1&param2=val2"},
+                "CONN_ID=mysql://host_1/?param1=val1&param2=val2",
+                {"CONN_ID": "mysql://host_1/?param1=val1&param2=val2"},
             ),
         )
     )
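
The fix changes two things in `Connection.get_uri`: a bare port is only prefixed with `@` when both host and credentials are empty, and the extras query string is preceded by `/` whenever no schema is set. A hedged sketch mirroring the new unit tests above; the printed URI assumes the post-fix behaviour:

```python
import json

from airflow.models.connection import Connection

conn = Connection(
    conn_type="mysql",
    host="host",
    login="login",
    password="password",
    port=3306,
    extra=json.dumps({"charset": "utf-8"}),
)

# Without a schema, the query string is now separated from the authority
# by "/?" rather than being appended directly as "?charset=utf-8".
print(conn.get_uri())  # mysql://login:password@host:3306/?charset=utf-8
```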


[airflow] 07/16: Unify return_code interface for task runner (#24093)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 065cd19de4f834ceca05b51d121399a3b80719ac
Author: Ping Zhang <pi...@umich.edu>
AuthorDate: Mon Jun 6 02:47:43 2022 -0700

    Unify return_code interface for task runner (#24093)
    
    (cherry picked from commit 603c555e1f0f607edb3f171ca5d206f60056c656)
---
 airflow/task/task_runner/base_task_runner.py   | 2 +-
 airflow/task/task_runner/cgroup_task_runner.py | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py
index 47be386b74..2ab649db87 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -174,7 +174,7 @@ class BaseTaskRunner(LoggingMixin):
         """Start running the task instance in a subprocess."""
         raise NotImplementedError()
 
-    def return_code(self) -> Optional[int]:
+    def return_code(self, timeout: int = 0) -> Optional[int]:
         """
         :return: The return code associated with running the task instance or
             None if the task is not yet done.
diff --git a/airflow/task/task_runner/cgroup_task_runner.py b/airflow/task/task_runner/cgroup_task_runner.py
index d6c6e53abf..a5604a6d4e 100644
--- a/airflow/task/task_runner/cgroup_task_runner.py
+++ b/airflow/task/task_runner/cgroup_task_runner.py
@@ -21,6 +21,7 @@
 import datetime
 import os
 import uuid
+from typing import Optional
 
 import psutil
 from cgroupspy import trees
@@ -163,7 +164,7 @@ class CgroupTaskRunner(BaseTaskRunner):
         self.log.debug("Starting task process with cgroups cpu,memory: %s", cgroup_name)
         self.process = self.run_command(['cgexec', '-g', f'cpu,memory:{cgroup_name}'])
 
-    def return_code(self):
+    def return_code(self, timeout: int = 0) -> Optional[int]:
         return_code = self.process.poll()
         # TODO(plypaul) Monitoring the control file in the cgroup fs is better than
         # checking the return code here. The PR to use this is here:
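
`return_code` now accepts an optional `timeout` argument on both the base class and the cgroup runner, so every runner exposes the same call signature. A hypothetical custom runner, only to illustrate the unified interface; the class and its body are illustrative, not Airflow code:

```python
from typing import Optional

from airflow.task.task_runner.base_task_runner import BaseTaskRunner


class MyTaskRunner(BaseTaskRunner):
    """Hypothetical runner showing the unified return_code signature."""

    def start(self) -> None:
        self.process = self.run_command()

    def return_code(self, timeout: int = 0) -> Optional[int]:
        # Matches BaseTaskRunner.return_code after this change: the exit
        # code of the subprocess, or None while the task is still running.
        return self.process.poll()

    def terminate(self) -> None:
        if self.process and self.process.poll() is None:
            self.process.terminate()
```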


[airflow] 14/16: Move fallible ti.task.dag assignment back inside try/except block (#24533) (#24592)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 51c58c0c681765c76a8a8e5920300f0a91712a9e
Author: EJ Kreinar <ej...@gmail.com>
AuthorDate: Thu Jun 30 12:40:25 2022 -0400

    Move fallible ti.task.dag assignment back inside try/except block (#24533) (#24592)
    
    * Move fallible ti.task.dag assignment back inside try/except block
    
    It looks like ti.task.dag was originally protected inside try/except,
    but was moved out at commit 7be87d
    
    * Remove unneeded variable annotation
    
    Co-authored-by: EJ Kreinar <ej...@he360.com>
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 97948ecae7fcbb7dfdfb169cfe653bd20a108def)
---
 airflow/providers/elasticsearch/log/es_task_handler.py |  7 ++++---
 airflow/utils/log/file_task_handler.py                 | 10 +++++-----
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 64fce0df53..4707f523d6 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -125,12 +125,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
             else:
                 log_id_template = self.log_id_template
 
-        dag = ti.task.dag
-        assert dag is not None  # For Mypy.
         try:
-            data_interval: Tuple[datetime, datetime] = dag.get_run_data_interval(dag_run)
+            dag = ti.task.dag
         except AttributeError:  # ti.task is not always set.
             data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
+        else:
+            assert dag is not None  # For Mypy.
+            data_interval = dag.get_run_data_interval(dag_run)
 
         if self.json_format:
             data_interval_start = self._clean_date(data_interval[0])
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 2c53529a72..471d5b95be 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -19,9 +19,8 @@
 import logging
 import os
 import warnings
-from datetime import datetime
 from pathlib import Path
-from typing import TYPE_CHECKING, Optional, Tuple
+from typing import TYPE_CHECKING, Optional
 
 from airflow.configuration import AirflowConfigException, conf
 from airflow.utils.context import Context
@@ -93,12 +92,13 @@ class FileTaskHandler(logging.Handler):
             context["try_number"] = try_number
             return render_template_to_string(jinja_tpl, context)
         elif str_tpl:
-            dag = ti.task.dag
-            assert dag is not None  # For Mypy.
             try:
-                data_interval: Tuple[datetime, datetime] = dag.get_run_data_interval(dag_run)
+                dag = ti.task.dag
             except AttributeError:  # ti.task is not always set.
                 data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
+            else:
+                assert dag is not None  # For Mypy.
+                data_interval = dag.get_run_data_interval(dag_run)
             if data_interval[0]:
                 data_interval_start = data_interval[0].isoformat()
             else:
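
The point of the move is that only the `ti.task.dag` lookup can raise `AttributeError`; by guarding just that access in `try` and running the follow-up call in `else`, a failure inside `get_run_data_interval` is no longer masked by the fallback. Restated as a minimal standalone function (names are illustrative):

```python
def resolve_data_interval(ti, dag_run):
    """Illustrative only: guard just the attribute lookup, not the follow-up call."""
    try:
        dag = ti.task.dag  # may raise AttributeError when ti.task is not set
    except AttributeError:
        return (dag_run.data_interval_start, dag_run.data_interval_end)
    else:
        assert dag is not None  # for Mypy
        return dag.get_run_data_interval(dag_run)
```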


[airflow] 13/16: Fix typo (#24568)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 465834bfbe59f122811c13d90399bb6d5a9f5902
Author: Jon Crall <er...@gmail.com>
AuthorDate: Tue Jun 21 09:13:33 2022 -0400

    Fix typo (#24568)
    
    (cherry picked from commit d6cd528cab4caab2b38866db3b174ff366d9c66d)
---
 airflow/models/taskmixin.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py
index 1d66a719a6..06494946a8 100644
--- a/airflow/models/taskmixin.py
+++ b/airflow/models/taskmixin.py
@@ -237,7 +237,7 @@ class DAGNode(DependencyMixin, metaclass=ABCMeta):
         task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
         edge_modifier: Optional["EdgeModifier"] = None,
     ) -> None:
-        """Set a node (or nodes) to be directly downstream from the current node."""
+        """Set a node (or nodes) to be directly upstream from the current node."""
         self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
 
     @property


[airflow] 15/16: Fix timestamp defaults for sensorinstance (#24638)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 691ceab95354d5ba6b3679ca6be026a45e172129
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Jun 24 09:31:58 2022 -0700

    Fix timestamp defaults for sensorinstance (#24638)
    
    Constant values were used where callables were intended.
    
    (cherry picked from commit 40765307d8c2febbeaddff61551cbfa9d717522e)
---
 airflow/migrations/versions/0001_1_5_0_current_schema.py              | 2 +-
 .../migrations/versions/0065_2_0_0_update_schema_for_smart_sensor.py  | 4 ++--
 airflow/migrations/versions/0103_2_3_0_add_callback_request_table.py  | 2 +-
 airflow/models/sensorinstance.py                                      | 4 ++--
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/airflow/migrations/versions/0001_1_5_0_current_schema.py b/airflow/migrations/versions/0001_1_5_0_current_schema.py
index 9824db7dad..c700df8c47 100644
--- a/airflow/migrations/versions/0001_1_5_0_current_schema.py
+++ b/airflow/migrations/versions/0001_1_5_0_current_schema.py
@@ -214,7 +214,7 @@ def upgrade():
             sa.Column('id', sa.Integer(), nullable=False),
             sa.Column('key', StringID(length=512), nullable=True),
             sa.Column('value', sa.PickleType(), nullable=True),
-            sa.Column('timestamp', sa.DateTime(), default=func.now(), nullable=False),
+            sa.Column('timestamp', sa.DateTime(), default=func.now, nullable=False),
             sa.Column('execution_date', sa.DateTime(), nullable=False),
             sa.Column('task_id', StringID(), nullable=False),
             sa.Column('dag_id', StringID(), nullable=False),
diff --git a/airflow/migrations/versions/0065_2_0_0_update_schema_for_smart_sensor.py b/airflow/migrations/versions/0065_2_0_0_update_schema_for_smart_sensor.py
index d98be012b7..228361dc58 100644
--- a/airflow/migrations/versions/0065_2_0_0_update_schema_for_smart_sensor.py
+++ b/airflow/migrations/versions/0065_2_0_0_update_schema_for_smart_sensor.py
@@ -61,8 +61,8 @@ def upgrade():
         sa.Column('shardcode', sa.Integer(), nullable=False),
         sa.Column('poke_context', sa.Text(), nullable=False),
         sa.Column('execution_context', sa.Text(), nullable=True),
-        sa.Column('created_at', TIMESTAMP, default=func.now(), nullable=False),
-        sa.Column('updated_at', TIMESTAMP, default=func.now(), nullable=False),
+        sa.Column('created_at', TIMESTAMP, default=func.now, nullable=False),
+        sa.Column('updated_at', TIMESTAMP, default=func.now, nullable=False),
         sa.PrimaryKeyConstraint('id'),
     )
     op.create_index('ti_primary_key', 'sensor_instance', ['dag_id', 'task_id', 'execution_date'], unique=True)
diff --git a/airflow/migrations/versions/0103_2_3_0_add_callback_request_table.py b/airflow/migrations/versions/0103_2_3_0_add_callback_request_table.py
index 637abe8aee..747a6217c2 100644
--- a/airflow/migrations/versions/0103_2_3_0_add_callback_request_table.py
+++ b/airflow/migrations/versions/0103_2_3_0_add_callback_request_table.py
@@ -45,7 +45,7 @@ def upgrade():
     op.create_table(
         TABLE_NAME,
         sa.Column('id', sa.Integer(), nullable=False, primary_key=True),
-        sa.Column('created_at', TIMESTAMP, default=func.now(), nullable=False),
+        sa.Column('created_at', TIMESTAMP, default=func.now, nullable=False),
         sa.Column('priority_weight', sa.Integer(), default=1, nullable=False),
         sa.Column('callback_data', ExtendedJSON, nullable=False),
         sa.Column('callback_type', sa.String(20), nullable=False),
diff --git a/airflow/models/sensorinstance.py b/airflow/models/sensorinstance.py
index b1681e5227..8c66536064 100644
--- a/airflow/models/sensorinstance.py
+++ b/airflow/models/sensorinstance.py
@@ -59,8 +59,8 @@ class SensorInstance(Base):
     shardcode = Column(Integer, nullable=False)
     poke_context = Column(Text, nullable=False)
     execution_context = Column(Text)
-    created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False)
-    updated_at = Column(UtcDateTime, default=timezone.utcnow(), onupdate=timezone.utcnow(), nullable=False)
+    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+    updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
 
     # SmartSensor doesn't support mapped operators, but this is needed for compatibility with the
     # log_filename_template of TaskInstances
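
The bug is the classic SQLAlchemy pitfall: `default=timezone.utcnow()` is evaluated once at import time, so every row written afterwards reuses that single timestamp, whereas passing the callable makes it run per INSERT. A minimal sketch outside Airflow; the table and column names are illustrative:

```python
from sqlalchemy import Column, DateTime, Integer
from sqlalchemy.orm import declarative_base

from airflow.utils import timezone

Base = declarative_base()


class Example(Base):
    __tablename__ = "example"

    id = Column(Integer, primary_key=True)
    # Wrong: utcnow() runs once when this module is imported, so every
    # inserted row gets the same frozen timestamp.
    created_bad = Column(DateTime, default=timezone.utcnow())
    # Right: the callable is invoked for each INSERT statement.
    created_good = Column(DateTime, default=timezone.utcnow)
```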


[airflow] 06/16: Handle occasional deadlocks in trigger with retries (#24071)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4284d03e98df5701a3e41cbf0826b2dccbefacee
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Wed Jun 1 19:54:40 2022 +0200

    Handle occasional deadlocks in trigger with retries (#24071)
    
    Fixes: #23639
    (cherry picked from commit d86ae090350de97e385ca4aaf128235f4c21f158)
---
 airflow/models/trigger.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index c1ccdd4964..2f332393f9 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -24,6 +24,7 @@ from airflow.models.base import Base
 from airflow.models.taskinstance import TaskInstance
 from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
+from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
 from airflow.utils.state import State
@@ -88,9 +89,11 @@ class Trigger(Base):
         (triggers have a one-to-many relationship to both)
         """
         # Update all task instances with trigger IDs that are not DEFERRED to remove them
-        session.query(TaskInstance).filter(
-            TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
-        ).update({TaskInstance.trigger_id: None})
+        for attempt in run_with_db_retries():
+            with attempt:
+                session.query(TaskInstance).filter(
+                    TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
+                ).update({TaskInstance.trigger_id: None})
         # Get all triggers that have no task instances depending on them...
         ids = [
             trigger_id
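
`run_with_db_retries` is Airflow's tenacity-based helper: iterating over it yields attempt contexts, and a retryable database error (such as a MySQL deadlock) raised inside the `with attempt:` block causes the statement to be re-issued on the next attempt instead of crashing the triggerer loop. The pattern from the hunk above, restated as a standalone sketch:

```python
from airflow.models.taskinstance import TaskInstance
from airflow.utils.retries import run_with_db_retries
from airflow.utils.state import State


def clear_stale_trigger_ids(session):
    """Illustrative restatement of the retried UPDATE from Trigger.clean_unused."""
    for attempt in run_with_db_retries():
        with attempt:
            # If the bulk UPDATE deadlocks, the error is caught and the block
            # runs again, up to the configured number of attempts.
            session.query(TaskInstance).filter(
                TaskInstance.state != State.DEFERRED,
                TaskInstance.trigger_id.isnot(None),
            ).update({TaskInstance.trigger_id: None})
```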


[airflow] 12/16: Add missing types to FSHook (#24470)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2b9f6d05124d4d343d9c64e5c5eaf7bd28ab4065
Author: Léni Marvaud <24...@users.noreply.github.com>
AuthorDate: Sun Jun 19 23:27:30 2022 +0200

    Add missing types to FSHook (#24470)
    
    Fix mypy error :
    ```
    error: Call to untyped function "FSHook" in typed context  [no-untyped-call]
    ```
    
    When using :
    ```python
    fs_hook = FSHook(fs_conn_id)
    ```
    
    (cherry picked from commit d4eeede1fa65d626c6ce0d17f2c3c61f6b003162)
---
 airflow/hooks/filesystem.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/hooks/filesystem.py b/airflow/hooks/filesystem.py
index d9ad6ded1c..c694940c2a 100644
--- a/airflow/hooks/filesystem.py
+++ b/airflow/hooks/filesystem.py
@@ -33,13 +33,13 @@ class FSHook(BaseHook):
     Extra: {"path": "/tmp"}
     """
 
-    def __init__(self, conn_id='fs_default'):
+    def __init__(self, conn_id: str = 'fs_default'):
         super().__init__()
         conn = self.get_connection(conn_id)
         self.basepath = conn.extra_dejson.get('path', '')
         self.conn = conn
 
-    def get_conn(self):
+    def get_conn(self) -> None:
         pass
 
     def get_path(self) -> str: