Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/07 14:28:55 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #18061: Fix DagRun execution order not being properly followed

ephraimbuddy opened a new pull request #18061:
URL: https://github.com/apache/airflow/pull/18061


   An earlier fix (see #17945) resolved max_active_runs not allowing other dagruns to move to
   the running state, but it introduced a bug: dagruns no longer followed the
   execution_date order when moving to the running state.
   
   This PR fixes that by adding a `max_active_runs` column to dagmodel. It also adds an extra test
   that is not connected with this change, because I was able to trigger the bug it covers while
   working on this.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy merged pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #18061:
URL: https://github.com/apache/airflow/pull/18061


   





[GitHub] [airflow] ashb commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r704379714



##########
File path: airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add max_active_runs column to dagmodel table
+
+Revision ID: 092435bf5d12
+Revises: 142555e44c17
+Create Date: 2021-09-06 21:29:24.728923
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = '092435bf5d12'
+down_revision = '7b2661a43ba3'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add max_active_runs column to dagmodel table"""
+    op.add_column('dag', sa.Column('max_active_runs', sa.Integer(), nullable=True))
+    with op.batch_alter_table('dag_run', schema=None) as batch_op:
+        # Add index to dag_run.dag_id and also add index to dag_run.state where state==running
+        batch_op.create_index('idx_dag_run_dag_id', ['dag_id'])
+        batch_op.create_index(
+            'idx_dag_run_running_dags',
+            ["state", "dag_id"],
+            postgres_where=text("state='running'"),
+            mssql_where=text("state='running'"),

Review comment:
       Wow, MySQL really isn't a good database, is it 😉







[GitHub] [airflow] ashb commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r704184367



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2689,6 +2689,101 @@ def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self,
         )
         assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11
 
+    def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+        session = settings.Session()
+        with dag_maker('test_dag1', max_active_runs=1) as dag:
+            DummyOperator(task_id='mytask')
+        date = dag.following_schedule(DEFAULT_DATE)
+        for i in range(30):
+            dr = dag_maker.create_dagrun(
+                run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
+            )
+            date = dr.execution_date + timedelta(hours=1)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_0')
+        ti = dr[0].get_task_instance(task_id='mytask', session=session)
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.commit()
+        assert dr[0].state == State.RUNNING
+        dr[0].state = State.SUCCESS
+        session.merge(dr[0])
+        session.flush()
+        assert dr[0].state == State.SUCCESS
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_1')
+        assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 1
+
+        assert dr[0].state == State.RUNNING
+
+    def test_no_dagruns_would_stuck_in_running(self, dag_maker):
+        # Test that running dagruns are not stuck in running.
+        # Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1)
+        # Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag
+        # Create 16 dagruns in 'running' state and 16 in 'queued' state from yet another dag
+        # Finish the task of the first dag, and check that another dagrun starts running
+        # from the first dag.
+
+        session = settings.Session()
+        # first dag and dagruns
+        with dag_maker('test_dagrun_states_are_correct_1', max_active_runs=1) as dag:
+            task1 = DummyOperator(task_id='dummy_task')
+        date = DEFAULT_DATE

Review comment:
       Please have this be explicit:
   
   ```python
       date = timezone.datetime(2016, 1, 1)
   ```







[GitHub] [airflow] ashb commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r703570626



##########
File path: airflow/models/dagrun.py
##########
@@ -207,10 +207,26 @@ def next_dagruns_to_examine(
                 DagModel.is_paused == expression.false(),
                 DagModel.is_active == expression.true(),
             )
-            .order_by(
-                nulls_first(cls.last_scheduling_decision, session=session),
-                cls.execution_date,
+        )
+        if state == State.QUEUED:

Review comment:
       We need a comment here explaining why we are doing this.

##########
File path: airflow/models/dagrun.py
##########
@@ -207,10 +207,26 @@ def next_dagruns_to_examine(
                 DagModel.is_paused == expression.false(),
                 DagModel.is_active == expression.true(),
             )
-            .order_by(
-                nulls_first(cls.last_scheduling_decision, session=session),
-                cls.execution_date,
+        )
+        if state == State.QUEUED:
+            stmt = (
+                session.query(DagModel, func.count(DagModel.dag_id))
+                .filter(DagModel.is_active == expression.true(), DagModel.is_paused == expression.false())
+                .join(DagRun, DagRun.dag_id == DagModel.dag_id)
+                .filter(DagRun.state == State.RUNNING)
+                .group_by(DagModel.dag_id)
+                .all()
             )
+            dag_ids = []
+            for dag_model, total_runs in stmt:
+                if total_runs >= dag_model.max_active_runs:
+                    dag_ids.append(dag_model.dag_id)
+
+            query = query.filter(~cls.dag_id.in_(dag_ids))

Review comment:
       This will work for a small number of DAGs, but for large numbers (1000s or tens of 1000s) it is going to put extra load on the DB by counting the running dag runs every time, even for dags we don't care about.
   
   ```python
               running_drs = (
                   session.query(DagRun.dag_id, func.count(DagRun.state).label('num_running'))
                   .filter(DagRun.state == DagRunState.RUNNING)
                   .group_by(DagRun.dag_id)
                   .cte('running_dag_runs')
               )
   
               query = query.outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id).filter(
                   func.coalesce(running_drs.c.num_running, 0) < DagModel.max_active_runs
               )
   ```
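
As a rough illustration of the query shape suggested above, here is a minimal sketch that runs the same idea (a CTE counting running dag runs, an outer join, and a `COALESCE` comparison against `max_active_runs`) as raw SQL on an in-memory SQLite database. The table layouts and sample rows are simplified assumptions for illustration, not Airflow's real schema, and the SQL is hand-written rather than the statement SQLAlchemy would emit.

```python
import sqlite3

# Hypothetical, simplified stand-ins for the dag and dag_run tables.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_runs INTEGER);
    CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, state TEXT, execution_date TEXT);
    INSERT INTO dag VALUES ('dag_one', 1), ('dag_two', 2);
    INSERT INTO dag_run VALUES
        ('dag_one', 'r0', 'running', '2016-01-01'),
        ('dag_one', 'r1', 'queued',  '2016-01-02'),
        ('dag_two', 'r0', 'queued',  '2021-01-01');
    """
)

# Count running dag runs per dag in a CTE, then keep only queued runs whose
# dag is still below its max_active_runs. Dags with no running runs have no
# row in the CTE, so COALESCE turns the missing count into 0.
rows = conn.execute(
    """
    WITH running_dag_runs AS (
        SELECT dag_id, COUNT(state) AS num_running
        FROM dag_run
        WHERE state = 'running'
        GROUP BY dag_id
    )
    SELECT dag_run.dag_id, dag_run.run_id
    FROM dag_run
    JOIN dag ON dag.dag_id = dag_run.dag_id
    LEFT OUTER JOIN running_dag_runs
        ON running_dag_runs.dag_id = dag_run.dag_id
    WHERE dag_run.state = 'queued'
      AND COALESCE(running_dag_runs.num_running, 0) < dag.max_active_runs
    ORDER BY dag_run.execution_date
    """
).fetchall()

print(rows)
```

In this toy data set, dag_one already has one running run against a limit of 1, so only the queued run of dag_two is selected, and the filtering happens entirely inside the database.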

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2689,6 +2689,100 @@ def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self,
         )
         assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11
 
+    def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+        session = settings.Session()
+        with dag_maker('test_dag1', max_active_runs=1) as dag:
+            DummyOperator(task_id='mytask')
+        date = dag.following_schedule(DEFAULT_DATE)
+        for i in range(30):
+            dr = dag_maker.create_dagrun(
+                run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
+            )
+            date = dr.execution_date + timedelta(hours=1)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_0')
+        ti = dr[0].get_task_instance(task_id='mytask', session=session)
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.commit()
+        assert dr[0].state == State.RUNNING
+        dr[0].state = State.SUCCESS
+        session.merge(dr[0])
+        session.flush()
+        assert dr[0].state == State.SUCCESS
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_1')
+        assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 1
+
+        assert dr[0].state == State.RUNNING
+
+    def test_no_dagruns_would_stuck_in_running(self, dag_maker):
+        # Test that running dagruns are not stuck in running.
+        # Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1)
+        # Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag
+        # Create 16 dagruns in 'running' state and 16 in 'queued' state from yet another dag
+        # Finish the task of the first dag, and check that another dagrun starts running
+        # from the first dag.
+
+        session = settings.Session()
+        with dag_maker('test_dagrun_states_are_correct_1', max_active_runs=1) as dag:
+            task1 = DummyOperator(task_id='dummy_task')
+        dr1_running = dag_maker.create_dagrun(run_id='dr1_run_1')

Review comment:
       What date is this dagrun created for? Since it matters for this test, it's better to be explicit and specify an execution_date here.

##########
File path: airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
##########
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add max_active_runs column to dagmodel table
+
+Revision ID: 092435bf5d12
+Revises: 142555e44c17
+Create Date: 2021-09-06 21:29:24.728923
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '092435bf5d12'
+down_revision = '142555e44c17'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add max_active_runs column to dagmodel table"""
+    op.add_column('dag', sa.Column('max_active_runs', sa.Integer(), nullable=False))

Review comment:
       This fails on a DB with anything in it, with this error:
   
   ```
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.NotNullViolation) column "max_active_runs" of relation "dag" contains null values
   ```
   
   
   So we need to either give it a value (0, or whatever the default is), get the "right" value out of the serialized dag model, or allow nulls.
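
A minimal sketch of the "allow nulls" option, using an in-memory SQLite database and a hypothetical one-column `dag` table. Two assumptions to keep in mind: SQLite rejects adding a NOT NULL column without a default outright, whereas the PostgreSQL error above is raised because existing rows would hold nulls; and the backfill value 16 is only a placeholder, not a value taken from this PR.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
conn.execute("INSERT INTO dag VALUES ('existing_dag')")

# Adding a NOT NULL column without a default is rejected by SQLite.
try:
    conn.execute("ALTER TABLE dag ADD COLUMN max_active_runs INTEGER NOT NULL")
    not_null_failed = False
except sqlite3.Error:
    not_null_failed = True

# Allowing nulls works; existing rows simply get NULL for the new column.
conn.execute("ALTER TABLE dag ADD COLUMN max_active_runs INTEGER")
value_after_add = conn.execute("SELECT max_active_runs FROM dag").fetchone()[0]

# Optional backfill with a placeholder value (16 is an assumption).
conn.execute("UPDATE dag SET max_active_runs = 16 WHERE max_active_runs IS NULL")
value_after_backfill = conn.execute("SELECT max_active_runs FROM dag").fetchone()[0]

print(not_null_failed, value_after_add, value_after_backfill)
```

If the column stays nullable, any code comparing against `max_active_runs` has to tolerate a missing value until the real one is written.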







[GitHub] [airflow] kaxil commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r704377013



##########
File path: airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add max_active_runs column to dagmodel table
+
+Revision ID: 092435bf5d12
+Revises: 142555e44c17
+Create Date: 2021-09-06 21:29:24.728923
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = '092435bf5d12'
+down_revision = '7b2661a43ba3'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add max_active_runs column to dagmodel table"""
+    op.add_column('dag', sa.Column('max_active_runs', sa.Integer(), nullable=True))
+    with op.batch_alter_table('dag_run', schema=None) as batch_op:
+        # Add index to dag_run.dag_id and also add index to dag_run.state where state==running
+        batch_op.create_index('idx_dag_run_dag_id', ['dag_id'])
+        batch_op.create_index(
+            'idx_dag_run_running_dags',
+            ["state", "dag_id"],
+            postgres_where=text("state='running'"),
+            mssql_where=text("state='running'"),

Review comment:
       Let's add `sqlite_where` too.
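
For context, SQLite supports partial indexes natively, so the same `state='running'` predicate can apply there. The sketch below creates such an index with hand-written DDL on a hypothetical two-column `dag_run` table and reads the stored definition back; it shows the kind of statement a `sqlite_where` argument is expected to produce, not the exact DDL SQLAlchemy emits.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, reduced dag_run table with only the indexed columns.
conn.execute("CREATE TABLE dag_run (dag_id TEXT, state TEXT)")

# Partial index: only rows in the 'running' state are indexed.
conn.execute(
    "CREATE INDEX idx_dag_run_running_dags ON dag_run (state, dag_id) "
    "WHERE state = 'running'"
)

# Read the index definition back from the schema table.
index_sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE type = 'index' AND name = ?",
    ("idx_dag_run_running_dags",),
).fetchone()[0]

print(index_sql)
```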







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r703685213



##########
File path: airflow/models/dagrun.py
##########
@@ -207,10 +207,26 @@ def next_dagruns_to_examine(
                 DagModel.is_paused == expression.false(),
                 DagModel.is_active == expression.true(),
             )
-            .order_by(
-                nulls_first(cls.last_scheduling_decision, session=session),
-                cls.execution_date,
+        )
+        if state == State.QUEUED:
+            stmt = (
+                session.query(DagModel, func.count(DagModel.dag_id))
+                .filter(DagModel.is_active == expression.true(), DagModel.is_paused == expression.false())
+                .join(DagRun, DagRun.dag_id == DagModel.dag_id)
+                .filter(DagRun.state == State.RUNNING)
+                .group_by(DagModel.dag_id)
+                .all()
             )
+            dag_ids = []
+            for dag_model, total_runs in stmt:
+                if total_runs >= dag_model.max_active_runs:
+                    dag_ids.append(dag_model.dag_id)
+
+            query = query.filter(~cls.dag_id.in_(dag_ids))

Review comment:
       Wow! This worked like a charm! Thank you!!







[GitHub] [airflow] ashb commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r704186923



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2689,6 +2689,101 @@ def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self,
         )
         assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11
 
+    def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+        session = settings.Session()
+        with dag_maker('test_dag1', max_active_runs=1) as dag:
+            DummyOperator(task_id='mytask')
+        date = dag.following_schedule(DEFAULT_DATE)
+        for i in range(30):
+            dr = dag_maker.create_dagrun(
+                run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
+            )
+            date = dr.execution_date + timedelta(hours=1)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_0')
+        ti = dr[0].get_task_instance(task_id='mytask', session=session)
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.commit()
+        assert dr[0].state == State.RUNNING
+        dr[0].state = State.SUCCESS
+        session.merge(dr[0])
+        session.flush()
+        assert dr[0].state == State.SUCCESS
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_1')
+        assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 1
+
+        assert dr[0].state == State.RUNNING
+
+    def test_no_dagruns_would_stuck_in_running(self, dag_maker):

Review comment:
       I'm not sure this test fully covers the behaviour we saw/fixed.
   
   I think we should have:
   
   - Dag one, starting in 2016 with max_active_runs=1, with 30 dag runs created (1 running, 29 queued)
   - Dag two, starting in 2021, with some queued dag runs created
   
   The key, to my mind, is to test that the queued dag runs from dag one would "fill up" the dagruns to examine if we don't exclude dags at max active runs.
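
The scenario described above can be sketched as a small pure-Python simulation. Everything here is hypothetical: the batch size of 20, the `max_active_runs` value for dag two, and the helper names are illustrative assumptions, not Airflow code. It only shows why ordering queued runs by execution date with a capped batch starves the newer dag unless dags at their limit are excluded.

```python
from collections import Counter
from datetime import datetime, timedelta

BATCH_SIZE = 20  # hypothetical cap on dag runs examined per scheduler loop
MAX_ACTIVE_RUNS = {"dag_one": 1, "dag_two": 16}  # illustrative values


def make_runs(dag_id, start, count, running=0):
    """Build `count` hourly runs; the first `running` are running, the rest queued."""
    return [
        {
            "dag_id": dag_id,
            "execution_date": start + timedelta(hours=i),
            "state": "running" if i < running else "queued",
        }
        for i in range(count)
    ]


# Dag one: 30 runs from 2016 (1 running, 29 queued). Dag two: 5 queued runs from 2021.
runs = make_runs("dag_one", datetime(2016, 1, 1), 30, running=1)
runs += make_runs("dag_two", datetime(2021, 1, 1), 5)


def queued_runs_to_examine(runs, exclude_dags_at_limit):
    """Return the oldest queued runs, optionally skipping dags at max_active_runs."""
    running = Counter(r["dag_id"] for r in runs if r["state"] == "running")
    queued = [r for r in runs if r["state"] == "queued"]
    if exclude_dags_at_limit:
        queued = [
            r for r in queued if running[r["dag_id"]] < MAX_ACTIVE_RUNS[r["dag_id"]]
        ]
    queued.sort(key=lambda r: r["execution_date"])
    return queued[:BATCH_SIZE]


naive = queued_runs_to_examine(runs, exclude_dags_at_limit=False)
filtered = queued_runs_to_examine(runs, exclude_dags_at_limit=True)

print({r["dag_id"] for r in naive})     # only dag_one fits in the batch
print({r["dag_id"] for r in filtered})  # dag_two is reachable once dag_one is excluded
```

Without the exclusion, the whole batch is taken by dag one's 2016 runs, none of which can start, so dag two's runs are never examined.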







[GitHub] [airflow] kaxil commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r704377589



##########
File path: airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add max_active_runs column to dagmodel table
+
+Revision ID: 092435bf5d12
+Revises: 142555e44c17
+Create Date: 2021-09-06 21:29:24.728923
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = '092435bf5d12'
+down_revision = '7b2661a43ba3'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add max_active_runs column to dagmodel table"""
+    op.add_column('dag', sa.Column('max_active_runs', sa.Integer(), nullable=True))
+    with op.batch_alter_table('dag_run', schema=None) as batch_op:
+        # Add index to dag_run.dag_id and also add index to dag_run.state where state==running
+        batch_op.create_index('idx_dag_run_dag_id', ['dag_id'])
+        batch_op.create_index(
+            'idx_dag_run_running_dags',
+            ["state", "dag_id"],
+            postgres_where=text("state='running'"),
+            mssql_where=text("state='running'"),

Review comment:
       https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#partial-indexes







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r704194610



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2689,6 +2689,101 @@ def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self,
         )
         assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11
 
+    def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+        session = settings.Session()
+        with dag_maker('test_dag1', max_active_runs=1) as dag:
+            DummyOperator(task_id='mytask')
+        date = dag.following_schedule(DEFAULT_DATE)
+        for i in range(30):
+            dr = dag_maker.create_dagrun(
+                run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
+            )
+            date = dr.execution_date + timedelta(hours=1)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_0')
+        ti = dr[0].get_task_instance(task_id='mytask', session=session)
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.commit()
+        assert dr[0].state == State.RUNNING
+        dr[0].state = State.SUCCESS
+        session.merge(dr[0])
+        session.flush()
+        assert dr[0].state == State.SUCCESS
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_1')
+        assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 1
+
+        assert dr[0].state == State.RUNNING
+
+    def test_no_dagruns_would_stuck_in_running(self, dag_maker):

Review comment:
       We already have a similar test from the previous PR. See: https://github.com/apache/airflow/blob/13e7d4adfe1685c5b22a1f0dca314eb03d9d8ddc/tests/jobs/test_scheduler_job.py#L2655
   
   This one is different. I hit this bug while changing some code and added this test to prevent it in the future.
   On current main, without this PR, the test passes.







[GitHub] [airflow] kaxil commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r704379634



##########
File path: airflow/models/dagrun.py
##########
@@ -91,6 +103,14 @@ class DagRun(Base, LoggingMixin):
         UniqueConstraint('dag_id', 'execution_date', name='dag_run_dag_id_execution_date_key'),
         UniqueConstraint('dag_id', 'run_id', name='dag_run_dag_id_run_id_key'),
         Index('idx_last_scheduling_decision', last_scheduling_decision),
+        Index('idx_dag_run_dag_id', dag_id),
+        Index(
+            'idx_dag_run_running_dags',
+            'state',
+            'dag_id',
+            postgres_where=text("state='running'"),
+            mssql_where=text("state='running'"),
+        ),

Review comment:
       Same as https://github.com/apache/airflow/pull/18061/files#r704377013







[GitHub] [airflow] ephraimbuddy closed pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #18061:
URL: https://github.com/apache/airflow/pull/18061


   





[GitHub] [airflow] ashb commented on a change in pull request #18061: Fix DagRun execution order from queued to running not being properly followed

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18061:
URL: https://github.com/apache/airflow/pull/18061#discussion_r704181414



##########
File path: airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add max_active_runs column to dagmodel table
+
+Revision ID: 092435bf5d12
+Revises: 142555e44c17
+Create Date: 2021-09-06 21:29:24.728923
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = '092435bf5d12'
+down_revision = '7b2661a43ba3'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add max_active_runs column to dagmodel table"""
+    op.add_column('dag', sa.Column('max_active_runs', sa.Integer(), nullable=True))
+    with op.batch_alter_table('dag_run', schema=None) as batch_op:
+        # Add index to dag_run.dag_id and also add index to dag_run.state where state==running
+        batch_op.create_index('idx_dag_run_dag_id', ['dag_id'])
+        batch_op.create_index(
+            'idx_dag_run_running_dags', ["state", "dag_id"], postgres_where=text("state='running'")

Review comment:
       We should add `mssql_where` here too.



