You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/23 19:30:00 UTC

[GitHub] [airflow] turbaszek opened a new pull request #12574: Improve code quality of ExternalTaskSensor

turbaszek opened a new pull request #12574:
URL: https://github.com/apache/airflow/pull/12574


   This PR adds type hints and refactors slightly the code of ExternalTaskSensor and ExternalTaskMarker
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528953959



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -203,35 +204,21 @@ def get_count(self, dttm_filter, session, states):
         :type states: list
         :return: count of record against the filters
         """
-        TI = TaskInstance
-        DR = DagRun
+        qry = session.query(func.count()).filter(
+            DagRun.dag_id == self.external_dag_id,
+            DagRun.state.in_(states),  # pylint: disable=no-member
+            DagRun.execution_date.in_(dttm_filter),
+        )
 
         if self.external_task_id:
-            # .count() is inefficient
-            count = (
-                session.query(func.count())
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
-                .scalar()
+            qry = qry.filter(

Review comment:
       I may have missed or mis-understood something: the original code is filtering against `TaskInstance` and `DagRun` separately in different conditions. But this new version is filtering against `DagRun` or `TaskInstance` PLUS `DagRun`.
   
   But I'm simply comparing the lines, so as I said above, possibly I misunderstood something.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528965373



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -203,35 +204,21 @@ def get_count(self, dttm_filter, session, states):
         :type states: list
         :return: count of record against the filters
         """
-        TI = TaskInstance
-        DR = DagRun
+        qry = session.query(func.count()).filter(
+            DagRun.dag_id == self.external_dag_id,
+            DagRun.state.in_(states),  # pylint: disable=no-member
+            DagRun.execution_date.in_(dttm_filter),
+        )
 
         if self.external_task_id:
-            # .count() is inefficient
-            count = (
-                session.query(func.count())
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
-                .scalar()
+            qry = qry.filter(

Review comment:
       No worries at all ;-)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528990071



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -149,38 +148,22 @@ def poke(self, context, session=None):
             dttm = context['execution_date']
 
         dttm_filter = dttm if isinstance(dttm, list) else [dttm]

Review comment:
       Created https://github.com/apache/airflow/issues/12577




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528976792



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -149,38 +148,22 @@ def poke(self, context, session=None):
             dttm = context['execution_date']
 
         dttm_filter = dttm if isinstance(dttm, list) else [dttm]

Review comment:
       Create a back-log ticket maybe to improve the design further or a flag to have one or the other option




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528963335



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -203,35 +204,21 @@ def get_count(self, dttm_filter, session, states):
         :type states: list
         :return: count of record against the filters
         """
-        TI = TaskInstance
-        DR = DagRun
+        qry = session.query(func.count()).filter(
+            DagRun.dag_id == self.external_dag_id,
+            DagRun.state.in_(states),  # pylint: disable=no-member
+            DagRun.execution_date.in_(dttm_filter),
+        )
 
         if self.external_task_id:
-            # .count() is inefficient
-            count = (
-                session.query(func.count())
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
-                .scalar()
+            qry = qry.filter(

Review comment:
       Yes, my bad - I should read the code more thoughtfully! 🤯 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528948415



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -203,35 +204,21 @@ def get_count(self, dttm_filter, session, states):
         :type states: list
         :return: count of record against the filters
         """
-        TI = TaskInstance
-        DR = DagRun
+        qry = session.query(func.count()).filter(
+            DagRun.dag_id == self.external_dag_id,
+            DagRun.state.in_(states),  # pylint: disable=no-member
+            DagRun.execution_date.in_(dttm_filter),
+        )
 
         if self.external_task_id:
-            # .count() is inefficient
-            count = (
-                session.query(func.count())
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
-                .scalar()
+            qry = qry.filter(
+                TaskInstance.task_id == self.external_task_id,
             )
-        else:
-            # .count() is inefficient
-            count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
-                .scalar()
-            )
-        return count
 
-    def _handle_execution_date_fn(self, context):
+        # .count() is inefficient

Review comment:
       Misplaced comment, I think this needs to be above L207




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek merged pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #12574:
URL: https://github.com/apache/airflow/pull/12574


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528972045



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -203,35 +204,21 @@ def get_count(self, dttm_filter, session, states):
         :type states: list
         :return: count of record against the filters
         """
-        TI = TaskInstance
-        DR = DagRun
+        qry = session.query(func.count()).filter(
+            DagRun.dag_id == self.external_dag_id,
+            DagRun.state.in_(states),  # pylint: disable=no-member
+            DagRun.execution_date.in_(dttm_filter),
+        )
 
         if self.external_task_id:
-            # .count() is inefficient
-            count = (
-                session.query(func.count())
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
-                .scalar()
+            qry = qry.filter(

Review comment:
       eeks -- Good catch @XD-DENG . 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528967901



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -149,38 +148,22 @@ def poke(self, context, session=None):
             dttm = context['execution_date']
 
         dttm_filter = dttm if isinstance(dttm, list) else [dttm]

Review comment:
       Btw. this is something that bothers me. We "believe" in operator atomicity but this functionality allows users to provide a function that returns a list of dates. If list is provided then we query not for single DAG/task but for multiple ones.
   
   But users will get 1/0 response - task/dag failed. there will be not information how many instances failed or for which dates. I think that the proper Airflow approach would be to have multiple sensors each checking single exec date.
   
   @kaxil @XD-DENG WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on a change in pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#discussion_r528974178



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -149,38 +148,22 @@ def poke(self, context, session=None):
             dttm = context['execution_date']
 
         dttm_filter = dttm if isinstance(dttm, list) else [dttm]

Review comment:
       The use case I can think of to justify this setting-up/design is that a user is checking & depending a series of external DAG/Task.  For example, I want to depend on DAG A, and what I want to ensure is DAG A has succeeded in the past a few consecutive days (or maybe more complex case, not consecutive). Using Multiple Sensors may multiply the resource usage and may make the DAG authoring more complex in this scenario?
   
   From another perspective, if someone prefers to have multiple sensors each checking single exec date, the current design can meet his/her requirement as well.
   
   So personally this doesn't bother me (but as you mentioned, not giving information about how many TI failed or failed for which dates is not nice enough)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #12574: Improve code quality of ExternalTaskSensor

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12574:
URL: https://github.com/apache/airflow/pull/12574#issuecomment-732383655


   The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org