Posted to commits@airflow.apache.org by "Adam Mustafa (JIRA)" <ji...@apache.org> on 2016/05/05 03:44:12 UTC

[jira] [Created] (AIRFLOW-49) xcom_pull with "task_ids=None" returns items not upstream

Adam Mustafa created AIRFLOW-49:
-----------------------------------

             Summary: xcom_pull with "task_ids=None" returns items not upstream
                 Key: AIRFLOW-49
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-49
             Project: Apache Airflow
          Issue Type: Bug
            Reporter: Adam Mustafa
            Priority: Minor


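import logging
from datetime import datetime

import airflow

# NOTE: `args` (the default task arguments) is not shown in the report
# and must be defined before this point.
dag = airflow.DAG(
    "TestingError",
    start_date=datetime(2015, 1, 1),
    schedule_interval="@once",
    default_args=args)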

def foo(**kwargs):
    return "Foo Ran"

def bar(**kwargs):
    logging.info(kwargs['ti'].xcom_pull(task_ids=None))
    return "Bar Ran"

def baz(**kwargs):
    return "Baz Ran"

task1 = airflow.operators.PythonOperator(task_id='Foo', dag=dag, provide_context=True, python_callable=foo)

task2 = airflow.operators.PythonOperator(task_id='Bar1', dag=dag, provide_context=True, python_callable=bar)

task3 = airflow.operators.PythonOperator(task_id='Baz', dag=dag, provide_context=True, python_callable=baz)

task4 = airflow.operators.PythonOperator(task_id='Bar2', dag=dag, provide_context=True, python_callable=bar)

task1.set_downstream(task2)
task1.set_downstream(task3)
task3.set_downstream(task4)


Using the SequentialExecutor, the tasks execute in this order:
Foo, Baz, Bar1, Bar2

In the logs, task "Bar1" prints "Baz Ran", even though "Baz" is not upstream of "Bar1".

It seems that because no "task_ids" were specified when pulling from XCom, the pull searched all returned values instead of only those from upstream tasks. As a result, a task can be affected by values pushed by tasks that are not upstream of it.
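
To make the reported behavior concrete, here is a minimal sketch in plain Python. It is a toy model, not Airflow code: the names UPSTREAM, RETURNS, pull_latest and run are hypothetical, and it assumes that an unfiltered pull returns the most recently pushed value, which is consistent with the log output described above.

```python
# Toy model of the report; none of these names come from Airflow.

# Task graph from the DAG above: Foo -> Bar1, Foo -> Baz, Baz -> Bar2
UPSTREAM = {
    "Foo": [],
    "Bar1": ["Foo"],
    "Baz": ["Foo"],
    "Bar2": ["Baz"],
}

# Return value of each task's callable, as in the report.
RETURNS = {
    "Foo": "Foo Ran",
    "Bar1": "Bar Ran",
    "Baz": "Baz Ran",
    "Bar2": "Bar Ran",
}

# Execution order observed with the SequentialExecutor.
ORDER = ["Foo", "Baz", "Bar1", "Bar2"]


def pull_latest(xcoms, task_ids=None):
    """Return the most recently pushed value.

    With task_ids=None no filter is applied (assumed behavior);
    otherwise only values pushed by the given task ids are considered.
    """
    for task_id, value in reversed(xcoms):
        if task_ids is None or task_id in task_ids:
            return value
    return None


def run(order, upstream_only):
    """Simulate the run and record what each Bar task pulls."""
    xcoms = []   # (task_id, value) pairs in push order
    pulled = {}
    for task_id in order:
        if task_id.startswith("Bar"):
            ids = UPSTREAM[task_id] if upstream_only else None
            pulled[task_id] = pull_latest(xcoms, ids)
        xcoms.append((task_id, RETURNS[task_id]))
    return pulled


if __name__ == "__main__":
    print(run(ORDER, upstream_only=False))
    print(run(ORDER, upstream_only=True))
```

In this model the unfiltered pull in "Bar1" sees the value from "Baz", while restricting the pull to upstream task ids gives "Bar1" the value from "Foo". In the DAG above, a comparable workaround would be to pass the upstream id explicitly, for example xcom_pull(task_ids='Foo') when running as "Bar1".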


