Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/09 22:38:28 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #16358: Only create dagruns when max_active_runs is not reached

ephraimbuddy opened a new pull request #16358:
URL: https://github.com/apache/airflow/pull/16358


   Currently, we create dagruns unconditionally when a DAG is unpaused or
   when a dagrun is manually triggered. Because of this, when we set max_active_runs=1
   and trigger a dagrun manually, the scheduled run will still start at its scheduled time,
   without respecting max_active_runs.

   This change fixes that by creating dagruns only when max_active_runs has not been reached.
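
As a rough illustration of the behaviour described above, here is a minimal pure-Python sketch. It is not the actual scheduler code: `DagRun`, `count_running`, and `maybe_create_run` are hypothetical names invented for this example, and only `max_active_runs` comes from the PR itself.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class DagRun:
    # Hypothetical stand-in for a row in the dag_run table.
    dag_id: str
    run_id: str
    state: str  # "running", "success", or "failed"


def count_running(runs: List[DagRun], dag_id: str) -> int:
    """Count the runs of one DAG that are currently in the running state."""
    return sum(1 for r in runs if r.dag_id == dag_id and r.state == "running")


def maybe_create_run(
    runs: List[DagRun], dag_id: str, run_id: str, max_active_runs: int
) -> Optional[DagRun]:
    """Create a run only while the DAG is below its max_active_runs limit."""
    if count_running(runs, dag_id) >= max_active_runs:
        return None  # limit reached: do not create another run
    run = DagRun(dag_id=dag_id, run_id=run_id, state="running")
    runs.append(run)
    return run


runs: List[DagRun] = []
# A manual trigger takes the only slot when max_active_runs=1 ...
manual = maybe_create_run(runs, "example_dag", "manual__1", max_active_runs=1)
# ... so the scheduled run is not created while the manual one is running.
scheduled = maybe_create_run(runs, "example_dag", "scheduled__1", max_active_runs=1)
```

In this toy model the second call returns `None`, which is the gating behaviour the description aims for; how the real scheduler counts runs is discussed in the review comments.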
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dimon222 commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
dimon222 commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858831825


   Does this PR literally restrict queuing new dagruns if there's already one running?





[GitHub] [airflow] dimon222 commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
dimon222 commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-859192608


   > > Does this PR literally restrict staggering new dagruns if there's already one running?
   > 
   > I'm not sure I understand you correctly, but if you mean creating more dagruns once max_active_runs has been reached, then yes. You can't create more runs once max_active_runs has been reached.
   
   I would typically expect that creating more dagruns implies sequential execution, with the runs queued one after another. With this change, queuing them would simply not be allowed. Removing a feature that people are looking for doesn't sound like a solution to the initial problem we have here. After all, why would people ever want to trigger a dag multiple times if they want only one to run at a time? The only consequence of merging this would be more people turning to polling as a workaround (spamming the trigger until it actually succeeds).





[GitHub] [airflow] ashb commented on a change in pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#discussion_r649077543



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1593,7 +1592,9 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
+            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns and len(
+                active_dagruns
+            ) < dag.max_active_runs:

Review comment:
       Two things:
   
   1) This variable is badly named, and the "active" prefix is incorrect -- it turns out it is just _all_ dagruns, not just active/running ones
   2) len(active_dagruns) includes all dags, not just the current `dag.dag_id`
   
   so this check needs changing.
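
The two problems above can be sketched in plain Python. This is an illustration only, not the scheduler's code: the data shapes (a set of `(dag_id, execution_date)` tuples and a list of `(dag_id, execution_date, state)` rows) and the helper name `running_count_for` are assumptions made for the example.

```python
from typing import List, Set, Tuple

# Hypothetical rows: (dag_id, execution_date, state) for every known dagrun.
all_rows: List[Tuple[str, str, str]] = [
    ("dag_a", "2021-06-01", "success"),
    ("dag_a", "2021-06-02", "running"),
    ("dag_b", "2021-06-01", "running"),
    ("dag_b", "2021-06-02", "running"),
]

# What the misnamed variable holds: keys for all runs of all DAGs.
active_dagruns: Set[Tuple[str, str]] = {(d, e) for d, e, _ in all_rows}


def running_count_for(dag_id: str, rows: List[Tuple[str, str, str]]) -> int:
    """Count only running runs that belong to the given DAG."""
    return sum(1 for d, _, state in rows if d == dag_id and state == "running")


max_active_runs = 2

# Check as written in the diff: compares a global count to a per-DAG limit.
naive_allows = len(active_dagruns) < max_active_runs

# Per-DAG check: dag_a has one running run, so one more fits under the limit.
per_dag_allows = running_count_for("dag_a", all_rows) < max_active_runs
```

With this sample data the global check refuses a new run for `dag_a` even though only one of its runs is active, while the per-DAG count would allow it.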







[GitHub] [airflow] ephraimbuddy closed pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16358:
URL: https://github.com/apache/airflow/pull/16358


   





[GitHub] [airflow] fj-sanchez edited a comment on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
fj-sanchez edited a comment on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-859459745









[GitHub] [airflow] ephraimbuddy commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-859110569


   > Does this PR literally restrict staggering new dagruns if there's already one running?
   
   I'm not sure I understand you correctly, but if you mean creating more dagruns once max_active_runs has been reached, then yes. You can't create more runs once max_active_runs has been reached.





[GitHub] [airflow] XD-DENG commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858887591


   I haven't looked into the details. But based on the idea I get from the PR subject, this may affect the DAGs whose 'catchup' is False





[GitHub] [airflow] dimon222 edited a comment on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
dimon222 edited a comment on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858831825


   Does this PR literally restrict staggering new dagruns if there's already one running?





[GitHub] [airflow] ashb commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858533746


   Tackling https://github.com/apache/airflow/issues/16366 _may_ be less work, not sure, but it's certainly a better fix long term





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#discussion_r649047629



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3864,6 +3864,102 @@ def test_scheduler_create_dag_runs_check_existing_run(self):
         assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
         session.rollback()
 
+    def test_scheduler_loop_dont_create_dagruns_when_max_active_runs_is_reached(self):
+        """
+        Test that if max_active_runs is reached, scheduler loop do not create extra dagruns.
+
+        With max_active_runs=1, scheduler loop won't create extra dagrun if there's a running
+        dagrun
+        """
+        dag = DAG(
+            dag_id='test_scheduler_max_active_runs_1',
+            start_date=DEFAULT_DATE,
+            schedule_interval=timedelta(seconds=1),
+            max_active_runs=1,
+        )
+
+        BashOperator(
+            task_id='dummy',
+            bash_command="sleep 10",
+            dag=dag,
+        )
+        dag2 = DAG(
+            dag_id='test_scheduler_max_active_runs_2',
+            start_date=DEFAULT_DATE,
+            schedule_interval=timedelta(seconds=1),
+            max_active_runs=2,
+        )
+
+        BashOperator(
+            task_id='dummy',
+            bash_command="sleep 10",
+            dag=dag2,
+        )
+
+        session = settings.Session()
+        assert dag.get_last_dagrun(session) is None
+
+        dagbag = DagBag(
+            dag_folder=os.devnull,
+            include_examples=False,
+            read_dags_from_db=False,
+        )
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.bag_dag(dag=dag2, root_dag=dag2)
+
+        # Create DagModel
+        DAG.bulk_write_to_db(dagbag.dags.values())
+        dag_model = DagModel.get_dagmodel(dag.dag_id)
+
+        # Assert dag_model.next_dagrun is set correctly
+        assert dag_model.next_dagrun == DEFAULT_DATE
+
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+        dag2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag2))
+
+        dagrun = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=dag_model.next_dagrun,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            external_trigger=False,
+            session=session,
+            creating_job_id=2,
+        )
+        dagrun2 = dag2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=dag_model.next_dagrun,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            external_trigger=False,
+            session=session,
+            creating_job_id=2,
+        )
+        session.flush()
+
+        assert dag.get_last_dagrun(session) == dagrun
+        assert dag2.get_last_dagrun(session) == dagrun2
+
+        # This poll interval is large, but the scheduler doesn't sleep that
+        # long; instead we hit the update_dagrun_state_for_paused_dag_interval

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] ephraimbuddy commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-859291874









[GitHub] [airflow] ephraimbuddy commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-859116510


   > I haven't looked into the details. But based on the idea I get from the PR subject, this may affect the DAGs whose 'catchup' is False
   
   I _think_ it works fine in both cases.





[GitHub] [airflow] uranusjr commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858488505


   Yeah, this generates a `SELECT COUNT(dag_id) FROM dag_run` basically, and since we have an index on `(dag_id, state)` the query should be pretty fast.
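
To make the query shape concrete, here is a small sketch using Python's built-in `sqlite3` module. The table layout, index name, and sample rows are simplified assumptions for illustration, not Airflow's actual schema, and the real query is generated through the ORM rather than written by hand.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT)")
# Assumed composite index mirroring the (dag_id, state) index mentioned above.
conn.execute("CREATE INDEX idx_dag_run_dag_id_state ON dag_run (dag_id, state)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [
        ("dag_a", "2021-06-01", "success"),
        ("dag_a", "2021-06-02", "running"),
        ("dag_b", "2021-06-01", "running"),
    ],
)


def count_running(connection: sqlite3.Connection, dag_id: str) -> int:
    """Return the number of running runs for one DAG as a plain integer."""
    row = connection.execute(
        "SELECT COUNT(dag_id) FROM dag_run WHERE dag_id = ? AND state = ?",
        (dag_id, "running"),
    ).fetchone()
    return row[0]


running_a = count_running(conn, "dag_a")
running_c = count_running(conn, "dag_c")  # a DAG with no rows at all
```

Because both filter columns are covered by the composite index, a database can typically answer this kind of count without scanning the whole table, which is the point being made about query cost.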





[GitHub] [airflow] ephraimbuddy commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858445260


   > Need to look at other ways of achieving this, as not batching the query will be a big performance hit
   
   I’m thinking that since the query is just a count it’s fine?





[GitHub] [airflow] dimon222 edited a comment on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
dimon222 edited a comment on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-859192608









[GitHub] [airflow] ephraimbuddy commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858152724


   ![max_active_runs](https://user-images.githubusercontent.com/4122866/121439365-b8980e80-c97d-11eb-96cc-57fc621cb701.PNG)
   max_active_runs=1, can't trigger manually





[GitHub] [airflow] ephraimbuddy commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858794233


   > Tackling #16366 _may_ be less work, not sure, but it's certainly a better fix long term
   
   I have updated this PR and will start working on #16366. From my thinking around the queued state, this change may still be valid. Basing my PR on this for now.





[GitHub] [airflow] ephraimbuddy commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-858448181


   > > Need to look at other ways of achieving this, as not batching the query will be a big performance hit
   > 
   > I’m thinking that since the query is just a count it’s fine?
   
   I counted distinct dag_ids and returned a scalar instead of an object.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#discussion_r649364776



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1593,7 +1592,9 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
+            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns and len(
+                active_dagruns
+            ) < dag.max_active_runs:

Review comment:
       I have corrected it. I'm surprised it was working previously. Thanks.







[GitHub] [airflow] fj-sanchez commented on pull request #16358: Only create dagruns when max_active_runs is not reached

Posted by GitBox <gi...@apache.org>.
fj-sanchez commented on pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#issuecomment-859459745


   > > > > Does this PR literally restrict staggering new dagruns if there's already one running?
   > > > 
   > > > 
   > > > I'm not sure I understand you correctly, but if you mean creating more dagruns once max_active_runs has been reached, then yes. You can't create more runs once max_active_runs has been reached.
   > > 
   > > 
   > > I would typically expect that creating more dagruns implies sequential execution, with the runs queued one after another. With this change, queuing them would simply not be allowed. Removing a feature that people are looking for doesn't sound like a solution to the initial problem we have here. After all, why would people ever want to trigger a dag multiple times if they want only one dagrun to be present at a time? Because they want to add another run to the queue! The only consequence of merging this would be more people turning to polling as a workaround (spamming the trigger until it actually succeeds).
   > 
   > I don't think you understand the change. You can increase the number of dagruns. It's controlled by the max_active_runs in a dag. The default is 16 and you can increase it as well.
   
   I agree with @dimon222, this isn't addressing the problem that we have. I have an external system triggering dag_runs (parameterised using different `conf`), and I would expect that Airflow, as an orchestrator, would enqueue requests when it's already at `max_active_runs` capacity, rather than refuse any further external request to run a dag.
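
The behaviour requested here, enqueueing rather than refusing, might look roughly like the following sketch. It is a toy model, not Airflow code: the `RunQueue` class and its method names are hypothetical, and only `max_active_runs` and `conf` correspond to names used in this thread.

```python
from collections import deque
from typing import Deque, Dict, List


class RunQueue:
    """Toy model: accept every trigger, but run at most max_active_runs at once."""

    def __init__(self, max_active_runs: int) -> None:
        self.max_active_runs = max_active_runs
        self.running: List[Dict] = []
        self.queued: Deque[Dict] = deque()

    def trigger(self, conf: Dict) -> str:
        """Start the run if there is capacity, otherwise enqueue it."""
        if len(self.running) < self.max_active_runs:
            self.running.append(conf)
            return "running"
        self.queued.append(conf)
        return "queued"

    def finish(self, conf: Dict) -> None:
        """Mark a run finished and promote the oldest queued run, if any."""
        self.running.remove(conf)
        if self.queued:
            self.running.append(self.queued.popleft())


q = RunQueue(max_active_runs=1)
first = q.trigger({"job": 1})   # capacity available, starts immediately
second = q.trigger({"job": 2})  # at capacity, accepted but queued
q.finish({"job": 1})            # frees the slot; the queued run is promoted
```

In this model an external system never has to poll or retry: each trigger is accepted once and runs when a slot becomes free.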

