Posted to users@airflow.apache.org by Furcy Pin <pi...@gmail.com> on 2020/06/01 10:45:43 UTC

Intra-task "xcom"

Hello,

I would like to open a feature request for Airflow to support "intra-task
xcom".

It seems that there are several distinct use cases for it already
and only ugly workarounds and I wanted to list them in a JIRA ticket.

I wanted to summarize links to the use cases and past attempts,
and the recommended approach (which apparently would be to create
a distinct feature from xcom to support this, it could be called intra-com
or self-com ?)

Do you know if such a ticket already exists? I couldn't find one.
Also I can't create any ticket due to some obscure bug (see my other email).

Thanks,

Furcy

Re: Intra-task "xcom"

Posted by Chris Palmer <ch...@crpalmer.com>.
Furcy,
Yes knowing that all DAGs have some sort of access to the metastore is the
one big advantage I can see to adding a state storage to Airflow. I agree
that we shouldn't expect it to handle millions of operations a second, but
if every task in every DAG is reading and writing to the state then that is
potentially a lot of added load on the metastore. Again there are trade
offs. I certainly don't think it's the worst idea to add to Airflow, but I
don't think it is the best either.

But without any others voicing agreement with my viewpoint, I'll turn away
from the "should this be added" question, and move on to the "what should
it look like".

Jarek,
To that point what would be the reasoning for only allowing keys of a
particular structure like (dag_id, task_id, execution_date)? Why not allow
for the possibility of arbitrary keys? Putting restrictions on the keys
seems unnecessary. We can certainly add convenience methods to make
standardized keys based on a task instance or a task.
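
To illustrate the point about keys, here is a minimal sketch, assuming a
store that accepts any string as a key. The helper names task_instance_key
and task_key are hypothetical and not part of Airflow; they only show how
standardized keys could be layered on top of arbitrary ones.

```python
from datetime import datetime


def task_instance_key(dag_id: str, task_id: str, execution_date: datetime) -> str:
    """Hypothetical helper: standardized key for one task instance."""
    return f"{dag_id}/{task_id}/{execution_date.isoformat()}"


def task_key(dag_id: str, task_id: str) -> str:
    """Hypothetical helper: key shared by every run of a task."""
    return f"{dag_id}/{task_id}"


# The store itself places no restriction on keys; a dict stands in for it here.
store = {}
store[task_instance_key("etl", "submit", datetime(2020, 6, 1))] = "job-42"
store["shared-cluster-id"] = "cluster-7"  # an arbitrary, user-chosen key
```

With this split the restriction lives in the helpers, not in the storage
layer, so a task that needs a differently shaped key is not blocked.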

I would suggest we mirror much of the structure of the Secrets backend code.

   - Create a BaseStateBackend abstract base class so that it is easy to
   implement other storage options.
   - Implement a MetastoreStateBackend as the first simple implementation
   of the abstract class.
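
A rough sketch of that split could look like the following. This is not the
content of the pastebin linked in this message: the method names (get, set,
delete) are assumptions, and SqliteStateBackend is a hypothetical stand-in
that uses an in-memory SQLite table instead of the real Airflow metastore.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Optional


class BaseStateBackend(ABC):
    """Hypothetical interface; method names are assumptions for illustration."""

    @abstractmethod
    def get(self, key: str) -> Optional[str]:
        ...

    @abstractmethod
    def set(self, key: str, value: str) -> None:
        ...

    @abstractmethod
    def delete(self, key: str) -> None:
        ...


class SqliteStateBackend(BaseStateBackend):
    """Stand-in for a MetastoreStateBackend, using an in-memory SQLite table."""

    def __init__(self) -> None:
        self._conn = sqlite3.connect(":memory:")
        # PRIMARY KEY gives the index on the key column.
        self._conn.execute(
            "CREATE TABLE state (key TEXT PRIMARY KEY, value TEXT NOT NULL)"
        )

    def get(self, key: str) -> Optional[str]:
        row = self._conn.execute(
            "SELECT value FROM state WHERE key = ?", (key,)
        ).fetchone()
        return row[0] if row else None

    def set(self, key: str, value: str) -> None:
        # INSERT OR REPLACE overwrites any previous value for the key.
        self._conn.execute(
            "INSERT OR REPLACE INTO state (key, value) VALUES (?, ?)", (key, value)
        )
        self._conn.commit()

    def delete(self, key: str) -> None:
        self._conn.execute("DELETE FROM state WHERE key = ?", (key,))
        self._conn.commit()
```

Another storage option (S3, Redis, and so on) would only need to implement
the same three methods.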

I couldn't help myself, so here is a pastebin of a really quick rough draft
of what the BaseStateBackend abstract base class could look like:
https://pastebin.com/vKx8W9DG

It has a few methods that are left up to subclasses to implement and then a
few convenience functions that take the context dictionary passed to an
operator's execute method and build a few predefined keys. Obviously it's
lacking in documentation and type hinting and probably a few other issues,
but I think it covers the basics.

Chris

On Tue, Jun 2, 2020 at 8:00 AM Jarek Potiuk <Ja...@polidea.com>
wrote:

> I personally think that *if* we decide to implement "state service" - it
> should be as simple as possible - and let operators decide what to do with
> it. And do not do anything with it by the Airflow Core.
>
> I think we should decide what key should be used for that data (task_id,
> dag_id, execution_date ? ) and delegate full management of the state
> lifecycle to Operator implementation. I see this as a "service" delivered
> by Airflow that the running task instance (by all instantiations of that
> task instance)  could use to access its own private data.
>
> It's more of a convenient "service" available to all operators without
> having to implement your own storage than anything else.
>
> I see the benefit for the users for that, I do not see now how abusing it
> can harm Airflow (and its core) more than current abuse with Variables.
>
> J.
>
>
> On Tue, Jun 2, 2020 at 11:23 AM Furcy Pin <pi...@gmail.com> wrote:
>
>> Chris,
>>
>> I think that the main downside of asking users to use another external
>> storage like S3 or whatever is that it doesn't make Operator easily
>> reusable across the community.
>> If I open-source a LivyOperator that handles my problem by storing stuff
>> in Azure Blob Storage, someone on AWS won't be able to use it as-is and
>> will have to customize it,
>> or make a pull request to support S3. Then someone else will make a pull
>> request to support GS, etc. It is doable but I believe it would add
>> avoidable complexity on
>> the Operator's side.
>>
>> This is the main advantage I see with providing such functionality. I
>> think that MySQL and Postgres handle key-value use-cases just fine as long
>> as you create
>> an index for the key in the table that will store them. Of course it
>> won't handle millions of operations per second, but if you are using
>> Airflow for that you probably
>> are doing something wrong.
>>
>> I agree that in my mind Variables were meant to hold configuration and
>> secrets that can be updated manually by the users for certain use cases,
>> but not holding task internal states.
>> I don't really mind and will comply if the consensus is to use Variables
>> for that. The fact that Variables can be made read-only is a good point for
>> separating the XState use-case from it, though.
>>
>> One of the key questions if a StateService is implemented would be:
>> should the state be removed on retry or on reschedule or not at all?
>> Or maybe it's the Operator's job to know what to do depending on whether
>> it is retrying or rescheduling, but I'm not sure Operators have a way to
>> know that.
>>
>> Regards,
>>
>> Furcy
>>
>>
>> On Tue, 2 Jun 2020 at 11:03, Jarek Potiuk <Ja...@polidea.com>
>> wrote:
>>
>>> I think no-one wants to remove xcom really :)
>>>
>>> On Tue, Jun 2, 2020 at 10:32 AM Michael Lutz <mi...@gmail.com>
>>> wrote:
>>>
>>>> please do not remove existing xcom functionality.   I am using it
>>>> extensively.   If you implement something more robust or elegant, that
>>>> would be fine.  I feel that a more robust state management system would be
>>>> helpful; it feels like an area of improvement.
>>>>
>>>> On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <Ja...@polidea.com>
>>>> wrote:
>>>>
>>>>> I think this subject has come up so often that I am also slowly changing
>>>>> my mind in favor of making an explicit state persistence "service".
>>>>>
>>>>> Whether it's only one key or more, it's secondary, I think but if
>>>>> users are already using Variables to keep state for tasks - this is a clear
>>>>> sign that we miss a crucial feature and our users are abusing Airflow
>>>>> already in the way we try to prevent by not introducing "State service".
>>>>>
>>>>> With the recent SecretBackend implementation where Variables might be
>>>>> kept in a Secret backend - not only MetaStore - potentially you might have
>>>>> no write access to the backend. There is even no "write" support in the
>>>>> current "MetastoreBackend" implementation for writing variables. So we
>>>>> already have configurations where writing a variable and reading it
>>>>> elsewhere might not work - as far as I can see. You can set several
>>>>> backends of course, with the Metastore as the last fallback, but
>>>>> for me, it opens up different problems - what happens if the key is present
>>>>> in both, one task writes it to the metastore, but another task reads it
>>>>> from the Secret Backend?
>>>>>
>>>>> I think it seems that variables are being abused in exactly the way we
>>>>> want to prevent the "StateService" from being abused - and shying away from that
>>>>> is really like closing our eyes and pretending it's not happening.
>>>>>
>>>>> So maybe we can make a change AIP with this approach:
>>>>>
>>>>> 1) Variables -> mostly read-only (for tasks)  and used to keep
>>>>> configuration shared between workers (but not on a task level).
>>>>> 2) StateService (or whatever we call it) where we keep state
>>>>> information for specific dag + task + execution_date.
>>>>>
>>>>> J.
>>>>>
>>>>>
>>>>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Airflow already provides a mechanism for state persistence: the
>>>>>> Variable, and, with caveats and flaws, XCom.
>>>>>>
>>>>>> I personally persist state to the airflow metastore database for a
>>>>>> large percentage of our jobs.  They are incremental jobs and it is helpful
>>>>>> to keep track of watermark.
>>>>>>
>>>>>> I think that incremental jobs are probably very very common in
>>>>>> airflow implementations.  Though probably often times users resort to
>>>>>> imperfect vehicles for this such as `execution_date` or xcom.
>>>>>>
>>>>>> I have a very draftey draft aip that i haven't had enough time to
>>>>>> work on, which explores adding explicit support for state persistence to
>>>>>> airflow:
>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>>>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>>>>> ready for primetime.)
>>>>>>
>>>>>> I am of the mind that being able to persist some state is not a
>>>>>> fundamental change to airflow and would just add explicit (and more
>>>>>> user-friendly) support for something that is already quite common, and fits
>>>>>> fully within the wheelhouse of what airflow exists to do.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Furcy,
>>>>>>>
>>>>>>> To clarify, when I say that Airflow should not be in the business of
>>>>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>>>>> keeping state to be shared between task instances. I completely understand
>>>>>>> that there may be external systems that are harder to work with, and like
>>>>>>> in your case require the operator to be able to store some piece of
>>>>>>> information to make them idempotent. I just don't think that Airflow should
>>>>>>> provide that storage mechanism.
>>>>>>>
>>>>>>> I would think that most users of Airflow have access to some sort of
>>>>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>>>>> easy enough to write your job_id or whatever value you care about to a file
>>>>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>>>>> combination of them you care about. Yes it makes your operators more
>>>>>>> complex and they have to know about another system, but it keeps that
>>>>>>> complexity out of core Airflow. That's the trade off.
>>>>>>>
>>>>>>> Ash,
>>>>>>> I'm not suggesting that XCom be removed from Airflow, and I
>>>>>>> understand there are use cases where it makes some things convenient. In
>>>>>>> your example though, it would be just as easy for the sensor to write the
>>>>>>> found object path as the contents of another file in S3, with a computable
>>>>>>> prefix based on the dag/task/execution_date.
>>>>>>>
>>>>>>>
>>>>>>> At its heart XCom is just a key-value store where the keys are
>>>>>>> limited to a very specific set of possibilities, and where key-value pairs
>>>>>>> are managed in some specific ways. The request here is to add another
>>>>>>> narrowly defined set of allowable keys, and as far as I can tell with no
>>>>>>> extra management of them. The only real advantage of using the Airflow
>>>>>>> database for XCom or any expansion/variation on it is that we know that all
>>>>>>> operators have access to the database.
>>>>>>>
>>>>>>> I'm not an expert but I would wonder how well Postgres or MySQL
>>>>>>> perform as high volume key value stores. Does anyone actually use XCom at
>>>>>>> scale, and does that extra load on the database impact scheduling and other
>>>>>>> performance aspects of Airflow?
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Just to touch on one point about XCom, and to re-assure people that
>>>>>>>> they, or something very like them are in Airflow for the foreseeable future.
>>>>>>>>
>>>>>>>> As an example of an appropriate use for XCom: Let's say a third
>>>>>>>> party delivers you a set of files once a week, but the exact name of the
>>>>>>>> files isn't known (by you) in advance. So you write a sensor that
>>>>>>>> polls/checks S3 for the Objects to appear in our bucket, and the sensor
>>>>>>>> outputs the S3 Object path to XCom, which the next processing step then
>>>>>>>> examines to process the files.
>>>>>>>>
>>>>>>>> That sort of use case is not going anywhere.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -ash
>>>>>>>>
>>>>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>>>>>
>>>>>>>> At the risk of repeating myself (from the previous thread that
>>>>>>>> touched on this topic), I don't think Airflow should be in the business of
>>>>>>>> keeping state about external systems. Airflow is about authoring and
>>>>>>>> running workflows; it's not a messaging tool or a cluster management tool.
>>>>>>>> I'm not convinced that the existing XCom functionality should really be a
>>>>>>>> part of Airflow, and I certainly don't think it should be expanded upon or
>>>>>>>> new variations added. I think storing state is especially risky, if for no
>>>>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>>>>>> has saved will diverge from the actual state of the external system.
>>>>>>>> Handling that nicely, probably requires a bunch of custom code in the
>>>>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>>>>> operator code complexity. Users would be much better served going to the
>>>>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>>>>> lacking in features (like being able to query the status of a particular
>>>>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>>>>> making up for failures of other tools.
>>>>>>>>
>>>>>>>> Also as can be seen by just this discussion, it's hard to keep
>>>>>>>> these extra features from expanding in scope. Jarek proposed something that
>>>>>>>> would just store a single string, and immediately Furcy wants to expand it
>>>>>>>> to store multiple strings. Either way we are really just talking about a
>>>>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>>>>> proposal).
>>>>>>>>
>>>>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>>>>> cluster across multiple data intervals, if one was already running (this
>>>>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>>>>> can equally see use cases where I might want to share some resource for
>>>>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>>>>>> added this then why limit it to any one of those combinations? But then we
>>>>>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>>>>>> then you can use Variables, if you want to use something else then you can.
>>>>>>>>
>>>>>>>> Unless Airflow is doing some extra management of these key-values
>>>>>>>> in some way (like it does with clearing out XCom's on reruns), then I see
>>>>>>>> absolutely no added benefit. And even with some potential management by
>>>>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thank you Jarek for the detailed explanation,
>>>>>>>>
>>>>>>>> That's exactly what I wanted to do: write a feature request to
>>>>>>>> summarize all those discussions.
>>>>>>>> I agree with you that the feature should be marked distinct from
>>>>>>>> the XCom feature and that we should not piggyback this feature into XCom.
>>>>>>>>
>>>>>>>> The crux of the problem, I think is that with XCom you do want the
>>>>>>>> task to delete its xcom on the beginning of the retry.
>>>>>>>> Correct me if I'm wrong but one use case where it was necessary
>>>>>>>> was having a task A and a task B that starts immediately after A, and waits
>>>>>>>> for some 'signal' from A.
>>>>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>>>>> the signal from A's first try, which is incorrect.
>>>>>>>>
>>>>>>>> About the 3 solutions you mention:
>>>>>>>>
>>>>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in
>>>>>>>> my use-case Livy's API is poorly designed and only returns a generated
>>>>>>>> job_id, you can't specify a custom one.
>>>>>>>> You can't even find a job by name, I would have to list all the
>>>>>>>> active job_ids, and do a GET for each of them to get its name and find
>>>>>>>> which one is the one I want. It's doable but inelegant.
>>>>>>>>
>>>>>>>> 2) Store the id in an external storage. Of course it would work but
>>>>>>>> it requires an external storage. More on that below.
>>>>>>>>
>>>>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>>>>> think you mean that the idempotency can be handled by the service you call
>>>>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>>>>> could have something to prevent a job that runs twice from creating
>>>>>>>> duplicates. This is another solution we are considering, but it is
>>>>>>>> costly to change now.
>>>>>>>>
>>>>>>>> You guess correctly that the feature I was asking for would be
>>>>>>>> to provide some utility to let the users implement solution 2) without
>>>>>>>> requiring an external storage.
>>>>>>>> I think it would be a QOL improvement for some use cases, just like
>>>>>>>> it could be argued that XCom is just a QOL improvement and users could have
>>>>>>>> used an external storage themselves.
>>>>>>>> The main advantage that it brings is making the custom operators
>>>>>>>> much easier to share and reuse across the Apache Airflow community,
>>>>>>>> compared to having to set up some external
>>>>>>>> storage.
>>>>>>>>
>>>>>>>> I have seen that some users used the metadata store itself as an
>>>>>>>> external storage by adding a new table to the airflow model:
>>>>>>>>
>>>>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>>>>>
>>>>>>>> And others suggested using XCom itself as an external storage by
>>>>>>>> storing information with a special task_id:
>>>>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>>>>
>>>>>>>> In the discussion thread you provided it was also suggested to use
>>>>>>>> Variables to store some persisting information.
>>>>>>>>
>>>>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>>>>> providing such functionality would be good.
>>>>>>>>
>>>>>>>> Finally, I don't see the point of limiting the functionality to
>>>>>>>> such extent, providing a "IdempotencyIdStorage" that only allows you to
>>>>>>>> store a string
>>>>>>>> will just force people who need to store more than one id for one
>>>>>>>> task (for whatever reason) to use some hack again, like storing a json
>>>>>>>> inside the storage.
>>>>>>>>
>>>>>>>> I was more thinking about something quite similar to XCom (I liked
>>>>>>>> the XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>>>>> task_id, execution_date, key)"
>>>>>>>> where "key" can be whatever you want and would be kept across
>>>>>>>> retries.
>>>>>>>>
>>>>>>>> I have read (quickly) through the "Pandora's Box" thread you
>>>>>>>> linked. Indeed it looks like there would be many ways to misuse such a
>>>>>>>> feature.
>>>>>>>> I do understand the importance of idempotency, and it looks like my
>>>>>>>> use case is one of the first ever listed where I do need to persist a state
>>>>>>>> across retries to make my operator really idempotent.
>>>>>>>>
>>>>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>>>>>> another solution).
>>>>>>>>
>>>>>>>> Of course we can blame it on Livy for being poorly conceived
>>>>>>>> (unlike BigQuery) or we can blame it on Spark for not having a built-in
>>>>>>>> security mechanism to prevent double-writes,
>>>>>>>> but I think that as the above hacks show, you can't really prevent
>>>>>>>> users from shooting themselves in the foot if that's what they really want
>>>>>>>> to.
>>>>>>>>
>>>>>>>> While I do think that making things foolproof is important, I
>>>>>>>> believe it's also in Python's philosophy to *not* make things
>>>>>>>> foolproof at the detriment of simplicity for the right use cases.
>>>>>>>> But I do understand that the use cases are different and
>>>>>>>> contradictory: some would require the state to be persisted across
>>>>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>>>>> across retries and not reschedule.
>>>>>>>>
>>>>>>>> Maybe the Airflow-y way for that would be to have one task that
>>>>>>>> does the submit and an xcom with the job, then one task that checks the
>>>>>>>> progress of the job, but that feels very cumbersome to double the number of
>>>>>>>> tasks just for that. Plus I'm not sure we could make the first task retry
>>>>>>>> if the second task fails...
>>>>>>>>
>>>>>>>> Thanks again,
>>>>>>>>
>>>>>>>> Furcy
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> I think we've discussed several approaches like that, and using the Xcom
>>>>>>>> name (which for many people would mean "let's just extend the XCom table
>>>>>>>> for that") is not a very good idea IMHO. I think this is very
>>>>>>>> different functionality/logic which we might or might not agree to
>>>>>>>> implement as a community. Naming it "Xcom" or trying to extend the XCom
>>>>>>>> table behavior might be problematic.
>>>>>>>>
>>>>>>>> Not sure if you are aware but we had very similar discussion about
>>>>>>>> it recently (without clear conclusions but at least you can see what kind
>>>>>>>> of issues/problems different people have with this approach)
>>>>>>>>
>>>>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>>>>
>>>>>>>> I am not saying it is impossible to do, but I think it's a matter
>>>>>>>> of how we formulate the "use case". It's very tempting to implement a
>>>>>>>> generic - intra-task communication mechanism, indeed. But it can very
>>>>>>>> easily lead to people abusing it and bypassing the guarantees (idempotency
>>>>>>>> mainly) that Airflow provides for backfilling and re-running tasks. I
>>>>>>>> thought a bit after the latest discussion kind of died out, and I have one
>>>>>>>> possible solution to the problem.
>>>>>>>>
>>>>>>>> Let me explain what I think about it (but others can have different
>>>>>>>> opinions of course):
>>>>>>>>
>>>>>>>> So far the discussion was that there are several ways to achieve
>>>>>>>> what you want (and it's really about what entity is providing the
>>>>>>>> "idempotency" guarantee):
>>>>>>>>
>>>>>>>> 1) Similarly as just merged in the BigQuery Insert Job
>>>>>>>> https://github.com/apache/airflow/pull/8868/files - you can
>>>>>>>> provide job_id from outside. You'd need to work out the job_id naming that
>>>>>>>> works in your case and make sure that when you re-run your task with the
>>>>>>>> same (dag_id, task_id, execution date) you will get the same id. Then the
>>>>>>>> "uniqueness" thus idempotency is handled by the logic written in the DAG.
>>>>>>>>
>>>>>>>> 2) Store the DAG id in some external storage (via one of the hooks
>>>>>>>> - where it can be queried in the way that will work for you). Then the
>>>>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>>>>> external storage.
>>>>>>>>
>>>>>>>> 3) Query your service and retrieve the JOB ID from it - but you
>>>>>>>> have to have a way to query for the job related to your "dag id  + task
>>>>>>>> + execution_date". Then - the idempotency is actually handled by the
>>>>>>>> Service you are using.
>>>>>>>>
>>>>>>>> In the use case, you describe - this is the only thing you need -
>>>>>>>> "idempotency source". I believe you would like to get the case 2) from
>>>>>>>> above but without having to use external storage to store the "unique id".
>>>>>>>> Something that will let each task in the same dag run to set or retrieve a
>>>>>>>> unique value for that particular task. One value should be enough -
>>>>>>>> assuming that each operator/task works on one external data "source".
>>>>>>>>
>>>>>>>> My current thinking is:
>>>>>>>>
>>>>>>>> Why don't we provide such a dedicated, idempotency service inside
>>>>>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>>>>>> class with two methods:
>>>>>>>>
>>>>>>>>   * .set(id: str) and
>>>>>>>>   * .get() -> str
>>>>>>>>
>>>>>>>> And the data stored there should be a string keyed by "(dag_id,
>>>>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>>>>> no intra-task communication, here, very little possibility of abuse and it
>>>>>>>> seems to solve the major pain point where you have to provide your own
>>>>>>>> storage to get the idempotency if your service does not provide one or you
>>>>>>>> do not want to delegate it to the DAG writer.
>>>>>>>>
>>>>>>>> J.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> The use case I'm referring to is that you can't use xcom to let a
>>>>>>>> task read information from its past attempts, because when a task starts
>>>>>>>> its xcom is automatically deleted.
>>>>>>>>
>>>>>>>> My specific use case is that we have a custom LivyOperator that
>>>>>>>> calls Livy to start batch Spark Jobs.
>>>>>>>> When you start a batch job Livy returns a job_id
>>>>>>>> Sometimes our operator can fail for one reason or another (for
>>>>>>>> instance if Livy is unreachable for a while)
>>>>>>>> When the task retries, it calls Livy again, which starts the same
>>>>>>>> spark job, but the problem is that the spark job from the first attempt can
>>>>>>>> still be running,
>>>>>>>> and then we have a batch job that runs twice simultaneously and
>>>>>>>> creates duplicates in the output.
>>>>>>>>
>>>>>>>> What we tried to do is getting the job_id from the first try, to
>>>>>>>> check if the job is still running, and wait for it to complete if it is.
>>>>>>>>
>>>>>>>> We tried using xcom to let the task send a message to itself (to
>>>>>>>> its next try) but xcom is meant for "inter-task communication" only so
>>>>>>>> this doesn't work and is not intended to work.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Furcy,
>>>>>>>>
>>>>>>>> Can you give a concrete example of what you mean by intra-task
>>>>>>>> xcom? Depending on your use case this may already be possible.
>>>>>>>>
>>>>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I would like to open a feature request for Airflow to support
>>>>>>>> "intra-task xcom".
>>>>>>>>
>>>>>>>> It seems that there are several distinct use cases for it already
>>>>>>>> and only ugly workarounds and I wanted to list them in a JIRA
>>>>>>>> ticket.
>>>>>>>>
>>>>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>>>>> and the recommended approach (which apparently would be to create
>>>>>>>> a distinct feature from xcom to support this, it could be called
>>>>>>>> intra-com or self-com ?)
>>>>>>>>
>>>>>>>> Do you know if such a ticket already exists? I couldn't find one.
>>>>>>>> Also I can't create any ticket due to some obscure bug (see my
>>>>>>>> other email).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Furcy
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Jarek Potiuk
>>>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>>>>
>>>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>>>>
>>>>>>>>

Re: Intra-task "xcom"

Posted by Jarek Potiuk <Ja...@polidea.com>.
I personally think that *if* we decide to implement "state service" - it
should be as simple as possible - and let operators decide what to do with
it. And do not do anything with it by the Airflow Core.

I think we should decide what key should be used for that data (task_id,
dag_id, execution_date ? ) and delegate full management of the state
lifecycle to Operator implementation. I see this as a "service" delivered
by Airflow that the running task instance (by all instantiations of that
task instance)  could use to access its own private data.
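
As a rough sketch of what delegating the lifecycle to the operator could
mean, assume a state service exposing plain get/set/delete keyed by
(dag_id, task_id, execution_date). Everything here is hypothetical:
InMemoryStateService, FakeJobClient and run_task are stand-ins, not Airflow
or Livy APIs. On retry the task reads its own earlier job id instead of
submitting again, and it clears the state itself once the job is finished.

```python
from typing import Dict, Optional, Tuple

Key = Tuple[str, str, str]  # (dag_id, task_id, execution_date)


class InMemoryStateService:
    """Hypothetical state service; Airflow core never touches the values."""

    def __init__(self) -> None:
        self._data: Dict[Key, str] = {}

    def get(self, key: Key) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: Key, value: str) -> None:
        self._data[key] = value

    def delete(self, key: Key) -> None:
        self._data.pop(key, None)


class FakeJobClient:
    """Stand-in for an external service that only returns generated job ids."""

    def __init__(self) -> None:
        self.submitted = 0

    def submit(self) -> str:
        self.submitted += 1
        return f"job-{self.submitted}"

    def wait(self, job_id: str, fail: bool = False) -> None:
        if fail:
            raise ConnectionError(f"lost contact while waiting for {job_id}")


def run_task(state, client, key, fail=False) -> str:
    """One attempt: reuse the stored job id if present, else submit."""
    job_id = state.get(key)
    if job_id is None:
        job_id = client.submit()
        state.set(key, job_id)  # persisted before waiting, so a retry can find it
    client.wait(job_id, fail=fail)
    state.delete(key)  # the operator, not Airflow core, decides when to clear
    return job_id


state, client = InMemoryStateService(), FakeJobClient()
key = ("my_dag", "submit_job", "2020-06-01")
try:
    run_task(state, client, key, fail=True)  # first attempt fails mid-wait
except ConnectionError:
    pass
retried_job_id = run_task(state, client, key)  # retry picks up the same job
```

The question of whether state survives a retry or a reschedule is answered
here entirely by where the operator calls delete, which is the point of
keeping the core out of it.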

It's more of a convenient "service" available to all operators without
having to implement your own storage than anything else.

I see the benefit for the users for that, I do not see now how abusing it
can harm Airflow (and its core) more than current abuse with Variables.

J.


On Tue, Jun 2, 2020 at 11:23 AM Furcy Pin <pi...@gmail.com> wrote:

> Chris,
>
> I think that the main downside of asking users to use another external
> storage like S3 or whatever is that it doesn't make Operator easily
> reusable across the community.
> If I open-source a LivyOperator that handles my problem by storing stuff
> in Azure Blob Storage, someone on AWS won't be able to use it as-is and
> will have to customize it,
> or make a pull request to support S3. Then someone else will make a pull
> request to support GS, etc. It is doable but I believe it would add
> avoidable complexity on
> the Operator's side.
>
> This is the main advantage I see with providing such functionality. I
> think that MySQL and Postgres handle key-value use-cases just fine as long
> as you create
> an index for the key in the table that will store them. Of course it won't
> handle millions of operations per second, but if you are using Airflow for
> that you probably
> are doing something wrong.
>
> I agree that in my mind Variables were meant to hold configuration and
> secrets that can be updated manually by the users for certain use cases,
> but not holding task internal states.
> I don't really mind and will comply if the consensus is to use Variables
> for that. The fact that Variables can be made read-only is a good point for
> separating the XState use-case from it, though.
>
> One of the key questions if a StateService is implemented would be: should
> the state be removed on retry or on reschedule or not at all?
> Or maybe it's the Operator's job to know what to do depending on whether
> it is retrying or rescheduling, but I'm not sure Operators have a way to
> know that.
>
> Regards,
>
> Furcy
>
>
> On Tue, 2 Jun 2020 at 11:03, Jarek Potiuk <Ja...@polidea.com>
> wrote:
>
>> I think no-one wants to remove xcom really :)
>>
>> On Tue, Jun 2, 2020 at 10:32 AM Michael Lutz <mi...@gmail.com>
>> wrote:
>>
>>> Please do not remove existing xcom functionality.   I am using it
>>> extensively.   If you implement something more robust or elegant, that
>>> would be fine.  I feel that a more robust state management system would be
>>> helpful; it feels like an area for improvement.
>>>
>>> On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <Ja...@polidea.com>
>>> wrote:
>>>
>>>> I think this subject has come up so often that I am also slowly changing
>>>> my mind in favor of making an explicit state persistence "service".
>>>>
>>>> Whether it's only one key or more is secondary, I think, but if users
>>>> are already using Variables to keep state for tasks - this is a clear sign
>>>> that we are missing a crucial feature and our users are already abusing
>>>> Airflow in the way we try to prevent by not introducing a "State service".
>>>>
>>>> With the recent SecretBackend implementation where Variables might be
>>>> kept in a Secret backend - not only MetaStore - potentially you might have
>>>> no write access to the backend. There is not even "write" support for
>>>> variables in the current "MetastoreBackend" implementation. So we
>>>> already have configurations where writing a variable and reading it
>>>> elsewhere might not work - as far as I can see. You can of course set
>>>> several backends, with the Metastore as the last fallback, but
>>>> for me, it opens up different problems - what happens if the key is present
>>>> in both, one task writes it to the metastore, but another task reads it
>>>> from the Secret Backend?
>>>>
>>>> I think that variables are being abused in exactly the way we
>>>> want to prevent the "StateService" from being abused - and shying away from
>>>> that is really like closing our eyes and pretending it's not happening.
>>>>
>>>> So maybe we can make an AIP for this change with this approach:
>>>>
>>>> 1) Variables -> mostly read-only (for tasks)  and used to keep
>>>> configuration shared between workers (but not on a task level).
>>>> 2) StateService (or whatever we call it) where we keep state
>>>> information for specific dag + task + execution_date.
>>>>
>>>> J.
>>>>
>>>>
>>>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
>>>> wrote:
>>>>
>>>>> Airflow already provides a mechanism for state persistence: the
>>>>> Variable, and, with caveats and flaws, XCom.
>>>>>
>>>>> I personally persist state to the airflow metastore database for a
>>>>> large percentage of our jobs.  They are incremental jobs and it is helpful
>>>>> to keep track of watermark.
>>>>>
>>>>> I think that incremental jobs are probably very very common in airflow
>>>>> implementations.  Though probably often times users resort to imperfect
>>>>> vehicles for this such as `execution_date` or xcom.
>>>>>
>>>>> I have a very drafty draft AIP that I haven't had enough time to work
>>>>> on, which explores adding explicit support for state persistence to
>>>>> airflow:
>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>>>> ready for primetime.)
>>>>>
>>>>> I am of the mind that being able to persist some state is not a
>>>>> fundamental change to airflow and would just add explicit (and more
>>>>> user-friendly) support for something that is already quite common, and fits
>>>>> fully within the wheelhouse of what airflow exists to do.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com>
>>>>> wrote:
>>>>>
>>>>>> Furcy,
>>>>>>
>>>>>> To clarify, when I say that Airflow should not be in the business of
>>>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>>>> keeping state to be shared between task instances. I completely understand
>>>>>> that there may be external systems that are harder to work with, and like
>>>>>> in your case require the operator to be able to store some piece of
>>>>>> information to make them idempotent. I just don't think that Airflow should
>>>>>> provide that storage mechanism.
>>>>>>
>>>>>> I would think that most users of Airflow have access to some sort of
>>>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>>>> easy enough to write your job_id or whatever value you care about to a file
>>>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>>>> combination of them you care about. Yes it makes your operators more
>>>>>> complex and they have to know about another system, but it keeps that
>>>>>> complexity out of core Airflow. That's the trade off.
>>>>>>
>>>>>> Ash,
>>>>>> I'm not suggesting that XCom be removed from Airflow, and I
>>>>>> understand there are use cases where it makes some things convenient. In
>>>>>> your example though, it would be just as easy for the sensor to write the
>>>>>> found object path as the contents of another file in S3, with a computable
>>>>>> prefix based on the dag/task/execution_date.
>>>>>>
>>>>>>
>>>>>> At its heart XCom is just a key-value store where the keys are
>>>>>> limited to a very specific set of possibilities, and where key-value pairs
>>>>>> are managed in some specific ways. The request here is to add another
>>>>>> narrowly defined set of allowable keys, and as far as I can tell with no
>>>>>> extra management of them. The only real advantage of using the Airflow
>>>>>> database for XCom or any expansion/variation on it is that we know that all
>>>>>> operators have access to the database.
>>>>>>
>>>>>> I'm not an expert but I would wonder how well Postgres or MySQL
>>>>>> perform as high volume key value stores. Does anyone actually use XCom at
>>>>>> scale, and does that extra load on the database impact scheduling and other
>>>>>> performance aspects of Airflow?
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Just to touch on one point about XCom, and to reassure people that
>>>>>>> XComs, or something very like them, will be in Airflow for the
>>>>>>> foreseeable future.
>>>>>>>
>>>>>>> As an example of an appropriate use for XCom: Let's say a third
>>>>>>> party delivers you a set of files once a week, but the exact name of the
>>>>>>> files isn't known (by you) in advance. So you write a sensor that
>>>>>>> polls/checks S3 for the Objects to appear in your bucket, and the sensor
>>>>>>> outputs the S3 Object path to XCom, which the next processing step then
>>>>>>> examines to process the files.
>>>>>>>
>>>>>>> That sort of use case is not going anywhere.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -ash
>>>>>>>
>>>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>>>>
>>>>>>> At the risk of repeating myself (from the previous thread that
>>>>>>> touched on this topic), I don't think Airflow should be in the business of
>>>>>>> keeping state about external systems. Airflow is about authoring and
>>>>>>> running workflows; it's not a messaging tool or a cluster management tool.
>>>>>>> I'm not convinced that the existing XCom functionality should really be a
>>>>>>> part of Airflow, and I certainly don't think it should be expanded upon or
>>>>>>> new variations added. I think storing state is especially risky, if for no
>>>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>>>>> has saved will diverge from the actual state of the external system.
>>>>>>> Handling that nicely probably requires a bunch of custom code in the
>>>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>>>> operator code complexity. Users would be much better served going to the
>>>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>>>> lacking in features (like being able to query the status of a particular
>>>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>>>> making up for failures of other tools.
>>>>>>>
>>>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>>>> extra features from expanding in scope. Jarek proposed something that would
>>>>>>> just store a single string, and immediately Furcy wants to expand it to
>>>>>>> store multiple strings. Either way we are really just talking about a
>>>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>>>> proposal).
>>>>>>>
>>>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>>>> cluster across multiple data intervals, if one was already running (this
>>>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>>>> can equally see use cases where I might want to share some resource for
>>>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>>>>> added this then why limit it to any one of those combinations? But then we
>>>>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>>>>> then you can use Variables, if you want to use something else then you can.
>>>>>>>
>>>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>>>>> absolutely no added benefit. And even with some potential management by
>>>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Thank you Jarek for the detailed explanation,
>>>>>>>
>>>>>>> That's exactly what I wanted to do: write a feature request to
>>>>>>> summarize all those discussions.
>>>>>>> I agree with you that the feature should be marked distinct from the
>>>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>>>>
>>>>>>> The crux of the problem, I think, is that with XCom you do want the
>>>>>>> task to delete its xcom at the beginning of the retry.
>>>>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>>>>> having a task A and a task B that starts immediately after A and waits for
>>>>>>> some 'signal' from A.
>>>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>>>> the signal from A's first try, which is incorrect.
>>>>>>>
>>>>>>> About the 3 solutions you mention:
>>>>>>>
>>>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>>>>> use-case Livy's API is poorly designed and only returns a generated job_id,
>>>>>>> you can't specify a custom one.
>>>>>>> You can't even find a job by name; I would have to list all the
>>>>>>> active job_ids, and do a GET for each of them to get its name and find
>>>>>>> which one is the one I want. It's doable but inelegant.
>>>>>>>
>>>>>>> 2) Store the id in an external storage. Of course it would work but
>>>>>>> it requires an external storage. More on that below.
>>>>>>>
>>>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>>>> think you mean that the idempotency can be handled by the service you call
>>>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>>>> could have something to prevent a job that runs twice from creating
>>>>>>> duplicates. This is another solution we are considering, but it is
>>>>>>> costly to change now.
>>>>>>>
>>>>>>> You guessed correctly that the feature I was asking for would be to
>>>>>>> provide some utility to let the users implement solution 2) without
>>>>>>> requiring an external storage.
>>>>>>> I think it would be a QOL improvement for some use cases, just like
>>>>>>> it could be argued that XCom is just a QOL improvement and users could have
>>>>>>> used an external storage themselves.
>>>>>>> The main advantage that it brings is making the custom operators
>>>>>>> much easier to share and reuse across the Apache Airflow community,
>>>>>>> compared to having to set up some external
>>>>>>> storage.
>>>>>>>
>>>>>>> I have seen that some users used the metadata store itself as an
>>>>>>> external storage by adding a new table to the airflow model:
>>>>>>>
>>>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>>>>
>>>>>>> And others suggested using XCom itself as an external storage by
>>>>>>> storing information with a special task_id:
>>>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>>>
>>>>>>> In the discussion thread you provided it was also suggested to use
>>>>>>> Variables to store some persisting information.
>>>>>>>
>>>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>>>> providing such functionality would be good.
>>>>>>>
>>>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>>>> an extent: providing an "IdempotencyIdStorage" that only allows you to
>>>>>>> store a string
>>>>>>> will just force people who need to store more than one id for one
>>>>>>> task (for whatever reason) to use some hack again, like storing a json
>>>>>>> inside the storage.
>>>>>>>
>>>>>>> I was more thinking about something quite similar to XCom (I liked
>>>>>>> the XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>>>> task_id, execution_date, key)"
>>>>>>> where "key" can be whatever you want and would be kept across
>>>>>>> retries.
>>>>>>>
>>>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>>>> Indeed it looks like there would be many ways to misuse such a feature.
>>>>>>> I do understand the importance of idempotency, and it looks like my
>>>>>>> use case is one of the first ever listed where I do need to persist a state
>>>>>>> across retries to make my operator really idempotent.
>>>>>>>
>>>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>>>> Airflow combination is (well, the BigQueryOperator was one too, but another
>>>>>>> solution was found for it).
>>>>>>>
>>>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>>>> mechanism to prevent double-writes,
>>>>>>> but I think that as the above hacks show, you can't really prevent
>>>>>>> users from shooting themselves in the foot if that's what they really want
>>>>>>> to.
>>>>>>>
>>>>>>> While I do think that making things foolproof is important, I
>>>>>>> believe it's also in Python's philosophy to *not* make things
>>>>>>> foolproof at the detriment of simplicity for the right use cases.
>>>>>>> But I do understand that the use cases are different and
>>>>>>> contradictory: some would require the state to be persisted across
>>>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>>>> across retries and not reschedule.
>>>>>>>
>>>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>>>> the submit and an xcom with the job, then one task that checks the progress
>>>>>>> of the job, but it feels very cumbersome to double the number of tasks
>>>>>>> just for that. Plus I'm not sure we could make the first task retry if the
>>>>>>> second task fails...
>>>>>>>
>>>>>>> Thanks again,
>>>>>>>
>>>>>>> Furcy
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> I think we've discussed several approaches like that, and using the Xcom
>>>>>>> name (which for many people would mean "let's just extend the XCom table
>>>>>>> for that") is not a very good idea IMHO. I think this is very
>>>>>>> different functionality/logic which we might or might not agree to
>>>>>>> implement as a community. Naming it "Xcom" or trying to extend the XCom
>>>>>>> table behavior might be problematic.
>>>>>>>
>>>>>>> Not sure if you are aware but we had very similar discussion about
>>>>>>> it recently (without clear conclusions but at least you can see what kind
>>>>>>> of issues/problems different people have with this approach)
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>>>
>>>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>>>>> - intra-task communication mechanism, indeed. But it can very easily lead
>>>>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>>>>> after the latest discussion kind of died out, and I have one possible
>>>>>>> solution to the problem.
>>>>>>>
>>>>>>> Let me explain what I think about it (but others can have different
>>>>>>> opinions of course):
>>>>>>>
>>>>>>> So far the discussion was that there are several ways to achieve
>>>>>>> what you want (and it's really about what entity is providing the
>>>>>>> "idempotency" guarantee):
>>>>>>>
>>>>>>> 1) Similarly to what was just merged in the BigQuery Insert Job
>>>>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>>>>> your case and make sure that when you re-run your task with the same
>>>>>>> (dag_id, task_id, execution date) you will get the same id. Then the
>>>>>>> "uniqueness", and thus idempotency, is handled by the logic written in the DAG.
>>>>>>>
>>>>>>> 2) Store the DAG id in some external storage (via one of the hooks -
>>>>>>> where it can be queried in the way that will work for you). Then the
>>>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>>>> external storage.
>>>>>>>
>>>>>>> 3) Query your service and retrieve the JOB ID from it - but you have
>>>>>>> to have a way to query for the job related to your "dag id  + task
>>>>>>> + execution_date". Then - the idempotency is actually handled by the
>>>>>>> Service you are using.
>>>>>>>
>>>>>>> In the use case you describe, this is the only thing you need: an
>>>>>>> "idempotency source". I believe you would like to get the case 2) from
>>>>>>> above but without having to use external storage to store the "unique id".
>>>>>>> Something that will let each task in the same dag run to set or retrieve a
>>>>>>> unique value for that particular task. One value should be enough -
>>>>>>> assuming that each operator/task works on one external data "source".
>>>>>>>
>>>>>>> My current thinking is:
>>>>>>>
>>>>>>> Why don't we provide such a dedicated idempotency service inside
>>>>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>>>>> class with two methods:
>>>>>>>
>>>>>>>   * .set(id: str) and
>>>>>>>   * .get() -> str
>>>>>>>
>>>>>>> And the data stored there should be a string keyed by "(dag_id,
>>>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>>>> no intra-task communication here, very little possibility of abuse, and it
>>>>>>> seems to solve the major pain point where you have to provide your own
>>>>>>> storage to get the idempotency if your service does not provide one or you
>>>>>>> do not want to delegate it to the DAG writer.
>>>>>>>
>>>>>>> J.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> The use case I'm referring to is that you can't use xcom to let a
>>>>>>> task read information from its past attempts, because when a task starts
>>>>>>> its xcom is automatically deleted.
>>>>>>>
>>>>>>> My specific use case is that we have a custom LivyOperator that
>>>>>>> calls Livy to start batch Spark Jobs.
>>>>>>> When you start a batch job, Livy returns a job_id.
>>>>>>> Sometimes our operator can fail for one reason or another (for
>>>>>>> instance if Livy is unreachable for a while).
>>>>>>> When the task retries, it calls Livy again, which starts the same
>>>>>>> spark job, but the problem is that the spark job from the first attempt can
>>>>>>> still be running,
>>>>>>> and then we have a batch job that runs twice simultaneously and
>>>>>>> creates duplicates in the output.
>>>>>>>
>>>>>>> What we tried to do is to get the job_id from the first try, to
>>>>>>> check if the job is still running, and wait for it to complete if it is.
>>>>>>>
>>>>>>> We tried using xcom to let the task send a message to itself (to
>>>>>>> its next try) but xcom is meant for "inter-task communication" only so
>>>>>>> this doesn't work and is not intended to work.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Furcy,
>>>>>>>
>>>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>>>> Depending on your use case this may already be possible.
>>>>>>>
>>>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I would like to open a feature request for Airflow to support
>>>>>>> "intra-task xcom".
>>>>>>>
>>>>>>> It seems that there are several distinct use cases for it already
>>>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>>>>
>>>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>>>> and the recommended approach (which apparently would be to create
>>>>>>> a distinct feature from xcom to support this, it could be called
>>>>>>> intra-com or self-com ?)
>>>>>>>
>>>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>>>> email).
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Furcy
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Jarek Potiuk
>>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>>>
>>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>>>
>>>>>>>

-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Re: Intra-task "xcom"

Posted by Furcy Pin <pi...@gmail.com>.
Chris,

I think that the main downside of asking users to use another external
storage like S3 or whatever is that it doesn't make Operators easily
reusable across the community.
If I open-source a LivyOperator that handles my problem by storing stuff in
Azure Blob Storage, someone on AWS won't be able to use it as-is and will
have to customize it, or make a pull request to support S3. Then someone
else will make a pull request to support GS, etc. It is doable but I believe
it would add avoidable complexity on the Operator's side.

This is the main advantage I see with providing such functionality. I think
that MySQL and Postgres handle key-value use-cases just fine as long as you
create an index for the key in the table that will store them. Of course it
won't handle millions of operations per second, but if you are using Airflow
for that you probably are doing something wrong.
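
To make the key-value point concrete, here is a rough sketch (an illustration
only, not Airflow code). It uses SQLite from the Python standard library as a
stand-in for MySQL/Postgres; the table name task_state, the column state_key
and the helpers set_state/get_state are all made up for this example. The
composite primary key gives the index on the key columns.

```python
import sqlite3

# Hypothetical schema for illustration; this is not an existing Airflow table.
# The composite PRIMARY KEY also creates the index used for lookups.
SCHEMA = """
CREATE TABLE IF NOT EXISTS task_state (
    dag_id         TEXT NOT NULL,
    task_id        TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    state_key      TEXT NOT NULL,
    value          TEXT,
    PRIMARY KEY (dag_id, task_id, execution_date, state_key)
)
"""


def set_state(conn, dag_id, task_id, execution_date, state_key, value):
    """Insert or overwrite the value stored under the composite key."""
    # INSERT OR REPLACE is SQLite syntax; other databases have their own upsert.
    conn.execute(
        "INSERT OR REPLACE INTO task_state VALUES (?, ?, ?, ?, ?)",
        (dag_id, task_id, execution_date, state_key, value),
    )
    conn.commit()


def get_state(conn, dag_id, task_id, execution_date, state_key):
    """Return the stored value, or None if the key was never written."""
    row = conn.execute(
        "SELECT value FROM task_state"
        " WHERE dag_id = ? AND task_id = ? AND execution_date = ?"
        " AND state_key = ?",
        (dag_id, task_id, execution_date, state_key),
    ).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
set_state(conn, "my_dag", "livy_submit", "2020-06-01T00:00:00", "job_id", "42")
print(get_state(conn, "my_dag", "livy_submit", "2020-06-01T00:00:00", "job_id"))
```

Each read or write is a single indexed lookup, which is the kind of load I
have in mind here.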

I agree: in my mind Variables were meant to hold configuration and
secrets that can be updated manually by the users for certain use cases,
not to hold task-internal state.
I don't really mind and will comply if the consensus is to use Variables
for that. The fact that Variables can be made read-only is a good point for
separating the XState use-case from it, though.

One of the key questions if a StateService is implemented would be: should
the state be removed on retry or on reschedule or not at all?
Or maybe it's the Operator's job to know what to do depending on whether it
is retrying or rescheduling, but I'm not sure Operators have a way to know
that.
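
One way to picture the options in that question is the toy sketch below. It is
purely hypothetical: ClearOn, InMemoryStateStore and on_event are names
invented for this example, not Airflow APIs, and it assumes something outside
the operator would notify the store before a retry or a reschedule.

```python
from enum import Enum


class ClearOn(Enum):
    """Hypothetical events after which a stored entry is dropped."""
    NEVER = "never"
    RETRY = "retry"
    RESCHEDULE = "reschedule"


class InMemoryStateStore:
    """Toy stand-in for a state service; not an Airflow API."""

    def __init__(self):
        # (dag_id, task_id, execution_date, key) -> (value, ClearOn policy)
        self._data = {}

    def set(self, ti_key, key, value, clear_on=ClearOn.NEVER):
        """Store a value together with the event that should clear it."""
        self._data[ti_key + (key,)] = (value, clear_on)

    def get(self, ti_key, key, default=None):
        """Return the stored value, or the default if nothing is stored."""
        value, _ = self._data.get(ti_key + (key,), (default, None))
        return value

    def on_event(self, ti_key, event):
        """Drop this task instance's entries whose policy matches the event."""
        doomed = [
            k for k, (_, policy) in self._data.items()
            if k[:3] == ti_key and policy is event
        ]
        for k in doomed:
            del self._data[k]


store = InMemoryStateStore()
ti = ("my_dag", "livy_submit", "2020-06-01T00:00:00")
store.set(ti, "job_id", "42", clear_on=ClearOn.RESCHEDULE)  # kept across retries
store.set(ti, "signal", "ready", clear_on=ClearOn.RETRY)    # reset on retry
store.on_event(ti, ClearOn.RETRY)
print(store.get(ti, "job_id"), store.get(ti, "signal"))
```

With a per-entry policy like this, my Livy case (keep across retries) and the
opposite case (keep across reschedules) could both be expressed, at the cost
of a slightly larger API.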

Regards,

Furcy


On Tue, 2 Jun 2020 at 11:03, Jarek Potiuk <Ja...@polidea.com> wrote:

> I think no-one wants to remove xcom really :)
>
> On Tue, Jun 2, 2020 at 10:32 AM Michael Lutz <mi...@gmail.com>
> wrote:
>
>> Please do not remove existing xcom functionality.   I am using it
>> extensively.   If you implement something more robust or elegant, that
>> would be fine.  I feel that a more robust state management system would be
>> helpful; it feels like an area for improvement.
>>
>> On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <Ja...@polidea.com>
>> wrote:
>>
>>> I think this subject has come up so often that I am also slowly changing
>>> my mind in favor of making an explicit state persistence "service".
>>>
>>> Whether it's only one key or more is secondary, I think, but if users
>>> are already using Variables to keep state for tasks - this is a clear sign
>>> that we are missing a crucial feature and our users are already abusing
>>> Airflow in the way we try to prevent by not introducing a "State service".
>>>
>>> With the recent SecretBackend implementation where Variables might be
>>> kept in a Secret backend - not only MetaStore - potentially you might have
>>> no write access to the backend. There is not even "write" support for
>>> variables in the current "MetastoreBackend" implementation. So we
>>> already have configurations where writing a variable and reading it
>>> elsewhere might not work - as far as I can see. You can of course set
>>> several backends, with the Metastore as the last fallback, but
>>> for me, it opens up different problems - what happens if the key is present
>>> in both, one task writes it to the metastore, but another task reads it
>>> from the Secret Backend?
>>>
>>> I think that variables are being abused in exactly the way we
>>> want to prevent the "StateService" from being abused - and shying away from
>>> that is really like closing our eyes and pretending it's not happening.
>>>
>>> So maybe we can make an AIP for this change with this approach:
>>>
>>> 1) Variables -> mostly read-only (for tasks)  and used to keep
>>> configuration shared between workers (but not on a task level).
>>> 2) StateService (or whatever we call it) where we keep state information
>>> for specific dag + task + execution_date.
>>>
>>> J.
>>>
>>>
>>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
>>> wrote:
>>>
>>>> Airflow already provides a mechanism for state persistence: the
>>>> Variable, and, with caveats and flaws, XCom.
>>>>
>>>> I personally persist state to the airflow metastore database for a
>>>> large percentage of our jobs.  They are incremental jobs and it is helpful
>>>> to keep track of watermark.
>>>>
>>>> I think that incremental jobs are probably very very common in airflow
>>>> implementations.  Though probably often times users resort to imperfect
>>>> vehicles for this such as `execution_date` or xcom.
>>>>
>>>> I have a very drafty draft AIP that I haven't had enough time to work
>>>> on, which explores adding explicit support for state persistence to
>>>> airflow:
>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>>> ready for primetime.)
>>>>
>>>> I am of the mind that being able to persist some state is not a
>>>> fundamental change to airflow and would just add explicit (and more
>>>> user-friendly) support for something that is already quite common, and fits
>>>> fully within the wheelhouse of what airflow exists to do.
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com> wrote:
>>>>
>>>>> Furcy,
>>>>>
>>>>> To clarify, when I say that Airflow should not be in the business of
>>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>>> keeping state to be shared between task instances. I completely understand
>>>>> that there may be external systems that are harder to work with, and like
>>>>> in your case require the operator to be able to store some piece of
>>>>> information to make them idempotent. I just don't think that Airflow should
>>>>> provide that storage mechanism.
>>>>>
>>>>> I would think that most users of Airflow have access to some sort of
>>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>>> easy enough to write your job_id or whatever value you care about to a file
>>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>>> combination of them you care about. Yes it makes your operators more
>>>>> complex and they have to know about another system, but it keeps that
>>>>> complexity out of core Airflow. That's the trade off.
>>>>>
>>>>> Ash,
>>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>>> there are use cases where it makes some things convenient. In your example
>>>>> though, it would be just as easy for the sensor to write the found object
>>>>> path as the contents of another file in S3, with a computable prefix based
>>>>> on the dag/task/execution_date.
>>>>>
>>>>>
>>>>> At its heart XCom is just a key-value store where the keys are limited
>>>>> to a very specific set of possibilities, and where key-value pairs are
>>>>> managed in some specific ways. The request here is to add another narrowly
>>>>> defined set of allowable keys, and as far as I can tell with no extra
>>>>> management of them. The only real advantage of using the Airflow database
>>>>> for XCom or any expansion/variation on it is that we know that all
>>>>> operators have access to the database.
>>>>>
>>>>> I'm not an expert but I would wonder how well Postgres or MySQL
>>>>> perform as high volume key value stores. Does anyone actually use XCom at
>>>>> scale, and does that extra load on the database impact scheduling and other
>>>>> performance aspects of Airflow?
>>>>>
>>>>> Chris
>>>>>
>>>>>
>>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Just to touch on one point about XCom, and to reassure people that
>>>>>> XComs, or something very like them, will be in Airflow for the
>>>>>> foreseeable future.
>>>>>>
>>>>>> As an example of an appropriate use for XCom: Let's say a third
>>>>>> party delivers you a set of files once a week, but the exact name of the
>>>>>> files isn't known (by you) in advance. So you write a sensor that
>>>>>> polls/checks S3 for the Objects to appear in your bucket, and the sensor
>>>>>> outputs the S3 Object path to XCom, which the next processing step then
>>>>>> examines to process the files.
>>>>>>
>>>>>> That sort of use case is not going anywhere.
>>>>>>
>>>>>> Cheers,
>>>>>> -ash
>>>>>>
>>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>>>
>>>>>> At the risk of repeating myself (from the previous thread that
>>>>>> touched on this topic), I don't think Airflow should be in the business of
>>>>>> keeping state about external systems. Airflow is about authoring and
>>>>>> running workflows; it's not a messaging tool or a cluster management tool.
>>>>>> I'm not convinced that the existing XCom functionality should really be a
>>>>>> part of Airflow, and I certainly don't think it should be expanded upon or
>>>>>> new variations added. I think storing state is especially risky, if for no
>>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>>>> has saved will diverge from the actual state of the external system.
>>>>>> Handling that nicely probably requires a bunch of custom code in the
>>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>>> operator code complexity. Users would be much better served going to the
>>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>>> lacking in features (like being able to query the status of a particular
>>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>>> making up for failures of other tools.
>>>>>>
>>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>>> extra features from expanding in scope. Jarek proposed something that would
>>>>>> just store a single string, and immediately Furcy wants to expand it to
>>>>>> store multiple strings. Either way we are really just talking about a
>>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>>> proposal).
>>>>>>
>>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>>> cluster across multiple data intervals, if one was already running (this
>>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>>> can equally see use cases where I might want to share some resource for
>>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>>>> added this then why limit it to any one of those combinations? But then we
>>>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>>>> then you can use Variables, if you want to use something else then you can.
>>>>>>
>>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>>>> absolutely no added benefit. And even with some potential management by
>>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> Thank you Jarek for the detailed explanation,
>>>>>>
>>>>>> That's exactly what I wanted to do: write a feature request to
>>>>>> summarize all those discussions.
>>>>>> I agree with you that the feature should be marked distinct from the
>>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>>>
>>>>>> The crux of the problem, I think, is that with XCom you do want the
>>>>>> task to delete its xcom at the beginning of the retry.
>>>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>>>> having a task A and a task B that starts immediately after A and waits for
>>>>>> some 'signal' from A.
>>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>>> the signal from A's first try, which is incorrect.
>>>>>>
>>>>>> About the 3 solutions you mention:
>>>>>>
>>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>>>> use-case Livy's API is poorly designed and only returns a generated job_id;
>>>>>> you can't specify a custom one.
>>>>>> You can't even find a job by name: I would have to list all the
>>>>>> active job_ids, and do a GET for each of them to get its name and find
>>>>>> which one is the one I want. It's doable but inelegant.
>>>>>>
>>>>>> 2) Store the id in an external storage. Of course it would work but
>>>>>> it requires an external storage. More on that below.
>>>>>>
>>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>>> think you mean that the idempotency can be handled by the service you call
>>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>>> could have something to prevent a job that runs twice from creating
>>>>>> duplicates. This is another solution we are considering, but it is
>>>>>> costly to change now.
>>>>>>
>>>>>> You guess correctly that the feature I was asking for would be to
>>>>>> provide some utility to let the users implement solution 2) without
>>>>>> requiring an external storage.
>>>>>> I think it would be a QOL improvement for some use cases, just like
>>>>>> it could be argued that XCom is just a QOL improvement and users could have
>>>>>> used an external storage themselves.
>>>>>> The main advantage that it brings is making the custom operators much
>>>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>>>> having to set up some external
>>>>>> storage.
>>>>>>
>>>>>> I have seen that some users used the metadata store itself as an
>>>>>> external storage by adding a new table to the airflow model:
>>>>>>
>>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>>>
>>>>>> And others suggested using XCom itself as an external storage by
>>>>>> storing information with a special task_id:
>>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>>
>>>>>> In the discussion thread you provided it was also suggested to use
>>>>>> Variables to store some persisting information.
>>>>>>
>>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>>> providing such functionality would be good.
>>>>>>
>>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>>> an extent: providing an "IdempotencyIdStorage" that only allows you to
>>>>>> store a string will just force people who need to store more than one id
>>>>>> for one task (for whatever reason) to use some hack again, like storing
>>>>>> a json inside the storage.
>>>>>>
>>>>>> I was more thinking about something quite similar to XCom (I liked
>>>>>> the XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>>> task_id, execution_date, key)"
>>>>>> where "key" can be whatever you want and would be kept across retries.
>>>>>>
>>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>>> Indeed it looks like there would be many ways to misuse such a feature.
>>>>>> I do understand the importance of idempotency, and it looks like my
>>>>>> use case is one of the first ever listed where I do need to persist a state
>>>>>> across retries to make my operator really idempotent.
>>>>>>
>>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>>>> another solution).
>>>>>>
>>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>>> mechanism to prevent double-writes,
>>>>>> but I think that as the above hacks show, you can't really prevent
>>>>>> users from shooting themselves in the foot if that's what they really want
>>>>>> to.
>>>>>>
>>>>>> While I do think that making things foolproof is important, I believe
>>>>>> it's also in Python's philosophy to *not* make things foolproof at
>>>>>> the detriment of simplicity for the right use cases.
>>>>>> But I do understand that the use cases are different and
>>>>>> contradictory: some would require the state to be persisted across
>>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>>> across retries and not reschedule.
>>>>>>
>>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>>> the submit and pushes an xcom with the job_id, then one task that checks
>>>>>> the progress of the job, but it feels very cumbersome to double the number
>>>>>> of tasks just for that. Plus I'm not sure we could make the first task
>>>>>> retry if the second task fails...
>>>>>>
>>>>>> Thanks again,
>>>>>>
>>>>>> Furcy
>>>>>>
>>>>>>
>>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>>>> wrote:
>>>>>>
>>>>>> I think we've discussed several approaches like that, and using the XCom
>>>>>> name (which for many people would mean "let's just extend the XCom table
>>>>>> for that") is not a very good idea IMHO. I think this is very
>>>>>> different functionality/logic which we might or might not agree to
>>>>>> implement as a community. Naming it "XCom" or trying to extend the XCom
>>>>>> table behavior might be problematic.
>>>>>>
>>>>>> Not sure if you are aware but we had very similar discussion about it
>>>>>> recently (without clear conclusions but at least you can see what kind of
>>>>>> issues/problems different people have with this approach)
>>>>>>
>>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>>
>>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>>>> intra-task communication mechanism, indeed. But it can very easily lead
>>>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>>>> after the latest discussion kind of died out, and I have one possible
>>>>>> solution to the problem.
>>>>>>
>>>>>> Let me explain what I think about it (but others can have different
>>>>>> opinions of course):
>>>>>>
>>>>>> So far the discussion was that there are several ways to achieve what
>>>>>> you want (and it's really about what entity is providing the "idempotency"
>>>>>> guarantee):
>>>>>>
>>>>>> 1) Similarly to what was just merged in the BigQuery Insert Job
>>>>>> https://github.com/apache/airflow/pull/8868/files - you can provide the
>>>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>>>> your case and make sure that when you re-run your task with the same
>>>>>> (dag_id, task_id, execution_date) you will get the same id. Then the
>>>>>> "uniqueness", and thus idempotency, is handled by the logic written in the DAG.
>>>>>>
>>>>>> 2) Store the DAG id in some external storage (via one of the hooks -
>>>>>> where it can be queried in the way that will work for you). Then the
>>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>>> external storage.
>>>>>>
>>>>>> 3) Query your service and retrieve the job id from it - but you have
>>>>>> to have a way to query for the job related to your "dag_id + task_id
>>>>>> + execution_date". Then the idempotency is actually handled by the
>>>>>> service you are using.
>>>>>>
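A minimal sketch of option 1) above, assuming the external service accepts a caller-supplied job id (which, per the thread, Livy does not). The function name `deterministic_job_id`, the `airflow_` prefix and the hashing scheme are illustrative choices, not part of Airflow or of the BigQuery operator. The point is only that the id is a pure function of (dag_id, task_id, execution_date), so every attempt of the same task instance derives the same id.

```python
import hashlib
from datetime import datetime, timezone


def deterministic_job_id(dag_id: str, task_id: str, execution_date: datetime) -> str:
    """Derive the same job id for every attempt of one task instance.

    Hypothetical helper: the id depends only on the task instance key,
    never on the try number or the wall-clock time of the attempt.
    """
    raw = f"{dag_id}|{task_id}|{execution_date.isoformat()}"
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
    return f"airflow_{digest}"


when = datetime(2020, 6, 1, tzinfo=timezone.utc)
first_try = deterministic_job_id("my_dag", "submit_job", when)
retry = deterministic_job_id("my_dag", "submit_job", when)
other_run = deterministic_job_id(
    "my_dag", "submit_job", datetime(2020, 6, 2, tzinfo=timezone.utc)
)
```

With such an id, a retry can ask the service about `first_try` directly instead of remembering anything between attempts.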
>>>>>> In the use case you describe, this is the only thing you need: an
>>>>>> "idempotency source". I believe you would like to get the case 2) from
>>>>>> above but without having to use external storage to store the "unique id".
>>>>>> Something that will let each task in the same dag run to set or retrieve a
>>>>>> unique value for that particular task. One value should be enough -
>>>>>> assuming that each operator/task works on one external data "source".
>>>>>>
>>>>>> My current thinking is:
>>>>>>
>>>>>> Why don't we provide such a dedicated idempotency service inside
>>>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>>>> class with two methods:
>>>>>>
>>>>>>   * .set(id: str) and
>>>>>>   * .get() -> str
>>>>>>
>>>>>> And the data stored there should be a string keyed by "(dag_id,
>>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>>> no intra-task communication here, very little possibility of abuse, and it
>>>>>> seems to solve the major pain point where you have to provide your own
>>>>>> storage to get the idempotency if your service does not provide one or you
>>>>>> do not want to delegate it to the DAG writer.
>>>>>>
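To make the proposal concrete, here is a rough sketch of what such an "IdempotencyIdStorage" could look like. Everything in it is an assumption for illustration: no such class exists in Airflow, the table name `idempotency_id` is made up, an in-memory SQLite database stands in for the Airflow metadata DB, and `get()` returns None when nothing has been stored yet (the proposal only says `.get() -> str`).

```python
import sqlite3
from typing import Optional


class IdempotencyIdStorage:
    """Hypothetical sketch: one string per (dag_id, task_id, execution_date)."""

    def __init__(self, conn: sqlite3.Connection, dag_id: str, task_id: str,
                 execution_date: str) -> None:
        self._conn = conn
        self._key = (dag_id, task_id, execution_date)
        # Made-up table; the real schema would live in the metadata DB.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS idempotency_id ("
            "dag_id TEXT, task_id TEXT, execution_date TEXT, value TEXT, "
            "PRIMARY KEY (dag_id, task_id, execution_date))"
        )

    def set(self, id: str) -> None:
        # Parameter name follows the proposal; the value overwrites any earlier one.
        self._conn.execute(
            "INSERT OR REPLACE INTO idempotency_id VALUES (?, ?, ?, ?)",
            (*self._key, id),
        )
        self._conn.commit()

    def get(self) -> Optional[str]:
        row = self._conn.execute(
            "SELECT value FROM idempotency_id "
            "WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
            self._key,
        ).fetchone()
        return row[0] if row else None


conn = sqlite3.connect(":memory:")
first_try = IdempotencyIdStorage(conn, "my_dag", "submit_job", "2020-06-01T00:00:00")
first_try.set("job-42")

# A retry of the same task instance builds the same key and sees the value.
retry = IdempotencyIdStorage(conn, "my_dag", "submit_job", "2020-06-01T00:00:00")
other_task = IdempotencyIdStorage(conn, "my_dag", "other_task", "2020-06-01T00:00:00")
```

Because the key is fixed by the task instance, a task can only read back its own value, which is what keeps this narrower than a general key-value store.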
>>>>>> J.
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> The use case I'm referring to is that you can't use xcom to let a
>>>>>> task read information from its past attempts, because when a task starts
>>>>>> its xcom is automatically deleted.
>>>>>>
>>>>>> My specific use case is that we have a custom LivyOperator that calls
>>>>>> Livy to start batch Spark Jobs.
>>>>>> When you start a batch job, Livy returns a job_id.
>>>>>> Sometimes our operator can fail for one reason or another (for
>>>>>> instance if Livy is unreachable for a while).
>>>>>> When the task retries, it calls Livy again, which starts the same
>>>>>> spark job, but the problem is that the spark job from the first attempt can
>>>>>> still be running,
>>>>>> and then we have a batch job that runs twice simultaneously and
>>>>>> creates duplicates in the output.
>>>>>>
>>>>>> What we tried to do is getting the job_id from the first try, to
>>>>>> check if the job is still running, and wait for it to complete if it is.
>>>>>>
>>>>>> We tried using xcom to let the task send a message to itself (to its
>>>>>> next try), but xcom is meant for "inter-task communication" only, so this
>>>>>> doesn't work and is not intended to work.
>>>>>>
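The retry behaviour described here can be sketched as follows, assuming some state that survives across attempts is available. `FakeLivy` is a stand-in written for this example and is not the Livy REST API; the plain dict plays the role of whatever persistent storage is used, and the state names are placeholders.

```python
from typing import Dict, Optional


class FakeLivy:
    """Stand-in for a Livy client, for illustration only (not the real API)."""

    def __init__(self) -> None:
        self.jobs: Dict[int, str] = {}

    def submit(self) -> int:
        # Like Livy, the service picks the id; the caller cannot choose it.
        job_id = len(self.jobs) + 1
        self.jobs[job_id] = "running"
        return job_id

    def state(self, job_id: int) -> Optional[str]:
        return self.jobs.get(job_id)


def submit_or_reattach(livy: FakeLivy, state: Dict[str, int], key: str) -> int:
    """Return the job id this attempt should wait on."""
    previous = state.get(key)
    if previous is not None and livy.state(previous) in ("running", "success"):
        # An earlier attempt already started the job: reuse it, do not resubmit.
        return previous
    job_id = livy.submit()
    # Persist the id before waiting, so a later retry can find it.
    state[key] = job_id
    return job_id


livy = FakeLivy()
persistent_state: Dict[str, int] = {}
key = "my_dag/submit_job/2020-06-01"

first_attempt = submit_or_reattach(livy, persistent_state, key)
retry_attempt = submit_or_reattach(livy, persistent_state, key)
```

The whole difficulty in the thread is where `persistent_state` lives: XCom cannot play that role because it is cleared when the task starts again.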
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Furcy,
>>>>>>
>>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>>> Depending on your use case this may already be possible.
>>>>>>
>>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I would like to open a feature request for Airflow to support
>>>>>> "intra-task xcom".
>>>>>>
>>>>>> It seems that there are several distinct use cases for it already
>>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>>>
>>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>>> and the recommended approach (which apparently would be to create
>>>>>> a distinct feature from xcom to support this, it could be called
>>>>>> intra-com or self-com ?)
>>>>>>
>>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>>> email).
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Furcy
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Jarek Potiuk
>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>>
>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>>
>>>>>>

Re: Intra-task "xcom"

Posted by Jarek Potiuk <Ja...@polidea.com>.
I think no-one wants to remove xcom really :)

On Tue, Jun 2, 2020 at 10:32 AM Michael Lutz <mi...@gmail.com>
wrote:

> Please do not remove the existing xcom functionality. I am using it
> extensively. If you implement something more robust or elegant, that
> would be fine. I feel that a more robust state management system would be
> helpful; it feels like an area for improvement.
>
> On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <Ja...@polidea.com>
> wrote:
>
>> I think this subject has come up so often that I am also slowly changing
>> my mind in favor of making an explicit state persistence "service".
>>
>> Whether it's only one key or more is secondary, I think, but if users
>> are already using Variables to keep state for tasks, this is a clear sign
>> that we are missing a crucial feature and that our users are already abusing
>> Airflow in the way we try to prevent by not introducing a "State service".
>>
>> With the recent SecretBackend implementation, where Variables might be
>> kept in a Secret backend - not only the MetaStore - potentially you might have
>> no write access to the backend. There is even no "write" support in the
>> current "MetastoreBackend" implementation for writing variables. So we
>> already have configurations where writing a variable and reading it
>> elsewhere might not work - as far as I can see. You can of course set several
>> backends, with the Metastore as the last fallback, but
>> for me, it opens up different problems - what happens if the key is present
>> in both, a task writes it to the metastore, but another task reads it from the
>> Secret Backend?
>>
>> It seems to me that variables are being abused in exactly the way we
>> want to prevent the "StateService" from being abused - and shying away from
>> that is really like closing our eyes and pretending it's not happening.
>>
>> So maybe we can make an AIP with this approach:
>>
>> 1) Variables -> mostly read-only (for tasks) and used to keep
>> configuration shared between workers (but not on a task level).
>> 2) StateService (or whatever we call it) where we keep state information
>> for a specific dag + task + execution_date.
>>
>> J.
>>
>>
>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
>> wrote:
>>
>>> Airflow already provides a mechanism for state persistence: the
>>> Variable, and, with caveats and flaws, XCom.
>>>
>>> I personally persist state to the airflow metastore database for a large
>>> percentage of our jobs.  They are incremental jobs and it is helpful to
>>> keep track of a watermark.
>>>
>>> I think that incremental jobs are probably very very common in airflow
>>> implementations.  Though probably oftentimes users resort to imperfect
>>> vehicles for this such as `execution_date` or xcom.
>>>
>>> I have a very drafty draft AIP that I haven't had enough time to work
>>> on, which explores adding explicit support for state persistence to
>>> airflow:
>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>> ready for primetime.)
>>>
>>> I am of the mind that being able to persist some state is not a
>>> fundamental change to airflow and would just add explicit (and more
>>> user-friendly) support for something that is already quite common, and fits
>>> fully within the wheelhouse of what airflow exists to do.
>>>
>>>
>>>
>>>
>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com> wrote:
>>>
>>>> Furcy,
>>>>
>>>> To clarify, when I say that Airflow should not be in the business of
>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>> keeping state to be shared between task instances. I completely understand
>>>> that there may be external systems that are harder to work with, and like
>>>> in your case require the operator to be able to store some piece of
>>>> information to make them idempotent. I just don't think that Airflow should
>>>> provide that storage mechanism.
>>>>
>>>> I would think that most users of Airflow have access to some sort of
>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>> easy enough to write your job_id or whatever value you care about to a file
>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>> combination of them you care about. Yes it makes your operators more
>>>> complex and they have to know about another system, but it keeps that
>>>> complexity out of core Airflow. That's the trade off.
>>>>
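The prefix idea above can be sketched like this. The helper name `state_object_key`, the `airflow-state/` prefix and the timestamp format are assumptions made for the example; the actual write to S3 (or any other object store) would go through whatever hook or client the operator already uses and is not shown.

```python
from datetime import datetime, timezone


def state_object_key(dag_id: str, task_id: str, execution_date: datetime,
                     name: str = "job_id") -> str:
    """Build an object key that any attempt of the task can recompute.

    Hypothetical helper: the compact timestamp avoids characters such as
    ':' and '+' that are awkward in object keys.
    """
    stamp = execution_date.strftime("%Y%m%dT%H%M%S")
    return f"airflow-state/{dag_id}/{task_id}/{stamp}/{name}"


key = state_object_key(
    "my_dag", "submit_job", datetime(2020, 6, 1, tzinfo=timezone.utc)
)
# key == "airflow-state/my_dag/submit_job/20200601T000000/job_id"
```

The operator would write the job_id as the contents of the object at `key` on the first attempt and read it back on a retry.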
>>>> Ash,
>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>> there are use cases where it makes some things convenient. In your example
>>>> though, it would be just as easy for the sensor to write the found object
>>>> path as the contents of another file in S3, with a computable prefix based
>>>> on the dag/task/execution_date.
>>>>
>>>>
>>>> At its heart XCom is just a key-value store where the keys are limited
>>>> to a very specific set of possibilities, and where key-value pairs are
>>>> managed in some specific ways. The request here is to add another narrowly
>>>> defined set of allowable keys, and as far as I can tell with no extra
>>>> management of them. The only real advantage of using the Airflow database
>>>> for XCom or any expansion/variation on it is that we know that all
>>>> operators have access to the database.
>>>>
>>>> I'm not an expert but I would wonder how well Postgres or MySQL perform
>>>> as high volume key value stores. Does anyone actually use XCom at scale,
>>>> and does that extra load on the database impact scheduling and other
>>>> performance aspects of Airflow?
>>>>
>>>> Chris
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org>
>>>> wrote:
>>>>
>>>>> Just to touch on one point about XCom, and to reassure people that
>>>>> XComs, or something very like them, are in Airflow for the foreseeable future.
>>>>>
>>>>> As an example of an appropriate use for XCom: Let's say a third party
>>>>> delivers you a set of files once a week, but the exact name of the files
>>>>> isn't known (by you) in advance. So you write a sensor that polls/checks S3
>>>>> for the objects to appear in your bucket, and the sensor outputs the S3
>>>>> object path to XCom, which the next processing step then examines to
>>>>> process the files.
>>>>>
>>>>> That sort of use case is not going anywhere.
>>>>>
>>>>> Cheers,
>>>>> -ash
>>>>>
>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>>
>>>>> At the risk of repeating myself (from the previous thread that touched
>>>>> on this topic), I don't think Airflow should be in the business of keeping
>>>>> state about external systems. Airflow is about authoring and running
>>>>> workflows; it's not a messaging tool or a cluster management tool. I'm not
>>>>> convinced that the existing XCom functionality should really be a part of
>>>>> Airflow, and I certainly don't think it should be expanded upon or new
>>>>> variations added. I think storing state is especially risky, if for no
>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>>> has saved will diverge from the actual state of the external system.
>>>>> Handling that nicely probably requires a bunch of custom code in the
>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>> operator code complexity. Users would be much better served going to the
>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>> lacking in features (like being able to query the status of a particular
>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>> making up for failures of other tools.
>>>>>
>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>> extra features from expanding in scope. Jarek proposed something that would
>>>>> just store a single string, and immediately Furcy wants to expand it to
>>>>> store multiple strings. Either way we are really just talking about a
>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>> proposal).
>>>>>
>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>> cluster across multiple data intervals, if one was already running (this
>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>> can equally see use cases where I might want to share some resource for
>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>>> added this then why limit it to any one of those combinations? But then we
>>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>>> then you can use Variables, if you want to use something else then you can.
>>>>>
>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>>> absolutely no added benefit. And even with some potential management by
>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>
>>>>> Thank you Jarek for the detailed explanation,
>>>>>
>>>>> That's exactly what I wanted to do: write a feature request to
>>>>> summarize all those discussions.
>>>>> I agree with you that the feature should be marked distinct from the
>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>>
>>>>> The crux of the problem, I think, is that with XCom you do want the
>>>>> task to delete its xcom at the beginning of the retry.
>>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>>> having a task A and a task B that starts immediately after A and waits for
>>>>> some 'signal' from A.
>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>> the signal from A's first try, which is incorrect.
>>>>>
>>>>> About the 3 solutions you mention:
>>>>>
>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>>> use-case Livy's API is poorly designed and only returns a generated job_id;
>>>>> you can't specify a custom one.
>>>>> You can't even find a job by name: I would have to list all the active
>>>>> job_ids, and do a GET for each of them to get its name and find which one
>>>>> is the one I want. It's doable but inelegant.
>>>>>
>>>>> 2) Store the id in an external storage. Of course it would work but it
>>>>> requires an external storage. More on that below.
>>>>>
>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>> think you mean that the idempotency can be handled by the service you call
>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>> could have something to prevent a job that runs twice from creating
>>>>> duplicates. This is another solution we are considering, but it is
>>>>> costly to change now.
>>>>>
>>>>> You guess correctly that the feature I was asking for would be to
>>>>> provide some utility to let the users implement solution 2) without
>>>>> requiring an external storage.
>>>>> I think it would be a QOL improvement for some use cases, just like it
>>>>> could be argued that XCom is just a QOL improvement and users could have
>>>>> used an external storage themselves.
>>>>> The main advantage that it brings is making the custom operators much
>>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>>> having to set up some external
>>>>> storage.
>>>>>
>>>>> I have seen that some users used the metadata store itself as an
>>>>> external storage by adding a new table to the airflow model:
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>>
>>>>> And others suggested using XCom itself as an external storage by
>>>>> storing information with a special task_id:
>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>
>>>>> In the discussion thread you provided it was also suggested to use
>>>>> Variables to store some persisting information.
>>>>>
>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>> providing such functionality would be good.
>>>>>
>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>> an extent: providing an "IdempotencyIdStorage" that only allows you to
>>>>> store a string will just force people who need to store more than one id
>>>>> for one task (for whatever reason) to use some hack again, like storing
>>>>> a json inside the storage.
>>>>>
>>>>> I was more thinking about something quite similar to XCom (I liked the
>>>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>> task_id, execution_date, key)"
>>>>> where "key" can be whatever you want and would be kept across retries.
>>>>>
>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>> Indeed it looks like there would be many ways to misuse such a feature.
>>>>> I do understand the importance of idempotency, and it looks like my use
>>>>> case is one of the first ever listed where I do need to persist a state
>>>>> across retries to make my operator really idempotent.
>>>>>
>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>>> another solution).
>>>>>
>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>> mechanism to prevent double-writes,
>>>>> but I think that as the above hacks show, you can't really prevent
>>>>> users from shooting themselves in the foot if that's what they really want
>>>>> to.
>>>>>
>>>>> While I do think that making things foolproof is important, I believe
>>>>> it's also in Python's philosophy to *not* make things foolproof at
>>>>> the detriment of simplicity for the right use cases.
>>>>> But I do understand that the use cases are different and
>>>>> contradictory: some would require the state to be persisted across
>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>> across retries and not reschedule.
>>>>>
>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>> the submit and pushes an xcom with the job_id, then one task that checks
>>>>> the progress of the job, but it feels very cumbersome to double the number
>>>>> of tasks just for that. Plus I'm not sure we could make the first task
>>>>> retry if the second task fails...
>>>>>
>>>>> Thanks again,
>>>>>
>>>>> Furcy
>>>>>
>>>>>
>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>>> wrote:
>>>>>
>>>>> I think we've discussed several approaches like that, and using the XCom
>>>>> name (which for many people would mean "let's just extend the XCom table
>>>>> for that") is not a very good idea IMHO. I think this is very
>>>>> different functionality/logic which we might or might not agree to
>>>>> implement as a community. Naming it "XCom" or trying to extend the XCom
>>>>> table behavior might be problematic.
>>>>>
>>>>> Not sure if you are aware but we had very similar discussion about it
>>>>> recently (without clear conclusions but at least you can see what kind of
>>>>> issues/problems different people have with this approach)
>>>>>
>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>
>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>>> intra-task communication mechanism, indeed. But it can very easily lead
>>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>>> after the latest discussion kind of died out, and I have one possible
>>>>> solution to the problem.
>>>>>
>>>>> Let me explain what I think about it (but others can have different
>>>>> opinions of course):
>>>>>
>>>>> So far the discussion was that there are several ways to achieve what
>>>>> you want (and it's really about what entity is providing the "idempotency"
>>>>> guarantee):
>>>>>
>>>>> 1) Similarly to what was just merged in the BigQuery Insert Job
>>>>> https://github.com/apache/airflow/pull/8868/files - you can provide the
>>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>>> your case and make sure that when you re-run your task with the same
>>>>> (dag_id, task_id, execution_date) you will get the same id. Then the
>>>>> "uniqueness", and thus idempotency, is handled by the logic written in the DAG.
>>>>>
>>>>> 2) Store the job id in some external storage (via one of the hooks -
>>>>> where it can be queried in the way that will work for you). Then the
>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>> external storage.
>>>>>
>>>>> 3) Query your service and retrieve the JOB ID from it - but you have
>>>>> to have a way to query for the job related to your "dag_id + task_id
>>>>> + execution_date". Then the idempotency is actually handled by the
>>>>> Service you are using.
>>>>>
>>>>> In the use case you describe, this is the only thing you need: an
>>>>> "idempotency source". I believe you would like to get case 2) from
>>>>> above but without having to use external storage to store the "unique id".
>>>>> Something that will let each task in the same dag run set or retrieve a
>>>>> unique value for that particular task. One value should be enough -
>>>>> assuming that each operator/task works on one external data "source".
>>>>>
>>>>> My current thinking is:
>>>>>
>>>>> Why don't we provide such a dedicated idempotency service inside
>>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>>> class with two methods:
>>>>>
>>>>>   * .set(id: str) and
>>>>>   * .get() -> str
>>>>>
>>>>> And the data stored there should be a string keyed by "(dag_id,
>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>> no intra-task communication here, very little possibility of abuse, and it
>>>>> seems to solve the major pain point where you have to provide your own
>>>>> storage to get the idempotency if your service does not provide one or you
>>>>> do not want to delegate it to the DAG writer.
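
A minimal sketch of how the proposed "IdempotencyIdStorage" could look, assuming a plain SQLite table stands in for the Airflow metadata DB. The table name, the constructor arguments, and returning None when nothing is stored are illustrative assumptions, not an existing Airflow API; only the .set(id) / .get() pair and the (dag_id, task_id, execution_date) key come from the proposal above.

```python
import sqlite3
from datetime import datetime
from typing import Optional


class IdempotencyIdStorage:
    """Hypothetical store: one string per (dag_id, task_id, execution_date)."""

    def __init__(self, conn, dag_id, task_id, execution_date):
        self.conn = conn
        # The key deliberately ignores the try number, so retries share it.
        self.key = (dag_id, task_id, execution_date.isoformat())
        conn.execute(
            "CREATE TABLE IF NOT EXISTS idempotency_id ("
            "dag_id TEXT, task_id TEXT, execution_date TEXT, value TEXT, "
            "PRIMARY KEY (dag_id, task_id, execution_date))"
        )

    def set(self, id: str) -> None:
        # Overwrite any value stored earlier for the same key.
        self.conn.execute(
            "INSERT OR REPLACE INTO idempotency_id VALUES (?, ?, ?, ?)",
            (*self.key, id),
        )
        self.conn.commit()

    def get(self) -> Optional[str]:
        row = self.conn.execute(
            "SELECT value FROM idempotency_id "
            "WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
            self.key,
        ).fetchone()
        return row[0] if row else None


conn = sqlite3.connect(":memory:")  # stand-in for the metadata DB
run_date = datetime(2020, 6, 1)

first_try = IdempotencyIdStorage(conn, "spark_etl", "submit_batch", run_date)
first_try.set("job-42")

# A retry builds the same key and therefore reads the value of the first try.
retry = IdempotencyIdStorage(conn, "spark_etl", "submit_batch", run_date)
stored = retry.get()
```

Because the key does not include the try number, a retry of the same task instance sees the value written by the previous attempt, while another execution_date starts empty.
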
>>>>>
>>>>> J.
>>>>>
>>>>>
>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>
>>>>> The use case I'm referring to is that you can't use xcom to let a task
>>>>> read information from its past attempts, because when a task starts its
>>>>> xcom is automatically deleted.
>>>>>
>>>>> My specific use case is that we have a custom LivyOperator that calls
>>>>> Livy to start batch Spark Jobs.
>>>>> When you start a batch job, Livy returns a job_id.
>>>>> Sometimes our operator can fail for one reason or another (for
>>>>> instance if Livy is unreachable for a while).
>>>>> When the task retries, it calls Livy again, which starts the same Spark
>>>>> job, but the problem is that the Spark job from the first attempt can still
>>>>> be running,
>>>>> and then we have a batch job that runs twice simultaneously and
>>>>> creates duplicates in the output.
>>>>>
>>>>> What we tried to do is get the job_id from the first try, to check
>>>>> if the job is still running, and wait for it to complete if it is.
>>>>>
>>>>> We tried using xcom to let the task send a message to itself (to its
>>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>>> doesn't work and is not intended to work.
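
The retry behaviour described above can be sketched roughly as follows. FakeLivy is a stand-in written for this example and is not the real Livy REST API; the state names and the saved_state dict (some storage assumed to survive between attempts) are assumptions too.

```python
class FakeLivy:
    """Stand-in for a Livy client, for illustration only."""

    def __init__(self):
        self.states = {}  # job_id -> state string (placeholder names)

    def submit(self):
        job_id = len(self.states) + 1  # Livy generates the id, not the caller
        self.states[job_id] = "running"
        return job_id

    def state(self, job_id):
        return self.states[job_id]


def run_attempt(livy, saved_state):
    """One attempt of the task.

    saved_state is assumed to persist across retries of the same task
    instance, which is exactly what XCom does not offer.
    """
    job_id = saved_state.get("job_id")
    if job_id is not None and livy.state(job_id) == "running":
        # A previous attempt already started the job: re-attach, don't resubmit.
        return job_id
    job_id = livy.submit()
    saved_state["job_id"] = job_id
    return job_id


livy = FakeLivy()
saved_state = {}
first = run_attempt(livy, saved_state)   # submits a new batch job
second = run_attempt(livy, saved_state)  # retry: re-attaches to the same job
```

A real operator would then poll the job until it finishes. How to treat a job that already completed is left out of this sketch.
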
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>>>>>
>>>>> Hi Furcy,
>>>>>
>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>> Depending on your use case this may already be possible.
>>>>>
>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I would like to open a feature request for Airflow to support
>>>>> "intra-task xcom".
>>>>>
>>>>> It seems that there are several distinct use cases for it already
>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>>
>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>> and the recommended approach (which apparently would be to create
>>>>> a distinct feature from xcom to support this; it could be called
>>>>> intra-com or self-com?)
>>>>>
>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>> email).
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Furcy
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Jarek Potiuk
>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>
>>>>> M: +48 660 796 129 <+48660796129>
>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>
>>>>>

-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Re: Intra-task "xcom"

Posted by Michael Lutz <mi...@gmail.com>.
Please do not remove the existing XCom functionality; I am using it
extensively. If you implement something more robust or elegant, that
would be fine. I feel that a more robust state management system would be
helpful; it feels like an area for improvement.

On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <Ja...@polidea.com> wrote:

> I think this subject has come up so often that I am also slowly changing my
> mind in favor of making an explicit state persistence "service".
>
> Whether it's only one key or more is secondary, I think, but if users
> are already using Variables to keep state for tasks - this is a clear sign
> that we are missing a crucial feature and our users are already abusing
> Airflow in the way we try to prevent by not introducing a "State service".
>
> With the recent SecretBackend implementation, where Variables might be kept
> in a Secret backend - not only the MetaStore - you might potentially have no
> write access to the backend. There is not even "write" support in the
> current "MetastoreBackend" implementation for writing variables. So we
> already have configurations where writing variables and reading them
> elsewhere might not work - as far as I can see. You can of course set
> several backends, with the Metastore as the last fallback, but for me it
> opens up different problems - what happens if the key is present in both,
> one task writes it to the metastore, but another task reads it from the
> Secret Backend?
>
> It seems to me that variables are being abused in exactly the way we
> want to prevent the "StateService" from being abused - and shying away from
> that is really like closing our eyes and pretending it's not happening.
>
> So maybe we can make an AIP for this change with this approach:
>
> 1) Variables -> mostly read-only (for tasks) and used to keep
> configuration shared between workers (but not on a task level).
> 2) StateService (or whatever we call it) where we keep state information
> for a specific dag + task + execution_date.
>
> J.
>
>
> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
> wrote:
>
>> Airflow already provides a mechanism for state persistence: the Variable,
>> and, with caveats and flaws, XCom.
>>
>> I personally persist state to the airflow metastore database for a large
>> percentage of our jobs.  They are incremental jobs and it is helpful to
>> keep track of a watermark.
>>
>> I think that incremental jobs are probably very common in airflow
>> implementations, though users probably often resort to imperfect
>> vehicles for this, such as `execution_date` or xcom.
>>
>> I have a very drafty draft AIP that I haven't had enough time to work
>> on, which explores adding explicit support for state persistence to
>> airflow:
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>> Though I understand it is a controversial idea.  (Note: the AIP is not
>> ready for primetime.)
>>
>> I am of the mind that being able to persist some state is not a
>> fundamental change to airflow and would just add explicit (and more
>> user-friendly) support for something that is already quite common, and fits
>> fully within the wheelhouse of what airflow exists to do.
>>
>>
>>
>>
>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com> wrote:
>>
>>> Furcy,
>>>
>>> To clarify, when I say that Airflow should not be in the business of
>>> keeping state about external systems, I specifically mean it shouldn't be
>>> keeping state to be shared between task instances. I completely understand
>>> that there may be external systems that are harder to work with, and like
>>> in your case require the operator to be able to store some piece of
>>> information to make them idempotent. I just don't think that Airflow should
>>> provide that storage mechanism.
>>>
>>> I would think that most users of Airflow have access to some sort of
>>> cloud storage like S3 (which are really just key-value stores), and it's
>>> easy enough to write your job_id or whatever value you care about to a file
>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>> combination of them you care about. Yes it makes your operators more
>>> complex and they have to know about another system, but it keeps that
>>> complexity out of core Airflow. That's the trade off.
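
A rough sketch of the "computable prefix" idea above. The key layout, the "airflow-state" prefix, and the dict standing in for an object store such as S3 are assumptions made for illustration; nothing here is an Airflow or S3 API.

```python
from datetime import datetime


def state_key(dag_id, task_id, execution_date, name="job_id"):
    """Build a deterministic object key from the task instance identity."""
    return f"airflow-state/{dag_id}/{task_id}/{execution_date.isoformat()}/{name}"


bucket = {}  # stand-in for an object store; a real operator would use a hook

# First attempt: remember the id returned by the external system.
key = state_key("spark_etl", "submit_batch", datetime(2020, 6, 1))
bucket[key] = "42"

# Any later attempt can recompute the same key and read the value back.
same_key = state_key("spark_etl", "submit_batch", datetime(2020, 6, 1))
previous_job_id = bucket.get(same_key)
```

The operator has to know about the extra storage system, which is the trade-off described above.
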
>>>
>>> Ash,
>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>> there are use cases where it makes some things convenient. In your example
>>> though, it would be just as easy for the sensor to write the found object
>>> path as the contents of another file in S3, with a computable prefix based
>>> on the dag/task/execution_date.
>>>
>>>
>>> At its heart XCom is just a key-value store where the keys are limited
>>> to a very specific set of possibilities, and where key-value pairs are
>>> managed in some specific ways. The request here is to add another narrowly
>>> defined set of allowable keys, and as far as I can tell with no extra
>>> management of them. The only real advantage of using the Airflow database
>>> for XCom or any expansion/variation on it is that we know that all
>>> operators have access to the database.
>>>
>>> I'm not an expert but I would wonder how well Postgres or MySQL perform
>>> as high volume key value stores. Does anyone actually use XCom at scale,
>>> and does that extra load on the database impact scheduling and other
>>> performance aspects of Airflow?
>>>
>>> Chris
>>>
>>>
>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org> wrote:
>>>
>>>> Just to touch on one point about XCom, and to reassure people that
>>>> they, or something very like them, are in Airflow for the foreseeable future.
>>>>
>>>> As an example of an appropriate use for XCom: Let's say a third party
>>>> delivers you a set of files once a week, but the exact name of the files
>>>> isn't known (by you) in advance. So you write a sensor that polls/checks S3
>>>> for the Objects to appear in your bucket, and the sensor outputs the S3
>>>> Object path to XCom, which the next processing step then examines to
>>>> process the files.
>>>>
>>>> That sort of use case is not going anywhere.
>>>>
>>>> Cheers,
>>>> -ash
>>>>
>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>
>>>> At the risk of repeating myself (from the previous thread that touched
>>>> on this topic), I don't think Airflow should be in the business of keeping
>>>> state about external systems. Airflow is about authoring and running
>>>> workflows; it's not a messaging tool or a cluster management tool. I'm not
>>>> convinced that the existing XCom functionality should really be a part of
>>>> Airflow, and I certainly don't think it should be expanded upon or new
>>>> variations added. I think storing state is especially risky, if for no
>>>> other reason than the fact that Airflow is not the source of truth about
>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>> has saved will diverge from the actual state of the external system.
>>>> Handling that nicely probably requires a bunch of custom code in the
>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>> operator code complexity. Users would be much better served going to the
>>>> source of truth to determine state. If part of the problem is that Livy is
>>>> lacking in features (like being able to query the status of a particular
>>>> job_id) then I think it would be more appropriate to add the needed
>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>> making up for failures of other tools.
>>>>
>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>> extra features from expanding in scope. Jarek proposed something that would
>>>> just store a single string, and immediately Furcy wants to expand it to
>>>> store multiple strings. Either way we are really just talking about a
>>>> key-value store, and putting limits on how that key can be structured; the
>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>> proposal).
>>>>
>>>> I know in the past that I had a situation where I wanted to reuse a
>>>> cluster across multiple data intervals, if one was already running (this
>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>> can equally see use cases where I might want to share some resource for
>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>> added this then why limit it to any one of those combinations? But then we
>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>> then you can use Variables, if you want to use something else then you can.
>>>>
>>>> Unless Airflow is doing some extra management of these key-values in
>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>> absolutely no added benefit. And even with some potential management by
>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>
>>>> Chris
>>>>
>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>
>>>> Thank you Jarek for the detailed explanation,
>>>>
>>>> That's exactly what I wanted to do: write a feature request to
>>>> summarize all those discussions.
>>>> I agree with you that the feature should be marked distinct from the
>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>
>>>> The crux of the problem, I think, is that with XCom you do want the task
>>>> to delete its xcom at the beginning of the retry.
>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>> having a task A and a task B that starts immediately after A and waits for
>>>> some 'signal' from A.
>>>> If A and B restart and A doesn't reset its signal, then B will use the
>>>> signal from A's first try, which is incorrect.
>>>>
>>>> About the 3 solutions you mention:
>>>>
>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>> use-case Livy's API is poorly designed and only returns a generated job_id;
>>>> you can't specify a custom one.
>>>> You can't even find a job by name; I would have to list all the active
>>>> job_ids, and do a GET for each of them to get its name and find which one
>>>> is the one I want. It's doable but inelegant.
>>>>
>>>> 2) Store the id in an external storage. Of course it would work but it
>>>> requires an external storage. More on that below.
>>>>
>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>> think you mean that the idempotency can be handled by the service you call
>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>> could have something to prevent a job that runs twice from creating
>>>> duplicates. This is another solution we are considering, but it is
>>>> costly to change now.
>>>>
>>>> You guess correctly that the feature I was asking for would be to
>>>> provide some utility to let users implement solution 2) without
>>>> requiring an external storage.
>>>> I think it would be a QOL improvement for some use cases, just like it
>>>> could be argued that XCom is just a QOL improvement and users could have
>>>> used an external storage themselves.
>>>> The main advantage that it brings is making custom operators much
>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>> having to set up some external
>>>> storage.
>>>>
>>>> I have seen that some users used the metadata store itself as an
>>>> external storage by adding a new table to the airflow model:
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>
>>>> And others suggested using XCom itself as an external storage by
>>>> storing information with a special task_id:
>>>> https://stackoverflow.com/a/57515143/2087478
>>>>
>>>> In the discussion thread you provided it was also suggested to use
>>>> Variables to store some persisting information.
>>>>
>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>> providing such functionality would be good.
>>>>
>>>> Finally, I don't see the point of limiting the functionality to such
>>>> an extent: providing an "IdempotencyIdStorage" that only allows you to
>>>> store a string
>>>> will just force people who need to store more than one id for one task
>>>> (for whatever reason) to use some hack again, like storing a json inside
>>>> the storage.
>>>>
>>>> I was more thinking about something quite similar to XCom (I liked the
>>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>>> task_id, execution_date, key)"
>>>> where "key" can be whatever you want and would be kept across retries.
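
A minimal in-memory sketch of the "XState" idea above, assuming entries are keyed by (dag_id, task_id, execution_date, key). The class and its set/get methods are hypothetical; only the name and the key structure come from the message.

```python
from datetime import datetime


class XState:
    """Hypothetical store keyed by (dag_id, task_id, execution_date, key)."""

    def __init__(self):
        self._rows = {}

    def set(self, dag_id, task_id, execution_date, key, value):
        self._rows[(dag_id, task_id, execution_date, key)] = value

    def get(self, dag_id, task_id, execution_date, key, default=None):
        return self._rows.get((dag_id, task_id, execution_date, key), default)


xstate = XState()
ti = ("spark_etl", "submit_batch", datetime(2020, 6, 1))

# Unlike a single-string store, one task can keep several values.
xstate.set(*ti, "job_id", "42")
xstate.set(*ti, "attempts_seen", 1)

# Nothing is cleared when the task starts again, so a retry still sees both.
job_id = xstate.get(*ti, "job_id")
attempts_seen = xstate.get(*ti, "attempts_seen")
```

The extra "key" component is what avoids packing several ids into one JSON string.
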
>>>>
>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>> Indeed it looks like there would be many ways to misuse such a feature.
>>>> I do understand the importance of idempotency, and it looks like my use
>>>> case is one of the first ever listed where I do need to persist a state
>>>> across retries to make my operator really idempotent.
>>>>
>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>> another solution).
>>>>
>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>> mechanism to prevent double-writes,
>>>> but I think that as the above hacks show, you can't really prevent
>>>> users from shooting themselves in the foot if that's what they really want
>>>> to.
>>>>
>>>> While I do think that making things foolproof is important, I believe
>>>> it's also in Python's philosophy to *not* make things foolproof at the
>>>> detriment of simplicity for the right use cases.
>>>> But I do understand that the use cases are different and contradictory:
>>>> some would require the state to be persisted across reschedule and not
>>>> retries, mine would require the state to be persisted across retries and
>>>> not reschedule.
>>>>
>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>> the submit and pushes an xcom with the job_id, then one task that checks
>>>> the progress of the job, but it feels very cumbersome to double the number
>>>> of tasks just for that. Plus I'm not sure we could make the first task
>>>> retry if the second task fails...
>>>>
>>>> Thanks again,
>>>>
>>>> Furcy
>>>>
>>>>
>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>> wrote:
>>>>
>>>> I think we've discussed several approaches like that, and using the XCom
>>>> name (which for many people would mean "let's just extend the XCom table
>>>> for that") is not a very good idea IMHO. I think this is very
>>>> different functionality/logic which we might or might not agree to
>>>> implement as a community. Naming it "XCom" or trying to extend the XCom
>>>> table behavior might be problematic.
>>>>
>>>> Not sure if you are aware but we had very similar discussion about it
>>>> recently (without clear conclusions but at least you can see what kind of
>>>> issues/problems different people have with this approach)
>>>>
>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>
>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>> - intra-task communication mechanism, indeed. But it can very easily lead
>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>> after the latest discussion kind of died out, and I have one possible
>>>> solution to the problem.
>>>>
>>>> Let me explain what I think about it (but others can have different
>>>> opinions of course):
>>>>
>>>> So far the discussion was that there are several ways to achieve what
>>>> you want (and it's really about what entity is providing the "idempotency"
>>>> guarantee):
>>>>
>>>> 1) Similarly as just merged in the BigQuery Insert Job
>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>> your case and make sure that when you re-run your task with the same
>>>> (dag_id, task_id, execution date) you will get the same id. Then the
>>>> "uniqueness" thus idempotency is handled by the logic written in the DAG.
>>>>
>>>> 2) Store the job id in some external storage (via one of the hooks -
>>>> where it can be queried in the way that will work for you). Then the
>>>> idempotency is actually handled by the logic in your Operator + some
>>>> external storage.
>>>>
>>>> 3) Query your service and retrieve the JOB ID from it - but you have to
>>>> have a way to query for the job related to your "dag_id + task_id
>>>> + execution_date". Then the idempotency is actually handled by the
>>>> Service you are using.
>>>>
>>>> In the use case you describe, this is the only thing you need: an
>>>> "idempotency source". I believe you would like to get case 2) from
>>>> above but without having to use external storage to store the "unique id".
>>>> Something that will let each task in the same dag run set or retrieve a
>>>> unique value for that particular task. One value should be enough -
>>>> assuming that each operator/task works on one external data "source".
>>>>
>>>> My current thinking is:
>>>>
>>>> Why don't we provide such a dedicated idempotency service inside
>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>> class with two methods:
>>>>
>>>>   * .set(id: str) and
>>>>   * .get() -> str
>>>>
>>>> And the data stored there should be a string keyed by "(dag_id, task_id,
>>>> execution_date)" - available also via Jinja templating. There is no
>>>> intra-task communication here, very little possibility of abuse, and it
>>>> seems to solve the major pain point where you have to provide your own
>>>> storage to get the idempotency if your service does not provide one or you
>>>> do not want to delegate it to the DAG writer.
>>>>
>>>> J.
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>
>>>> The use case I'm referring to is that you can't use xcom to let a task
>>>> read information from its past attempts, because when a task starts its
>>>> xcom is automatically deleted.
>>>>
>>>> My specific use case is that we have a custom LivyOperator that calls
>>>> Livy to start batch Spark Jobs.
>>>> When you start a batch job, Livy returns a job_id.
>>>> Sometimes our operator can fail for one reason or another (for instance
>>>> if Livy is unreachable for a while).
>>>> When the task retries, it calls Livy again, which starts the same Spark
>>>> job, but the problem is that the Spark job from the first attempt can still
>>>> be running,
>>>> and then we have a batch job that runs twice simultaneously and creates
>>>> duplicates in the output.
>>>>
>>>> What we tried to do is get the job_id from the first try, to check
>>>> if the job is still running, and wait for it to complete if it is.
>>>>
>>>> We tried using xcom to let the task send a message to itself (to its
>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>> doesn't work and is not intended to work.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>>>>
>>>> Hi Furcy,
>>>>
>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>> Depending on your use case this may already be possible.
>>>>
>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I would like to open a feature request for Airflow to support
>>>> "intra-task xcom".
>>>>
>>>> It seems that there are several distinct use cases for it already
>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>
>>>> I wanted to summarize links to the use cases and past attempts,
>>>> and the recommended approach (which apparently would be to create
>>>> a distinct feature from xcom to support this; it could be called
>>>> intra-com or self-com?)
>>>>
>>>> Do you know if such ticket already exists? I couldn't find one.
>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>> email).
>>>>
>>>> Thanks,
>>>>
>>>> Furcy
>>>>

Re: Intra-task "xcom"

Posted by Kaxil Naik <ka...@gmail.com>.
Definitely, let's connect.

On Wed, Jul 22, 2020 at 11:26 PM Daniel Standish <dp...@gmail.com>
wrote:

> We are using state persistence pretty heavily right now with plugin models
> that I have called ProcessState and TaskState
>
> Our implementation might be too idiosyncratic to contribute to airflow,
> but then again it might not.  I would be happy to do a call to demo what we
> are doing to see if there is any interest, and to receive guidance from
> interested parties re what if anything might make sense in airflow.  Kaxil
> do you have any interest in that?
>
> On Thu, Jun 4, 2020 at 4:49 PM Kaxil Naik <ka...@gmail.com> wrote:
>
>> I definitely feel we can support these use cases by improving XCom. The
>> concept of XCom was to allow sharing messages & state between tasks.
>>
>> Here is the first line from the docs about Xcom:
>>
>> XComs let tasks exchange messages, allowing more nuanced forms of control
>> and shared state. The name is an abbreviation of “cross-communication”.
>>
>> I read the AIP (
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
>> ) from @Daniel Standish <dp...@gmail.com>. The "namespacing" of
>> this state would be a good feature, and
>> XCom already allows that with the "dag_id, task_id" key. *Solution 1* in the
>> AIP would solve the issue without much impact while
>> maintaining backwards compatibility.
>>
>> I am against the idea of using Secrets Backend for storing "State".
>> Storing state for some kind of persistence should be "short-lived" and
>> temporary.
>>
>> The "writers" & "readers" of both (Secrets & State) are different.
>> Generally, Sysadmins / Teamleads are responsible for managing secrets
>> (writing, rotating, auditing, etc.), whereas State is written by
>> Airflow Workers and would (or should) be short-lived, and you don't care
>> about auditing or rotating the value in "State".
>>
>> The only problems that I can see in the current XCom implementation are 1)
>> the use of execution_date and 2) the fact that XComs are cleared at the
>> start.
>>
>> One of the issues we already want to address in Airflow is to remove the
>> hard requirement of "execution_date" for DagRun and TaskInstance. This
>> would also help in fixing (1) above.
>>
>> (2) can be solved by a flag as mentioned in the AIP.
>>
>> Regards,
>> Kaxil
>>
>>
>> On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk <Ja...@polidea.com>
>> wrote:
>>
>>> I think this subject has come up so often that I am also slowly changing my
>>> mind in favor of making an explicit state persistence "service".
>>>
>>> Whether it's only one key or more is secondary, I think, but if users
>>> are already using Variables to keep state for tasks - this is a clear sign
>>> that we are missing a crucial feature and our users are already abusing
>>> Airflow in the way we try to prevent by not introducing a "State service".
>>>
>>> With the recent SecretBackend implementation, where Variables might be
>>> kept in a Secret backend - not only the MetaStore - you might potentially
>>> have no write access to the backend. There is not even "write" support in
>>> the current "MetastoreBackend" implementation for writing variables. So we
>>> already have configurations where writing variables and reading them
>>> elsewhere might not work - as far as I can see. You can of course set
>>> several backends, with the Metastore as the last fallback, but for me it
>>> opens up different problems - what happens if the key is present in both,
>>> one task writes it to the metastore, but another task reads it from the
>>> Secret Backend?
>>>
>>> It seems to me that variables are being abused in exactly the way we
>>> want to prevent the "StateService" from being abused - and shying away from
>>> that is really like closing our eyes and pretending it's not happening.
>>>
>>> So maybe we can make an AIP for this change with this approach:
>>>
>>> 1) Variables -> mostly read-only (for tasks) and used to keep
>>> configuration shared between workers (but not on a task level).
>>> 2) StateService (or whatever we call it) where we keep state information
>>> for a specific dag + task + execution_date.
>>>
>>> J.
>>>
>>>
>>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
>>> wrote:
>>>
>>>> Airflow already provides a mechanism for state persistence: the
>>>> Variable, and, with caveats and flaws, XCom.
>>>>
>>>> I personally persist state to the airflow metastore database for a
>>>> large percentage of our jobs.  They are incremental jobs and it is helpful
>>>> to keep track of a watermark.
>>>>
>>>> I think that incremental jobs are probably very common in airflow
>>>> implementations, though users probably often resort to imperfect
>>>> vehicles for this, such as `execution_date` or xcom.
>>>>
>>>> I have a very drafty draft AIP that I haven't had enough time to work
>>>> on, which explores adding explicit support for state persistence to
>>>> airflow:
>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>>> Though I understand it is a controversial idea.  (Note: the AIP is not
>>>> ready for primetime.)
>>>>
>>>> I am of the mind that being able to persist some state is not a
>>>> fundamental change to airflow and would just add explicit (and more
>>>> user-friendly) support for something that is already quite common, and fits
>>>> fully within the wheelhouse of what airflow exists to do.
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com> wrote:
>>>>
>>>>> Furcy,
>>>>>
>>>>> To clarify, when I say that Airflow should not be in the business of
>>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>>> keeping state to be shared between task instances. I completely understand
>>>>> that there may be external systems that are harder to work with, and like
>>>>> in your case require the operator to be able to store some piece of
>>>>> information to make them idempotent. I just don't think that Airflow should
>>>>> provide that storage mechanism.
>>>>>
>>>>> I would think that most users of Airflow have access to some sort of
>>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>>> easy enough to write your job_id or whatever value you care about to a file
>>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>>> combination of them you care about. Yes it makes your operators more
>>>>> complex and they have to know about another system, but it keeps that
>>>>> complexity out of core Airflow. That's the trade off.
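A minimal sketch of the "computable prefix" idea described above, assuming a hypothetical `airflow-state/` layout and a plain dict standing in for the bucket. The function name, the prefix, and the `name` parameter are illustrative only; nothing here is an Airflow or S3 API.

```python
from datetime import datetime, timezone


def state_key(dag_id: str, task_id: str, execution_date: datetime, name: str = "job_id") -> str:
    """Build a deterministic object key for one task instance.

    Expects a timezone-aware execution_date so that every retry of the
    same task instance computes exactly the same key.
    """
    stamp = execution_date.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"airflow-state/{dag_id}/{task_id}/{stamp}/{name}"


# A dict stands in for the object store; a real operator would use a hook here.
bucket = {}
key = state_key("etl", "submit_spark", datetime(2020, 6, 1, tzinfo=timezone.utc))
bucket[key] = "42"        # first attempt records the external job id
recovered = bucket[key]   # a retry recomputes the same key and reads it back
```

Because the key depends only on the task instance's identity, the operator needs no extra bookkeeping to find its own earlier value.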
>>>>>
>>>>> Ash,
>>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>>> there are use cases where it makes some things convenient. In your example
>>>>> though, it would be just as easy for the sensor to write the found object
>>>>> path as the contents of another file in S3, with a computable prefix based
>>>>> on the dag/task/execution_date.
>>>>>
>>>>>
>>>>> At its heart XCom is just a key-value store where the keys are limited
>>>>> to a very specific set of possibilities, and where key-value pairs are
>>>>> managed in some specific ways. The request here is to add another narrowly
>>>>> defined set of allowable keys, and as far as I can tell with no extra
>>>>> management of them. The only real advantage of using the Airflow database
>>>>> for XCom or any expansion/variation on it is that we know that all
>>>>> operators have access to the database.
>>>>>
>>>>> I'm not an expert but I would wonder how well Postgres or MySQL
>>>>> perform as high volume key value stores. Does anyone actually use XCom at
>>>>> scale, and does that extra load on the database impact scheduling and other
>>>>> performance aspects of Airflow?
>>>>>
>>>>> Chris
>>>>>
>>>>>
>>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Just to touch on one point about XCom, and to reassure people that
>>>>>> XComs, or something very like them, are in Airflow for the foreseeable future.
>>>>>>
>>>>>> As an example of an appropriate use for XCom: Let's say a third
>>>>>> party delivers you a set of files once a week, but the exact name of the
>>>>>> files isn't known (by you) in advance. So you write a sensor that
>>>>>> polls/checks S3 for the objects to appear in your bucket, and the sensor
>>>>>> outputs the S3 object path to XCom, which the next processing step then
>>>>>> reads to process the files.
>>>>>>
>>>>>> That sort of use case is not going anywhere.
>>>>>>
>>>>>> Cheers,
>>>>>> -ash
>>>>>>
>>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>>>
>>>>>> At the risk of repeating myself (from the previous thread that
>>>>>> touched on this topic), I don't think Airflow should be in the business of
>>>>>> keeping state about external systems. Airflow is about authoring and
>>>>>> running workflows; it's not a messaging tool or a cluster management tool.
>>>>>> I'm not convinced that the existing XCom functionality should really be a
>>>>>> part of Airflow, and I certainly don't think it should be expanded upon or
>>>>>> new variations added. I think storing state is especially risky, if for no
>>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>>>> has saved will diverge from the actual state of the external system.
>>>>>> Handling that nicely, probably requires a bunch of custom code in the
>>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>>> operator code complexity. Users would be much better served going to the
>>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>>> lacking in features (like being able to query the status of a particular
>>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>>> making up for failures of other tools.
>>>>>>
>>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>>> extra features from expanding in scope. Jarek proposed something that would
>>>>>> just store a single string, and immediately Furcy wants to expand it to
>>>>>> store multiple strings. Either way we are really just talking about a
>>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>>> proposal).
>>>>>>
>>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>>> cluster across multiple data intervals, if one was already running (this
>>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>>> can equally see use cases where I might want to share some resource for
>>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>>>> added this then why limit it to any one of those combinations? But then we
>>>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>>>> then you can use Variables, if you want to use something else then you can.
>>>>>>
>>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>>>> absolutely no added benefit. And even with some potential management by
>>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> Thank you Jarek for the detailed explanation,
>>>>>>
>>>>>> That's exactly what I wanted to do: write a feature request to
>>>>>> summarize all those discussions.
>>>>>> I agree with you that the feature should be marked distinct from the
>>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>>>
>>>>>> The crux of the problem, I think, is that with XCom you do want the
>>>>>> task to delete its xcom at the beginning of the retry.
>>>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>>>> having a task A and a task B that starts immediately after A and waits for
>>>>>> some 'signal' from A.
>>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>>> the signal from A's first try, which is incorrect.
>>>>>>
>>>>>> About the 3 solutions you mention:
>>>>>>
>>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>>>> use-case Livy's API is poorly designed and only returns a generated job_id,
>>>>>> you can't specify a custom one.
>>>>>> You can't even find a job by name, I would have to list all the
>>>>>> active job_ids, and do a GET for each of them to get its name and find
>>>>>> which one is the one I want. It's doable but inelegant.
>>>>>>
>>>>>> 2) Store the id in an external storage. Of course it would work but
>>>>>> it requires an external storage. More on that below.
>>>>>>
>>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>>> think you mean that the idempotency can be handled by the service you call
>>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>>> could have something to prevent a job that runs twice from creating
>>>>>> duplicates. This is another solution we are considering, but it is
>>>>>> costly to change now.
>>>>>>
>>>>>> You guess correctly that the feature I was asking for would be to
>>>>>> provide some utility to let the users implement solution 2) without
>>>>>> requiring an external storage.
>>>>>> I think it would be a QOL improvement for some use cases, just like
>>>>>> it could be argued that XCom is just a QOL improvement and users could have
>>>>>> used an external storage themselves.
>>>>>> The main advantage that it brings is making the custom operators much
>>>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>>>> having to set up some external
>>>>>> storage.
>>>>>>
>>>>>> I have seen that some users used the metadata store itself as an
>>>>>> external storage by adding a new table to the airflow model:
>>>>>>
>>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>>>
>>>>>> And others suggested using XCom itself as an external storage by
>>>>>> storing information with a special task_id:
>>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>>
>>>>>> In the discussion thread you provided it was also suggested to use
>>>>>> Variables to store some persisting information.
>>>>>>
>>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>>> providing such functionality would be good.
>>>>>>
>>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>>> an extent: providing an "IdempotencyIdStorage" that only allows you to store a
>>>>>> string
>>>>>> will just force people who need to store more than one id for one
>>>>>> task (for whatever reason) to use some hack again, like storing a json
>>>>>> inside the storage.
>>>>>>
>>>>>> I was more thinking about something quite similar to XCom (I liked
>>>>>> the XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>>> task_id, execution_date, key)"
>>>>>> where "key" can be whatever you want and would be kept across retries.
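The keying described above could be sketched roughly as follows. This is an illustration only: `InMemoryXState` is a hypothetical name, a dict stands in for a metadata-database table, and `execution_date` is kept as a plain string for brevity. The one behavior it tries to show is that nothing is cleared when a task starts a new try; entries go away only on an explicit `clear`.

```python
from typing import Any, Dict, Tuple

# (dag_id, task_id, execution_date, key); execution_date is an ISO string here
StateKey = Tuple[str, str, str, str]


class InMemoryXState:
    """Hypothetical store; a dict stands in for a metadata-database table."""

    def __init__(self) -> None:
        self._rows: Dict[StateKey, Any] = {}

    def set(self, dag_id: str, task_id: str, execution_date: str, key: str, value: Any) -> None:
        self._rows[(dag_id, task_id, execution_date, key)] = value

    def get(self, dag_id: str, task_id: str, execution_date: str, key: str, default: Any = None) -> Any:
        return self._rows.get((dag_id, task_id, execution_date, key), default)

    def clear(self, dag_id: str, task_id: str, execution_date: str) -> int:
        """Drop every entry of one task instance; returns how many were removed."""
        doomed = [k for k in self._rows if k[:3] == (dag_id, task_id, execution_date)]
        for k in doomed:
            del self._rows[k]
        return len(doomed)
```

Allowing an arbitrary `key` avoids the "store a JSON blob in a single string" workaround when a task needs to remember more than one value.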
>>>>>>
>>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>>> Indeed it looks like there would be many ways to misuse such a feature.
>>>>>> I do understand the importance of idempotency, and it looks like my
>>>>>> use case is one of the first ever listed where I do need to persist a state
>>>>>> across retries to make my operator really idempotent.
>>>>>>
>>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>>>> another solution).
>>>>>>
>>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>>> mechanism to prevent double-writes,
>>>>>> but I think that as the above hacks show, you can't really prevent
>>>>>> users from shooting themselves in the foot if that's what they really want
>>>>>> to.
>>>>>>
>>>>>> While I do think that making things foolproof is important, I believe
>>>>>> it's also in Python's philosophy to *not* make things foolproof to
>>>>>> the detriment of simplicity for the right use cases.
>>>>>> But I do understand that the use cases are different and
>>>>>> contradictory: some would require the state to be persisted across
>>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>>> across retries and not reschedule.
>>>>>>
>>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>>> the submit and an xcom with the job, then one task that checks the progress
>>>>>> of the job, but it feels very cumbersome to double the number of tasks
>>>>>> just for that. Plus I'm not sure we could make the first task retry if the
>>>>>> second task fails...
>>>>>>
>>>>>> Thanks again,
>>>>>>
>>>>>> Furcy
>>>>>>
>>>>>>
>>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>>>> wrote:
>>>>>>
>>>>>> I think we've discussed several approaches like that, and using the XCom
>>>>>> name (which for many people would mean "let's just extend the XCom table for
>>>>>> that") is not a very good idea IMHO. I think this is very
>>>>>> different functionality/logic which we might or might not agree to
>>>>>> implement as a community. Naming it "XCom" or trying to extend the XCom
>>>>>> table behavior might be problematic.
>>>>>>
>>>>>> Not sure if you are aware but we had very similar discussion about it
>>>>>> recently (without clear conclusions but at least you can see what kind of
>>>>>> issues/problems different people have with this approach)
>>>>>>
>>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>>
>>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>>>> intra-task communication mechanism, indeed. But it can very easily lead
>>>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>>>> after the latest discussion kind of died out, and I have one possible
>>>>>> solution to the problem.
>>>>>>
>>>>>> Let me explain what I think about it (but others can have different
>>>>>> opinions of course):
>>>>>>
>>>>>> So far the discussion was that there are several ways to achieve what
>>>>>> you want (and it's really about what entity is providing the "idempotency"
>>>>>> guarantee):
>>>>>>
>>>>>> 1) Similar to what was just merged in the BigQuery Insert Job
>>>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>>>> your case and make sure that when you re-run your task with the same
>>>>>> (dag_id, task_id, execution_date) you will get the same id. Then the
>>>>>> "uniqueness", and thus idempotency, is handled by the logic written in the DAG.
>>>>>>
>>>>>> 2) Store the job id in some external storage (via one of the hooks -
>>>>>> where it can be queried in the way that will work for you). Then the
>>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>>> external storage.
>>>>>>
>>>>>> 3) Query your service and retrieve the JOB ID from it - but you have
>>>>>> to have a way to query for the job related to your "dag id + task
>>>>>> + execution_date". Then the idempotency is actually handled by the
>>>>>> service you are using.
>>>>>>
>>>>>> In the use case you describe, this is the only thing you need: an
>>>>>> "idempotency source". I believe you would like to get case 2) from
>>>>>> above but without having to use external storage to store the "unique id":
>>>>>> something that will let each task in the same dag run set or retrieve a
>>>>>> unique value for that particular task. One value should be enough,
>>>>>> assuming that each operator/task works on one external data "source".
>>>>>>
>>>>>> My current thinking is:
>>>>>>
>>>>>> Why don't we provide such a dedicated idempotency service inside
>>>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>>>> class with two methods:
>>>>>>
>>>>>>   * .set(id: str) and
>>>>>>   * .get() -> str
>>>>>>
>>>>>> And the data stored there should be a string keyed by "(dag_id,
>>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>>> no intra-task communication here, very little possibility of abuse, and it
>>>>>> seems to solve the major pain point where you have to provide your own
>>>>>> storage to get the idempotency if your service does not provide one or you
>>>>>> do not want to delegate it to the DAG writer.
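One possible shape for the two-method class proposed above, sketched against an in-memory SQLite database rather than the real Airflow metadata DB. Only the class name and the `.set(id)` / `.get()` pair come from the proposal; the table name, its columns, and the constructor arguments are assumptions made for illustration.

```python
import sqlite3
from typing import Optional


class IdempotencyIdStorage:
    """Stores one string per (dag_id, task_id, execution_date)."""

    def __init__(self, conn: sqlite3.Connection, dag_id: str, task_id: str, execution_date: str) -> None:
        self._conn = conn
        self._key = (dag_id, task_id, execution_date)
        # Hypothetical table; the real schema would be decided in an AIP.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS idempotency_id ("
            "dag_id TEXT, task_id TEXT, execution_date TEXT, value TEXT, "
            "PRIMARY KEY (dag_id, task_id, execution_date))"
        )

    def set(self, id: str) -> None:
        # Overwrites any value stored by an earlier attempt of the same task instance.
        self._conn.execute(
            "INSERT OR REPLACE INTO idempotency_id VALUES (?, ?, ?, ?)",
            (*self._key, id),
        )
        self._conn.commit()

    def get(self) -> Optional[str]:
        row = self._conn.execute(
            "SELECT value FROM idempotency_id "
            "WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
            self._key,
        ).fetchone()
        return row[0] if row else None
```

Since the key is fixed at construction time, operator code never chooses a key itself, which is what keeps the surface for abuse small.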
>>>>>>
>>>>>> J.
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> The use case I'm referring to is that you can't use xcom to let a
>>>>>> task read information from its past attempts, because when a task starts
>>>>>> its xcom is automatically deleted.
>>>>>>
>>>>>> My specific use case is that we have a custom LivyOperator that calls
>>>>>> Livy to start batch Spark Jobs.
>>>>>> When you start a batch job, Livy returns a job_id.
>>>>>> Sometimes our operator can fail for one reason or another (for
>>>>>> instance if Livy is unreachable for a while).
>>>>>> When the task retries, it calls Livy again, which starts the same
>>>>>> spark job, but the problem is that the spark job from the first attempt can
>>>>>> still be running,
>>>>>> and then we have a batch job that runs twice simultaneously and
>>>>>> creates duplicates in the output.
>>>>>>
>>>>>> What we tried to do is get the job_id from the first try, to
>>>>>> check if the job is still running, and wait for it to complete if it is.
>>>>>>
>>>>>> We tried using xcom to let the task send a message to itself (to its
>>>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>>>> doesn't work and is not intended to work.
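The retry behavior wanted here might look roughly like the sketch below. `FakeLivy` is a stand-in written for this example and is not the Livy REST API; the state strings and the `saved` dict (standing in for whatever persists across retries) are likewise assumptions.

```python
from typing import Dict


class FakeLivy:
    """Stand-in for a Livy client; NOT the real Livy API."""

    def __init__(self) -> None:
        self.states: Dict[int, str] = {}
        self.submitted = 0

    def submit_batch(self) -> int:
        job_id = self.submitted
        self.submitted += 1
        self.states[job_id] = "running"
        return job_id

    def state(self, job_id: int) -> str:
        return self.states[job_id]


def submit_once(livy: FakeLivy, saved: Dict[str, int]) -> int:
    """Return the job to wait on; submit only if no earlier job is alive or done."""
    previous = saved.get("job_id")
    if previous is not None and livy.state(previous) in ("running", "success"):
        return previous            # reuse the job started by an earlier attempt
    job_id = livy.submit_batch()   # first attempt, or the earlier job failed
    saved["job_id"] = job_id       # must survive the task's next retry
    return job_id
```

The whole scheme depends on `saved` outliving the attempt, which is exactly what XCom does not provide for a task's own retries.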
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Furcy,
>>>>>>
>>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>>> Depending on your use case this may already be possible.
>>>>>>
>>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I would like to open a feature request for Airflow to support
>>>>>> "intra-task xcom".
>>>>>>
>>>>>> It seems that there are several distinct use cases for it already
>>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>>>
>>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>>> and the recommended approach (which apparently would be to create
>>>>>> a distinct feature from xcom to support this, it could be called
>>>>>> intra-com or self-com ?)
>>>>>>
>>>>>> Do you know if such a ticket already exists? I couldn't find one.
>>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>>> email).
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Furcy
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Jarek Potiuk
>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>>
>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>>
>>>>>>
>>>
>>> --
>>>
>>> Jarek Potiuk
>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>
>>> M: +48 660 796 129 <+48660796129>
>>>
>>>

Re: Intra-task "xcom"

Posted by Jarek Potiuk <Ja...@polidea.com>.
I'd love to take part - but I am going on a week of vacation (finally!)
next week, so the week after next would be great!

On Thu, Jul 23, 2020 at 12:23 AM Daniel Standish <dp...@gmail.com>
wrote:

> We are using state persistence pretty heavily right now with plugin models
> that I have called ProcessState and TaskState
>
> Our implementation might be too idiosyncratic to contribute to airflow,
> but then again it might not.  I would be happy to do a call to demo what we
> are doing to see if there is any interest, and to receive guidance from
> interested parties re what if anything might make sense in airflow.  Kaxil
> do you have any interest in that?
>
> On Thu, Jun 4, 2020 at 4:49 PM Kaxil Naik <ka...@gmail.com> wrote:
>
>> I definitely feel we can support these use cases by improving XCom. The
>> concept of XCom was to allow sharing messages & state between tasks.
>>
>> Here is the first line from the docs about Xcom:
>>
>> XComs let tasks exchange messages, allowing more nuanced forms of control
>> and shared state. The name is an abbreviation of “cross-communication”.
>>
>> I read the AIP (
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
>> ) from @Daniel Standish <dp...@gmail.com> , the "namespacing" of
>> this state would be a good feature,
>> XCom already allows that with "dag_id, task_id" key. *Solution 1* in the
>> AIP would solve the issue without much impact and whilst
>> maintaining backwards-compatibility.
>>
>> I am against the idea of using Secrets Backend for storing "State".
>> Storing state for some kind of persistence should be "short-lived" and
>> temporary.
>>
>> The "writers" & "readers" of both (Secrets & State) are different.
>> Generally, Sysadmins / Teamleads are responsible for managing secrets
>> (writing, rotating, auditing, etc.) whereas State is written by
>> Airflow Workers and would (or should) be short-lived, and you don't care about
>> auditing or rotating the value in "State".
>>
>> The only problems that I can see in the current XCom implementation are 1)
>> the use of execution_date and 2) the fact that XComs are cleared at the
>> start.
>>
>> One of the issues we already want to address in Airflow is to remove the
>> hard-requirement of "execution_date" for DagRun and TaskInstance. This
>> would also help in fixing (1) above.
>>
>> (2) can be solved by a flag as mentioned in the AIP.
>>
>> Regards,
>> Kaxil
>>
>>
>> On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk <Ja...@polidea.com>
>> wrote:
>>
>>> I think this subject has come up so often that I am also slowly changing my mind in
>>> favor of making an explicit state persistence "service".
>>>
>>> Whether it's only one key or more is secondary, I think, but if users
>>> are already using Variables to keep state for tasks - this is a clear sign
>>> that we miss a crucial feature and our users are abusing Airflow already in
>>> the way we try to prevent by not introducing "State service".
>>>
>>> With the recent SecretBackend implementation where Variables might be
>>> kept in a Secret backend - not only MetaStore - potentially you might have
>>> no write access to the backend. There is even no "write" support in the
>>> current "MetastoreBackend" implementation for writing variables. So we
>>> already have configurations where writing a variable and reading it
>>> elsewhere might not work - as far as I can see. You can set several
>>> backends of course, with the Metastore as the last fallback, but
>>> for me, it opens up different problems - what happens if the key is present
>>> in both, a task writes it to the metastore, but another task reads it from the
>>> Secret Backend?
>>>
>>> I think it seems that variables are being abused in exactly the way we
>>> want to prevent the "StateService" to be abused - and shying away from that
>>> is really like closing our eyes and pretending it's not happening.
>>>
>>> So maybe we can make a change AIP with this approach:
>>>
>>> 1) Variables -> mostly read-only (for tasks)  and used to keep
>>> configuration shared between workers (but not on a task level).
>>> 2) StateService (or whatever we call it) where we keep state information
>>> for specific dag + task + execution_date.
>>>
>>> J.
>>>
>>>
>>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
>>> wrote:
>>>
>>>> Airflow already provides a mechanism for state persistence: the
>>>> Variable, and, with caveats and flaws, XCom.
>>>>
>>>> I personally persist state to the airflow metastore database for a
>>>> large percentage of our jobs.  They are incremental jobs and it is helpful
>>>> to keep track of watermark.
>>>>
>>>> I think that incremental jobs are probably very very common in airflow
>>>> implementations.  Though probably often times users resort to imperfect
>>>> vehicles for this such as `execution_date` or xcom.
>>>>
>>>> I have a very drafty draft AIP that I haven't had enough time to work
>>>> on, which explores adding explicit support for state persistence to
>>>> airflow:
>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>>> ready for primetime.)
>>>>
>>>> I am of the mind that being able to persist some state is not a
>>>> fundamental change to airflow and would just add explicit (and more
>>>> user-friendly) support for something that is already quite common, and fits
>>>> fully within the wheelhouse of what airflow exists to do.
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com> wrote:
>>>>
>>>>> Furcy,
>>>>>
>>>>> To clarify, when I say that Airflow should not be in the business of
>>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>>> keeping state to be shared between task instances. I completely understand
>>>>> that there may be external systems that are harder to work with, and like
>>>>> in your case require the operator to be able to store some piece of
>>>>> information to make them idempotent. I just don't think that Airflow should
>>>>> provide that storage mechanism.
>>>>>
>>>>> I would think that most users of Airflow have access to some sort of
>>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>>> easy enough to write your job_id or whatever value you care about to a file
>>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>>> combination of them you care about. Yes it makes your operators more
>>>>> complex and they have to know about another system, but it keeps that
>>>>> complexity out of core Airflow. That's the trade off.
>>>>>
>>>>> Ash,
>>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>>> there are use cases where it makes some things convenient. In your example
>>>>> though, it would be just as easy for the sensor to write the found object
>>>>> path as the contents of another file in S3, with a computable prefix based
>>>>> on the dag/task/execution_date.
>>>>>
>>>>>
>>>>> At its heart XCom is just a key-value store where the keys are limited
>>>>> to a very specific set of possibilities, and where key-value pairs are
>>>>> managed in some specific ways. The request here is to add another narrowly
>>>>> defined set of allowable keys, and as far as I can tell with no extra
>>>>> management of them. The only real advantage of using the Airflow database
>>>>> for XCom or any expansion/variation on it is that we know that all
>>>>> operators have access to the database.
>>>>>
>>>>> I'm not an expert but I would wonder how well Postgres or MySQL
>>>>> perform as high volume key value stores. Does anyone actually use XCom at
>>>>> scale, and does that extra load on the database impact scheduling and other
>>>>> performance aspects of Airflow?
>>>>>
>>>>> Chris
>>>>>
>>>>>
>>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Just to touch on one point about XCom, and to reassure people that
>>>>>> XComs, or something very like them, are in Airflow for the foreseeable future.
>>>>>>
>>>>>> As an example of an appropriate use for XCom: Let's say a third
>>>>>> party delivers you a set of files once a week, but the exact name of the
>>>>>> files isn't known (by you) in advance. So you write a sensor that
>>>>>> polls/checks S3 for the objects to appear in your bucket, and the sensor
>>>>>> outputs the S3 object path to XCom, which the next processing step then
>>>>>> reads to process the files.
>>>>>>
>>>>>> That sort of use case is not going anywhere.
>>>>>>
>>>>>> Cheers,
>>>>>> -ash
>>>>>>
>>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>>>
>>>>>> At the risk of repeating myself (from the previous thread that
>>>>>> touched on this topic), I don't think Airflow should be in the business of
>>>>>> keeping state about external systems. Airflow is about authoring and
>>>>>> running workflows; it's not a messaging tool or a cluster management tool.
>>>>>> I'm not convinced that the existing XCom functionality should really be a
>>>>>> part of Airflow, and I certainly don't think it should be expanded upon or
>>>>>> new variations added. I think storing state is especially risky, if for no
>>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>>>> has saved will diverge from the actual state of the external system.
>>>>>> Handling that nicely, probably requires a bunch of custom code in the
>>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>>> operator code complexity. Users would be much better served going to the
>>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>>> lacking in features (like being able to query the status of a particular
>>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>>> making up for failures of other tools.
>>>>>>
>>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>>> extra features from expanding in scope. Jarek proposed something that would
>>>>>> just store a single string, and immediately Furcy wants to expand it to
>>>>>> store multiple strings. Either way we are really just talking about a
>>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>>> proposal).
>>>>>>
>>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>>> cluster across multiple data intervals, if one was already running (this
>>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>>> can equally see use cases where I might want to share some resource for
>>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>>>> added this then why limit it to any one of those combinations? But then we
>>>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>>>> then you can use Variables, if you want to use something else then you can.
>>>>>>
>>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>>>> absolutely no added benefit. And even with some potential management by
>>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> Thank you Jarek for the detailed explanation,
>>>>>>
>>>>>> That's exactly what I wanted to do: write a feature request to
>>>>>> summarize all those discussions.
>>>>>> I agree with you that the feature should be marked distinct from the
>>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>>>
>>>>>> The crux of the problem, I think, is that with XCom you do want the
>>>>>> task to delete its xcom at the beginning of the retry.
>>>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>>>> having a task A and a task B that starts immediately after A and waits for
>>>>>> some 'signal' from A.
>>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>>> the signal from A's first try, which is incorrect.
>>>>>>
>>>>>> About the 3 solutions you mention:
>>>>>>
>>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>>>> use-case Livy's API is poorly designed and only returns a generated job_id,
>>>>>> you can't specify a custom one.
>>>>>> You can't even find a job by name, I would have to list all the
>>>>>> active job_ids, and do a GET for each of them to get its name and find
>>>>>> which one is the one I want. It's doable but inelegant.
>>>>>>
>>>>>> 2) Store the id in an external storage. Of course it would work but
>>>>>> it requires an external storage. More on that below.
>>>>>>
>>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>>> think you mean that the idempotency can be handled by the service you call
>>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>>> could have something to prevent a job that runs twice from creating
>>>>>> duplicates. This is another solution we are considering, but it is
>>>>>> costly to change now.
>>>>>>
>>>>>> You guess correctly that the feature I was asking for would be to
>>>>>> provide some utility to let the users implement solution 2) without
>>>>>> requiring an external storage.
>>>>>> I think it would be a QOL improvement for some use cases, just like
>>>>>> it could be argued that XCom is just a QOL improvement and users could have
>>>>>> used an external storage themselves.
>>>>>> The main advantage that it brings is making the custom operators much
>>>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>>>> having to set up some external
>>>>>> storage.
>>>>>>
>>>>>> I have seen that some users used the metadata store itself as an
>>>>>> external storage by adding a new table to the airflow model:
>>>>>>
>>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>>>
>>>>>> And others suggested using XCom itself as an external storage by
>>>>>> storing information with a special task_id:
>>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>>
>>>>>> In the discussion thread you provided it was also suggested to use
>>>>>> Variables to store some persisting information.
>>>>>>
>>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>>> providing such functionality would be good.
>>>>>>
>>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>>> extent, providing a "IdempotencyIdStorage" that only allows you to store a
>>>>>> string
>>>>>> will just force people who need to store more than one id for one
>>>>>> task (for whatever reason) to use some hack again, like storing a json
>>>>>> inside the storage.
>>>>>>
>>>>>> I was more thinking about something quite similar to XCom (I liked
>>>>>> the XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>>> task_id, execution_date, key)"
>>>>>> where "key" can be whatever you want and would be kept across retries.
>>>>>>
>>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>>> Indeed it looks like there would be many ways to misuse such feature.
>>>>>> I do understand the importance of idempotency, and it looks like my
>>>>>> use case is one of the first ever listed where I do need to persist a state
>>>>>> across retries to make my operator really idempotent.
>>>>>>
>>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>>>> another solution).
>>>>>>
>>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>>> mechanism to prevent double-writes,
>>>>>> but I think that as the above hacks show, you can't really prevent
>>>>>> users from shooting themselves in the foot if that's what they really want
>>>>>> to.
>>>>>>
>>>>>> While I do think that making things foolproof is important, I believe
>>>>>> it's also in Python's philosophy to *not* make things foolproof at
>>>>>> the detriment of simplicity for the right use cases.
>>>>>> But I do understand that the use cases are different and
>>>>>> contradictory: some would require the state to be persisted across
>>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>>> across retries and not reschedule.
>>>>>>
>>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>>> the submit and an xcom with the job, then one task that checks the progress
>>>>>> of the job, but that feels very cumbersome to double the number of tasks
>>>>>> just for that. Plus I'm not sure we could make the first task retry if the
>>>>>> second task fails...
>>>>>>
>>>>>> Thanks again,
>>>>>>
>>>>>> Furcy
>>>>>>
>>>>>>
>>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>>>> wrote:
>>>>>>
>>>>>> I think we've discussed several approaches like that, and using the Xcom
>>>>>> name (which for many people would mean "let's just extend XCom table for
>>>>>> that") is not a very good idea IMHO. I think this is very
>>>>>> different functionality/logic which we might or might not agree to
>>>>>> implement as a community. Naming it "Xcom" or trying to extend the XCom
>>>>>> table behavior might be problematic.
>>>>>>
>>>>>> Not sure if you are aware but we had very similar discussion about it
>>>>>> recently (without clear conclusions but at least you can see what kind of
>>>>>> issues/problems different people have with this approach)
>>>>>>
>>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>>
>>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>>>> - intra-task communication mechanism, indeed. But it can very easily lead
>>>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>>>> after the latest discussion kind of died out, and I have one possible
>>>>>> solution to the problem.
>>>>>>
>>>>>> Let me explain what I think about it (but others can have different
>>>>>> opinions of course):
>>>>>>
>>>>>> So far the discussion was that there are several ways to achieve what
>>>>>> you want (and it's really about what entity is providing the "idempotency"
>>>>>> guarantee):
>>>>>>
>>>>>> 1) Similarly as just merged in the BigQuery Insert Job
>>>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>>>> your case and make sure that when you re-run your task with the same
>>>>>> (dag_id, task_id, execution date) you will get the same id. Then the
>>>>>> "uniqueness" thus idempotency is handled by the logic written in the DAG.
>>>>>>
>>>>>> 2) Store the DAG id in some external storage (via one of the hooks -
>>>>>> where it can be queried in the way that will work for you). Then the
>>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>>> external storage.
>>>>>>
>>>>>> 3) Query your service and retrieve the JOB ID from it - but you have
>>>>>> to have a way to query for the job related to your "dag id  + task
>>>>>> + execution_date". Then - the idempotency is actually handled by the
>>>>>> Service you are using.
>>>>>>
>>>>>> In the use case, you describe - this is the only thing you need -
>>>>>> "idempotency source". I believe you would like to get the case 2) from
>>>>>> above but without having to use external storage to store the "unique id".
>>>>>> Something that will let each task in the same dag run to set or retrieve a
>>>>>> unique value for that particular task. One value should be enough -
>>>>>> assuming that each operator/task works on one external data "source".
>>>>>>
>>>>>> My current thinking is:
>>>>>>
>>>>>> Why don't we provide such a dedicated, idempotency service inside
>>>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>>>> class with two methods:
>>>>>>
>>>>>>   * .set(id: str) and
>>>>>>   * .get() -> str
>>>>>>
>>>>>> And the data stored there should be a string keyed by "(dag_id,
>>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>>> no intra-task communication, here, very little possibility of abuse and it
>>>>>> seems to solve the major pain point where you have to provide your own
>>>>>> storage to get the idempotency if your service does not provide one or you
>>>>>> do not want to delegate it to the DAG writer.
>>>>>>
>>>>>> J.
>>>>>>
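A minimal sketch of the "IdempotencyIdStorage" idea described above, assuming an in-memory dict in place of a table in the Airflow metadata DB. Only the class name and the set/get methods come from the proposal; the constructor arguments, the module-level dict and the Optional return type are assumptions made for illustration, and nothing here is an existing Airflow API.

```python
from datetime import datetime
from typing import Dict, Optional, Tuple

# Hypothetical in-memory stand-in for a table in the Airflow metadata DB.
_Key = Tuple[str, str, datetime]
_ROWS: Dict[_Key, str] = {}


class IdempotencyIdStorage:
    """Stores a single string per (dag_id, task_id, execution_date)."""

    def __init__(self, dag_id: str, task_id: str, execution_date: datetime) -> None:
        self._key: _Key = (dag_id, task_id, execution_date)

    def set(self, id: str) -> None:
        # Overwrites any value stored earlier for the same task instance.
        _ROWS[self._key] = id

    def get(self) -> Optional[str]:
        # Returns None when nothing has been stored yet (an assumption;
        # the proposal only says ".get() -> str").
        return _ROWS.get(self._key)
```

Because the key does not include the try number, a value written by the first attempt would still be readable by the second attempt of the same task instance, which is the property the proposal relies on.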
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> The use case I'm referring to is that you can't use xcom to let a
>>>>>> task read information from its past attempts, because when a task starts
>>>>>> its xcom is automatically deleted.
>>>>>>
>>>>>> My specific use case is that we have a custom LivyOperator that calls
>>>>>> Livy to start batch Spark Jobs.
>>>>>> When you start a batch job, Livy returns a job_id.
>>>>>> Sometimes our operator can fail for one reason or another (for
>>>>>> instance if Livy is unreachable for a while).
>>>>>> When the task retries, it calls Livy again, which starts the same
>>>>>> spark job, but the problem is that the spark job from the first attempt can
>>>>>> still be running,
>>>>>> and then we have a batch job that runs twice simultaneously and
>>>>>> creates duplicates in the output.
>>>>>>
>>>>>> What we tried to do is getting the job_id from the first try, to
>>>>>> check if the job is still running, and wait for it to complete if it is.
>>>>>>
>>>>>> We tried using xcom to let the task send a message to itself (to its
>>>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>>>> doesn't work and is not intended to work.
>>>>>>
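The retry behaviour described above (look up the job_id from the previous try, and only submit again if that job is no longer alive) could be sketched as follows. FakeLivy is a hypothetical stand-in written for this example, not the real Livy REST API, and the plain dict stands for whatever storage survives a retry; the state names are also assumptions.

```python
from typing import Dict, Optional


class FakeLivy:
    """Hypothetical stand-in for a Livy client; not the real Livy API."""

    def __init__(self) -> None:
        self.states: Dict[int, str] = {}
        self._next_id = 0

    def submit(self) -> int:
        # Livy generates the job_id; the caller cannot choose it.
        job_id = self._next_id
        self._next_id += 1
        self.states[job_id] = "running"
        return job_id

    def state(self, job_id: int) -> Optional[str]:
        return self.states.get(job_id)


def submit_or_resume(livy: FakeLivy, state: Dict[str, int]) -> int:
    """Return the job to wait on, submitting a new one only if needed."""
    previous = state.get("job_id")
    if previous is not None and livy.state(previous) in ("running", "success"):
        # An earlier attempt already started this job: do not submit again.
        return previous
    job_id = livy.submit()
    state["job_id"] = job_id  # must survive the retry for this to work
    return job_id
```

The whole approach depends on `state` outliving the task attempt, which is exactly what XCom does not provide today since it is cleared when the task starts.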
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Furcy,
>>>>>>
>>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>>> Depending on your use case this may already be possible.
>>>>>>
>>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I would like to open a feature request for Airflow to support
>>>>>> "intra-task xcom".
>>>>>>
>>>>>> It seems that there are several distinct use cases for it already
>>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>>>
>>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>>> and the recommended approach (which apparently would be to create
>>>>>> a distinct feature from xcom to support this, it could be called
>>>>>> intra-com or self-com ?)
>>>>>>
>>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>>> email).
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Furcy
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Jarek Potiuk
>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>>
>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>>
>>>>>>

-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Re: Intra-task "xcom"

Posted by Daniel Standish <dp...@gmail.com>.
We are using state persistence pretty heavily right now with plugin models
that I have called ProcessState and TaskState.

Our implementation might be too idiosyncratic to contribute to airflow, but
then again it might not.  I would be happy to do a call to demo what we are
doing to see if there is any interest, and to receive guidance from
interested parties re what, if anything, might make sense in airflow.  Kaxil,
do you have any interest in that?

On Thu, Jun 4, 2020 at 4:49 PM Kaxil Naik <ka...@gmail.com> wrote:

> I definitely feel we can support these use cases by improving XCom. The
> concept of XCom was to allow sharing messages & state between tasks.
>
> Here is the first line from the docs about Xcom:
>
> XComs let tasks exchange messages, allowing more nuanced forms of control
> and shared state. The name is an abbreviation of “cross-communication”.
>
> I read the AIP (
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
> ) from @Daniel Standish <dp...@gmail.com> , the "namespacing" of
> this state would be a good feature,
> XCom already allows that with "dag_id, task_id" key. *Solution 1* in the
> AIP would solve the issue without much impact and whilst
> maintaining backwards compatibility.
>
> I am against the idea of using Secrets Backend for storing "State".
> Storing state for some kind of persistence should be "short-lived" and
> temporary.
>
> The "writers" & "readers" of both (Secrets & State) are different.
> Generally, Sysadmins / Teamleads are responsible for managing secrets
> (writing, rotating, auditing, etc.), whereas State is written via
> Airflow Workers and would (or should) be short-lived, and you don't care about
> auditing or rotating the value in "State".
>
> The only problem that I can see in the current XCom implementation is 1)
> the use of execution_date and the fact that 2) XCom are cleared at the
> start.
>
> One of the issues we already want to address in Airflow is to remove the
> hard-requirement of "execution_date" for DagRun and TaskInstance. This
> would also help in fixing (1) above.
>
> (2) can be solved by a flag as mentioned in the AIP.
>
> Regards,
> Kaxil
>
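To make point (2) concrete, here is a rough sketch of an XCom-like store in which rows are cleared when a task starts unless they were pushed with a flag. This is not the XCom implementation and not necessarily what the AIP proposes: the class, the `keep_on_retry` flag name and the `on_task_start` hook are all hypothetical, and execution_date is left out of the key for brevity.

```python
from typing import Any, Dict, Optional, Tuple

Key = Tuple[str, str, str]  # (dag_id, task_id, key)


class XComLikeStore:
    """Hypothetical store; rows are (value, keep_on_retry) pairs."""

    def __init__(self) -> None:
        self._rows: Dict[Key, Tuple[Any, bool]] = {}

    def push(self, dag_id: str, task_id: str, key: str, value: Any,
             keep_on_retry: bool = False) -> None:
        self._rows[(dag_id, task_id, key)] = (value, keep_on_retry)

    def pull(self, dag_id: str, task_id: str, key: str) -> Optional[Any]:
        row = self._rows.get((dag_id, task_id, key))
        return None if row is None else row[0]

    def on_task_start(self, dag_id: str, task_id: str) -> None:
        # Mimics the "cleared at the start" behaviour, but only for rows
        # of this task that were not pushed with keep_on_retry=True.
        self._rows = {
            k: v for k, v in self._rows.items()
            if not (k[0] == dag_id and k[1] == task_id and not v[1])
        }
```

With the flag defaulting to False, existing behaviour is unchanged, which is the backwards-compatibility property mentioned above.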
>
> On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk <Ja...@polidea.com>
> wrote:
>
>> I think this subject has come up so often that I am also slowly changing my
>> mind in favor of making an explicit state persistence "service".
>>
>> Whether it's only one key or more is secondary, I think, but if users
>> are already using Variables to keep state for tasks - this is a clear sign
>> that we miss a crucial feature and our users are abusing Airflow already in
>> the way we try to prevent by not introducing "State service".
>>
>> With the recent SecretBackend implementation where Variables might be
>> kept in a Secret backend - not only MetaStore - potentially you might have
>> no write access to the backend. There is even no "write" support in the
>> current "MetastoreBackend" implementation for writing variables. So we
>> already have configurations where writing a variable and reading it
>> elsewhere might not work - as far as I can see. You can of course set several
>> backends, with the Metastore as the last fallback, but
>> for me, it opens up different problems - what happens if the key is present
>> in both, a task writes it to the metastore, but another task reads it from the
>> Secret Backend?
>>
>> I think it seems that variables are being abused in exactly the way we
>> want to prevent the "StateService" to be abused - and shying away from that
>> is really like closing our eyes and pretending it's not happening.
>>
>> So maybe we can make a change AIP with this approach:
>>
>> 1) Variables -> mostly read-only (for tasks)  and used to keep
>> configuration shared between workers (but not on a task level).
>> 2) StateService (or whatever we call it) where we keep state information
>> for specific dag + task + execution_date.
>>
>> J.
>>
>>
>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
>> wrote:
>>
>>> Airflow already provides a mechanism for state persistence: the
>>> Variable, and, with caveats and flaws, XCom.
>>>
>>> I personally persist state to the airflow metastore database for a large
>>> percentage of our jobs.  They are incremental jobs and it is helpful to
>>> keep track of a watermark.
>>>
>>> I think that incremental jobs are probably very common in airflow
>>> implementations.  Though users probably often resort to imperfect
>>> vehicles for this, such as `execution_date` or xcom.
>>>
>>> I have a very drafty draft AIP that I haven't had enough time to work
>>> on, which explores adding explicit support for state persistence to
>>> airflow:
>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>> ready for primetime.)
>>>
>>> I am of the mind that being able to persist some state is not a
>>> fundamental change to airflow and would just add explicit (and more
>>> user-friendly) support for something that is already quite common, and fits
>>> fully within the wheelhouse of what airflow exists to do.
>>>
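The watermark pattern for incremental jobs mentioned above can be sketched in a few lines. The function name, the row shape and the use of a plain dict as the persisted state are assumptions for illustration; in practice the dict would be replaced by whatever store is chosen (Variable, a plugin model, an external system).

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, str]  # (monotonically increasing id, payload)


def run_incremental(rows: Iterable[Row], state: Dict[str, int]) -> List[Row]:
    """Process only rows newer than the stored watermark, then advance it."""
    watermark = state.get("watermark", 0)
    new_rows = [row for row in rows if row[0] > watermark]
    if new_rows:
        # Advance the watermark only after the new rows were selected,
        # so a run that finds nothing leaves the state untouched.
        state["watermark"] = max(row[0] for row in new_rows)
    return new_rows
```

Note that the result depends on the stored watermark and not on `execution_date`, which is why re-running an old task instance does not reproduce its original output; that trade-off is part of why the idea is controversial.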
>>>
>>>
>>>
>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com> wrote:
>>>
>>>> Furcy,
>>>>
>>>> To clarify, when I say that Airflow should not be in the business of
>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>> keeping state to be shared between task instances. I completely understand
>>>> that there may be external systems that are harder to work with, and like
>>>> in your case require the operator to be able to store some piece of
>>>> information to make them idempotent. I just don't think that Airflow should
>>>> provide that storage mechanism.
>>>>
>>>> I would think that most users of Airflow have access to some sort of
>>>> cloud storage like S3 (which is really just a key-value store), and it's
>>>> easy enough to write your job_id or whatever value you care about to a file
>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>> combination of them you care about. Yes it makes your operators more
>>>> complex and they have to know about another system, but it keeps that
>>>> complexity out of core Airflow. That's the trade off.
>>>>
>>>> Ash,
>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>> there are use cases where it makes some things convenient. In your example
>>>> though, it would be just as easy for the sensor to write the found object
>>>> path as the contents of another file in S3, with a computable prefix based
>>>> on the dag/task/execution_date.
>>>>
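The "computable prefix" suggested above could look like the sketch below. The function name, the "airflow-state" root and the path layout are all hypothetical; the only point is that the key is a pure function of (dag_id, task_id, execution_date), so any attempt of the same task instance can recompute it without asking Airflow.

```python
from datetime import datetime


def state_object_key(dag_id: str, task_id: str, execution_date: datetime,
                     name: str = "job_id") -> str:
    """Build a deterministic object key for one task instance."""
    # isoformat() keeps the key stable and sortable by execution date.
    return f"airflow-state/{dag_id}/{task_id}/{execution_date.isoformat()}/{name}"


print(state_object_key("my_dag", "submit", datetime(2020, 6, 1)))
# prints: airflow-state/my_dag/submit/2020-06-01T00:00:00/job_id
```

The operator would then write the value to that key with whichever hook it already uses for the storage system, which keeps the complexity in the operator and not in core Airflow.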
>>>>
>>>> At its heart XCom is just a key-value store where the keys are limited
>>>> to a very specific set of possibilities, and where key-value pairs are
>>>> managed in some specific ways. The request here is to add another narrowly
>>>> defined set of allowable keys, and as far as I can tell with no extra
>>>> management of them. The only real advantage of using the Airflow database
>>>> for XCom or any expansion/variation on it is that we know that all
>>>> operators have access to the database.
>>>>
>>>> I'm not an expert but I would wonder how well Postgres or MySQL perform
>>>> as high volume key value stores. Does anyone actually use XCom at scale,
>>>> and does that extra load on the database impact scheduling and other
>>>> performance aspects of Airflow?
>>>>
>>>> Chris
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org>
>>>> wrote:
>>>>
>>>>> Just to touch on one point about XCom, and to reassure people that
>>>>> they, or something very like them, are in Airflow for the foreseeable future.
>>>>>
>>>>> As an example of an appropriate use for XCom: Let's say a third party
>>>>> delivers you a set of files once a week, but the exact name of the files
>>>>> isn't known (by you) in advance. So you write a sensor that polls/checks S3
>>>>> for the Objects to appear in your bucket, and the sensor outputs the S3
>>>>> Object path to XCom, which the next processing step then examines to
>>>>> process the files.
>>>>>
>>>>> That sort of use case is not going anywhere.
>>>>>
>>>>> Cheers,
>>>>> -ash
>>>>>
>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>>
>>>>> At the risk of repeating myself (from the previous thread that touched
>>>>> on this topic), I don't think Airflow should be in the business of keeping
>>>>> state about external systems. Airflow is about authoring and running
>>>>> workflows; it's not a messaging tool or a cluster management tool. I'm not
>>>>> convinced that the existing XCom functionality should really be a part of
>>>>> Airflow, and I certainly don't think it should be expanded upon or new
>>>>> variations added. I think storing state is especially risky, if for no
>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>>> has saved will diverge from the actual state of the external system.
>>>>> Handling that nicely probably requires a bunch of custom code in the
>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>> operator code complexity. Users would be much better served going to the
>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>> lacking in features (like being able to query the status of a particular
>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>> making up for failures of other tools.
>>>>>
>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>> extra features from expanding in scope. Jarek proposed something that would
>>>>> just store a single string, and immediately Furcy wants to expand it to
>>>>> store multiple strings. Either way we are really just talking about a
>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>> proposal).
>>>>>
>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>> cluster across multiple data intervals, if one was already running (this
>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>> can equally see use cases where I might want to share some resource for
>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>>> added this then why limit it to any one of those combinations? But then we
>>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>>> then you can use Variables, if you want to use something else then you can.
>>>>>
>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>>> absolutely no added benefit. And even with some potential management by
>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>
>>>>> Thank you Jarek for the detailed explanation,
>>>>>
>>>>> That's exactly what I wanted to do: write a feature request to
>>>>> summarize all those discussions.
>>>>> I agree with you that the feature should be marked distinct from the
>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>>
>>>>> The crux of the problem, I think, is that with XCom you do want the
>>>>> task to delete its xcom at the beginning of the retry.
>>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>>> having a task A and a task B that starts immediately after A and waits for
>>>>> some 'signal' from A.
>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>> the signal from A's first try, which is incorrect.
>>>>>
>>>>> About the 3 solutions you mention:
>>>>>
>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>>> use-case Livy's API is poorly designed and only returns a generated job_id,
>>>>> you can't specify a custom one.
>>>>> You can't even find a job by name, I would have to list all the active
>>>>> job_ids, and do a GET for each of them to get its name and find which one
>>>>> is the one I want. It's doable but inelegant.
>>>>>
>>>>> 2) Store the id in an external storage. Of course it would work but it
>>>>> requires an external storage. More on that below.
>>>>>
>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>> think you mean that the idempotency can be handled by the service you call
>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>> could have something to prevent a job that runs twice from creating
>>>>> duplicates. This is another solution we are considering, but it is
>>>>> costly to change now.
>>>>>
>>>>> You guess correctly that the feature I was asking for would be to
>>>>> provide some utility to let the users implement solution 2) without
>>>>> requiring an external storage.
>>>>> I think it would be a QOL improvement for some use cases, just like it
>>>>> could be argued that XCom is just a QOL improvement and users could have
>>>>> used an external storage themselves.
>>>>> The main advantage that it brings is making the custom operators much
>>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>>> having to set up some external
>>>>> storage.
>>>>>
>>>>> I have seen that some users used the metadata store itself as an
>>>>> external storage by adding a new table to the airflow model:
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>>
>>>>> And others suggested using XCom itself as an external storage by
>>>>> storing information with a special task_id:
>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>
>>>>> In the discussion thread you provided it was also suggested to use
>>>>> Variables to store some persisting information.
>>>>>
>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>> providing such functionality would be good.
>>>>>
>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>> extent, providing a "IdempotencyIdStorage" that only allows you to store a
>>>>> string
>>>>> will just force people who need to store more than one id for one task
>>>>> (for whatever reason) to use some hack again, like storing a json inside
>>>>> the storage.
>>>>>
>>>>> I was more thinking about something quite similar to XCom (I liked the
>>>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>> task_id, execution_date, key)"
>>>>> where "key" can be whatever you want and would be kept across retries.
>>>>>
>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>> Indeed it looks like there would be many ways to misuse such feature.
>>>>> I do understand the importance of idempotency, and it looks like my use
>>>>> case is one of the first ever listed where I do need to persist a state
>>>>> across retries to make my operator really idempotent.
>>>>>
>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>>> another solution).
>>>>>
>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>> mechanism to prevent double-writes,
>>>>> but I think that as the above hacks show, you can't really prevent
>>>>> users from shooting themselves in the foot if that's what they really want
>>>>> to.
>>>>>
>>>>> While I do think that making things foolproof is important, I believe
>>>>> it's also in Python's philosophy to *not* make things foolproof at
>>>>> the detriment of simplicity for the right use cases.
>>>>> But I do understand that the use cases are different and
>>>>> contradictory: some would require the state to be persisted across
>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>> across retries and not reschedule.
>>>>>
>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>> the submit and an xcom with the job, then one task that checks the progress
>>>>> of the job, but that feels very cumbersome to double the number of tasks
>>>>> just for that. Plus I'm not sure we could make the first task retry if the
>>>>> second task fails...
>>>>>
>>>>> Thanks again,
>>>>>
>>>>> Furcy
>>>>>
>>>>>
>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>>> wrote:
>>>>>
>>>>> I think we've discussed several approaches like that, and using the Xcom
>>>>> name (which for many people would mean "let's just extend XCom table for
>>>>> that") is not a very good idea IMHO. I think this is very
>>>>> different functionality/logic which we might or might not agree to
>>>>> implement as a community. Naming it "Xcom" or trying to extend the XCom
>>>>> table behavior might be problematic.
>>>>>
>>>>> Not sure if you are aware but we had very similar discussion about it
>>>>> recently (without clear conclusions but at least you can see what kind of
>>>>> issues/problems different people have with this approach)
>>>>>
>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>
>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>>> - intra-task communication mechanism, indeed. But it can very easily lead
>>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>>> after the latest discussion kind of died out, and I have one possible
>>>>> solution to the problem.
>>>>>
>>>>> Let me explain what I think about it (but others can have different
>>>>> opinions of course):
>>>>>
>>>>> So far the discussion was that there are several ways to achieve what
>>>>> you want (and it's really about what entity is providing the "idempotency"
>>>>> guarantee):
>>>>>
>>>>> 1) Similarly as just merged in the BigQuery Insert Job
>>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>>> your case and make sure that when you re-run your task with the same
>>>>> (dag_id, task_id, execution date) you will get the same id. Then the
>>>>> "uniqueness" thus idempotency is handled by the logic written in the DAG.
>>>>>
>>>>> 2) Store the DAG id in some external storage (via one of the hooks -
>>>>> where it can be queried in the way that will work for you). Then the
>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>> external storage.
>>>>>
>>>>> 3) Query your service and retrieve the JOB ID from it - but you have
>>>>> to have a way to query for the job related to your "dag id  + task
>>>>> + execution_date". Then - the idempotency is actually handled by the
>>>>> Service you are using.
>>>>>
>>>>> In the use case, you describe - this is the only thing you need -
>>>>> "idempotency source". I believe you would like to get the case 2) from
>>>>> above but without having to use external storage to store the "unique id".
>>>>> Something that will let each task in the same dag run to set or retrieve a
>>>>> unique value for that particular task. One value should be enough -
>>>>> assuming that each operator/task works on one external data "source".
>>>>>
>>>>> My current thinking is:
>>>>>
>>>>> Why don't we provide such a dedicated, idempotency service inside
>>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>>> class with two methods:
>>>>>
>>>>>   * .set(id: str) and
>>>>>   * .get() -> str
>>>>>
>>>>> And the data stored there should be a string keyed by "(dag_id,
>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>> no intra-task communication, here, very little possibility of abuse and it
>>>>> seems to solve the major pain point where you have to provide your own
>>>>> storage to get the idempotency if your service does not provide one or you
>>>>> do not want to delegate it to the DAG writer.
>>>>>
>>>>> J.
>>>>>
>>>>>
>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>>
>>>>> The use case I'm referring to is that you can't use xcom to let a task
>>>>> read information from its past attempts, because when a task starts its
>>>>> xcom is automatically deleted.
>>>>>
>>>>> My specific use case is that we have a custom LivyOperator that calls
>>>>> Livy to start batch Spark Jobs.
>>>>> When you start a batch job, Livy returns a job_id.
>>>>> Sometimes our operator can fail for one reason or another (for
>>>>> instance if Livy is unreachable for a while).
>>>>> When the task retries, it calls Livy again, which starts the same spark
>>>>> job, but the problem is that the spark job from the first attempt can still
>>>>> be running,
>>>>> and then we have a batch job that runs twice simultaneously and
>>>>> creates duplicates in the output.
>>>>>
>>>>> What we tried to do is get the job_id from the first try, check
>>>>> if the job is still running, and wait for it to complete if it is.
>>>>>
>>>>> We tried using xcom to let the task send a message to itself (to its
>>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>>> doesn't work and is not intended to work.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>>>>>
>>>>> Hi Furcy,
>>>>>
>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>> Depending on your use case this may already be possible.
>>>>>
>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I would like to open a feature request for Airflow to support
>>>>> "intra-task xcom".
>>>>>
>>>>> It seems that there are several distinct use cases for it already
>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>>
>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>> and the recommended approach (which apparently would be to create
>>>>> a distinct feature from xcom to support this; it could be called
>>>>> intra-com or self-com?)
>>>>>
>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>> email).
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Furcy
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Jarek Potiuk
>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>
>>>>> M: +48 660 796 129 <+48660796129>
>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>
>>>>>

Re: Intra-task "xcom"

Posted by Kaxil Naik <ka...@gmail.com>.
I definitely feel we can support these use cases by improving XCom. The
concept of XCom was to allow sharing messages & state between tasks.

Here is the first line from the docs about XCom:

XComs let tasks exchange messages, allowing more nuanced forms of control
and shared state. The name is an abbreviation of “cross-communication”.

I read the AIP (
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
) from @Daniel Standish <dp...@gmail.com>. The "namespacing" of this
state would be a good feature, and
XCom already allows that with the "dag_id, task_id" key. *Solution 1* in the
AIP would solve the issue without much impact while
maintaining backwards compatibility.

I am against the idea of using Secrets Backend for storing "State". Storing
state for some kind of persistence should be "short-lived" and temporary.

The "writers" & "readers" of both (Secrets & State) are different.
Generally, Sysadmins / Teamleads are responsible for managing secrets
(writing, rotating, auditing, etc.), whereas State is written by
Airflow Workers and would (or should) be short-lived, and you don't care
about auditing or rotating the value in "State".

The only problems that I can see in the current XCom implementation are 1)
the use of execution_date and 2) the fact that XComs are cleared at the
start of the task.

One of the issues we already want to address in Airflow is to remove the
hard requirement of "execution_date" for DagRun and TaskInstance. This
would also help in fixing (1) above.

(2) can be solved by a flag as mentioned in the AIP.
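
To make point (2) concrete, here is a minimal sketch of the behaviour such a
flag would control. It is a toy in-memory store, not Airflow's XCom model:
the class name ToyXComStore, the method start_attempt and the flag name
keep_on_retry are all hypothetical and are not taken from the AIP.

```python
from typing import Dict, Optional, Tuple

# Hypothetical key layout: (dag_id, task_id, execution_date, key).
Key = Tuple[str, str, str, str]


class ToyXComStore:
    """In-memory stand-in for an XCom-like table (illustration only)."""

    def __init__(self) -> None:
        self._rows: Dict[Key, str] = {}

    def push(self, dag_id: str, task_id: str, execution_date: str,
             key: str, value: str) -> None:
        self._rows[(dag_id, task_id, execution_date, key)] = value

    def pull(self, dag_id: str, task_id: str, execution_date: str,
             key: str) -> Optional[str]:
        return self._rows.get((dag_id, task_id, execution_date, key))

    def start_attempt(self, dag_id: str, task_id: str, execution_date: str,
                      keep_on_retry: bool = False) -> None:
        """Called when a task attempt starts.

        By default every value written by earlier attempts of the same task
        is dropped; with keep_on_retry=True the values are left in place.
        """
        if keep_on_retry:
            return
        prefix = (dag_id, task_id, execution_date)
        for row_key in [k for k in self._rows if k[:3] == prefix]:
            del self._rows[row_key]
```

With the default, a retry of the Livy task would find nothing from its first
attempt; with keep_on_retry=True it could pull the job_id it pushed earlier
and check whether that job is still running.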

Regards,
Kaxil


On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk <Ja...@polidea.com>
wrote:

> I think this subject has come up so often that I am also slowly changing
> my mind in favor of making an explicit state persistence "service".
>
> Whether it's only one key or more is secondary, I think, but if users
> are already using Variables to keep state for tasks, this is a clear sign
> that we are missing a crucial feature and that our users are already abusing
> Airflow in the way we try to prevent by not introducing a "State service".
>
> With the recent SecretBackend implementation where Variables might be kept
> in a Secret backend - not only MetaStore - potentially you might have no
> write access to the backend. There is not even "write" support for variables
> in the current "MetastoreBackend" implementation. So we already have
> configurations where writing a variable and reading it elsewhere might not
> work, as far as I can see. You can of course set several backends with the
> Metastore as the last fallback, but for me that opens up different problems:
> what happens if the key is present in both, one task writes it to the
> metastore, but another task reads it from the Secret Backend?
>
> It seems that Variables are being abused in exactly the way we want to
> prevent the "StateService" from being abused, and shying away from that
> is really like closing our eyes and pretending it's not happening.
>
> So maybe we can make a change to the AIP with this approach:
>
> 1) Variables -> mostly read-only (for tasks) and used to keep
> configuration shared between workers (but not on a task level).
> 2) StateService (or whatever we call it) where we keep state information
> for specific dag + task + execution_date.
>
> J.
>
>
> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
> wrote:
>
>> Airflow already provides a mechanism for state persistence: the Variable,
>> and, with caveats and flaws, XCom.
>>
>> I personally persist state to the Airflow metastore database for a large
>> percentage of our jobs.  They are incremental jobs and it is helpful to
>> keep track of a watermark.
>>
>> I think that incremental jobs are probably very common in Airflow
>> implementations, though users probably often resort to imperfect
>> vehicles for this such as `execution_date` or xcom.
>>
>> I have a very drafty draft AIP that I haven't had enough time to work
>> on, which explores adding explicit support for state persistence to
>> Airflow:
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>> Though I understand it is a controversial idea.  (Note: the AIP is not
>> ready for primetime.)
>>
>> I am of the mind that being able to persist some state is not a
>> fundamental change to airflow and would just add explicit (and more
>> user-friendly) support for something that is already quite common, and fits
>> fully within the wheelhouse of what airflow exists to do.
>>
>>
>>
>>
>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com> wrote:
>>
>>> Furcy,
>>>
>>> To clarify, when I say that Airflow should not be in the business of
>>> keeping state about external systems, I specifically mean it shouldn't be
>>> keeping state to be shared between task instances. I completely understand
>>> that there may be external systems that are harder to work with, and like
>>> in your case require the operator to be able to store some piece of
>>> information to make them idempotent. I just don't think that Airflow should
>>> provide that storage mechanism.
>>>
>>> I would think that most users of Airflow have access to some sort of
>>> cloud storage like S3 (which are really just key-value stores), and it's
>>> easy enough to write your job_id or whatever value you care about to a file
>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>> combination of them you care about. Yes it makes your operators more
>>> complex and they have to know about another system, but it keeps that
>>> complexity out of core Airflow. That's the trade off.
>>>
>>> Ash,
>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>> there are use cases where it makes some things convenient. In your example
>>> though, it would be just as easy for the sensor to write the found object
>>> path as the contents of another file in S3, with a computable prefix based
>>> on the dag/task/execution_date.
>>>
>>>
>>> At its heart XCom is just a key-value store where the keys are limited
>>> to a very specific set of possibilities, and where key-value pairs are
>>> managed in some specific ways. The request here is to add another narrowly
>>> defined set of allowable keys, and as far as I can tell with no extra
>>> management of them. The only real advantage of using the Airflow database
>>> for XCom or any expansion/variation on it is that we know that all
>>> operators have access to the database.
>>>
>>> I'm not an expert but I would wonder how well Postgres or MySQL perform
>>> as high volume key value stores. Does anyone actually use XCom at scale,
>>> and does that extra load on the database impact scheduling and other
>>> performance aspects of Airflow?
>>>
>>> Chris
>>>
>>>
>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org> wrote:
>>>
>>>> Just to touch on one point about XCom, and to reassure people that
>>>> XComs, or something very like them, are in Airflow for the foreseeable future.
>>>>
>>>> As an example of an appropriate use for XCom: Let's say a third party
>>>> delivers you a set of files once a week, but the exact name of the files
>>>> isn't known (by you) in advance. So you write a sensor that polls/checks S3
>>>> for the Objects to appear in your bucket, and the sensor outputs the S3
>>>> Object path to XCom, which the next processing step then examines to
>>>> process the files.
>>>>
>>>> That sort of use case is not going anywhere.
>>>>
>>>> Cheers,
>>>> -ash
>>>>
>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>>
>>>> At the risk of repeating myself (from the previous thread that touched
>>>> on this topic), I don't think Airflow should be in the business of keeping
>>>> state about external systems. Airflow is about authoring and running
>>>> workflows; it's not a messaging tool or a cluster management tool. I'm not
>>>> convinced that the existing XCom functionality should really be a part of
>>>> Airflow, and I certainly don't think it should be expanded upon or new
>>>> variations added. I think storing state is especially risky, if for no
>>>> other reason than the fact that Airflow is not the source of truth about
>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>> has saved will diverge from the actual state of the external system.
>>>> Handling that nicely probably requires a bunch of custom code in the
>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>> operator code complexity. Users would be much better served going to the
>>>> source of truth to determine state. If part of the problem is that Livy is
>>>> lacking in features (like being able to query the status of a particular
>>>> job_id) then I think it would be more appropriate to add the needed
>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>> making up for failures of other tools.
>>>>
>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>> extra features from expanding in scope. Jarek proposed something that would
>>>> just store a single string, and immediately Furcy wants to expand it to
>>>> store multiple strings. Either way we are really just talking about a
>>>> key-value store, and putting limits on how that key can be structured; the
>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>> proposal).
>>>>
>>>> I know in the past that I had a situation where I wanted to reuse a
>>>> cluster across multiple data intervals, if one was already running (this
>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>> can equally see use cases where I might want to share some resource for
>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>> added this then why limit it to any one of those combinations? But then we
>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>> then you can use Variables, if you want to use something else then you can.
>>>>
>>>> Unless Airflow is doing some extra management of these key-values in
>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>> absolutely no added benefit. And even with some potential management by
>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>
>>>> Chris
>>>>
>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>
>>>> Thank you Jarek for the detailed explanation,
>>>>
>>>> That's exactly what I wanted to do: write a feature request to
>>>> summarize all those discussions.
>>>> I agree with you that the feature should be marked distinct from the
>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>
>>>> The crux of the problem, I think, is that with XCom you do want the task
>>>> to delete its xcom at the beginning of the retry.
>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>> having a task A and a task B that starts immediately after A and waits for
>>>> some 'signal' from A.
>>>> If A and B restart and A doesn't reset its signal, then B will use the
>>>> signal from A's first try, which is incorrect.
>>>>
>>>> About the 3 solutions you mention:
>>>>
>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>> use-case Livy's API is poorly designed and only returns a generated job_id,
>>>> you can't specify a custom one.
>>>> You can't even find a job by name; I would have to list all the active
>>>> job_ids and do a GET for each of them to get its name and find which one
>>>> is the one I want. It's doable but inelegant.
>>>>
>>>> 2) Store the id in an external storage. Of course it would work but it
>>>> requires an external storage. More on that below.
>>>>
>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>> think you mean that the idempotency can be handled by the service you call
>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>> could have something to prevent a job that runs twice from creating
>>>> duplicates. This is another solution we are considering, but it is
>>>> costly to change now.
>>>>
>>>> You guess correctly that the feature I was asking for would be to
>>>> provide some utility to let the users implement solution 2) without
>>>> requiring an external storage.
>>>> I think it would be a QOL improvement for some use cases, just like it
>>>> could be argued that XCom is just a QOL improvement and users could have
>>>> used an external storage themselves.
>>>> The main advantage that it brings is making the custom operators much
>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>> having to set up some external
>>>> storage.
>>>>
>>>> I have seen that some users used the metadata store itself as an
>>>> external storage by adding a new table to the airflow model:
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>
>>>> And others suggested using XCom itself as an external storage by
>>>> storing information with a special task_id:
>>>> https://stackoverflow.com/a/57515143/2087478
>>>>
>>>> In the discussion thread you provided it was also suggested to use
>>>> Variables to store some persisting information.
>>>>
>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>> providing such functionality would be good.
>>>>
>>>> Finally, I don't see the point of limiting the functionality to such an
>>>> extent: providing an "IdempotencyIdStorage" that only allows you to store a
>>>> string
>>>> will just force people who need to store more than one id for one task
>>>> (for whatever reason) to use some hack again, like storing a json inside
>>>> the storage.
>>>>
>>>> I was more thinking about something quite similar to XCom (I liked the
>>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>>> task_id, execution_date, key)"
>>>> where "key" can be whatever you want and would be kept across retries.
>>>>
>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>> Indeed it looks like there would be many ways to misuse such a feature.
>>>> I do understand the importance of idempotency, and it looks like my use
>>>> case is one of the first ever listed where I do need to persist a state
>>>> across retries to make my operator really idempotent.
>>>>
>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>> another solution).
>>>>
>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>> mechanism to prevent double-writes,
>>>> but I think that as the above hacks show, you can't really prevent
>>>> users from shooting themselves in the foot if that's what they really want
>>>> to.
>>>>
>>>> While I do think that making things foolproof is important, I believe
>>>> it's also in Python's philosophy to *not* make things foolproof at the
>>>> detriment of simplicity for the right use cases.
>>>> But I do understand that the use cases are different and contradictory:
>>>> some would require the state to be persisted across reschedule and not
>>>> retries, mine would require the state to be persisted across retries and
>>>> not reschedule.
>>>>
>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>> the submit and an xcom with the job, then one task that checks the progress
>>>> of the job, but it feels very cumbersome to double the number of tasks
>>>> just for that. Plus I'm not sure we could make the first task retry if the
>>>> second task fails...
>>>>
>>>> Thanks again,
>>>>
>>>> Furcy
>>>>
>>>>
>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>>> wrote:
>>>>
>>>> I think we've discussed several approaches like that, and using the XCom
>>>> name (which for many people would mean "let's just extend the XCom table for
>>>> that") is not a very good idea IMHO. I think this is very
>>>> different functionality/logic which we might or might not agree to
>>>> implement as a community. Naming it "XCom" or trying to extend the XCom
>>>> table behavior might be problematic.
>>>>
>>>> Not sure if you are aware but we had very similar discussion about it
>>>> recently (without clear conclusions but at least you can see what kind of
>>>> issues/problems different people have with this approach)
>>>>
>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>
>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>> - intra-task communication mechanism, indeed. But it can very easily lead
>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>> after the latest discussion kind of died out, and I have one possible
>>>> solution to the problem.
>>>>
>>>> Let me explain what I think about it (but others can have different
>>>> opinions of course):
>>>>
>>>> So far the discussion was that there are several ways to achieve what
>>>> you want (and it's really about what entity is providing the "idempotency"
>>>> guarantee):
>>>>
>>>> 1) Similarly as just merged in the BigQuery Insert Job
>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>> your case and make sure that when you re-run your task with the same
>>>> (dag_id, task_id, execution date) you will get the same id. Then the
>>>> "uniqueness" thus idempotency is handled by the logic written in the DAG.
>>>>
>>>> 2) Store the DAG id in some external storage (via one of the hooks -
>>>> where it can be queried in the way that will work for you). Then the
>>>> idempotency is actually handled by the logic in your Operator + some
>>>> external storage.
>>>>
>>>> 3) Query your service and retrieve the JOB ID from it - but you have to
>>>> have a way to query for the job related to your "dag id  + task
>>>> + execution_date". Then the idempotency is actually handled by the
>>>> service you are using.
>>>>
>>>> In the use case, you describe - this is the only thing you need -
>>>> "idempotency source". I believe you would like to get the case 2) from
>>>> above but without having to use external storage to store the "unique id".
>>>> Something that will let each task in the same dag run to set or retrieve a
>>>> unique value for that particular task. One value should be enough -
>>>> assuming that each operator/task works on one external data "source".
>>>>
>>>> My current thinking is:
>>>>
>>>> Why don't we provide such a dedicated, idempotency service inside
>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>> class with two methods:
>>>>
>>>>   * .set(id: str) and
>>>>   * .get() -> str
>>>>
>>>> And the data stored there should be a string keyed by "(dag_id, task_id,
>>>> execution_date)" - available also via Jinja templating. There is no
>>>> intra-task communication here, very little possibility of abuse, and it
>>>> seems to solve the major pain point where you have to provide your own
>>>> storage to get the idempotency if your service does not provide one or you
>>>> do not want to delegate it to the DAG writer.
>>>>
>>>> J.
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>>>
>>>> The use case I'm referring to is that you can't use xcom to let a task
>>>> read information from its past attempts, because when a task starts its
>>>> xcom is automatically deleted.
>>>>
>>>> My specific use case is that we have a custom LivyOperator that calls
>>>> Livy to start batch Spark jobs.
>>>> When you start a batch job, Livy returns a job_id.
>>>> Sometimes our operator can fail for one reason or another (for instance
>>>> if Livy is unreachable for a while).
>>>> When the task retries, it calls Livy again, which starts the same Spark
>>>> job, but the problem is that the Spark job from the first attempt can still
>>>> be running,
>>>> and then we have a batch job that runs twice simultaneously and creates
>>>> duplicates in the output.
>>>>
>>>> What we tried to do is get the job_id from the first try, check
>>>> if the job is still running, and wait for it to complete if it is.
>>>>
>>>> We tried using xcom to let the task send a message to itself (to its
>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>> doesn't work and is not intended to work.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>>>>
>>>> Hi Furcy,
>>>>
>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>> Depending on your use case this may already be possible.
>>>>
>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I would like to open a feature request for Airflow to support
>>>> "intra-task xcom".
>>>>
>>>> It seems that there are several distinct use cases for it already
>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>
>>>> I wanted to summarize links to the use cases and past attempts,
>>>> and the recommended approach (which apparently would be to create
>>>> a distinct feature from xcom to support this; it could be called
>>>> intra-com or self-com?)
>>>>
>>>> Do you know if such ticket already exists? I couldn't find one.
>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>> email).
>>>>
>>>> Thanks,
>>>>
>>>> Furcy

Re: Intra-task "xcom"

Posted by Jarek Potiuk <Ja...@polidea.com>.
I think this subject has come up so often that I am also slowly changing my
mind in favor of making an explicit state persistence "service".

Whether it's only one key or more is secondary, I think, but if users are
already using Variables to keep state for tasks, this is a clear sign that
we are missing a crucial feature and that our users are already abusing
Airflow in the way we try to prevent by not introducing a "State service".

With the recent SecretBackend implementation where Variables might be kept
in a Secret backend - not only MetaStore - potentially you might have no
write access to the backend. There is not even "write" support for variables
in the current "MetastoreBackend" implementation. So we already have
configurations where writing a variable and reading it elsewhere might not
work, as far as I can see. You can of course set several backends with the
Metastore as the last fallback, but for me that opens up different problems:
what happens if the key is present in both, one task writes it to the
metastore, but another task reads it from the Secret Backend?
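
A small sketch of that last problem, assuming lookups try the configured
backends in order and stop at the first hit. The classes and the
read_variable helper below are toy stand-ins written for this example, not
Airflow's actual backend classes or API.

```python
from typing import Dict, Optional, Sequence


class ReadOnlyBackend:
    """Stand-in for an external secrets backend that tasks cannot write to."""

    def __init__(self, values: Dict[str, str]) -> None:
        self._values = dict(values)

    def get_variable(self, key: str) -> Optional[str]:
        return self._values.get(key)


class ToyMetastore:
    """Stand-in for the metadata database, the only place tasks can write."""

    def __init__(self) -> None:
        self._values: Dict[str, str] = {}

    def get_variable(self, key: str) -> Optional[str]:
        return self._values.get(key)

    def set_variable(self, key: str, value: str) -> None:
        self._values[key] = value


def read_variable(backends: Sequence, key: str) -> Optional[str]:
    """Return the value from the first backend that knows the key."""
    for backend in backends:
        value = backend.get_variable(key)
        if value is not None:
            return value
    return None


# One task writes a new value; it can only land in the metastore.
secrets = ReadOnlyBackend({"watermark": "2020-05-31"})
metastore = ToyMetastore()
metastore.set_variable("watermark", "2020-06-01")

# Another task reads through the configured chain and sees the copy held by
# the first backend, so the freshly written value is shadowed.
seen_by_reader = read_variable([secrets, metastore], "watermark")
```

Under that assumed lookup order the reader gets the value held by the secrets
backend and never sees the write, which is the inconsistency described above.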

It seems that Variables are being abused in exactly the way we want
to prevent the "StateService" from being abused, and shying away from that is
really like closing our eyes and pretending it's not happening.

So maybe we can make a change to the AIP with this approach:

1) Variables -> mostly read-only (for tasks) and used to keep
configuration shared between workers (but not on a task level).
2) StateService (or whatever we call it) where we keep state information
for specific dag + task + execution_date.
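
As a rough illustration of point 2), here is a minimal sketch of what such a
service could look like. Everything in it is an assumption made for the
example: the StateService class, its set/get signatures, the task_state table
and the run_attempt helper are hypothetical, and an in-memory SQLite database
stands in for the Airflow metastore.

```python
import sqlite3
from typing import Callable, Optional, Tuple


class StateService:
    """Toy state store keyed by (dag_id, task_id, execution_date)."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS task_state (
                dag_id TEXT NOT NULL,
                task_id TEXT NOT NULL,
                execution_date TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (dag_id, task_id, execution_date)
            )
            """
        )

    def set(self, dag_id: str, task_id: str, execution_date: str,
            value: str) -> None:
        # One value per task instance; a later write replaces the earlier one.
        self.conn.execute(
            "INSERT OR REPLACE INTO task_state VALUES (?, ?, ?, ?)",
            (dag_id, task_id, execution_date, value),
        )
        self.conn.commit()

    def get(self, dag_id: str, task_id: str,
            execution_date: str) -> Optional[str]:
        row = self.conn.execute(
            "SELECT value FROM task_state "
            "WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
            (dag_id, task_id, execution_date),
        ).fetchone()
        return row[0] if row else None


def run_attempt(state: StateService, key: Tuple[str, str, str],
                submit: Callable[[], str]) -> str:
    """Reuse the job id recorded by an earlier attempt, else submit a job."""
    job_id = state.get(*key)
    if job_id is None:
        job_id = submit()
        state.set(*key, job_id)
    return job_id
```

Nothing is cleared when an attempt starts, so a retry that calls run_attempt
with the same key gets back the job id stored by the first attempt instead of
submitting a second job.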

J.


On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <dp...@gmail.com>
wrote:

> Airflow already provides a mechanism for state persistence: the Variable,
> and, with caveats and flaws, XCom.
>
> I personally persist state to the Airflow metastore database for a large
> percentage of our jobs.  They are incremental jobs and it is helpful to
> keep track of a watermark.
>
> I think that incremental jobs are probably very common in Airflow
> implementations, though users probably often resort to imperfect
> vehicles for this such as `execution_date` or xcom.
>
> I have a very drafty draft AIP that I haven't had enough time to work on,
> which explores adding explicit support for state persistence to Airflow:
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
> Though I understand it is a controversial idea.  (Note: the AIP is not
> ready for primetime.)
>
> I am of the mind that being able to persist some state is not a
> fundamental change to airflow and would just add explicit (and more
> user-friendly) support for something that is already quite common, and fits
> fully within the wheelhouse of what airflow exists to do.
>
>
>
>
> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <ch...@crpalmer.com> wrote:
>
>> Furcy,
>>
>> To clarify, when I say that Airflow should not be in the business of
>> keeping state about external systems, I specifically mean it shouldn't be
>> keeping state to be shared between task instances. I completely understand
>> that there may be external systems that are harder to work with, and like
>> in your case require the operator to be able to store some piece of
>> information to make them idempotent. I just don't think that Airflow should
>> provide that storage mechanism.
>>
>> I would think that most users of Airflow have access to some sort of
>> cloud storage like S3 (which are really just key-value stores), and it's
>> easy enough to write your job_id or whatever value you care about to a file
>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>> combination of them you care about. Yes it makes your operators more
>> complex and they have to know about another system, but it keeps that
>> complexity out of core Airflow. That's the trade off.
>>
>> Ash,
>> I'm not suggesting that XCom be removed from Airflow, and I understand
>> there are use cases where it makes some things convenient. In your example
>> though, it would be just as easy for the sensor to write the found object
>> path as the contents of another file in S3, with a computable prefix based
>> on the dag/task/execution_date.
>>
>>
>> At its heart XCom is just a key-value store where the keys are limited to
>> a very specific set of possibilities, and where key-value pairs are managed
>> in some specific ways. The request here is to add another narrowly defined
>> set of allowable keys, and as far as I can tell with no extra management of
>> them. The only real advantage of using the Airflow database for XCom or any
>> expansion/variation on it is that we know that all operators have access to
>> the database.
>>
>> I'm not an expert but I would wonder how well Postgres or MySQL perform
>> as high volume key value stores. Does anyone actually use XCom at scale,
>> and does that extra load on the database impact scheduling and other
>> performance aspects of Airflow?
>>
>> Chris
>>
>>
>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org> wrote:
>>
>>> Just to touch on one point about XCom, and to reassure people that
>>> XComs, or something very like them, are in Airflow for the foreseeable future.
>>>
>>> As an example of an appropriate use for XCom: Let's say a third party
>>> delivers you a set of files once a week, but the exact name of the files
>>> isn't known (by you) in advance. So you write a sensor that polls/checks S3
>>> for the Objects to appear in your bucket, and the sensor outputs the S3
>>> Object path to XCom, which the next processing step then examines to
>>> process the files.
>>>
>>> That sort of use case is not going anywhere.
>>>
>>> Cheers,
>>> -ash
>>>
>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>>>
>>> At the risk of repeating myself (from the previous thread that touched
>>> on this topic), I don't think Airflow should be in the business of keeping
>>> state about external systems. Airflow is about authoring and running
>>> workflows; it's not a messaging tool or a cluster management tool. I'm not
>>> convinced that the existing XCom functionality should really be a part of
>>> Airflow, and I certainly don't think it should be expanded upon or new
>>> variations added. I think storing state is especially risky, if for no
>>> other reason than the fact that Airflow is not the source of truth about
>>> those systems. It's very likely that at some times the "state" that Airflow
>>> has saved will diverge from the actual state of the external system.
>>> Handling that nicely probably requires a bunch of custom code in the
>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>> operator code complexity. Users would be much better served going to the
>>> source of truth to determine state. If part of the problem is that Livy is
>>> lacking in features (like being able to query the status of a particular
>>> job_id) then I think it would be more appropriate to add the needed
>>> features to that project. Airflow at its core shouldn't be concerned with
>>> making up for failures of other tools.
>>>
>>> Also as can be seen by just this discussion, it's hard to keep these
>>> extra features from expanding in scope. Jarek proposed something that would
>>> just store a single string, and immediately Furcy wants to expand it to
>>> store multiple strings. Either way we are really just talking about a
>>> key-value store, and putting limits on how that key can be structured; the
>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>> proposal).
>>>
>>> I know in the past that I had a situation where I wanted to reuse a
>>> cluster across multiple data intervals, if one was already running (this
>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>> can equally see use cases where I might want to share some resource for
>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>> added this then why limit it to any one of those combinations? But then we
>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>> then you can use Variables, if you want to use something else then you can.
>>>
>>> Unless Airflow is doing some extra management of these key-values in
>>> some way (like it does with clearing out XComs on reruns), then I see
>>> absolutely no added benefit. And even with some potential management by
>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>
>>> Chris
>>>
>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>>>
>>> Thank you Jarek for the detailed explanation,
>>>
>>> That's exactly what I wanted to do: write a feature request to summarize
>>> all those discussions.
>>> I agree with you that the feature should be marked distinct from the
>>> XCom feature and that we should not piggyback this feature into XCom.
>>>
>>> The crux of the problem, I think, is that with XCom you do want the task
>>> to delete its XCom at the beginning of the retry.
>>> Correct me if I'm wrong, but one use case where this was necessary was
>>> having a task A and a task B that starts immediately after A and waits for
>>> some 'signal' from A.
>>> If A and B restart and A doesn't reset its signal, then B will use the
>>> signal from A's first try, which is incorrect.
>>>
>>> About the 3 solutions you mention:
>>>
>>> 1) Providing the job_id from outside. That works indeed. Sadly, in my
>>> use case Livy's API is poorly designed and only returns a generated job_id;
>>> you can't specify a custom one.
>>> You can't even find a job by name: I would have to list all the active
>>> job_ids and do a GET for each of them to get its name and find which one
>>> is the one I want. It's doable but inelegant.
>>>
>>> 2) Store the id in an external storage. Of course it would work but it
>>> requires an external storage. More on that below.
>>>
>>> 3) I'm not sure I understand completely what you mean there, but I think
>>> you mean that the idempotency can be handled by the service you call (for
>>> instance BigQuery). Indeed that is another solution. If we were using Spark
>>> with a Hive metastore + locking or the Delta Lake storage format, we could
>>> have something to prevent a job that runs twice from creating duplicates.
>>> This is another solution we are considering, but it is costly to change now.
>>>
>>> You guess correctly that the feature I was asking for would be to
>>> provide some utility to let users implement solution 2) without
>>> requiring an external storage.
>>> I think it would be a QOL improvement for some use cases, just like it
>>> could be argued that XCom is just a QOL improvement and users could have
>>> used an external storage themselves.
>>> The main advantage that it brings is making custom operators much
>>> easier to share and reuse across the Apache Airflow community, compared to
>>> having to set up some external storage.
>>>
>>> I have seen that some users used the metadata store itself as an
>>> external storage by adding a new table to the airflow model:
>>>
>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>
>>> And others suggested using XCom itself as an external storage by storing
>>> information with a special task_id:
>>> https://stackoverflow.com/a/57515143/2087478
>>>
>>> In the discussion thread you provided it was also suggested to use
>>> Variables to store some persisting information.
>>>
>>> These 3 approaches work but feel quite "hacky" and I believe that
>>> providing such functionality would be good.
>>>
>>> Finally, I don't see the point of limiting the functionality to such an
>>> extent: providing an "IdempotencyIdStorage" that only allows you to store a
>>> string will just force people who need to store more than one id for one
>>> task (for whatever reason) to use some hack again, like storing JSON inside
>>> the storage.
>>>
>>> I was more thinking about something quite similar to XCom (I liked the
>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>> task_id, execution_date, key)"
>>> where "key" can be whatever you want and would be kept across retries.
>>>
>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>> Indeed it looks like there would be many ways to misuse such a feature.
>>> I do understand the importance of idempotency, and it looks like my use
>>> case is one of the first ever listed where I do need to persist a state
>>> across retries to make my operator really idempotent.
>>>
>>> I'm surprised no one came up with it given how frequent the Spark +
>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>> another solution).
>>>
>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>> mechanism to prevent double-writes,
>>> but I think that as the above hacks show, you can't really prevent users
>>> from shooting themselves in the foot if that's what they really want to.
>>>
>>> While I do think that making things foolproof is important, I believe
>>> it's also in Python's philosophy to *not* make things foolproof at the
>>> detriment of simplicity for the right use cases.
>>> But I do understand that the use cases are different and contradictory:
>>> some would require the state to be persisted across reschedule and not
>>> retries, mine would require the state to be persisted across retries and
>>> not reschedule.
>>>
>>> Maybe the Airflow-y way for that would be to have one task that does the
>>> submit and pushes an XCom with the job, then one task that checks the
>>> progress of the job, but it feels very cumbersome to double the number of
>>> tasks just for that. Plus I'm not sure we could make the first task retry
>>> if the second task fails...
>>>
>>> Thanks again,
>>>
>>> Furcy
>>>
>>>
>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
>>> wrote:
>>>
>>> I think we've discussed several approaches like that, and using the XCom
>>> name (which for many people would mean "let's just extend the XCom table
>>> for that") is not a very good idea IMHO. I think this is very
>>> different functionality/logic which we might or might not agree to
>>> implement as a community. Naming it "XCom" or trying to extend the XCom
>>> table behavior might be problematic.
>>>
>>> Not sure if you are aware, but we had a very similar discussion about it
>>> recently (without clear conclusions, but at least you can see what kind of
>>> issues/problems different people have with this approach):
>>>
>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>
>>> I am not saying it is impossible to do, but I think it's a matter of how
>>> we formulate the "use case". It's very tempting to implement a generic
>>> intra-task communication mechanism, indeed. But it can very easily lead to
>>> people abusing it and bypassing the guarantees (idempotency mainly) that
>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>> after the latest discussion kind of died out, and I have one possible
>>> solution to the problem.
>>>
>>> Let me explain what I think about it (but others can have different
>>> opinions of course):
>>>
>>> So far the discussion was that there are several ways to achieve what
>>> you want (and it's really about which entity provides the "idempotency"
>>> guarantee):
>>>
>>> 1) Similarly to what was just merged in the BigQuery Insert Job
>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>> job_id from outside. You'd need to work out the job_id naming that works in
>>> your case and make sure that when you re-run your task with the same
>>> (dag_id, task_id, execution_date) you will get the same id. Then the
>>> "uniqueness", and thus idempotency, is handled by the logic written in the DAG.
>>>
>>> 2) Store the DAG id in some external storage (via one of the hooks -
>>> where it can be queried in the way that will work for you). Then the
>>> idempotency is actually handled by the logic in your Operator + some
>>> external storage.
>>>
>>> 3) Query your service and retrieve the JOB ID from it - but you have to
>>> have a way to query for the job related to your "dag id + task
>>> + execution_date". Then the idempotency is actually handled by the
>>> service you are using.
>>>
>>> In the use case you describe, this is the only thing you need: an
>>> "idempotency source". I believe you would like to get case 2) from
>>> above but without having to use external storage to store the "unique id".
>>> Something that will let each task in the same DAG run set or retrieve a
>>> unique value for that particular task. One value should be enough,
>>> assuming that each operator/task works on one external data "source".
>>>
>>> My current thinking is:
>>>
>>> Why don't we provide such a dedicated idempotency service inside
>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>> class with two methods:
>>>
>>>   * .set(id: str) and
>>>   * .get() -> str
>>>
>>> And the data stored there should be a string keyed by "(dag_id, task_id,
>>> execution_date)" - available also via Jinja templating. There is no
>>> intra-task communication here, very little possibility of abuse, and it
>>> seems to solve the major pain point where you have to provide your own
>>> storage to get the idempotency if your service does not provide one or you
>>> do not want to delegate it to the DAG writer.
>>>
>>> J.
>>>
>>>
>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>>
>>> The use case I'm referring to is that you can't use xcom to let a task
>>> read information from its past attempts, because when a task starts its
>>> xcom is automatically deleted.
>>>
>>> My specific use case is that we have a custom LivyOperator that calls
>>> Livy to start batch Spark jobs.
>>> When you start a batch job, Livy returns a job_id.
>>> Sometimes our operator can fail for one reason or another (for instance
>>> if Livy is unreachable for a while).
>>> When the task retries, it calls Livy again, which starts the same Spark
>>> job, but the problem is that the Spark job from the first attempt can still
>>> be running,
>>> and then we have a batch job that runs twice simultaneously and creates
>>> duplicates in the output.
>>>
>>> What we tried to do is get the job_id from the first try, check
>>> if the job is still running, and wait for it to complete if it is.
>>>
>>> We tried using xcom to let the task send a message to itself (to its
>>> next try), but xcom is meant for "inter-task communication" only, so this
>>> doesn't work and is not intended to work.
>>>
>>>
>>>
>>>
>>>
>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>>>
>>> Hi Furcy,
>>>
>>> Can you give a concrete example of what you mean by intra-task xcom?
>>> Depending on your use case this may already be possible.
>>>
>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I would like to open a feature request for Airflow to support
>>> "intra-task xcom".
>>>
>>> It seems that there are several distinct use cases for it already
>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>
>>> I wanted to summarize links to the use cases and past attempts,
>>> and the recommended approach (which apparently would be to create
>>> a distinct feature from xcom to support this; it could be called
>>> intra-com or self-com?)
>>>
>>> Do you know if such ticket already exists? I couldn't find one.
>>> Also I can't create any ticket due to some obscure bug (see my other
>>> email).
>>>
>>> Thanks,
>>>
>>> Furcy
>>>
>>>
>>>
>>> --
>>>
>>> Jarek Potiuk
>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>
>>> M: +48 660 796 129 <+48660796129>
>>> [image: Polidea] <https://www.polidea.com/>
>>>
>>>

-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>
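
A minimal sketch of the "IdempotencyIdStorage" idea quoted above, assuming an
in-memory dict in place of a metadata-database table. Nothing here is an
existing Airflow API: the class body, the Optional return type of get(), and
the helper submit_once with its submit_job / is_running callables are all
hypothetical stand-ins (the callables represent whatever calls the operator
makes to Livy).

```python
from datetime import datetime
from typing import Callable, Dict, Optional, Tuple

# Hypothetical key type: one entry per task instance, shared by all of its tries.
TaskKey = Tuple[str, str, datetime]


class IdempotencyIdStorage:
    """In-memory stand-in for the proposed storage (not an existing Airflow class)."""

    # A class-level dict plays the role of a metadata-database table here.
    _rows: Dict[TaskKey, str] = {}

    def __init__(self, dag_id: str, task_id: str, execution_date: datetime) -> None:
        self._key: TaskKey = (dag_id, task_id, execution_date)

    def set(self, id: str) -> None:
        # Overwrites any value stored by an earlier try of the same task instance.
        self._rows[self._key] = id

    def get(self) -> Optional[str]:
        # Returns None when no try has stored an id yet (an assumption of this sketch).
        return self._rows.get(self._key)


def submit_once(
    storage: IdempotencyIdStorage,
    submit_job: Callable[[], str],
    is_running: Callable[[str], bool],
) -> str:
    """Reuse the job from an earlier try if it is still running, else submit a new one."""
    previous = storage.get()
    if previous is not None and is_running(previous):
        return previous
    job_id = submit_job()
    storage.set(job_id)
    return job_id
```

On a retry, the operator would build the storage from the same (dag_id,
task_id, execution_date) and call submit_once again; if the first job is still
running, no second job is submitted. A try that dies between submitting and
storing the id is not covered by this sketch.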

Re: Intra-task "xcom"

Posted by Daniel Standish <dp...@gmail.com>.
Airflow already provides a mechanism for state persistence: the Variable,
and, with caveats and flaws, XCom.

I personally persist state to the Airflow metastore database for a large
percentage of our jobs.  They are incremental jobs, and it is helpful to
keep track of a watermark.

I think that incremental jobs are probably very common in Airflow
implementations, though users probably often resort to imperfect
vehicles for this, such as `execution_date` or XCom.

I have a very rough draft AIP that I haven't had enough time to work on,
which explores adding explicit support for state persistence to Airflow:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
I understand it is a controversial idea, though.  (Note: the AIP is not
ready for prime time.)

I am of the mind that being able to persist some state is not a fundamental
change to Airflow; it would just add explicit (and more user-friendly)
support for something that is already quite common and fits fully within
the wheelhouse of what Airflow exists to do.
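
To illustrate the watermark pattern mentioned above, here is a minimal sketch.
It assumes a plain dict as the state store (in practice this could be backed
by an Airflow Variable or a table in the metastore); the function name
run_incremental, the Row shape, and the fetch_after callable are hypothetical
and only serve the example.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Row = Tuple[int, str]  # (monotonically increasing id, payload); illustrative shape


def run_incremental(
    state: Dict[str, int],
    state_key: str,
    fetch_after: Callable[[int], Iterable[Row]],
) -> List[Row]:
    """Fetch rows newer than the stored watermark, then advance the watermark."""
    watermark = state.get(state_key, 0)
    rows = sorted(fetch_after(watermark))
    if rows:
        # Advance only once the batch is in hand, so a failed run repeats the same range.
        state[state_key] = rows[-1][0]
    return rows
```

Each run picks up where the previous one stopped, independent of
`execution_date`, which is the reason the state has to outlive a single task
instance.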





Re: Intra-task "xcom"

Posted by Chris Palmer <ch...@crpalmer.com>.
Furcy,

To clarify, when I say that Airflow should not be in the business of
keeping state about external systems, I specifically mean it shouldn't be
keeping state to be shared between task instances. I completely understand
that there may be external systems that are harder to work with, and like
in your case require the operator to be able to store some piece of
information to make them idempotent. I just don't think that Airflow should
provide that storage mechanism.

I would think that most users of Airflow have access to some sort of cloud
storage like S3 (which is really just a key-value store), and it's easy
enough to write your job_id or whatever value you care about to a file with
a prefix computed from the dag_id, task_id, execution_date or whatever
combination of them you care about. Yes, it makes your operators more
complex and they have to know about another system, but it keeps that
complexity out of core Airflow. That's the trade-off.
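
As a rough illustration of the approach described above, here is a minimal
sketch in Python. It uses a local directory as a stand-in for an S3 bucket so
that it runs anywhere; the function names and the key layout
(dag_id/task_id/execution_date/job_id) are assumptions made for this example,
not an Airflow API.

```python
from datetime import datetime
from pathlib import Path
from typing import Optional


def state_path(root: Path, dag_id: str, task_id: str,
               execution_date: datetime) -> Path:
    """Build a deterministic location from the task instance's identity."""
    stamp = execution_date.strftime("%Y%m%dT%H%M%S")
    return root / dag_id / task_id / stamp / "job_id"


def save_job_id(root: Path, dag_id: str, task_id: str,
                execution_date: datetime, job_id: str) -> None:
    """Persist the job id so that a later retry can find it."""
    path = state_path(root, dag_id, task_id, execution_date)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(job_id)


def load_job_id(root: Path, dag_id: str, task_id: str,
                execution_date: datetime) -> Optional[str]:
    """Return the job id stored by a previous attempt, or None if absent."""
    path = state_path(root, dag_id, task_id, execution_date)
    return path.read_text() if path.exists() else None
```

With a real object store, the same computed prefix would presumably be used as
the object key, with the hook of your choice doing the read and the write.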

Ash,
I'm not suggesting that XCom be removed from Airflow, and I understand
there are use cases where it makes some things convenient. In your example
though, it would be just as easy for the sensor to write the found object
path as the contents of another file in S3, with a computable prefix based
on the dag/task/execution_date.


At its heart XCom is just a key-value store where the keys are limited to a
very specific set of possibilities, and where key-value pairs are managed
in some specific ways. The request here is to add another narrowly defined
set of allowable keys, and as far as I can tell with no extra management of
them. The only real advantage of using the Airflow database for XCom or any
expansion/variation on it is that we know that all operators have access to
the database.

I'm not an expert, but I wonder how well Postgres or MySQL performs as a
high-volume key-value store. Does anyone actually use XCom at scale, and
does that extra load on the database impact scheduling and other
performance aspects of Airflow?

Chris


On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <as...@apache.org> wrote:

> Just to touch on one point about XCom, and to re-assure people that they,
> or something very like them are in Airflow for the foreseeable future.
>
> As an example of an appropriate use for XCom: Let's say a third party
> delivers you a set of files once a week, but the exact name of the files
> isn't known (by you) in advance. So you write a sensor that polls/checks S3
> for the Objects to appear in our bucket, and the sensor outputs the S3
> Object path to XCom, that then next processing step then examines to
> process the files.
>
> That sort of use case is not going anywhere.
>
> Cheers,
> -ash
>
> On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
>
> At the risk of repeating myself (from the previous thread that touched on
> this topic), I don't think Airflow should be in the business of keeping
> state about external systems. Airflow is about authoring and running
> workflows; it's not a messaging tool or a cluster management tool. I'm not
> convinced that the existing XCom functionality should really be a part of
> Airflow, and I certainly don't think it should be expanded upon or new
> variations added. I think storing state is especially risky, if for no
> other reason than the fact that Airflow is not the source of truth about
> those systems. It's very likely that at some times the "state" that Airflow
> has saved will diverge from the actual state of the external system.
> Handling that nicely, probably requires a bunch of custom code in the
> operators/hooks anyway, so I don't think it saves anything in terms of
> operator code complexity. Users would be much better served going to the
> source of truth to determine state. If part of the problem is that Livy is
> lacking in features (like being able to query the status of a particular
> job_id) then I think it would be more appropriate to add the needed
> features to that project. Airflow at its core shouldn't be concerned with
> making up for failures of other tools.
>
> Also as can be seen by just this discussion, it's hard to keep these extra
> features from expanding in scope. Jarek proposed something that would just
> store a single string, and immediately Furcy wants to expand it to store
> multiple strings. Either way we are really just talking about a key-value
> store, and putting limits on how that key can be structured; the key is
> made up of some predefined set of Airflow entities (for Jarek's proposal)
> or some arbitrary key along with those Airflow entities (Furcy's proposal).
>
> I know in the past that I had a situation where I wanted to reuse a
> cluster across multiple data intervals, if one was already running (this
> was before I discovered Airflow so wasn't "execution dates" precisely). I
> can equally see use cases where I might want to share some resource for
> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
> added this then why limit it to any one of those combinations? But then we
> just have an arbitrary key-value store. If you want to use Airflow for that
> then you can use Variables, if you want to use something else then you can.
>
> Unless Airflow is doing some extra management of these key-values in some
> way (like it does with clearing out XCom's on reruns), then I see
> absolutely no added benefit. And even with some potential management by
> Airflow I'm still not convinced that Airflow is the right place for it.
>
> Chris
>
> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:
>
> Thank you Jarek for the detailed explanation,
>
> That's exactly what I wanted to do: write a feature request to summarize
> all those discussions.
> I agree with you that the feature should be marked distinct from the XCom
> feature and that we should not piggyback this feature into XCom.
>
> The crux of the problem, I think is that with XCom you do want the task to
> delete it's xcom on the beginning of the retry.
> Correct me if I'm wrong but one use cases where it was necessary was
> having a task A and a task B that starts immediately after A, and wait from
> some 'signal' from A.
> If A and B restart and A doesn't reset it's signal, then B will use the
> signal from A's first try, which is incorrect.
>
> About the 3 solutions you mention:
>
> 1) Providing the job_id from outside. That works indeed. Sadly in my
> use-case Livy's API is poorly designed and only returns a generated job_id,
> you can't specify a custom one.
> You can't even find a job by name, I would have to list all the active
> job_ids, and do a GET for each of them to get it's name and find which one
> is the one I want. It's doable but inelegant.
>
> 2) Store the id in an external storage. Of course it would work but it
> requires an external storage. More on that below.
>
> 3) I'm not sure I understand completely what you mean there, but I think
> you mean that the idempotency can be handled by the service you call (for
> instance BigQuery). Indeed that is another solution. If we were using Spark
> with a Hive metastore + locking or the deltalake storage format, we could
> have something to prevent a job that run twice from creating duplicates.
> This is another solution we are considering, but it is coslty to change now.
>
> You guess correctly that the feature I was asking for me would be to
> provide some utility to let the users implement solution 2) without
> requiring an external storage.
> I think it would be a QOL improvement for some use cases, just like it
> could be argued that XCom is just a QOL improvement and users could have
> used an external storage themselves.
> The main advantage that it brings is making the custom operators much
> easier to share and reuse across the Apache Airflow community, compared to
> having to set up some external
> storage.
>
> I have seen that some users used the metadata store itself as an external
> storage by adding a new table to the airflow model:
>
> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>
> And others suggested using XCom itself as an external storage by storing
> information with a special task_id:
> https://stackoverflow.com/a/57515143/2087478
>
> In the discussion thread you provided it was also suggested to use
> Variables to store some persisting information.
>
> These 3 approaches work but feel quite "hacky" and I believe that
> providing such functionality would be good.
>
> Finally, I don't see the point of limiting the functionality to such
> extent, providing a "IdempotencyIdStorage" that only allows you to store a
> string
> will just force people who need to store more than one id for one task
> (for whatever reason) to use some hack again, like storing a json inside
> the storage.
>
> I was more thinking about something quite similar to XCom (I liked the
> XState name suggestion), where the entry would be keyed by "(dag_id,
> task_id, execution_date, key)"
> where "key" can be whatever you want and would be kept across retries.
>
> I have read (quickly) through the "Pandora's Box" thread you linked.
> Indeed it looks like there would be many ways to misuse such feature.
> I do understand the important of idempotency, and it looks like my use
> case is one of the first ever listed where I do need to persist a state
> across retries to make my operator really idempotent.
>
> I'm surprised no one came up with it given how frequent the Spark +
> Airflow combination is (well, the BigQueryOperator was one too but found
> another solution).
>
> Of course we can blame it on Livy for being poorly conceived (unlike
> BigQuery) or we can blame it on Spark for not having a built-in security
> mechanism to prevent double-writes,
> but I think that as the above hacks show, you can't really prevent users
> from shooting themselves in the foot if that's what they really want to.
>
> While I do think that making things foolproof is important, I believe it's
> also in Python's philosophy to *not* make things foolproof at the
> detriment of simplicity for the right use cases.
> But I do understand that the use cases are different and contradictory:
> some would require the state to be persisted across reschedule and not
> retries, mine would require the state to be persisted across retries and
> not reschedule.
>
> Maybe the Airflow-y way for that would be to have one task that does the
> submit and an xcom with the job, then one task that check the progress of
> the job, but that feels very cumbersome to double the number of tasks just
> for that. Plus I'm not sure we could make the first task retry if the
> second task fails...
>
> Thanks again,
>
> Furcy
>
>
> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
> wrote:
>
> I think we've discussed several approaches like that and using Xcom
> name (which for many people would mean "let's just extend XCom table for
> that" is not a very good idea to use it IMHO. I think this is very
> different functionality/logic which we might or might not agree to
> implement as a community. Naming it "Xcom" to trying to extend the XCom
> table behavior might be problematic.
>
> Not sure if you are aware but we had very similar discussion about it
> recently (without clear conclusions but at least you can see what kind of
> issues/problems different people have with this approach)
>
> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>
> I am not saying it is impossible to do, but I think it's a matter of how
> we formulate the "use case". It's very tempting to implement a generic -
> intra-task communication mechanism, indeed. But it can very easily lead to
> people abusing it and bypassing the guarantees (idempotency mainly) that
> Airflow provides for backfilling and re-running tasks. I thought a bit
> after the latest discussion kind of died out, and I have one possible
> solution to the problem.
>
> Let me explain what I think about it (but others can have different
> opinions of course):
>
> So far the discussion was that there are several ways to achieve what you
> want (and it's really about what entity is providing the "idempotency"
> guarantee:
>
> 1) Similarly as just merged in the BigQuery Insert Job
> https://github.com/apache/airflow/pull/8868/files - you can provide
> job_id from outside. You'd need to work out the job_id naming that works in
> your case and make sure that when you re-run your task with the same
> (dag_id, task_id, execution date) you will get the same id. Then the
> "uniqueness" thus idempotency is handled by the logic written in the DAG.
>
> 2) Store the DAG id in some external storage (via one of the hooks - where
> it can be queried in the way that will work for you). Then the idempotency
> is actually handled by the logic in your Operator + some external storage.
>
> 3) Query your service and retrieve the JOB ID from it - but you have to
> have a way to query for the job related to your "dag id  + task
> + execution_date". Then - the idempotency is actually handling by the
> Service you are using.
>
> In the use case, you describe - this is the only thing you need -
> "idempotency source". I believe you would like to get the case 2) from
> above but without having to use external storage to store the "unique id".
> Something that will let each task in the same dag run to set or retrieve a
> unique value for that particular task. One value should be enough -
> assuming that each operator/task works on one external data "source".
>
> My current thinking is:
>
> Why don't we provide such a dedicated, idempotency service inside Airflow?
> We already have a DB and we could have an"IdempotencyIdStorage" class with
> two methods:
>
>   * .set(id: str) and
>   * .get() -> str
>
> And the data stored there should be a string keyed by "dag_id, task_id,
> execution_date)" - available also via Jinja templating. There is no
> intra-task communication, here, very little possibility of abuse and it
> seems to solve the major pain point where you have to provide your own
> storage to get the idempotency if your service does not provide one or you
> do not want to delegate it to the DAG writer.
>
> J.
>
>
> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>
> The use case I'm referring to is that you can't use xcom to let a task
> read information from it's past attempts, because when a task starts it's
> xcom is automatically deleted.
>
> My specific use case is that we have a custom LivyOperator that calls Livy
> to start batch Spark Jobs.
> When you start a batch job Livy returns a job_id
> Sometimes our operator can fail for one reason or another (for instance if
> Livy is unreachable for a while)
> When the task retries, it calls Livy again, which start the same spark
> job, but the problem is that the spark job from the first attempt can still
> be running,
> and then we have a batch job that runs twice simultaneously and creates
> duplicates in the output.
>
> What we tried to do is getting the job_id from the first try, to check if
> the job is still running, and wait for it to complete if it is.
>
> We tried using xcom to let the task send a message to itself (to it's next
> try) but xcom is meant for "inter-task communication" only so this doesn't
> work and is not intended to work.
>
>
>
>
>
> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>
> Hi Furcy,
>
> Can you give a concrete example of what you mean by intra-task xcom?
> Depending your use case this may already be possible.
>
> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>
> Hello,
>
> I would like to open a feature request for Airflow to support "intra-task
> xcom".
>
> It seems that there are several distinct use cases for it already
> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>
> I wanted to summarize links to the use cases and past attempts,
> and the recommended approach (which apparently would be to create
> a distinct feature from xcom to support this, it could be calle intra-com
> or self-com ?)
>
> Do you know if such ticket already exists? I couldn't find one.
> Also I can't create any ticket due to some obscure bug (see my other
> email).
>
> Thanks,
>
> Furcy
>
>
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
> [image: Polidea] <https://www.polidea.com/>
>
>

Re: Intra-task "xcom"

Posted by Ash Berlin-Taylor <as...@apache.org>.
Just to touch on one point about XCom, and to reassure people that XComs, or something very like them, are in Airflow for the foreseeable future.

As an example of an appropriate use for XCom: Let's say a third party delivers you a set of files once a week, but the exact name of the files isn't known (by you) in advance. So you write a sensor that polls/checks S3 for the objects to appear in your bucket, and the sensor outputs the S3 object path to XCom, which the next processing step then reads to process the files.

That sort of use case is not going anywhere.

Cheers,
-ash
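
The sensor-to-processing hand-off described above could look roughly like the
following sketch. It only assumes a task instance object `ti` exposing
`xcom_push` and `xcom_pull`, as Airflow's TaskInstance does; the `list_keys`
callable, the `XCOM_KEY` name and the function names are hypothetical
stand-ins for a real S3 hook and real operators.

```python
from typing import Callable, Iterable, Optional

XCOM_KEY = "s3_object_path"  # hypothetical key name chosen for this example


def first_delivered_object(
    list_keys: Callable[[str], Iterable[str]], prefix: str
) -> Optional[str]:
    """Return the first object key found under `prefix`, or None if none yet."""
    keys = sorted(list_keys(prefix))
    return keys[0] if keys else None


def poke(ti, list_keys: Callable[[str], Iterable[str]], prefix: str) -> bool:
    """Sensor body: succeed once an object exists and publish its path."""
    key = first_delivered_object(list_keys, prefix)
    if key is None:
        return False  # nothing delivered yet; the sensor pokes again later
    ti.xcom_push(key=XCOM_KEY, value=key)
    return True


def process(ti, sensor_task_id: str) -> str:
    """Downstream task body: read the path that the sensor published."""
    path = ti.xcom_pull(task_ids=sensor_task_id, key=XCOM_KEY)
    return f"processing {path}"
```

The point of the design is that the sensor is the only task that needs to
discover the file name; the downstream task just reads whatever was published.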

On Jun 1 2020, at 7:37 pm, Chris Palmer <ch...@crpalmer.com> wrote:
> At the risk of repeating myself (from the previous thread that touched on this topic), I don't think Airflow should be in the business of keeping state about external systems. Airflow is about authoring and running workflows; it's not a messaging tool or a cluster management tool. I'm not convinced that the existing XCom functionality should really be a part of Airflow, and I certainly don't think it should be expanded upon or new variations added. I think storing state is especially risky, if for no other reason than the fact that Airflow is not the source of truth about those systems. It's very likely that at some times the "state" that Airflow has saved will diverge from the actual state of the external system. Handling that nicely, probably requires a bunch of custom code in the operators/hooks anyway, so I don't think it saves anything in terms of operator code complexity. Users would be much better served going to the source of truth to determine state. If part of the problem is that Livy is lacking in features (like being able to query the status of a particular job_id) then I think it would be more appropriate to add the needed features to that project. Airflow at its core shouldn't be concerned with making up for failures of other tools.
>
> Also as can be seen by just this discussion, it's hard to keep these extra features from expanding in scope. Jarek proposed something that would just store a single string, and immediately Furcy wants to expand it to store multiple strings. Either way we are really just talking about a key-value store, and putting limits on how that key can be structured; the key is made up of some predefined set of Airflow entities (for Jarek's proposal) or some arbitrary key along with those Airflow entities (Furcy's proposal).
> I know in the past that I had a situation where I wanted to reuse a cluster across multiple data intervals, if one was already running (this was before I discovered Airflow so wasn't "execution dates" precisely). I can equally see use cases where I might want to share some resource for multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we added this then why limit it to any one of those combinations? But then we just have an arbitrary key-value store. If you want to use Airflow for that then you can use Variables, if you want to use something else then you can.
> Unless Airflow is doing some extra management of these key-values in some way (like it does with clearing out XCom's on reruns), then I see absolutely no added benefit. And even with some potential management by Airflow I'm still not convinced that Airflow is the right place for it.
>
> Chris
> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pin.furcy@gmail.com (mailto:pin.furcy@gmail.com)> wrote:
> > Thank you Jarek for the detailed explanation,
> >
> > That's exactly what I wanted to do: write a feature request to summarize all those discussions.
> > I agree with you that the feature should be marked distinct from the XCom feature and that we should not piggyback this feature into XCom.
> >
> > The crux of the problem, I think is that with XCom you do want the task to delete it's xcom on the beginning of the retry.
> > Correct me if I'm wrong but one use cases where it was necessary was having a task A and a task B that starts immediately after A, and wait from some 'signal' from A.
> > If A and B restart and A doesn't reset it's signal, then B will use the signal from A's first try, which is incorrect.
> >
> > About the 3 solutions you mention:
> > 1) Providing the job_id from outside. That works indeed. Sadly in my use-case Livy's API is poorly designed and only returns a generated job_id, you can't specify a custom one.
> > You can't even find a job by name, I would have to list all the active job_ids, and do a GET for each of them to get it's name and find which one is the one I want. It's doable but inelegant.
> >
> > 2) Store the id in an external storage. Of course it would work but it requires an external storage. More on that below.
> > 3) I'm not sure I understand completely what you mean there, but I think you mean that the idempotency can be handled by the service you call (for instance BigQuery). Indeed that is another solution. If we were using Spark with a Hive metastore + locking or the deltalake storage format, we could have something to prevent a job that run twice from creating duplicates. This is another solution we are considering, but it is coslty to change now.
> > You guess correctly that the feature I was asking for me would be to provide some utility to let the users implement solution 2) without requiring an external storage.
> > I think it would be a QOL improvement for some use cases, just like it could be argued that XCom is just a QOL improvement and users could have used an external storage themselves.
> > The main advantage that it brings is making the custom operators much easier to share and reuse across the Apache Airflow community, compared to having to set up some external
> > storage.
> >
> > I have seen that some users used the metadata store itself as an external storage by adding a new table to the airflow model:
> > http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
> >
> > And others suggested using XCom itself as an external storage by storing information with a special task_id: https://stackoverflow.com/a/57515143/2087478
> > In the discussion thread you provided it was also suggested to use Variables to store some persisting information.
> > These 3 approaches work but feel quite "hacky" and I believe that providing such functionality would be good.
> > Finally, I don't see the point of limiting the functionality to such extent, providing a "IdempotencyIdStorage" that only allows you to store a string
> > will just force people who need to store more than one id for one task (for whatever reason) to use some hack again, like storing a json inside the storage.
> >
> > I was more thinking about something quite similar to XCom (I liked the XState name suggestion), where the entry would be keyed by "(dag_id, task_id, execution_date, key)"
> > where "key" can be whatever you want and would be kept across retries.
> >
> > I have read (quickly) through the "Pandora's Box" thread you linked. Indeed it looks like there would be many ways to misuse such feature.
> > I do understand the important of idempotency, and it looks like my use case is one of the first ever listed where I do need to persist a state
> > across retries to make my operator really idempotent.
> >
> > I'm surprised no one came up with it given how frequent the Spark + Airflow combination is (well, the BigQueryOperator was one too but found another solution).
> > Of course we can blame it on Livy for being poorly conceived (unlike BigQuery) or we can blame it on Spark for not having a built-in security mechanism to prevent double-writes,
> > but I think that as the above hacks show, you can't really prevent users from shooting themselves in the foot if that's what they really want to.
> >
> > While I do think that making things foolproof is important, I believe it's also in Python's philosophy to not make things foolproof at the detriment of simplicity for the right use cases.
> > But I do understand that the use cases are different and contradictory: some would require the state to be persisted across reschedule and not retries, mine would require the state to be persisted across retries and not reschedule.
> >
> > Maybe the Airflow-y way for that would be to have one task that does the submit and an xcom with the job, then one task that check the progress of the job, but that feels very cumbersome to double the number of tasks just for that. Plus I'm not sure we could make the first task retry if the second task fails...
> > Thanks again,
> > Furcy
> >
> > On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Jarek.Potiuk@polidea.com (mailto:Jarek.Potiuk@polidea.com)> wrote:
> > > I think we've discussed several approaches like that and using Xcom name (which for many people would mean "let's just extend XCom table for that" is not a very good idea to use it IMHO. I think this is very different functionality/logic which we might or might not agree to implement as a community. Naming it "Xcom" to trying to extend the XCom table behavior might be problematic.
> > >
> > > Not sure if you are aware but we had very similar discussion about it recently (without clear conclusions but at least you can see what kind of issues/problems different people have with this approach)
> > > https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
> > >
> > > I am not saying it is impossible to do, but I think it's a matter of how we formulate the "use case". It's very tempting to implement a generic - intra-task communication mechanism, indeed. But it can very easily lead to people abusing it and bypassing the guarantees (idempotency mainly) that Airflow provides for backfilling and re-running tasks. I thought a bit after the latest discussion kind of died out, and I have one possible solution to the problem.
> > >
> > > Let me explain what I think about it (but others can have different opinions of course):
> > >
> > > So far the discussion was that there are several ways to achieve what you want (and it's really about what entity is providing the "idempotency" guarantee:
> > >
> > > 1) Similarly as just merged in the BigQuery Insert Job https://github.com/apache/airflow/pull/8868/files - you can provide job_id from outside. You'd need to work out the job_id naming that works in your case and make sure that when you re-run your task with the same (dag_id, task_id, execution date) you will get the same id. Then the "uniqueness" thus idempotency is handled by the logic written in the DAG.
> > >
> > > 2) Store the DAG id in some external storage (via one of the hooks - where it can be queried in the way that will work for you). Then the idempotency is actually handled by the logic in your Operator + some external storage.
> > >
> > > 3) Query your service and retrieve the JOB ID from it - but you have to have a way to query for the job related to your "dag id + task + execution_date". Then - the idempotency is actually handling by the Service you are using.
> > >
> > > In the use case, you describe - this is the only thing you need - "idempotency source". I believe you would like to get the case 2) from above but without having to use external storage to store the "unique id". Something that will let each task in the same dag run to set or retrieve a unique value for that particular task. One value should be enough - assuming that each operator/task works on one external data "source".
> > >
> > > My current thinking is:
> > >
> > > Why don't we provide such a dedicated, idempotency service inside Airflow? We already have a DB and we could have an"IdempotencyIdStorage" class with two methods:
> > >
> > > * .set(id: str) and
> > > * .get() -> str
> > >
> > > And the data stored there should be a string keyed by "dag_id, task_id, execution_date)" - available also via Jinja templating. There is no intra-task communication, here, very little possibility of abuse and it seems to solve the major pain point where you have to provide your own storage to get the idempotency if your service does not provide one or you do not want to delegate it to the DAG writer.
> > >
> > > J.
> > >
> > >
> > > On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pin.furcy@gmail.com (mailto:pin.furcy@gmail.com)> wrote:
> > > > The use case I'm referring to is that you can't use xcom to let a task read information from it's past attempts, because when a task starts it's xcom is automatically deleted.
> > > >
> > > > My specific use case is that we have a custom LivyOperator that calls Livy to start batch Spark Jobs.
> > > > When you start a batch job Livy returns a job_id
> > > > Sometimes our operator can fail for one reason or another (for instance if Livy is unreachable for a while)
> > > > When the task retries, it calls Livy again, which start the same spark job, but the problem is that the spark job from the first attempt can still be running,
> > > > and then we have a batch job that runs twice simultaneously and creates duplicates in the output.
> > > >
> > > > What we tried to do is getting the job_id from the first try, to check if the job is still running, and wait for it to complete if it is.
> > > > We tried using xcom to let the task send a message to itself (to it's next try) but xcom is meant for "inter-task communication" only so this doesn't work and is not intended to work.
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <ash@apache.org (mailto:ash@apache.org)> wrote:
> > > > > Hi Furcy,
> > > > >
> > > > > Can you give a concrete example of what you mean by intra-task xcom? Depending your use case this may already be possible.
> > > > > On Jun 1 2020, at 11:45 am, Furcy Pin <pin.furcy@gmail.com (mailto:pin.furcy@gmail.com)> wrote:
> > > > > > Hello,
> > > > > >
> > > > > > I would like to open a feature request for Airflow to support "intra-task xcom".
> > > > > > It seems that there are several distinct use cases for it already
> > > > > > and only ugly workarounds and I wanted to list them in a JIRA ticket.
> > > > > >
> > > > > > I wanted to summarize links to the use cases and past attempts,
> > > > > > and the recommended approach (which apparently would be to create
> > > > > > a distinct feature from xcom to support this, it could be calle intra-com or self-com ?)
> > > > > >
> > > > > > Do you know if such ticket already exists? I couldn't find one.
> > > > > > Also I can't create any ticket due to some obscure bug (see my other email).
> > > > > >
> > > > > > Thanks,
> > > > > > Furcy
> > >
> > > --
> > >
> > >
> > > Jarek Potiuk
> > > Polidea (https://www.polidea.com/) | Principal Software Engineer
> > >
> > > M: +48 660 796 129 (tel:+48660796129)


Re: Intra-task "xcom"

Posted by Furcy Pin <pi...@gmail.com>.
Thank you Chris for your answer.

I agree with you that if there is something to fix, it's more on the Livy
side. I also did not consider the Variables feature, mostly because I saw
it as a way to pass mutable configuration to the tasks, and I did not know
that tasks could set variables too.

I guess we may try using this as a workaround, thank you for the suggestion.

If I may, I would like to temper your argument that "Airflow should not be
in the business of keeping state about external systems".
While I agree that "checking the state of external systems" is not
Airflow's business, I believe it is exactly the business of Airflow
operators.
As you said, operators should rather check the source of truth, but after
doing so they will be keeping the state of the external system in RAM.
The question then is how the operator should handle failure. For
instance, my LivyOperator regularly pokes Livy to learn the state of the
running job.
If there is a network interruption between the operator and the external
system, what should happen? Should the operator handle the retries itself?
If yes, for how long? What if the operator itself fails for another,
completely unexpected reason? If the system isn't itself designed to be
idempotent (like an RDBMS),
isn't it the Operator's job to behave in an idempotent way? Or should we
design a whole system in between the operator and the system to handle the
idempotency?
Or just use another system?
If we say that it's the Operator's job to be idempotent despite the system,
then to be able to do that it might need to know some information
from its previous executions (to perform cleanup or check a state), and
since any state stored outside the external system might diverge, it must
also be able to check that it did not. But if an Operator does exactly that,
I don't see why it should be considered a bad thing to let it use some kind
of provided storage.
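
A minimal sketch of that pattern, under stated assumptions: FakeJobService
is a hypothetical stand-in for an external system such as Livy, and a plain
dict plays the role of the "provided storage". Neither is an Airflow or Livy
API. The point is only that the stored job id is never trusted on its own;
it is checked against the source of truth before being reused.

```python
from typing import Dict, Optional


class FakeJobService:
    """Hypothetical stand-in for an external system such as Livy."""

    def __init__(self) -> None:
        self._states: Dict[int, str] = {}
        self._next_id = 0

    def submit(self) -> int:
        job_id = self._next_id
        self._next_id += 1
        self._states[job_id] = "running"
        return job_id

    def state(self, job_id: int) -> Optional[str]:
        # None means the service does not know this job (state has diverged).
        return self._states.get(job_id)


def submit_once(service: FakeJobService, store: Dict[str, int], key: str) -> int:
    """Reuse the job of a previous attempt only if the service confirms it."""
    previous = store.get(key)
    if previous is not None and service.state(previous) in ("running", "success"):
        return previous  # stored state agrees with the source of truth
    job_id = service.submit()  # nothing usable from a previous attempt
    store[key] = job_id
    return job_id


service = FakeJobService()
store: Dict[str, int] = {}
first = submit_once(service, store, "my_dag.my_task.2020-06-01")
retry = submit_once(service, store, "my_dag.my_task.2020-06-01")
print(first == retry)
```

If the stored id points at a job the service no longer knows, submit_once
falls back to a fresh submission instead of waiting on a stale id.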

Also, sadly, it is often easier to write a temporary workaround for a
recurring production issue in your custom operator rather than in the
external system that you don't own (and some systems, unlike Livy, aren't
open source).


Thanks again for the explanation,

Furcy

On Mon, 1 Jun 2020 at 20:38, Chris Palmer <ch...@crpalmer.com> wrote:

> At the risk of repeating myself (from the previous thread that touched on
> this topic), I don't think Airflow should be in the business of keeping
> state about external systems. Airflow is about authoring and running
> workflows; it's not a messaging tool or a cluster management tool. I'm not
> convinced that the existing XCom functionality should really be a part of
> Airflow, and I certainly don't think it should be expanded upon or new
> variations added. I think storing state is especially risky, if for no
> other reason than the fact that Airflow is not the source of truth about
> those systems. It's very likely that at some times the "state" that Airflow
> has saved will diverge from the actual state of the external system.
> Handling that nicely probably requires a bunch of custom code in the
> operators/hooks anyway, so I don't think it saves anything in terms of
> operator code complexity. Users would be much better served going to the
> source of truth to determine state. If part of the problem is that Livy is
> lacking in features (like being able to query the status of a particular
> job_id) then I think it would be more appropriate to add the needed
> features to that project. Airflow at its core shouldn't be concerned with
> making up for failures of other tools.
>
> Also as can be seen by just this discussion, it's hard to keep these extra
> features from expanding in scope. Jarek proposed something that would just
> store a single string, and immediately Furcy wants to expand it to store
> multiple strings. Either way we are really just talking about a key-value
> store, and putting limits on how that key can be structured; the key is
> made up of some predefined set of Airflow entities (for Jarek's proposal)
> or some arbitrary key along with those Airflow entities (Furcy's proposal).
>
> I know in the past that I had a situation where I wanted to reuse a
> cluster across multiple data intervals, if one was already running (this
> was before I discovered Airflow so wasn't "execution dates" precisely). I
> can equally see use cases where I might want to share some resource for
> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
> added this then why limit it to any one of those combinations? But then we
> just have an arbitrary key-value store. If you want to use Airflow for that
> then you can use Variables, if you want to use something else then you can.
>
> Unless Airflow is doing some extra management of these key-values in some
> way (like it does with clearing out XComs on reruns), then I see
> absolutely no added benefit. And even with some potential management by
> Airflow I'm still not convinced that Airflow is the right place for it.
>
> Chris

Re: Intra-task "xcom"

Posted by Chris Palmer <ch...@crpalmer.com>.
At the risk of repeating myself (from the previous thread that touched on
this topic), I don't think Airflow should be in the business of keeping
state about external systems. Airflow is about authoring and running
workflows; it's not a messaging tool or a cluster management tool. I'm not
convinced that the existing XCom functionality should really be a part of
Airflow, and I certainly don't think it should be expanded upon or new
variations added. I think storing state is especially risky, if for no
other reason than the fact that Airflow is not the source of truth about
those systems. It's very likely that at some times the "state" that Airflow
has saved will diverge from the actual state of the external system.
Handling that nicely probably requires a bunch of custom code in the
operators/hooks anyway, so I don't think it saves anything in terms of
operator code complexity. Users would be much better served going to the
source of truth to determine state. If part of the problem is that Livy is
lacking in features (like being able to query the status of a particular
job_id) then I think it would be more appropriate to add the needed
features to that project. Airflow at its core shouldn't be concerned with
making up for failures of other tools.

Also, as can be seen from just this discussion, it's hard to keep these extra
features from expanding in scope. Jarek proposed something that would just
store a single string, and immediately Furcy wants to expand it to store
multiple strings. Either way we are really just talking about a key-value
store, and putting limits on how that key can be structured; the key is
made up of some predefined set of Airflow entities (for Jarek's proposal)
or some arbitrary key along with those Airflow entities (Furcy's proposal).
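
To make the comparison concrete, here is a rough sketch in which both
proposals end up as one dictionary whose keys are shaped differently. The
function names are made up for illustration and are not part of Airflow.

```python
from datetime import datetime
from typing import Dict, Tuple


def task_instance_key(dag_id: str, task_id: str, execution_date: datetime) -> Tuple[str, ...]:
    """Key built only from predefined Airflow entities (one value per task instance)."""
    return (dag_id, task_id, execution_date.isoformat())


def named_task_instance_key(
    dag_id: str, task_id: str, execution_date: datetime, name: str
) -> Tuple[str, ...]:
    """Same entities plus an arbitrary name (several values per task instance)."""
    return task_instance_key(dag_id, task_id, execution_date) + (name,)


state: Dict[Tuple[str, ...], str] = {}
when = datetime(2020, 6, 1)

state[task_instance_key("etl", "submit", when)] = "job-42"
state[named_task_instance_key("etl", "submit", when, "cluster_id")] = "cluster-7"
state[named_task_instance_key("etl", "submit", when, "job_id")] = "job-42"

print(len(state))
```

Either way the storage is the same mapping; only the rule for building the
key changes.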

I know in the past that I had a situation where I wanted to reuse a cluster
across multiple data intervals, if one was already running (this was before
I discovered Airflow so wasn't "execution dates" precisely). I can equally
see use cases where I might want to share some resource for multiple tasks
in a DAG, or across similar tasks in multiple DAGs. So if we added this
then why limit it to any one of those combinations? But then we just have
an arbitrary key-value store. If you want to use Airflow for that then you
can use Variables, if you want to use something else then you can.

Unless Airflow is doing some extra management of these key-values in some
way (like it does with clearing out XComs on reruns), then I see
absolutely no added benefit. And even with some potential management by
Airflow I'm still not convinced that Airflow is the right place for it.

Chris

On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <pi...@gmail.com> wrote:

> Thank you Jarek for the detailed explanation,
>
> That's exactly what I wanted to do: write a feature request to summarize
> all those discussions.
> I agree with you that the feature should be marked distinct from the XCom
> feature and that we should not piggyback this feature into XCom.
>
> The crux of the problem, I think, is that with XCom you do want the task to
> delete its xcom at the beginning of the retry.
> Correct me if I'm wrong, but one use case where it was necessary was
> having a task A and a task B that starts immediately after A and waits for
> some 'signal' from A.
> If A and B restart and A doesn't reset its signal, then B will use the
> signal from A's first try, which is incorrect.
>
> About the 3 solutions you mention:
>
> 1) Providing the job_id from outside. That works indeed. Sadly, in my
> use case Livy's API is poorly designed and only returns a generated job_id;
> you can't specify a custom one.
> You can't even find a job by name: I would have to list all the active
> job_ids and do a GET for each of them to get its name and find which one
> is the one I want. It's doable but inelegant.
>
> 2) Store the id in an external storage. Of course it would work but it
> requires an external storage. More on that below.
>
> 3) I'm not sure I understand completely what you mean there, but I think
> you mean that the idempotency can be handled by the service you call (for
> instance BigQuery). Indeed that is another solution. If we were using Spark
> with a Hive metastore + locking or the deltalake storage format, we could
> have something to prevent a job that runs twice from creating duplicates.
> This is another solution we are considering, but it is costly to change now.
>
> You guess correctly that the feature I was asking for would be to
> provide some utility to let the users implement solution 2) without
> requiring an external storage.
> I think it would be a QOL improvement for some use cases, just like it
> could be argued that XCom is just a QOL improvement and users could have
> used an external storage themselves.
> The main advantage that it brings is making the custom operators much
> easier to share and reuse across the Apache Airflow community, compared to
> having to set up some external
> storage.
>
> I have seen that some users used the metadata store itself as an external
> storage by adding a new table to the airflow model:
>
> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>
> And others suggested using XCom itself as an external storage by storing
> information with a special task_id:
> https://stackoverflow.com/a/57515143/2087478
>
> In the discussion thread you provided it was also suggested to use
> Variables to store some persisting information.
>
> These 3 approaches work but feel quite "hacky" and I believe that
> providing such functionality would be good.
>
> Finally, I don't see the point of limiting the functionality to such an
> extent: providing an "IdempotencyIdStorage" that only allows you to store a
> string will just force people who need to store more than one id for one
> task (for whatever reason) to use some hack again, like storing JSON inside
> the storage.
>
> I was more thinking about something quite similar to XCom (I liked the
> XState name suggestion), where the entry would be keyed by "(dag_id,
> task_id, execution_date, key)"
> where "key" can be whatever you want and would be kept across retries.
>
> I have read (quickly) through the "Pandora's Box" thread you linked.
> Indeed it looks like there would be many ways to misuse such a feature.
> I do understand the importance of idempotency, and it looks like my use
> case is one of the first ever listed where I do need to persist a state
> across retries to make my operator really idempotent.
>
> I'm surprised no one came up with it given how frequent the Spark +
> Airflow combination is (well, the BigQueryOperator was one too but found
> another solution).
>
> Of course we can blame it on Livy for being poorly conceived (unlike
> BigQuery) or we can blame it on Spark for not having a built-in security
> mechanism to prevent double-writes,
> but I think that as the above hacks show, you can't really prevent users
> from shooting themselves in the foot if that's what they really want to.
>
> While I do think that making things foolproof is important, I believe it's
> also in Python's philosophy to *not* make things foolproof to the
> detriment of simplicity for the right use cases.
> But I do understand that the use cases are different and contradictory:
> some would require the state to be persisted across reschedule and not
> retries, mine would require the state to be persisted across retries and
> not reschedule.
>
> Maybe the Airflow-y way for that would be to have one task that does the
> submit and an xcom with the job, then one task that checks the progress of
> the job, but that feels very cumbersome to double the number of tasks just
> for that. Plus I'm not sure we could make the first task retry if the
> second task fails...
>
> Thanks again,
>
> Furcy
>
>
> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com>
> wrote:
>
>> I think we've discussed several approaches like that, and using the XCom
>> name (which for many people would mean "let's just extend the XCom table
>> for that") is not a very good idea IMHO. I think this is very
>> different functionality/logic which we might or might not agree to
>> implement as a community. Naming it "XCom" or trying to extend the XCom
>> table behavior might be problematic.
>>
>> Not sure if you are aware but we had very similar discussion about it
>> recently (without clear conclusions but at least you can see what kind of
>> issues/problems different people have with this approach)
>>
>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>
>> I am not saying it is impossible to do, but I think it's a matter of how
>> we formulate the "use case". It's very tempting to implement a generic -
>> intra-task communication mechanism, indeed. But it can very easily lead to
>> people abusing it and bypassing the guarantees (idempotency mainly) that
>> Airflow provides for backfilling and re-running tasks. I thought a bit
>> after the latest discussion kind of died out, and I have one possible
>> solution to the problem.
>>
>> Let me explain what I think about it (but others can have different
>> opinions of course):
>>
>> So far the discussion was that there are several ways to achieve what you
>> want (and it's really about what entity is providing the "idempotency"
>> guarantee):
>>
>> 1) Similarly as just merged in the BigQuery Insert Job
>> https://github.com/apache/airflow/pull/8868/files - you can provide
>> job_id from outside. You'd need to work out the job_id naming that works in
>> your case and make sure that when you re-run your task with the same
>> (dag_id, task_id, execution date) you will get the same id. Then the
>> "uniqueness" thus idempotency is handled by the logic written in the DAG.
>>
>> 2) Store the DAG id in some external storage (via one of the hooks -
>> where it can be queried in the way that will work for you). Then the
>> idempotency is actually handled by the logic in your Operator + some
>> external storage.
>>
>> 3) Query your service and retrieve the JOB ID from it - but you have to
>> have a way to query for the job related to your "dag id + task
>> + execution_date". Then the idempotency is actually handled by the
>> service you are using.
>>
>> In the use case, you describe - this is the only thing you need -
>> "idempotency source". I believe you would like to get the case 2) from
>> above but without having to use external storage to store the "unique id".
>> Something that will let each task in the same dag run to set or retrieve a
>> unique value for that particular task. One value should be enough -
>> assuming that each operator/task works on one external data "source".
>>
>> My current thinking is:
>>
>> Why don't we provide such a dedicated idempotency service inside
>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>> class with two methods:
>>
>>   * .set(id: str) and
>>   * .get() -> str
>>
>> And the data stored there should be a string keyed by "(dag_id, task_id,
>> execution_date)" - available also via Jinja templating. There is no
>> intra-task communication here, very little possibility of abuse, and it
>> seems to solve the major pain point where you have to provide your own
>> storage to get the idempotency if your service does not provide one or you
>> do not want to delegate it to the DAG writer.
>>
>> J.
>>
>>
>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:
>>
>>> The use case I'm referring to is that you can't use xcom to let a task
>>> read information from its past attempts, because when a task starts its
>>> xcom is automatically deleted.
>>>
>>> My specific use case is that we have a custom LivyOperator that calls
>>> Livy to start batch Spark jobs.
>>> When you start a batch job, Livy returns a job_id.
>>> Sometimes our operator can fail for one reason or another (for instance
>>> if Livy is unreachable for a while).
>>> When the task retries, it calls Livy again, which starts the same Spark
>>> job, but the problem is that the Spark job from the first attempt can still
>>> be running,
>>> and then we have a batch job that runs twice simultaneously and creates
>>> duplicates in the output.
>>>
>>> What we tried to do is get the job_id from the first try, to check
>>> if the job is still running, and wait for it to complete if it is.
>>>
>>> We tried using xcom to let the task send a message to itself (to its
>>> next try) but xcom is meant for "inter-task communication" only so this
>>> doesn't work and is not intended to work.
>>>
>>>
>>>
>>>
>>>
>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>>>
>>>> Hi Furcy,
>>>>
>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>> Depending on your use case this may already be possible.
>>>>
>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I would like to open a feature request for Airflow to support
>>>> "intra-task xcom".
>>>>
>>>> It seems that there are several distinct use cases for it already
>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>
>>>> I wanted to summarize links to the use cases and past attempts,
>>>> and the recommended approach (which apparently would be to create
>>>> a distinct feature from xcom to support this, it could be called
>>>> intra-com or self-com?)
>>>>
>>>> Do you know if such ticket already exists? I couldn't find one.
>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>> email).
>>>>
>>>> Thanks,
>>>>
>>>> Furcy
>>>>
>>>>
>>
>> --
>>
>> Jarek Potiuk
>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>
>> M: +48 660 796 129 <+48660796129>
>>
>>

Re: Intra-task "xcom"

Posted by Furcy Pin <pi...@gmail.com>.
Thank you Jarek for the detailed explanation,

That's exactly what I wanted to do: write a feature request to summarize
all those discussions.
I agree with you that the feature should be marked distinct from the XCom
feature and that we should not piggyback this feature into XCom.

The crux of the problem, I think, is that with XCom you do want the task to
delete its xcom at the beginning of the retry.
Correct me if I'm wrong, but one use case where it was necessary was
having a task A and a task B that starts immediately after A and waits for
some 'signal' from A.
If A and B restart and A doesn't reset its signal, then B will use the
signal from A's first try, which is incorrect.

About the 3 solutions you mention:

1) Providing the job_id from outside. That works indeed. Sadly, in my
use case Livy's API is poorly designed and only returns a generated job_id;
you can't specify a custom one.
You can't even find a job by name: I would have to list all the active
job_ids and do a GET for each of them to get its name and find which one
is the one I want. It's doable but inelegant.
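
For illustration, the "doable but inelegant" lookup would look roughly like
the sketch below. FakeLivy and its two methods are hypothetical and only
mimic the list-then-GET pattern in memory; they are not the real Livy
client or REST API.

```python
from typing import Dict, List, Optional


class FakeLivy:
    """Hypothetical in-memory stand-in; counts the per-job lookups."""

    def __init__(self, jobs: Dict[int, str]) -> None:
        self._jobs = jobs
        self.get_calls = 0

    def list_active_job_ids(self) -> List[int]:
        return sorted(self._jobs)

    def get_job_name(self, job_id: int) -> str:
        self.get_calls += 1  # one request per job in a real setup
        return self._jobs[job_id]


def find_job_by_name(client: FakeLivy, name: str) -> Optional[int]:
    """Scan every active job until one with the wanted name is found."""
    for job_id in client.list_active_job_ids():
        if client.get_job_name(job_id) == name:
            return job_id
    return None


client = FakeLivy({3: "hourly_load", 7: "daily_export", 9: "cleanup"})
print(find_job_by_name(client, "daily_export"))
```

The cost grows with the number of active jobs, and the result is only as
reliable as the uniqueness of the job names.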

2) Store the id in an external storage. Of course it would work but it
requires an external storage. More on that below.

3) I'm not sure I understand completely what you mean there, but I think
you mean that the idempotency can be handled by the service you call (for
instance BigQuery). Indeed that is another solution. If we were using Spark
with a Hive metastore + locking or the deltalake storage format, we could
have something to prevent a job that runs twice from creating duplicates.
This is another solution we are considering, but it is costly to change now.

You guess correctly that the feature I was asking for would be to
provide some utility to let the users implement solution 2) without
requiring an external storage.
I think it would be a QOL improvement for some use cases, just like it
could be argued that XCom is just a QOL improvement and users could have
used an external storage themselves.
The main advantage that it brings is making the custom operators much
easier to share and reuse across the Apache Airflow community, compared to
having to set up some external storage.

I have seen that some users used the metadata store itself as an external
storage by adding a new table to the airflow model:
http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e

And others suggested using XCom itself as an external storage by storing
information with a special task_id:
https://stackoverflow.com/a/57515143/2087478

In the discussion thread you provided it was also suggested to use
Variables to store some persisting information.

These 3 approaches work but feel quite "hacky" and I believe that providing
such functionality would be good.

Finally, I don't see the point of limiting the functionality to such an
extent: providing an "IdempotencyIdStorage" that only allows you to store a
string will just force people who need to store more than one id for one
task (for whatever reason) to use some hack again, like storing a JSON
document inside the storage.

I was thinking more of something quite similar to XCom (I liked the
XState name suggestion), where the entry would be keyed by "(dag_id,
task_id, execution_date, key)", where "key" can be whatever you want, and
would be kept across retries.
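
To make the idea concrete, here is a minimal sketch of such a store,
assuming SQLite as a stand-in for the Airflow metadata DB. The class name
"XState", the table name and the method signatures are hypothetical and do
not exist in Airflow; the only point is that the composite key has no
try number in it, so a value written by one attempt is visible to the next.

```python
import sqlite3
from typing import Optional


class XState:
    """Hypothetical per-task key/value state that survives retries."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        # The primary key deliberately contains no try number.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS xstate ("
            " dag_id TEXT, task_id TEXT, execution_date TEXT,"
            " key TEXT, value TEXT,"
            " PRIMARY KEY (dag_id, task_id, execution_date, key))"
        )

    def set(self, dag_id: str, task_id: str, execution_date: str,
            key: str, value: str) -> None:
        # Overwrite any value stored earlier under the same composite key.
        self._conn.execute(
            "INSERT OR REPLACE INTO xstate VALUES (?, ?, ?, ?, ?)",
            (dag_id, task_id, execution_date, key, value),
        )
        self._conn.commit()

    def get(self, dag_id: str, task_id: str, execution_date: str,
            key: str) -> Optional[str]:
        row = self._conn.execute(
            "SELECT value FROM xstate WHERE dag_id = ? AND task_id = ?"
            " AND execution_date = ? AND key = ?",
            (dag_id, task_id, execution_date, key),
        ).fetchone()
        return row[0] if row else None
```

Storing several ids for one task is then just a matter of using several
keys, without packing them into a single string.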

I have read (quickly) through the "Pandora's Box" thread you linked. Indeed,
it looks like there would be many ways to misuse such a feature.
I do understand the importance of idempotency, and it looks like my use case
is one of the first ever listed where I do need to persist a state
across retries to make my operator really idempotent.

I'm surprised no one came up with it given how frequent the Spark + Airflow
combination is (well, the BigQueryOperator was one too but found another
solution).

Of course we can blame it on Livy for being poorly conceived (unlike
BigQuery) or we can blame it on Spark for not having a built-in security
mechanism to prevent double-writes,
but I think that as the above hacks show, you can't really prevent users
from shooting themselves in the foot if that's what they really want to.

While I do think that making things foolproof is important, I believe it's
also in Python's philosophy to *not* make things foolproof to the detriment
of simplicity for the right use cases.
But I do understand that the use cases are different and contradictory:
some would require the state to be persisted across reschedules and not
retries, mine would require the state to be persisted across retries and
not reschedules.

Maybe the Airflow-y way for that would be to have one task that does the
submit and pushes an xcom with the job id, then one task that checks the
progress of the job, but it feels very cumbersome to double the number of
tasks just for that. Plus I'm not sure we could make the first task retry
if the second task fails...

Thanks again,

Furcy


On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <Ja...@polidea.com> wrote:

> I think we've discussed several approaches like that, and using the XCom
> name (which for many people would mean "let's just extend the XCom table
> for that") is not a very good idea IMHO. I think this is very
> different functionality/logic which we might or might not agree to
> implement as a community. Naming it "XCom" or trying to extend the XCom
> table behavior might be problematic.
>
> Not sure if you are aware, but we had a very similar discussion about it
> recently (without clear conclusions, but at least you can see what kind of
> issues/problems different people have with this approach):
>
> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>
> I am not saying it is impossible to do, but I think it's a matter of how
> we formulate the "use case". It's very tempting to implement a generic
> intra-task communication mechanism, indeed. But it can very easily lead to
> people abusing it and bypassing the guarantees (idempotency mainly) that
> Airflow provides for backfilling and re-running tasks. I thought a bit
> after the latest discussion kind of died out, and I have one possible
> solution to the problem.
>
> Let me explain what I think about it (but others can have different
> opinions of course):
>
> So far the discussion was that there are several ways to achieve what you
> want (and it's really about what entity is providing the "idempotency"
> guarantee):
>
> 1) Similarly to what was just merged for the BigQuery Insert Job
> (https://github.com/apache/airflow/pull/8868/files), you can provide the
> job_id from outside. You'd need to work out a job_id naming that works in
> your case and make sure that when you re-run your task with the same
> (dag_id, task_id, execution_date) you will get the same id. Then the
> "uniqueness", and thus idempotency, is handled by the logic written in
> the DAG.
>
> 2) Store the job id in some external storage (via one of the hooks, where
> it can be queried in a way that will work for you). Then the idempotency
> is actually handled by the logic in your Operator + some external storage.
>
> 3) Query your service and retrieve the job id from it, but you have to
> have a way to query for the job related to your "dag_id + task_id
> + execution_date". Then the idempotency is actually handled by the
> service you are using.
>
> In the use case you describe, this is the only thing you need: an
> "idempotency source". I believe you would like to get case 2) from
> above but without having to use external storage to store the "unique id".
> Something that will let each task in the same DAG run set or retrieve a
> unique value for that particular task. One value should be enough,
> assuming that each operator/task works on one external data "source".
>
> My current thinking is:
>
> Why don't we provide such a dedicated idempotency service inside Airflow?
> We already have a DB and we could have an "IdempotencyIdStorage" class with
> two methods:
>
>   * .set(id: str) and
>   * .get() -> str
>
> And the data stored there should be a string keyed by "(dag_id, task_id,
> execution_date)", available also via Jinja templating. There is no
> intra-task communication here, very little possibility of abuse, and it
> seems to solve the major pain point where you have to provide your own
> storage to get the idempotency if your service does not provide one or you
> do not want to delegate it to the DAG writer.
>
> J.

Re: Intra-task "xcom"

Posted by Jarek Potiuk <Ja...@polidea.com>.
I think we've discussed several approaches like that, and using the XCom
name (which for many people would mean "let's just extend the XCom table
for that") is not a very good idea IMHO. I think this is very
different functionality/logic which we might or might not agree to
implement as a community. Naming it "XCom" or trying to extend the XCom
table behavior might be problematic.

Not sure if you are aware, but we had a very similar discussion about it
recently (without clear conclusions, but at least you can see what kind of
issues/problems different people have with this approach):

https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E

I am not saying it is impossible to do, but I think it's a matter of how we
formulate the "use case". It's very tempting to implement a generic
intra-task communication mechanism, indeed. But it can very easily lead to
people abusing it and bypassing the guarantees (idempotency mainly) that
Airflow provides for backfilling and re-running tasks. I thought a bit
after the latest discussion kind of died out, and I have one possible
solution to the problem.

Let me explain what I think about it (but others can have different
opinions of course):

So far the discussion was that there are several ways to achieve what you
want (and it's really about what entity is providing the "idempotency"
guarantee):

1) Similarly to what was just merged for the BigQuery Insert Job
(https://github.com/apache/airflow/pull/8868/files), you can provide the
job_id from outside. You'd need to work out a job_id naming that works in
your case and make sure that when you re-run your task with the same
(dag_id, task_id, execution_date) you will get the same id. Then the
"uniqueness", and thus idempotency, is handled by the logic written in
the DAG.

2) Store the job id in some external storage (via one of the hooks, where
it can be queried in a way that will work for you). Then the idempotency
is actually handled by the logic in your Operator + some external storage.

3) Query your service and retrieve the job id from it, but you have to
have a way to query for the job related to your "dag_id + task_id
+ execution_date". Then the idempotency is actually handled by the
service you are using.
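
Option 1) above depends on deriving the same job_id every time the same
task instance runs. A minimal sketch of one way to do that, assuming a
truncated hash of the three fields is acceptable as an id for the target
service. The function name and the "airflow_" prefix are made up for
illustration and are not taken from the linked PR.

```python
import hashlib
from datetime import datetime


def deterministic_job_id(dag_id: str, task_id: str,
                         execution_date: datetime) -> str:
    """Build an id that depends only on the task instance, not on the try."""
    raw = f"{dag_id}|{task_id}|{execution_date.isoformat()}"
    digest = hashlib.sha1(raw.encode("utf-8")).hexdigest()[:12]
    # Hypothetical naming scheme: fixed prefix plus a short hash.
    return f"airflow_{digest}"
```

A retry of the same task instance then submits under the same id, and it
is up to the service to reject or reuse the duplicate.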

In the use case you describe, this is the only thing you need: an
"idempotency source". I believe you would like to get case 2) from
above but without having to use external storage to store the "unique id".
Something that will let each task in the same DAG run set or retrieve a
unique value for that particular task. One value should be enough,
assuming that each operator/task works on one external data "source".

My current thinking is:

Why don't we provide such a dedicated idempotency service inside Airflow?
We already have a DB and we could have an "IdempotencyIdStorage" class with
two methods:

  * .set(id: str) and
  * .get() -> str

And the data stored there should be a string keyed by "(dag_id, task_id,
execution_date)", available also via Jinja templating. There is no
intra-task communication here, very little possibility of abuse, and it
seems to solve the major pain point where you have to provide your own
storage to get the idempotency if your service does not provide one or you
do not want to delegate it to the DAG writer.
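
As a rough illustration of the shape of that class, here is a minimal
sketch. It is not an existing Airflow API: the constructor arguments are an
assumption, a module-level dict stands in for a table in the metadata DB,
and .get() returns None when nothing was stored yet, which the two-method
outline above does not specify.

```python
from datetime import datetime
from typing import Dict, Optional, Tuple

# Hypothetical backing store; a real version would use the metadata DB.
_STORE: Dict[Tuple[str, str, datetime], str] = {}


class IdempotencyIdStorage:
    """Holds at most one string per (dag_id, task_id, execution_date)."""

    def __init__(self, dag_id: str, task_id: str,
                 execution_date: datetime) -> None:
        self._key = (dag_id, task_id, execution_date)

    def set(self, id: str) -> None:
        # A later call replaces the earlier value for the same task instance.
        _STORE[self._key] = id

    def get(self) -> Optional[str]:
        # None means no id has been stored for this task instance yet.
        return _STORE.get(self._key)
```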

J.


On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <pi...@gmail.com> wrote:

> The use case I'm referring to is that you can't use xcom to let a task
> read information from its past attempts, because when a task starts its
> xcom is automatically deleted.
>
> My specific use case is that we have a custom LivyOperator that calls Livy
> to start batch Spark jobs.
> When you start a batch job, Livy returns a job_id.
> Sometimes our operator can fail for one reason or another (for instance if
> Livy is unreachable for a while).
> When the task retries, it calls Livy again, which starts the same Spark
> job, but the problem is that the Spark job from the first attempt can still
> be running,
> and then we have a batch job that runs twice simultaneously and creates
> duplicates in the output.
>
> What we tried to do is get the job_id from the first try, to check if
> the job is still running, and wait for it to complete if it is.
>
> We tried using xcom to let the task send a message to itself (to its next
> try), but xcom is meant for "inter-task communication" only, so this
> doesn't work and is not intended to work.

-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>

Re: Intra-task "xcom"

Posted by Furcy Pin <pi...@gmail.com>.
The use case I'm referring to is that you can't use xcom to let a task read
information from its past attempts, because when a task starts its xcom
is automatically deleted.

My specific use case is that we have a custom LivyOperator that calls Livy
to start batch Spark jobs.
When you start a batch job, Livy returns a job_id.
Sometimes our operator can fail for one reason or another (for instance if
Livy is unreachable for a while).
When the task retries, it calls Livy again, which starts the same Spark job,
but the problem is that the Spark job from the first attempt can still be
running,
and then we have a batch job that runs twice simultaneously and creates
duplicates in the output.

What we tried to do is get the job_id from the first try, to check if
the job is still running, and wait for it to complete if it is.

We tried using xcom to let the task send a message to itself (to its next
try), but xcom is meant for "inter-task communication" only, so this doesn't
work and is not intended to work.
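
The retry logic we are after can be sketched roughly as follows. This is
not our actual operator: the four callables are hypothetical placeholders
for "read the stored job_id", "store the job_id", "ask Livy whether the job
is still running" and "submit a new batch", and the sketch only covers the
"still running" case described above.

```python
from typing import Callable, Optional


def submit_or_resume(
    load_job_id: Callable[[], Optional[str]],
    save_job_id: Callable[[str], None],
    is_running: Callable[[str], bool],
    submit: Callable[[], str],
) -> str:
    """Return the id of the job the current attempt should wait on."""
    previous = load_job_id()
    if previous is not None and is_running(previous):
        # A job from an earlier attempt is still alive: reuse it.
        return previous
    job_id = submit()
    # Persist right after submitting so that a later retry can find it.
    save_job_id(job_id)
    return job_id
```

The missing piece is where load_job_id and save_job_id keep their data,
since xcom cannot be used for that.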





On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <as...@apache.org> wrote:

> Hi Furcy,
>
> Can you give a concrete example of what you mean by intra-task xcom?
> Depending on your use case this may already be possible.

Re: Intra-task "xcom"

Posted by Ash Berlin-Taylor <as...@apache.org>.
Hi Furcy,

Can you give a concrete example of what you mean by intra-task xcom?
Depending on your use case this may already be possible.

On Jun 1 2020, at 11:45 am, Furcy Pin <pi...@gmail.com> wrote:
> Hello,
>
> I would like to open a feature request for Airflow to support "intra-task xcom".
> It seems that there are several distinct use cases for it already
> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>
> I wanted to summarize links to the use cases and past attempts,
> and the recommended approach (which apparently would be to create
> a distinct feature from xcom to support this, it could be called intra-com or self-com ?)
>
> Do you know if such a ticket already exists? I couldn't find one.
> Also I can't create any ticket due to some obscure bug (see my other email).
>
> Thanks,
> Furcy