You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by twinkle <tw...@gmail.com> on 2016/12/22 16:32:28 UTC

Regarding getting the result of another task in your current task

Hi,

I want to get the result of one task t1, inside task t2.

I have set the result of t1, by returning a value, which is set for the key
result_value.

While using the PythonOperator as t2, i am able to get the value of the
result of task t1.
Code for doing this in PythonOperator is:

pull = PythonOperator(

    task_id='puller', dag=main_dag, provide_context=True,
python_callable=puller)


def puller(**kwargs):

    ti = kwargs['ti']

    v1 = ti.xcom_pull(key=None, task_ids=[t1])

    logging.info("v1 :%s" %v1)

How can i do that in any other operator?
I tried, accessing 'ti' from context as well as kwargs, for both of them i
get the following error:

task_instance = context['ti']

    task_instance = context['ti']
TypeError: string indices must be integers


Any pointers?

Regards,
Twinkle

Re: Regarding getting the result of another task in your current task

Posted by twinkle <tw...@gmail.com>.
Hi Max and Jeremiah,

Currently i am querying a database to fetch a date,that date I need to use
in querying another database.

Right now,I am trying to use it during Hive Access.

I read that,there can be SQL which can be used to do it. But I don't know
how to use it. Ideally, I would like to make a utility function which can
return me the return value of a task id,which has run in the same dag
run/context.

Please let me know if I can share some more information/context.

Regards,
Twinkle

On Dec 23, 2016 3:28 AM, "Maxime Beauchemin" <ma...@gmail.com>
wrote:

That's right Jeremiah.

Keep in mind that XComs should only be used for small amount of metadata.

In general, pipelines' structure (similarly to data structures) shouldn't
change based on the data that is coming through them. There are ways to
author data pipelines and workflows so that the DAG and tasks logic are
unaffected by what is coming going on inside the pipeline.

What is the nature of the metadata you are moving around with XComs?

Max

On Thu, Dec 22, 2016 at 8:49 AM, Jeremiah Lowin <jl...@apache.org> wrote:

> Could you provide a little more detail on the type of Operator you want to
> use? It's certainly easiest with PythonOperator. However, the `ti` object
> is also available in jinja templates, so for example you can reference it
> in a BashOperator as (I believe) {{ ti.xcom_pull("t1") }}.
>
> On Thu, Dec 22, 2016 at 11:32 AM twinkle <tw...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I want to get the result of one task t1, inside task t2.
> >
> > I have set the result of t1, by returning a value, which is set for the
> key
> > result_value.
> >
> > While using the PythonOperator as t2, i am able to get the value of the
> > result of task t1.
> > Code for doing this in PythonOperator is:
> >
> > pull = PythonOperator(
> >
> >     task_id='puller', dag=main_dag, provide_context=True,
> > python_callable=puller)
> >
> >
> > def puller(**kwargs):
> >
> >     ti = kwargs['ti']
> >
> >     v1 = ti.xcom_pull(key=None, task_ids=[t1])
> >
> >     logging.info("v1 :%s" %v1)
> >
> > How can i do that in any other operator?
> > I tried, accessing 'ti' from context as well as kwargs, for both of them
> i
> > get the following error:
> >
> > task_instance = context['ti']
> >
> >     task_instance = context['ti']
> > TypeError: string indices must be integers
> >
> >
> > Any pointers?
> >
> > Regards,
> > Twinkle
> >
>

Re: Regarding getting the result of another task in your current task

Posted by Maxime Beauchemin <ma...@gmail.com>.
That's right Jeremiah.

Keep in mind that XComs should only be used for small amount of metadata.

In general, pipelines' structure (similarly to data structures) shouldn't
change based on the data that is coming through them. There are ways to
author data pipelines and workflows so that the DAG and tasks logic are
unaffected by what is coming going on inside the pipeline.

What is the nature of the metadata you are moving around with XComs?

Max

On Thu, Dec 22, 2016 at 8:49 AM, Jeremiah Lowin <jl...@apache.org> wrote:

> Could you provide a little more detail on the type of Operator you want to
> use? It's certainly easiest with PythonOperator. However, the `ti` object
> is also available in jinja templates, so for example you can reference it
> in a BashOperator as (I believe) {{ ti.xcom_pull("t1") }}.
>
> On Thu, Dec 22, 2016 at 11:32 AM twinkle <tw...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I want to get the result of one task t1, inside task t2.
> >
> > I have set the result of t1, by returning a value, which is set for the
> key
> > result_value.
> >
> > While using the PythonOperator as t2, i am able to get the value of the
> > result of task t1.
> > Code for doing this in PythonOperator is:
> >
> > pull = PythonOperator(
> >
> >     task_id='puller', dag=main_dag, provide_context=True,
> > python_callable=puller)
> >
> >
> > def puller(**kwargs):
> >
> >     ti = kwargs['ti']
> >
> >     v1 = ti.xcom_pull(key=None, task_ids=[t1])
> >
> >     logging.info("v1 :%s" %v1)
> >
> > How can i do that in any other operator?
> > I tried, accessing 'ti' from context as well as kwargs, for both of them
> i
> > get the following error:
> >
> > task_instance = context['ti']
> >
> >     task_instance = context['ti']
> > TypeError: string indices must be integers
> >
> >
> > Any pointers?
> >
> > Regards,
> > Twinkle
> >
>

Re: Regarding getting the result of another task in your current task

Posted by Jeremiah Lowin <jl...@apache.org>.
Could you provide a little more detail on the type of Operator you want to
use? It's certainly easiest with PythonOperator. However, the `ti` object
is also available in jinja templates, so for example you can reference it
in a BashOperator as (I believe) {{ ti.xcom_pull("t1") }}.

On Thu, Dec 22, 2016 at 11:32 AM twinkle <tw...@gmail.com> wrote:

> Hi,
>
> I want to get the result of one task t1, inside task t2.
>
> I have set the result of t1, by returning a value, which is set for the key
> result_value.
>
> While using the PythonOperator as t2, i am able to get the value of the
> result of task t1.
> Code for doing this in PythonOperator is:
>
> pull = PythonOperator(
>
>     task_id='puller', dag=main_dag, provide_context=True,
> python_callable=puller)
>
>
> def puller(**kwargs):
>
>     ti = kwargs['ti']
>
>     v1 = ti.xcom_pull(key=None, task_ids=[t1])
>
>     logging.info("v1 :%s" %v1)
>
> How can i do that in any other operator?
> I tried, accessing 'ti' from context as well as kwargs, for both of them i
> get the following error:
>
> task_instance = context['ti']
>
>     task_instance = context['ti']
> TypeError: string indices must be integers
>
>
> Any pointers?
>
> Regards,
> Twinkle
>