Posted to commits@airflow.apache.org by mo...@apache.org on 2023/07/21 10:46:37 UTC

[airflow] branch listener-move-onrunning-callback updated (3c7ba730f2 -> 53d31fd5df)

This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch listener-move-onrunning-callback
in repository https://gitbox.apache.org/repos/asf/airflow.git


    omit 3c7ba730f2 listener: call on_task_instance_running after rendering templates
     add 8b5da2134d Update Dag trigger API and command docs (#32696)
     add 27b5f696a4 Add Deferrable mode for EMR Serverless Start Job Operator (#32534)
     add 848c69a194 Refresh GKE OAuth2 tokens (#32673)
     add 8b7ae76026 Fixup docs and optimize system test for DataprocSubmitJobOperator (Hadoop job) (#32722)
     add e8287734cb Fixup docstring for deprecated DataprocSubmitHiveJobOperator (#32723)
     add dda3dcdcfc Add deferrable mode to ExternalTaskSensor (#29260)
     add ac524826f3 Refactor setup/teardown ctx mgr to operate freely with other task definitions (#32687)
     add 3c14753b03 Fix BigQueryGetDataOperator where project_id is not being respected in deferrable mode (#32488)
     add 99b8a90346 Filtering and ordering results of DataprocListBatchesOperator (#32500)
     add 8e67546660 Fix DataformCreateWorkflowInvocationOperator system test (#32599)
     add 75ed3bc3f8 [bugfix] fix AWS triggers where deserialization would crash if region was not specified (#32729)
     add d70fecfaf6 Add initial docs for setup / teardown (#32169)
     add 15d42b4320 Reduce default for max TIs per query, enforce <= parallelism (#32572)
     add c7c0deecb4 Refactor Sqlalchemy queries to 2.0 style (Part 6) (#32645)
     add b09e1f97d5 Fix dagProcessor not including webserver-config volume (#32644)
     new 53d31fd5df listener: call on_task_instance_running after rendering templates

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (3c7ba730f2)
            \
             N -- N -- N   refs/heads/listener-move-onrunning-callback (53d31fd5df)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
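
The four labels used above ("omit", "discard", "add", "new") can be
read as set operations on reachable revisions. The following is a
minimal sketch of that reading, not the code that generates these
emails. The function name classify_revisions and the three input sets
are hypothetical, and the example sets are a toy subset of this push
(it assumes 3c7ba730f2 is still held by some other reference, as the
"omit" label implies).

```python
from typing import Dict, Set


def classify_revisions(
    old_tip: Set[str], new_tip: Set[str], other_refs: Set[str]
) -> Dict[str, str]:
    """Label revisions affected by a non-fast-forward update of one branch.

    old_tip    -- revisions reachable from the branch before the push
    new_tip    -- revisions reachable from the branch after the push
    other_refs -- revisions reachable from any other reference
    (All three sets are assumptions made for this illustration.)
    """
    labels: Dict[str, str] = {}
    # Revisions dropped from the branch: still referenced elsewhere -> "omit",
    # otherwise nothing refers to them any more -> "discard".
    for rev in old_tip - new_tip:
        labels[rev] = "omit" if rev in other_refs else "discard"
    # Revisions gained by the branch: already in the repository -> "add",
    # otherwise entirely new to the repository -> "new".
    for rev in new_tip - old_tip:
        labels[rev] = "add" if rev in other_refs else "new"
    return labels


# Toy subset of the push described in this email; "B" is the common base.
labels = classify_revisions(
    old_tip={"B", "3c7ba730f2"},
    new_tip={"B", "8b5da2134d", "53d31fd5df"},
    other_refs={"B", "3c7ba730f2", "8b5da2134d"},
)
for rev in sorted(labels):
    print(labels[rev], rev)
```

Revisions in the common base appear in both the old and the new set,
so they receive no label, which matches the statement above that only
the N revisions are described.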


Summary of changes:
 airflow/api_connexion/openapi/v1.yaml              |   5 +-
 airflow/cli/cli_config.py                          |   5 +-
 airflow/config_templates/config.yml                |   5 +-
 airflow/configuration.py                           |  20 ++
 .../example_setup_teardown_taskflow.py             |  85 +++----
 airflow/models/serialized_dag.py                   |  50 +++--
 airflow/models/taskinstance.py                     |   5 +-
 airflow/models/trigger.py                          |  75 ++++---
 airflow/providers/amazon/aws/operators/emr.py      |  52 ++++-
 airflow/providers/amazon/aws/triggers/batch.py     |   2 +-
 airflow/providers/amazon/aws/triggers/ecs.py       |   4 +-
 airflow/providers/amazon/aws/triggers/eks.py       |   6 +-
 airflow/providers/amazon/aws/triggers/emr.py       |  74 ++++++-
 airflow/providers/amazon/aws/triggers/rds.py       |   6 +-
 airflow/providers/google/cloud/hooks/dataproc.py   |  12 +
 .../google/cloud/hooks/kubernetes_engine.py        |   4 +
 .../providers/google/cloud/operators/bigquery.py   |  34 ++-
 .../providers/google/cloud/operators/dataproc.py   |  20 +-
 airflow/sensors/external_task.py                   |  43 ++++
 airflow/triggers/external_task.py                  |  80 +++++--
 airflow/utils/cli.py                               |   3 +-
 airflow/utils/db_cleanup.py                        |   6 +-
 airflow/utils/setup_teardown.py                    | 246 ++++++++++++++++-----
 airflow/www/static/js/types/api-generated.ts       |   2 +
 .../dag-processor/dag-processor-deployment.yaml    |   5 +
 .../operators/emr/emr_serverless.rst               |   2 +
 .../administration-and-deployment/scheduler.rst    |   7 +-
 docs/apache-airflow/core-concepts/dags.rst         |  18 +-
 docs/apache-airflow/howto/index.rst                |   1 +
 .../howto/operator/external_task_sensor.rst        |   9 +
 docs/apache-airflow/howto/setup-and-teardown.rst   | 184 +++++++++++++++
 docs/spelling_wordlist.txt                         |   3 +
 helm_tests/airflow_core/test_dag_processor.py      |  39 ++++
 newsfragments/32572.significant.rst                |  10 +
 tests/core/test_configuration.py                   |  27 +++
 tests/decorators/test_setup_teardown.py            |  44 ++++
 .../amazon/aws/operators/test_emr_serverless.py    |  41 +++-
 .../{test_batch.py => test_emr_serverless.py}      |  43 ++--
 .../providers/google/cloud/hooks/test_dataproc.py  |   4 +
 .../google/cloud/hooks/test_kubernetes_engine.py   |  46 ++++
 .../google/cloud/operators/test_bigquery.py        |  13 +-
 .../google/cloud/operators/test_dataproc.py        |   6 +
 tests/sensors/test_external_task_sensor.py         |  82 ++++++-
 .../system/providers/core}/__init__.py             |   0
 .../example_external_task_child_deferrable.py}     |  25 ++-
 .../example_external_task_parent_deferrable.py}    |  71 +++---
 .../google/cloud/dataform/example_dataform.py      |  19 +-
 .../cloud/dataproc/example_dataproc_hadoop.py      |  32 +--
 .../google/cloud/dataproc/example_dataproc_hive.py |  11 +-
 tests/triggers/test_external_task.py               |   6 +
 50 files changed, 1278 insertions(+), 314 deletions(-)
 create mode 100644 docs/apache-airflow/howto/setup-and-teardown.rst
 create mode 100644 newsfragments/32572.significant.rst
 copy tests/providers/amazon/aws/triggers/{test_batch.py => test_emr_serverless.py} (59%)
 copy {airflow/api_connexion => tests/system/providers/core}/__init__.py (100%)
 copy tests/{dags_corrupted/test_nonstring_owner.py => system/providers/core/example_external_task_child_deferrable.py} (65%)
 copy tests/system/providers/{google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py => core/example_external_task_parent_deferrable.py} (50%)


[airflow] 01/01: listener: call on_task_instance_running after rendering templates

Posted by mo...@apache.org.

mobuchowski pushed a commit to branch listener-move-onrunning-callback
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 53d31fd5df87c6ce056030cb32fc1ad23f0110d5
Author: Maciej Obuchowski <ob...@gmail.com>
AuthorDate: Thu Jul 20 14:20:00 2023 +0200

    listener: call on_task_instance_running after rendering templates
    
    Signed-off-by: Maciej Obuchowski <ob...@gmail.com>
---
 airflow/models/taskinstance.py | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 4abfd94cd0..520d07f092 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1498,14 +1498,9 @@ class TaskInstance(Base, LoggingMixin):
         self.task = self.task.prepare_for_execution()
         context = self.get_template_context(ignore_param_exceptions=False)
 
-        # We lose previous state because it's changed in other process in LocalTaskJob.
-        # We could probably pass it through here though...
-        get_listener_manager().hook.on_task_instance_running(
-            previous_state=TaskInstanceState.QUEUED, task_instance=self, session=session
-        )
         try:
             if not mark_success:
-                self._execute_task_with_callbacks(context, test_mode)
+                self._execute_task_with_callbacks(context, test_mode, session=session)
             if not test_mode:
                 self.refresh_from_db(lock_for_update=True, session=session)
             self.state = TaskInstanceState.SUCCESS
@@ -1601,7 +1596,7 @@ class TaskInstance(Base, LoggingMixin):
                     session=session,
                 )
 
-    def _execute_task_with_callbacks(self, context, test_mode=False):
+    def _execute_task_with_callbacks(self, context, test_mode: bool = False, *, session: Session):
         """Prepare Task for Execution."""
         from airflow.models.renderedtifields import RenderedTaskInstanceFields
 
@@ -1651,16 +1646,23 @@ class TaskInstance(Base, LoggingMixin):
                 )
 
             # Run pre_execute callback
-            self.task.pre_execute(context=context)
+            # Is never MappedOperator at this point
+            self.task.pre_execute(context=context)  # type: ignore[union-attr]
 
             # Run on_execute callback
             self._run_execute_callback(context, self.task)
 
+            # Run on_task_instance_running event
+            get_listener_manager().hook.on_task_instance_running(
+                previous_state=TaskInstanceState.QUEUED, task_instance=self, session=session
+            )
+
             # Execute the task
             with set_current_context(context):
                 result = self._execute_task(context, task_orig)
             # Run post_execute callback
-            self.task.post_execute(context=context, result=result)
+            # Is never MappedOperator at this point
+            self.task.post_execute(context=context, result=result)  # type: ignore[union-attr]
 
         Stats.incr(f"operator_successes_{self.task.task_type}", tags=self.stats_tags)
         # Same metric with tagging
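
The effect of moving the on_task_instance_running call, as described by
the commit subject, is that listeners see the task after its templates
have been rendered rather than before. The sketch below illustrates
only that ordering with plain Python. ToyTaskInstance, run_task and the
notify_after_render flag are hypothetical names invented for the
illustration; none of them are Airflow APIs, and the real change is the
diff above.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class ToyTaskInstance:
    """Hypothetical stand-in for a task instance (not Airflow's TaskInstance)."""

    bash_command: str                       # templated field, e.g. "echo {ds}"
    rendered_command: Optional[str] = None  # set by render_templates()

    def render_templates(self, context: dict) -> None:
        # Toy rendering with str.format instead of a real template engine.
        self.rendered_command = self.bash_command.format(**context)


Listener = Callable[[ToyTaskInstance], Optional[str]]


def run_task(
    ti: ToyTaskInstance,
    context: dict,
    listeners: List[Listener],
    notify_after_render: bool,
) -> List[Optional[str]]:
    """Return what each listener observed when the "running" event fired."""
    observed: List[Optional[str]] = []
    if not notify_after_render:
        # Ordering before this commit: notify first, render later.
        observed = [listener(ti) for listener in listeners]
    ti.render_templates(context)
    if notify_after_render:
        # Ordering after this commit: render first, then notify.
        observed = [listener(ti) for listener in listeners]
    return observed


def read_command(ti: ToyTaskInstance) -> Optional[str]:
    """A toy listener that records the rendered command it can see."""
    return ti.rendered_command


context = {"ds": "2023-07-20"}
before = run_task(ToyTaskInstance("echo {ds}"), context, [read_command], False)
after = run_task(ToyTaskInstance("echo {ds}"), context, [read_command], True)
print(before)  # [None]
print(after)   # ['echo 2023-07-20']
```

Under these assumptions, a listener that needs rendered field values
(for example, to report the actual command or query a task will run)
only gets them with the second ordering.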