Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/26 14:45:59 UTC

[GitHub] [airflow] ashb opened a new pull request #16086: Refactor `dag.clear` method

ashb opened a new pull request #16086:
URL: https://github.com/apache/airflow/pull/16086


   A number of "internal" parameters, used only for fetching the TIs
   (task instances), should not have been exposed via the public API.
   
   Additionally, this method and `get_task_instances` shared a lot of
   similar code (albeit the latter was simpler) -- they now both use an
   internal method to do the actual querying.
   
   Extracted out of @andrewgodwin's AIP-40 PR
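
The shape of the refactor described above can be sketched roughly as follows. This is an illustrative toy, not Airflow's code: `MiniDag`, `FakeTI`, and the integer `day` field are hypothetical stand-ins, and an in-memory list replaces the real database query. The point is only that both public methods delegate to one keyword-only internal helper, so the "internal" knobs never appear in the public signatures.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance row."""
    task_id: str
    day: int
    state: Optional[str]


class MiniDag:
    """Toy DAG holding task instances in memory (assumption: no database)."""

    def __init__(self, tis: Iterable[FakeTI]) -> None:
        self._tis = list(tis)

    def _get_task_instances(self, *, task_ids, start_day, end_day, state) -> List[FakeTI]:
        # Single place where filtering happens; every argument must be passed.
        matched = []
        for ti in self._tis:
            if task_ids is not None and ti.task_id not in task_ids:
                continue
            if start_day is not None and ti.day < start_day:
                continue
            if end_day is not None and ti.day > end_day:
                continue
            if state is not None and ti.state != state:
                continue
            matched.append(ti)
        return matched

    def get_task_instances(self, start_day=None, end_day=None, state=None) -> List[FakeTI]:
        # Public read API: delegates, then applies its own ordering.
        found = self._get_task_instances(
            task_ids=None, start_day=start_day, end_day=end_day, state=state
        )
        return sorted(found, key=lambda ti: ti.day)

    def clear(self, task_ids=None, start_day=None, end_day=None) -> int:
        # Public write API: reuses the same helper, then resets state.
        found = self._get_task_instances(
            task_ids=task_ids, start_day=start_day, end_day=end_day, state=None
        )
        for ti in found:
            ti.state = None
        return len(found)
```

With this layout, a change to the filtering rules lands in one method and both public entry points pick it up.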


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb merged pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #16086:
URL: https://github.com/apache/airflow/pull/16086


   





[GitHub] [airflow] github-actions[bot] commented on pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#issuecomment-849096075


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk commented on pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#issuecomment-867617949


   Congrats! I've got that one on my "finally green" list: https://github.com/apache/airflow/pull/15515 - though it's not entirely green yet (it will be green after we merge)





[GitHub] [airflow] jedcunningham commented on a change in pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#discussion_r639816922



##########
File path: airflow/models/dag.py
##########
@@ -1050,7 +1109,113 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
                         )
                 else:
                     tis = tis.filter(TaskInstance.state.in_(state))
-        tis = tis.order_by(TaskInstance.execution_date).all()
+
+        # Next, get any of them from our parent DAG (if there is one)
+        if include_parentdag and self.is_subdag and self.parent_dag is not None:
+            p_dag = self.parent_dag.partial_subset(
+                task_ids_or_regex=r"^{}$".format(self.dag_id.split('.')[1]),
+                include_upstream=False,
+                include_downstream=True,
+            )
+            result.update(
+                p_dag.get_task_instances(
+                    task_ids=task_ids,
+                    start_date=start_date,
+                    end_date=end_date,
+                    state=state,
+                    include_subdags=include_subdags,
+                    include_parentdag=False,
+                    as_pk_tuple=True,
+                    session=session,
+                    dag_bag=dag_bag,
+                    recursion_depth=recursion_depth,
+                    max_recursion_depth=max_recursion_depth,
+                    visited_external_tis=visited_external_tis,
+                )
+            )
+
+        if include_subdags:

Review comment:
       ```suggestion
           if include_dependent_dags:
   ```
   
   Don't we want to use the new flag here? Compat is handled in `clear`?

##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1025,68 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        tis = self._get_task_instances(
+            task_ids=None,
+            start_date=start_date,
+            end_date=end_date,
+            state=state,
+            include_subdags=False,
+            include_parentdag=False,
+            include_dependent_dags=False,
+            as_pk_tuple=False,
+            session=session,
         )
+        tis = tis.order_by(TaskInstance.execution_date).all()
+        return tis

Review comment:
       nit
   ```suggestion
           return tis.order_by(TaskInstance.execution_date).all()
   ```
   
   Or, `return self._get_task_instances(...).order_by().all()`?

##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1025,68 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        tis = self._get_task_instances(
+            task_ids=None,
+            start_date=start_date,
+            end_date=end_date,
+            state=state,
+            include_subdags=False,
+            include_parentdag=False,
+            include_dependent_dags=False,
+            as_pk_tuple=False,
+            session=session,
         )
+        tis = tis.order_by(TaskInstance.execution_date).all()
+        return tis
+
+    def _get_task_instances(
+        self,
+        *,
+        task_ids,
+        start_date,
+        end_date,
+        state,
+        include_subdags,
+        include_parentdag,
+        include_dependent_dags,
+        as_pk_tuple,
+        session,
+        dag_bag=None,
+        recursion_depth=0,
+        max_recursion_depth=None,
+        visited_external_tis=None,
+    ):

Review comment:
       Might be nice to have type annotations here.







[GitHub] [airflow] ashb commented on pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#issuecomment-862193815


   Right, I _think_ I might have finally fixed the last of the mssql issues.
   
   PTAL @andrewgodwin 





[GitHub] [airflow] kaxil commented on a change in pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#discussion_r639945905



##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1025,70 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        return (
+            self._get_task_instances(
+                task_ids=None,
+                start_date=start_date,
+                end_date=end_date,
+                state=state,
+                include_subdags=False,
+                include_parentdag=False,
+                include_dependent_dags=False,
+                as_pk_tuple=False,
+                session=session,
+            )
+            .order_by(TaskInstance.execution_date)
+            .all()
         )
+
+    def _get_task_instances(
+        self,
+        *,
+        task_ids,
+        start_date: Optional[datetime],
+        end_date: Optional[datetime],

Review comment:
       Should we set default as `None` for them to make them truly optional?
   
   ```suggestion
           start_date: Optional[datetime] = None,
           end_date: Optional[datetime] = None,
   ```







[GitHub] [airflow] kaxil commented on a change in pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#discussion_r639942923



##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1025,70 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        return (
+            self._get_task_instances(
+                task_ids=None,
+                start_date=start_date,
+                end_date=end_date,
+                state=state,
+                include_subdags=False,
+                include_parentdag=False,
+                include_dependent_dags=False,
+                as_pk_tuple=False,
+                session=session,
+            )
+            .order_by(TaskInstance.execution_date)
+            .all()
         )
+
+    def _get_task_instances(
+        self,
+        *,
+        task_ids,

Review comment:
       We can make `task_ids` optional too here







[GitHub] [airflow] ashb commented on a change in pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#discussion_r639824059



##########
File path: airflow/models/dag.py
##########
@@ -1050,7 +1109,113 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
                         )
                 else:
                     tis = tis.filter(TaskInstance.state.in_(state))
-        tis = tis.order_by(TaskInstance.execution_date).all()
+
+        # Next, get any of them from our parent DAG (if there is one)
+        if include_parentdag and self.is_subdag and self.parent_dag is not None:
+            p_dag = self.parent_dag.partial_subset(
+                task_ids_or_regex=r"^{}$".format(self.dag_id.split('.')[1]),
+                include_upstream=False,
+                include_downstream=True,
+            )
+            result.update(
+                p_dag.get_task_instances(
+                    task_ids=task_ids,
+                    start_date=start_date,
+                    end_date=end_date,
+                    state=state,
+                    include_subdags=include_subdags,
+                    include_parentdag=False,
+                    as_pk_tuple=True,
+                    session=session,
+                    dag_bag=dag_bag,
+                    recursion_depth=recursion_depth,
+                    max_recursion_depth=max_recursion_depth,
+                    visited_external_tis=visited_external_tis,
+                )
+            )
+
+        if include_subdags:

Review comment:
       Yes, messed this up in rebasing it







[GitHub] [airflow] ashb commented on a change in pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#discussion_r639999414



##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1025,70 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        return (
+            self._get_task_instances(
+                task_ids=None,
+                start_date=start_date,
+                end_date=end_date,
+                state=state,
+                include_subdags=False,
+                include_parentdag=False,
+                include_dependent_dags=False,
+                as_pk_tuple=False,
+                session=session,
+            )
+            .order_by(TaskInstance.execution_date)
+            .all()
         )
+
+    def _get_task_instances(
+        self,
+        *,
+        task_ids,

Review comment:
       It's internal, so I went with explicit rather than implicit.







[GitHub] [airflow] ashb commented on a change in pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#discussion_r640000039



##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1025,70 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        return (
+            self._get_task_instances(
+                task_ids=None,
+                start_date=start_date,
+                end_date=end_date,
+                state=state,
+                include_subdags=False,
+                include_parentdag=False,
+                include_dependent_dags=False,
+                as_pk_tuple=False,
+                session=session,
+            )
+            .order_by(TaskInstance.execution_date)
+            .all()
         )
+
+    def _get_task_instances(
+        self,
+        *,
+        task_ids,
+        start_date: Optional[datetime],
+        end_date: Optional[datetime],

Review comment:
       Internal method, so no, I'd rather it be passed explicitly -- `Optional` just means None is an allowed value, not that the argument can be omitted.
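
The distinction drawn in this reply can be shown with a small sketch. `describe_window` is a hypothetical function made up for illustration, not part of Airflow: its parameters are annotated `Optional[datetime]` but have no default, so `None` is accepted as a value while leaving the argument out is still an error.

```python
from datetime import datetime
from typing import Optional


def describe_window(*, start_date: Optional[datetime], end_date: Optional[datetime]) -> str:
    """Render a date window; None on either side means unbounded."""
    start = start_date.isoformat() if start_date is not None else "-inf"
    end = end_date.isoformat() if end_date is not None else "+inf"
    return f"[{start}, {end}]"


# None is a legal value for an Optional parameter...
unbounded = describe_window(start_date=None, end_date=None)

# ...but the parameter itself is still required, because it has no default.
try:
    describe_window(start_date=None)  # end_date omitted on purpose
    omitted_ok = True
except TypeError:
    omitted_ok = False
```

For an internal helper this forces every caller to state each filter explicitly, which is the trade-off being argued for here.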







[GitHub] [airflow] ashb commented on a change in pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#discussion_r639832501



##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1025,68 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        tis = self._get_task_instances(
+            task_ids=None,
+            start_date=start_date,
+            end_date=end_date,
+            state=state,
+            include_subdags=False,
+            include_parentdag=False,
+            include_dependent_dags=False,
+            as_pk_tuple=False,
+            session=session,
         )
+        tis = tis.order_by(TaskInstance.execution_date).all()
+        return tis
+
+    def _get_task_instances(
+        self,
+        *,
+        task_ids,
+        start_date,
+        end_date,
+        state,
+        include_subdags,
+        include_parentdag,
+        include_dependent_dags,
+        as_pk_tuple,
+        session,
+        dag_bag=None,
+        recursion_depth=0,
+        max_recursion_depth=None,
+        visited_external_tis=None,
+    ):

Review comment:
       Done

##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1025,68 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        tis = self._get_task_instances(
+            task_ids=None,
+            start_date=start_date,
+            end_date=end_date,
+            state=state,
+            include_subdags=False,
+            include_parentdag=False,
+            include_dependent_dags=False,
+            as_pk_tuple=False,
+            session=session,
         )
+        tis = tis.order_by(TaskInstance.execution_date).all()
+        return tis

Review comment:
       Done







[GitHub] [airflow] ashb commented on a change in pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#discussion_r640515973



##########
File path: airflow/models/dag.py
##########
@@ -1025,11 +1026,110 @@ def get_task_instances(self, start_date=None, end_date=None, state=None, session
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
 
-        tis = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.execution_date >= start_date,
-            TaskInstance.task_id.in_([t.task_id for t in self.tasks]),
+        return (
+            self._get_task_instances(
+                task_ids=None,
+                start_date=start_date,
+                end_date=end_date,
+                state=state,
+                include_subdags=False,
+                include_parentdag=False,
+                include_dependent_dags=False,
+                as_pk_tuple=False,
+                session=session,
+            )
+            .order_by(TaskInstance.execution_date)
+            .all()
         )
+
+    @overload
+    def _get_task_instances(
+        self,
+        *,
+        task_ids,
+        start_date: Optional[datetime],
+        end_date: Optional[datetime],
+        state: Union[str, List[str]],
+        include_subdags: bool,
+        include_parentdag: bool,
+        include_dependent_dags: bool,
+        as_pk_tuple: Literal[True],
+        session: Session,
+        dag_bag: "DagBag" = None,
+        recursion_depth: int = 0,
+        max_recursion_depth: int = None,
+        visited_external_tis: Set[Tuple[str, str, datetime]] = None,
+    ) -> Set[Tuple[str, str, datetime]]:
+        ...
+
+    @overload
+    def _get_task_instances(
+        self,
+        *,
+        task_ids,
+        start_date: Optional[datetime],
+        end_date: Optional[datetime],
+        state: Union[str, List[str]],
+        include_subdags: bool,
+        include_parentdag: bool,
+        include_dependent_dags: bool,
+        as_pk_tuple: Literal[False],
+        session: Session,
+        dag_bag: "DagBag" = None,
+        recursion_depth: int = 0,
+        max_recursion_depth: int = None,
+        visited_external_tis: Set[Tuple[str, str, datetime]] = None,
+    ) -> Iterable[TaskInstance]:
+        ...

Review comment:
       I added a `typing.overload` here so that mypy etc. know the right return type based on the `as_pk_tuple` value.
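
A stripped-down sketch of the same pattern, for readers unfamiliar with it. `fetch_rows` and its data are hypothetical, and `Literal` is imported from `typing`, which assumes Python 3.8 or newer (older versions would need `typing_extensions`). The two `@overload` stubs exist only for the type checker; the final undecorated definition is the one that runs.

```python
from typing import Dict, List, Literal, Set, Tuple, Union, overload

Row = Dict[str, Union[str, int]]
PK = Tuple[str, int]


@overload
def fetch_rows(*, as_pk_tuple: Literal[True]) -> Set[PK]: ...


@overload
def fetch_rows(*, as_pk_tuple: Literal[False]) -> List[Row]: ...


def fetch_rows(*, as_pk_tuple: bool) -> Union[Set[PK], List[Row]]:
    """Return full rows, or only their primary-key tuples, depending on the flag."""
    rows: List[Row] = [
        {"task_id": "a", "try_number": 1},
        {"task_id": "b", "try_number": 2},
    ]
    if as_pk_tuple:
        return {(str(r["task_id"]), int(r["try_number"])) for r in rows}
    return rows


pks = fetch_rows(as_pk_tuple=True)    # a type checker infers Set[PK]
full = fetch_rows(as_pk_tuple=False)  # a type checker infers List[Row]
```

Without the overloads, every caller would see the `Union` return type and need a cast or an `isinstance` check before using the result.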




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #16086: Refactor `dag.clear` method

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16086:
URL: https://github.com/apache/airflow/pull/16086#issuecomment-867611568


   OMG GREEN. FINALLY!!one

