Posted to dev@airflow.apache.org by Mishika Singh <mi...@gmail.com> on 2018/09/06 06:12:34 UTC

Use xcom in task retry

I am pushing a [key, value] pair to XCom from an operator, and I want to read
it back with xcom_pull in the same operator when the task fails and is
retried. However, xcom_pull returns None instead of the pushed value.
Any pointers would be helpful.


-- 
Regards
Mishika Singh

Re: Use xcom in task retry

Posted by Emmanuel Brard <em...@getyourguide.com>.
Hello Mishika,

I think we had the same use case: we wanted an operator to use the XCom
values pushed by a previous try (after a retry), and we also found that the
XCom data is cleared before each execution. What we did was extend the
Airflow data model with an additional table that holds the data we want to
persist between retries. It is a bit hacky, but it has been working fine so far.

Bye,
E
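
For readers following along, here is a minimal sketch of the "separate table"
idea described above. The actual schema was not shared in this thread, so the
table name retry_state, its columns, and the helpers save_state and load_state
are all hypothetical. The standard-library sqlite3 module stands in for the
Airflow metadata database purely for illustration.

```python
import sqlite3

# Hypothetical side table; the name and columns are assumptions, not Airflow's schema.
SCHEMA = """
CREATE TABLE IF NOT EXISTS retry_state (
    dag_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    state_key TEXT NOT NULL,
    state_value TEXT,
    PRIMARY KEY (dag_id, task_id, execution_date, state_key)
)
"""


def save_state(conn, dag_id, task_id, execution_date, state_key, state_value):
    """Store (or overwrite) a value that should survive a retry."""
    conn.execute(
        "INSERT OR REPLACE INTO retry_state VALUES (?, ?, ?, ?, ?)",
        (dag_id, task_id, execution_date, state_key, state_value),
    )
    conn.commit()


def load_state(conn, dag_id, task_id, execution_date, state_key):
    """Return the stored value, or None if no earlier try saved one."""
    row = conn.execute(
        "SELECT state_value FROM retry_state "
        "WHERE dag_id = ? AND task_id = ? AND execution_date = ? AND state_key = ?",
        (dag_id, task_id, execution_date, state_key),
    ).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)

# First try: nothing stored yet, so the task saves its response before failing.
before_first_try = load_state(conn, "my_dag", "my_task", "2018-09-06", "computeResponse")
save_state(conn, "my_dag", "my_task", "2018-09-06", "computeResponse", "response-from-try-1")

# Retry: the value is still there because nothing clears this table.
on_retry = load_state(conn, "my_dag", "my_task", "2018-09-06", "computeResponse")
```

Because this table is outside XCom, nothing clears it before a retry. That also
means the rows have to be cleaned up explicitly once the task succeeds.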



Re: Use xcom in task retry

Posted by Mishika Singh <mi...@gmail.com>.
Hi Taylor,

Here is the code showing how I am using xcom_pull and xcom_push. Yes, the
push is succeeding to the database.
However, the behaviour I observed is that when execute() is called for a retry,
the XCom values are deleted from the table for the combination of
<dag_id, task_id, execution_date>.
I then searched the Airflow code and found that clear_xcom_data() is called
before execute() starts (code included below as well). We are using Airflow 1.9.

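def execute(self, context, **kwargs):
    # The task instance is available from the context passed to execute().
    ti = context['ti']

    # In my runs this returns None on a retry.
    pull_response = ti.xcom_pull(key='computeResponse', task_ids=None)
    logging.info("pull pull_response %s", pull_response)

    ti.xcom_push('computeResponse', self.compute_response)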


################

def clear_xcom_data(self, session=None):
    """
    Clears all XCom data from the database for the task instance
    """
    self.log.info('------------------->>>>> c')
    session.query(XCom).filter(
        XCom.dag_id == self.dag_id,
        XCom.task_id == self.task_id,
        XCom.execution_date == self.execution_date
    ).delete()
    session.commit()
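
To make the sequence above concrete, here is a toy model of the reported
behaviour. It is a sketch only and not Airflow's implementation: a plain dict
stands in for the XCom table, and run_try is a hypothetical helper that clears
the rows for the task instance before running the pull and push, in the order
described for Airflow 1.9.

```python
# Toy model of the behaviour described above; NOT Airflow's real implementation.
# Rows are keyed like the XCom table: (dag_id, task_id, execution_date, key).
xcom_table = {}


def clear_xcom_data(dag_id, task_id, execution_date):
    """Delete every stored row for this task instance."""
    stale = [k for k in xcom_table if k[:3] == (dag_id, task_id, execution_date)]
    for k in stale:
        del xcom_table[k]


def run_try(dag_id, task_id, execution_date, value):
    """Simulate one try: clear first, then pull and push inside 'execute'."""
    clear_xcom_data(dag_id, task_id, execution_date)
    row_key = (dag_id, task_id, execution_date, "computeResponse")
    pulled = xcom_table.get(row_key)  # None when no row is stored
    xcom_table[row_key] = value
    return pulled


first = run_try("my_dag", "my_task", "2018-09-06", "response-from-try-1")
second = run_try("my_dag", "my_task", "2018-09-06", "response-from-try-2")
print(first, second)  # prints: None None
```

In this model the second try never sees the first try's value, because the
clear step runs before the pull. That matches the None observed on retry.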




-- 
Regards
Mishika Singh

Re: Use xcom in task retry

Posted by Taylor Edmiston <te...@gmail.com>.
XCom push and pull should work as expected when a task is retried. It
shouldn't make a difference, but are you using XComs with explicit keys or
the implicit return-based style? Is the push succeeding to the database?

Also, can you show a simplified example of your code for this DAG?

Taylor


Re: Use xcom in task retry

Posted by Ben Gregory <be...@astronomer.io>.
Hi Mishika --

Posting this question on Stack Overflow, along with some of the code you're
using, will likely be the fastest way to have this addressed.

Just make sure to tag it with "airflow" so people can find it.

- Ben
