You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/01 18:25:58 UTC

[GitHub] [airflow] Jorricks opened a new pull request #19353: Introduce DagRun action to change state to queued.

Jorricks opened a new pull request #19353:
URL: https://github.com/apache/airflow/pull/19353


   When a DAG has been running for a bit, it often occurs in our workflow that a task is added.
   Often, people want to run these tasks for historical dates.
   However, when there are more TaskInstances you want to run than the `max_active_runs` DagRuns, you might cross the limit.
   Therefore, I felt we should add the option to mark the DagRun immediately as Queued.
   
   For now, the work around I did was mark the tasks I wanted to run as success first, then clear them. 
   This yields the same result but requires more click, and Airflow is of course all about efficiency :)
   
   Love to hear some feedback on what you think about what I've done with the refactoring. In particulair:
   - Added the typing information to all args/kwargs
   - Instead of providing session with None, we use the asterix (*) to indicate it's a keyword and give it correct typing info.
   - Instead of repeating ourselves, I extracted a function that we can use for Queued and Running DagRun state changes. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Jorricks commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
Jorricks commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r741085782



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3981,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):

Review comment:
       Because only some of them needed the session.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Jorricks commented on pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
Jorricks commented on pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#issuecomment-965709564


   Rewrote the session kwarg to the 'standard' way of defining.
   Let me know if anything is missing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r740983925



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3981,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):

Review comment:
       Why have you removed `provide_sesion` decorator for some but not all




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Jorricks commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
Jorricks commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r741085782



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3981,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):

Review comment:
       Because only some of them needed the session.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#issuecomment-968693454


   The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r740983925



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3981,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):

Review comment:
       Why have you removed `provide_sesion` decorator for some but not all




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r747320869



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3978,36 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self._set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):
         """Set state to running."""
+        return self._set_dag_runs_to_active_state(drs, State.RUNNING)
+
+    @provide_session
+    def _set_dag_runs_to_active_state(self, drs: List[DagRun], state: str, session=None):
+        """This routine only support Running and Queued state."""

Review comment:
       ```suggestion
           """This routine only supports Running and Queued state."""
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#issuecomment-971875946


   Closed/re-opened to rebuild


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r740983925



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3981,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):

Review comment:
       Why have you removed `provide_sesion` decorator for some but not all




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Jorricks commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
Jorricks commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r741085782



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3981,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):

Review comment:
       Because only some of them needed the session.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r747255613



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3978,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):
         """Set state to running."""
+        return self.set_dag_runs_to_active_state(drs, State.RUNNING)
+
+    @provide_session
+    def set_dag_runs_to_active_state(self, drs: List[DagRun], state: str, session=None):
+        if state not in [State.RUNNING, State.QUEUED]:
+            raise ValueError("This routine only supports Running and Queued.")

Review comment:
       I feel we should
   
   1. Make this a private name (with an underscore prefix)
   2. Instead of checking explicitly, just add a docstring saying this currently is only designed to work with `RUNNING` and `QUEUED`.
   
   This funtion is not supposed to be exposed to the user, so we should make it more flexible for contributors to work on this function, instead of guarding people “on the same team”.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk closed pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #19353:
URL: https://github.com/apache/airflow/pull/19353


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #19353:
URL: https://github.com/apache/airflow/pull/19353


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #19353: Introduce DagRun action to change state to queued.

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19353:
URL: https://github.com/apache/airflow/pull/19353#discussion_r747256631



##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3978,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):
         """Set state to running."""
+        return self.set_dag_runs_to_active_state(drs, State.RUNNING)
+
+    @provide_session
+    def set_dag_runs_to_active_state(self, drs: List[DagRun], state: str, session=None):
+        if state not in [State.RUNNING, State.QUEUED]:
+            raise ValueError("This routine only supports Running and Queued.")
         try:
             count = 0
             for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all():

Review comment:
       Also why do we need to re-select the DagRun instances, if the input is already a list of DagRun instances? Is this due to some weird SQLAlchemy or Flask-Appbuilder quirk?

##########
File path: airflow/www/views.py
##########
@@ -3977,26 +3978,37 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_muldelete(self, items, session=None):
+    def action_muldelete(self, items: List[DagRun]):
         """Multiple delete."""
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @action('set_queued', "Set state to 'queued'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_queued(self, drs: List[DagRun]):
+        """Set state to queued."""
+        return self.set_dag_runs_to_active_state(drs, State.QUEUED)
+
     @action('set_running', "Set state to 'running'", '', single=False)
     @action_has_dag_edit_access
-    @provide_session
-    def action_set_running(self, drs, session=None):
+    def action_set_running(self, drs: List[DagRun]):
         """Set state to running."""
+        return self.set_dag_runs_to_active_state(drs, State.RUNNING)
+
+    @provide_session
+    def set_dag_runs_to_active_state(self, drs: List[DagRun], state: str, session=None):
+        if state not in [State.RUNNING, State.QUEUED]:
+            raise ValueError("This routine only supports Running and Queued.")
         try:
             count = 0
             for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all():

Review comment:
       ```suggestion
               for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])):
   ```
   
   Not related but let’s remove this unnecessary `all()` call.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org