Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/17 21:58:13 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

ephraimbuddy opened a new pull request #20391:
URL: https://github.com/apache/airflow/pull/20391


   Currently, if a DAG with a large number of tasks reaches a concurrency limit, task instances
   in other DAGs get stuck in the scheduled state. The reason is that in each
   scheduler loop, task instances of this large DAG are picked up for examination
   even though the DAG has already reached its concurrency limit.
   This makes it almost impossible to run task instances in other DAGs unless the
   large DAG is stopped.
   
   In this PR, I added a column to the task instance table that records when the scheduler
   last examined the task instance. Once a DAG has reached any of its concurrency limits, this
   column is set for the DAG's scheduled task instances, and when any task instance finishes running,
   the column is reset to NULL so that the scheduler examines those task instances again.
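
As a rough illustration of the mechanism described above, here is a minimal pure-Python sketch. It is not the scheduler's code: the `TI` dataclass and the helpers `pick_candidates`, `mark_dag_at_limit`, and `reset_dag` are hypothetical names, and an in-memory list stands in for the task instance table. It only aims to show why marking the scheduled task instances of a DAG that hit its limit lets task instances of other DAGs be examined first.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class TI:
    """Hypothetical stand-in for a row of the task instance table."""
    dag_id: str
    task_id: str
    state: str = "scheduled"
    last_scheduling_decision: Optional[datetime] = None


def pick_candidates(tis: List[TI], max_tis: int) -> List[TI]:
    """Return up to max_tis scheduled TIs, unmarked (None) ones first."""
    scheduled = [ti for ti in tis if ti.state == "scheduled"]
    # Stable sort: False (unmarked) sorts before True (marked).
    scheduled.sort(key=lambda ti: ti.last_scheduling_decision is not None)
    return scheduled[:max_tis]


def mark_dag_at_limit(tis: List[TI], dag_id: str, now: datetime) -> None:
    """Mark the scheduled TIs of a DAG that reached a concurrency limit."""
    for ti in tis:
        if ti.dag_id == dag_id and ti.state == "scheduled":
            ti.last_scheduling_decision = now


def reset_dag(tis: List[TI], dag_id: str) -> None:
    """Clear the mark once a TI of the DAG finishes, so the rest are re-examined."""
    for ti in tis:
        if ti.dag_id == dag_id and ti.state == "scheduled":
            ti.last_scheduling_decision = None


tis = [TI("big_dag", f"task{i}") for i in range(5)] + [TI("small_dag", "task0")]

before = [ti.dag_id for ti in pick_candidates(tis, max_tis=3)]
# before == ["big_dag", "big_dag", "big_dag"]: small_dag is never examined

mark_dag_at_limit(tis, "big_dag", datetime.now(timezone.utc))
after = [ti.dag_id for ti in pick_candidates(tis, max_tis=3)]
# after == ["small_dag", "big_dag", "big_dag"]: small_dag now comes first

reset_dag(tis, "big_dag")
all_reset = all(ti.last_scheduling_decision is None for ti in tis)
```

The sketch leaves out priority weights, pools, and everything else the real query takes into account; it isolates the mark/reset idea only.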
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r772704327



##########
File path: airflow/models/dagrun.py
##########
@@ -96,14 +94,6 @@ class DagRun(Base, LoggingMixin):
     # When a scheduler last attempted to schedule TIs for this DagRun
     last_scheduling_decision = Column(UtcDateTime)
     dag_hash = Column(String(32))
-    # Foreign key to LogFilename. DagRun rows created prior to this column's
-    # existence have this set to NULL. Later rows automatically populate this on
-    # insert to point to the latest LogFilename entry.
-    log_filename_id = Column(
-        Integer,
-        ForeignKey("log_filename.id", name="task_instance_log_filename_id_fkey", ondelete="NO ACTION"),
-        default=select([func.max(LogFilename.__table__.c.id)]),
-    )

Review comment:
       surprised! not sure how I removed all these







[GitHub] [airflow] ashb commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r778136627



##########
File path: airflow/migrations/versions/3457dcfe0528_add_last_scheduling_decision_column_to_.py
##########
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add last_scheduling_decision column to taskinstance
+
+Revision ID: 3457dcfe0528
+Revises: 587bdf053233
+Create Date: 2021-12-20 21:35:33.133670
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.migrations.db_types import TIMESTAMP
+
+# revision identifiers, used by Alembic.
+revision = '3457dcfe0528'
+down_revision = '587bdf053233'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply add last_scheduling_decision column to taskinstance"""
+    with op.batch_alter_table('task_instance', schema=None) as batch_op:
+        batch_op.add_column(sa.Column('last_scheduling_decision', TIMESTAMP, nullable=True))
+        batch_op.create_index('idx_ti_last_scheduling_decision', ['last_scheduling_decision'], unique=False)

Review comment:
       This index should probably include at least the dag_id as well.
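
A minimal sketch of what such a composite index could look like, using the standard-library `sqlite3` module rather than Alembic so that it runs on its own. The index name `idx_ti_dag_id_last_scheduling_decision` and the reduced table definition are assumptions for illustration; in the migration itself this would presumably mean passing both column names in the list given to `batch_op.create_index`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Reduced, hypothetical version of the task_instance table.
conn.execute(
    "CREATE TABLE task_instance ("
    "task_id TEXT, dag_id TEXT, state TEXT, last_scheduling_decision TEXT)"
)

# Composite index: dag_id first, then the new column.
conn.execute(
    "CREATE INDEX idx_ti_dag_id_last_scheduling_decision "
    "ON task_instance (dag_id, last_scheduling_decision)"
)

# PRAGMA index_info returns one row per indexed column: (seqno, cid, name).
index_columns = [
    row[2]
    for row in conn.execute(
        "PRAGMA index_info('idx_ti_dag_id_last_scheduling_decision')"
    )
]
print(index_columns)
```

Putting `dag_id` first is a design guess based on the updates in this PR filtering by `dag_id`; whether `state` should be part of the index as well is left open here.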







[GitHub] [airflow] jedcunningham commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r771779335



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -581,6 +593,13 @@ def _process_executor_events(self, session: Session = None) -> int:
                 self.log.info("Setting external_id for %s to %s", ti, info)
                 continue
 
+            # update the last_scheduling_decision for all task in this dag
+            session.query(TI).filter(TI.dag_id == ti.dag_id, TI.state == State.SCHEDULED,).update(
+                {
+                    TI.last_scheduling_decision: timezone.utcnow(),

Review comment:
       ```suggestion
               # reset the last_scheduling_decision for all task in this dag
               session.query(TI).filter(TI.dag_id == ti.dag_id, TI.state == State.SCHEDULED).update(
                   {
                       TI.last_scheduling_decision: None,
   ```
   
   Don't we want to set them to null here? How does this interplay with the mini scheduler loop?

##########
File path: airflow/logs/scheduler/latest
##########
@@ -0,0 +1 @@
+./logs/scheduler/2021-12-17

Review comment:
       I assume this was accidental?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r771782173



##########
File path: airflow/logs/scheduler/latest
##########
@@ -0,0 +1 @@
+./logs/scheduler/2021-12-17

Review comment:
       Yeah!







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r781479926



##########
File path: airflow/models/baseoperator.py
##########
@@ -666,7 +666,7 @@ def __init__(
             )
         self.weight_rule = weight_rule
         self.resources: Optional[Resources] = Resources(**resources) if resources else None
-        if task_concurrency and not max_active_tis_per_dag:

Review comment:
       Not related, but it seems like what we should do, and it might be OK in this PR.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r781479015



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -581,6 +604,13 @@ def _process_executor_events(self, session: Session = None) -> int:
                 self.log.info("Setting external_id for %s to %s", ti, info)
                 continue
 
+            # reset the last_scheduling_decision for all task in this dag

Review comment:
       ```suggestion
               # reset the last_scheduling_decision for all scheduled task in this dag
   ```







[GitHub] [airflow] ephraimbuddy commented on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-1013935448


   > I'm kinda torn on the name `last_scheduling_decision` here. By the name, one could assume it'd be populated any time the scheduler considers the TI, not just when `max_active_tasks` is hit. It makes me wonder if we should populate that in more places?
   > 
   > Related, nulling it out after another task finishes also feels odd given the name of the column.
   
   Makes sense. Do you have suggestions for the name? I'm thinking about it too.





[GitHub] [airflow] ashb commented on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-1014354599


   I think we need to benchmark this approach and #19747 and see which one performs better under load





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r772925339



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -395,6 +401,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
                         dag_id,
                         max_active_tasks_per_dag_limit,
                     )
+                    # reset the last_scheduling_decision for all task in this dag

Review comment:
       ```suggestion
                       # update the last_scheduling_decision for all task instances in this dag
   ```







[GitHub] [airflow] uranusjr commented on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-999081949


   Are there scenarios where tasks in a DAG would have different `last_scheduling_decision`? I kind of feel it makes more sense to put this on DagRun instead.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r781478624



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -395,6 +401,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
                         dag_id,
                         max_active_tasks_per_dag_limit,
                     )
+                    # update the last_scheduling_decision for all task instances in this dag
+                    session.query(TI).filter(
+                        TI.dag_id == task_instance.dag_id, TI.state == State.SCHEDULED
+                    ).update(
+                        {
+                            TI.last_scheduling_decision: timezone.utcnow(),
+                        },
+                        synchronize_session=False,

Review comment:
       My comment here was wrong; it actually updates only those in the scheduled state.
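
To make the scope of that update concrete, here is a small sketch using the standard-library `sqlite3` module. The table is a reduced, hypothetical version of `task_instance`, and the SQL is a hand-written equivalent of the filter in the diff (`dag_id` plus scheduled state), not the statement SQLAlchemy actually emits.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, task_id TEXT, state TEXT, last_scheduling_decision TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance (dag_id, task_id, state) VALUES (?, ?, ?)",
    [
        ("first_dag", "t1", "scheduled"),
        ("first_dag", "t2", "scheduled"),
        ("first_dag", "t3", "running"),
        ("first_dag", "t4", "success"),
        ("second_dag", "t1", "scheduled"),
    ],
)

# Mark only the scheduled task instances of the DAG that hit its limit.
now = datetime.now(timezone.utc).isoformat()
cursor = conn.execute(
    "UPDATE task_instance SET last_scheduling_decision = ? "
    "WHERE dag_id = ? AND state = 'scheduled'",
    (now, "first_dag"),
)
updated_rows = cursor.rowcount

# Rows that now carry a timestamp.
touched = conn.execute(
    "SELECT dag_id, task_id FROM task_instance "
    "WHERE last_scheduling_decision IS NOT NULL ORDER BY dag_id, task_id"
).fetchall()
print(updated_rows, touched)
```

Running and completed rows of `first_dag`, and every row of `second_dag`, keep a NULL `last_scheduling_decision` in this sketch.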







[GitHub] [airflow] ephraimbuddy closed pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #20391:
URL: https://github.com/apache/airflow/pull/20391


   





[GitHub] [airflow] jedcunningham commented on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-1028455505


   Okay, I benchmarked 2.2.3, #19747 on latest main, and #20391 on latest main with:
   
   - 100 DAGs, 10 runs each, 10 tasks per DAG for 10k total TIs
   - 2 workers (1 CPU core each)
   - 1 scheduler (1 CPU core)
   
   Each took this long to get through them all:
   
   * 2.2.3: 51 minutes
   * 19747: 38 minutes
   * 20391: 38 minutes
   
   #19747 however does spew this into the scheduler logs ~16x per second (quite chatty!):
   
   ```
   [2022-02-02 23:21:32,947] {scheduler_job.py:522} INFO - Setting the following tasks to queued state:
   ```
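
To put the timings above in relative terms, the following sketch derives throughput and time saved from the reported figures only (10k TIs; 51, 38, and 38 minutes). Nothing here is measured independently, and the dictionary keys are just labels for the three runs.

```python
# 100 DAGs x 10 runs per DAG x 10 tasks per DAG, as described in the setup.
total_tis = 100 * 10 * 10

# Wall-clock minutes reported for each variant.
minutes = {"2.2.3": 51, "19747": 38, "20391": 38}
baseline = minutes["2.2.3"]

summary = {}
for name, elapsed in minutes.items():
    summary[name] = {
        "tis_per_minute": total_tis / elapsed,
        "time_saved_pct": 100 * (baseline - elapsed) / baseline,
    }

for name, stats in summary.items():
    print(
        f"{name}: {stats['tis_per_minute']:.0f} TIs/min, "
        f"{stats['time_saved_pct']:.1f}% less time than 2.2.3"
    )
```

By this arithmetic both PRs finish in roughly a quarter less time than 2.2.3 and are indistinguishable from each other at minute resolution.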
   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r785483597



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,8 +290,21 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == TaskInstanceState.SCHEDULED)
             .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
         )
+        if session.bind.dialect.name == "postgresql":
+            query = query.order_by(
+                TI.priority_weight.desc(),
+                TI.last_scheduling_decision.desc().nullsfirst(),

Review comment:
       Using desc helps us go from the oldest date to recent. Using asc may make it difficult to reach a dag that first hit the max_active_runs







[GitHub] [airflow] ephraimbuddy commented on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-997055104


   Will add more tests later





[GitHub] [airflow] dimon222 edited a comment on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
dimon222 edited a comment on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-997284630


   Out of curiosity, is this a fix for a regression newly introduced by one of the recent commits, or has it been present in the 2.x tree of Airflow since day one?





[GitHub] [airflow] ashb commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r778131474



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,7 +290,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == TaskInstanceState.SCHEDULED)
             .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
+            .order_by(
+                TI.priority_weight.desc(),
+                desc(case([(TI.last_scheduling_decision.is_(None), 1)], else_=0)),
+                TI.last_scheduling_decision.desc(),

Review comment:
       Please use `airflow.utils.sqlalchemy.nulls_first` here if we can.
   
   It might not work out of the box for `DESC` sorted column.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -395,6 +401,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
                         dag_id,
                         max_active_tasks_per_dag_limit,
                     )
+                    # update the last_scheduling_decision for all task instances in this dag
+                    session.query(TI).filter(
+                        TI.dag_id == task_instance.dag_id, TI.state == State.SCHEDULED
+                    ).update(
+                        {
+                            TI.last_scheduling_decision: timezone.utcnow(),
+                        },
+                        synchronize_session=False,

Review comment:
       This update is overly broad -- it is going to update _every single task for this DAG_, even completed ones.







[GitHub] [airflow] kaxil commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r772354021



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -645,6 +645,89 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
         session.rollback()
         session.close()
 
+    def test_max_active_task_in_a_dag_with_large_task_does_not_affect_other_dags(self, dag_maker, session):
+        dag_id_1 = 'first_dag'
+        dag_id_2 = 'second_dag'
+        with dag_maker(dag_id_1, max_active_tasks=1):
+            for i in range(10):
+                BashOperator(task_id=f"mytask{i}", bash_command='sleep 1d')
+        dr1 = dag_maker.create_dagrun(run_id=dag_id_1, state=DagRunState.RUNNING)
+        tis1 = dr1.get_task_instances()
+        for ti in tis1:
+            ti.state = TaskInstanceState.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 1 == len(res)
+        assert session.query(TaskInstance).filter(TaskInstance.state == TaskInstanceState.QUEUED).count() == 1
+        self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        # assert last_scheduling_decision is updated
+        assert (
+            session.query(TaskInstance)
+            .filter(TaskInstance.state == State.SCHEDULED, TaskInstance.last_scheduling_decision.__ne__(None))

Review comment:
       https://docs.sqlalchemy.org/en/14/core/sqlelement.html#sqlalchemy.sql.expression.ColumnOperators.__ne__
   
   >In a column context, produces the clause a != b. If the target is None, produces a IS NOT NULL.
   
   I am surprised it didn't work







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r772360396



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -645,6 +645,89 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
         session.rollback()
         session.close()
 
+    def test_max_active_task_in_a_dag_with_large_task_does_not_affect_other_dags(self, dag_maker, session):
+        dag_id_1 = 'first_dag'
+        dag_id_2 = 'second_dag'
+        with dag_maker(dag_id_1, max_active_tasks=1):
+            for i in range(10):
+                BashOperator(task_id=f"mytask{i}", bash_command='sleep 1d')
+        dr1 = dag_maker.create_dagrun(run_id=dag_id_1, state=DagRunState.RUNNING)
+        tis1 = dr1.get_task_instances()
+        for ti in tis1:
+            ti.state = TaskInstanceState.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 1 == len(res)
+        assert session.query(TaskInstance).filter(TaskInstance.state == TaskInstanceState.QUEUED).count() == 1
+        self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        # assert last_scheduling_decision is updated
+        assert (
+            session.query(TaskInstance)
+            .filter(TaskInstance.state == State.SCHEDULED, TaskInstance.last_scheduling_decision.__ne__(None))

Review comment:
       Maybe it's not available in SQLAlchemy==1.3.24 which we use







[GitHub] [airflow] uranusjr commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r772788453



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -645,6 +645,89 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
         session.rollback()
         session.close()
 
+    def test_max_active_task_in_a_dag_with_large_task_does_not_affect_other_dags(self, dag_maker, session):
+        dag_id_1 = 'first_dag'
+        dag_id_2 = 'second_dag'
+        with dag_maker(dag_id_1, max_active_tasks=1):
+            for i in range(10):
+                BashOperator(task_id=f"mytask{i}", bash_command='sleep 1d')
+        dr1 = dag_maker.create_dagrun(run_id=dag_id_1, state=DagRunState.RUNNING)
+        tis1 = dr1.get_task_instances()
+        for ti in tis1:
+            ti.state = TaskInstanceState.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 1 == len(res)
+        assert session.query(TaskInstance).filter(TaskInstance.state == TaskInstanceState.QUEUED).count() == 1
+        self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        # assert last_scheduling_decision is updated
+        assert (
+            session.query(TaskInstance)
+            .filter(TaskInstance.state == State.SCHEDULED, TaskInstance.last_scheduling_decision.__ne__(None))

Review comment:
       I think `isnot()` works in all 1.x versions.
   
   Edit: I quickly searched and yes, we do use `isnot()` in a couple of places. But not `is_not()`; that name was probably only introduced in 1.4.
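
For context on why the rendered clause matters, here is a sketch at the SQL level using the standard-library `sqlite3` module. It does not exercise SQLAlchemy at all; the table is a reduced, hypothetical `task_instance`, and the two queries are written by hand to contrast `!= NULL` with `IS NOT NULL`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (task_id TEXT, last_scheduling_decision TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [("marked", "2021-12-20T10:00:00"), ("unmarked", None)],
)

# Comparing against NULL with != evaluates to NULL, never true,
# so this query matches no rows at all.
ne_null = conn.execute(
    "SELECT task_id FROM task_instance WHERE last_scheduling_decision != NULL"
).fetchall()

# IS NOT NULL is the form that selects the populated rows.
is_not_null = conn.execute(
    "SELECT task_id FROM task_instance "
    "WHERE last_scheduling_decision IS NOT NULL"
).fetchall()

print(ne_null, is_not_null)
```

An assertion that counts rows with a populated column therefore depends on the ORM rendering `IS NOT NULL`, which is what `isnot(None)` is expected to produce.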







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r778166171



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,7 +290,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == TaskInstanceState.SCHEDULED)
             .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
+            .order_by(
+                TI.priority_weight.desc(),
+                desc(case([(TI.last_scheduling_decision.is_(None), 1)], else_=0)),
+                TI.last_scheduling_decision.desc(),

Review comment:
       `nulls_first` didn't work for any database other than Postgres.
   
   I prefer https://github.com/apache/airflow/pull/19747 over this PR. What do you think?
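
The `case(...)` term in the hunk above is a dialect-neutral way to emulate NULLS FIRST. Below is a minimal sketch of the same ordering written as plain SQL and run against an in-memory SQLite database from the standard library. The table name `ti` and the rows are made up for illustration and are not taken from Airflow's schema or tests.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ti (task_id TEXT, priority_weight INTEGER, last_scheduling_decision TEXT)"
)
# Hypothetical rows: one never examined (NULL), the rest examined at various times.
conn.executemany(
    "INSERT INTO ti VALUES (?, ?, ?)",
    [
        ("examined_early", 1, "2021-12-17 10:00:00"),
        ("never_examined", 1, None),
        ("examined_late", 1, "2021-12-17 11:00:00"),
        ("high_priority", 5, "2021-12-17 09:00:00"),
    ],
)

# Same shape as the ORDER BY in the diff: priority first, then a 1/0 flag
# that sorts NULL rows ahead of non-NULL rows, then the timestamp itself.
rows = conn.execute(
    """
    SELECT task_id FROM ti
    ORDER BY priority_weight DESC,
             CASE WHEN last_scheduling_decision IS NULL THEN 1 ELSE 0 END DESC,
             last_scheduling_decision DESC
    """
).fetchall()
ordered_ids = [row[0] for row in rows]
print(ordered_ids)
```

Because the flag is an ordinary integer expression, the query does not rely on any dialect's native `NULLS FIRST` syntax.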







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r781478095



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,7 +290,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == TaskInstanceState.SCHEDULED)
             .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
+            .order_by(
+                TI.priority_weight.desc(),
+                desc(case([(TI.last_scheduling_decision.is_(None), 1)], else_=0)),
+                TI.last_scheduling_decision.desc(),

Review comment:
       Used `nullsfirst` for PostgreSQL. I called it on the column directly instead of using `airflow.utils.sqlalchemy.nulls_first`, since we are sure the dialect is Postgres.







[GitHub] [airflow] uranusjr commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r773571647



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,7 +290,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == TaskInstanceState.SCHEDULED)
             .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
+            .order_by(
+                -TI.priority_weight,

Review comment:
       ```suggestion
                   TI.priority_weight.desc(),
   ```
   
   For consistency? Doesn’t really matter.







[GitHub] [airflow] ephraimbuddy commented on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-999346497


   > Are there scenarios where tasks in a DAG would have different `last_scheduling_decision`? I kind of feel it makes more sense to put this on DagRun instead.
   
   There's already a `last_scheduling_decision` on DagRun. It helps solve this kind of case for DagRun creation.
   
   In this particular case, task instances of a DAG can have different values for `last_scheduling_decision`, and some may not have one at all. Only task instances in the SCHEDULED state have their `last_scheduling_decision` updated when the DAG reaches the concurrency limit.
   
   I have two simple DAGs that reproduce this behaviour:
   
   1st DAG:
   ```python
   
   from airflow import DAG
   from datetime import datetime
   
   dag = DAG(
       "airflow_bug",
       schedule_interval="0 1 * * *",
       start_date=datetime(2021, 1, 1),
       max_active_runs=1,
       concurrency=1,
   )
   
   for i in range(100):
       @dag.task(task_id=f'mytasrk{i}')
       def sleeping():
           import time
           time.sleep(60)
       sleeping()
   ```
   2nd DAG:
   
   ```python
   from airflow import DAG
   from datetime import datetime
   
   dag = DAG(
       "airflow_bug2",
       schedule_interval="0 1 * * *",
       start_date=datetime(2021, 1, 1),
       max_active_runs=1,
       concurrency=2,
   )
   
   for i in range(100):
       @dag.task(task_id=f'mytasrk{i}')
       def sleeping():
           import time
           time.sleep(60)
       sleeping()
   ```
   
   When either of these DAGs reaches its concurrency limit, the other one will not run.
   To test this solution, you can duplicate the DAG further under different names and check that the concurrency limit in one does not hold back the others. I ran into a problem where some DAGs with older DagRuns stopped being picked up. I have resolved it, but would appreciate another pair of eyes on that.
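
For readers who want the mechanism without standing up a scheduler, here is a toy model in plain Python. It is a sketch under stated assumptions and not Airflow's implementation: `pick_for_queueing` is a hypothetical helper that examines only the first `max_tis` candidates and then drops those whose DAG is already at its limit. The assumed state is that `airflow_bug` already has its single slot occupied and that its remaining task instances sort ahead of those from `airflow_bug2`.

```python
from collections import Counter


def pick_for_queueing(scheduled, running_per_dag, limits, max_tis):
    """Toy version of one scheduler pass (hypothetical, not Airflow code).

    Only the first max_tis candidates are examined; a candidate is queued
    only if its DAG still has a free concurrency slot.
    """
    examined = scheduled[:max_tis]
    running = Counter(running_per_dag)
    queued = []
    for dag_id, task_id in examined:
        if running[dag_id] < limits[dag_id]:
            running[dag_id] += 1
            queued.append((dag_id, task_id))
    return queued


limits = {"airflow_bug": 1, "airflow_bug2": 2}
running_now = {"airflow_bug": 1, "airflow_bug2": 0}

# Assumed ordering: the capped DAG's task instances come first.
capped_first = [("airflow_bug", f"t{i}") for i in range(99)]
capped_first += [("airflow_bug2", f"t{i}") for i in range(100)]
starved = pick_for_queueing(capped_first, running_now, limits, max_tis=32)

# Same candidates, but with the uncapped DAG's task instances first.
uncapped_first = list(reversed(capped_first))
healthy = pick_for_queueing(uncapped_first, running_now, limits, max_tis=32)

print(len(starved), len(healthy))
```

In the first call every examined candidate belongs to the DAG that is already full, so nothing is queued even though `airflow_bug2` has free slots. Pushing recently examined task instances to the back of the ordering is what lets the second situation occur.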





[GitHub] [airflow] ephraimbuddy commented on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-1034624723


   Closing this in favour of https://github.com/apache/airflow/pull/19747





[GitHub] [airflow] ephraimbuddy commented on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-997670336


   > Out of curiosity, is this a fix for newly introduced regression by one of recent commits or its been present in 2.x tree of Airflow since day one?
   
   It has been present throughout 2.x. Using pools properly will mitigate it to a reasonable extent.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r772275134



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -645,6 +645,89 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
         session.rollback()
         session.close()
 
+    def test_max_active_task_in_a_dag_with_large_task_does_not_affect_other_dags(self, dag_maker, session):
+        dag_id_1 = 'first_dag'
+        dag_id_2 = 'second_dag'
+        with dag_maker(dag_id_1, max_active_tasks=1):
+            for i in range(10):
+                BashOperator(task_id=f"mytask{i}", bash_command='sleep 1d')
+        dr1 = dag_maker.create_dagrun(run_id=dag_id_1, state=DagRunState.RUNNING)
+        tis1 = dr1.get_task_instances()
+        for ti in tis1:
+            ti.state = TaskInstanceState.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 1 == len(res)
+        assert session.query(TaskInstance).filter(TaskInstance.state == TaskInstanceState.QUEUED).count() == 1
+        self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        # assert last_scheduling_decision is updated
+        assert (
+            session.query(TaskInstance)
+            .filter(TaskInstance.state == State.SCHEDULED, TaskInstance.last_scheduling_decision.__ne__(None))

Review comment:
       I had to use `__ne__` because `is_not` was not working (I think because of our SQLAlchemy version).







[GitHub] [airflow] uranusjr commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r772788453



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -645,6 +645,89 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
         session.rollback()
         session.close()
 
+    def test_max_active_task_in_a_dag_with_large_task_does_not_affect_other_dags(self, dag_maker, session):
+        dag_id_1 = 'first_dag'
+        dag_id_2 = 'second_dag'
+        with dag_maker(dag_id_1, max_active_tasks=1):
+            for i in range(10):
+                BashOperator(task_id=f"mytask{i}", bash_command='sleep 1d')
+        dr1 = dag_maker.create_dagrun(run_id=dag_id_1, state=DagRunState.RUNNING)
+        tis1 = dr1.get_task_instances()
+        for ti in tis1:
+            ti.state = TaskInstanceState.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 1 == len(res)
+        assert session.query(TaskInstance).filter(TaskInstance.state == TaskInstanceState.QUEUED).count() == 1
+        self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        # assert last_scheduling_decision is updated
+        assert (
+            session.query(TaskInstance)
+            .filter(TaskInstance.state == State.SCHEDULED, TaskInstance.last_scheduling_decision.__ne__(None))

Review comment:
       I think `isnot()` works in all 1.x versions.







[GitHub] [airflow] ashb commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r778136854



##########
File path: airflow/models/baseoperator.py
##########
@@ -666,7 +666,7 @@ def __init__(
             )
         self.weight_rule = weight_rule
         self.resources: Optional[Resources] = Resources(**resources) if resources else None
-        if task_concurrency and not max_active_tis_per_dag:

Review comment:
       This change seems unrelated?







[GitHub] [airflow] ashb commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r785818439



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -593,6 +616,13 @@ def _process_executor_events(self, session: Session = None) -> int:
                 self.log.info("Setting external_id for %s to %s", ti, info)
                 continue
 
+            # reset the last_scheduling_decision for all task instances in this dag

Review comment:
       Please change these comments to say why we need to reset it.







[GitHub] [airflow] jedcunningham commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r785071599



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,8 +290,21 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == TaskInstanceState.SCHEDULED)
             .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
         )
+        if session.bind.dialect.name == "postgresql":
+            query = query.order_by(
+                TI.priority_weight.desc(),
+                TI.last_scheduling_decision.desc().nullsfirst(),

Review comment:
       Don't we want to look in asc order with nullsfirst?







[GitHub] [airflow] jedcunningham commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r787940394



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,8 +290,21 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == TaskInstanceState.SCHEDULED)
             .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
         )
+        if session.bind.dialect.name == "postgresql":
+            query = query.order_by(
+                TI.priority_weight.desc(),
+                TI.last_scheduling_decision.desc().nullsfirst(),

Review comment:
       Yeah, I figured we wanted oldest first, which would be asc, no?
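
To make the question concrete, here is a sketch in plain Python (not the scheduler's code) that applies both orderings to made-up timestamps. The helper `nulls_first` and the task names are hypothetical. With NULLs first in both cases, descending puts the most recently examined task instance next, while ascending puts the one examined longest ago next.

```python
from datetime import datetime

# Hypothetical last_scheduling_decision values keyed by task id.
decisions = {
    "never_examined": None,
    "examined_09h": datetime(2022, 1, 17, 9, 0),
    "examined_10h": datetime(2022, 1, 17, 10, 0),
    "examined_11h": datetime(2022, 1, 17, 11, 0),
}


def nulls_first(items, descending):
    """Order task ids by timestamp, keeping None entries at the front."""
    nulls = [key for key, value in items.items() if value is None]
    rest = sorted(
        (key for key, value in items.items() if value is not None),
        key=lambda key: items[key],
        reverse=descending,
    )
    return nulls + rest


desc_order = nulls_first(decisions, descending=True)
asc_order = nulls_first(decisions, descending=False)
print(desc_order)
print(asc_order)
```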







[GitHub] [airflow] jedcunningham edited a comment on pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

Posted by GitBox <gi...@apache.org>.
jedcunningham edited a comment on pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#issuecomment-1028455505


   Okay, I benchmarked 2.2.3, #19747 on latest main, and #20391 on latest main with:
   
   - 100 DAGs, 10 runs each, 10 tasks per DAG, for 10k total TIs
   - 2 workers (1 CPU core each)
   - 1 scheduler (1 CPU core)
   
   Each took this long to get through them all:
   
   * 2.2.3: 51 minutes
   * 19747: 38 minutes
   * 20391: 38 minutes
   
   (Yes, they really did take the same amount of time; I double-checked my data.)
   
   #19747 however does spew this into the scheduler logs ~16x per second (quite chatty!):
   
   ```
   [2022-02-02 23:21:32,947] {scheduler_job.py:522} INFO - Setting the following tasks to queued state:
   ```
   

