Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:16 UTC

[airflow] branch v2-4-test updated (b6c8fd6ada -> 5bf2850490)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a change to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


    from b6c8fd6ada Trigger constraint build
     new b7afa67253 Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
     new 48ce0eaca2 Add last_updated_at and total_updates to datasets list view (#26358)
     new c652493e22 Bump FAB to 4.1.4 (#26393)
     new 8e836db485 Revert "No grid auto-refresh for backfill dag runs (#25042)" (#26463)
     new a643184ed9 Fix version for a couple configurations (#26491)
     new 804abc71e6 Fix airflow tasks run --local when dags_folder differs from that of processor (#26509)
     new bd6d5e7bf5 isort is now a dev dependency, add it to devel in setup.py (#26510)
     new 6cc6d004fd Add fixture for CLI tests requiring sample dags (#26536)
     new 14e55b64ce Add a note against use of top level code in timetable (#26649)
     new 29bc32a67a Simplify RTIF.delete_old_records() (#26667)
     new 357549c84b make consistency on markup title string level (#26696)
     new a325ba7327 Remove TaskFail duplicates check (#26714)
     new 463900ce89 Fix non-hidden cumulative chart on duration view (#26716)
     new ba281d69af Bump sphinx and sphinx-autoapi (#26743)
     new 3e09020946 Remove DAG parsing from StandardTaskRunner (#26750)
     new efe1142541 Allow retrieving error message from data.detail (#26762)
     new a19783e674 Add restarting state to TaskState Enum in REST API (#26776)
     new 88bed082e6 Ensure the log messages from operators during parsing go somewhere (#26779)
     new 4fe9eef773 add icon legend to datasets graph (#26781)
     new 332959208f A few docs fixups (#26788)
     new a1aef37cae demote Removed state in priority for displaying task summaries (#26789)
     new 56f1e473e7 Fix warning when using xcomarg dependencies (#26801)
     new 9e31a1154a Fix running debuggers inside `airflow tasks test` (#26806)
     new 9bcf5b342e Move user-facing string to template (#26815)
     new bcfa418637 Documentation fixes (#26819)
     new 177959ab8b Add missing colors to state_color_mapping jinja global (#26822)
     new 2cc75dac8c Fixed triple quotes in task group example (#26829)
     new 000718613f Bump min version of jinja2 (#26866)
     new 05408cbe96 Clean-ups around task-mapping code (#26879)
     new d760ee6ff8 Change dag audit log sort by date from asc to desc (#26895)
     new b9e09af530 fix next run dataset modal links (#26897)
     new 66724540e7 Fix auto refresh for graph view (#26926)
     new 24912355fb Remove double collection of dags in `airflow dags reserialize`  (#27030)
     new ee2cdbf185 Filter dataset dependency data on webserver (#27046)
     new 2f8ba62322 Remove info log about closing parent pipe (#27054)
     new 34052dbbee Avoid 500 on dag redirect (#27064)
     new ecd943915f Fix task duration cumulative chart (#26717)
     new fa62205ff9 Handle mapped tasks in task duration chart (#26722)
     new c0fa6afd1b Fix broken URL for `docker-compose.yaml` (#26726)
     new c1daf84a75 Don't re-patch pods that are already controlled by current worker (#26778)
     new 5bf2850490 Add separate error handler for 405(Method not allowed) errors (#26880)

The 41 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 RELEASE_NOTES.rst                                  |   2 +-
 airflow/api_connexion/openapi/v1.yaml              |   2 +
 airflow/cli/cli_parser.py                          |   2 +-
 airflow/cli/commands/task_command.py               |  10 +-
 airflow/config_templates/airflow_local_settings.py |   6 +-
 airflow/config_templates/config.yml                |   4 +-
 airflow/dag_processing/processor.py                |   2 -
 airflow/executors/kubernetes_executor.py           |   5 +-
 airflow/jobs/scheduler_job.py                      |  19 +-
 airflow/models/baseoperator.py                     |   5 +-
 airflow/models/dag.py                              |  20 +-
 airflow/models/dagbag.py                           |  26 +-
 airflow/models/dagrun.py                           |  54 ++--
 airflow/models/mappedoperator.py                   |  10 +-
 airflow/models/renderedtifields.py                 |  68 ++---
 airflow/providers/microsoft/winrm/hooks/winrm.py   |   4 +-
 airflow/providers/ssh/hooks/ssh.py                 |   4 +-
 airflow/serialization/schema.json                  |   8 +-
 airflow/serialization/serialized_objects.py        |  10 +-
 airflow/settings.py                                |   3 +
 airflow/task/task_runner/standard_task_runner.py   |   8 +-
 airflow/utils/cli.py                               |  58 +++-
 airflow/utils/db.py                                |  43 ++-
 airflow/utils/log/file_processor_handler.py        |   3 +
 airflow/utils/log/file_task_handler.py             |   3 +
 airflow/utils/log/logging_mixin.py                 |  18 +-
 airflow/utils/log/secrets_masker.py                |   1 +
 airflow/utils/sqlalchemy.py                        |  20 +-
 airflow/www/extensions/init_views.py               |  10 +-
 airflow/www/jest-setup.js                          |   3 +
 airflow/www/static/js/api/useDataset.ts            |   8 +-
 .../www/static/js/api/useDatasetDependencies.ts    |  13 +-
 airflow/www/static/js/api/useDatasets.ts           |   6 +-
 airflow/www/static/js/api/useGridData.test.ts      |  64 +---
 airflow/www/static/js/api/useGridData.ts           |   2 +-
 airflow/www/static/js/datasetUtils.js              |   2 +-
 airflow/www/static/js/datasets/Graph/Legend.tsx    |  76 +++++
 airflow/www/static/js/datasets/Graph/index.tsx     |  37 +--
 airflow/www/static/js/datasets/List.tsx            |  47 ++-
 airflow/www/static/js/duration_chart.js            |   1 +
 airflow/www/static/js/graph.js                     |  13 +-
 airflow/www/static/js/types/api-generated.ts       |   4 +-
 airflow/www/static/js/types/index.ts               |  10 +-
 airflow/www/static/js/utils/useErrorToast.ts       |   2 +-
 airflow/www/templates/airflow/dags.html            |   4 +-
 airflow/www/templates/airflow/datasets.html        |   3 +-
 .../airflow/{not_found.html => error.html}         |   6 +-
 airflow/www/templates/airflow/graph.html           |   1 +
 airflow/www/utils.py                               |   2 +-
 airflow/www/views.py                               | 156 ++++++++--
 .../operators.rst                                  |   2 +
 docs/apache-airflow/best-practices.rst             |  35 +++
 docs/apache-airflow/concepts/cluster-policies.rst  |   4 +-
 docs/apache-airflow/concepts/dags.rst              |   2 +-
 docs/apache-airflow/concepts/timetable.rst         |   6 +
 docs/apache-airflow/executor/local.rst             |   2 +-
 docs/apache-airflow/howto/customize-ui.rst         |  13 +-
 docs/apache-airflow/howto/docker-compose/index.rst |   4 +-
 setup.cfg                                          |   4 +-
 setup.py                                           |   9 +-
 tests/api_connexion/test_error_handling.py         |  47 ++-
 tests/cli/commands/test_task_command.py            | 140 +++++----
 tests/cli/conftest.py                              |   7 +
 tests/conftest.py                                  |  11 +
 tests/dags/{test_sensor.py => test_dags_folder.py} |  20 +-
 tests/decorators/test_python.py                    |  15 +
 tests/jobs/test_scheduler_job.py                   |  10 +-
 tests/models/test_baseoperator.py                  |  10 +
 tests/models/test_dag.py                           |  72 ++++-
 tests/models/test_dagbag.py                        |  13 +
 tests/providers/arangodb/sensors/test_arangodb.py  |   8 +-
 tests/serialization/test_dag_serialization.py      |  19 +-
 tests/test_utils/mapping.py                        |   2 +-
 tests/utils/log/test_secrets_masker.py             |  12 +
 tests/utils/test_cli_util.py                       |  21 ++
 tests/www/views/test_views_dataset.py              | 332 +++++++++++++++++++++
 tests/www/views/test_views_tasks.py                | 271 +++++++++++++++--
 77 files changed, 1522 insertions(+), 457 deletions(-)
 create mode 100644 airflow/www/static/js/datasets/Graph/Legend.tsx
 rename airflow/www/templates/airflow/{not_found.html => error.html} (91%)
 copy tests/dags/{test_sensor.py => test_dags_folder.py} (69%)
 create mode 100644 tests/www/views/test_views_dataset.py


[airflow] 03/41: Bump FAB to 4.1.4 (#26393)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c652493e22f01eb2a5681cf949b6ef141b7cdac4
Author: Gabriel Machado <ga...@hotmail.com>
AuthorDate: Thu Sep 15 10:32:45 2022 +0200

    Bump FAB to 4.1.4 (#26393)
    
    No changes that need to be replicated in `airflow/www/fab_security`
    were found.
    
    See https://github.com/dpgaspar/Flask-AppBuilder/compare/v4.1.3...v4.1.4
    
    I expect that the constraints dependencies got updated to:
    
    flask-wtf==1.0.1
    wtforms==3.0.1
    
    (cherry picked from commit da06ff0833a1e96248fe71e083ec1870621049e5)
---
 setup.cfg | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.cfg b/setup.cfg
index 5108d2a012..3744e09c1c 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -108,7 +108,7 @@ install_requires =
     # Every time we update FAB version here, please make sure that you review the classes and models in
     # `airflow/www/fab_security` with their upstream counterparts. In particular, make sure any breaking changes,
     # for example any new methods, are accounted for.
-    flask-appbuilder==4.1.3
+    flask-appbuilder==4.1.4
     flask-caching>=1.5.0
     flask-login>=0.6.2
     flask-session>=0.4.0


[airflow] 39/41: Fix broken URL for `docker-compose.yaml` (#26726)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c0fa6afd1bf5291ceb4841fb7668523d83ff422e
Author: Kaxil Naik <ka...@apache.org>
AuthorDate: Tue Sep 27 22:42:31 2022 +0100

    Fix broken URL for `docker-compose.yaml` (#26726)
    
    This actually fixes it. There were earlier attempts at it in #26721, #26695 and #26711
    
    (cherry picked from commit b4ce2944d804010a55b308bb825b290089070681)
---
 docs/apache-airflow/howto/docker-compose/index.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/howto/docker-compose/index.rst b/docs/apache-airflow/howto/docker-compose/index.rst
index f8b5dc2935..5cde8f7bc1 100644
--- a/docs/apache-airflow/howto/docker-compose/index.rst
+++ b/docs/apache-airflow/howto/docker-compose/index.rst
@@ -54,10 +54,10 @@ Older versions of ``docker-compose`` do not support all the features required by
 Fetching ``docker-compose.yaml``
 ================================
 
-To deploy Airflow on Docker Compose, you should fetch `docker-compose.yaml <../docker-compose.yaml>`__.
-
 .. jinja:: quick_start_ctx
 
+    To deploy Airflow on Docker Compose, you should fetch `docker-compose.yaml <{{ doc_root_url }}docker-compose.yaml>`__.
+
     .. code-block:: bash
 
         curl -LfO '{{ doc_root_url }}docker-compose.yaml'


[airflow] 29/41: Clean-ups around task-mapping code (#26879)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 05408cbe96a5793f59afdb44ebbecf20781ec239
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Tue Oct 11 09:32:01 2022 +0800

    Clean-ups around task-mapping code (#26879)
    
    (cherry picked from commit a2d872481e4075669eca35849431139af75b8c07)
---
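The docstrings added to `mappedoperator.py` below distinguish literal mapped arguments (countable at parse time via `parse_time_mapped_ti_count`) from non-literal ones (countable only at run time via the renamed `get_mapped_ti_count`). A minimal TaskFlow sketch of the two cases; the dag and task names are illustrative and not part of this commit:

    import pendulum
    from airflow.decorators import dag, task

    @dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1, tz="UTC"), catchup=False)
    def mapping_sketch():
        @task
        def make_values():
            # The length of this list is only known after the task has run.
            return [1, 2, 3, 4]

        @task
        def double(x):
            return x * 2

        # Literal mapping: the number of mapped TIs (3) is known at parse time,
        # so parse_time_mapped_ti_count can report it when the DagRun is created.
        double.override(task_id="double_literal").expand(x=[1, 2, 3])

        # Non-literal mapping: only get_mapped_ti_count(run_id, session=...) can
        # answer, and only once make_values() has finished.
        double.override(task_id="double_xcom").expand(x=make_values())

    mapping_sketch()
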
 airflow/models/dagrun.py         | 54 ++++++++++++++++------------------------
 airflow/models/mappedoperator.py | 10 +++++++-
 tests/test_utils/mapping.py      |  2 +-
 3 files changed, 32 insertions(+), 34 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 99d969ab19..299fc4b8cc 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -759,7 +759,7 @@ class DagRun(Base, LoggingMixin):
                 expansion_happened = True
             if schedulable.state in SCHEDULEABLE_STATES:
                 task = schedulable.task
-                if isinstance(schedulable.task, MappedOperator):
+                if isinstance(task, MappedOperator):
                     # Ensure the task indexes are complete
                     created = self._revise_mapped_task_indexes(task, session=session)
                     ready_tis.extend(created)
@@ -872,8 +872,6 @@ class DagRun(Base, LoggingMixin):
         hook_is_noop: Literal[True, False] = getattr(task_instance_mutation_hook, 'is_noop', False)
 
         dag = self.get_dag()
-        task_ids: set[str] = set()
-
         task_ids = self._check_for_removed_or_restored_tasks(
             dag, task_instance_mutation_hook, session=session
         )
@@ -951,8 +949,8 @@ class DagRun(Base, LoggingMixin):
                     ti.state = State.REMOVED
             else:
                 #  What if it is _now_ dynamically mapped, but wasn't before?
-                task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
-                total_length = task.run_time_mapped_ti_count(self.run_id, session=session)
+                task.get_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
+                total_length = task.get_mapped_ti_count(self.run_id, session=session)
 
                 if total_length is None:
                     # Not all upstreams finished, so we can't tell what should be here. Remove everything.
@@ -1045,19 +1043,13 @@ class DagRun(Base, LoggingMixin):
         :param session: the session to use
         """
 
-        def expand_mapped_literals(
-            task: Operator, sequence: Sequence[int] | None = None
-        ) -> tuple[Operator, Sequence[int]]:
+        def expand_mapped_literals(task: Operator) -> tuple[Operator, Sequence[int]]:
             if not task.is_mapped:
                 return (task, (-1,))
             task = cast("MappedOperator", task)
-            count = task.parse_time_mapped_ti_count or task.run_time_mapped_ti_count(
-                self.run_id, session=session
-            )
+            count = task.get_mapped_ti_count(self.run_id, session=session)
             if not count:
                 return (task, (-1,))
-            if sequence:
-                return (task, sequence)
             return (task, range(count))
 
         tasks_and_map_idxs = map(expand_mapped_literals, filter(task_filter, dag.task_dict.values()))
@@ -1110,21 +1102,19 @@ class DagRun(Base, LoggingMixin):
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(self, task, session: Session):
+    def _revise_mapped_task_indexes(self, task: MappedOperator, session: Session) -> Iterable[TI]:
         """Check if task increased or reduced in length and handle appropriately"""
-        from airflow.models.taskinstance import TaskInstance
         from airflow.settings import task_instance_mutation_hook
 
-        task.run_time_mapped_ti_count.cache_clear()
-        total_length = (
-            task.parse_time_mapped_ti_count
-            or task.run_time_mapped_ti_count(self.run_id, session=session)
-            or 0
-        )
-        query = session.query(TaskInstance.map_index).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == task.task_id,
-            TaskInstance.run_id == self.run_id,
+        task.get_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
+        total_length = task.get_mapped_ti_count(self.run_id, session=session)
+        if total_length is None:  # Upstreams not ready, don't need to revise this yet.
+            return []
+
+        query = session.query(TI.map_index).filter(
+            TI.dag_id == self.dag_id,
+            TI.task_id == task.task_id,
+            TI.run_id == self.run_id,
         )
         existing_indexes = {i for (i,) in query}
         missing_indexes = set(range(total_length)).difference(existing_indexes)
@@ -1133,7 +1123,7 @@ class DagRun(Base, LoggingMixin):
 
         if missing_indexes:
             for index in missing_indexes:
-                ti = TaskInstance(task, run_id=self.run_id, map_index=index, state=None)
+                ti = TI(task, run_id=self.run_id, map_index=index, state=None)
                 self.log.debug("Expanding TIs upserted %s", ti)
                 task_instance_mutation_hook(ti)
                 ti = session.merge(ti)
@@ -1141,12 +1131,12 @@ class DagRun(Base, LoggingMixin):
                 session.flush()
                 created_tis.append(ti)
         elif removed_indexes:
-            session.query(TaskInstance).filter(
-                TaskInstance.dag_id == self.dag_id,
-                TaskInstance.task_id == task.task_id,
-                TaskInstance.run_id == self.run_id,
-                TaskInstance.map_index.in_(removed_indexes),
-            ).update({TaskInstance.state: TaskInstanceState.REMOVED})
+            session.query(TI).filter(
+                TI.dag_id == self.dag_id,
+                TI.task_id == task.task_id,
+                TI.run_id == self.run_id,
+                TI.map_index.in_(removed_indexes),
+            ).update({TI.state: TaskInstanceState.REMOVED})
             session.flush()
         return created_tis
 
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 8d1c3c4559..62cc22f379 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -727,15 +727,23 @@ class MappedOperator(AbstractOperator):
     def parse_time_mapped_ti_count(self) -> int | None:
         """Number of mapped TaskInstances that can be created at DagRun create time.
 
+        This only considers literal mapped arguments, and would return *None*
+        when any non-literal values are used for mapping.
+
         :return: None if non-literal mapped arg encountered, or the total
             number of mapped TIs this task should have.
         """
         return self._get_specified_expand_input().get_parse_time_mapped_ti_count()
 
     @cache
-    def run_time_mapped_ti_count(self, run_id: str, *, session: Session) -> int | None:
+    def get_mapped_ti_count(self, run_id: str, *, session: Session) -> int | None:
         """Number of mapped TaskInstances that can be created at run time.
 
+        This considers both literal and non-literal mapped arguments, and the
+        result is therefore available when all depended tasks have finished. The
+        return value should be identical to ``parse_time_mapped_ti_count`` if
+        all mapped arguments are literal.
+
         :return: None if upstream tasks are not complete yet, or the total
             number of mapped TIs this task should have.
         """
diff --git a/tests/test_utils/mapping.py b/tests/test_utils/mapping.py
index 5cfa230369..984446343c 100644
--- a/tests/test_utils/mapping.py
+++ b/tests/test_utils/mapping.py
@@ -42,4 +42,4 @@ def expand_mapped_task(
     session.flush()
 
     mapped.expand_mapped_task(run_id, session=session)
-    mapped.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
+    mapped.get_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]


[airflow] 28/41: Bump min version of jinja2 (#26866)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 000718613ffbd7c3d609b9ee7adad4afd17892ec
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Oct 4 15:11:15 2022 -0700

    Bump min version of jinja2 (#26866)
    
    We use the `dumps` kwarg, which was added in jinja2 3.0.
    
    (cherry picked from commit 4fa476bf162a42512c17c6198db2de8253779f55)
---
 setup.cfg | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.cfg b/setup.cfg
index 3744e09c1c..7f8344f310 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -119,7 +119,7 @@ install_requires =
     importlib_metadata>=1.7;python_version<"3.9"
     importlib_resources>=5.2;python_version<"3.9"
     itsdangerous>=2.0
-    jinja2>=2.10.1
+    jinja2>=3.0.0
     jsonschema>=3.2.0
     lazy-object-proxy
     linkify-it-py>=2.0.0


[airflow] 08/41: Add fixture for CLI tests requiring sample dags (#26536)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6cc6d004fda71a11531aa607f007c7e72c76e4f9
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Sep 20 21:37:18 2022 -0700

    Add fixture for CLI tests requiring sample dags (#26536)
    
    I noticed that various CLI tests were failing (specifically in test_task_command.py) when run locally but not when run in breeze.  I figured out the cause is that breeze has ('core', 'load_examples') set to true but I had it false locally.  To fix this we can add a fixture that patches the conf settings.  But we have to go one step further.  If we grab the conf value in the default for the argument in DagBag, this means the value is fixed at the time the class gets defined.  So the va [...]
    
    We do the same for safe_mode while we're at it, because why not.
---
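The pitfall described above is plain Python: a default argument is evaluated once, when the callable is defined, so a config lookup used as a default cannot be patched by a test fixture afterwards. A minimal, Airflow-free sketch of the problem and of the sentinel-based fix used in the diff below (names are illustrative):

    CONFIG = {"load_examples": False}

    # Broken: the default is captured when the function is defined, so later
    # changes to CONFIG (e.g. by a conf_vars-style fixture) are never seen.
    def load_dags_frozen(include_examples=CONFIG["load_examples"]):
        return include_examples

    # Fixed: use a sentinel default and read the config at call time instead.
    _NOTSET = object()

    def load_dags(include_examples=_NOTSET):
        if include_examples is _NOTSET:
            include_examples = CONFIG["load_examples"]
        return include_examples

    CONFIG["load_examples"] = True       # what the test fixture effectively does
    assert load_dags_frozen() is False   # still sees the value captured at definition time
    assert load_dags() is True           # picks up the patched value
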
 airflow/models/dagbag.py         | 14 ++++++++++++--
 tests/cli/conftest.py            |  7 +++++++
 tests/jobs/test_scheduler_job.py | 10 ++++++++--
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 8a5557c2c2..cabc142f31 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -54,6 +54,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
 from airflow.utils.session import provide_session
 from airflow.utils.timeout import timeout
+from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
     import pathlib
@@ -92,8 +93,8 @@ class DagBag(LoggingMixin):
     def __init__(
         self,
         dag_folder: str | pathlib.Path | None = None,
-        include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
-        safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
+        include_examples: bool | ArgNotSet = NOTSET,
+        safe_mode: bool | ArgNotSet = NOTSET,
         read_dags_from_db: bool = False,
         store_serialized_dags: bool | None = None,
         load_op_links: bool = True,
@@ -103,6 +104,15 @@ class DagBag(LoggingMixin):
 
         super().__init__()
 
+        include_examples = (
+            include_examples
+            if isinstance(include_examples, bool)
+            else conf.getboolean('core', 'LOAD_EXAMPLES')
+        )
+        safe_mode = (
+            safe_mode if isinstance(safe_mode, bool) else conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')
+        )
+
         if store_serialized_dags:
             warnings.warn(
                 "The store_serialized_dags parameter has been deprecated. "
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index 0a2c062c66..67de520c38 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -24,6 +24,7 @@ import pytest
 from airflow import models
 from airflow.cli import cli_parser
 from airflow.executors import celery_executor, celery_kubernetes_executor
+from tests.test_utils.config import conf_vars
 
 # Create custom executors here because conftest is imported first
 custom_executor_module = type(sys)('custom_executor')
@@ -36,6 +37,12 @@ custom_executor_module.CustomCeleryKubernetesExecutor = type(  # type: ignore
 sys.modules['custom_executor'] = custom_executor_module
 
 
+@pytest.fixture(autouse=True)
+def load_examples():
+    with conf_vars({('core', 'load_examples'): 'True'}):
+        yield
+
+
 @pytest.fixture(scope="session")
 def dagbag():
     return models.DagBag(include_examples=True)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 4284bf02d9..81fd6a3017 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -104,6 +104,12 @@ def dagbag():
     return DagBag(read_dags_from_db=True)
 
 
+@pytest.fixture
+def load_examples():
+    with conf_vars({('core', 'load_examples'): 'True'}):
+        yield
+
+
 @pytest.mark.usefixtures("disable_load_example")
 @pytest.mark.need_serialized_dag
 class TestSchedulerJob:
@@ -4014,7 +4020,7 @@ class TestSchedulerJob:
 
             self.scheduler_job.executor.callback_sink.send.assert_not_called()
 
-    def test_find_zombies(self):
+    def test_find_zombies(self, load_examples):
         dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
         with create_session() as session:
             session.query(LocalTaskJob).delete()
@@ -4072,7 +4078,7 @@ class TestSchedulerJob:
             session.query(TaskInstance).delete()
             session.query(LocalTaskJob).delete()
 
-    def test_zombie_message(self):
+    def test_zombie_message(self, load_examples):
         """
         Check that the zombie message comes out as expected
         """


[airflow] 07/41: isort is now a dev dependency, add it to devel in setup.py (#26510)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bd6d5e7bf54dd6201735e611ad84d69d5c87e22c
Author: Niko <on...@amazon.com>
AuthorDate: Tue Sep 20 00:33:59 2022 -0700

    isort is now a dev dependency, add it to devel in setup.py (#26510)
    
    (cherry picked from commit 0bca962cd2c9671adbe68923e17ebecf66a0c6be)
---
 setup.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/setup.py b/setup.py
index 81637e0855..bd032f3566 100644
--- a/setup.py
+++ b/setup.py
@@ -376,6 +376,7 @@ devel_only = [
     'freezegun',
     'gitpython',
     'ipdb',
+    'isort',
     'jira',
     'jsondiff',
     'mongomock',


[airflow] 34/41: Filter dataset dependency data on webserver (#27046)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ee2cdbf185d06fa75455e078b1ec1b3b8e267ea4
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Fri Oct 14 13:34:19 2022 -0400

    Filter dataset dependency data on webserver (#27046)
    
    * filter dataset dependency data on webserver
    
    * rename u,v to source,target
    
    (cherry picked from commit 1c9a87e0c3e792e0fcd0e53f77b2b0158f94f02e)
---
 .../www/static/js/api/useDatasetDependencies.ts    | 13 ++----------
 airflow/www/static/js/types/index.ts               |  4 ++--
 airflow/www/views.py                               | 24 ++++++++++++----------
 3 files changed, 17 insertions(+), 24 deletions(-)

diff --git a/airflow/www/static/js/api/useDatasetDependencies.ts b/airflow/www/static/js/api/useDatasetDependencies.ts
index 104a7025ba..d7ec3260e6 100644
--- a/airflow/www/static/js/api/useDatasetDependencies.ts
+++ b/airflow/www/static/js/api/useDatasetDependencies.ts
@@ -72,7 +72,7 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({
     height: 40,
     value,
   })),
-  edges: edges.map((e) => ({ id: `${e.u}-${e.v}`, sources: [e.u], targets: [e.v] })),
+  edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources: [e.source], targets: [e.target] })),
 });
 
 const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
@@ -81,17 +81,8 @@ const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
   // get computed style to calculate how large each node should be
   const font = `bold ${16}px ${window.getComputedStyle(document.body).fontFamily}`;
 
-  // Make sure we only show edges that are connected to two nodes.
-  const newEdges = edges.filter((e) => {
-    const edgeNodes = nodes.filter((n) => n.id === e.u || n.id === e.v);
-    return edgeNodes.length === 2;
-  });
-
-  // Then filter out any nodes without an edge.
-  const newNodes = nodes.filter((n) => newEdges.some((e) => e.u === n.id || e.v === n.id));
-
   // Finally generate the graph data with elk
-  const data = await elk.layout(generateGraph({ nodes: newNodes, edges: newEdges, font }));
+  const data = await elk.layout(generateGraph({ nodes, edges, font }));
   return data as Data;
 };
 
diff --git a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts
index bf0f7d9b9c..7fc88afa67 100644
--- a/airflow/www/static/js/types/index.ts
+++ b/airflow/www/static/js/types/index.ts
@@ -93,8 +93,8 @@ interface DepNode {
 }
 
 interface DepEdge {
-  u: string;
-  v: string;
+  source: string;
+  target: string;
 }
 
 interface DatasetListItem extends API.Dataset {
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b6b6836256..dd6c9226af 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3526,19 +3526,21 @@ class Airflow(AirflowBaseView):
 
         for dag, dependencies in SerializedDagModel.get_dag_dependencies().items():
             dag_node_id = f"dag:{dag}"
-            if dag_node_id not in nodes_dict:
-                nodes_dict[dag_node_id] = node_dict(dag_node_id, dag, "dag")
-
-            for dep in dependencies:
-                if dep.node_id not in nodes_dict and (
-                    dep.dependency_type == 'dag' or dep.dependency_type == 'dataset'
-                ):
-                    nodes_dict[dep.node_id] = node_dict(dep.node_id, dep.dependency_id, dep.dependency_type)
-                edge_tuples.add((f"dag:{dep.source}", dep.node_id))
-                edge_tuples.add((dep.node_id, f"dag:{dep.target}"))
+            if dag_node_id not in nodes_dict and len(dependencies) > 0:
+                for dep in dependencies:
+                    if dep.dependency_type == 'dag' or dep.dependency_type == 'dataset':
+                        nodes_dict[dag_node_id] = node_dict(dag_node_id, dag, 'dag')
+                        if dep.node_id not in nodes_dict:
+                            nodes_dict[dep.node_id] = node_dict(
+                                dep.node_id, dep.dependency_id, dep.dependency_type
+                            )
+                        if dep.source != 'dataset':
+                            edge_tuples.add((f"dag:{dep.source}", dep.node_id))
+                        if dep.target != 'dataset':
+                            edge_tuples.add((dep.node_id, f"dag:{dep.target}"))
 
         nodes = list(nodes_dict.values())
-        edges = [{"u": u, "v": v} for u, v in edge_tuples]
+        edges = [{"source": source, "target": target} for source, target in edge_tuples]
 
         data = {
             'nodes': nodes,


[airflow] 33/41: Remove double collection of dags in `airflow dags reserialize` (#27030)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 24912355fb317e5b83f38947f0055c89b09bb6a4
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Oct 13 19:29:00 2022 +0100

    Remove double collection of dags in `airflow dags reserialize`  (#27030)
    
    We explicitly call dagbag.collect_dags after instantiating DagBag in
    the `airflow dags reserialize` code.
    
    collect_dags is already called when the DagBag is instantiated, so
    calling it again re-processes the same dags.
    
    Here, we add a constructor flag so reserialization keeps the behaviour
    it needs with a single parsing pass.
    
    Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
    (cherry picked from commit 36e2e43def6a27d9bf2cab4d27d104414bea3f7f)
---
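In other words, the reserialize path used to parse the dags folder twice: once inside DagBag.__init__ and once via the explicit collect_dags call. A rough before/after sketch of the usage, paraphrasing the db.py hunk below:

    from airflow.models.dagbag import DagBag

    # Before: DagBag() already runs collect_dags() in __init__, so the explicit
    # call below parsed every dag file a second time.
    dagbag = DagBag()
    dagbag.collect_dags(only_if_updated=False, safe_mode=False)
    dagbag.sync_to_db()

    # After: skip collection at construction time and parse exactly once.
    dagbag = DagBag(collect_dags=False)
    dagbag.collect_dags(only_if_updated=False)
    dagbag.sync_to_db()
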
 airflow/models/dagbag.py    | 12 +++++++-----
 airflow/utils/db.py         |  4 ++--
 tests/models/test_dagbag.py | 13 +++++++++++++
 3 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index cabc142f31..849498c1e4 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -98,6 +98,7 @@ class DagBag(LoggingMixin):
         read_dags_from_db: bool = False,
         store_serialized_dags: bool | None = None,
         load_op_links: bool = True,
+        collect_dags: bool = True,
     ):
         # Avoid circular import
         from airflow.models.dag import DAG
@@ -137,11 +138,12 @@ class DagBag(LoggingMixin):
 
         self.dagbag_import_error_tracebacks = conf.getboolean('core', 'dagbag_import_error_tracebacks')
         self.dagbag_import_error_traceback_depth = conf.getint('core', 'dagbag_import_error_traceback_depth')
-        self.collect_dags(
-            dag_folder=dag_folder,
-            include_examples=include_examples,
-            safe_mode=safe_mode,
-        )
+        if collect_dags:
+            self.collect_dags(
+                dag_folder=dag_folder,
+                include_examples=include_examples,
+                safe_mode=safe_mode,
+            )
         # Should the extra operator link be loaded via plugins?
         # This flag is set to False in Scheduler so that Extra Operator links are not loaded
         self.load_op_links = load_op_links
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index cec3d58c36..b2ff28f68a 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -819,8 +819,8 @@ def reserialize_dags(*, session: Session = NEW_SESSION) -> None:
     from airflow.models.serialized_dag import SerializedDagModel
 
     session.query(SerializedDagModel).delete(synchronize_session=False)
-    dagbag = DagBag()
-    dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+    dagbag = DagBag(collect_dags=False)
+    dagbag.collect_dags(only_if_updated=False)
     dagbag.sync_to_db(session=session)
 
 
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index ed20d41ef0..f55a31400d 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -1060,3 +1060,16 @@ class TestDagBag:
         dagbag = DagBag(dag_folder=dag_file, include_examples=False)
         assert len(dagbag.dag_ids) == 0
         assert "has no tags" in dagbag.import_errors[dag_file]
+
+    def test_dagbag_dag_collection(self):
+
+        dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False, collect_dags=False)
+        # since collect_dags is False, dagbag.dags should be empty
+        assert not dagbag.dags
+
+        dagbag.collect_dags()
+        assert dagbag.dags
+
+        # test that dagbag.dags is not empty if collect_dags is True
+        dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+        assert dagbag.dags


[airflow] 13/41: Fix non-hidden cumulative chart on duration view (#26716)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 463900ce89bc5b205aa4bdf04da2325ea042c8f9
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Sep 27 15:54:04 2022 -0700

    Fix non-hidden cumulative chart on duration view (#26716)
    
    When you first load the page, the cumulative checkbox is unchecked, but the cumulative chart itself is not hidden and appears below the non-cumulative chart. Adding this line ensures that the initial value of the checkbox is respected.
    
    (cherry picked from commit f105343759a30fbab6a0a95617e9df3493b1ed3e)
---
 airflow/www/static/js/duration_chart.js | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/www/static/js/duration_chart.js b/airflow/www/static/js/duration_chart.js
index f5f7376c53..7c3d7cacd0 100644
--- a/airflow/www/static/js/duration_chart.js
+++ b/airflow/www/static/js/duration_chart.js
@@ -29,5 +29,6 @@ function handleCheck() {
   }
 }
 $(document).on('chartload', handleCheck);
+$(document).ready(handleCheck);
 
 $('#isCumulative').on('click', handleCheck);


[airflow] 06/41: Fix airflow tasks run --local when dags_folder differs from that of processor (#26509)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 804abc71e674bbdc52b0ed0187bad21ba3a171d7
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Sep 28 15:56:14 2022 -0700

    Fix airflow tasks run --local when dags_folder differs from that of processor (#26509)
    
    Previously the code used the dags_folder of the "current" process (e.g. the celery worker, or k8s executor worker pod) to calculate the relative fileloc based on the full fileloc stored in the serialized dag.  But if the worker dags_folder folder is different from the dags folder configured on the dag processor, then airflow can't calculate the relative path, so it will just use the full path, which in this case will be a bad path.  We can fix this by keeping track of the dags_folder  [...]
    
    (cherry picked from commit c94f978a66a7cfc31b6d461bbcbfd0f2ddb2962e)
---
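The heart of the fix is how the relative fileloc is computed: prefer the dags_folder recorded by the serializing process, fall back to the locally configured one, and keep the absolute path when neither applies. A standalone sketch of that logic, mirroring the dag.py hunk below (paths are illustrative):

    from pathlib import Path

    def relative_fileloc(fileloc, processor_dags_folder, local_dags_folder):
        """Prefer the dags_folder captured at serialization time over the local one."""
        path = Path(fileloc)
        try:
            rel_path = path.relative_to(processor_dags_folder or local_dags_folder)
        except ValueError:
            # Not under either dags folder: keep the absolute path.
            return path
        # If dags_folder was configured as the dag file itself, relative_to()
        # yields '.', which is not a usable fileloc, so keep the full path.
        return path if rel_path == Path(".") else rel_path

    # Worker configured with a different dags_folder than the dag processor:
    assert relative_fileloc("/opt/airflow/dags/a.py", "/opt/airflow/dags", "/tmp/worker_dags") == Path("a.py")
    assert relative_fileloc("/tmp/foo.py", None, "/opt/airflow/dags") == Path("/tmp/foo.py")
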
 airflow/models/dag.py                         | 11 +++-
 airflow/serialization/schema.json             |  8 ++-
 airflow/serialization/serialized_objects.py   |  4 +-
 airflow/utils/cli.py                          | 45 ++++++++++++--
 tests/cli/commands/test_task_command.py       | 84 ++++++++++++++++++++++++++-
 tests/dags/test_dags_folder.py                | 38 ++++++++++++
 tests/models/test_dag.py                      | 57 +++++++++++++++++-
 tests/serialization/test_dag_serialization.py | 11 +++-
 tests/utils/test_cli_util.py                  | 21 +++++++
 9 files changed, 268 insertions(+), 11 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 60660ce0fb..1a6b9906b5 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -584,6 +584,11 @@ class DAG(LoggingMixin):
                 f"Bad formatted links are: {wrong_links}"
             )
 
+        # this will only be set at serialization time
+        # its only use is for determining the relative
+        # fileloc based only on the serialized dag
+        self._processor_dags_folder = None
+
     def get_doc_md(self, doc_md: str | None) -> str | None:
         if doc_md is None:
             return doc_md
@@ -1189,7 +1194,11 @@ class DAG(LoggingMixin):
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self._processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index ddbedad42c..13e91b33d6 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -133,7 +133,13 @@
         "catchup": { "type": "boolean" },
         "is_subdag": { "type": "boolean" },
         "fileloc": { "type" : "string"},
-        "orientation": { "type" : "string"},
+        "_processor_dags_folder": {
+            "anyOf": [
+                { "type": "null" },
+                {"type": "string"}
+            ]
+        },
+         "orientation": { "type" : "string"},
         "_description": { "type" : "string"},
         "_concurrency": { "type" : "number"},
         "_max_active_tasks": { "type" : "number"},
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 969b6014db..542573fbcc 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -50,7 +50,7 @@ from airflow.providers_manager import ProvidersManager
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.helpers import serialize_template_field
 from airflow.serialization.json_schema import Validator, load_dag_schema
-from airflow.settings import json
+from airflow.settings import DAGS_FOLDER, json
 from airflow.timetables.base import Timetable
 from airflow.utils.code_utils import get_python_source
 from airflow.utils.docs import get_docs_url
@@ -1120,6 +1120,8 @@ class SerializedDAG(DAG, BaseSerialization):
         try:
             serialized_dag = cls.serialize_to_json(dag, cls._decorated_fields)
 
+            serialized_dag['_processor_dags_folder'] = DAGS_FOLDER
+
             # If schedule_interval is backed by timetable, serialize only
             # timetable; vice versa for a timetable backed by schedule_interval.
             if dag.timetable.summary == dag.schedule_interval:
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 37fe0d2702..87313f46f5 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -29,6 +29,7 @@ import traceback
 import warnings
 from argparse import Namespace
 from datetime import datetime
+from pathlib import Path
 from typing import TYPE_CHECKING, Callable, TypeVar, cast
 
 from airflow import settings
@@ -43,6 +44,8 @@ T = TypeVar("T", bound=Callable)
 if TYPE_CHECKING:
     from airflow.models.dag import DAG
 
+logger = logging.getLogger(__name__)
+
 
 def _check_cli_args(args):
     if not args:
@@ -181,15 +184,47 @@ def get_dag_by_file_location(dag_id: str):
     return dagbag.dags[dag_id]
 
 
+def _search_for_dag_file(val: str | None) -> str | None:
+    """
+    Search for the file referenced at fileloc.
+
+    By the time we get to this function, we've already run this `val` through `process_subdir`
+    and loaded the DagBag there and came up empty.  So here, if `val` is a file path, we make
+    a last ditch effort to try and find a dag file with the same name in our dags folder. (This
+    avoids the unnecessary dag parsing that would occur if we just parsed the dags folder).
+
+    If `val` is a path to a file, this likely means that the serializing process had a dags_folder
+    equal to only the dag file in question. This prevents us from determining the relative location.
+    And if the paths are different between worker and dag processor / scheduler, then we won't find
+    the dag at the given location.
+    """
+    if val and Path(val).suffix in ('.zip', '.py'):
+        matches = list(Path(settings.DAGS_FOLDER).rglob(Path(val).name))
+        if len(matches) == 1:
+            return matches[0].as_posix()
+    return None
+
+
 def get_dag(subdir: str | None, dag_id: str) -> DAG:
-    """Returns DAG of a given dag_id"""
+    """
+    Returns DAG of a given dag_id
+
+    First we'll try to use the given subdir. If that doesn't work, we'll try to
+    find the correct path (assuming it's a file) and failing that, use the configured
+    dags folder.
+    """
     from airflow.models import DagBag
 
-    dagbag = DagBag(process_subdir(subdir))
+    first_path = process_subdir(subdir)
+    dagbag = DagBag(first_path)
     if dag_id not in dagbag.dags:
-        raise AirflowException(
-            f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
-        )
+        fallback_path = _search_for_dag_file(subdir) or settings.DAGS_FOLDER
+        logger.warning("Dag %r not found in path %s; trying path %s", dag_id, first_path, fallback_path)
+        dagbag = DagBag(dag_folder=fallback_path)
+        if dag_id not in dagbag.dags:
+            raise AirflowException(
+                f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
+            )
     return dagbag.dags[dag_id]
 
 
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 56a955b0ab..03b9259f8d 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -22,9 +22,10 @@ import json
 import logging
 import os
 import re
+import tempfile
 import unittest
 from argparse import ArgumentParser
-from contextlib import redirect_stdout
+from contextlib import contextmanager, redirect_stdout
 from pathlib import Path
 from unittest import mock
 
@@ -60,6 +61,13 @@ def reset(dag_id):
         runs.delete()
 
 
+@contextmanager
+def move_back(old_path, new_path):
+    os.rename(old_path, new_path)
+    yield
+    os.rename(new_path, old_path)
+
+
 # TODO: Check if tests needs side effects - locally there's missing DAG
 class TestCliTasks:
     run_id = 'TEST_RUN_ID'
@@ -183,6 +191,80 @@ class TestCliTasks:
         )
         mock_get_dag_by_deserialization.assert_called_once_with(self.dag_id)
 
+    def test_cli_test_different_path(self, session):
+        """
+        When the dag processor has a different dags folder
+        from the worker, ``airflow tasks run --local`` should still work.
+        """
+        repo_root = Path(__file__).parent.parent.parent.parent
+        orig_file_path = repo_root / 'tests/dags/test_dags_folder.py'
+        orig_dags_folder = orig_file_path.parent
+
+        # parse dag in original path
+        with conf_vars({('core', 'dags_folder'): orig_dags_folder.as_posix()}):
+            dagbag = DagBag(include_examples=False)
+            dag = dagbag.get_dag('test_dags_folder')
+            dagbag.sync_to_db(session=session)
+
+        dag.create_dagrun(
+            state=State.NONE,
+            run_id='abc123',
+            run_type=DagRunType.MANUAL,
+            execution_date=pendulum.now('UTC'),
+            session=session,
+        )
+        session.commit()
+
+        # now let's move the file
+        # additionally let's update the dags folder to be the new path
+        # ideally since dags_folder points correctly to the file, airflow
+        # should be able to find the dag.
+        with tempfile.TemporaryDirectory() as td:
+            new_file_path = Path(td) / Path(orig_file_path).name
+            new_dags_folder = new_file_path.parent
+            with move_back(orig_file_path, new_file_path), conf_vars(
+                {('core', 'dags_folder'): new_dags_folder.as_posix()}
+            ):
+                ser_dag = (
+                    session.query(SerializedDagModel)
+                    .filter(SerializedDagModel.dag_id == 'test_dags_folder')
+                    .one()
+                )
+                # confirm that the serialized dag location has not been updated
+                assert ser_dag.fileloc == orig_file_path.as_posix()
+                assert ser_dag.data['dag']['_processor_dags_folder'] == orig_dags_folder.as_posix()
+                assert ser_dag.data['dag']['fileloc'] == orig_file_path.as_posix()
+                assert ser_dag.dag._processor_dags_folder == orig_dags_folder.as_posix()
+                from airflow.settings import DAGS_FOLDER
+
+                assert DAGS_FOLDER == new_dags_folder.as_posix() != orig_dags_folder.as_posix()
+                task_command.task_run(
+                    self.parser.parse_args(
+                        [
+                            'tasks',
+                            'run',
+                            '--ignore-all-dependencies',
+                            '--local',
+                            'test_dags_folder',
+                            'task',
+                            'abc123',
+                        ]
+                    )
+                )
+            ti = (
+                session.query(TaskInstance)
+                .filter(
+                    TaskInstance.task_id == 'task',
+                    TaskInstance.dag_id == 'test_dags_folder',
+                    TaskInstance.run_id == 'abc123',
+                    TaskInstance.map_index == -1,
+                )
+                .one()
+            )
+            assert ti.state == 'success'
+            # verify that the file was in different location when run
+            assert ti.xcom_pull(ti.task_id) == new_file_path.as_posix()
+
     @mock.patch("airflow.cli.commands.task_command.get_dag_by_deserialization")
     @mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
     def test_run_get_serialized_dag_fallback(self, mock_local_job, mock_get_dag_by_deserialization):
diff --git a/tests/dags/test_dags_folder.py b/tests/dags/test_dags_folder.py
new file mode 100644
index 0000000000..e4b15a0857
--- /dev/null
+++ b/tests/dags/test_dags_folder.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow import DAG
+from airflow.decorators import task
+
+with DAG(
+    dag_id='test_dags_folder',
+    schedule=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+) as dag:
+
+    @task(task_id="task")
+    def return_file_path():
+        """Print the Airflow context and ds variable from the context."""
+        print(f"dag file location: {__file__}")
+        return __file__
+
+    return_file_path()
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 62f057c376..54634e463f 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -37,6 +37,7 @@ from dateutil.relativedelta import relativedelta
 from freezegun import freeze_time
 from sqlalchemy import inspect
 
+import airflow
 from airflow import models, settings
 from airflow.configuration import conf
 from airflow.datasets import Dataset
@@ -47,6 +48,7 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DagOwnerAttributes, dag as dag_decorator, get_dataset_triggered_next_run_info
 from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel, TaskOutletDatasetReference
 from airflow.models.param import DagParam, Param, ParamsDict
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
@@ -65,12 +67,24 @@ from airflow.utils.types import DagRunType
 from airflow.utils.weight_rule import WeightRule
 from tests.models import DEFAULT_DATE
 from tests.test_utils.asserts import assert_queries_count
-from tests.test_utils.db import clear_db_dags, clear_db_datasets, clear_db_runs
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_dags, clear_db_datasets, clear_db_runs, clear_db_serialized_dags
 from tests.test_utils.mapping import expand_mapped_task
 from tests.test_utils.timetables import cron_timetable, delta_timetable
 
 TEST_DATE = datetime_tz(2015, 1, 2, 0, 0)
 
+repo_root = Path(airflow.__file__).parent.parent
+
+
+@pytest.fixture
+def clear_dags():
+    clear_db_dags()
+    clear_db_serialized_dags()
+    yield
+    clear_db_dags()
+    clear_db_serialized_dags()
+
 
 class TestDag:
     def setup_method(self) -> None:
@@ -2273,6 +2287,47 @@ class TestDagModel:
 
         assert dag.relative_fileloc == expected_relative
 
+    @pytest.mark.parametrize(
+        'reader_dags_folder', [settings.DAGS_FOLDER, str(repo_root / 'airflow/example_dags')]
+    )
+    @pytest.mark.parametrize(
+        ('fileloc', 'expected_relative'),
+        [
+            (str(Path(settings.DAGS_FOLDER, 'a.py')), Path('a.py')),
+            ('/tmp/foo.py', Path('/tmp/foo.py')),
+        ],
+    )
+    def test_relative_fileloc_serialized(
+        self, fileloc, expected_relative, session, clear_dags, reader_dags_folder
+    ):
+        """
+        The serialized dag model includes the dags folder as configured on the thing serializing
+        the dag.  On the thing deserializing the dag, when determining relative fileloc,
+        we should use the dags folder of the processor.  So even if the dags folder of
+        the deserializer is different (meaning that the full path is no longer relative to
+        the dags folder) then we should still get the relative fileloc as it existed on the
+        serializer process.  When the full path is not relative to the configured dags folder,
+        then relative fileloc should just be the full path.
+        """
+        dag = DAG(dag_id='test')
+        dag.fileloc = fileloc
+        sdm = SerializedDagModel(dag)
+        session.add(sdm)
+        session.commit()
+        session.expunge_all()
+        sdm = SerializedDagModel.get(dag.dag_id, session)
+        dag = sdm.dag
+        with conf_vars({('core', 'dags_folder'): reader_dags_folder}):
+            assert dag.relative_fileloc == expected_relative
+
+    def test__processor_dags_folder(self, session):
+        """Only populated after deserializtion"""
+        dag = DAG(dag_id='test')
+        dag.fileloc = '/abc/test.py'
+        assert dag._processor_dags_folder is None
+        sdm = SerializedDagModel(dag)
+        assert sdm.dag._processor_dags_folder == settings.DAGS_FOLDER
+
     @pytest.mark.need_serialized_dag
     def test_dags_needing_dagruns_dataset_triggered_dag_info_queued_times(self, session, dag_maker):
         dataset1 = Dataset(uri="ds1")
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 44a218290e..6b485f7465 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -27,6 +27,7 @@ import os
 import pickle
 from datetime import datetime, timedelta
 from glob import glob
+from pathlib import Path
 from unittest import mock
 
 import pendulum
@@ -34,6 +35,7 @@ import pytest
 from dateutil.relativedelta import FR, relativedelta
 from kubernetes.client import models as k8s
 
+import airflow
 from airflow.datasets import Dataset
 from airflow.exceptions import SerializationError
 from airflow.hooks.base import BaseHook
@@ -63,6 +65,8 @@ from tests.test_utils.config import conf_vars
 from tests.test_utils.mock_operators import CustomOperator, GoogleLink, MockOperator
 from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable
 
+repo_root = Path(airflow.__file__).parent.parent
+
 
 class CustomDepOperator(BashOperator):
     """
@@ -133,6 +137,7 @@ serialized_simple_dag_ground_truth = {
         "_dag_id": "simple_dag",
         "doc_md": "### DAG Tutorial Documentation",
         "fileloc": None,
+        "_processor_dags_folder": f"{repo_root}/tests/dags",
         "tasks": [
             {
                 "task_id": "bash_task",
@@ -494,13 +499,17 @@ class TestStringifiedDAGs:
             'default_args',
             "_task_group",
             'params',
+            '_processor_dags_folder',
         }
         fields_to_check = dag.get_serialized_fields() - exclusion_list
         for field in fields_to_check:
             assert getattr(serialized_dag, field) == getattr(
                 dag, field
             ), f'{dag.dag_id}.{field} does not match'
-
+        # _processor_dags_folder is only populated at serialization time
+        # it's only used when relying on serialized dag to determine a dag's relative path
+        assert dag._processor_dags_folder is None
+        assert serialized_dag._processor_dags_folder == str(repo_root / 'tests/dags')
         if dag.default_args:
             for k, v in dag.default_args.items():
                 if callable(v):
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index 126f25d90d..e814d6bfd2 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -24,14 +24,19 @@ import sys
 from argparse import Namespace
 from contextlib import contextmanager
 from datetime import datetime
+from pathlib import Path
 from unittest import mock
 
 import pytest
 
+import airflow
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.models.log import Log
 from airflow.utils import cli, cli_action_loggers, timezone
+from airflow.utils.cli import _search_for_dag_file
+
+repo_root = Path(airflow.__file__).parent.parent
 
 
 class TestCliUtil:
@@ -189,3 +194,19 @@ def fail_func(_):
 @cli.action_cli(check_db=False)
 def success_func(_):
     pass
+
+
+def test__search_for_dags_file():
+    dags_folder = settings.DAGS_FOLDER
+    assert _search_for_dag_file('') is None
+    assert _search_for_dag_file(None) is None
+    # if it's a file and one can be found in a subdir, should return the full path
+    assert _search_for_dag_file('any/hi/test_dags_folder.py') == str(
+        Path(dags_folder) / 'test_dags_folder.py'
+    )
+    # if it's a folder, even one that exists, return None (caller falls back to the dags folder)
+    existing_folder = Path(settings.DAGS_FOLDER, 'subdir1')
+    assert existing_folder.exists()
+    assert _search_for_dag_file(existing_folder.as_posix()) is None
+    # when multiple files are found, return None (caller falls back to the dags folder)
+    assert _search_for_dag_file('any/hi/__init__.py') is None
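
The test above pins down the contract of _search_for_dag_file. A rough sketch of logic that
would satisfy it (an illustration under assumptions, not the airflow.utils.cli implementation):

    from __future__ import annotations

    from pathlib import Path

    def search_for_dag_file(fileloc: str | None, dags_folder: str) -> str | None:
        """Return the unique file in dags_folder matching fileloc's name, else None."""
        if not fileloc:
            return None
        path = Path(fileloc)
        if not path.suffix:          # looks like a directory, not a dag file
            return None
        matches = list(Path(dags_folder).rglob(path.name))
        # a single hit is unambiguous; zero or many means the caller should
        # fall back to the configured dags folder
        return str(matches[0]) if len(matches) == 1 else None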


[airflow] 04/41: Revert "No grid auto-refresh for backfill dag runs (#25042)" (#26463)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8e836db4855db681f7539733f6b08fb190d899ed
Author: Tim Sanders <go...@gmail.com>
AuthorDate: Wed Sep 21 12:35:01 2022 -0500

    Revert "No grid auto-refresh for backfill dag runs (#25042)" (#26463)
    
    * Revert "No grid auto-refresh for backfill dag runs (#25042)"
    
    This reverts commit de6938e173773d88bd741e43c7b0aa16d8a1a167.
    
    * Fix Grid unit test
    
    (cherry picked from commit d0b3d59c958f98d3b9856d661c8479284e70bb39)
---
 airflow/www/static/js/api/useGridData.test.ts | 64 +++------------------------
 airflow/www/static/js/api/useGridData.ts      |  2 +-
 2 files changed, 7 insertions(+), 59 deletions(-)

diff --git a/airflow/www/static/js/api/useGridData.test.ts b/airflow/www/static/js/api/useGridData.test.ts
index c5a0e6c9de..43d581f000 100644
--- a/airflow/www/static/js/api/useGridData.test.ts
+++ b/airflow/www/static/js/api/useGridData.test.ts
@@ -27,6 +27,7 @@ const commonDagRunParams = {
   executionDate: '2022-01-01T10:00+00:00',
   dataIntervalStart: '2022-01-01T05:00+00:00',
   dataIntervalEnd: '2022-01-01T10:00+00:00',
+  runType: 'scheduled' as DagRun['runType'],
   startDate: null,
   endDate: null,
   lastSchedulingDecision: null,
@@ -35,75 +36,22 @@ const commonDagRunParams = {
 describe('Test areActiveRuns()', () => {
   test('Correctly detects active runs', () => {
     const runs: DagRun[] = [
-      { runType: 'scheduled', state: 'success', ...commonDagRunParams },
-      { runType: 'manual', state: 'queued', ...commonDagRunParams },
+      { state: 'success', ...commonDagRunParams },
+      { state: 'queued', ...commonDagRunParams },
     ];
     expect(areActiveRuns(runs)).toBe(true);
   });
 
   test('Returns false when all runs are resolved', () => {
     const runs: DagRun[] = [
-      { runType: 'scheduled', state: 'success', ...commonDagRunParams },
-      { runType: 'manual', state: 'failed', ...commonDagRunParams },
-      { runType: 'manual', state: 'failed', ...commonDagRunParams },
+      { state: 'success', ...commonDagRunParams },
+      { state: 'failed', ...commonDagRunParams },
+      { state: 'failed', ...commonDagRunParams },
     ];
     const result = areActiveRuns(runs);
     expect(result).toBe(false);
   });
 
-  test('Returns false when filtering runs runtype ["backfill"]', () => {
-    const runs: DagRun[] = [
-      { runType: 'scheduled', state: 'success', ...commonDagRunParams },
-      { runType: 'manual', state: 'failed', ...commonDagRunParams },
-      { runType: 'backfill', state: 'failed', ...commonDagRunParams },
-    ];
-    const result = areActiveRuns(runs);
-    expect(result).toBe(false);
-  });
-
-  test('Returns false when filtering runs runtype ["backfill"] and state ["queued"]', () => {
-    const runs: DagRun[] = [
-      { runType: 'scheduled', state: 'success', ...commonDagRunParams },
-      { runType: 'manual', state: 'failed', ...commonDagRunParams },
-      { runType: 'backfill', state: 'queued', ...commonDagRunParams },
-    ];
-    const result = areActiveRuns(runs);
-    expect(result).toBe(false);
-  });
-
-  [
-    {
-      runType: 'manual', state: 'queued', expectedResult: true,
-    },
-    {
-      runType: 'manual', state: 'running', expectedResult: true,
-    },
-    {
-      runType: 'scheduled', state: 'queued', expectedResult: true,
-    },
-    {
-      runType: 'scheduled', state: 'running', expectedResult: true,
-    },
-    {
-      runType: 'dataset_triggered', state: 'queued', expectedResult: true,
-    },
-    {
-      runType: 'dataset_triggered', state: 'running', expectedResult: true,
-    },
-    {
-      runType: 'backfill', state: 'queued', expectedResult: false,
-    },
-    {
-      runType: 'backfill', state: 'running', expectedResult: false,
-    },
-  ].forEach(({ state, runType, expectedResult }) => {
-    test(`Returns ${expectedResult} when filtering runs with runtype ["${runType}"] and state ["${state}"]`, () => {
-      const runs: DagRun[] = [{ runType, state, ...commonDagRunParams } as DagRun];
-      const result = areActiveRuns(runs);
-      expect(result).toBe(expectedResult);
-    });
-  });
-
   test('Returns false when there are no runs', () => {
     const result = areActiveRuns();
     expect(result).toBe(false);
diff --git a/airflow/www/static/js/api/useGridData.ts b/airflow/www/static/js/api/useGridData.ts
index f9a5737c5d..737167552c 100644
--- a/airflow/www/static/js/api/useGridData.ts
+++ b/airflow/www/static/js/api/useGridData.ts
@@ -57,7 +57,7 @@ const formatOrdering = (data: GridData) => ({
   ordering: data.ordering.map((o: string) => camelCase(o)) as RunOrdering,
 });
 
-export const areActiveRuns = (runs: DagRun[] = []) => runs.filter((run) => ['manual', 'scheduled', 'dataset_triggered'].includes(run.runType)).filter((run) => ['queued', 'running'].includes(run.state)).length > 0;
+export const areActiveRuns = (runs: DagRun[] = []) => runs.filter((run) => ['queued', 'running'].includes(run.state)).length > 0;
 
 const useGridData = () => {
   const { isRefreshOn, stopRefresh } = useAutoRefresh();


[airflow] 40/41: Don't re-patch pods that are already controlled by current worker (#26778)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c1daf84a751d751a8e825acbb5bde009fb427638
Author: HTErik <89...@users.noreply.github.com>
AuthorDate: Tue Oct 18 14:54:42 2022 +0200

    Don't re-patch pods that are already controlled by current worker (#26778)
    
    After the scheduler has launched many pods, it keeps trying to
    re-adopt them by patching every pod. Each patch operation
    involves a remote API call, which can be very slow.
    In the meantime the scheduler cannot do anything else.

    By ignoring the pods that already carry the expected label,
    the list query result is shorter and far fewer
    patch queries are needed.

    We hit an unlucky moment in our environment where each
    patch operation started taking 100ms; with 200 pods in
    flight that accumulates to 20 seconds of blocked scheduler.
    
    (cherry picked from commit 27ec5620b3d1b93ae68a98e9253e243c6cf70d71)
---
 airflow/executors/kubernetes_executor.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index a718aa1907..b1822872ed 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -754,14 +754,15 @@ class KubernetesExecutor(BaseExecutor):
         """
         if not self.scheduler_job_id:
             raise AirflowException(NOT_STARTED_MESSAGE)
+        new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
         kwargs = {
             'field_selector': "status.phase=Succeeded",
-            'label_selector': 'kubernetes_executor=True',
+            'label_selector': f'kubernetes_executor=True,airflow-worker!={new_worker_id_label}',
         }
         pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
         for pod in pod_list.items:
             self.log.info("Attempting to adopt pod %s", pod.metadata.name)
-            pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(self.scheduler_job_id)
+            pod.metadata.labels['airflow-worker'] = new_worker_id_label
             try:
                 kube_client.patch_namespaced_pod(
                     name=pod.metadata.name,
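
A minimal, self-contained sketch of the selector change above using the standard kubernetes
Python client (the namespace, labels, and worker id below are illustrative, not Airflow's
actual configuration):

    from kubernetes import client, config

    config.load_kube_config()        # or load_incluster_config() when running inside a cluster
    v1 = client.CoreV1Api()

    scheduler_job_id = "42"          # hypothetical current worker id; Airflow also runs it
                                     # through make_safe_label_value
    pods = v1.list_namespaced_pod(
        namespace="airflow",
        field_selector="status.phase=Succeeded",
        # exclude pods that already carry this worker's label, so they are never re-listed
        # or re-patched
        label_selector=f"kubernetes_executor=True,airflow-worker!={scheduler_job_id}",
    )
    for pod in pods.items:
        print("would adopt", pod.metadata.name)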


[airflow] 11/41: make consistency on markup title string level (#26696)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 357549c84bd10e121246d4dc4493448a3be9eb38
Author: Changhoon Oh <81...@users.noreply.github.com>
AuthorDate: Sun Oct 2 11:59:23 2022 +0900

    make consistency on markup title string level (#26696)
    
    Co-authored-by: ok9897 <ok...@krafton.com>
    (cherry picked from commit f977804255ca123bfea24774328ba0a9ca63688b)
---
 docs/apache-airflow/concepts/cluster-policies.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/concepts/cluster-policies.rst b/docs/apache-airflow/concepts/cluster-policies.rst
index 0f5f978a64..a8cee72003 100644
--- a/docs/apache-airflow/concepts/cluster-policies.rst
+++ b/docs/apache-airflow/concepts/cluster-policies.rst
@@ -59,7 +59,7 @@ This policy checks if each DAG has at least one tag defined:
     DAG policies are applied after the DAG has been completely loaded, so overriding the ``default_args`` parameter has no effect. If you want to override the default operator settings, use task policies instead.
 
 Task policies
--------------
+~~~~~~~~~~~~~
 
 Here's an example of enforcing a maximum timeout policy on every task:
 
@@ -86,7 +86,7 @@ For example, your ``airflow_local_settings.py`` might follow this pattern:
 
 
 Task instance mutation
-----------------------
+~~~~~~~~~~~~~~~~~~~~~~
 
 Here's an example of re-routing tasks that are on their second (or greater) retry to a different queue:
 


[airflow] 24/41: Move user-facing string to template (#26815)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9bcf5b342e44ff5b749f9ff4318d61546df4428e
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Wed Oct 5 23:04:34 2022 -0700

    Move user-facing string to template (#26815)
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    (cherry picked from commit bab6dbec3883084e5872123b515c2a8491c32380)
---
 airflow/models/dag.py                   |  9 ++++++---
 airflow/www/templates/airflow/dags.html |  4 +++-
 tests/models/test_dag.py                | 15 ++++++++++++---
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1a6b9906b5..6b5bdde6de 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -198,13 +198,16 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
-def get_dataset_triggered_next_run_info(dag_ids: list[str], *, session: Session) -> dict[str, str]:
+def get_dataset_triggered_next_run_info(dag_ids: list[str], *, session: Session) -> dict[str, dict[str, int]]:
     """
     Given a list of dag_ids, get info on how close any dataset-triggered DAGs are to
     their next run, e.g. "1 of 2 datasets updated"
     """
     return {
-        x.dag_id: f"{x.ready} of {x.total} datasets updated"
+        x.dag_id: {
+            "ready": x.ready,
+            "total": x.total,
+        }
         for x in session.query(
             DagScheduleDatasetReference.dag_id,
             func.count().label("total"),
@@ -3338,7 +3341,7 @@ class DagModel(Base):
         )
 
     @provide_session
-    def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> str | None:
+    def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int] | None:
         if self.schedule_interval != "Dataset":
             return None
         return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 0884a06c17..c675b4a05a 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -308,7 +308,9 @@
                     data-dag-id="{{ dag.dag_id }}"
                     data-summary="{{ dataset_triggered_next_run_info[dag.dag_id] }}"
                   >
-                    {{ dataset_triggered_next_run_info[dag.dag_id] }}
+                    {% with ds_info = dataset_triggered_next_run_info[dag.dag_id] %}
+                    {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+                    {% endwith %}
                   </div>
                 </span>
               {% endif %}
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 54634e463f..775f557094 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2953,12 +2953,21 @@ def test_get_dataset_triggered_next_run_info(dag_maker):
     session.flush()
 
     info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
-    assert "0 of 1 datasets updated" == info[dag1.dag_id]
+    assert info[dag1.dag_id] == {
+        "ready": 0,
+        "total": 1,
+    }
 
     # This time, check both dag2 and dag3 at the same time (tests filtering)
     info = get_dataset_triggered_next_run_info([dag2.dag_id, dag3.dag_id], session=session)
-    assert "1 of 2 datasets updated" == info[dag2.dag_id]
-    assert "1 of 3 datasets updated" == info[dag3.dag_id]
+    assert info[dag2.dag_id] == {
+        "ready": 1,
+        "total": 2,
+    }
+    assert info[dag3.dag_id] == {
+        "ready": 1,
+        "total": 3,
+    }
 
 
 def test_dag_uses_timetable_for_run_id(session):
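
With the refactor above, the helper returns {"ready": ..., "total": ...} and the user-facing
wording lives in the template. A tiny jinja2 sketch (standalone, not the Airflow webserver)
of that rendering:

    from jinja2 import Template

    ds_info = {"ready": 1, "total": 2}   # shape now returned by get_dataset_triggered_next_run_info
    summary = Template("{{ ds_info.ready }} of {{ ds_info.total }} datasets updated")
    print(summary.render(ds_info=ds_info))   # 1 of 2 datasets updated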


[airflow] 22/41: Fix warning when using xcomarg dependencies (#26801)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 56f1e473e7c8b9b2fbb2c0f72c6b06596bdb07e3
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Sep 30 17:03:07 2022 +0100

    Fix warning when using xcomarg dependencies (#26801)
    
    This warning was invisible before 2.4 due to a bug in our logging config
    (fixed by commit 7363e35); AIP-45 then suddenly made it appear.

    The problem was caused by set_xcomargs_dependencies being called once
    for each class in the hierarchy, with each call repeating the same
    logic.

    The fix is to look at the _actual_ function behind `self.__init__` and
    compare it to the function we're about to call, so that we don't set
    dependencies until we have finished the outermost class's
    apply_defaults invocation.
    
    (cherry picked from commit d77f0563b403ae9e1a92e8e9e998a1142bb6f359)
---
 airflow/models/baseoperator.py  |  5 +++--
 tests/decorators/test_python.py | 15 +++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 3a95a3945c..8bc96df735 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -410,8 +410,9 @@ class BaseOperatorMeta(abc.ABCMeta):
             # Store the args passed to init -- we need them to support task.map serialzation!
             self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
 
-            if not instantiated_from_mapped:
-                # Set upstream task defined by XComArgs passed to template fields of the operator.
+            # Set upstream task defined by XComArgs passed to template fields of the operator.
+            # BUT: only do this _ONCE_, not once for each class in the hierarchy
+            if not instantiated_from_mapped and func == self.__init__.__wrapped__:  # type: ignore[misc]
                 self.set_xcomargs_dependencies()
                 # Mark instance as instantiated.
                 self._BaseOperator__instantiated = True
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 3dc1516682..876c4b23d4 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -817,3 +817,18 @@ def test_upstream_exception_produces_none_xcom(dag_maker, session):
     assert len(decision.schedulable_tis) == 1  # "down"
     decision.schedulable_tis[0].run(session=session)
     assert result == "'example' None"
+
+
+@pytest.mark.filterwarnings("error")
+def test_no_warnings(reset_logging_config, caplog):
+    @task_decorator
+    def some_task():
+        return 1
+
+    @task_decorator
+    def other(x):
+        ...
+
+    with DAG(dag_id='test', start_date=DEFAULT_DATE, schedule=None):
+        other(some_task())
+    assert caplog.messages == []
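
The guard above only fires the hook from the outermost __init__ in the class hierarchy; a
toy, non-Airflow illustration of why that check is needed:

    import functools

    def apply_defaults(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            func(self, *args, **kwargs)
            # run the hook only when this call is the class's actual (outermost) __init__
            if func == type(self).__init__.__wrapped__:
                self.post_init_hook()
        return wrapper

    class Base:
        @apply_defaults
        def __init__(self):
            print("Base.__init__")

        def post_init_hook(self):
            print("hook runs exactly once")

    class Child(Base):
        @apply_defaults
        def __init__(self):
            super().__init__()
            print("Child.__init__")

    Child()   # without the check the hook would run twice, once per decorated __init__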


[airflow] 01/41: Retry on Airflow Schedule DAG Run DB Deadlock (#26347)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b7afa6725369f86cabdebc5e191a7ce96ceb1958
Author: Anthony Panat <ap...@ycharts.com>
AuthorDate: Sat Oct 1 23:33:59 2022 -0400

    Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
    
    Co-authored-by: Anthony Panat <an...@Anthonys-MacBook-Pro-2.local>
    Co-authored-by: Anthony Panat <an...@anthonys-mbp-2.mynetworksettings.com>
    (cherry picked from commit 0da49935000476b1d1941b63d0d66d3c58d64fea)
---
 airflow/jobs/scheduler_job.py | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 53a96cf9ac..74541a1cf8 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -943,12 +943,7 @@ class SchedulerJob(BaseJob):
             # Bulk fetch the currently active dag runs for the dags we are
             # examining, rather than making one query per DagRun
 
-            callback_tuples = []
-            for dag_run in dag_runs:
-                callback_to_run = self._schedule_dag_run(dag_run, session)
-                callback_tuples.append((dag_run, callback_to_run))
-
-            guard.commit()
+            callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
 
         # Send the callbacks after we commit to ensure the context is up to date when it gets run
         for dag_run, callback_to_run in callback_tuples:
@@ -1232,6 +1227,18 @@ class SchedulerJob(BaseJob):
                 active_runs_of_dags[dag_run.dag_id] += 1
                 _update_state(dag, dag_run)
 
+    @retry_db_transaction
+    def _schedule_all_dag_runs(self, guard, dag_runs, session):
+        """Makes scheduling decisions for all `dag_runs`"""
+        callback_tuples = []
+        for dag_run in dag_runs:
+            callback_to_run = self._schedule_dag_run(dag_run, session)
+            callback_tuples.append((dag_run, callback_to_run))
+
+        guard.commit()
+
+        return callback_tuples
+
     def _schedule_dag_run(
         self,
         dag_run: DagRun,
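
The @retry_db_transaction decorator applied above retries the whole scheduling transaction
when the database reports a deadlock. A generic sketch of that pattern (the name, the error
inspection, and the backoff are assumptions for illustration, not Airflow's actual helper):

    import functools
    import time

    from sqlalchemy.exc import OperationalError

    def retry_db_transaction(func, retries: int = 3):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, retries + 1):
                try:
                    return func(*args, **kwargs)
                except OperationalError as err:
                    # hypothetical deadlock detection; real code would inspect err more carefully
                    if "deadlock" not in str(err).lower() or attempt == retries:
                        raise
                    time.sleep(attempt)   # simple backoff before retrying the transaction
        return wrapper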


[airflow] 37/41: Fix task duration cumulative chart (#26717)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ecd943915fdd41febdfde760fb28f7c3e29f994d
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Sep 27 15:54:59 2022 -0700

    Fix task duration cumulative chart (#26717)
    
    Previously the chart could go down, which should not happen for a cumulative chart.  We fix that here.
    
    Also, we add task fail durations to the non-cumulative chart for consistency.
    
    (cherry picked from commit afa8e10df16a3806077c471818053b38076e37fb)
---
 airflow/www/views.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index ea05b1e0c7..ce322d7a62 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2933,10 +2933,11 @@ class Airflow(AirflowBaseView):
             if task_instance.duration:
                 date_time = wwwutils.epoch(task_instance.execution_date)
                 x_points[task_instance.task_id].append(date_time)
-                y_points[task_instance.task_id].append(float(task_instance.duration))
                 fails_dict_key = (task_instance.dag_id, task_instance.task_id, task_instance.run_id)
                 fails_total = fails_totals[fails_dict_key]
-                cumulative_y[task_instance.task_id].append(float(task_instance.duration + fails_total))
+                y_points[task_instance.task_id].append(float(task_instance.duration + fails_total))
+
+        cumulative_y = {k: list(itertools.accumulate(v)) for k, v in y_points.items()}
 
         # determine the most relevant time unit for the set of task instance
         # durations for the DAG
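
The itertools.accumulate call above is what makes the cumulative series monotonically
non-decreasing; a quick illustration with made-up durations:

    import itertools

    # per-run durations, already including time spent in failed attempts
    y_points = [3.0, 1.5, 4.0]
    cumulative_y = list(itertools.accumulate(y_points))
    print(cumulative_y)   # [3.0, 4.5, 8.5] -- never goes down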


[airflow] 36/41: Avoid 500 on dag redirect (#27064)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 34052dbbee571ff03e256739b0251b01816d6095
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Oct 14 12:34:46 2022 -0700

    Avoid 500 on dag redirect (#27064)
    
    (cherry picked from commit c550bbf7ffe36cafae75f01f1a6f60b169ceb111)
---
 airflow/www/views.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index dd6c9226af..ea05b1e0c7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2489,7 +2489,8 @@ class Airflow(AirflowBaseView):
     @action_logging
     def dag(self, dag_id):
         """Redirect to default DAG view."""
-        return redirect(url_for('Airflow.grid', dag_id=dag_id, **request.args))
+        kwargs = {**request.args, "dag_id": dag_id}
+        return redirect(url_for('Airflow.grid', **kwargs))
 
     @expose('/legacy_tree')
     @auth.has_access(
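
The redirect fix above merges request.args and dag_id into one dict before calling url_for.
A small, standalone illustration (with a hypothetical url_for stub) of the duplicate-keyword
failure it avoids:

    def url_for(endpoint, **params):                       # stand-in for flask.url_for
        return endpoint + "?" + "&".join(f"{k}={v}" for k, v in params.items())

    request_args = {"dag_id": "example_dag", "tab": "graph"}   # query string already has dag_id

    try:
        url_for("Airflow.grid", dag_id="example_dag", **request_args)
    except TypeError as err:
        print(err)   # got multiple values for keyword argument 'dag_id'

    kwargs = {**request_args, "dag_id": "example_dag"}     # merged first, as in the fix
    print(url_for("Airflow.grid", **kwargs))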


[airflow] 41/41: Add separate error handler for 405(Method not allowed) errors (#26880)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5bf28504902ff9eded740ff9b7f9da98ac404f5a
Author: Dakshin K <36...@users.noreply.github.com>
AuthorDate: Tue Oct 18 18:20:13 2022 +0530

    Add separate error handler for 405(Method not allowed) errors (#26880)
    
    Co-authored-by: Dakshin K <da...@gmail.com>
    (cherry picked from commit 8efb678e771c8b7e351220a1eb7eb246ae8ed97f)
---
 airflow/www/extensions/init_views.py               | 10 ++++-
 .../airflow/{not_found.html => error.html}         |  6 +--
 airflow/www/views.py                               | 19 ++++++++-
 tests/api_connexion/test_error_handling.py         | 47 ++++++++++++++++++----
 4 files changed, 69 insertions(+), 13 deletions(-)

diff --git a/airflow/www/extensions/init_views.py b/airflow/www/extensions/init_views.py
index fcb2a87265..cad715085b 100644
--- a/airflow/www/extensions/init_views.py
+++ b/airflow/www/extensions/init_views.py
@@ -188,8 +188,7 @@ def init_api_connexion(app: Flask) -> None:
     from airflow.www import views
 
     @app.errorhandler(404)
-    @app.errorhandler(405)
-    def _handle_api_error(ex):
+    def _handle_api_not_found(ex):
         if request.path.startswith(base_path):
             # 404 errors are never handled on the blueprint level
             # unless raised from a view func so actual 404 errors,
@@ -199,6 +198,13 @@ def init_api_connexion(app: Flask) -> None:
         else:
             return views.not_found(ex)
 
+    @app.errorhandler(405)
+    def _handle_method_not_allowed(ex):
+        if request.path.startswith(base_path):
+            return common_error_handler(ex)
+        else:
+            return views.method_not_allowed(ex)
+
     spec_dir = path.join(ROOT_APP_DIR, 'api_connexion', 'openapi')
     connexion_app = App(__name__, specification_dir=spec_dir, skip_error_handlers=True)
     connexion_app.app = app
diff --git a/airflow/www/templates/airflow/not_found.html b/airflow/www/templates/airflow/error.html
similarity index 91%
rename from airflow/www/templates/airflow/not_found.html
rename to airflow/www/templates/airflow/error.html
index e9e1231e3a..cfb38a25b5 100644
--- a/airflow/www/templates/airflow/not_found.html
+++ b/airflow/www/templates/airflow/error.html
@@ -20,14 +20,14 @@
 <!DOCTYPE html>
 <html lang="en">
   <head>
-    <title>Airflow 404</title>
+    <title>Airflow {{ status_code }}</title>
     <link rel="icon" type="image/png" href="{{ url_for('static', filename='pin_32.png') }}">
   </head>
   <body>
     <div style="font-family: verdana; text-align: center; margin-top: 200px;">
       <img src="{{ url_for('static', filename='pin_100.png') }}" width="50px" alt="pin-logo" />
-      <h1>Airflow 404</h1>
-      <p>Page cannot be found.</p>
+      <h1>Airflow {{ status_code }}</h1>
+      <p>{{ error_message }}</p>
       <a href="/">Return to the main page</a>
       <p>{{ hostname }}</p>
     </div>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8a45a15444..e27ad84c9f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -482,15 +482,32 @@ def not_found(error):
     """Show Not Found on screen for any error in the Webserver"""
     return (
         render_template(
-            'airflow/not_found.html',
+            'airflow/error.html',
             hostname=get_hostname()
             if conf.getboolean('webserver', 'EXPOSE_HOSTNAME', fallback=True)
             else 'redact',
+            status_code=404,
+            error_message='Page cannot be found.',
         ),
         404,
     )
 
 
+def method_not_allowed(error):
+    """Show Method Not Allowed on screen for any error in the Webserver"""
+    return (
+        render_template(
+            'airflow/error.html',
+            hostname=get_hostname()
+            if conf.getboolean('webserver', 'EXPOSE_HOSTNAME', fallback=True)
+            else 'redact',
+            status_code=405,
+            error_message='Received an invalid request.',
+        ),
+        405,
+    )
+
+
 def show_traceback(error):
     """Show Traceback for a given error"""
     return (
diff --git a/tests/api_connexion/test_error_handling.py b/tests/api_connexion/test_error_handling.py
index 8793e7c150..86effad88c 100644
--- a/tests/api_connexion/test_error_handling.py
+++ b/tests/api_connexion/test_error_handling.py
@@ -21,22 +21,55 @@ def test_incorrect_endpoint_should_return_json(minimal_app_for_api):
     client = minimal_app_for_api.test_client()
 
     # Given we have application with Connexion added
-    # When we hitting incorrect endpoint in API path
+    # When we are hitting incorrect endpoint in API path
 
-    resp_json = client.get("/api/v1/incorrect_endpoint").json
+    resp = client.get("/api/v1/incorrect_endpoint")
 
     # Then we have parsable JSON as output
 
-    assert 404 == resp_json["status"]
+    assert 'Not Found' == resp.json["title"]
+    assert 404 == resp.json["status"]
+    assert 404 == resp.status_code
+
+
+def test_incorrect_endpoint_should_return_html(minimal_app_for_api):
+    client = minimal_app_for_api.test_client()
 
     # When we are hitting non-api incorrect endpoint
 
-    resp_json = client.get("/incorrect_endpoint").json
+    resp = client.get("/incorrect_endpoint")
 
     # Then we do not have JSON as response, rather standard HTML
 
-    assert resp_json is None
+    assert resp.json is None
+    assert resp.mimetype == 'text/html'
+    assert resp.status_code == 404
+
+
+def test_incorrect_method_should_return_json(minimal_app_for_api):
+    client = minimal_app_for_api.test_client()
+
+    # Given we have application with Connexion added
+    # When we are hitting incorrect HTTP method in API path
+
+    resp = client.put("/api/v1/version")
 
-    resp_json = client.put("/api/v1/variables").json
+    # Then we have parsable JSON as output
+
+    assert 'Method Not Allowed' == resp.json["title"]
+    assert 405 == resp.json["status"]
+    assert 405 == resp.status_code
+
+
+def test_incorrect_method_should_return_html(minimal_app_for_api):
+    client = minimal_app_for_api.test_client()
+
+    # When we are hitting non-api incorrect HTTP method
+
+    resp = client.put("/")
+
+    # Then we do not have JSON as response, rather standard HTML
 
-    assert 'Method Not Allowed' == resp_json["title"]
+    assert resp.json is None
+    assert resp.mimetype == 'text/html'
+    assert resp.status_code == 405
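
A minimal Flask sketch of the split 404/405 handling introduced above (a toy app; the path
prefix, messages, and responses are illustrative, not the Airflow webserver):

    from flask import Flask, request

    app = Flask(__name__)

    @app.errorhandler(404)
    def handle_not_found(ex):
        if request.path.startswith("/api/v1"):
            return {"title": "Not Found", "status": 404}, 404      # JSON for API paths
        return "<h1>Airflow 404</h1><p>Page cannot be found.</p>", 404

    @app.errorhandler(405)
    def handle_method_not_allowed(ex):
        if request.path.startswith("/api/v1"):
            return {"title": "Method Not Allowed", "status": 405}, 405
        return "<h1>Airflow 405</h1><p>Received an invalid request.</p>", 405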


[airflow] 20/41: A few docs fixups (#26788)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 332959208ffe9c7043d4756095e34f833ed245e9
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Sat Oct 1 15:16:08 2022 -0700

    A few docs fixups (#26788)
    
    (cherry picked from commit 674f9ce6eaae533cfe31bc92cc92fa75ed7223fc)
---
 airflow/providers/microsoft/winrm/hooks/winrm.py       | 4 ++--
 airflow/providers/ssh/hooks/ssh.py                     | 4 ++--
 docs/apache-airflow-providers-common-sql/operators.rst | 2 ++
 docs/apache-airflow/executor/local.rst                 | 2 +-
 4 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/microsoft/winrm/hooks/winrm.py b/airflow/providers/microsoft/winrm/hooks/winrm.py
index ee11dbac9c..23d31af59e 100644
--- a/airflow/providers/microsoft/winrm/hooks/winrm.py
+++ b/airflow/providers/microsoft/winrm/hooks/winrm.py
@@ -40,8 +40,8 @@ class WinRMHook(BaseHook):
     :seealso: https://github.com/diyan/pywinrm/blob/master/winrm/protocol.py
 
     :param ssh_conn_id: connection id from airflow Connections from where
-        all the required parameters can be fetched like username and password.
-        Thought the priority is given to the param passed during init
+        all the required parameters can be fetched like username and password,
+        though priority is given to the params passed during init.
     :param endpoint: When not set, endpoint will be constructed like this:
         'http://{remote_host}:{remote_port}/wsman'
     :param remote_host: Remote host to connect to. Ignored if `endpoint` is set.
diff --git a/airflow/providers/ssh/hooks/ssh.py b/airflow/providers/ssh/hooks/ssh.py
index 17545b1755..5b4170aeca 100644
--- a/airflow/providers/ssh/hooks/ssh.py
+++ b/airflow/providers/ssh/hooks/ssh.py
@@ -48,8 +48,8 @@ class SSHHook(BaseHook):
 
     :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>` from airflow
         Connections from where all the required parameters can be fetched like
-        username, password or key_file. Thought the priority is given to the
-        param passed during init
+        username, password or key_file, though priority is given to the
+        params passed during init.
     :param remote_host: remote host to connect
     :param username: username to connect to the remote_host
     :param password: password of the username to connect to the remote_host
diff --git a/docs/apache-airflow-providers-common-sql/operators.rst b/docs/apache-airflow-providers-common-sql/operators.rst
index 039e10b966..402197fb52 100644
--- a/docs/apache-airflow-providers-common-sql/operators.rst
+++ b/docs/apache-airflow-providers-common-sql/operators.rst
@@ -49,6 +49,7 @@ mapping is a set of three nested dictionaries and looks like:
 
 Where col_name is the name of the column to run checks on, and each entry in its dictionary is a check.
 The valid checks are:
+
 - null_check: checks the number of NULL values in the column
 - distinct_check: checks the COUNT of values in the column that are distinct
 - unique_check: checks the number of distinct values in a column against the number of rows
@@ -57,6 +58,7 @@ The valid checks are:
 
 Each entry in the check's dictionary is either a condition for success of the check or the tolerance. The
 conditions for success are:
+
 - greater_than
 - geq_to
 - less_than
diff --git a/docs/apache-airflow/executor/local.rst b/docs/apache-airflow/executor/local.rst
index 6721cf93f1..256c612553 100644
--- a/docs/apache-airflow/executor/local.rst
+++ b/docs/apache-airflow/executor/local.rst
@@ -43,7 +43,7 @@ The following strategies are implemented:
   | LocalExecutor receives the call to shutdown the executor a poison token is sent to the
   | workers to terminate them. Processes used in this strategy are of class :class:`~airflow.executors.local_executor.QueuedLocalWorker`.
 
-Arguably, :class:`~airflow.executors.sequential_executor.SequentialExecutor` could be thought as a ``LocalExecutor`` with limited
+Arguably, :class:`~airflow.executors.sequential_executor.SequentialExecutor` could be thought of as a ``LocalExecutor`` with limited
 parallelism of just 1 worker, i.e. ``self.parallelism = 1``.
 This option could lead to the unification of the executor implementations, running
 locally, into just one :class:`~airflow.executors.local_executor.LocalExecutor` with multiple modes.


[airflow] 21/41: demote Removed state in priority for displaying task summaries (#26789)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a1aef37cae453c5acf363b94e5b38cea4d098ba3
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Fri Sep 30 11:52:28 2022 -0400

    demote Removed state in priority for displaying task summaries (#26789)
    
    (cherry picked from commit 14b38d714c747604425568059c4fc8ac9bb2dc16)
---
 airflow/www/static/js/graph.js           | 8 +-------
 airflow/www/templates/airflow/graph.html | 1 +
 airflow/www/utils.py                     | 2 +-
 airflow/www/views.py                     | 3 +++
 4 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 4e32da005b..6e34cc2829 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -21,7 +21,7 @@
 
 /*
   global d3, document, nodes, taskInstances, tasks, edges, dagreD3, localStorage, $,
-  autoRefreshInterval, moment, convertSecsToHumanReadable
+  autoRefreshInterval, moment, convertSecsToHumanReadable, priority
 */
 
 import { getMetaValue, finalStatesMap } from './utils';
@@ -600,12 +600,6 @@ function getNodeState(nodeId, tis) {
     }
   });
 
-  // In this order, if any of these states appeared in childrenStates, return it as
-  // the group state.
-  const priority = ['failed', 'upstream_failed', 'up_for_retry', 'up_for_reschedule',
-    'queued', 'scheduled', 'running', 'shutdown', 'restarting', 'removed',
-    'no_status', 'success', 'skipped'];
-
   return priority.find((state) => childrenStates.has(state)) || 'no_status';
 }
 
diff --git a/airflow/www/templates/airflow/graph.html b/airflow/www/templates/airflow/graph.html
index 975eb98f84..c4e28c32fb 100644
--- a/airflow/www/templates/airflow/graph.html
+++ b/airflow/www/templates/airflow/graph.html
@@ -136,6 +136,7 @@
     const tasks = {{ tasks|tojson }};
     let taskInstances = {{ task_instances|tojson }};
     const autoRefreshInterval = {{ auto_refresh_interval }};
+    const priority = {{ state_priority|tojson }};
   </script>
   <script src="{{ url_for_asset('d3.min.js') }}"></script>
   <script src="{{ url_for_asset('dagre-d3.min.js') }}"></script>
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 0aaaf2b26e..453c38ce49 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -93,10 +93,10 @@ priority = [
     TaskInstanceState.RUNNING,
     TaskInstanceState.SHUTDOWN,
     TaskInstanceState.RESTARTING,
-    TaskInstanceState.REMOVED,
     None,
     TaskInstanceState.SUCCESS,
     TaskInstanceState.SKIPPED,
+    TaskInstanceState.REMOVED,
 ]
 
 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 3fe94e0fd9..a2b0d1e76d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2808,6 +2808,8 @@ class Airflow(AirflowBaseView):
         else:
             external_log_name = None
 
+        state_priority = ['no_status' if p is None else p for p in wwwutils.priority]
+
         return self.render_template(
             'airflow/graph.html',
             dag=dag,
@@ -2830,6 +2832,7 @@ class Airflow(AirflowBaseView):
             dag_run_state=dt_nr_dr_data['dr_state'],
             dag_model=dag_model,
             auto_refresh_interval=conf.getint('webserver', 'auto_refresh_interval'),
+            state_priority=state_priority,
         )
 
     @expose('/duration')
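
The priority list above decides which single state represents a group of task instances;
demoting removed below success and skipped means a group whose children are, say,
{success, removed} now summarizes as success. A small sketch of that selection (the
priority values are abbreviated and illustrative):

    priority = [
        "failed", "upstream_failed", "up_for_retry", "up_for_reschedule",
        "queued", "scheduled", "running", "shutdown", "restarting",
        "no_status", "success", "skipped", "removed",
    ]

    def group_state(children_states: set[str]) -> str:
        # first state in priority order that any child is in, else no_status
        return next((s for s in priority if s in children_states), "no_status")

    print(group_state({"success", "removed"}))   # success (removed no longer wins)
    print(group_state({"failed", "success"}))    # failed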


[airflow] 17/41: Add restarting state to TaskState Enum in REST API (#26776)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a19783e674d03673afb3a1fe01393af76d916716
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Sep 30 00:28:12 2022 +0100

    Add restarting state to TaskState Enum in REST API (#26776)
    
    (cherry picked from commit af368243f87dfb5a4bc98a571d7b4775186d214c)
---
 airflow/api_connexion/openapi/v1.yaml        | 2 ++
 airflow/www/static/js/types/api-generated.ts | 4 +++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index f6ec246d90..9967bf1ed1 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -4224,6 +4224,7 @@ components:
         *Changed in version 2.2.0*&#58; 'deferred' is added as a possible value.
 
         *Changed in version 2.4.0*&#58; 'sensing' state has been removed.
+        *Changed in version 2.4.2*&#58; 'restarting' is added as a possible value
       type: string
       enum:
         - success
@@ -4238,6 +4239,7 @@ components:
         - scheduled
         - deferred
         - removed
+        - restarting
 
     DagState:
       description: |
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index adeec4e6cb..da7fc40783 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1928,6 +1928,7 @@ export interface components {
      * *Changed in version 2.2.0*&#58; 'deferred' is added as a possible value.
      *
      * *Changed in version 2.4.0*&#58; 'sensing' state has been removed.
+     * *Changed in version 2.4.2*&#58; 'restarting' is added as a possible value
      *
      * @enum {string}
      */
@@ -1943,7 +1944,8 @@ export interface components {
       | "none"
       | "scheduled"
       | "deferred"
-      | "removed";
+      | "removed"
+      | "restarting";
     /**
      * @description DAG State.
      *


[airflow] 27/41: Fixed triple quotes in task group example (#26829)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2cc75dac8cb645749a836e84f0f66279bdfb2d9f
Author: Mati O <ma...@users.noreply.github.com>
AuthorDate: Sun Oct 2 19:38:17 2022 +0300

    Fixed triple quotes in task group example (#26829)
    
    (cherry picked from commit 6ddb6467e93cf927ec17104f8684b90b8bf7bb26)
---
 docs/apache-airflow/concepts/dags.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 541a2adad5..aa73c66992 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -513,7 +513,7 @@ TaskGroup also supports ``default_args`` like DAG, it will overwrite the ``defau
     ):
         @task_group(default_args={'retries': 3}):
         def group1():
-            """This docstring will become the tooltip for the TaskGroup."
+            """This docstring will become the tooltip for the TaskGroup."""
             task1 = EmptyOperator(task_id='task1')
             task2 = BashOperator(task_id='task2', bash_command='echo Hello World!', retries=2)
             print(task1.retries) # 3


[airflow] 02/41: Add last_updated_at and total_updates to datasets list view (#26358)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 48ce0eaca286f80e1c7b01ee957b877cfa89f27f
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Wed Oct 5 06:51:09 2022 -0700

    Add last_updated_at and total_updates to datasets list view (#26358)
    
    * wip
    
    * add update info to datasets list in UI
    
    * wip
    
    * wip
    
    * Cleanup
    
    * Fixups and cleanups
    
    * Add support for MySQL sorting by null first/last
    
    * Add support for MSSQL sorting by null first/last
    
    * Remove unnecessary extras
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    
    * Clear dataset-related objects before and after tests
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    
    * Revert using single letters for datasets
    
    * Remove self.default_time and its uses
    
    * Betterize fixture creation
    
    * Remove comment
    
    * Add __future__.annotations import
    
    * Black formatting
    
    * Sort nulls last, using normal SQL rules
    
    * Fixup: code and tests
    
    * Add missing import
    
    * Expand tests to try to get more dataset events than correct
    
    * Fix aggregate counts for datasets with multiple producing task or multiple consuming dags
    
    * Fix COUNT DISTINCT for MySQL and MSSQL
    
    * change last update to a sortable column
    
    * server-side sorting
    
    * Entirely remove producing_task_count and consuming_dag_count from the query and response
    
    * Remove some now-unnecessary outer joins
    
    * Rename endpoint to the more accurate datasets_summary
    
    * Rename dataset summary function to match endpoint
    
    * fixup
    
    Co-authored-by: Brent Bovenzi <br...@gmail.com>
    
    Co-authored-by: Jed Cunningham <je...@apache.org>
    Co-authored-by: Brent Bovenzi <br...@gmail.com>
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    (cherry picked from commit d600cbd5fe9f0cd7101ff572190a8e5e64110e22)
---
 airflow/www/static/js/api/useDataset.ts     |   8 +-
 airflow/www/static/js/api/useDatasets.ts    |   6 +-
 airflow/www/static/js/datasets/List.tsx     |  47 +++-
 airflow/www/static/js/types/index.ts        |   6 +
 airflow/www/templates/airflow/datasets.html |   3 +-
 airflow/www/views.py                        |  76 ++++++-
 tests/www/views/test_views_dataset.py       | 332 ++++++++++++++++++++++++++++
 7 files changed, 464 insertions(+), 14 deletions(-)

diff --git a/airflow/www/static/js/api/useDataset.ts b/airflow/www/static/js/api/useDataset.ts
index 900bdbc2d1..4633b47242 100644
--- a/airflow/www/static/js/api/useDataset.ts
+++ b/airflow/www/static/js/api/useDataset.ts
@@ -23,11 +23,15 @@ import { useQuery } from 'react-query';
 import { getMetaValue } from 'src/utils';
 import type { API } from 'src/types';
 
-export default function useDataset({ uri }: API.GetDatasetVariables) {
+interface Props {
+  uri: string;
+}
+
+export default function useDataset({ uri }: Props) {
   return useQuery(
     ['dataset', uri],
     () => {
-      const datasetUrl = `${getMetaValue('datasets_api') || '/api/v1/datasets'}/${encodeURIComponent(uri)}`;
+      const datasetUrl = getMetaValue('dataset_api').replace('__URI__', encodeURIComponent(uri));
       return axios.get<AxiosResponse, API.Dataset>(datasetUrl);
     },
   );
diff --git a/airflow/www/static/js/api/useDatasets.ts b/airflow/www/static/js/api/useDatasets.ts
index 34e5bd93ec..ae92ffacbd 100644
--- a/airflow/www/static/js/api/useDatasets.ts
+++ b/airflow/www/static/js/api/useDatasets.ts
@@ -21,10 +21,10 @@ import axios, { AxiosResponse } from 'axios';
 import { useQuery } from 'react-query';
 
 import { getMetaValue } from 'src/utils';
-import type { API } from 'src/types';
+import type { DatasetListItem } from 'src/types';
 
 interface DatasetsData {
-  datasets: API.Dataset[];
+  datasets: DatasetListItem[];
   totalEntries: number;
 }
 
@@ -41,7 +41,7 @@ export default function useDatasets({
   const query = useQuery(
     ['datasets', limit, offset, order, uri],
     () => {
-      const datasetsUrl = getMetaValue('datasets_api') || '/api/v1/datasets';
+      const datasetsUrl = getMetaValue('datasets_api');
       const orderParam = order ? { order_by: order } : {};
       const uriParam = uri ? { uri_pattern: uri } : {};
       return axios.get<AxiosResponse, DatasetsData>(
diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx
index 8e38cbefbd..8ac1bb7622 100644
--- a/airflow/www/static/js/datasets/List.tsx
+++ b/airflow/www/static/js/datasets/List.tsx
@@ -25,33 +25,66 @@ import {
   Text,
   Link,
 } from '@chakra-ui/react';
-import { snakeCase } from 'lodash';
 import type { Row, SortingRule } from 'react-table';
 
 import { useDatasets } from 'src/api';
-import { Table } from 'src/components/Table';
+import { Table, TimeCell } from 'src/components/Table';
 import type { API } from 'src/types';
 import { getMetaValue } from 'src/utils';
+import { snakeCase } from 'lodash';
 
 interface Props {
   onSelect: (datasetId: string) => void;
 }
 
+interface CellProps {
+  cell: {
+    value: any;
+    row: {
+      original: Record<string, any>;
+    }
+  }
+}
+
+const DetailCell = ({ cell: { row } }: CellProps) => {
+  const { totalUpdates, uri } = row.original;
+  return (
+    <Box>
+      <Text>{uri}</Text>
+      <Text fontSize="sm" mt={2}>
+        Total Updates:
+        {' '}
+        {totalUpdates}
+      </Text>
+    </Box>
+  );
+};
+
 const DatasetsList = ({ onSelect }: Props) => {
   const limit = 25;
   const [offset, setOffset] = useState(0);
-  const [sortBy, setSortBy] = useState<SortingRule<object>[]>([]);
+  const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'lastDatasetUpdate', desc: true }]);
 
   const sort = sortBy[0];
   const order = sort ? `${sort.desc ? '-' : ''}${snakeCase(sort.id)}` : '';
 
-  const { data: { datasets, totalEntries }, isLoading } = useDatasets({ limit, offset, order });
+  const { data: { datasets, totalEntries }, isLoading } = useDatasets({
+    limit,
+    offset,
+    order,
+  });
 
   const columns = useMemo(
     () => [
       {
         Header: 'URI',
         accessor: 'uri',
+        Cell: DetailCell,
+      },
+      {
+        Header: 'Last Update',
+        accessor: 'lastDatasetUpdate',
+        Cell: TimeCell,
       },
     ],
     [],
@@ -61,6 +94,7 @@ const DatasetsList = ({ onSelect }: Props) => {
     () => datasets,
     [datasets],
   );
+  const memoSort = useMemo(() => sortBy, [sortBy]);
 
   const onDatasetSelect = (row: Row<API.Dataset>) => {
     if (row.original.uri) onSelect(row.original.uri);
@@ -76,7 +110,7 @@ const DatasetsList = ({ onSelect }: Props) => {
         </Heading>
       </Flex>
       {!datasets.length && !isLoading && (
-        <Text>
+        <Text mb={4}>
           Looks like you do not have any datasets yet. Check out the
           {' '}
           <Link color="blue" href={docsUrl} isExternal>docs</Link>
@@ -94,11 +128,12 @@ const DatasetsList = ({ onSelect }: Props) => {
             setOffset,
             totalEntries,
           }}
-          pageSize={limit}
           manualSort={{
             setSortBy,
             sortBy,
+            initialSortBy: memoSort,
           }}
+          pageSize={limit}
           onRowClicked={onDatasetSelect}
         />
       </Box>
diff --git a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts
index d1386b8ee1..bf0f7d9b9c 100644
--- a/airflow/www/static/js/types/index.ts
+++ b/airflow/www/static/js/types/index.ts
@@ -97,6 +97,11 @@ interface DepEdge {
   v: string;
 }
 
+interface DatasetListItem extends API.Dataset {
+  lastDatasetUpdate: string | null;
+  totalUpdates: number;
+}
+
 export type {
   Dag,
   DagRun,
@@ -108,4 +113,5 @@ export type {
   DepEdge,
   API,
   RunOrdering,
+  DatasetListItem,
 };
diff --git a/airflow/www/templates/airflow/datasets.html b/airflow/www/templates/airflow/datasets.html
index 36c7a9b689..6b3555c616 100644
--- a/airflow/www/templates/airflow/datasets.html
+++ b/airflow/www/templates/airflow/datasets.html
@@ -22,7 +22,8 @@
 
 {% block head_meta %}
   {{ super() }}
-  <meta name="datasets_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dataset_endpoint_get_datasets') }}">
+  <meta name="datasets_api" content="{{ url_for('Airflow.datasets_summary') }}">
+  <meta name="dataset_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dataset_endpoint_get_dataset', uri='__URI__') }}">
   <meta name="dataset_events_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dataset_endpoint_get_dataset_events') }}">
   <meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id='__DAG_ID__') }}">
   <meta name="datasets_docs" content="{{ get_docs_url('concepts/datasets.html') }}">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b1d3c1209b..3fe94e0fd9 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -71,7 +71,7 @@ from pendulum.datetime import DateTime
 from pendulum.parsing.exceptions import ParserError
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
-from sqlalchemy import Date, and_, desc, func, inspect, union_all
+from sqlalchemy import Date, and_, desc, distinct, func, inspect, union_all
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import Session, joinedload
 from wtforms import SelectField, validators
@@ -108,7 +108,7 @@ from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
 from airflow.timetables.base import DataInterval, TimeRestriction
 from airflow.timetables.interval import CronDataIntervalTimetable
-from airflow.utils import timezone, yaml
+from airflow.utils import json as utils_json, timezone, yaml
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.dag_edges import dag_edges
 from airflow.utils.dates import infer_time_unit, scale_time_units
@@ -3547,6 +3547,78 @@ class Airflow(AirflowBaseView):
             {'Content-Type': 'application/json; charset=utf-8'},
         )
 
+    @expose('/object/datasets_summary')
+    @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET)])
+    def datasets_summary(self):
+        """Get a summary of datasets, including the datetime they were last updated and how many updates
+        they've ever had
+        """
+        allowed_attrs = ['uri', 'last_dataset_update']
+
+        limit = int(request.args.get("limit", 25))
+        offset = int(request.args.get("offset", 0))
+        order_by = request.args.get("order_by", "uri")
+        lstripped_orderby = order_by.lstrip('-')
+
+        if lstripped_orderby not in allowed_attrs:
+            return {
+                "detail": (
+                    f"Ordering with '{lstripped_orderby}' is disallowed or the attribute does not "
+                    "exist on the model"
+                )
+            }, 400
+
+        limit = 50 if limit > 50 else limit
+
+        with create_session() as session:
+            if lstripped_orderby == "uri":
+                if order_by[0] == "-":
+                    order_by = (DatasetModel.uri.desc(),)
+                else:
+                    order_by = (DatasetModel.uri.asc(),)
+            elif lstripped_orderby == "last_dataset_update":
+                if order_by[0] == "-":
+                    order_by = (
+                        func.max(DatasetEvent.timestamp).desc(),
+                        DatasetModel.uri.asc(),
+                    )
+                    if session.bind.dialect.name == "postgresql":
+                        order_by = (order_by[0].nulls_last(), *order_by[1:])
+                else:
+                    order_by = (
+                        func.max(DatasetEvent.timestamp).asc(),
+                        DatasetModel.uri.desc(),
+                    )
+                    if session.bind.dialect.name == "postgresql":
+                        order_by = (order_by[0].nulls_first(), *order_by[1:])
+
+            total_entries = session.query(func.count(DatasetModel.id)).scalar()
+
+            datasets = [
+                dict(dataset)
+                for dataset in session.query(
+                    DatasetModel.id,
+                    DatasetModel.uri,
+                    func.max(DatasetEvent.timestamp).label("last_dataset_update"),
+                    func.count(distinct(DatasetEvent.id)).label("total_updates"),
+                )
+                .outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+                .group_by(
+                    DatasetModel.id,
+                    DatasetModel.uri,
+                )
+                .order_by(*order_by)
+                .offset(offset)
+                .limit(limit)
+                .all()
+            ]
+            data = {"datasets": datasets, "total_entries": total_entries}
+
+            return (
+                htmlsafe_json_dumps(data, separators=(',', ':'), cls=utils_json.AirflowJsonEncoder),
+                {'Content-Type': 'application/json; charset=utf-8'},
+            )
+
     @expose('/robots.txt')
     @action_logging
     def robots(self):
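
A plain-Python illustration of the per-dataset aggregation the datasets_summary query above
performs (latest event timestamp plus total update count per dataset); the event data here
is made up:

    from collections import defaultdict

    dataset_events = [
        {"dataset_id": 1, "timestamp": "2022-10-01T00:00:00+00:00"},
        {"dataset_id": 1, "timestamp": "2022-10-03T00:00:00+00:00"},
        {"dataset_id": 2, "timestamp": "2022-10-02T00:00:00+00:00"},
    ]

    summary = defaultdict(lambda: {"last_dataset_update": None, "total_updates": 0})
    for event in dataset_events:
        entry = summary[event["dataset_id"]]
        entry["total_updates"] += 1
        if entry["last_dataset_update"] is None or event["timestamp"] > entry["last_dataset_update"]:
            entry["last_dataset_update"] = event["timestamp"]

    print(dict(summary))
    # {1: {'last_dataset_update': '2022-10-03T00:00:00+00:00', 'total_updates': 2},
    #  2: {'last_dataset_update': '2022-10-02T00:00:00+00:00', 'total_updates': 1}}
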
diff --git a/tests/www/views/test_views_dataset.py b/tests/www/views/test_views_dataset.py
new file mode 100644
index 0000000000..c67298876d
--- /dev/null
+++ b/tests/www/views/test_views_dataset.py
@@ -0,0 +1,332 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pendulum
+import pytest
+from dateutil.tz import UTC
+
+from airflow import Dataset
+from airflow.models.dataset import DatasetEvent, DatasetModel
+from airflow.operators.empty import EmptyOperator
+from tests.test_utils.asserts import assert_queries_count
+from tests.test_utils.db import clear_db_datasets
+
+
+class TestDatasetEndpoint:
+    @pytest.fixture(autouse=True)
+    def cleanup(self):
+        clear_db_datasets()
+        yield
+        clear_db_datasets()
+
+
+class TestGetDatasets(TestDatasetEndpoint):
+    def test_should_respond_200(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                id=i,
+                uri=f"s3://bucket/key/{i}",
+            )
+            for i in [1, 2]
+        ]
+        session.add_all(datasets)
+        session.commit()
+        assert session.query(DatasetModel).count() == 2
+
+        with assert_queries_count(8):
+            response = admin_client.get("/object/datasets_summary")
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "datasets": [
+                {
+                    "id": 1,
+                    "uri": "s3://bucket/key/1",
+                    "last_dataset_update": None,
+                    "total_updates": 0,
+                },
+                {
+                    "id": 2,
+                    "uri": "s3://bucket/key/2",
+                    "last_dataset_update": None,
+                    "total_updates": 0,
+                },
+            ],
+            "total_entries": 2,
+        }
+
+    def test_order_by_raises_400_for_invalid_attr(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+            )
+            for i in [1, 2]
+        ]
+        session.add_all(datasets)
+        session.commit()
+        assert session.query(DatasetModel).count() == 2
+
+        response = admin_client.get("/object/datasets_summary?order_by=fake")
+
+        assert response.status_code == 400
+        msg = "Ordering with 'fake' is disallowed or the attribute does not exist on the model"
+        assert response.json['detail'] == msg
+
+    @pytest.mark.parametrize(
+        "order_by, ordered_dataset_ids",
+        [
+            ("uri", [1, 2, 3, 4]),
+            ("-uri", [4, 3, 2, 1]),
+            ("last_dataset_update", [4, 1, 3, 2]),
+            ("-last_dataset_update", [2, 3, 1, 4]),
+        ],
+    )
+    def test_order_by(self, admin_client, session, order_by, ordered_dataset_ids):
+        datasets = [
+            DatasetModel(
+                id=i,
+                uri=f"s3://bucket/key/{i}",
+            )
+            for i in range(1, len(ordered_dataset_ids) + 1)
+        ]
+        session.add_all(datasets)
+        dataset_events = [
+            DatasetEvent(
+                dataset_id=datasets[2].id,
+                timestamp=pendulum.today('UTC').add(days=-3),
+            ),
+            DatasetEvent(
+                dataset_id=datasets[1].id,
+                timestamp=pendulum.today('UTC').add(days=-2),
+            ),
+            DatasetEvent(
+                dataset_id=datasets[1].id,
+                timestamp=pendulum.today('UTC').add(days=-1),
+            ),
+        ]
+        session.add_all(dataset_events)
+        session.commit()
+        assert session.query(DatasetModel).count() == len(ordered_dataset_ids)
+
+        response = admin_client.get(f"/object/datasets_summary?order_by={order_by}")
+
+        assert response.status_code == 200
+        assert ordered_dataset_ids == [json_dict['id'] for json_dict in response.json['datasets']]
+        assert response.json['total_entries'] == len(ordered_dataset_ids)
+
+    @pytest.mark.need_serialized_dag
+    def test_correct_counts_update(self, admin_client, session, dag_maker, app, monkeypatch):
+        with monkeypatch.context() as m:
+            datasets = [Dataset(uri=f"s3://bucket/key/{i}") for i in [1, 2, 3, 4, 5]]
+
+            # DAG that produces dataset #1
+            with dag_maker(dag_id='upstream', schedule=None, serialized=True, session=session):
+                EmptyOperator(task_id='task1', outlets=[datasets[0]])
+
+            # DAG that consumes only datasets #1 and #2
+            with dag_maker(dag_id='downstream', schedule=datasets[:2], serialized=True, session=session):
+                EmptyOperator(task_id='task1')
+
+            # We create multiple dataset-producing and dataset-consuming DAGs because the query requires
+            # COUNT(DISTINCT ...) for total_updates, or else it returns a multiple of the correct number due
+            # to the outer joins with DagScheduleDatasetReference and TaskOutletDatasetReference
+            # Two independent DAGs that produce dataset #3
+            with dag_maker(dag_id='independent_producer_1', serialized=True, session=session):
+                EmptyOperator(task_id='task1', outlets=[datasets[2]])
+            with dag_maker(dag_id='independent_producer_2', serialized=True, session=session):
+                EmptyOperator(task_id='task1', outlets=[datasets[2]])
+            # Two independent DAGs that consume dataset #4
+            with dag_maker(
+                dag_id='independent_consumer_1',
+                schedule=[datasets[3]],
+                serialized=True,
+                session=session,
+            ):
+                EmptyOperator(task_id='task1')
+            with dag_maker(
+                dag_id='independent_consumer_2',
+                schedule=[datasets[3]],
+                serialized=True,
+                session=session,
+            ):
+                EmptyOperator(task_id='task1')
+
+            # Independent DAG that produces and consumes the same dataset, #5
+            with dag_maker(
+                dag_id='independent_producer_self_consumer',
+                schedule=[datasets[4]],
+                serialized=True,
+                session=session,
+            ):
+                EmptyOperator(task_id='task1', outlets=[datasets[4]])
+
+            m.setattr(app, 'dag_bag', dag_maker.dagbag)
+
+            ds1_id = session.query(DatasetModel.id).filter_by(uri=datasets[0].uri).scalar()
+            ds2_id = session.query(DatasetModel.id).filter_by(uri=datasets[1].uri).scalar()
+            ds3_id = session.query(DatasetModel.id).filter_by(uri=datasets[2].uri).scalar()
+            ds4_id = session.query(DatasetModel.id).filter_by(uri=datasets[3].uri).scalar()
+            ds5_id = session.query(DatasetModel.id).filter_by(uri=datasets[4].uri).scalar()
+
+            # dataset 1 events
+            session.add_all(
+                [
+                    DatasetEvent(
+                        dataset_id=ds1_id,
+                        timestamp=pendulum.DateTime(2022, 8, 1, i, tzinfo=UTC),
+                    )
+                    for i in range(3)
+                ]
+            )
+            # dataset 3 events
+            session.add_all(
+                [
+                    DatasetEvent(
+                        dataset_id=ds3_id,
+                        timestamp=pendulum.DateTime(2022, 8, 1, i, tzinfo=UTC),
+                    )
+                    for i in range(3)
+                ]
+            )
+            # dataset 4 events
+            session.add_all(
+                [
+                    DatasetEvent(
+                        dataset_id=ds4_id,
+                        timestamp=pendulum.DateTime(2022, 8, 1, i, tzinfo=UTC),
+                    )
+                    for i in range(4)
+                ]
+            )
+            # dataset 5 events
+            session.add_all(
+                [
+                    DatasetEvent(
+                        dataset_id=ds5_id,
+                        timestamp=pendulum.DateTime(2022, 8, 1, i, tzinfo=UTC),
+                    )
+                    for i in range(5)
+                ]
+            )
+            session.commit()
+
+            response = admin_client.get("/object/datasets_summary")
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "datasets": [
+                {
+                    "id": ds1_id,
+                    "uri": "s3://bucket/key/1",
+                    "last_dataset_update": "2022-08-01T02:00:00+00:00",
+                    "total_updates": 3,
+                },
+                {
+                    "id": ds2_id,
+                    "uri": "s3://bucket/key/2",
+                    "last_dataset_update": None,
+                    "total_updates": 0,
+                },
+                {
+                    "id": ds3_id,
+                    "uri": "s3://bucket/key/3",
+                    "last_dataset_update": "2022-08-01T02:00:00+00:00",
+                    "total_updates": 3,
+                },
+                {
+                    "id": ds4_id,
+                    "uri": "s3://bucket/key/4",
+                    "last_dataset_update": "2022-08-01T03:00:00+00:00",
+                    "total_updates": 4,
+                },
+                {
+                    "id": ds5_id,
+                    "uri": "s3://bucket/key/5",
+                    "last_dataset_update": "2022-08-01T04:00:00+00:00",
+                    "total_updates": 5,
+                },
+            ],
+            "total_entries": 5,
+        }
+
+
+class TestGetDatasetsEndpointPagination(TestDatasetEndpoint):
+    @pytest.mark.parametrize(
+        "url, expected_dataset_uris",
+        [
+            # Limit test data
+            ("/object/datasets_summary?limit=1", ["s3://bucket/key/1"]),
+            ("/object/datasets_summary?limit=5", [f"s3://bucket/key/{i}" for i in range(1, 6)]),
+            # Offset test data
+            ("/object/datasets_summary?offset=1", [f"s3://bucket/key/{i}" for i in range(2, 10)]),
+            ("/object/datasets_summary?offset=3", [f"s3://bucket/key/{i}" for i in range(4, 10)]),
+            # Limit and offset test data
+            ("/object/datasets_summary?offset=3&limit=3", [f"s3://bucket/key/{i}" for i in [4, 5, 6]]),
+        ],
+    )
+    def test_limit_and_offset(self, admin_client, session, url, expected_dataset_uris):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+            )
+            for i in range(1, 10)
+        ]
+        session.add_all(datasets)
+        session.commit()
+
+        response = admin_client.get(url)
+
+        assert response.status_code == 200
+        dataset_uris = [dataset["uri"] for dataset in response.json["datasets"]]
+        assert dataset_uris == expected_dataset_uris
+
+    def test_should_respect_page_size_limit_default(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+            )
+            for i in range(1, 60)
+        ]
+        session.add_all(datasets)
+        session.commit()
+
+        response = admin_client.get("/object/datasets_summary")
+
+        assert response.status_code == 200
+        assert len(response.json['datasets']) == 25
+
+    def test_should_return_max_if_req_above(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+            )
+            for i in range(1, 60)
+        ]
+        session.add_all(datasets)
+        session.commit()
+
+        response = admin_client.get("/object/datasets_summary?limit=180")
+
+        assert response.status_code == 200
+        assert len(response.json['datasets']) == 50
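
As a quick reference, here is a minimal, hedged sketch of how the new
`/object/datasets_summary` endpoint can be exercised, reusing the `admin_client`
test fixture from the tests above; the ordering, limit and offset values are
illustrative and not part of the commit:

```
def test_datasets_summary_sketch(admin_client):
    # Newest-updated datasets first, paged 25 at a time.
    resp = admin_client.get(
        "/object/datasets_summary?order_by=-last_dataset_update&limit=25&offset=0"
    )
    assert resp.status_code == 200
    for ds in resp.json["datasets"]:
        print(ds["id"], ds["uri"], ds["total_updates"], ds["last_dataset_update"])
    print("total datasets:", resp.json["total_entries"])
```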


[airflow] 10/41: Simplify RTIF.delete_old_records() (#26667)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 29bc32a67ae9dc645324996eaf64ba19b032193c
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Mon Sep 26 21:49:17 2022 +0800

    Simplify RTIF.delete_old_records() (#26667)
    
    A lot of the code and comments are no longer relevant since we removed
    the funky execution_date-based filtering in 2.3, so we can simplify the
    implementation quite a bit now.
    
    (cherry picked from commit 0e79dd0b1722a610c898da0ba8557b8a94da568c)
---
 airflow/models/renderedtifields.py | 68 +++++++++++++++-----------------------
 airflow/utils/sqlalchemy.py        | 20 ++++++++++-
 2 files changed, 46 insertions(+), 42 deletions(-)

diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py
index 2de03fba67..c0f30a2a1a 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -19,9 +19,10 @@
 from __future__ import annotations
 
 import os
+from typing import TYPE_CHECKING
 
 import sqlalchemy_jsonfield
-from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, and_, not_, text, tuple_
+from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, text
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.orm import Session, relationship
 
@@ -32,6 +33,10 @@ from airflow.serialization.helpers import serialize_template_field
 from airflow.settings import json
 from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.sqlalchemy import tuple_not_in_condition
+
+if TYPE_CHECKING:
+    from sqlalchemy.sql import FromClause
 
 
 class RenderedTaskInstanceFields(Base):
@@ -183,9 +188,9 @@ class RenderedTaskInstanceFields(Base):
         cls,
         task_id: str,
         dag_id: str,
-        num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
-        session: Session = None,
-    ):
+        num_to_keep: int = conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
+        session: Session = NEW_SESSION,
+    ) -> None:
         """
         Keep only Last X (num_to_keep) number of records for a task by deleting others.
 
@@ -211,49 +216,30 @@ class RenderedTaskInstanceFields(Base):
             .limit(num_to_keep)
         )
 
-        if session.bind.dialect.name in ["postgresql", "sqlite"]:
-            # Fetch Top X records given dag_id & task_id ordered by Execution Date
-            subq1 = tis_to_keep_query.subquery()
-            excluded = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id)
-            session.query(cls).filter(
-                cls.dag_id == dag_id,
-                cls.task_id == task_id,
-                tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(excluded),
-            ).delete(synchronize_session=False)
-        elif session.bind.dialect.name in ["mysql"]:
-            cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)
-        else:
-            # Fetch Top X records given dag_id & task_id ordered by Execution Date
-            tis_to_keep = tis_to_keep_query.all()
-
-            filter_tis = [
-                not_(
-                    and_(
-                        cls.dag_id == ti.dag_id,
-                        cls.task_id == ti.task_id,
-                        cls.run_id == ti.run_id,
-                    )
-                )
-                for ti in tis_to_keep
-            ]
-
-            session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)
-
+        cls._do_delete_old_records(
+            dag_id=dag_id,
+            task_id=task_id,
+            ti_clause=tis_to_keep_query.subquery(),
+            session=session,
+        )
         session.flush()
 
     @classmethod
     @retry_db_transaction
-    def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
-        # Fetch Top X records given dag_id & task_id ordered by Execution Date
-        subq1 = tis_to_keep_query.subquery('subq1')
-        # Second Subquery
-        # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
-        # Limitation: This version of MySQL does not yet support
-        # LIMIT & IN/ALL/ANY/SOME subquery
-        subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id).subquery('subq2')
+    def _do_delete_old_records(
+        cls,
+        *,
+        task_id: str,
+        dag_id: str,
+        ti_clause: FromClause,
+        session: Session,
+    ) -> None:
         # This query might deadlock occasionally and it should be retried if fails (see decorator)
         session.query(cls).filter(
             cls.dag_id == dag_id,
             cls.task_id == task_id,
-            tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2),
+            tuple_not_in_condition(
+                (cls.dag_id, cls.task_id, cls.run_id),
+                session.query(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id),
+            ),
         ).delete(synchronize_session=False)
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index ef0d29ebf8..0531c29dbf 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -25,7 +25,7 @@ from typing import Any, Iterable
 
 import pendulum
 from dateutil import relativedelta
-from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, tuple_
+from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
 from sqlalchemy.dialects import mssql, mysql
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import Session
@@ -422,3 +422,21 @@ def tuple_in_condition(
     if not clauses:
         return false()
     return or_(*clauses)
+
+
+def tuple_not_in_condition(
+    columns: tuple[ColumnElement, ...],
+    collection: Iterable[Any],
+) -> ColumnOperators:
+    """Generates a tuple-not-in-collection operator to use in ``.filter()``.
+
+    This is similar to ``tuple_in_condition`` except generating ``NOT IN``.
+
+    :meta private:
+    """
+    if settings.engine.dialect.name != "mssql":
+        return tuple_(*columns).not_in(collection)
+    clauses = [or_(*(c != v for c, v in zip(columns, values))) for values in collection]
+    if not clauses:
+        return true()
+    return and_(*clauses)
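
To illustrate the new helper in isolation, here is a hedged sketch of
`tuple_not_in_condition()` in a cleanup query that mirrors the RTIF change
above; the dag/task ids, the ordering column and the limit are placeholders,
and the subquery-then-requery shape follows the MySQL-safe pattern used in
`_do_delete_old_records`:

```
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import tuple_not_in_condition

with create_session() as session:
    # Rows we want to keep: the 30 most recently started TIs of one task.
    keep_subq = (
        session.query(TI.dag_id, TI.task_id, TI.run_id)
        .filter(TI.dag_id == "example_dag", TI.task_id == "example_task")
        .order_by(TI.start_date.desc())
        .limit(30)
        .subquery()
    )
    # Delete rendered fields for every other run of that task.
    session.query(RTIF).filter(
        RTIF.dag_id == "example_dag",
        RTIF.task_id == "example_task",
        tuple_not_in_condition(
            (RTIF.dag_id, RTIF.task_id, RTIF.run_id),
            session.query(keep_subq.c.dag_id, keep_subq.c.task_id, keep_subq.c.run_id),
        ),
    ).delete(synchronize_session=False)
```

On MSSQL the helper expands to AND-ed inequality clauses instead of a tuple
`NOT IN`, which is why the condition is built through this function rather
than with `tuple_().not_in()` directly.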


[airflow] 25/41: Documentation fixes (#26819)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bcfa4186376edcdac4a9a66073eec49b51f9e73a
Author: Karthikeyan Singaravelan <ti...@gmail.com>
AuthorDate: Sun Oct 2 12:14:45 2022 +0530

    Documentation fixes (#26819)
    
    (cherry picked from commit 69da98cdb194de3544368b6bd7c47dcc7ace8814)
---
 RELEASE_NOTES.rst         | 2 +-
 airflow/cli/cli_parser.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst
index fadda3a911..366b2edeea 100644
--- a/RELEASE_NOTES.rst
+++ b/RELEASE_NOTES.rst
@@ -895,7 +895,7 @@ Details in the `SQLAlchemy Changelog <https://docs.sqlalchemy.org/en/14/changelo
 ``auth_backends`` replaces ``auth_backend`` configuration setting (#21472)
 """"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
 
-Previously, only one backend was used to authorize use of the REST API. In 2.3 this was changed to support multiple backends, separated by whitespace. Each will be tried in turn until a successful response is returned.
+Previously, only one backend was used to authorize use of the REST API. In 2.3 this was changed to support multiple backends, separated by comma. Each will be tried in turn until a successful response is returned.
 
 This setting is also used for the deprecated experimental API, which only uses the first option even if multiple are given.
 
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index c26b44e178..015174575d 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -577,7 +577,7 @@ ARG_DB_FROM_REVISION = Arg(
 ARG_DB_SQL_ONLY = Arg(
     ("-s", "--show-sql-only"),
     help="Don't actually run migrations; just print out sql scripts for offline migration. "
-    "Required if using either `--from-version` or `--from-version`.",
+    "Required if using either `--from-revision` or `--from-version`.",
     action="store_true",
     default=False,
 )
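
For readers of the corrected release note above, a hedged example of the
comma-separated form in `airflow.cfg` (the particular backends and their
order are illustrative):

```
[api]
# Try session auth first, then fall back to HTTP basic auth for the REST API.
auth_backends = airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth
```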


[airflow] 30/41: Change dag audit log sort by date from asc to desc (#26895)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d760ee6ff85b5fbbbbcfc131ccafb967a870603f
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Thu Oct 6 06:11:09 2022 -0400

    Change dag audit log sort by date from asc to desc (#26895)
    
    (cherry picked from commit 8b928b4172c7d3abb42c141a8d874e85639decf8)
---
 airflow/www/views.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index a2b0d1e76d..b6b6836256 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3671,7 +3671,7 @@ class Airflow(AirflowBaseView):
 
         current_page = request.args.get('page', default=0, type=int)
         arg_sorting_key = request.args.get('sorting_key', 'dttm')
-        arg_sorting_direction = request.args.get('sorting_direction', default='asc')
+        arg_sorting_direction = request.args.get('sorting_direction', default='desc')
 
         logs_per_page = PAGE_SIZE
         audit_logs_count = query.count()


[airflow] 14/41: Bump sphinx and sphinx-autoapi (#26743)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ba281d69afb64faecfc9337f6423bb98904dd0e3
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Wed Sep 28 13:42:43 2022 +0200

    Bump sphinx and sphinx-autoapi (#26743)
    
    The sphinx-autoapi bug https://github.com/readthedocs/sphinx-autoapi/issues/352
    has been resolved and version 2.0.0 has been released. Since we
    use sphinx/sphinx-autoapi only for the `doc` extra, we can
    safely bump the minimum version of both to the latest released
    minor versions.
    
    (cherry picked from commit 9e06c99f6102d0227c6e7b20b258d628c2bc6d5c)
---
 setup.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/setup.py b/setup.py
index bd032f3566..71db6f7796 100644
--- a/setup.py
+++ b/setup.py
@@ -266,15 +266,11 @@ doc = [
     'importlib-metadata>=4.4; python_version < "3.8"',
     'sphinx-airflow-theme',
     'sphinx-argparse>=0.1.13',
-    'sphinx-autoapi>=1.8.0',
+    'sphinx-autoapi>=2.0.0',
     'sphinx-copybutton',
     'sphinx-jinja>=2.0',
     'sphinx-rtd-theme>=0.1.6',
-    # Spinx 5.2.0 introduced deprecation for property documentation and autoapi 1.9.0 generates
-    # documentation that uses the old way of documenting it. This is tracked in
-    # https://github.com/readthedocs/sphinx-autoapi/issues/352 of autoapi and until it is solved
-    # we need to limit Sphinx to <5.2.0
-    'sphinx>=4.4.0,<5.2.0',
+    'sphinx>=5.2.0',
     'sphinxcontrib-httpdomain>=1.7.0',
     'sphinxcontrib-redoc>=1.6.0',
     'sphinxcontrib-spelling>=7.3',


[airflow] 15/41: Remove DAG parsing from StandardTaskRunner (#26750)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3e090209461b603fa5fb695ee83a02236e08d192
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Sep 29 22:51:46 2022 +0100

    Remove DAG parsing from StandardTaskRunner (#26750)
    
    This makes starting the StandardTaskRunner faster, as the DAG is now parsed once at task_run.
    It also removes parsing of the example DAGs when running a task.
    
    (cherry picked from commit ce071172e22fba018889db7dcfac4a4d0fc41cda)
---
 airflow/cli/commands/task_command.py             | 10 +---
 airflow/task/task_runner/standard_task_runner.py |  8 ++-
 airflow/utils/cli.py                             | 19 +++----
 tests/cli/commands/test_task_command.py          | 64 ------------------------
 4 files changed, 10 insertions(+), 91 deletions(-)

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 9caa8bb4bd..982aa31fd5 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -47,7 +47,6 @@ from airflow.typing_compat import Literal
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import (
     get_dag,
-    get_dag_by_deserialization,
     get_dag_by_file_location,
     get_dag_by_pickle,
     get_dags,
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:
-            try:
-                dag = get_dag_by_deserialization(args.dag_id)
-            except AirflowException:
-                print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
-        else:
-            dag = get_dag(args.subdir, args.dag_id)
+        dag = get_dag(args.subdir, args.dag_id, include_examples=False)
     else:
         # Use DAG from parameter
         pass
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 3c13a28df4..27fd11b1b1 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -36,6 +36,7 @@ class StandardTaskRunner(BaseTaskRunner):
     def __init__(self, local_task_job):
         super().__init__(local_task_job)
         self._rc = None
+        self.dag = local_task_job.task_instance.task.dag
 
     def start(self):
         if CAN_FORK and not self.run_as_user:
@@ -64,7 +65,6 @@ class StandardTaskRunner(BaseTaskRunner):
             from airflow import settings
             from airflow.cli.cli_parser import get_parser
             from airflow.sentry import Sentry
-            from airflow.utils.cli import get_dag
 
             # Force a new SQLAlchemy session. We can't share open DB handles
             # between process. The cli code will re-create this as part of its
@@ -92,10 +92,8 @@ class StandardTaskRunner(BaseTaskRunner):
                     dag_id=self._task_instance.dag_id,
                     task_id=self._task_instance.task_id,
                 ):
-                    # parse dag file since `airflow tasks run --local` does not parse dag file
-                    dag = get_dag(args.subdir, args.dag_id)
-                    args.func(args, dag=dag)
-                return_code = 0
+                    args.func(args, dag=self.dag)
+                    return_code = 0
             except Exception as exc:
                 return_code = 1
 
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 87313f46f5..522bf963e2 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -33,6 +33,7 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Callable, TypeVar, cast
 
 from airflow import settings
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.utils import cli_action_loggers
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
@@ -205,7 +206,9 @@ def _search_for_dag_file(val: str | None) -> str | None:
     return None
 
 
-def get_dag(subdir: str | None, dag_id: str) -> DAG:
+def get_dag(
+    subdir: str | None, dag_id: str, include_examples=conf.getboolean('core', 'LOAD_EXAMPLES')
+) -> DAG:
     """
     Returns DAG of a given dag_id
 
@@ -216,11 +219,11 @@ def get_dag(subdir: str | None, dag_id: str) -> DAG:
     from airflow.models import DagBag
 
     first_path = process_subdir(subdir)
-    dagbag = DagBag(first_path)
+    dagbag = DagBag(first_path, include_examples=include_examples)
     if dag_id not in dagbag.dags:
         fallback_path = _search_for_dag_file(subdir) or settings.DAGS_FOLDER
         logger.warning("Dag %r not found in path %s; trying path %s", dag_id, first_path, fallback_path)
-        dagbag = DagBag(dag_folder=fallback_path)
+        dagbag = DagBag(dag_folder=fallback_path, include_examples=include_examples)
         if dag_id not in dagbag.dags:
             raise AirflowException(
                 f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
@@ -228,16 +231,6 @@ def get_dag(subdir: str | None, dag_id: str) -> DAG:
     return dagbag.dags[dag_id]
 
 
-def get_dag_by_deserialization(dag_id: str) -> DAG:
-    from airflow.models.serialized_dag import SerializedDagModel
-
-    dag_model = SerializedDagModel.get(dag_id)
-    if dag_model is None:
-        raise AirflowException(f"Serialized DAG: {dag_id} could not be found")
-
-    return dag_model.dag
-
-
 def get_dags(subdir: str | None, dag_id: str, use_regex: bool = False):
     """Returns DAG(s) matching a given regex or dag_id"""
     from airflow.models import DagBag
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 03b9259f8d..802140755f 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -159,38 +159,6 @@ class TestCliTasks:
             task_command.task_test(args)
         assert capsys.readouterr().out.endswith(f"{not_password}\n")
 
-    @mock.patch("airflow.cli.commands.task_command.get_dag_by_deserialization")
-    @mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
-    def test_run_get_serialized_dag(self, mock_local_job, mock_get_dag_by_deserialization):
-        """
-        Test using serialized dag for local task_run
-        """
-        task_id = self.dag.task_ids[0]
-        args = [
-            'tasks',
-            'run',
-            '--ignore-all-dependencies',
-            '--local',
-            self.dag_id,
-            task_id,
-            self.run_id,
-        ]
-        mock_get_dag_by_deserialization.return_value = SerializedDagModel.get(self.dag_id).dag
-
-        task_command.task_run(self.parser.parse_args(args))
-        mock_local_job.assert_called_once_with(
-            task_instance=mock.ANY,
-            mark_success=False,
-            ignore_all_deps=True,
-            ignore_depends_on_past=False,
-            ignore_task_deps=False,
-            ignore_ti_state=False,
-            pickle_id=None,
-            pool=None,
-            external_executor_id=None,
-        )
-        mock_get_dag_by_deserialization.assert_called_once_with(self.dag_id)
-
     def test_cli_test_different_path(self, session):
         """
         When the dag processor has a different dags folder
@@ -265,38 +233,6 @@ class TestCliTasks:
             # verify that the file was in different location when run
             assert ti.xcom_pull(ti.task_id) == new_file_path.as_posix()
 
-    @mock.patch("airflow.cli.commands.task_command.get_dag_by_deserialization")
-    @mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
-    def test_run_get_serialized_dag_fallback(self, mock_local_job, mock_get_dag_by_deserialization):
-        """
-        Fallback to parse dag_file when serialized dag does not exist in the db
-        """
-        task_id = self.dag.task_ids[0]
-        args = [
-            'tasks',
-            'run',
-            '--ignore-all-dependencies',
-            '--local',
-            self.dag_id,
-            task_id,
-            self.run_id,
-        ]
-        mock_get_dag_by_deserialization.side_effect = mock.Mock(side_effect=AirflowException('Not found'))
-
-        task_command.task_run(self.parser.parse_args(args))
-        mock_local_job.assert_called_once_with(
-            task_instance=mock.ANY,
-            mark_success=False,
-            ignore_all_deps=True,
-            ignore_depends_on_past=False,
-            ignore_task_deps=False,
-            ignore_ti_state=False,
-            pickle_id=None,
-            pool=None,
-            external_executor_id=None,
-        )
-        mock_get_dag_by_deserialization.assert_called_once_with(self.dag_id)
-
     @mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
     def test_run_with_existing_dag_run_id(self, mock_local_job):
         """


[airflow] 09/41: Add a note against use of top level code in timetable (#26649)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 14e55b64ce803858fde5ff9369e11e79bbcdd668
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Sep 26 23:01:14 2022 +0100

    Add a note against use of top level code in timetable (#26649)
    
    Accessing Variables or Connections at the top level of code is bad practice
    
    Add a section to the best practices reference for timetables
    
    (cherry picked from commit 37c0cb6d3240062106388449cf8eed9c948fb539)
---
 airflow/serialization/serialized_objects.py   |  6 ++++-
 docs/apache-airflow/best-practices.rst        | 35 +++++++++++++++++++++++++++
 docs/apache-airflow/concepts/timetable.rst    |  6 +++++
 tests/serialization/test_dag_serialization.py |  8 ++++--
 4 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 542573fbcc..06608ba865 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -165,7 +165,11 @@ class _TimetableNotRegistered(ValueError):
         self.type_string = type_string
 
     def __str__(self) -> str:
-        return f"Timetable class {self.type_string!r} is not registered"
+        return (
+            f"Timetable class {self.type_string!r} is not registered or "
+            "you have a top level database access that disrupted the session. "
+            "Please check the airflow best practices documentation."
+        )
 
 
 def _encode_timetable(var: Timetable) -> dict[str, Any]:
diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 33f31654cc..2beb777eca 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -216,6 +216,41 @@ or if you need to deserialize a json object from the variable :
 For security purpose, you're recommended to use the :ref:`Secrets Backend<secrets_backend_configuration>`
 for any variable that contains sensitive data.
 
+.. _best_practices/timetables:
+
+Timetables
+----------
+Avoid using Airflow Variables/Connections or accessing the Airflow database at the top level of your timetable code.
+Database access should be delayed until the execution time of the DAG. This means that you should not retrieve Variables/Connections
+as arguments to your timetable class initialization or access them at the top level of your custom timetable module.
+
+Bad example:
+
+.. code-block:: python
+
+    from airflow.models.variable import Variable
+    from airflow.timetables.interval import CronDataIntervalTimetable
+
+
+    class CustomTimetable(CronDataIntervalTimetable):
+        def __init__(self, *args, something=Variable.get('something'), **kwargs):
+            self._something = something
+            super().__init__(*args, **kwargs)
+
+Good example:
+
+.. code-block:: python
+
+    from airflow.models.variable import Variable
+    from airflow.timetables.interval import CronDataIntervalTimetable
+
+
+    class CustomTimetable(CronDataIntervalTimetable):
+        def __init__(self, *args, something='something', **kwargs):
+            self._something = Variable.get(something)
+            super().__init__(*args, **kwargs)
+
+
 Triggering DAGs after changes
 -----------------------------
 
diff --git a/docs/apache-airflow/concepts/timetable.rst b/docs/apache-airflow/concepts/timetable.rst
index 9e1a9e0bf3..c76d63ea48 100644
--- a/docs/apache-airflow/concepts/timetable.rst
+++ b/docs/apache-airflow/concepts/timetable.rst
@@ -51,6 +51,12 @@ As such, Airflow allows for custom timetables to be written in plugins and used
 DAGs. An example demonstrating a custom timetable can be found in the
 :doc:`/howto/timetable` how-to guide.
 
+.. note::
+
+    As a general rule, always access Variables, Connections, etc., or anything that would access
+    the database as late as possible in your code. See :ref:`best_practices/timetables`
+    for more best practices to follow.
+
 Built-in Timetables
 -------------------
 
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 6b485f7465..7a6b9a1ec1 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -411,7 +411,9 @@ class TestStringifiedDAGs:
         message = (
             "Failed to serialize DAG 'simple_dag': Timetable class "
             "'tests.test_utils.timetables.CustomSerializationTimetable' "
-            "is not registered"
+            "is not registered or "
+            "you have a top level database access that disrupted the session. "
+            "Please check the airflow best practices documentation."
         )
         assert str(ctx.value) == message
 
@@ -721,7 +723,9 @@ class TestStringifiedDAGs:
         message = (
             "Timetable class "
             "'tests.test_utils.timetables.CustomSerializationTimetable' "
-            "is not registered"
+            "is not registered or "
+            "you have a top level database access that disrupted the session. "
+            "Please check the airflow best practices documentation."
         )
         assert str(ctx.value) == message
 


[airflow] 19/41: add icon legend to datasets graph (#26781)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4fe9eef773400a778ba3b77d95e81ac6ce4d9361
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Thu Sep 29 14:14:45 2022 -0400

    add icon legend to datasets graph (#26781)
    
    (cherry picked from commit 2e66d2d89e1e4a3c7b31a43b62d0b0ec97165dd4)
---
 airflow/www/static/js/datasets/Graph/Legend.tsx | 76 +++++++++++++++++++++++++
 airflow/www/static/js/datasets/Graph/index.tsx  | 37 +++++-------
 2 files changed, 90 insertions(+), 23 deletions(-)

diff --git a/airflow/www/static/js/datasets/Graph/Legend.tsx b/airflow/www/static/js/datasets/Graph/Legend.tsx
new file mode 100644
index 0000000000..1884a6aa4c
--- /dev/null
+++ b/airflow/www/static/js/datasets/Graph/Legend.tsx
@@ -0,0 +1,76 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React from 'react';
+import {
+  Flex, Box, IconButton, Text,
+} from '@chakra-ui/react';
+import {
+  MdOutlineZoomOutMap, MdFilterCenterFocus, MdOutlineAccountTree,
+} from 'react-icons/md';
+import { HiDatabase } from 'react-icons/hi';
+
+interface Props {
+  zoom: any;
+  center: () => void;
+}
+
+const Legend = ({ zoom, center }: Props) => (
+  <Flex justifyContent="space-between" alignItems="center">
+    <Box>
+      <IconButton
+        onClick={zoom.reset}
+        fontSize="2xl"
+        m={2}
+        title="Reset zoom"
+        aria-label="Reset zoom"
+        icon={<MdOutlineZoomOutMap />}
+      />
+      <IconButton
+        onClick={center}
+        fontSize="2xl"
+        m={2}
+        title="Center"
+        aria-label="Center"
+        icon={<MdFilterCenterFocus />}
+      />
+    </Box>
+    <Box
+      backgroundColor="white"
+      p={2}
+      borderColor="gray.200"
+      borderLeftWidth={1}
+      borderTopWidth={1}
+    >
+      <Text>Legend</Text>
+      <Flex>
+        <Flex mr={2} alignItems="center">
+          <MdOutlineAccountTree size="16px" />
+          <Text ml={1}>DAG</Text>
+        </Flex>
+        <Flex alignItems="center">
+          <HiDatabase size="16px" />
+          <Text ml={1}>Dataset</Text>
+        </Flex>
+      </Flex>
+    </Box>
+  </Flex>
+);
+
+export default Legend;
diff --git a/airflow/www/static/js/datasets/Graph/index.tsx b/airflow/www/static/js/datasets/Graph/index.tsx
index 49df5bd7d4..c49c902081 100644
--- a/airflow/www/static/js/datasets/Graph/index.tsx
+++ b/airflow/www/static/js/datasets/Graph/index.tsx
@@ -18,15 +18,16 @@
  */
 
 import React, { useState, useEffect, RefObject } from 'react';
-import { Box, IconButton, Spinner } from '@chakra-ui/react';
+import { Box, Spinner } from '@chakra-ui/react';
 import { Zoom } from '@visx/zoom';
-import { MdOutlineZoomOutMap, MdFilterCenterFocus } from 'react-icons/md';
+import { Group } from '@visx/group';
 import { debounce } from 'lodash';
 
 import { useDatasetDependencies } from 'src/api';
 
 import Node from './Node';
 import Edge from './Edge';
+import Legend from './Legend';
 
 interface Props {
   onSelect: (datasetId: string) => void;
@@ -117,28 +118,18 @@ const Graph = ({ onSelect, selectedUri }: Props) => {
                   ))}
                 </g>
               </g>
+              <Group top={height - 50} left={0} height={50} width={width}>
+                <foreignObject width={width} height={50}>
+                  <Legend
+                    zoom={zoom}
+                    center={() => zoom.translateTo({
+                      x: (width - (data.width ?? 0)) / 2,
+                      y: (height - (data.height ?? 0)) / 2,
+                    })}
+                  />
+                </foreignObject>
+              </Group>
             </svg>
-            <Box>
-              <IconButton
-                onClick={zoom.reset}
-                fontSize="2xl"
-                m={2}
-                title="Reset zoom"
-                aria-label="Reset zoom"
-                icon={<MdOutlineZoomOutMap />}
-              />
-              <IconButton
-                onClick={() => zoom.translateTo({
-                  x: (width - (data.width ?? 0)) / 2,
-                  y: (height - (data.height ?? 0)) / 2,
-                })}
-                fontSize="2xl"
-                m={2}
-                title="Center"
-                aria-label="Center"
-                icon={<MdFilterCenterFocus />}
-              />
-            </Box>
           </Box>
         )}
       </Zoom>


[airflow] 26/41: Add missing colors to state_color_mapping jinja global (#26822)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 177959ab8bfbb19ff969eb0545f58dd5a6edd0c8
Author: pierrejeambrun <pi...@gmail.com>
AuthorDate: Mon Oct 3 18:58:19 2022 +0200

    Add missing colors to state_color_mapping jinja global (#26822)
    
    (cherry picked from commit 9bd304628c57f43b22f05001070ed58eb2ca2164)
---
 airflow/settings.py                        |  3 +++
 airflow/www/jest-setup.js                  |  3 +++
 docs/apache-airflow/howto/customize-ui.rst | 13 ++++++++-----
 3 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/airflow/settings.py b/airflow/settings.py
index fbae3c9883..10e963d5f7 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -94,8 +94,11 @@ STATE_COLORS = {
     "deferred": "mediumpurple",
     "failed": "red",
     "queued": "gray",
+    "removed": "lightgrey",
+    "restarting": "violet",
     "running": "lime",
     "scheduled": "tan",
+    "shutdown": "blue",
     "skipped": "hotpink",
     "success": "green",
     "up_for_reschedule": "turquoise",
diff --git a/airflow/www/jest-setup.js b/airflow/www/jest-setup.js
index 14974f3f65..55a83464e5 100644
--- a/airflow/www/jest-setup.js
+++ b/airflow/www/jest-setup.js
@@ -44,8 +44,11 @@ global.stateColors = {
   deferred: 'mediumpurple',
   failed: 'red',
   queued: 'gray',
+  removed: 'lightgrey',
+  restarting: 'violet',
   running: 'lime',
   scheduled: 'tan',
+  shutdown: 'blue',
   skipped: 'hotpink',
   success: 'green',
   up_for_reschedule: 'turquoise',
diff --git a/docs/apache-airflow/howto/customize-ui.rst b/docs/apache-airflow/howto/customize-ui.rst
index 2258b588a1..664502a596 100644
--- a/docs/apache-airflow/howto/customize-ui.rst
+++ b/docs/apache-airflow/howto/customize-ui.rst
@@ -36,16 +36,19 @@ following steps:
     .. code-block:: python
 
       STATE_COLORS = {
+          "deferred": "mediumpurple",
+          "failed": "firebrick",
           "queued": "darkgray",
+          "removed": "lightgrey",
+          "restarting": "violet",
           "running": "#01FF70",
+          "scheduled": "tan",
+          "shutdown": "blue",
+          "skipped": "darkorchid",
           "success": "#2ECC40",
-          "failed": "firebrick",
-          "up_for_retry": "yellow",
           "up_for_reschedule": "turquoise",
+          "up_for_retry": "yellow",
           "upstream_failed": "orange",
-          "skipped": "darkorchid",
-          "scheduled": "tan",
-          "deferred": "mediumpurple",
       }
 
 


[airflow] 23/41: Fix running debuggers inside `airflow tasks test` (#26806)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9e31a1154a4d9af56bb8b7ec1dd05c3e26dceabf
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Sep 30 18:46:14 2022 +0100

    Fix running debuggers inside `airflow tasks test` (#26806)
    
    As part of 2.3.3 we added redaction to output from the tasks test
    command, but unfortunately that broke using a debugger with this error:
    
    ```
      File "/usr/lib/python3.10/pdb.py", line 262, in user_line
        self.interaction(frame, None)
      File "/home/ash/.virtualenvs/airflow/lib/python3.10/site-packages/pdb.py", line 231, in interaction
        self._cmdloop()
      File "/usr/lib/python3.10/pdb.py", line 322, in _cmdloop
        self.cmdloop()
      File "/usr/lib/python3.10/cmd.py", line 126, in cmdloop
        line = input(self.prompt)
    TypeError: 'NoneType' object cannot be interpreted as an integer
    ```
    
    (ipdb has a similar but different error)
    
    The "fix" is to assign a fileno attribute to the object. `input()` needs
    this to write the prompt. It feels like a "bug" that it doesn't work
    without it, but as this class is only used in `tasks test` this is a
    safe change.
    
    (cherry picked from commit 029ebacd9cbbb5e307a03530bdaf111c2c3d4f51)
---
 airflow/utils/log/secrets_masker.py    |  1 +
 tests/utils/log/test_secrets_masker.py | 12 ++++++++++++
 2 files changed, 13 insertions(+)

diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index 4200056abc..17234bf408 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -271,6 +271,7 @@ class RedactedIO(TextIO):
 
     def __init__(self):
         self.target = sys.stdout
+        self.fileno = sys.stdout.fileno
 
     def write(self, s: str) -> int:
         s = redact(s)
diff --git a/tests/utils/log/test_secrets_masker.py b/tests/utils/log/test_secrets_masker.py
index c07be27bb7..6de30eb574 100644
--- a/tests/utils/log/test_secrets_masker.py
+++ b/tests/utils/log/test_secrets_masker.py
@@ -18,9 +18,11 @@ from __future__ import annotations
 
 import contextlib
 import inspect
+import io
 import logging
 import logging.config
 import os
+import sys
 import textwrap
 
 import pytest
@@ -363,3 +365,13 @@ class TestRedactedIO:
         RedactedIO().write(p)
         stdout = capsys.readouterr().out
         assert stdout == "***"
+
+    def test_input_builtin(self, monkeypatch):
+        """
+        Test that when the redirect is in place, the `input()` builtin works.
+
+        This is used by debuggers!
+        """
+        monkeypatch.setattr(sys, 'stdin', io.StringIO("a\n"))
+        with contextlib.redirect_stdout(RedactedIO()):
+            assert input() == "a"
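
A minimal, hedged reproduction of the pattern described in the commit message;
the wrapper class here is illustrative, and only the `fileno` assignment
mirrors the actual one-line fix to `RedactedIO`:

```
import contextlib
import sys


class PromptFriendlyWrapper:
    """A stdout wrapper that still lets input()/pdb render their prompts."""

    def __init__(self):
        self.target = sys.stdout
        # Expose the real stdout fileno; without it, input(prompt) under
        # redirect_stdout can fail as shown in the traceback above.
        self.fileno = sys.stdout.fileno

    def write(self, s: str) -> int:
        return self.target.write(s)

    def flush(self) -> None:
        self.target.flush()


with contextlib.redirect_stdout(PromptFriendlyWrapper()):
    print("debugger prompts and output still reach the terminal")
```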


[airflow] 35/41: Remove info log about closing parent pipe (#27054)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2f8ba6232265c6c7396788338dea928f41944b82
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Oct 14 14:59:35 2022 +0100

    Remove info log about closing parent pipe (#27054)
    
    This log line is better removed; it doesn't seem worth keeping since it pollutes
    the logs whenever there's an import error.
    
    (cherry picked from commit 3310618d4a8f1119fb734ea040311db731a54c26)
---
 airflow/dag_processing/processor.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index cebed6a13c..de3572906b 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -136,8 +136,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
 
         # Since we share all open FDs from the parent, we need to close the parent side of the pipe here in
         # the child, else it won't get closed properly until we exit.
-        log.info("Closing parent pipe")
-
         parent_channel.close()
         del parent_channel
 


[airflow] 18/41: Ensure the log messages from operators during parsing go somewhere (#26779)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 88bed082e67c04b64bc024a21fddcee9f372f711
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Sep 30 14:08:41 2022 +0100

    Ensure the log messages from operators during parsing go somewhere (#26779)
    
    * Ensure the log messages from operators during parsing go somewhere
    
    While investigating #26599 and the change from AIP-45, I noticed that
    these warning messages weren't new! The only thing that was new was that
    we started seeing them.
    
    This is because the logger for BaseOperator and all subclasses is
    `airflow.task.operators`, and the `airflow.task` logger is not
    configured (with `set_context()`) until we have a TaskInstance, so it
    just dropped all messages on the floor!
    
    This changes it so that log messages are propagated to parent loggers by
    default, but when we configure a context (and thus have a file to write
    to) we stop that. A similar change was made for the `airflow.processor`
    (but that is unlikely to suffer the same fate)
    
    * Give a real row count value so logs don't fail
    
    The ArangoDB sensor test was logging a mock object, which previously was
    getting dropped before emitting, but with this change now fails with
    "Mock is not an integer" when attempting the  `%d` interpolation.
    
    To avoid making the mock overly specific (`arangodb_client_for_test.db.`
    `return_value.aql.execute.return_value.count.return_value`!) I have
    changed the test to mock the hook entirely (which is already tested)
    
    (cherry picked from commit 7363e35c9dbb9860eabf2444307f4d6f8140ab70)
---
 airflow/config_templates/airflow_local_settings.py |  6 ++++--
 airflow/utils/log/file_processor_handler.py        |  3 +++
 airflow/utils/log/file_task_handler.py             |  3 +++
 airflow/utils/log/logging_mixin.py                 | 18 ++++++++++--------
 tests/conftest.py                                  | 11 +++++++++++
 tests/models/test_baseoperator.py                  | 10 ++++++++++
 tests/providers/arangodb/sensors/test_arangodb.py  |  8 ++++----
 7 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index e08274de31..317544ca7e 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -120,12 +120,14 @@ DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
         'airflow.processor': {
             'handlers': ['processor_to_stdout' if DAG_PROCESSOR_LOG_TARGET == "stdout" else 'processor'],
             'level': LOG_LEVEL,
-            'propagate': False,
+            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
+            'propagate': True,
         },
         'airflow.task': {
             'handlers': ['task'],
             'level': LOG_LEVEL,
-            'propagate': False,
+            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
+            'propagate': True,
             'filters': ['mask_secrets'],
         },
         'flask_appbuilder': {
diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py
index e9d2923978..11e473ecdf 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -24,6 +24,7 @@ from pathlib import Path
 
 from airflow import settings
 from airflow.utils.helpers import parse_template_string
+from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 
 
@@ -64,6 +65,8 @@ class FileProcessorHandler(logging.Handler):
             self._symlink_latest_log_directory()
             self._cur_date = datetime.today()
 
+        return DISABLE_PROPOGATE
+
     def emit(self, record):
         if self.handler is not None:
             self.handler.emit(record)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 80addd3ded..a84dabf7d1 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -29,6 +29,7 @@ from airflow.configuration import AirflowConfigException, conf
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, render_template_to_string
+from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.session import create_session
 
@@ -72,6 +73,8 @@ class FileTaskHandler(logging.Handler):
             self.handler.setFormatter(self.formatter)
         self.handler.setLevel(self.level)
 
+        return DISABLE_PROPOGATE
+
     def emit(self, record):
         if self.handler:
             self.handler.emit(record)
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index 89bddb3558..b54a6f3baa 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -28,6 +28,9 @@ from typing import IO
 # 7-bit C1 ANSI escape sequences
 ANSI_ESCAPE = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
 
+# Private: A sentinel object
+DISABLE_PROPOGATE = object()
+
 
 def remove_escape_codes(text: str) -> str:
     """
@@ -179,15 +182,14 @@ def set_context(logger, value):
     :param logger: logger
     :param value: value to set
     """
-    _logger = logger
-    while _logger:
-        for handler in _logger.handlers:
+    while logger:
+        for handler in logger.handlers:
             # Not all handlers need to have context passed in so we ignore
             # the error when handlers do not have set_context defined.
             set_context = getattr(handler, 'set_context', None)
-            if set_context:
-                set_context(value)
-        if _logger.propagate is True:
-            _logger = _logger.parent
+            if set_context and set_context(value) is DISABLE_PROPOGATE:
+                logger.propagate = False
+        if logger.propagate is True:
+            logger = logger.parent
         else:
-            _logger = None
+            break
diff --git a/tests/conftest.py b/tests/conftest.py
index cb5f449eb1..1026dc3c01 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -840,3 +840,14 @@ def create_log_template(request):
         request.addfinalizer(_delete_log_template)
 
     return _create_log_template
+
+
+@pytest.fixture()
+def reset_logging_config():
+    import logging.config
+
+    from airflow import settings
+    from airflow.utils.module_loading import import_string
+
+    logging_config = import_string(settings.LOGGING_CLASS_PATH)
+    logging.config.dictConfig(logging_config)
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index 6dd0e22396..f1728db9c9 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -685,6 +685,16 @@ class TestBaseOperator:
         op = BaseOperator(task_id="test_task", weight_rule="upstream")
         assert WeightRule.UPSTREAM == op.weight_rule
 
+    # ensure the default logging config is used for this test, no matter what ran before
+    @pytest.mark.usefixtures('reset_logging_config')
+    def test_logging_propogated_by_default(self, caplog):
+        """Test that when set_context hasn't been called that log records are emitted"""
+        BaseOperator(task_id="test").log.warning("test")
+        # This looks like "how could it fail", but it actually checks that the handler called `emit`. Testing
+        # the other case (that the record goes to the file once set_context has been called) is harder to
+        # achieve without leaking a lot of state.
+        assert caplog.messages == ["test"]
+
 
 def test_init_subclass_args():
     class InitSubclassOp(BaseOperator):
diff --git a/tests/providers/arangodb/sensors/test_arangodb.py b/tests/providers/arangodb/sensors/test_arangodb.py
index 3d03724634..be75e9845f 100644
--- a/tests/providers/arangodb/sensors/test_arangodb.py
+++ b/tests/providers/arangodb/sensors/test_arangodb.py
@@ -26,7 +26,7 @@ from airflow.providers.arangodb.sensors.arangodb import AQLSensor
 from airflow.utils import db, timezone
 
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
-arangodb_client_mock = Mock(name="arangodb_client_for_test")
+arangodb_hook_mock = Mock(name="arangodb_hook_for_test", **{'query.return_value.count.return_value': 1})
 
 
 class TestAQLSensor(unittest.TestCase):
@@ -46,9 +46,9 @@ class TestAQLSensor(unittest.TestCase):
         )
 
     @patch(
-        "airflow.providers.arangodb.hooks.arangodb.ArangoDBClient",
+        "airflow.providers.arangodb.sensors.arangodb.ArangoDBHook",
         autospec=True,
-        return_value=arangodb_client_mock,
+        return_value=arangodb_hook_mock,
     )
     def test_arangodb_document_created(self, arangodb_mock):
         query = "FOR doc IN students FILTER doc.name == 'judy' RETURN doc"
@@ -62,4 +62,4 @@ class TestAQLSensor(unittest.TestCase):
         )
 
         arangodb_tag_sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-        assert arangodb_mock.return_value.db.called
+        assert arangodb_hook_mock.query.return_value.count.called
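
A minimal standalone sketch of the propagation mechanism introduced above, using only the
standard logging module (FileBackedHandler and its file path are hypothetical, not Airflow
classes; DISABLE_PROPAGATE here mirrors Airflow's DISABLE_PROPOGATE sentinel):

    import logging

    # Sentinel a handler returns from set_context once it has somewhere to write.
    DISABLE_PROPAGATE = object()


    class FileBackedHandler(logging.Handler):
        """Hypothetical handler: before set_context it has no file, so records must propagate."""

        def __init__(self):
            super().__init__()
            self.filename = None

        def set_context(self, value):
            self.filename = f"/tmp/{value}.log"  # assumed location, for illustration only
            return DISABLE_PROPAGATE

        def emit(self, record):
            if self.filename:
                with open(self.filename, "a") as f:
                    f.write(self.format(record) + "\n")


    def set_context(logger, value):
        # Walk up the logger hierarchy, as airflow.utils.log.logging_mixin.set_context now does:
        # propagation stays enabled (so unconfigured loggers still reach the root handlers)
        # until a handler acknowledges the context with the sentinel, which turns it off.
        while logger:
            for handler in logger.handlers:
                ctx = getattr(handler, "set_context", None)
                if ctx and ctx(value) is DISABLE_PROPAGATE:
                    logger.propagate = False
            if logger.propagate is True:
                logger = logger.parent
            else:
                break

Until set_context is called, the logger keeps propagate=True and records still reach the root
handlers, which is exactly what test_logging_propogated_by_default asserts via caplog.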


[airflow] 12/41: Remove TaskFail duplicates check (#26714)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a325ba7327809c80e384976427b60bd733aa5f6e
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Sep 27 22:12:54 2022 -0700

    Remove TaskFail duplicates check (#26714)
    
    This check was added in error.  The duplicates are supposed to be there.  The task duration
    and gantt views process TaskFail records with the apparent intention of including durations
    from non-final failed attempts in the overall duration of the run.
    
    (cherry picked from commit ee21c1bac4cb5bb1c19ea9e5e84ee9b5854ab039)
---
 airflow/utils/db.py | 39 +++++++++++++++++++--------------------
 1 file changed, 19 insertions(+), 20 deletions(-)

diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 3acbaf6f22..cec3d58c36 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -941,31 +941,31 @@ def reflect_tables(tables: list[Base | str] | None, session):
     return metadata
 
 
-def check_task_fail_for_duplicates(session):
-    """Check that there are no duplicates in the task_fail table before creating FK"""
-    from airflow.models.taskfail import TaskFail
-
-    metadata = reflect_tables([TaskFail], session)
-    task_fail = metadata.tables.get(TaskFail.__tablename__)  # type: ignore
-    if task_fail is None:  # table not there
-        return
-    if "run_id" in task_fail.columns:  # upgrade already applied
-        return
-    yield from check_table_for_duplicates(
-        table_name=task_fail.name,
-        uniqueness=['dag_id', 'task_id', 'execution_date'],
-        session=session,
-        version='2.3',
-    )
-
-
 def check_table_for_duplicates(
     *, session: Session, table_name: str, uniqueness: list[str], version: str
 ) -> Iterable[str]:
     """
     Check table for duplicates, given a list of columns which define the uniqueness of the table.
 
-    Call from ``run_duplicates_checks``.
+    Usage example:
+
+    .. code-block:: python
+
+        def check_task_fail_for_duplicates(session):
+            from airflow.models.taskfail import TaskFail
+
+            metadata = reflect_tables([TaskFail], session)
+            task_fail = metadata.tables.get(TaskFail.__tablename__)  # type: ignore
+            if task_fail is None:  # table not there
+                return
+            if "run_id" in task_fail.columns:  # upgrade already applied
+                return
+            yield from check_table_for_duplicates(
+                table_name=task_fail.name,
+                uniqueness=['dag_id', 'task_id', 'execution_date'],
+                session=session,
+                version='2.3',
+            )
 
     :param table_name: table name to check
     :param uniqueness: uniqueness constraint to evaluate against
@@ -1388,7 +1388,6 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]:
     :rtype: list[str]
     """
     check_functions: tuple[Callable[..., Iterable[str]], ...] = (
-        check_task_fail_for_duplicates,
         check_conn_id_duplicates,
         check_conn_type_null,
         check_run_id_null,
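
For context on why those duplicate rows are expected, the duration and gantt views fold the
duration of every failed attempt into the total shown for a task. A minimal sketch of that
aggregation, with stand-in dictionaries rather than real TaskFail rows:

    from collections import defaultdict

    # One row per failed attempt, so several rows can legitimately share the same
    # (dag_id, task_id, run_id) key -- that repetition is the information the chart needs.
    task_fails = [
        {"dag_id": "d", "task_id": "t", "run_id": "r1", "duration": 12.0},
        {"dag_id": "d", "task_id": "t", "run_id": "r1", "duration": 7.5},  # a second failed attempt
    ]

    fails_totals = defaultdict(float)
    for tf in task_fails:
        if tf["duration"]:
            fails_totals[(tf["dag_id"], tf["task_id"], tf["run_id"])] += tf["duration"]

    # The chart adds this total on top of the final attempt's own duration.
    print(fails_totals[("d", "t", "r1")])  # 19.5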


[airflow] 38/41: Handle mapped tasks in task duration chart (#26722)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fa62205ff977ffa0755380d3f8ea41089ec36d73
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Sep 27 23:31:22 2022 -0700

    Handle mapped tasks in task duration chart (#26722)
    
    Previously, each mapped task appeared in the chart as its own independent but unlabeled point.  It makes sense to aggregate them instead.
    
    (cherry picked from commit 8829c079a675a60b78a4bc3a00f91ebe33b27eb6)
---
 airflow/www/views.py | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index ce322d7a62..8a45a15444 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2900,7 +2900,6 @@ class Airflow(AirflowBaseView):
 
         y_points = defaultdict(list)
         x_points = defaultdict(list)
-        cumulative_y = defaultdict(list)
 
         task_instances = dag.get_task_instances_before(base_date, num_runs, session=session)
         if task_instances:
@@ -2918,7 +2917,6 @@ class Airflow(AirflowBaseView):
         )
         if dag.partial:
             ti_fails = ti_fails.filter(TaskFail.task_id.in_([t.task_id for t in dag.tasks]))
-
         fails_totals = defaultdict(int)
         for failed_task_instance in ti_fails:
             dict_key = (
@@ -2929,13 +2927,21 @@ class Airflow(AirflowBaseView):
             if failed_task_instance.duration:
                 fails_totals[dict_key] += failed_task_instance.duration
 
-        for task_instance in task_instances:
-            if task_instance.duration:
-                date_time = wwwutils.epoch(task_instance.execution_date)
-                x_points[task_instance.task_id].append(date_time)
-                fails_dict_key = (task_instance.dag_id, task_instance.task_id, task_instance.run_id)
+        # we must group any mapped TIs by dag_id, task_id, run_id
+        mapped_tis = set()
+        tis_grouped = itertools.groupby(task_instances, lambda x: (x.dag_id, x.task_id, x.run_id))
+        for key, tis in tis_grouped:
+            tis = list(tis)
+            duration = sum(x.duration for x in tis if x.duration)
+            if duration:
+                first_ti = tis[0]
+                if first_ti.map_index >= 0:
+                    mapped_tis.add(first_ti.task_id)
+                date_time = wwwutils.epoch(first_ti.execution_date)
+                x_points[first_ti.task_id].append(date_time)
+                fails_dict_key = (first_ti.dag_id, first_ti.task_id, first_ti.run_id)
                 fails_total = fails_totals[fails_dict_key]
-                y_points[task_instance.task_id].append(float(task_instance.duration + fails_total))
+                y_points[first_ti.task_id].append(float(duration + fails_total))
 
         cumulative_y = {k: list(itertools.accumulate(v)) for k, v in y_points.items()}
 
@@ -2951,12 +2957,12 @@ class Airflow(AirflowBaseView):
 
         for task_id in x_points:
             chart.add_serie(
-                name=task_id,
+                name=task_id + '[]' if task_id in mapped_tis else task_id,
                 x=x_points[task_id],
                 y=scale_time_units(y_points[task_id], y_unit),
             )
             cum_chart.add_serie(
-                name=task_id,
+                name=task_id + '[]' if task_id in mapped_tis else task_id,
                 x=x_points[task_id],
                 y=scale_time_units(cumulative_y[task_id], cum_y_unit),
             )
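
The grouping relies on itertools.groupby, which only merges adjacent items, so it assumes the
task instances arrive ordered by the grouping key. A self-contained sketch of the same
aggregation, with namedtuples standing in for TaskInstance rows:

    import itertools
    from collections import namedtuple

    # map_index >= 0 marks an expanded (mapped) task instance.
    TI = namedtuple("TI", "dag_id task_id run_id map_index duration")
    task_instances = sorted(
        [
            TI("dag", "t", "run1", 0, 3.0),
            TI("dag", "t", "run1", 1, 4.0),
            TI("dag", "u", "run1", -1, 2.0),
        ],
        key=lambda x: (x.dag_id, x.task_id, x.run_id),
    )

    mapped_tis = set()
    durations = {}
    for key, tis in itertools.groupby(task_instances, lambda x: (x.dag_id, x.task_id, x.run_id)):
        tis = list(tis)
        total = sum(x.duration for x in tis if x.duration)
        if total:
            if tis[0].map_index >= 0:
                mapped_tis.add(tis[0].task_id)
            durations[key] = total

    print(durations)   # {('dag', 't', 'run1'): 7.0, ('dag', 'u', 'run1'): 2.0}
    print(mapped_tis)  # {'t'} -- charted with a '[]' suffix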


[airflow] 32/41: Fix auto refresh for graph view (#26926)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 66724540e7e4a0246625a0970b3dee531ec88e6e
Author: pierrejeambrun <pi...@gmail.com>
AuthorDate: Tue Oct 11 17:04:32 2022 +0200

    Fix auto refresh for graph view (#26926)
    
    * Fix auto refresh for graph view
    
    * Add task_instances view test
    
    * Use freezegun to mock datetime
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 64622929a043436b235b9fb61fb076c5d2e02124)
---
 airflow/www/static/js/graph.js      |   5 +-
 tests/www/views/test_views_tasks.py | 271 ++++++++++++++++++++++++++++++++----
 2 files changed, 249 insertions(+), 27 deletions(-)

diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 6e34cc2829..715c46b3e2 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -434,11 +434,10 @@ function handleRefresh() {
         // only refresh if the data has changed
         if (prevTis !== tis) {
         // eslint-disable-next-line no-global-assign
-          taskInstances = JSON.parse(tis);
-          updateNodesStates(taskInstances);
+          updateNodesStates(tis);
 
           // Only redraw the graph if labels have changed
-          const haveLabelsChanged = updateNodeLabels(nodes, taskInstances);
+          const haveLabelsChanged = updateNodeLabels(nodes, tis);
           if (haveLabelsChanged) draw();
 
           // end refresh if all states are final
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 2b6ca3c271..7d1ee8680e 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -24,6 +24,7 @@ import unittest.mock
 import urllib.parse
 from datetime import timedelta
 
+import freezegun
 import pytest
 
 from airflow import settings
@@ -60,30 +61,31 @@ def reset_dagruns():
 
 @pytest.fixture(autouse=True)
 def init_dagruns(app, reset_dagruns):
-    app.dag_bag.get_dag("example_bash_operator").create_dagrun(
-        run_id=DEFAULT_DAGRUN,
-        run_type=DagRunType.SCHEDULED,
-        execution_date=DEFAULT_DATE,
-        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-        start_date=timezone.utcnow(),
-        state=State.RUNNING,
-    )
-    app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
-        run_id=DEFAULT_DAGRUN,
-        run_type=DagRunType.SCHEDULED,
-        execution_date=DEFAULT_DATE,
-        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-        start_date=timezone.utcnow(),
-        state=State.RUNNING,
-    )
-    app.dag_bag.get_dag("example_xcom").create_dagrun(
-        run_id=DEFAULT_DAGRUN,
-        run_type=DagRunType.SCHEDULED,
-        execution_date=DEFAULT_DATE,
-        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-        start_date=timezone.utcnow(),
-        state=State.RUNNING,
-    )
+    with freezegun.freeze_time(DEFAULT_DATE):
+        app.dag_bag.get_dag("example_bash_operator").create_dagrun(
+            run_id=DEFAULT_DAGRUN,
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+        )
+        app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
+            run_id=DEFAULT_DAGRUN,
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+        )
+        app.dag_bag.get_dag("example_xcom").create_dagrun(
+            run_id=DEFAULT_DAGRUN,
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+        )
     yield
     clear_db_runs()
 
@@ -993,3 +995,224 @@ def test_graph_view_doesnt_fail_on_recursion_error(app, dag_maker, admin_client)
         url = f'/dags/{dag.dag_id}/graph'
         resp = admin_client.get(url, follow_redirects=True)
         assert resp.status_code == 200
+
+
+def test_task_instances(admin_client):
+    """Test task_instances view."""
+    resp = admin_client.get(
+        f'/object/task_instances?dag_id=example_bash_operator&execution_date={DEFAULT_DATE}',
+        follow_redirects=True,
+    )
+    assert resp.status_code == 200
+    assert resp.json == {
+        'also_run_this': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 2,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'also_run_this',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'run_after_loop': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 2,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'run_after_loop',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'run_this_last': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'EmptyOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 1,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'run_this_last',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'runme_0': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 3,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'runme_0',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'runme_1': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 3,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'runme_1',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'runme_2': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 3,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'runme_2',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'this_will_skip': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 2,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'this_will_skip',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+    }
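
The test relies on freezing the clock so that generated timestamps such as updated_at match the
literal DEFAULT_DATE.isoformat() values in the expected payload. A minimal sketch of that
freezegun pattern in a pytest fixture (the frozen date and row shape are placeholders):

    import datetime

    import freezegun
    import pytest

    FROZEN = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc)  # placeholder date


    @pytest.fixture()
    def frozen_row():
        # Everything created inside the freeze sees FROZEN as "now", so timestamp columns
        # can be asserted against FROZEN.isoformat() without flakiness.
        with freezegun.freeze_time(FROZEN):
            yield {"updated_at": datetime.datetime.now(datetime.timezone.utc)}


    def test_row_has_frozen_timestamp(frozen_row):
        assert frozen_row["updated_at"].isoformat() == FROZEN.isoformat()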


[airflow] 05/41: Fix version for a couple configurations (#26491)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a643184ed94aa3c535cea0586df9e5ecb4c4a07e
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Sep 19 08:15:16 2022 -0700

    Fix version for a couple configurations (#26491)
    
    These were released with 2.4.0.
    
    (cherry picked from commit cc4902d21024ae1c700ddfc8bd33e91594de3dab)
---
 airflow/config_templates/config.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 97e544fc83..b20fea7ff2 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -253,7 +253,7 @@
       description: |
           The number of seconds each task is going to wait by default between retries. Can be overridden at
           dag or task level.
-      version_added: 2.3.2
+      version_added: 2.4.0
       type: integer
       example: ~
       default: "300"
@@ -669,7 +669,7 @@
     - name: dag_processor_log_format
       description: |
         Format of Dag Processor Log line
-      version_added: 2.3.4
+      version_added: 2.4.0
       type: string
       example: ~
       default: "[%%(asctime)s] [SOURCE:DAG_PROCESSOR]


[airflow] 31/41: fix next run dataset modal links (#26897)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b9e09af530dc597ae4b0e627d71b990616caf1de
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Thu Oct 6 10:34:42 2022 -0400

    fix next run dataset modal links (#26897)
    
    (cherry picked from commit 8898db999c88c98b71f4a5999462e6858aab10eb)
---
 airflow/www/static/js/datasetUtils.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www/static/js/datasetUtils.js b/airflow/www/static/js/datasetUtils.js
index f0e63d4bb3..e88b1ad040 100644
--- a/airflow/www/static/js/datasetUtils.js
+++ b/airflow/www/static/js/datasetUtils.js
@@ -33,7 +33,7 @@ export function openDatasetModal(dagId, summary = '', nextDatasets = [], error =
 
     const uriCell = document.createElement('td');
     const datasetLink = document.createElement('a');
-    datasetLink.href = `${datasetsUrl}?dataset_uri=${encodeURIComponent(d.id)}`;
+    datasetLink.href = `${datasetsUrl}?uri=${encodeURIComponent(d.uri)}`;
     datasetLink.innerText = d.uri;
     uriCell.append(datasetLink);
 


[airflow] 16/41: Allow retrieving error message from data.detail (#26762)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit efe11425415253c6b4328b848c1fb9019c1399fc
Author: pierrejeambrun <pi...@gmail.com>
AuthorDate: Thu Sep 29 16:06:52 2022 +0200

    Allow retrieving error message from data.detail (#26762)
    
    (cherry picked from commit bec80af0718e44212d02a969a65d3201648735f4)
---
 airflow/www/static/js/utils/useErrorToast.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www/static/js/utils/useErrorToast.ts b/airflow/www/static/js/utils/useErrorToast.ts
index f361af0491..4ee0a085e5 100644
--- a/airflow/www/static/js/utils/useErrorToast.ts
+++ b/airflow/www/static/js/utils/useErrorToast.ts
@@ -28,7 +28,7 @@ export const getErrorDescription = (error?: ErrorType, fallbackMessage?: string)
   if (error instanceof Error) {
     let { message } = error;
     if (axios.isAxiosError(error)) {
-      message = error.response?.data || error.message;
+      message = error.response?.data?.detail || error.response?.data || error.message;
     }
     return message;
   }