You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Rupesh Bansal (JIRA)" <ji...@apache.org> on 2018/01/02 06:35:00 UTC

[jira] [Updated] (AIRFLOW-1962) Tasks get stuck in running state

     [ https://issues.apache.org/jira/browse/AIRFLOW-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rupesh Bansal updated AIRFLOW-1962:
-----------------------------------
    Affects Version/s: 1.8.1

> Tasks get stuck in running state
> --------------------------------
>
>                 Key: AIRFLOW-1962
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1962
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.8.1
>            Reporter: Rupesh Bansal
>         Attachments: Screen Shot 2018-01-02 at 12.02.28 PM.png
>
>
> Tasks get stuck in running state when `depends_on_past` is true and time taken by a task to complete its run is more than its frequency. Please find the sample DAG, which gets stuck
> {noformat}
> # -*- coding: utf-8 -*-
> #
> # Licensed under the Apache License, Version 2.0 (the "License");
> # you may not use this file except in compliance with the License.
> # You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> #
> import airflow
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.models import DAG
> from datetime import datetime, timedelta
> import time
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(2),
>     'depends_on_past': True,
> }
> # BranchPython operator that depends on past
> # and where tasks may run or be skipped on
> # alternating runs
> dag = DAG(dag_id='example_branch_dop_operator_v3',schedule_interval='*/1 * * * *',  default_args=args)
> def should_run(ds, **kwargs):
>     time.sleep(75)
>     print("------------- exec dttm = {} and minute = {}".format(kwargs['execution_date'], kwargs['execution_date'].minute))
>     if kwargs['execution_date'].minute % 2 == 0:
>         return "oper_1"
>     else:
>         return "oper_2"
> cond = BranchPythonOperator(
>     task_id='condition',
>     provide_context=True,
>     python_callable=should_run,
>     dag=dag)
> oper_1 = DummyOperator(
>     task_id='oper_1',
>     dag=dag)
> oper_1.set_upstream(cond)
> oper_2 = DummyOperator(
>     task_id='oper_2',
>     dag=dag)
> oper_2.set_upstream(cond)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)