Posted to commits@airflow.apache.org by "samc1213 (via GitHub)" <gi...@apache.org> on 2023/09/19 03:45:51 UTC

[GitHub] [airflow] samc1213 opened a new pull request, #34464: Consolidate ExternalTaskSensor deferrable and non deferrable logic

samc1213 opened a new pull request, #34464:
URL: https://github.com/apache/airflow/pull/34464

   
   related: #34204
   
   The existing `poke` logic for the `ExternalTaskSensor` seems sound. However, the `TaskStateTrigger`'s `run` logic is different and seems to cause some bugs (see the linked issue). This change consolidates the logic so that the `ExternalTaskSensor` uses the same checks in both deferrable and non-deferrable modes.
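   The consolidation described above can be sketched roughly as follows. All names and signatures here are invented stand-ins for illustration, not the PR's actual code, and `RuntimeError`/`ValueError` stand in for `AirflowException`/`AirflowSkipException`; the point is only that `poke` and the trigger's `run` share one decision function so the rules cannot diverge:

   ```python
   # Hypothetical sketch of the shared check used by both the sensor's
   # poke() and the trigger's run(); counts stand in for the sensor's
   # DB-backed get_count() queries against the external DAG's runs.

   def check_external_dag(count_failed, count_skipped, count_allowed, total):
       """Return True when every external run reached an allowed state.

       Mirrors the original poke() ordering: fail first, then skip,
       then only go green when every single run is in an allowed state.
       """
       if count_failed > 0:
           # stand-in for AirflowException / AirflowSkipException (soft_fail)
           raise RuntimeError("some external tasks failed")
       if count_skipped > 0:
           # stand-in for AirflowSkipException
           raise ValueError("some external tasks reached a skip state")
       return count_allowed == total
   ```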
   
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Consolidate ExternalTaskSensor deferrable and non deferrable logic [airflow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34464:
URL: https://github.com/apache/airflow/pull/34464#issuecomment-1793894458

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #34464: Consolidate ExternalTaskSensor deferrable and non deferrable logic

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on PR #34464:
URL: https://github.com/apache/airflow/pull/34464#issuecomment-1724790303

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (ruff, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst), and consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   - Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




Re: [PR] Consolidate ExternalTaskSensor deferrable and non deferrable logic [airflow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #34464: Consolidate ExternalTaskSensor deferrable and non deferrable logic
URL: https://github.com/apache/airflow/pull/34464




[GitHub] [airflow] yermalov-here commented on a diff in pull request #34464: Consolidate ExternalTaskSensor deferrable and non deferrable logic

Posted by "yermalov-here (via GitHub)" <gi...@apache.org>.
yermalov-here commented on code in PR #34464:
URL: https://github.com/apache/airflow/pull/34464#discussion_r1330208166


##########
airflow/sensors/external_task.py:
##########
@@ -233,100 +232,18 @@ def _get_dttm_filter(self, context):
 
     @provide_session
     def poke(self, context: Context, session: Session = NEW_SESSION) -> bool:
-        # delay check to poke rather than __init__ in case it was supplied as XComArgs
-        if self.external_task_ids and len(self.external_task_ids) > len(set(self.external_task_ids)):
-            raise ValueError("Duplicate task_ids passed in external_task_ids parameter")
-
-        dttm_filter = self._get_dttm_filter(context)
-        serialized_dttm_filter = ",".join(dt.isoformat() for dt in dttm_filter)
-
-        if self.external_task_ids:
-            self.log.info(
-                "Poking for tasks %s in dag %s on %s ... ",
-                self.external_task_ids,
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        if self.external_task_group_id:
-            self.log.info(
-                "Poking for task_group '%s' in dag '%s' on %s ... ",
-                self.external_task_group_id,
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        if self.external_dag_id and not self.external_task_group_id and not self.external_task_ids:
-            self.log.info(
-                "Poking for DAG '%s' on %s ... ",
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        # In poke mode this will check dag existence only once
-        if self.check_existence and not self._has_checked_existence:
-            self._check_for_existence(session=session)
-
-        count_failed = -1
-        if self.failed_states:
-            count_failed = self.get_count(dttm_filter, session, self.failed_states)
-
-        # Fail if anything in the list has failed.
-        if count_failed > 0:
-            if self.external_task_ids:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"Some of the external tasks {self.external_task_ids} "
-                        f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(
-                    f"Some of the external tasks {self.external_task_ids} "
-                    f"in DAG {self.external_dag_id} failed."
-                )
-            elif self.external_task_group_id:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"The external task_group '{self.external_task_group_id}' "
-                        f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(
-                    f"The external task_group '{self.external_task_group_id}' "
-                    f"in DAG '{self.external_dag_id}' failed."
-                )
-
-            else:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
-
-        count_skipped = -1
-        if self.skipped_states:
-            count_skipped = self.get_count(dttm_filter, session, self.skipped_states)
-
-        # Skip if anything in the list has skipped. Note if we are checking multiple tasks and one skips
-        # before another errors, we'll skip first.
-        if count_skipped > 0:
-            if self.external_task_ids:
-                raise AirflowSkipException(
-                    f"Some of the external tasks {self.external_task_ids} "
-                    f"in DAG {self.external_dag_id} reached a state in our states-to-skip-on list. Skipping."
-                )
-            elif self.external_task_group_id:
-                raise AirflowSkipException(
-                    f"The external task_group '{self.external_task_group_id}' "
-                    f"in DAG {self.external_dag_id} reached a state in our states-to-skip-on list. Skipping."
-                )
-            else:
-                raise AirflowSkipException(
-                    f"The external DAG {self.external_dag_id} reached a state in our states-to-skip-on list. "
-                    "Skipping."
-                )
-
-        # only go green if every single task has reached an allowed state
-        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
-        return count_allowed == len(dttm_filter)
+        return TaskStateTrigger.check_external_dag(

Review Comment:
   Does it make sense to create a hook for external tasks, move the shared code there, and call the hook from both the operator and the trigger?
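   A minimal sketch of that suggestion, with heavy caveats: `ExternalTaskHook` is an invented name (no such hook exists in Airflow as of this PR), and the dict-backed `get_count` stands in for the sensor's real DB query. The idea is that both the sync and async paths delegate to the same hook instead of the sensor calling into the trigger:

   ```python
   import asyncio


   class ExternalTaskHook:
       """Hypothetical hook holding the shared state-counting logic."""

       def __init__(self, states_by_run):
           # maps execution date -> external task state, standing in
           # for the real DB-backed get_count() query
           self.states_by_run = states_by_run

       def get_count(self, dttm_filter, states):
           return sum(1 for dt in dttm_filter if self.states_by_run.get(dt) in states)


   class Sensor:
       """Non-deferrable path: calls the hook synchronously from poke()."""

       def poke(self, hook, dttm_filter):
           return hook.get_count(dttm_filter, {"success"}) == len(dttm_filter)


   class Trigger:
       """Deferrable path: the same hook, run in a worker thread so the
       triggerer's event loop is never blocked (asgiref's sync_to_async
       plays this role in Airflow)."""

       async def run(self, hook, dttm_filter):
           count = await asyncio.to_thread(hook.get_count, dttm_filter, {"success"})
           return count == len(dttm_filter)
   ```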





[GitHub] [airflow] samc1213 commented on pull request #34464: Consolidate ExternalTaskSensor deferrable and non deferrable logic

Posted by "samc1213 (via GitHub)" <gi...@apache.org>.
samc1213 commented on PR #34464:
URL: https://github.com/apache/airflow/pull/34464#issuecomment-1728600158

   > We should not mix sync and "async" logic; it will do more harm than good.
   > 
   > The better option for now is to wrap the methods intended for deferrable mode with `sync_to_async` decorators; however, we then can't use those methods in the sync implementations.
   > 
   
   Thanks for the feedback @Taragolis .
   
   I agree with you. While implementing this change I kept wondering: what is the purpose of the deferrable version here if it's not truly async anyway? The one real benefit of the deferrable version over the non-deferrable `poke` version is that the "sleep" in the `TaskStateTrigger` uses `asyncio` and so yields back to the triggerer process correctly.
   
   Do you suggest I just close this change? I'm not sure how to proceed. I'm not convinced that copy/pasting the logic, wrapping it with `sync_to_async`, and awaiting the truly-async operations is a great solution either, since the logic could diverge between deferrable and non-deferrable modes in the future.
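   The benefit described above can be shown with a small stand-in, under stated assumptions: `poll_until_done` is an invented name, and `asyncio.to_thread` is used here in place of asgiref's `sync_to_async` wrapper that the reviewer mentions. The `asyncio.sleep` between polls is the key win of deferrable mode, because it yields the event loop so one triggerer process can service many triggers concurrently:

   ```python
   import asyncio


   async def poll_until_done(check, poll_interval=0.001, max_polls=10):
       """Poll a blocking check without blocking the triggerer's event loop."""
       for _ in range(max_polls):
           # run the synchronous DB check in a worker thread, so the
           # loop stays free (sync_to_async plays this role in Airflow)
           if await asyncio.to_thread(check):
               return True
           # this sleep yields control back to the event loop, letting
           # other triggers in the same process make progress
           await asyncio.sleep(poll_interval)
       return False
   ```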




[GitHub] [airflow] Lee-W commented on a diff in pull request #34464: Consolidate ExternalTaskSensor deferrable and non deferrable logic

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #34464:
URL: https://github.com/apache/airflow/pull/34464#discussion_r1329840556


##########
airflow/sensors/external_task.py:
##########
@@ -20,7 +20,7 @@
 import datetime
 import os
 import warnings
-from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable
+from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, List

Review Comment:
   ```suggestion
   from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable
   ```



##########
airflow/triggers/external_task.py:
##########
@@ -56,32 +64,43 @@ class TaskStateTrigger(BaseTrigger):
 
     def __init__(
         self,
-        dag_id: str,
         execution_dates: list[datetime],
         trigger_start_time: datetime,
-        states: list[str] | None = None,
+        external_task_group_id: str | None,
+        external_task_ids: typing.Collection[str] | None,
+        external_dag_id: str | None,
+        allowed_states: typing.Iterable[str] | None = None,

Review Comment:
   We probably want to use `Iterable[str]` for consistency



##########
airflow/sensors/external_task.py:
##########
@@ -233,100 +232,18 @@ def _get_dttm_filter(self, context):
 
     @provide_session
     def poke(self, context: Context, session: Session = NEW_SESSION) -> bool:
-        # delay check to poke rather than __init__ in case it was supplied as XComArgs
-        if self.external_task_ids and len(self.external_task_ids) > len(set(self.external_task_ids)):
-            raise ValueError("Duplicate task_ids passed in external_task_ids parameter")
-
-        dttm_filter = self._get_dttm_filter(context)
-        serialized_dttm_filter = ",".join(dt.isoformat() for dt in dttm_filter)
-
-        if self.external_task_ids:
-            self.log.info(
-                "Poking for tasks %s in dag %s on %s ... ",
-                self.external_task_ids,
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        if self.external_task_group_id:
-            self.log.info(
-                "Poking for task_group '%s' in dag '%s' on %s ... ",
-                self.external_task_group_id,
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        if self.external_dag_id and not self.external_task_group_id and not self.external_task_ids:
-            self.log.info(
-                "Poking for DAG '%s' on %s ... ",
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        # In poke mode this will check dag existence only once
-        if self.check_existence and not self._has_checked_existence:
-            self._check_for_existence(session=session)
-
-        count_failed = -1
-        if self.failed_states:
-            count_failed = self.get_count(dttm_filter, session, self.failed_states)
-
-        # Fail if anything in the list has failed.
-        if count_failed > 0:
-            if self.external_task_ids:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"Some of the external tasks {self.external_task_ids} "
-                        f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(
-                    f"Some of the external tasks {self.external_task_ids} "
-                    f"in DAG {self.external_dag_id} failed."
-                )
-            elif self.external_task_group_id:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"The external task_group '{self.external_task_group_id}' "
-                        f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(
-                    f"The external task_group '{self.external_task_group_id}' "
-                    f"in DAG '{self.external_dag_id}' failed."
-                )
-
-            else:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
-
-        count_skipped = -1
-        if self.skipped_states:
-            count_skipped = self.get_count(dttm_filter, session, self.skipped_states)
-
-        # Skip if anything in the list has skipped. Note if we are checking multiple tasks and one skips
-        # before another errors, we'll skip first.
-        if count_skipped > 0:
-            if self.external_task_ids:
-                raise AirflowSkipException(
-                    f"Some of the external tasks {self.external_task_ids} "
-                    f"in DAG {self.external_dag_id} reached a state in our states-to-skip-on list. Skipping."
-                )
-            elif self.external_task_group_id:
-                raise AirflowSkipException(
-                    f"The external task_group '{self.external_task_group_id}' "
-                    f"in DAG {self.external_dag_id} reached a state in our states-to-skip-on list. Skipping."
-                )
-            else:
-                raise AirflowSkipException(
-                    f"The external DAG {self.external_dag_id} reached a state in our states-to-skip-on list. "
-                    "Skipping."
-                )
-
-        # only go green if every single task has reached an allowed state
-        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
-        return count_allowed == len(dttm_filter)
+        return TaskStateTrigger.check_external_dag(

Review Comment:
   It feels a bit weird to run code from the trigger when running the sensor in non-deferrable mode. Should we extract the logic somewhere else? 



##########
airflow/sensors/external_task.py:
##########
@@ -222,7 +221,7 @@ def __init__(
         self.deferrable = deferrable
         self.poll_interval = poll_interval
 
-    def _get_dttm_filter(self, context):
+    def _get_dttm_filter(self, context) -> List[datetime.datetime]:

Review Comment:
   ```suggestion
       def _get_dttm_filter(self, context) -> list[datetime.datetime]:
   ```


