Posted to commits@airflow.apache.org by "Adam Mustafa (JIRA)" <ji...@apache.org> on 2016/05/05 03:44:12 UTC
[jira] [Created] (AIRFLOW-49) xcom_pull with "task_ids=None" returns items not upstream
Adam Mustafa created AIRFLOW-49:
-----------------------------------
Summary: xcom_pull with "task_ids=None" returns items not upstream
Key: AIRFLOW-49
URL: https://issues.apache.org/jira/browse/AIRFLOW-49
Project: Apache Airflow
Issue Type: Bug
Reporter: Adam Mustafa
Priority: Minor
import logging
from datetime import datetime

import airflow
import airflow.operators

# Placeholder: the report does not show how `args` was defined.
args = {"owner": "airflow"}

dag = airflow.DAG(
    "TestingError",
    start_date=datetime(2015, 1, 1),
    schedule_interval="@once",
    default_args=args)

def foo(**kwargs):
    return "Foo Ran"

def bar(**kwargs):
    # task_ids=None: the pull is not limited to any particular task.
    logging.info(kwargs['ti'].xcom_pull(task_ids=None))
    return "Bar Ran"

def baz(**kwargs):
    return "Baz Ran"

task1 = airflow.operators.PythonOperator(task_id='Foo', dag=dag, provide_context=True, python_callable=foo)
task2 = airflow.operators.PythonOperator(task_id='Bar1', dag=dag, provide_context=True, python_callable=bar)
task3 = airflow.operators.PythonOperator(task_id='Baz', dag=dag, provide_context=True, python_callable=baz)
task4 = airflow.operators.PythonOperator(task_id='Bar2', dag=dag, provide_context=True, python_callable=bar)

# Dependencies: Foo -> Bar1, Foo -> Baz -> Bar2
task1.set_downstream(task2)
task1.set_downstream(task3)
task3.set_downstream(task4)
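The set_downstream calls above declare the edges Foo -> Bar1, Foo -> Baz and Baz -> Bar2. As a standalone check (plain Python, no Airflow needed), this sketch collects every task that is transitively upstream of a given task from those edges. The helper upstream_of is hypothetical and is not an Airflow API.

```python
from collections import defaultdict

# Edges from the report's set_downstream calls, as (upstream, downstream) pairs.
EDGES = [("Foo", "Bar1"), ("Foo", "Baz"), ("Baz", "Bar2")]


def upstream_of(task, edges):
    """Return every task that is transitively upstream of `task`."""
    parents = defaultdict(set)
    for up, down in edges:
        parents[down].add(up)
    found, stack = set(), [task]
    while stack:
        # Walk parent links depth-first, recording each ancestor once.
        for parent in parents[stack.pop()]:
            if parent not in found:
                found.add(parent)
                stack.append(parent)
    return found


for name in ("Bar1", "Bar2"):
    print(name, sorted(upstream_of(name, EDGES)))
# Bar1 ['Foo']
# Bar2 ['Baz', 'Foo']
```

Baz appears only in Bar2's upstream set, so a value from Baz reaching Bar1 cannot be explained by a dependency.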
Using the SequentialExecutor, the tasks execute in this order:
Foo, Baz, Bar1, Bar2
In the logs, the log for task "Bar1" prints "Baz Ran", even though Baz is not upstream of Bar1.
It seems that, because no task_ids were passed to xcom_pull, it searched all values returned in the run instead of only those returned by upstream tasks. As a result, a task can be affected by values pushed by tasks that are not upstream of it.
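The behaviour described above can be reproduced with a deliberately simplified, hypothetical model (plain Python, not Airflow's implementation). It assumes that every pushed value carries a timestamp and that a pull with task_ids=None returns the most recent value pushed by any task in the run; pull_upstream shows the behaviour the report expected, restricting the lookup to upstream tasks. The names pull_any, pull_upstream and run are illustrative only.

```python
# Hypothetical, simplified model of the reported behaviour; not Airflow's code.
# Assumption: every push is stored as (timestamp, task_id, value), and a pull with
# task_ids=None returns the most recently pushed value from any task in the run.

EXECUTION_ORDER = ["Foo", "Baz", "Bar1", "Bar2"]  # order observed with SequentialExecutor
RETURN_VALUES = {"Foo": "Foo Ran", "Baz": "Baz Ran", "Bar1": "Bar Ran", "Bar2": "Bar Ran"}
UPSTREAM = {"Foo": set(), "Baz": {"Foo"}, "Bar1": {"Foo"}, "Bar2": {"Foo", "Baz"}}


def pull_any(store, task):
    """Reported behaviour: newest value from any task, upstream or not."""
    return max(store)[2] if store else None


def pull_upstream(store, task):
    """Expected behaviour: newest value among the task's upstream tasks only."""
    candidates = [entry for entry in store if entry[1] in UPSTREAM[task]]
    return max(candidates)[2] if candidates else None


def run(puller):
    """Run tasks in order; Bar1 and Bar2 pull before pushing their own value."""
    store, pulled = [], {}
    for timestamp, task in enumerate(EXECUTION_ORDER):
        if task.startswith("Bar"):
            pulled[task] = puller(store, task)
        store.append((timestamp, task, RETURN_VALUES[task]))
    return pulled


print(run(pull_any)["Bar1"])       # Baz Ran
print(run(pull_upstream)["Bar1"])  # Foo Ran
```

Under these assumptions Bar1 sees "Baz Ran" with pull_any, matching the log in the report, and "Foo Ran" with pull_upstream. Independently of any fix, passing an explicit id in the DAG, for example kwargs['ti'].xcom_pull(task_ids='Foo'), avoids depending on which task happened to run most recently.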
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)