You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/14 15:09:02 UTC

[GitHub] [airflow] xinbinhuang commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

xinbinhuang commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613334840



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
      Should we push this check down into the TaskInstance (TI) instead? E.g. here: https://github.com/astronomer/airflow/blob/e4c0689535f1353c9e647773c06bedf8cd22b239/airflow/models/taskinstance.py#L1288
   
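   Although currently only SubDagOperator overrides the `pre_execute` hook, `pre_execute` is still a public API, and users may override it when they build custom operators. If such an override does not call `super().pre_execute(context)`, the `latest_only` check would be silently bypassed.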




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org