Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/01 05:13:59 UTC

[GitHub] [airflow] dstandish opened a new pull request #15125: Add latest only decorator

dstandish opened a new pull request #15125:
URL: https://github.com/apache/airflow/pull/15125


   This decorator lets us implement latest only behavior without adding an extraneous task.
   
   You can make any operator a latest only operator by applying this decorator to `execute`.
   
   I thought for a minute about pulling the `is_latest_execution` logic out into a function shared by both the latest only decorator and `LatestOnlyOperator`, but the logic is just a few lines, so I decided it wasn't worth the added obfuscation and object proliferation.
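   For illustration, here is a minimal sketch of what such a shared helper might look like, assuming a fixed daily schedule. `is_latest_execution` and `daily` are hypothetical names: `daily` stands in for `dag.following_schedule`, and the comparison mirrors the `left_window < now <= right_window` check in the code under review.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable


def is_latest_execution(
    execution_date: datetime,
    now: datetime,
    following_schedule: Callable[[datetime], datetime],
) -> bool:
    """Return True if ``execution_date`` is the most recently closed interval.

    The interval starting at ``execution_date`` closes at ``left_window``.
    Its run is the latest one while ``now`` falls in the *next* interval,
    i.e. ``left_window < now <= right_window``.
    """
    left_window = following_schedule(execution_date)
    right_window = following_schedule(left_window)
    return left_window < now <= right_window


def daily(dt: datetime) -> datetime:
    # Hypothetical stand-in for dag.following_schedule on a @daily DAG.
    return dt + timedelta(days=1)


now = datetime(2021, 4, 3, 12, 0, tzinfo=timezone.utc)
print(is_latest_execution(datetime(2021, 4, 2, tzinfo=timezone.utc), now, daily))  # True
print(is_latest_execution(datetime(2021, 4, 1, tzinfo=timezone.utc), now, daily))  # False
```

   Because the helper takes `now` and the schedule function as arguments, it stays pure and is easy to unit-test at the window boundaries.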
   
   Please let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] xinbinhuang commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613334840



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       Should we push this down to TI? i.e. 
   
   https://github.com/apache/airflow/blob/75603160848e4199ed368809dfd441dcc5ddbd82/airflow/models/taskinstance.py#L1288
   
   Though currently only SubDagOperator overrides the `pre_execute` hook, I think `pre_execute` is still a public API, and users may override it when they build custom operators.
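   To illustrate the concern, here is a toy sketch in plain Python with no Airflow imports. `Operator`, `SkipTask`, and `run_task` are hypothetical stand-ins for BaseOperator, AirflowSkipException, and the TaskInstance run path, and the `is_latest` context key is a placeholder for the real window check. If the check lives in `pre_execute`, a subclass that overrides the hook without calling `super()` silently loses it; a check made by the caller cannot be bypassed that way.

```python
class SkipTask(Exception):
    """Hypothetical stand-in for AirflowSkipException."""


class Operator:
    """Hypothetical stand-in for BaseOperator."""

    def __init__(self, latest_only: bool = False) -> None:
        self.latest_only = latest_only

    def _skip_if_not_latest(self, context: dict) -> None:
        # "is_latest" is a simplified placeholder for the real window check.
        if self.latest_only and not context.get("is_latest", True):
            raise SkipTask("Not latest execution; skipping...")

    def pre_execute(self, context: dict) -> None:
        # Placement 1: the check lives in the overridable hook.
        self._skip_if_not_latest(context)

    def execute(self, context: dict) -> str:
        return "executed"


class CustomOperator(Operator):
    def pre_execute(self, context: dict) -> None:
        # Overrides the hook without calling super(), so the check never runs.
        pass


def run_task(op: Operator, context: dict, check_in_runner: bool) -> str:
    """Toy version of the task-instance run path."""
    try:
        if check_in_runner:
            # Placement 2: the runner performs the check itself
            # (running it again in the base hook is harmless).
            op._skip_if_not_latest(context)
        op.pre_execute(context)
        return op.execute(context)
    except SkipTask:
        return "skipped"


stale = {"is_latest": False}
print(run_task(CustomOperator(latest_only=True), stale, check_in_runner=False))  # executed
print(run_task(CustomOperator(latest_only=True), stale, check_in_runner=True))   # skipped
```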







[GitHub] [airflow] ashb commented on a change in pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r605492436



##########
File path: airflow/utils/decorators.py
##########
@@ -97,6 +99,44 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
     return cast(T, wrapper)
 
 
+def latest_only(f):
+    """
+    Decorator for adding skip-if-not-latest behavior to any operator.
+
+    Can be disabled if the class has an attribute ``latest_only`` with value ``False``
+    """
+
+    def skip_if_not_latest(context):
+        if not context:  # assume run interactively
+            return
+        dag_run = context.get('dag_run')
+        if dag_run and dag_run.external_trigger:
+            print("Externally triggered DAG_Run: allowing execution to proceed.")

Review comment:
       Should probably be logged, not printed.
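   For illustration, a sketch of the same messages routed through the standard `logging` module instead of `print`. Inside an operator method `self.log` would be the natural choice, but the nested `skip_if_not_latest` function in this diff has no `self`, so this sketch assumes a module-level logger (the logger name is illustrative). Passing `%s` arguments instead of an f-string also defers formatting until a handler actually emits the record.

```python
import logging

# Illustrative logger name; an operator would normally use self.log instead.
log = logging.getLogger("airflow.task")


def log_latest_only_check(left_window, right_window, now) -> None:
    # Arguments are passed separately, so string formatting is deferred.
    log.info(
        "Checking latest only:\n\tleft_window: %s\n\tright_window: %s\n\tnow: %s",
        left_window,
        right_window,
        now,
    )


def log_external_trigger() -> None:
    log.info("Externally triggered DAG_Run: allowing execution to proceed.")
```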







[GitHub] [airflow] dstandish commented on a change in pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r606714730



##########
File path: airflow/example_dags/example_latest_only_decorator.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example of the `latest_only` decorator"""
+
+from airflow import DAG
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.decorators import latest_only
+
+
+# [START example]
+class MyLatestOnlyOperator(BaseOperator):  # pylint: disable=missing-class-docstring
+    def __init__(self, latest_only=True, **kwargs):  # pylint: disable=redefined-outer-name
+        self.latest_only = latest_only
+        if self.latest_only:
+            self.message = 'I will skip all but the latest execution'
+        else:
+            self.message = 'I will never skip'
+        super().__init__(**kwargs)
+
+    @latest_only
+    def execute(self, context):
+        print(self.message)
+
+
+class MyBashOnlyOperator(BashOperator):  # pylint: disable=missing-class-docstring
+    @latest_only
+    def execute(self, context):
+        print('I will skip all but the latest execution')
+        super().execute(context=context)

Review comment:
       You know what, maybe `latest_only` should just be added as a parameter to BaseOperator, so that every operator has it? That would seem to fit Airflow style more than wrapping `execute` in this function. WDYT?
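   A rough sketch of the parameter approach, using hypothetical `ToyBaseOperator` and `ToyBashOperator` classes in place of BaseOperator and BashOperator. Where `MyLatestOnlyOperator` above has to accept and store `latest_only` in its own `__init__`, here the flag is accepted once in the base class, so an existing subclass gets it as a keyword argument with no changes. Defaulting to `False` is an assumption of this sketch.

```python
from typing import Any, Dict


class ToyBaseOperator:
    """Hypothetical stand-in for BaseOperator with a ``latest_only`` parameter."""

    def __init__(self, task_id: str, latest_only: bool = False, **kwargs: Any) -> None:
        self.task_id = task_id
        # Stored once here; the run path would consult it before execute().
        self.latest_only = latest_only


class ToyBashOperator(ToyBaseOperator):
    """An existing subclass: it needs no changes to accept latest_only."""

    def __init__(self, bash_command: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.bash_command = bash_command

    def execute(self, context: Dict[str, Any]) -> str:
        return f"would run: {self.bash_command}"


task = ToyBashOperator(
    task_id="latest_only_bash_task",
    bash_command='echo "hello world"',
    latest_only=True,
)
print(task.latest_only)  # True
```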







[GitHub] [airflow] github-actions[bot] closed pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #15125:
URL: https://github.com/apache/airflow/pull/15125


   





[GitHub] [airflow] dstandish commented on a change in pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r605758162



##########
File path: airflow/utils/decorators.py
##########
@@ -97,6 +99,44 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
     return cast(T, wrapper)
 
 
+def latest_only(f):
+    """
+    Decorator for adding skip-if-not-latest behavior to any operator.
+
+    Can be disabled if the class has an attribute ``latest_only`` with value ``False``
+    """
+
+    def skip_if_not_latest(context):
+        if not context:  # assume run interactively
+            return
+        dag_run = context.get('dag_run')
+        if dag_run and dag_run.external_trigger:
+            print("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context['dag']
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        print(
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
+    @wraps(f)
+    def wrap(self, context):
+        if not (hasattr(self, 'latest_only') and self.latest_only is False):

Review comment:
       This check makes it possible to build a custom operator with optional latest only behavior. Users of that operator can keep latest only enabled in most cases but disable it in, say, one DAG.
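   The opt-out semantics of that condition can be seen in isolation in this sketch. It reuses the decorator's condition from the diff, but `SkipTask` and the `is_stale` context key are hypothetical simplifications of `AirflowSkipException` and the window check. The skip logic runs unless the instance explicitly has `latest_only = False`, so an operator that never defines the attribute keeps latest only behavior.

```python
from functools import wraps


class SkipTask(Exception):
    """Hypothetical stand-in for AirflowSkipException."""


def latest_only(f):
    @wraps(f)
    def wrap(self, context):
        # Same condition as the diff: only an explicit ``latest_only = False`` opts out.
        if not (hasattr(self, "latest_only") and self.latest_only is False):
            if context.get("is_stale"):  # placeholder for the window check
                raise SkipTask("Not latest execution; skipping...")
        return f(self, context)

    return wrap


class MyOperator:
    def __init__(self, **kwargs):
        if "latest_only" in kwargs:
            self.latest_only = kwargs["latest_only"]

    @latest_only
    def execute(self, context):
        return "ran"


def outcome(op, context):
    try:
        return op.execute(context)
    except SkipTask:
        return "skipped"


stale = {"is_stale": True}
print(outcome(MyOperator(), stale))                   # skipped (no attribute)
print(outcome(MyOperator(latest_only=True), stale))   # skipped
print(outcome(MyOperator(latest_only=False), stale))  # ran (opted out)
```

   Because the test is `is False`, only the literal `False` disables the check; `None` or `0` would leave it enabled.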







[GitHub] [airflow] dstandish commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613488037



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       I thought about doing something like that.
   
   Concerning `pre_execute`, as with any non-abstract method, the user can call `super().pre_execute`.
   
   But I have no issue moving this out of `pre_execute` if that's what folks think is best.
   
   So, as you've suggested, we can move it to `ti._prepare_and_execute_task_with_callbacks`.
   
   Do you think we should keep the skip logic (i.e. the function `_skip_if_not_latest`) on BaseOperator or move it to TI?







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r606721281



##########
File path: airflow/example_dags/example_latest_only_decorator.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example of the `latest_only` decorator"""
+
+from airflow import DAG
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.decorators import latest_only
+
+
+# [START example]
+class MyLatestOnlyOperator(BaseOperator):  # pylint: disable=missing-class-docstring
+    def __init__(self, latest_only=True, **kwargs):  # pylint: disable=redefined-outer-name
+        self.latest_only = latest_only
+        if self.latest_only:
+            self.message = 'I will skip all but the latest execution'
+        else:
+            self.message = 'I will never skip'
+        super().__init__(**kwargs)
+
+    @latest_only
+    def execute(self, context):
+        print(self.message)
+
+
+class MyBashOnlyOperator(BashOperator):  # pylint: disable=missing-class-docstring
+    @latest_only
+    def execute(self, context):
+        print('I will skip all but the latest execution')
+        super().execute(context=context)

Review comment:
       I like the idea of making it just a `latest_only` param; it allows all operators to benefit from it right away!







[GitHub] [airflow] ashb commented on a change in pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r605491675



##########
File path: airflow/example_dags/example_latest_only_decorator.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example of the `latest_only` decorator"""
+
+from airflow import DAG
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.decorators import latest_only
+
+
+# [START example]
+class MyLatestOnlyOperator(BaseOperator):  # pylint: disable=missing-class-docstring
+    def __init__(self, latest_only=True, **kwargs):  # pylint: disable=redefined-outer-name
+        self.latest_only = latest_only
+        if self.latest_only:
+            self.message = 'I will skip all but the latest execution'
+        else:
+            self.message = 'I will never skip'
+        super().__init__(**kwargs)
+
+    @latest_only
+    def execute(self, context):
+        print(self.message)
+
+
+class MyBashOnlyOperator(BashOperator):  # pylint: disable=missing-class-docstring
+    @latest_only
+    def execute(self, context):
+        print('I will skip all but the latest execution')
+        super().execute(context=context)

Review comment:
       It would be nice if this could work as
   
   ```
        subclass_example = latest_only(BashOperator(task_id='latest_only_bash_task', bash_command='echo "hello world"'))
   ```
   
   or similar, but I'm not sure if that is realistically achievable.
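   One possible shape for that, sketched with a hypothetical `ToyOperator` rather than `BashOperator`: a function that takes an operator *instance*, wraps its `execute`, and returns the same instance. This is only a sketch. It is not verified whether an instance-level override would survive Airflow's task copying and DAG serialization, and if the operator were shallow-copied before running, the closure would still call the original instance's method, which may be part of why this is hard to achieve.

```python
class SkipTask(Exception):
    """Hypothetical stand-in for AirflowSkipException."""


class ToyOperator:
    """Hypothetical stand-in for BashOperator."""

    def __init__(self, task_id: str, bash_command: str) -> None:
        self.task_id = task_id
        self.bash_command = bash_command

    def execute(self, context: dict) -> str:
        return f"would run: {self.bash_command}"


def latest_only(op):
    """Wrap one operator instance's execute() with a latest only check."""
    original = op.execute  # bound method of this particular instance

    def execute(context):
        if context.get("is_stale"):  # placeholder for the real window check
            raise SkipTask("Not latest execution; skipping...")
        return original(context)

    # An instance attribute shadows the class method for this object only.
    op.execute = execute
    return op


subclass_example = latest_only(
    ToyOperator(task_id="latest_only_bash_task", bash_command='echo "hello world"')
)
print(subclass_example.execute(context={}))  # would run: echo "hello world"
```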







[GitHub] [airflow] github-actions[bot] commented on pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#issuecomment-812943575


   [The Workflow run](https://github.com/apache/airflow/actions/runs/715424255) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] xinbinhuang commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613334840



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       Should we push this down to TI? Somewhere around here:
   
   https://github.com/apache/airflow/blob/75603160848e4199ed368809dfd441dcc5ddbd82/airflow/models/taskinstance.py#L1286-L1289
   
   Though currently only SubDagOperator overrides the `pre_execute` hook, I think `pre_execute` is still a public API, and users may override it when they build custom operators.







[GitHub] [airflow] dstandish commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613023625



##########
File path: airflow/example_dags/example_latest_only_decorator.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example of the `latest_only` decorator"""
+
+from airflow import DAG
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.decorators import latest_only
+
+
+# [START example]
+class MyLatestOnlyOperator(BaseOperator):  # pylint: disable=missing-class-docstring
+    def __init__(self, latest_only=True, **kwargs):  # pylint: disable=redefined-outer-name
+        self.latest_only = latest_only
+        if self.latest_only:
+            self.message = 'I will skip all but the latest execution'
+        else:
+            self.message = 'I will never skip'
+        super().__init__(**kwargs)
+
+    @latest_only
+    def execute(self, context):
+        print(self.message)
+
+
+class MyBashOnlyOperator(BashOperator):  # pylint: disable=missing-class-docstring
+    @latest_only
+    def execute(self, context):
+        print('I will skip all but the latest execution')
+        super().execute(context=context)

Review comment:
       Okie doke, I have added it to BaseOperator. PTAL 🙏 







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613334840



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       Should we push this down to TI? i.e. 
   
   https://github.com/apache/airflow/blob/75603160848e4199ed368809dfd441dcc5ddbd82/airflow/models/taskinstance.py#L1286-L1289
   
   Though currently only SubDagOperator overrides the `pre_execute` hook, I think `pre_execute` is still a public API, and users may override it when they build custom operators.







[GitHub] [airflow] dstandish commented on a change in pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r606714730



##########
File path: airflow/example_dags/example_latest_only_decorator.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example of the `latest_only` decorator"""
+
+from airflow import DAG
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.decorators import latest_only
+
+
+# [START example]
+class MyLatestOnlyOperator(BaseOperator):  # pylint: disable=missing-class-docstring
+    def __init__(self, latest_only=True, **kwargs):  # pylint: disable=redefined-outer-name
+        self.latest_only = latest_only
+        if self.latest_only:
+            self.message = 'I will skip all but the latest execution'
+        else:
+            self.message = 'I will never skip'
+        super().__init__(**kwargs)
+
+    @latest_only
+    def execute(self, context):
+        print(self.message)
+
+
+class MyBashOnlyOperator(BashOperator):  # pylint: disable=missing-class-docstring
+    @latest_only
+    def execute(self, context):
+        print('I will skip all but the latest execution')
+        super().execute(context=context)

Review comment:
       You know what, maybe `latest_only` should just be added as a parameter to BaseOperator, so that every operator has it? That would seem to fit Airflow style more than wrapping `execute` in this function. WDYT @ashb @xinbinhuang?







[GitHub] [airflow] dstandish commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613488037



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       I thought about doing something like that.
   
   One thought: `pre_execute` has the `@prepare_lineage` decorator, so people should probably call `super().pre_execute(context)` anyway.
   
   Of course, lineage is probably not used that often, but calling super is probably "the right way"?
   
   In any case, I have no issue moving this out of `pre_execute`.
   
   There are two questions to answer: (1) where to locate the skip logic, and (2) where to call it from.
   
   Re (2): you are proposing calling it from `ti._prepare_and_execute_task_with_callbacks`. I'm OK with that.
   
   Re (1): do you think we should keep the skip logic (i.e. the function `_skip_if_not_latest`) on BaseOperator, or also move it to TI? I guess TI makes sense, because it's more a scheduling concern than operator functionality.
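A rough sketch of option (2), moving the decision out of the operator and into the task-instance runner, might look like the following. `Operator`, `CustomOperator`, and `run_task` are hypothetical, and the `is_latest` callable stands in for the window check in `_skip_if_not_latest`. This is not the actual `TaskInstance` code. Because the runner decides before any hook is called, an operator that overrides `pre_execute` without calling `super()` is still skipped.

```python
from typing import Any, Callable, Dict, List


class Operator:
    """Hypothetical minimal operator exposing the public pre_execute/execute hooks."""

    def __init__(self, latest_only: bool = False):
        self.latest_only = latest_only
        self.calls: List[str] = []

    def pre_execute(self, context: Dict[str, Any]) -> None:
        self.calls.append("pre_execute")

    def execute(self, context: Dict[str, Any]) -> None:
        self.calls.append("execute")


class CustomOperator(Operator):
    def pre_execute(self, context: Dict[str, Any]) -> None:
        # Overrides the hook without calling super(); harmless here,
        # because the latest-only check no longer lives in pre_execute.
        self.calls.append("custom pre_execute")


def run_task(
    task: Operator,
    context: Dict[str, Any],
    is_latest: Callable[[Dict[str, Any]], bool],
) -> str:
    """Hypothetical task-instance runner that owns the skip decision."""
    if task.latest_only and not is_latest(context):
        return "skipped"  # neither hook runs
    task.pre_execute(context)
    task.execute(context)
    return "success"
```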
   







[GitHub] [airflow] github-actions[bot] commented on pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#issuecomment-812943455


   [The Workflow run](https://github.com/apache/airflow/actions/runs/715420105) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#issuecomment-851088414


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] ashb commented on a change in pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r605492209



##########
File path: airflow/utils/decorators.py
##########
@@ -97,6 +99,44 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
     return cast(T, wrapper)
 
 
+def latest_only(f):
+    """
+    Decorator for adding skip-if-not-latest behavior to any operator.
+
+    Can be disabled if the class has an attribute ``latest_only`` with value ``False``
+    """
+
+    def skip_if_not_latest(context):
+        if not context:  # assume run interactively
+            return
+        dag_run = context.get('dag_run')
+        if dag_run and dag_run.external_trigger:
+            print("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context['dag']
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        print(
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
+    @wraps(f)
+    def wrap(self, context):
+        if not (hasattr(self, 'latest_only') and self.latest_only is False):

Review comment:
       Why do we need the attribute at all here? Surely if you don't want it, then don't apply the decorator?
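A sketch of the alternative implied by this question: the decorator does no attribute lookup, and opting out simply means not applying it. `latest_only_no_toggle`, `_is_latest`, and `SkipTask` are hypothetical names. `_is_latest` assumes the window bounds and the current time are already in the context rather than computed from the DAG schedule.

```python
from functools import wraps
from typing import Any, Callable, Dict


class SkipTask(Exception):
    """Hypothetical stand-in for AirflowSkipException."""


def _is_latest(context: Dict[str, Any]) -> bool:
    # Assumption: the caller has already computed the window bounds and "now".
    return context["left_window"] < context["now"] <= context["right_window"]


def latest_only_no_toggle(execute: Callable) -> Callable:
    """Hypothetical variant of the decorator with no ``latest_only`` attribute check."""

    @wraps(execute)
    def wrapper(self: Any, context: Dict[str, Any]) -> Any:
        # An empty context is treated as an interactive run and never skipped.
        if context and not _is_latest(context):
            raise SkipTask("Not latest execution; skipping...")
        return execute(self, context)

    return wrapper


class AlwaysRuns:
    # Opting out means simply not applying the decorator.
    def execute(self, context: Dict[str, Any]) -> str:
        return "ran"


class LatestOnly:
    @latest_only_no_toggle
    def execute(self, context: Dict[str, Any]) -> str:
        return "ran"
```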







[GitHub] [airflow] github-actions[bot] closed pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #15125:
URL: https://github.com/apache/airflow/pull/15125


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] xinbinhuang commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613334840



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       Should we push this down to TI? i.e. https://github.com/astronomer/airflow/blob/e4c0689535f1353c9e647773c06bedf8cd22b239/airflow/models/taskinstance.py#L1288
   
   Although currently only SubDagOperator overrides the `pre_execute` hook, I think `pre_execute` is still a public API, and users may override it when they build custom operators.
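The concern can be shown with a small sketch, assuming the check stays in `pre_execute` as in the diff. All class names here are hypothetical, and the context carries a precomputed `is_latest` flag in place of the real window computation. A subclass that calls `super().pre_execute(context)` keeps the skip; one that overrides the hook without it silently runs every time.

```python
from typing import Any, Dict


class SkipTask(Exception):
    """Hypothetical stand-in for AirflowSkipException."""


class BaseOp:
    """Hypothetical base with the latest-only check inside pre_execute, as in the diff."""

    def __init__(self, latest_only: bool = False):
        self.latest_only = latest_only

    def _skip_if_not_latest(self, context: Dict[str, Any]) -> None:
        # Simplification: assumes the context carries a precomputed "is_latest" flag.
        if self.latest_only and context and not context.get("is_latest", True):
            raise SkipTask("Not latest execution; skipping...")

    def pre_execute(self, context: Dict[str, Any]) -> None:
        self._skip_if_not_latest(context)


class CooperativeOp(BaseOp):
    def pre_execute(self, context: Dict[str, Any]) -> None:
        super().pre_execute(context)  # the check still runs
        # ... custom setup would go here


class OverridingOp(BaseOp):
    def pre_execute(self, context: Dict[str, Any]) -> None:
        # Custom setup without super(): the latest-only check is silently lost.
        pass


def run(op: BaseOp, context: Dict[str, Any]) -> str:
    """Hypothetical runner: calls pre_execute and maps SkipTask to a "skipped" state."""
    try:
        op.pre_execute(context)
    except SkipTask:
        return "skipped"
    return "executed"
```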







[GitHub] [airflow] dstandish commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613488037



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       I thought about doing something like that.
   
   Concerning `pre_execute`: as with any non-abstract method, a user who overrides it can call `super().pre_execute(context)`.
   
   But I have no issue moving this out of `pre_execute` if that's what folks think is best.
   
   So, as you've suggested, we can move it to `ti._prepare_and_execute_task_with_callbacks`.
   
   Do you think we should keep the skip logic (i.e. the function `_skip_if_not_latest`) on BaseOperator, or move it to TI?







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15125: Add `latest_only` parameter to BaseOperator

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613334840



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       Should we push this down to TI? Somewhere around here:
   
   https://github.com/apache/airflow/blob/75603160848e4199ed368809dfd441dcc5ddbd82/airflow/models/taskinstance.py#L1286-L1289
   
   Although currently only SubDagOperator overrides the `pre_execute` hook, I think `pre_execute` is still a public API, and users may override it when they build custom operators.







[GitHub] [airflow] dstandish commented on a change in pull request #15125: Add latest only decorator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r605759740



##########
File path: airflow/utils/decorators.py
##########
@@ -97,6 +99,44 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
     return cast(T, wrapper)
 
 
+def latest_only(f):
+    """
+    Decorator for adding skip-if-not-latest behavior to any operator.
+
+    Can be disabled if the class has an attribute ``latest_only`` with value ``False``
+    """
+
+    def skip_if_not_latest(context):
+        if not context:  # assume run interactively
+            return
+        dag_run = context.get('dag_run')
+        if dag_run and dag_run.external_trigger:
+            print("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context['dag']
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        print(
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
+    @wraps(f)
+    def wrap(self, context):
+        if not (hasattr(self, 'latest_only') and self.latest_only is False):

Review comment:
       But admittedly it's not the most discoverable of features :)



