Posted to commits@airflow.apache.org by "potiuk (via GitHub)" <gi...@apache.org> on 2023/08/01 00:48:13 UTC

[GitHub] [airflow] potiuk opened a new pull request, #32991: Attempt to get logs from failed flaky backfill test

potiuk opened a new pull request, #32991:
URL: https://github.com/apache/airflow/pull/32991

   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   
   
   <!-- Please keep an empty line above the dashes. -->
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1670857715

   > > Any particular reason this doesn't use `@retry_db_transaction`?
   > 
   > The only reason is that I added those retries incrementally, trying different things. But yeah, good idea - I can actually extract the inner code into a method and decorate it; that would be way cleaner. Let me do it.
   
   Looked at it - it would make it awfully inconsistent. The backfill job is in VERY BAD need of refactoring, and it already uses the try/except/loop pattern; if we want to introduce the decorator, it would look even worse than what we have now (we would have to add another internal method inside a 5-level-indented, 800-line method).
   
   That would be a bit risky to do now, as we want to cherry-pick this one to 2.7 - and it would be difficult to solve any problems it might cause if we make a mistake of some sort.
   
   I think I will leave refactoring the backfill job as a "post-2.7.0" task. I created an issue to track it: https://github.com/apache/airflow/issues/33249 - maybe we can engage a few people to collaborate on that. I personally think the backfill job is one of the most neglected parts of Airflow, and maybe for 2.8 we could have it back in order, easier to understand and reason about.




[GitHub] [airflow] potiuk commented on a diff in pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #32991:
URL: https://github.com/apache/airflow/pull/32991#discussion_r1287790666


##########
airflow/jobs/backfill_job_runner.py:
##########
@@ -588,61 +588,81 @@ def _per_task_process(key, ti: TaskInstance, session):
             try:
                 for task in self.dag.topological_sort(include_subdag_tasks=True):
                     for key, ti in list(ti_status.to_run.items()):
-                        if task.task_id != ti.task_id:
-                            continue
-
-                        pool = session.scalar(
-                            select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                        )
-                        if not pool:
-                            raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                        open_slots = pool.open_slots(session=session)
-                        if open_slots <= 0:
-                            raise NoAvailablePoolSlot(
-                                f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                            )
-
-                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                            self.dag_id,
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
-
-                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                            raise DagConcurrencyLimitReached(
-                                "Not scheduling since DAG max_active_tasks limit is reached."
+                        # Attempt to workaround deadlock on backfill by attempting to commit the transaction
+                        # state update few times before giving up
+                        max_attempts = 5
+                        for i in range(max_attempts):
+                            if task.task_id != ti.task_id:
+                                continue
+
+                            pool = session.scalar(
+                                select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                             )
+                            if not pool:
+                                raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                            open_slots = pool.open_slots(session=session)
+                            if open_slots <= 0:
+                                raise NoAvailablePoolSlot(
+                                    f"Not scheduling since there are {open_slots} "
+                                    f"open slots in pool {task.pool}"
+                                )
 
-                        if task.max_active_tis_per_dag is not None:
-                            num_running_task_instances_in_task = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                task_ids=[task.task_id],
+                            num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                                self.dag_id,
                                 states=self.STATES_COUNT_AS_RUNNING,
                                 session=session,
                             )
 
-                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency limit is reached."
+                            if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                                raise DagConcurrencyLimitReached(
+                                    "Not scheduling since DAG max_active_tasks limit is reached."
                                 )
 
-                        if task.max_active_tis_per_dagrun is not None:
-                            num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                run_id=ti.run_id,
-                                task_ids=[task.task_id],
-                                states=self.STATES_COUNT_AS_RUNNING,
-                                session=session,
-                            )
+                            if task.max_active_tis_per_dag is not None:
+                                num_running_task_instances_in_task = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
+                                )
 
-                            if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency per DAG run limit is reached."
+                                if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency limit is reached."
+                                    )
+
+                            if task.max_active_tis_per_dagrun is not None:
+                                num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    run_id=ti.run_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
                                 )
 
-                        _per_task_process(key, ti, session)
-                        session.commit()
+                                if (
+                                    num_running_task_instances_in_task_dagrun
+                                    >= task.max_active_tis_per_dagrun
+                                ):
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency per DAG run limit is reached."
+                                    )
+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to operational error. "
+                                    "The job will retry this operation so if your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                # outer loop will retry

Review Comment:
   Ah yeah
   





[GitHub] [airflow] potiuk closed pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #32991: Workaround failing deadlock when running backfill
URL: https://github.com/apache/airflow/pull/32991




[GitHub] [airflow] potiuk closed pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #32991: Workaround failing deadlock when running backfill
URL: https://github.com/apache/airflow/pull/32991




[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1666785905

   I have not seen a single failure from the backfill tests after I added the retry loop. There are other flaky tests (which I am also going to take a look at) - but IMHO this one looks ready to go (and can be included in 2.7.0) even if we have not solved the root cause of the deadlock.
   
   I believe that in this case attempting self-healing by retrying is better than failing outright with a nasty deadlock that we won't be able to help with quickly.




[GitHub] [airflow] uranusjr commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1670823232

   Any particular reason this doesn't use `@retry_db_transaction`?




[GitHub] [airflow] potiuk closed pull request #32991: Attempt to get logs from failed flaky backfill test

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #32991: Attempt to get logs from failed flaky backfill test
URL: https://github.com/apache/airflow/pull/32991




[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1666800791

   And once more.




[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1670834612

   > Any particular reason this doesn't use `@retry_db_transaction`?
   
   The only reason is that I added those retries incrementally, trying different things. But yeah, good idea - I can actually extract the inner code into a method and decorate it; that would be way cleaner. Let me do it.
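   
   A rough sketch of what that could look like (illustrative only, not the PR code - it assumes `retry_db_transaction` from `airflow.utils.retries`, which retries the wrapped callable on `OperationalError` and requires it to take a `session` argument; the helper name below is made up):
   
   ```
   from airflow.utils.retries import retry_db_transaction
   
   @retry_db_transaction
   def _process_task_instance(key, ti, *, session):
       # the pool / concurrency checks and the _per_task_process(...) call
       # from the loop body would move in here
       _per_task_process(key, ti, session)
       session.commit()
   ```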




[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1666667269

   OK. More food for thought: 
   
   https://github.com/apache/airflow/actions/runs/5773634484/job/15649876570?pr=32991#step:6:10818
   
   The test failed there, but now with a different error after the rollback.
   
   So the rollback did not **work around** it this time - it actually skipped the job.
   
   
   Attempting to simply add additional retries for the failed transaction. Not perfect, not beautiful, but 🤷 
   
   
   ```
   __________________ TestDaskExecutor.test_backfill_integration __________________
   
   self = <tests.providers.daskexecutor.test_dask_executor.TestDaskExecutor object at 0x7fd56b88afa0>
   
       @pytest.mark.execution_timeout(180)
       def test_backfill_integration(self):
           """
           Test that DaskExecutor can be used to backfill example dags
           """
           dag = self.dagbag.get_dag("example_bash_operator")
       
           job = Job(
               executor=DaskExecutor(cluster_address=self.cluster.scheduler_address),
           )
           job_runner = BackfillJobRunner(
               job=job,
               dag=dag,
               start_date=DEFAULT_DATE,
               end_date=DEFAULT_DATE,
               ignore_first_depends_on_past=True,
           )
   >       run_job(job=job, execute_callable=job_runner._execute)
   
   tests/providers/daskexecutor/test_dask_executor.py:124: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   airflow/utils/session.py:77: in wrapper
       return func(*args, session=session, **kwargs)
   airflow/jobs/job.py:280: in run_job
       return execute_job(job, execute_callable=execute_callable)
   airflow/jobs/job.py:309: in execute_job
       ret = execute_callable()
   airflow/utils/session.py:77: in wrapper
       return func(*args, session=session, **kwargs)
                       "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING "
                       "state.",
                       run.run_id,
                       run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
                       run.run_type,
                   )
               self.log.error(
                   "Changing DagRun into BACKFILL would cause scheduler to lose track of executing "
                   "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into "
                   "database would cause database constraint violation for dag_id + execution_date "
                   "combination. Please adjust backfill dates or wait for this DagRun to finish.",
               )
               return
           # picklin'
           pickle_id = None
       
           executor_class, _ = ExecutorLoader.import_default_executor_cls()
       
           if not self.donot_pickle and executor_class.supports_pickling:
               pickle = DagPickle(self.dag)
               session.add(pickle)
               session.commit()
               pickle_id = pickle.id
       
           executor = self.job.executor
           executor.job_id = self.job.id
           executor.start()
       
           ti_status.total_runs = len(dagrun_infos)  # total dag runs in backfill
       
           try:
               remaining_dates = ti_status.total_runs
               while remaining_dates > 0:
                   dagrun_infos_to_process = [
                       dagrun_info
                       for dagrun_info in dagrun_infos
                       if dagrun_info.logical_date not in ti_status.executed_dag_run_dates
                   ]
                   self._execute_dagruns(
                       dagrun_infos=dagrun_infos_to_process,
                       ti_status=ti_status,
                       executor=executor,
                       pickle_id=pickle_id,
                       start_date=start_date,
                       session=session,
                   )
       
                   remaining_dates = ti_status.total_runs - len(ti_status.executed_dag_run_dates)
                   err = "".join(self._collect_errors(ti_status=ti_status, session=session))
                   if err:
                       if not self.continue_on_failures or ti_status.deadlocked:
   >                       raise BackfillUnfinished(err, ti_status)
   E                       airflow.exceptions.BackfillUnfinished: Some task instances failed:
   E                       DAG ID                 Task ID         Run ID                                 Try number
   E                       ---------------------  --------------  -----------------------------------  ------------
   E                       example_bash_operator  run_this_last   backfill__2017-01-01T00:00:00+00:00             1
   E                       example_bash_operator  this_will_skip  backfill__2017-01-01T00:00:00+00:00             1
   
   airflow/jobs/backfill_job_runner.py:937: BackfillUnfinished
   ```
   




[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1666879387

   Happened once: https://github.com/apache/airflow/actions/runs/5776982503/job/15656723055?pr=32991 
   
   3 repeats might not be enough?




[GitHub] [airflow] potiuk commented on a diff in pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #32991:
URL: https://github.com/apache/airflow/pull/32991#discussion_r1287789679


##########
airflow/jobs/backfill_job_runner.py:
##########
@@ -588,61 +588,81 @@ def _per_task_process(key, ti: TaskInstance, session):
             try:
                 for task in self.dag.topological_sort(include_subdag_tasks=True):
                     for key, ti in list(ti_status.to_run.items()):
-                        if task.task_id != ti.task_id:
-                            continue
-
-                        pool = session.scalar(
-                            select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                        )
-                        if not pool:
-                            raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                        open_slots = pool.open_slots(session=session)
-                        if open_slots <= 0:
-                            raise NoAvailablePoolSlot(
-                                f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                            )
-
-                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                            self.dag_id,
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
-
-                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                            raise DagConcurrencyLimitReached(
-                                "Not scheduling since DAG max_active_tasks limit is reached."
+                        # Attempt to workaround deadlock on backfill by attempting to commit the transaction
+                        # state update few times before giving up
+                        max_attempts = 5
+                        for i in range(max_attempts):
+                            if task.task_id != ti.task_id:
+                                continue
+
+                            pool = session.scalar(
+                                select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                             )
+                            if not pool:
+                                raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                            open_slots = pool.open_slots(session=session)
+                            if open_slots <= 0:
+                                raise NoAvailablePoolSlot(
+                                    f"Not scheduling since there are {open_slots} "
+                                    f"open slots in pool {task.pool}"
+                                )
 
-                        if task.max_active_tis_per_dag is not None:
-                            num_running_task_instances_in_task = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                task_ids=[task.task_id],
+                            num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                                self.dag_id,
                                 states=self.STATES_COUNT_AS_RUNNING,
                                 session=session,
                             )
 
-                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency limit is reached."
+                            if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                                raise DagConcurrencyLimitReached(
+                                    "Not scheduling since DAG max_active_tasks limit is reached."
                                 )
 
-                        if task.max_active_tis_per_dagrun is not None:
-                            num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                run_id=ti.run_id,
-                                task_ids=[task.task_id],
-                                states=self.STATES_COUNT_AS_RUNNING,
-                                session=session,
-                            )
+                            if task.max_active_tis_per_dag is not None:
+                                num_running_task_instances_in_task = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
+                                )
 
-                            if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency per DAG run limit is reached."
+                                if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency limit is reached."
+                                    )
+
+                            if task.max_active_tis_per_dagrun is not None:
+                                num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    run_id=ti.run_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
                                 )
 
-                        _per_task_process(key, ti, session)
-                        session.commit()
+                                if (
+                                    num_running_task_instances_in_task_dagrun
+                                    >= task.max_active_tis_per_dagrun
+                                ):
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency per DAG run limit is reached."
+                                    )
+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to operational error. "
+                                    "The job will retry this operation so if your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                # outer loop will retry

Review Comment:
   > Is there a missing check for the current attempt number?
   
   I don't think so. Why?





[GitHub] [airflow] hussein-awala commented on a diff in pull request #32991: Workaround failing deadlock when running backfill

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #32991:
URL: https://github.com/apache/airflow/pull/32991#discussion_r1287773284


##########
airflow/jobs/backfill_job_runner.py:
##########
@@ -588,61 +588,81 @@ def _per_task_process(key, ti: TaskInstance, session):
             try:
                 for task in self.dag.topological_sort(include_subdag_tasks=True):
                     for key, ti in list(ti_status.to_run.items()):
-                        if task.task_id != ti.task_id:
-                            continue
-
-                        pool = session.scalar(
-                            select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                        )
-                        if not pool:
-                            raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                        open_slots = pool.open_slots(session=session)
-                        if open_slots <= 0:
-                            raise NoAvailablePoolSlot(
-                                f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                            )
-
-                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                            self.dag_id,
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
-
-                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                            raise DagConcurrencyLimitReached(
-                                "Not scheduling since DAG max_active_tasks limit is reached."
+                        # Attempt to workaround deadlock on backfill by attempting to commit the transaction
+                        # state update few times before giving up
+                        max_attempts = 5
+                        for i in range(max_attempts):
+                            if task.task_id != ti.task_id:
+                                continue
+
+                            pool = session.scalar(
+                                select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                             )
+                            if not pool:
+                                raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                            open_slots = pool.open_slots(session=session)
+                            if open_slots <= 0:
+                                raise NoAvailablePoolSlot(
+                                    f"Not scheduling since there are {open_slots} "
+                                    f"open slots in pool {task.pool}"
+                                )
 
-                        if task.max_active_tis_per_dag is not None:
-                            num_running_task_instances_in_task = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                task_ids=[task.task_id],
+                            num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                                self.dag_id,
                                 states=self.STATES_COUNT_AS_RUNNING,
                                 session=session,
                             )
 
-                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency limit is reached."
+                            if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                                raise DagConcurrencyLimitReached(
+                                    "Not scheduling since DAG max_active_tasks limit is reached."
                                 )
 
-                        if task.max_active_tis_per_dagrun is not None:
-                            num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                run_id=ti.run_id,
-                                task_ids=[task.task_id],
-                                states=self.STATES_COUNT_AS_RUNNING,
-                                session=session,
-                            )
+                            if task.max_active_tis_per_dag is not None:
+                                num_running_task_instances_in_task = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
+                                )
 
-                            if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency per DAG run limit is reached."
+                                if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency limit is reached."
+                                    )
+
+                            if task.max_active_tis_per_dagrun is not None:
+                                num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    run_id=ti.run_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
                                 )
 
-                        _per_task_process(key, ti, session)
-                        session.commit()
+                                if (
+                                    num_running_task_instances_in_task_dagrun
+                                    >= task.max_active_tis_per_dagrun
+                                ):
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency per DAG run limit is reached."
+                                    )
+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to operational error. "
+                                    "The job will retry this operation so if your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                # outer loop will retry

Review Comment:
   Is there a missing check for the current attempt number?
   ```suggestion
                                   if i == max_attempts - 1:
                                       raise
                                   # outer loop will retry
   ```





[GitHub] [airflow] potiuk commented on a diff in pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #32991:
URL: https://github.com/apache/airflow/pull/32991#discussion_r1287792960


##########
airflow/jobs/backfill_job_runner.py:
##########
@@ -588,61 +588,81 @@ def _per_task_process(key, ti: TaskInstance, session):
             try:
                 for task in self.dag.topological_sort(include_subdag_tasks=True):
                     for key, ti in list(ti_status.to_run.items()):
-                        if task.task_id != ti.task_id:
-                            continue
-
-                        pool = session.scalar(
-                            select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                        )
-                        if not pool:
-                            raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                        open_slots = pool.open_slots(session=session)
-                        if open_slots <= 0:
-                            raise NoAvailablePoolSlot(
-                                f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                            )
-
-                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                            self.dag_id,
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
-
-                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                            raise DagConcurrencyLimitReached(
-                                "Not scheduling since DAG max_active_tasks limit is reached."
+                        # Attempt to workaround deadlock on backfill by attempting to commit the transaction
+                        # state update few times before giving up
+                        max_attempts = 5
+                        for i in range(max_attempts):
+                            if task.task_id != ti.task_id:
+                                continue
+
+                            pool = session.scalar(
+                                select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                             )
+                            if not pool:
+                                raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                            open_slots = pool.open_slots(session=session)
+                            if open_slots <= 0:
+                                raise NoAvailablePoolSlot(
+                                    f"Not scheduling since there are {open_slots} "
+                                    f"open slots in pool {task.pool}"
+                                )
 
-                        if task.max_active_tis_per_dag is not None:
-                            num_running_task_instances_in_task = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                task_ids=[task.task_id],
+                            num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                                self.dag_id,
                                 states=self.STATES_COUNT_AS_RUNNING,
                                 session=session,
                             )
 
-                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency limit is reached."
+                            if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                                raise DagConcurrencyLimitReached(
+                                    "Not scheduling since DAG max_active_tasks limit is reached."
                                 )
 
-                        if task.max_active_tis_per_dagrun is not None:
-                            num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                run_id=ti.run_id,
-                                task_ids=[task.task_id],
-                                states=self.STATES_COUNT_AS_RUNNING,
-                                session=session,
-                            )
+                            if task.max_active_tis_per_dag is not None:
+                                num_running_task_instances_in_task = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
+                                )
 
-                            if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency per DAG run limit is reached."
+                                if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency limit is reached."
+                                    )
+
+                            if task.max_active_tis_per_dagrun is not None:
+                                num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    run_id=ti.run_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
                                 )
 
-                        _per_task_process(key, ti, session)
-                        session.commit()
+                                if (
+                                    num_running_task_instances_in_task_dagrun
+                                    >= task.max_active_tis_per_dagrun
+                                ):
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency per DAG run limit is reached."
+                                    )
+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to operational error. "
+                                    "The job will retry this operation so if your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                # outer loop will retry

Review Comment:
   I was initially under the impression that when I skip it, the backfill will attempt it again in the outer loop, but that was wrong. 





[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1669169442

   How about we merge that one?
   
   * looks stable (increased max_attempts to 5)
   * unquarantines the failing backfill job
   * has a better user experience than "failing with deadlock" when apparently the deadlock is recoverable
   




[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1666663312

   Hey everyone - I am not very proud of this PR, but I am cautiously optimistic that it **likely** works around the cursed deadlock that started to show up with the test_backfill_job of DaskExecutor.
   
   I **believe** it was a real issue, not a test problem. And I **think** the workaround I came up with should also handle similar "production" issues when they happen.
   
   It is not dissimilar to other fixes for the backfill job causing deadlocks (you can see similar workarounds in other places).
   
   Generally speaking - I was able to reproduce it locally (it happens once every 30-50 runs or so), so it is definitely not a "side effect" of another test. I ran this one individually with the --with-db-init flag and still (with difficulty) was able to produce the deadlock. When I added some debugging, it turned out that the deadlock in `commit()` in the outer loop was actually caused by a deadlock exception raised in the inner loop - right after `_per_task_process` - the exception was propagated up and then the finally clause around line ~960 attempted to commit again:
   
   ```
           finally:
               session.commit()
               executor.end()
   ```
   
   So we never saw the original place where the deadlock occurred - because we did not log that OperationalError in the inner loop, and the deadlock was reported again when the outer loop tried to run the commit for the already-deadlocked session.
   
   It looks like the deadlock is caused by the lack of "for update" on the DagRun in the backfill job, while the backfill job holds "for update" on the task instance. So if the scheduler happens to want to update the task_instance while it keeps the dag_run "for update", and the backfill job tries to update the dag_run, the two deadlock.
   
   BTW, possibly this is an inherent problem with the backfill job. Maybe this is really expected, and maybe the workaround is the best we can do?
   
   I gathered server-side logging where I could see both sides of the deadlock:
   
   ```
   	Process 11417 waits for ShareLock on transaction 70151; blocked by process 11337.
   
           # !!!! This is what backfill job does !!!
   
   	Process 11337: UPDATE dag_run SET last_scheduling_decision=NULL, updated_at='2023-08-05T23:07:39.040007+00:00'::timestamptz WHERE dag_run.id = 1
   
           # !!!!! This is what I presume Scheduler does !!!! 
   
   	Process 11417: UPDATE task_instance SET start_date='2023-08-05T23:07:42.249709+00:00'::timestamptz, end_date='2023-08-05T23:07:42.249709+00:00'::timestamptz, duration=0.0, state='skipped', updated_at='2023-08-05T23:07:42.253235+00:00'::timestamptz WHERE task_instance.dag_id = 'example_bash_operator' AND task_instance.task_id = 'run_this_last' AND task_instance.run_id = 'backfill__2017-01-01T00:00:00+00:00' AND task_instance.map_index =  -1
   ```
   
   
   I have a few more examples of server logs from runs where I was able to reproduce it locally: https://gist.github.com/potiuk/6bd2d855a01ed3e8877a30cefcec01b3 
   
   I added a bit more logging, and I specifically call `rollback()` in the inner loop - without re-raising the error. This **should not** be a problem, because the backfill job runs in a loop and will pick up the task again next time. So the most harm that can happen is that the task that caused the deadlock gets slightly delayed.
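   
   For reference, a minimal sketch of the retry pattern described above (simplified, not the exact PR code; it assumes the deadlock surfaces as SQLAlchemy's `OperationalError` when committing):
   
   ```
   from sqlalchemy.exc import OperationalError
   
   max_attempts = 5
   for attempt in range(max_attempts):
       _per_task_process(key, ti, session)
       try:
           session.commit()
           break  # committed cleanly - stop retrying
       except OperationalError:
           # roll back the deadlocked transaction and try again; if all attempts
           # fail, the outer backfill loop will pick the task up later
           session.rollback()
   ```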
   
   It was far more often reproducible on CI (often 1 or 2 Postgres jobs failed with it), so I will now close/reopen this PR a few times tomorrow to see if I can make the deadlock happen in CI with this fix.
   
   I'd love to hear more from those who are better experts on this than me. Maybe collectively we can solve the puzzle better than just via a workaround.
   




[GitHub] [airflow] potiuk closed pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #32991: Workaround failing deadlock when running backfill
URL: https://github.com/apache/airflow/pull/32991




[GitHub] [airflow] potiuk closed pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #32991: Workaround failing deadlock when running backfill
URL: https://github.com/apache/airflow/pull/32991




+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to operational error. "
+                                    "The job will retry this operation so if your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                # outer loop will retry

Review Comment:
   is there a missing check for the current attempt number?
   ```suggestion
                                   if i == max_attempts - 1:
                                       raise
                                   # outer loop will retry
   ```
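
For reference, a minimal standalone sketch of the commit-retry pattern under discussion, including the attempt-number check the suggestion adds. This is an illustration only, not the PR's code: `commit_with_retry` and `do_work` are made-up names, and the real loop also carries the pool and concurrency checks shown in the diff above.

```python
from sqlalchemy.exc import OperationalError


def commit_with_retry(session, do_work, max_attempts=5):
    """Retry a unit of work whose commit may hit a transient deadlock."""
    for i in range(max_attempts):
        try:
            do_work(session)
            session.commit()
            return  # committed successfully, no more attempts needed
        except OperationalError:
            session.rollback()
            if i == max_attempts - 1:
                raise  # attempts exhausted, surface the error to the caller
```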





[GitHub] [airflow] potiuk commented on pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1666836731

   So far I have run it ~7 times with full tests needed, and not a single backfill_job deadlock has occurred.




[GitHub] [airflow] potiuk closed pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #32991: Workaround failing deadlock when running backfill
URL: https://github.com/apache/airflow/pull/32991




[GitHub] [airflow] potiuk closed pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #32991: Workaround failing deadlock when running backfill
URL: https://github.com/apache/airflow/pull/32991




[GitHub] [airflow] potiuk merged pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #32991:
URL: https://github.com/apache/airflow/pull/32991




[GitHub] [airflow] potiuk commented on a diff in pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #32991:
URL: https://github.com/apache/airflow/pull/32991#discussion_r1287794019


##########
airflow/jobs/backfill_job_runner.py:
##########
@@ -588,61 +588,81 @@ def _per_task_process(key, ti: TaskInstance, session):
             try:
                 for task in self.dag.topological_sort(include_subdag_tasks=True):
                     for key, ti in list(ti_status.to_run.items()):
-                        if task.task_id != ti.task_id:
-                            continue
-
-                        pool = session.scalar(
-                            select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                        )
-                        if not pool:
-                            raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                        open_slots = pool.open_slots(session=session)
-                        if open_slots <= 0:
-                            raise NoAvailablePoolSlot(
-                                f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                            )
-
-                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                            self.dag_id,
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
-
-                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                            raise DagConcurrencyLimitReached(
-                                "Not scheduling since DAG max_active_tasks limit is reached."
+                        # Attempt to work around deadlock on backfill by trying to commit the transaction
+                        # state update a few times before giving up
+                        max_attempts = 5
+                        for i in range(max_attempts):
+                            if task.task_id != ti.task_id:
+                                continue
+
+                            pool = session.scalar(
+                                select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                             )
+                            if not pool:
+                                raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                            open_slots = pool.open_slots(session=session)
+                            if open_slots <= 0:
+                                raise NoAvailablePoolSlot(
+                                    f"Not scheduling since there are {open_slots} "
+                                    f"open slots in pool {task.pool}"
+                                )
 
-                        if task.max_active_tis_per_dag is not None:
-                            num_running_task_instances_in_task = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                task_ids=[task.task_id],
+                            num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                                self.dag_id,
                                 states=self.STATES_COUNT_AS_RUNNING,
                                 session=session,
                             )
 
-                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency limit is reached."
+                            if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                                raise DagConcurrencyLimitReached(
+                                    "Not scheduling since DAG max_active_tasks limit is reached."
                                 )
 
-                        if task.max_active_tis_per_dagrun is not None:
-                            num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                run_id=ti.run_id,
-                                task_ids=[task.task_id],
-                                states=self.STATES_COUNT_AS_RUNNING,
-                                session=session,
-                            )
+                            if task.max_active_tis_per_dag is not None:
+                                num_running_task_instances_in_task = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
+                                )
 
-                            if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency per DAG run limit is reached."
+                                if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency limit is reached."
+                                    )
+
+                            if task.max_active_tis_per_dagrun is not None:
+                                num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    run_id=ti.run_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
                                 )
 
-                        _per_task_process(key, ti, session)
-                        session.commit()
+                                if (
+                                    num_running_task_instances_in_task_dagrun
+                                    >= task.max_active_tis_per_dagrun
+                                ):
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency per DAG run limit is reached."
+                                    )
+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to operational error. "
+                                    "The job will retry this operation so if your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                # outer loop will retry

Review Comment:
   Fixed - also changed the comment to reflect that it is **just** this loop, not the outer one, as I initially thought the outer loop would take care of the retry (that was really a wrong assumption).
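
To make the retry scope concrete, here is a small self-contained toy (not Airflow code; every name in it is made up) showing that the `break` after a successful commit leaves only the innermost attempt loop, while the surrounding task and task-instance loops keep iterating:

```python
class FakeOperationalError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError in this toy."""


def flaky_commit(attempt):
    # Pretend the first two attempts hit a deadlock and the third succeeds.
    if attempt < 2:
        raise FakeOperationalError("deadlock detected")


max_attempts = 5
for task in ["task_a", "task_b"]:            # stands in for the topological task loop
    for ti in ["ti_1", "ti_2"]:              # stands in for ti_status.to_run
        for attempt in range(max_attempts):  # the retry loop added by this PR
            try:
                flaky_commit(attempt)
                break                        # success: leaves only this retry loop
            except FakeOperationalError:
                if attempt == max_attempts - 1:
                    raise                    # attempts exhausted: give up
        print(f"{task}/{ti} committed after {attempt + 1} attempt(s)")
```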





[GitHub] [airflow] potiuk commented on a diff in pull request #32991: Workaround failing deadlock when running backfill

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #32991:
URL: https://github.com/apache/airflow/pull/32991#discussion_r1287791148


##########
airflow/jobs/backfill_job_runner.py:
##########
@@ -588,61 +588,81 @@ def _per_task_process(key, ti: TaskInstance, session):
             try:
                 for task in self.dag.topological_sort(include_subdag_tasks=True):
                     for key, ti in list(ti_status.to_run.items()):
-                        if task.task_id != ti.task_id:
-                            continue
-
-                        pool = session.scalar(
-                            select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                        )
-                        if not pool:
-                            raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                        open_slots = pool.open_slots(session=session)
-                        if open_slots <= 0:
-                            raise NoAvailablePoolSlot(
-                                f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                            )
-
-                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                            self.dag_id,
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
-
-                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                            raise DagConcurrencyLimitReached(
-                                "Not scheduling since DAG max_active_tasks limit is reached."
+                        # Attempt to work around deadlock on backfill by trying to commit the transaction
+                        # state update a few times before giving up
+                        max_attempts = 5
+                        for i in range(max_attempts):
+                            if task.task_id != ti.task_id:
+                                continue
+
+                            pool = session.scalar(
+                                select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                             )
+                            if not pool:
+                                raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                            open_slots = pool.open_slots(session=session)
+                            if open_slots <= 0:
+                                raise NoAvailablePoolSlot(
+                                    f"Not scheduling since there are {open_slots} "
+                                    f"open slots in pool {task.pool}"
+                                )
 
-                        if task.max_active_tis_per_dag is not None:
-                            num_running_task_instances_in_task = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                task_ids=[task.task_id],
+                            num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                                self.dag_id,
                                 states=self.STATES_COUNT_AS_RUNNING,
                                 session=session,
                             )
 
-                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency limit is reached."
+                            if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                                raise DagConcurrencyLimitReached(
+                                    "Not scheduling since DAG max_active_tasks limit is reached."
                                 )
 
-                        if task.max_active_tis_per_dagrun is not None:
-                            num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                run_id=ti.run_id,
-                                task_ids=[task.task_id],
-                                states=self.STATES_COUNT_AS_RUNNING,
-                                session=session,
-                            )
+                            if task.max_active_tis_per_dag is not None:
+                                num_running_task_instances_in_task = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
+                                )
 
-                            if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency per DAG run limit is reached."
+                                if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency limit is reached."
+                                    )
+
+                            if task.max_active_tis_per_dagrun is not None:
+                                num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    run_id=ti.run_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
                                 )
 
-                        _per_task_process(key, ti, session)
-                        session.commit()
+                                if (
+                                    num_running_task_instances_in_task_dagrun
+                                    >= task.max_active_tis_per_dagrun
+                                ):
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency per DAG run limit is reached."
+                                    )
+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to operational error. "
+                                    "The job will retry this operation so if your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                # outer loop will retry

Review Comment:
   I thought it was better to just skip the backfill - the error is a bit less scary that way - but yes, it is likely better to re-raise it.
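
To spell out the trade-off, a hedged sketch (simplified, not the merged code; the helper names are made up for illustration) of the two options for the final failed attempt. Swallowing the error lets the backfill move on as if the task instance had been handled, while re-raising makes the exhausted retries visible to the caller:

```python
from sqlalchemy.exc import OperationalError


def try_commit_and_skip(session):
    # Swallow the error even on the last attempt: the task instance is
    # silently skipped for this backfill iteration.
    try:
        session.commit()
        return True
    except OperationalError:
        session.rollback()
        return False


def try_commit_and_reraise(session, attempt, max_attempts):
    # Re-raise once the attempts are exhausted so the failure does not
    # quietly disappear from the backfill's outcome.
    try:
        session.commit()
        return True
    except OperationalError:
        session.rollback()
        if attempt == max_attempts - 1:
            raise
        return False
```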


