Posted to commits@airflow.apache.org by "Samuli Holopainen (JIRA)" <ji...@apache.org> on 2018/05/08 18:45:00 UTC

[jira] [Updated] (AIRFLOW-2438) Checking for concurrency limit causes session to be committed early and potentially results in duplicate task runs

     [ https://issues.apache.org/jira/browse/AIRFLOW-2438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Samuli Holopainen updated AIRFLOW-2438:
---------------------------------------
    Description: 
When the {{DagTISlotsAvailableDep}} dependency is checked, the {{concurrency_reached}} property of the DAG instance is accessed [here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/ti_deps/deps/dag_ti_slots_available_dep.py#L24]. The session from the outer scope is not passed to the property getter, causing the session to be committed in the {{provide_session}} decorator.

As a result, the transaction in which the task instance is locked for update in the method {{_check_and_change_state_before_execution}} is committed during the second call to {{are_dependencies_met}} [here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L1376], i.e. before the state of the task instance has been set to RUNNING. Two simultaneous runs of the {{_check_and_change_state_before_execution}} method can therefore both return {{True}}, allowing the same task to be run more than once concurrently.

I have verified this behavior by stepping through the code with a debugger while trying to run the same task twice at the same time.

Passing the session from the outer scope to the concurrency reached check should probably fix this.
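
The mechanism described above can be illustrated with a minimal, self-contained sketch. It does not use Airflow's code: {{FakeSession}}, {{make_session}} and the two {{check_deps_*}} helpers are hypothetical names, the {{provide_session}} shown here is a simplified stand-in for the real decorator, and it assumes the session factory hands back one shared session (as a thread-local scoped session would). Because a property getter cannot take arguments, the check is written as a plain function here.

```python
import functools


class FakeSession:
    """Hypothetical stand-in for a database session; it only records state."""

    def __init__(self):
        self.commits = 0
        self.row_locked = False

    def lock_row(self):
        # Stands in for SELECT ... FOR UPDATE on the task instance row.
        self.row_locked = True

    def commit(self):
        # Committing ends the transaction, which releases the row lock.
        self.commits += 1
        self.row_locked = False


# Assumption: every call to the factory returns the same shared session.
_SHARED_SESSION = FakeSession()


def make_session():
    return _SHARED_SESSION


def provide_session(func):
    """Simplified decorator: if no session keyword is passed, obtain one
    and commit it after the wrapped call returns."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            # The caller owns the session, so nothing is committed here.
            return func(*args, **kwargs)
        session = make_session()
        kwargs["session"] = session
        result = func(*args, **kwargs)
        session.commit()
        return result

    return wrapper


@provide_session
def concurrency_reached(running, limit, session=None):
    # Placeholder for the real check, which would query through `session`.
    return running >= limit


def check_deps_without_session(session):
    """Lock the row, then run the check without forwarding the session."""
    session.lock_row()
    concurrency_reached(running=1, limit=16)
    return session.row_locked


def check_deps_with_session(session):
    """Lock the row, then run the check with the outer session forwarded."""
    session.lock_row()
    concurrency_reached(running=1, limit=16, session=session)
    return session.row_locked
```

In this sketch, {{check_deps_without_session}} returns with the lock already released, because the decorator committed the shared session, while {{check_deps_with_session}} returns with the lock still held. Whether the real fix is this simple depends on details of the actual code that the sketch does not model.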

  was:
When the {{DagTISlotsAvailableDep}} dependency is checked, the {{concurrency_reached}} property of the DAG instance is accessed [here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/ti_deps/deps/dag_ti_slots_available_dep.py#L24]. The session from the outer scope is not passed to the property getter, causing the session to be committed in the {{provide_session}} decorator.

As a result, the transaction in which the task instance is locked for update in the method {{_check_and_change_state_before_execution}} is committed during the second call to {{are_dependencies_met}} [here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L1376], i.e. before the state of the task instance has been set to RUNNING. Two simultaneous runs of the {{_check_and_change_state_before_execution}} method can therefore both return {{True}}, causing the same task to be run more than once concurrently.

I have verified this behavior by stepping through the code with a debugger while trying to run the same task twice at the same time.

Passing the session from the outer scope to the concurrency reached check should probably fix this.


> Checking for concurrency limit causes session to be committed early and potentially results in duplicate task runs
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2438
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2438
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.9.0
>            Reporter: Samuli Holopainen
>            Priority: Major
>
> When the {{DagTISlotsAvailableDep}} dependency is checked, the {{concurrency_reached}} property of the DAG instance is accessed [here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/ti_deps/deps/dag_ti_slots_available_dep.py#L24]. The session from the outer scope is not passed to the property getter, causing the session to be committed in the {{provide_session}} decorator.
> As a result, the transaction in which the task instance is locked for update in the method {{_check_and_change_state_before_execution}} is committed during the second call to {{are_dependencies_met}} [here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L1376], i.e. before the state of the task instance has been set to RUNNING. Two simultaneous runs of the {{_check_and_change_state_before_execution}} method can therefore both return {{True}}, allowing the same task to be run more than once concurrently.
> I have verified this behavior by stepping through the code with a debugger while trying to run the same task twice at the same time.
> Passing the session from the outer scope to the concurrency reached check should probably fix this.


