Posted to dev@airflow.apache.org by Ash Berlin-Taylor <as...@apache.org> on 2021/02/25 14:27:24 UTC

[DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Hi everyone,

I recently started a thread about scoping out this feature, and with 
the feedback there and a lot of exploring and thinking, James Timmins 
and I have come up with the following AIP:

<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval>

To quote the AIP: we propose a new name for execution_date, introduce a 
new concept called "data interval", and present an interface for more 
powerful pluggable scheduling behaviours.

Once we have this feature in place we can give users much more control 
over /when/ their DAGs are scheduled, and hopefully also remove one of 
the long-standing concepts that is hard to learn, the "execution_date", 
by replacing it with the more explicit concept of a data_interval.
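
For illustration only (this sketch is not from the AIP text itself; the
data_interval_* names follow the discussion later in this thread), a task
would read the proposed data interval from its template context rather
than deriving dates from execution_date:

from airflow.operators.bash import BashOperator

extract = BashOperator(
    task_id="extract",
    # Jinja-templated fields receive the proposed interval bounds.
    bash_command=(
        "extract.sh --start {{ data_interval_start }} "
        "--end {{ data_interval_end }}"
    ),
)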

Feedback appreciated. I would ask that we don't bikeshed /too/ 
much on the names of things, but alternative suggestions are 
welcome, as we want to find something that we don't have to rename 
again in the future :)

Thanks,
Ash and James


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Daniel Standish <dp...@gmail.com>.
Just to explore whether this would really produce confusion for users...

There already are a number of Schedule* classes.

   - ScheduleIntervalSchema
   - SchedulerInfoSchema
   - SchedulerJob
   - SchedulerMetricsJob

(excludes test classes)

But there isn't already a Scheduler class.

Moreover, I think it's fair to say that none of these classes is
user-facing.

So airflow users would not be confronted with a Schedule class alongside a
Scheduler class.

Though it's true there remains simply the concept of the airflow scheduler,
and the associated CLI command.

What's salient to me is the fact that Schedule is really the
simplest and most accurate expression of what this object is.  And using a
different-but-closely-related word instead, just to stay away from `airflow
scheduler`, seems like a bad trade-off to me.

I don't really like what I'm about to suggest (i.e. I like AbstractSchedule
/ CronSchedule better), but we could add *Dag* to the front of the word, e.g.
AbstractDagSchedule / CronDagSchedule etc., which might make things easier to
separate.

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Malthe <mb...@gmail.com>.
It might then be

DAG(
    ...
    timetable=CalendarTimeTable(
        # Wraps a time table
        TimeTable(
            execution_plan=…,
        ),
        # Whatever options define the calendar
        ...
    )
)

The idea being that a calendar wraps an existing time table, skipping
certain days (e.g. holidays). There could be lots of ways to define those
days, but a rule-based approach is of course rather nice.
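
As a rough sketch of that wrapping idea (hypothetical throughout -- the
class name and the next_run_after() method are assumptions for
illustration, not part of the AIP):

import datetime
from typing import Optional, Set

class HolidaySkippingTimeTable:
    """Wrap another time table and skip runs that fall on excluded days."""

    def __init__(self, wrapped, holidays: Set[datetime.date]) -> None:
        self.wrapped = wrapped      # the inner time table being wrapped
        self.holidays = holidays    # days the calendar excludes

    def next_run_after(self, last: Optional[datetime.datetime]):
        # Keep asking the wrapped time table for its next run until one
        # lands on a day the calendar includes.
        candidate = self.wrapped.next_run_after(last)
        while candidate is not None and candidate.date() in self.holidays:
            candidate = self.wrapped.next_run_after(candidate)
        return candidate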

As an aside, I do think "Schedule" is a better word because it's short,
rather obvious and doesn't have the Timetable vs TimeTable problem.

On Sat, 15 May 2021 at 04:06, TP Chung <tp...@astronomer.io.invalid> wrote:

> I feel you are sort of operating on a different level of abstraction from
> AIP-39. While it is true that Airflow does generally take a declarative
> approach for scheduling currently (which is a good thing and should be
> continued), AIP-39 is more about providing a foundation so richer things
> can be declared. Its design does not preclude declarative things to be
> implemented, much like how most of Python is procedural in the first place,
> but that did not prevent Airflow from having a declarative interface.
>
> Timetable does not take away much of the declarative possibility, since we
> can easily have something like
>
> DAG(
>     ...
>     timetable=CalendarTimeTable(
>         calendar=…,
>         execution_plan=…,
>     ),
> )
>
> that implements what you want. But the nice thing about this extra
> abstraction is it keeps doors open for things that might not work well for
> calendars. You may argue those are uncommon cases, but what prompted AIP-39
> in the first place were uncommon cases not considered (or intentionally
> ignored for simplicity) by the original Airflow implementation. AIP-39
> does well in providing a good foundation for most flexibility
> without sacrificing much of the declarative goodness (if at all; it’s
> arguable the TimeTable class is actually an improvement for explicitness).
>
> TP
>
>
> On 14 May 2021, at 04:59, Malthe <mb...@gmail.com> wrote:
>
> When it comes to scheduling, Airflow does take a rather declarative
> approach I would say, but it is certainly correct that it very much stops
> there.
>
> I appreciate the arguments favoring a more object-oriented design, but I
> do think that adding a couple of additional scheduling options could go a
> very long way in terms of providing that extra bit of scheduling
> flexibility – while preserving the "scripting ergonomics".
>
> The current proposal leaves most of the interesting use-cases on the table
> rather than aiming to show that the abstraction actually meets the
> requirements.
>
> Cheers
>
> On Thu, 13 May 2021 at 15:01, Kaxil Naik <ka...@gmail.com> wrote:
>
>> And the proposed Timetables are more "extensible" too --
>> users can develop classes for their own use and create a library for
>> reusing them.
>>
>> With arguments like the ones you are proposing, @malthe -- it can be
>> difficult to understand how all the "related" arguments combine to
>> determine the scheduling / schedule_interval.
>>
>> On Thu, May 13, 2021 at 3:46 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> I'm much more with Ash's proposal on this one. I think we do not want to
>>> optimize for the number of changes but instead want to make sure what we
>>> come up with is an easy and natural-to-use long-term solution. Even if it
>>> departs a lot from what we have now, a few months (or maybe years) after
>>> it becomes mainstream nobody will remember the "old way", hopefully.
>>>
>>> The idea of "pythonic" pluggable schedule rather than "declarative" way
>>> goes perfectly in-line with the whole concept of Airflow where DAGs are
>>> defined as Python code rather than declaratively. So making schedules
>>> follow the same approach seems very natural for anyone who uses Airflow.
>>>
>>>
>>> J.
>>>
>>>
>>> On Thu, May 13, 2021 at 9:27 AM Malthe <mb...@gmail.com> wrote:
>>>
>>>> I'm a bit late to the discussion, but it might be interesting to
>>>> explore a simpler approach, cutting back on the number of changes.
>>>>
>>>> As a general remark, "execution_date" may be a slightly difficult
>>>> concept to grasp, but changing it now is perhaps counterproductive.
>>>>
>>>> From the AIP examples:
>>>>
>>>> 1. The MON-FRI problem could be handled using an optional keyword
>>>> argument "execution_interval" which defaults to `None` (meaning automatic –
>>>> this is the current behavior). But a `timedelta` could instead be provided,
>>>> e.g. `timedelta(days=1)`.
>>>>
>>>> 2. This could easily be supported
>>>> <https://github.com/kiorky/croniter/pull/46#issuecomment-838544908> in
>>>> croniter.
>>>>
>>>> 3. The trading days only (or business days, etc) problem is handled in
>>>> other systems using a calendar option. It might be that an optional keyword
>>>> argument "calendar" could be used to automatically skip days that are not
>>>> included in the calendar – the default calendar would include all days. If
>>>> `calendar` is some calendar object, `~calendar` could be the inverse,
>>>> allowing a DAG to be easily scheduled on holidays. A requirement to
>>>> schedule on the nth day of the calendar (e.g. 10th business day of the
>>>> month) could be implemented using a derived calendar
>>>> `calendar.nth_day_of_month(10)` which would further filter down the number
>>>> of included days based on an existing calendar.
>>>>
>>>> 4. The cron- vs data scheduling seems to come down to whether the dag
>>>> run is kicked off at the start of the period or immediately after. This
>>>> could be handled using an optional keyword argument "execution_plan" which
>>>> defaults to INTERVAL_END (the current behavior), but can be optionally set
>>>> to INTERVAL_START. The "execution_date" column then remains unchanged, but
>>>> the actual dag run time will vary according to which execution plan was
>>>> specified.
>>>>
>>>> Cheers
>>>>
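
A rough sketch of the calendar idea from point 3 in the message above
(hypothetical throughout -- neither this Calendar class nor its methods
are part of the AIP): a calendar as a set-like object over days,
supporting inversion and derived calendars:

import datetime

class Calendar:
    """A set-like view over days, defined by an inclusion predicate."""

    def __init__(self, included) -> None:
        self._included = included  # callable: datetime.date -> bool

    def __contains__(self, day: datetime.date) -> bool:
        return self._included(day)

    def __invert__(self) -> "Calendar":
        # ~calendar includes exactly the days this calendar excludes,
        # e.g. to schedule a DAG on holidays only.
        return Calendar(lambda day: not self._included(day))

    def nth_day_of_month(self, n: int) -> "Calendar":
        # Derived calendar keeping only the nth included day of each
        # month (e.g. the 10th business day of the month).
        def included(day: datetime.date) -> bool:
            if not self._included(day):
                return False
            # Count included days from the 1st of the month up to `day`.
            count = sum(
                1
                for i in range(1, day.day + 1)
                if self._included(day.replace(day=i))
            )
            return count == n

        return Calendar(included)

business_days = Calendar(lambda day: day.weekday() < 5)
assert datetime.date(2021, 5, 15) in ~business_days  # a Saturday
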
>>>> On Fri, 12 Mar 2021 at 07:02, Xinbin Huang <bi...@gmail.com>
>>>> wrote:
>>>>
>>>>> I agree with Tomek.
>>>>>
>>>>> TBH, *Timetable* to me does seem to be a complex concept, and I can't
>>>>> quite understand what it is at first sight.
>>>>>
>>>>> I think *Schedule* does convey the message better - consider
>>>>> the sentence: "*the Scheduler arranges jobs to run based on some
>>>>> _______.*"  Here, *"schedules"* seems to fit better than
>>>>> *"timetables"*.
>>>>>
>>>>> As for search results on schedule vs scheduler, I wonder if
>>>>> reorganizing the docs to have `schedule` in the top section and have
>>>>> `scheduler` under operation::architecture will help with the search result?
>>>>> (I don't know much about SEO)
>>>>>
>>>>> Nevertheless, the naming shouldn't be a blocker for this feature to
>>>>> move forward.
>>>>>
>>>>> Best
>>>>> Bin
>>>>>
>>>>> On Thu, Mar 11, 2021 at 4:31 PM Tomasz Urbaszek <tu...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> > Timetable is a synonym of ’Schedule’
>>>>>>
>>>>>> I'm not a native speaker and I don't get it easily as a synonym.
>>>>>>
>>>>>> To be honest, "timetable" sounds like a complex piece of software.
>>>>>> Personally, I have seen that people use schedule and
>>>>>> schedule_interval interchangeably. Additionally, schedule being more closely
>>>>>> linked to scheduler is imho an advantage because it suggests some connection
>>>>>> between the two.
>>>>>>
>>>>>> I feel that by introducing timetable we will bring yet more
>>>>>> complexity to airflow vocabulary. And I personally would treat it as yet
>>>>>> another moving part of an already complex system.
>>>>>>
>>>>>> I think we can move forward with this feature. We renamed "functional
>>>>>> DAGs" to "Taskflow API", so naming is not a blocker. If we can't get
>>>>>> consensus we can always ask the users - they will use the feature.
>>>>>>
>>>>>> Best,
>>>>>> Tomek
>>>>>>
>>>>>>
>>>>>> On Fri, 12 Mar 2021 at 01:15, James Timmins <
>>>>>> james@astronomer.io.invalid> wrote:
>>>>>>
>>>>>>> *Timetable vs Schedule*
>>>>>>> Re Timetable. I agree that if this was a greenfield project, it
>>>>>>> might make sense to use Schedule. But as it stands, we need to find the
>>>>>>> right balance between the most specific name and being sufficiently unique
>>>>>>> that it’s easy to work with in code and, perhaps most importantly, easy to
>>>>>>> find when searching on Google and in the Airflow Docs.
>>>>>>>
>>>>>>> There are more than 10,000 references to `schedule*` in the Airflow
>>>>>>> codebase. `schedule` and `scheduler` are also identical to most search
>>>>>>> engines/libraries, since they have the same stem, `schedule`. This means
>>>>>>> that when a user Googles `Airflow Schedule`, they will get back intermixed
>>>>>>> results of the Schedule class and the Scheduler.
>>>>>>>
>>>>>>> Timetable is a synonym of ’Schedule’, so it passes the accuracy
>>>>>>> test, won’t ever be ambiguous in code, and is distinct in search results.
>>>>>>>
>>>>>>> *Should "interval-less DAGs” have data_interval_start and end
>>>>>>> available in the context?*
>>>>>>> I think they should be present so it’s consistent across DAGs. Let’s
>>>>>>> not make users think too hard about what values are available in what
>>>>>>> context. What if someone sets the interval to 0? What if sometimes the
>>>>>>> interval is 0, and sometimes it’s 1 hour? Rather than changing the rules
>>>>>>> depending on usage, it’s easiest to have one rule that the users can depend
>>>>>>> upon.
>>>>>>>
>>>>>>> *Re set_context_variables()*
>>>>>>> What context is being defined here? The code comment says "Update or
>>>>>>> set new context variables to become available in task templates and
>>>>>>> operators.” The Timetable seems like the wrong place for variables that
>>>>>>> will get passed into task templates/operators, unless this is actually a
>>>>>>> way to pass Airflow macros into the Timetable context. In which case I
>>>>>>> fully support this. If not, we may want to add this functionality.
>>>>>>>
>>>>>>> James
>>>>>>> On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <ka...@gmail.com>,
>>>>>>> wrote:
>>>>>>>
>>>>>>> Yup, I have no strong opinion either way, so I'm happy to keep it
>>>>>>> TimeTable or to go with another suggestion.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Kaxil
>>>>>>>
>>>>>>> On Thu, Mar 11, 2021 at 5:00 PM James Timmins <
>>>>>>> james@astronomer.io.invalid> wrote:
>>>>>>>
>>>>>>>> Respectfully, I strongly disagree with the renaming of Timetable to
>>>>>>>> Schedule. Schedule and Scheduler aren't meaningfully different, which can
>>>>>>>> lead to a lot of confusion. Even as a native English speaker, and someone
>>>>>>>> who works on Airflow full time, I routinely need to ask for clarification
>>>>>>>> about what schedule-related concept someone is referring to. I foresee
>>>>>>>> Schedule and Scheduler as two distinct yet closely related concepts
>>>>>>>> becoming a major source of confusion.
>>>>>>>>
>>>>>>>> If folks dislike Timetable, we could certainly change to something
>>>>>>>> else, but let's not use something so similar to existing Airflow classes.
>>>>>>>>
>>>>>>>> -James
>>>>>>>>
>>>>>>>> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <as...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Summary of changes so far on the AIP:
>>>>>>>>>
>>>>>>>>> My proposed rename of DagRun.execution_date is now
>>>>>>>>> DagRun.schedule_date (previously I had proposed run_date. Thanks dstandish!)
>>>>>>>>>
>>>>>>>>> Timetable classes are renamed to Schedule classes (CronSchedule
>>>>>>>>> etc), similarly the DAG argument is now schedule (reminder:
>>>>>>>>> schedule_interval will not be removed or deprecated, and will still be the
>>>>>>>>> way to use "simple" expressions)
>>>>>>>>>
>>>>>>>>> -ash
>>>>>>>>>
>>>>>>>>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <as...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Could change Timetable to Schedule -- that would mean the DAG arg
>>>>>>>>> becomes `schedule=CronSchedule(...)` -- a bit close to the current
>>>>>>>>> `schedule_interval` but I think clear enough difference.
>>>>>>>>>
>>>>>>>>> I do like the name but my one worry with "schedule" is that
>>>>>>>>> Scheduler and Schedule are very similar, and might be confused with each
>>>>>>>>> other for non-native English speakers? (I defer to others' judgment here,
>>>>>>>>> as this is not something I can experience myself.)
>>>>>>>>>
>>>>>>>>> @Kevin Yang <yr...@gmail.com> @Daniel Standish
>>>>>>>>> <dp...@gmail.com> any final input on this AIP?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <ka...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Ash and all,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What do people think of this? Worth it? Too complex to reason
>>>>>>>>>> about what context variables might exist as a result?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I think I wouldn't worry about it right now, or maybe not as part
>>>>>>>>> of this AIP. Currently, in one of the GitHub issues, a user mentioned that
>>>>>>>>> it is not straightforward to know what is inside the context dictionary -
>>>>>>>>> https://github.com/apache/airflow/issues/14396. So maybe we can
>>>>>>>>> tackle this issue separately once the AbstractTimetable is built.
>>>>>>>>>
>>>>>>>>> Should "interval-less DAGs" (ones using "CronTimetable" in my
>>>>>>>>>> proposal vs "DataTimetable") have data_interval_start and end available in
>>>>>>>>>> the context?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hmm... I would say no, but then it contradicts my suggestion to
>>>>>>>>> remove the context dict from this AIP. If we are going to use it in the
>>>>>>>>> scheduler then yes, with data_interval_start = data_interval_end for
>>>>>>>>> CronTimetable.
>>>>>>>>>
>>>>>>>>> Does anyone have any better names than TimeDeltaTimetable,
>>>>>>>>>> DataTimetable, and CronTimetable? (We can probably change these names right
>>>>>>>>>> up until release, so not important to get this correct *now*.)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> No strong opinion here. Just an alternative suggestion:
>>>>>>>>> TimeDeltaSchedule, DataSchedule and CronSchedule.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Should I try to roll AIP-30
>>>>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> in
>>>>>>>>>> to this, or should we make that a future addition? (My vote is for future
>>>>>>>>>> addition)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I would vote for Future addition too.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Kaxil
>>>>>>>>>
>>>>>>>>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <as...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I think, yes, AIP-35 or something like it would happily co-exist
>>>>>>>>>> with this proposal.
>>>>>>>>>>
>>>>>>>>>> @Daniel <dp...@gmail.com> and I have been discussing this a
>>>>>>>>>> bit on Slack, and one of the questions he asked was whether the concept of
>>>>>>>>>> data_interval should be moved from the DagRun, as James and I suggested,
>>>>>>>>>> down to the individual task:
>>>>>>>>>>
>>>>>>>>>> Suppose I have a new DAG hitting 5 API endpoints and pulling data
>>>>>>>>>> to S3. Suppose that yesterday 4 of them succeeded but one failed. Today, 4
>>>>>>>>>> of them should pull from yesterday, but the one that failed should pull
>>>>>>>>>> from 2 days back. So even though these normally have the same interval,
>>>>>>>>>> today they should not.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> My view on this is twofold: first, this should primarily be
>>>>>>>>>> handled by retries on the task, and second, having different TaskInstances
>>>>>>>>>> in the same DagRun have different data intervals would be much harder to
>>>>>>>>>> reason about/design the UI around, so for those reasons I still think
>>>>>>>>>> interval should be a DagRun-level concept.
>>>>>>>>>>
>>>>>>>>>> (He has a stalled AIP-30 where he proposed something to address
>>>>>>>>>> this kind of "watermark" case, which we might pick up next after this AIP
>>>>>>>>>> is complete)
>>>>>>>>>>
>>>>>>>>>> One thing we might want to do is extend the interface of
>>>>>>>>>> AbstractTimetable to be able to add/update parameters in the context dict,
>>>>>>>>>> so the interface could become this:
>>>>>>>>>>
>>>>>>>>>> class AbstractTimetable(ABC):
>>>>>>>>>>     @abstractmethod
>>>>>>>>>>     def next_dagrun_info(
>>>>>>>>>>         self,
>>>>>>>>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>>>>>>>>         session: Session,
>>>>>>>>>>     ) -> Optional[DagRunInfo]:
>>>>>>>>>>         """
>>>>>>>>>>         Get information about the next DagRun of this dag after
>>>>>>>>>>         ``date_last_automated_dagrun`` -- the execution date, and the
>>>>>>>>>>         earliest it could be scheduled.
>>>>>>>>>>
>>>>>>>>>>         :param date_last_automated_dagrun: The max(execution_date) of
>>>>>>>>>>             existing "automated" DagRuns for this dag (scheduled or
>>>>>>>>>>             backfill, but not manual)
>>>>>>>>>>         """
>>>>>>>>>>
>>>>>>>>>>     @abstractmethod
>>>>>>>>>>     def set_context_variables(
>>>>>>>>>>         self, dagrun: DagRun, context: Dict[str, Any]
>>>>>>>>>>     ) -> None:
>>>>>>>>>>         """
>>>>>>>>>>         Update or set new context variables to become available in
>>>>>>>>>>         task templates and operators.
>>>>>>>>>>         """
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What do people think of this? Worth it? Too complex to reason
>>>>>>>>>> about what context variables might exist as a result?
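
As a minimal sketch of how a concrete subclass might look under this
interface (illustrative only -- it assumes a DagRunInfo shape that the
snippet above does not pin down):

import datetime

class TimeDeltaTimetable(AbstractTimetable):
    """Run every `delta`; each run covers the interval that just passed."""

    def __init__(self, delta: datetime.timedelta) -> None:
        self.delta = delta

    def next_dagrun_info(self, date_last_automated_dagrun, session):
        if date_last_automated_dagrun is None:
            # First ever run: defer to whatever start_date logic the
            # scheduler applies (out of scope for this sketch).
            return None
        start = date_last_automated_dagrun
        end = start + self.delta
        # Assumed DagRunInfo fields -- the interface above only names
        # the return type.
        return DagRunInfo(schedule_date=end, data_interval=(start, end))

    def set_context_variables(self, dagrun, context):
        context["data_interval_start"] = dagrun.data_interval_start
        context["data_interval_end"] = dagrun.data_interval_end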
>>>>>>>>>>
>>>>>>>>>> *Outstanding question*:
>>>>>>>>>>
>>>>>>>>>>    - Should "interval-less DAGs" (ones using "CronTimetable" in
>>>>>>>>>>    my proposal vs "DataTimetable") have data_interval_start and end available
>>>>>>>>>>    in the context?
>>>>>>>>>>    - Does anyone have any better names than TimeDeltaTimetable,
>>>>>>>>>>    DataTimetable, and CronTimetable? (We can probably change these names right
>>>>>>>>>>    up until release, so not important to get this correct *now*.)
>>>>>>>>>>    - Should I try to roll AIP-30
>>>>>>>>>>    <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>>>>>>>>    in to this, or should we make that a future addition? (My vote is for
>>>>>>>>>>    future addition)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I'd like to start voting on this AIP next week (probably on
>>>>>>>>>> Tuesday) as I think this will be a powerful feature that eases confusion for
>>>>>>>>>> new users.
>>>>>>>>>>
>>>>>>>>>> -Ash
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <al...@yandex.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based
>>>>>>>>>> Scheduling To Airflow"?
>>>>>>>>>> I think streaming was also discussed there (though it wasn't
>>>>>>>>>> really the use case).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:
>>>>>>>>>>
>>>>>>>>>> Hi Kevin,
>>>>>>>>>>
>>>>>>>>>> Interesting idea. My original idea was actually that
>>>>>>>>>> "interval-less DAGs" (i.e. ones where it's just "run at this time") would
>>>>>>>>>> not have data_interval_start or end, but (while drafting the AIP) we
>>>>>>>>>> decided that it was probably "easier" if those values were always datetimes.
>>>>>>>>>>
>>>>>>>>>> That said, I think having the DB model make those values
>>>>>>>>>> nullable would future-proof it without needing another migration to change
>>>>>>>>>> it. Do you think this is worth doing now?
>>>>>>>>>>
>>>>>>>>>> I haven't (yet! It's on my list) spent any significant time
>>>>>>>>>> thinking about how to make Airflow play nicely with streaming jobs. If
>>>>>>>>>> anyone else has ideas here please share them
>>>>>>>>>>
>>>>>>>>>> -ash
>>>>>>>>>>
>>>>>>>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yr...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Ash and James,
>>>>>>>>>>
>>>>>>>>>> This is an exciting move. What do you think about using this
>>>>>>>>>> opportunity to extend Airflow's support to streaming-like use cases, i.e.
>>>>>>>>>> DAGs/tasks that want to run forever like a service? For such use cases,
>>>>>>>>>> a schedule interval might not be meaningful, so do we want to make the date
>>>>>>>>>> interval param optional for DagRun and task instances? That sounds like a
>>>>>>>>>> pretty major change to the underlying model of Airflow, but this AIP is so
>>>>>>>>>> far the best opportunity I have seen to level up Airflow's support for
>>>>>>>>>> streaming/service use cases.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Kevin Y
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <
>>>>>>>>>> dpstandish@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Very excited to see this proposal come through and love the
>>>>>>>>>> direction this has gone.
>>>>>>>>>>
>>>>>>>>>> Couple comments...
>>>>>>>>>>
>>>>>>>>>> *Tree view / Data completeness view*
>>>>>>>>>>
>>>>>>>>>> When you design your tasks with the canonical idempotence
>>>>>>>>>> pattern, the tree view shows you both data completeness and task execution
>>>>>>>>>> history (success / failure etc).
>>>>>>>>>>
>>>>>>>>>> When you don't use that pattern (which is my general preference),
>>>>>>>>>> tree view is only task execution history.
>>>>>>>>>>
>>>>>>>>>> This change has the potential to unlock a data completeness view
>>>>>>>>>> for canonical tasks.  It's possible that the "data completeness view" can
>>>>>>>>>> simply be the tree view.  I.e. somehow it can use these new classes to know
>>>>>>>>>> what data was successfully filled and what data wasn't.
>>>>>>>>>>
>>>>>>>>>> To the extent we like the idea of either extending / plugging /
>>>>>>>>>> modifying tree view, or adding a distinct data completeness view, we might
>>>>>>>>>> want to anticipate the needs of that in this change.  And maybe no
>>>>>>>>>> alteration to the proposal would be needed but just want to throw the idea
>>>>>>>>>> out there.
>>>>>>>>>>
>>>>>>>>>> *Watermark workflow / incremental processing*
>>>>>>>>>>
>>>>>>>>>> A common pattern in data warehousing is pulling data
>>>>>>>>>> incrementally from a source.
>>>>>>>>>>
>>>>>>>>>> A standard way to achieve this: at the start of the task,
>>>>>>>>>> select the max `updated_at` in the source table and hold on to that value
>>>>>>>>>> for a minute.  This is your tentative new high watermark.
>>>>>>>>>> Now it's time to pull your data.  If your task ran before, grab the
>>>>>>>>>> last high watermark.  If not, use the initial load value.
>>>>>>>>>> If successful, update the high watermark.
>>>>>>>>>>
>>>>>>>>>> On my team we implemented this with a stateful tasks / stateful
>>>>>>>>>> processes concept (there's a dormant draft AIP here
>>>>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>>>>>>>>>> and a WatermarkOperator that handled the boilerplate*.
>>>>>>>>>>
>>>>>>>>>> Again here, I don't have a specific suggestion at this moment.
>>>>>>>>>> But I wanted to articulate this workflow because it is common and it wasn't
>>>>>>>>>> immediately obvious to me in reading AIP-39 how I would use it to implement
>>>>>>>>>> it.
>>>>>>>>>>
>>>>>>>>>> AIP-39 makes airflow more data-aware.  So if it can support this
>>>>>>>>>> kind of workflow that's great.  @Ash Berlin-Taylor
>>>>>>>>>> <as...@astronomer.io> do you have thoughts on how it might be
>>>>>>>>>> compatible with this kind of thing as is?
>>>>>>>>>>
>>>>>>>>>> ---
>>>>>>>>>>
>>>>>>>>>> * The base operator is designed so that subclasses only need to
>>>>>>>>>> implement two methods:
>>>>>>>>>>     - `get_high_watermark`: produce the tentative new high
>>>>>>>>>> watermark
>>>>>>>>>>     - `watermark_execute`: analogous to implementing poke in a
>>>>>>>>>> sensor, this is where your work is done. `execute` is left to the base
>>>>>>>>>> class, and it orchestrates (1) getting the last high watermark or initial
>>>>>>>>>> load value and (2) updating the high watermark if the job succeeded.
>>>>>>>>>>
>>>>>>>>>>
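
A minimal sketch of that watermark pattern (not Daniel's actual operator;
the persistence helpers are stand-ins for whatever state mechanism AIP-30
would provide):

from airflow.models.baseoperator import BaseOperator

class WatermarkOperator(BaseOperator):
    def __init__(self, *, initial_load_value, **kwargs):
        super().__init__(**kwargs)
        self.initial_load_value = initial_load_value

    def get_high_watermark(self):
        # Subclass hook, e.g. SELECT max(updated_at) FROM source_table.
        raise NotImplementedError

    def watermark_execute(self, low, high, context):
        # Subclass hook: pull rows where low < updated_at <= high.
        raise NotImplementedError

    def execute(self, context):
        low = self._load_state() or self.initial_load_value
        high = self.get_high_watermark()  # tentative new high watermark
        self.watermark_execute(low, high, context)
        self._save_state(high)  # only reached if the pull succeeded

    # Stand-ins for a persistence layer (a Variable, a state table, ...).
    def _load_state(self): ...
    def _save_state(self, value): ...
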
>>>
>>> --
>>> +48 660 796 129
>>>
>>
>

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Jarek Potiuk <ja...@potiuk.com>.
Very much agree with TP!

-- 
+48 660 796 129

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by TP Chung <tp...@astronomer.io.INVALID>.
I feel you are sort of operating on a different level of abstraction from AIP-39. While it is true that Airflow does generally take a declarative approach for scheduling currently (which is a good thing and should be continued), AIP-39 is more about providing a foundation so richer things can be declared. Its design does not preclude declarative things to be implemented, much like how most of Python is procedural in the first place, but that did not prevent Airflow from having a declarative interface.

Timetable does not take away much of the declarative possibility, since we can easily have something like

DAG(
    ...
    timetable=CalendarTimeTable(
        calendar=…,
        execution_plan=…,
    ),
)

that implements what you want. But the nice thing about this extra abstraction is it keeps doors open for things that might not work well for calendars. You may argue those are uncommon cases, but what prompted AIP-39 in the first place were uncommon cases not considered (or intentionally ignored for simplicity) by the original Airflow implementation. AIP-39 does well in providing a good foundation for most flexibility without sacrificing much of the declarative goodness (if at all; it’s arguable the TimeTable class is actually an improvement for explicitness).

TP


> On 14 May 2021, at 04:59, Malthe <mb...@gmail.com> wrote:
> 
> When it comes to scheduling, Airflow does take a rather declarative approach I would say, but it is certainly correct that it very much stops there.
> 
> I appreciate the arguments favoring a more object-oriented design, but I do think that adding a couple of additional scheduling options could go a very long way in terms of providing that extra bit of scheduling flexibility – while preserving the "scripting ergonomics".
> 
> The current proposal leaves most of the interesting use-cases on the table rather than aiming to show that the abstraction actually meets the requirements.
> 
> Cheers
> 
> On Thu, 13 May 2021 at 15:01, Kaxil Naik <kaxilnaik@gmail.com <ma...@gmail.com>> wrote:
> And also the proposed items with Timetables are more "extensible" too -- Users can develop some classes for their own use and create a library for reusing it.
> 
> Using arguments like you are proposing @malthe -- it can be difficult to understand on all the "related" arguments to understand the scheduling / schedule_interval.
> 
> On Thu, May 13, 2021 at 3:46 PM Jarek Potiuk <jarek@potiuk.com <ma...@potiuk.com>> wrote:
> I much more on Ash's proposal with this one. I think we do not want to optimize for a number of changes but instead we want to make sure what we come up with is an  easy and natural to use long-term solution. Even if it departs  a lot from what we have now, a few months (or maybe years) after it becomes mainstream nobody will remember the "old way" hopefully.
> 
> The idea of "pythonic" pluggable schedule rather than "declarative" way goes perfectly in-line with the whole concept of Airflow where DAGs are defined as Python code rather than declaratively. So making schedules follow the same approach seems very natural for anyone who uses Airflow.
> 
> 
> J.
> 
> 
> On Thu, May 13, 2021 at 9:27 AM Malthe <mborch@gmail.com <ma...@gmail.com>> wrote:
> I'm a bit late to the discussion, but it might be interesting to explore a more simple approach, cutting back on the number of changes.
> 
> As a general remark, "execution_date" may be a slightly difficult concept to grasp, but changing it now is perhaps counterproductive.
> 
> From the AIP examples:
> 
> 1. The MON-FRI problem could be handled using an optional keyword argument "execution_interval" which defaults to `None` (meaning automatic – this is the current behavior). But instead a `timedelta` could be provided, i.e. `timedelta(days=1)`.
> 
> 2. This could easily be supported <https://github.com/kiorky/croniter/pull/46#issuecomment-838544908> in croniter.
> 
> 3. The trading days only (or business days, etc) problem is handled in other systems using a calendar option. It might be that an optional keyword argument "calendar" could be used to automatically skip days that are not included in the calendar – the default calendar would include all days. If `calendar` is some calendar object, `~calendar` could be the inverse, allowing a DAG to be easily scheduled on holidays. A requirement to schedule on the nth day of the calendar (e.g. 10th business day of the month) could be implemented using a derived calendar `calendar.nth_day_of_month(10)` which would further filter down the number of included days based on an existing calendar.
> 
> 4. The cron- vs data scheduling seems to come down to whether the dag run is kicked off at the start of the period or immediately after. This could be handled using an optional keyword argument "execution_plan" which defaults to INTERVAL_END (the current behavior), but can be optionally set to INTERVAL_START. The "execution_date" column then remains unchanged, but the actual dag run time will be vary according to which execution plan was specified.
> 
> Cheers
> 
> On Fri, 12 Mar 2021 at 07:02, Xinbin Huang <bin.huangxb@gmail.com <ma...@gmail.com>> wrote:
> I agree with Tomek.
> 
> TBH, Timetable to me does seem to be a complex concept, and I can't quite understand what it is at first sight. 
> 
> I think Schedule does convey the message better - consider the sentence: "the Scheduler arranges jobs to run based on some _______."  Here, "schedules" seem to fit better than "timetables"
> 
> As for search results on schedule vs scheduler, I wonder if reorganizing the docs to have `schedule` in the top section and have `scheduler` under operation::architecture will help with the search result? (I don't know much about SEO)
> 
> Nevertheless, the naming shouldn't be a blocker for this feature to move forward.
> 
> Best
> Bin
> 
> On Thu, Mar 11, 2021 at 4:31 PM Tomasz Urbaszek <turbaszek@apache.org <ma...@apache.org>> wrote:
> > Timetable is a synonym of ’Schedule’
> 
> I'm not a native speaker and I don't get it easily as a synonym. 
> 
> To be honest the "timetable" sounds like a complex piece of software. Personally I experienced that people use schedule and schedule_interval interchangeably. Additionally, schedule being more linked to scheduler imho is an advantage because it suggests some connection between these two.
> 
> I feel that by introducing timetable we will bring yet more complexity to airflow vocabulary. And I personally would treat it as yet another moving part of an already complex system.
> 
> I think we can move forward with this feature. We renamed "functional DAGs" to "Taskflow API" so, naming is not a blocker. If we can't get consensus we can always ask the users - they will use the feature.
> 
> Best,
> Tomek
> 
> 
> On Fri, 12 Mar 2021 at 01:15, James Timmins <ja...@astronomer.io.invalid> wrote:
> Timetable vs Schedule
> Re Timetable. I agree that if this was a greenfield project, it might make sense to use Schedule. But as it stands, we need to find the right balance between the most specific name and being sufficiently unique that it’s easy to work with in code and, perhaps most importantly, easy to find when searching on Google and in the Airflow Docs.
> 
> There are more than 10,000 references to `schedule*` in the Airflow codebase. `schedule` and `scheduler` are also identical to most search engines/libraries, since they have the same stem, `schedule`. This means that when a user Googles `Airflow Schedule`, they will get back intermixed results of the Schedule class and the Scheduler.
> 
> Timetable is a synonym of ’Schedule’, so it passes the accuracy test, won’t ever be ambiguous in code, and is distinct in search results.
> 
> Should "interval-less DAGs” have data_interval_start and end available in the context?
> I think they should be present so it’s consistent across DAGs. Let’s not make users think too hard about what values are available in what context. What if someone sets the interval to 0? What if sometimes the interval is 0, and sometimes it’s 1 hour? Rather than changing the rules depending on usage, it’s easiest to have one rule that the users can depend upon.
> 
> Re set_context_variables()
> What context is being defined here? The code comment says "Update or set new context variables to become available in task templates and operators.” The Timetable seems like the wrong place for variables that will get passed into task templates/operators, unless this is actually a way to pass Airflow macros into the Timetable context. In which case I fully support this. If not, we may want to add this functionality.
> 
> James
> On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <kaxilnaik@gmail.com <ma...@gmail.com>>, wrote:
>> Yup I have no strong opinions on either so happy to keep it TimeTable or if there is another suggestion.
>> 
>> Regards,
>> Kaxil
>> 
>> On Thu, Mar 11, 2021 at 5:00 PM James Timmins <ja...@astronomer.io.invalid> wrote:
>> Respectfully, I strongly disagree with the renaming of Timetable to Schedule. Schedule and Scheduler aren't meaningfully different, which can lead to a lot of confusion. Even as a native English speaker, and someone who works on Airflow full time, I routinely need to ask for clarification about what schedule-related concept someone is referring to. I foresee Schedule and Scheduler as two distinct yet closely related concepts becoming a major source of confusion.
>> 
>> If folks dislike Timetable, we could certainly change to something else, but let's not use something so similar to existing Airflow classes.
>> 
>> -James
>> 
>> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <ash@apache.org <ma...@apache.org>> wrote:
>> Summary of changes so far on the AIP:
>> 
>> My proposed rename of DagRun.execution_date is now DagRun.schedule_date (previously I had proposed run_date. Thanks dstandish!)
>> 
>> Timetable classes are renamed to Schedule classes (CronSchedule etc), similarly the DAG argument is now schedule (reminder: schedule_interval will not be removed or deprecated, and will still be the way to use "simple" expressions)
>> 
>> -ash
>> 
>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <ash@apache.org <ma...@apache.org>> wrote:
>>> Could change Timetable to Schedule -- that would mean the DAG arg becomes `schedule=CronSchedule(...)` -- a bit close to the current `schedule_interval`, but I think the difference is clear enough.
>>> 
>>> I do like the name, but my one worry with "schedule" is that Scheduler and Schedule are very similar, and might be confused with each other by non-native English speakers? (I defer to others' judgment here, as this is not something I can experience myself.)
>>> 
>>> @Kevin Yang <ma...@gmail.com> @Daniel Standish <ma...@gmail.com> any final input on this AIP?
>>> 
>>> 
>>> 
>>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <kaxilnaik@gmail.com <ma...@gmail.com>> wrote:
>>>> Hi Ash and all,
>>>> 
>>>> 
>>>> What do people think of this? Worth it? Too complex to reason about what context variables might exist as a result?
>>>> 
>>>> I think I wouldn't worry about it right now, or maybe not as part of this AIP. Currently, in one of the GitHub issues, a user mentioned that it is not straightforward to know what is inside the context dictionary - https://github.com/apache/airflow/issues/14396 <https://github.com/apache/airflow/issues/14396>. So maybe we can tackle this issue separately once the AbstractTimetable is built.
>>>> 
>>>> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs "DataTimetable") have data_interval_start and end available in the context?
>>>> 
>>>> Hmm... I would say no, but then it contradicts my suggestion to remove the context dict from this AIP. If we are going to use it in the scheduler then yes, where data_interval_start = data_interval_end for CronTimetable.
>>>> 
>>>> Does anyone have any better names than TimeDeltaTimetable, DataTimetable, and CronTimetable? (We can probably change these names right up until release, so not important to get this correct now.)
>>>> 
>>>> No strong opinion here. Just an alternate suggestion can be TimeDeltaSchedule, DataSchedule and CronSchedule
>>>>  
>>>> Should I try to roll AIP-30 <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> into this, or should we make that a future addition? (My vote is for future addition)
>>>> 
>>>> I would vote for Future addition too.
>>>> 
>>>> Regards,
>>>> Kaxil
>>>> 
>>>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <ash@apache.org <ma...@apache.org>> wrote:
>>>> I think, yes, AIP-35 or something like it would happily co-exist with this proposal.
>>>> 
>>>> @Daniel <ma...@gmail.com> and I have been discussing this a bit on Slack, and one of the questions he asked was if the concept of data_interval should be moved from DagRun as James and I suggested down on to the individual task:
>>>> 
>>>>> suppose i have a new dag hitting 5 api endpoints and pulling data to s3. suppose that yesterday 4 of them succeeded but one failed. today, 4 of them should pull from yesterday. but the one that failed should pull from 2 days back. so even though these normally have the same interval, today they should not.
>>>> 
>>>> My view on this is twofold: one, this should primarily be handled by retries on the task, and secondly, having different TaskInstances in the same DagRun have different data intervals would be much harder to reason about/design the UI around, so for those reasons I still think interval should be a DagRun-level concept.
>>>> 
>>>> (He has a stalled AIP-30 where he proposed something to address this kind of "watermark" case, which we might pick up next after this AIP is complete)
>>>> 
>>>> One thing we might want to do is extend the interface of AbstractTimetable to be able to add/update parameters in the context dict, so the interface could become this:
>>>> 
>>>> class AbstractTimetable(ABC):
>>>>     @abstractmethod
>>>>     def next_dagrun_info(
>>>>         self,
>>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>>         session: Session,
>>>>     ) -> Optional[DagRunInfo]:
>>>>         """
>>>>         Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the
>>>>         execution date, and the earliest it could be scheduled
>>>>  
>>>>         :param date_last_automated_dagrun: The max(execution_date) of existing
>>>>             "automated" DagRuns for this dag (scheduled or backfill, but not
>>>>             manual)
>>>>         """
>>>>  
>>>>     @abstractmethod
>>>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
>>>>         """
>>>>         Update or set new context variables to become available in task templates and operators.
>>>>         """
>>>> 
>>>> 
>>>> What do people think of this? Worth it? Too complex to reason about what context variables might exist as a result?
>>>> 
>>>> Outstanding questions:
>>>> - Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs "DataTimetable") have data_interval_start and end available in the context?
>>>> - Does anyone have any better names than TimeDeltaTimetable, DataTimetable, and CronTimetable? (We can probably change these names right up until release, so not important to get this correct now.)
>>>> - Should I try to roll AIP-30 <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> into this, or should we make that a future addition? (My vote is for future addition)
>>>> 
>>>> I'd like to start voting on this AIP next week (probably on Tuesday) as I think this will be a powerful feature that eases confusion for new users.
>>>> 
>>>> -Ash
>>>> 
>>>> 
>>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <alexinhert@yandex.com <ma...@yandex.com>> wrote:
>>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To Airflow"?
>>>>> I think streaming was also discussed there (though it wasn't really the use case).
>>>>> 
>>>>> 
>>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <ash@apache.org <ma...@apache.org>>:
>>>>> Hi Kevin,
>>>>> 
>>>>> Interesting idea. My original idea was actually that "interval-less DAGs" (i.e. ones where it's just "run at this time") would not have data_interval_start or end, but (while drafting the AIP) we decided that it was probably "easier" if those values were always datetimes.
>>>>> 
>>>>> That said, I think having the DB model have those values be nullable would future proof it without needing another migration to change it. Do you think this is worth doing now?
>>>>> 
>>>>> I haven't (yet! It's on my list) spent any significant time thinking about how to make Airflow play nicely with streaming jobs. If anyone else has ideas here, please share them.
>>>>> 
>>>>> -ash
>>>>> 
>>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrqls21@gmail.com <ma...@gmail.com>> wrote:
>>>>> Hi Ash and James,
>>>>> 
>>>>> This is an exciting move. What do you think about using this opportunity to extend Airflow's support to streaming-like use cases, i.e. DAGs/tasks that want to run forever like a service? For such use cases a schedule interval might not be meaningful, so do we want to make the date-interval param optional for DagRun and task instances? That sounds like a pretty major change to the underlying model of Airflow, but this AIP is so far the best opportunity I've seen to level up Airflow's support for streaming/service use cases.
>>>>> 
>>>>> 
>>>>> Cheers,
>>>>> Kevin Y
>>>>> 
>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dpstandish@gmail.com <ma...@gmail.com>> wrote:
>>>>> Very excited to see this proposal come through and love the direction this has gone.
>>>>> 
>>>>> Couple comments...
>>>>> 
>>>>> Tree view / Data completeness view
>>>>> 
>>>>> When you design your tasks with the canonical idempotence pattern, the tree view shows you both data completeness and task execution history (success / failure etc).
>>>>> 
>>>>> When you don't use that pattern (which is my general preference), tree view is only task execution history.
>>>>> 
>>>>> This change has the potential to unlock a data completeness view for canonical tasks.  It's possible that the "data completeness view" can simply be the tree view.  I.e. somehow it can use these new classes to know what data was successfully filled and what data wasn't.
>>>>> 
>>>>> To the extent we like the idea of either extending / plugging / modifying tree view, or adding a distinct data completeness view, we might want to anticipate the needs of that in this change.  And maybe no alteration to the proposal would be needed but just want to throw the idea out there.
>>>>> 
>>>>> Watermark workflow / incremental processing
>>>>> 
>>>>> A common pattern in data warehousing is pulling data incrementally from a source.
>>>>> 
>>>>> A standard way to achieve this is at the start of the task, select max `updated_at` in source table and hold on to that value for a minute.  This is your tentative new high watermark.  
>>>>> Now it's time to pull your data.  If your task ran before, grab last high watermark.  If not, use initial load value.
>>>>> If successful, update high watermark.
>>>>> 
>>>>> On my team we implemented this with a stateful tasks / stateful processes concept (there's a dormant draft AIP here <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) and a WatermarkOperator that handled the boilerplate*.
>>>>> 
>>>>> Again here, I don't have a specific suggestion at this moment.  But I wanted to articulate this workflow because it is common, and it wasn't immediately obvious to me in reading AIP-39 how I would use it to implement this pattern.
>>>>> 
>>>>> AIP-39 makes airflow more data-aware.  So if it can support this kind of workflow that's great.  @Ash Berlin-Taylor <ma...@astronomer.io> do you have thoughts on how it might be compatible with this kind of thing as is?
>>>>> 
>>>>> ---
>>>>> 
>>>>> * The base operator is designed so that subclasses only need to implement two methods:
>>>>>     - `get_high_watermark`: produce the tentative new high watermark
>>>>>     - `watermark_execute`: analogous to implementing poke in a sensor, this is where your work is done. `execute` is left to the base class, and it orchestrates (1) getting the last high watermark or initial load value and (2) updating the new high watermark if the job is successful.
>>>>> 
> 
> 
> -- 
> +48 660 796 129
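
To make Daniel's watermark pattern above concrete, a minimal sketch (the
get_state/set_state calls and initial_load_value are purely illustrative
stand-ins for the state persistence proposed in AIP-30; nothing like them
exists in Airflow today):

from airflow.models import BaseOperator

class WatermarkOperator(BaseOperator):
    """Sketch of the incremental-load ("high watermark") pattern."""

    def execute(self, context):
        new_high = self.get_high_watermark()               # tentative new high watermark
        low = self.get_state() or self.initial_load_value  # last watermark, else initial load
        self.watermark_execute(low=low, high=new_high)     # subclass pulls rows in (low, new_high]
        self.set_state(new_high)                           # persisted only if the pull succeeded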


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Jarek Potiuk <ja...@potiuk.com>.
>
> The current proposal leaves most of the interesting use-cases on the table
> rather than aiming to show that the abstraction actually meets the
> requirements.
>

I think quite the opposite. I think we will have a number of "interesting
use-cases" implemented as ready-to-use timetables, so in the vast majority
of cases you will have a simple, predefined Timetable implementation that
needs at most some simple configuration, and that is even better and
simpler in terms of "scripting ergonomics". And being able to define your
own aptly named schedules and use them in DAGs, rather than copy & paste a
complex set of parameters into every DAG that wants to reuse the same
"kind" of schedule, seems "ergonomically superior" by all means.

J.

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Malthe <mb...@gmail.com>.
When it comes to scheduling, Airflow does take a rather declarative
approach I would say, but it is certainly correct that it very much stops
there.

I appreciate the arguments favoring a more object-oriented design, but I do
think that adding a couple of additional scheduling options could go a very
long way in terms of providing that extra bit of scheduling flexibility –
while preserving the "scripting ergonomics".
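
To make this concrete, a minimal sketch of what such a declaration might
look like, combining the options from my earlier mail (every keyword
argument below apart from schedule_interval is hypothetical at this point,
and trading_days stands in for some calendar object):

from datetime import timedelta

from airflow import DAG

dag = DAG(
    dag_id="trades_daily",
    schedule_interval="0 6 * * MON-FRI",   # point 2: cron expression, via croniter
    execution_interval=timedelta(days=1),  # point 1: interval decoupled from the cadence
    calendar=trading_days,                 # point 3: skip days not in the calendar
    execution_plan="INTERVAL_START",       # point 4: kick off at the start of the period
)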

The current proposal leaves most of the interesting use-cases on the table
rather than aiming to show that the abstraction actually meets the
requirements.

Cheers


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Kaxil Naik <ka...@gmail.com>.
Also, the proposed Timetables are more "extensible" -- users can develop
classes for their own use and create a library for reusing them.

With keyword arguments like you are proposing, @malthe, it can be difficult
to understand how all the "related" arguments interact to define the
scheduling / schedule_interval.
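
For example (a sketch only -- the package name below is made up), the
schedule class can live in a shared internal library:

from airflow import DAG
from acme_schedules import TradingDaysTimetable  # hypothetical shared package

# Each DAG picks the schedule up by name, instead of repeating a set of
# related keyword arguments in every DAG file.
dag = DAG(dag_id="eod_prices", timetable=TradingDaysTimetable())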


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Jarek Potiuk <ja...@potiuk.com>.
I'm much more with Ash's proposal on this one. I think we do not want to
optimize for the number of changes; instead we want to make sure that what
we come up with is an easy and natural-to-use long-term solution. Even if it
departs a lot from what we have now, a few months (or maybe years) after
it becomes mainstream nobody will remember the "old way", hopefully.

The idea of "pythonic" pluggable schedule rather than "declarative" way
goes perfectly in-line with the whole concept of Airflow where DAGs are
defined as Python code rather than declaratively. So making schedules
follow the same approach seems very natural for anyone who uses Airflow.


J.


On Thu, May 13, 2021 at 9:27 AM Malthe <mb...@gmail.com> wrote:

> I'm a bit late to the discussion, but it might be interesting to explore a
> simpler approach, cutting back on the number of changes.
>
> As a general remark, "execution_date" may be a slightly difficult concept
> to grasp, but changing it now is perhaps counterproductive.
>
> From the AIP examples:
>
> 1. The MON-FRI problem could be handled using an optional keyword argument
> "execution_interval" which defaults to `None` (meaning automatic – this is
> the current behavior). But instead a `timedelta` could be provided, e.g.
> `timedelta(days=1)`.
>
> 2. This could easily be supported
> <https://github.com/kiorky/croniter/pull/46#issuecomment-838544908> in
> croniter.
>
> 3. The trading days only (or business days, etc) problem is handled in
> other systems using a calendar option. It might be that an optional keyword
> argument "calendar" could be used to automatically skip days that are not
> included in the calendar – the default calendar would include all days. If
> `calendar` is some calendar object, `~calendar` could be the inverse,
> allowing a DAG to be easily scheduled on holidays. A requirement to
> schedule on the nth day of the calendar (e.g. 10th business day of the
> month) could be implemented using a derived calendar
> `calendar.nth_day_of_month(10)` which would further filter down the number
> of included days based on an existing calendar.
>
> 4. The cron- vs data scheduling seems to come down to whether the dag run
> is kicked off at the start of the period or immediately after. This could
> be handled using an optional keyword argument "execution_plan" which
> defaults to INTERVAL_END (the current behavior), but can be optionally set
> to INTERVAL_START. The "execution_date" column then remains unchanged, but
> the actual dag run time will vary according to which execution plan was
> specified.
>
> Cheers
>
> On Fri, 12 Mar 2021 at 07:02, Xinbin Huang <bi...@gmail.com> wrote:
>
>> I agree with Tomek.
>>
>> TBH, *Timetable* to me does seem to be a complex concept, and I can't
>> quite understand what it is at first sight.
>>
>> I think *Schedule* does convey the message better - consider
>> the sentence: "*the Scheduler arranges jobs to run based on some _______.*"
>> Here, *"schedules"* seems to fit better than *"timetables"*.
>>
>> As for search results on schedule vs scheduler, I wonder if reorganizing
>> the docs to have `schedule` in the top section and have `scheduler` under
>> operation::architecture will help with the search result? (I don't know
>> much about SEO)
>>
>> Nevertheless, the naming shouldn't be a blocker for this feature to move
>> forward.
>>
>> Best
>> Bin
>>
>> On Thu, Mar 11, 2021 at 4:31 PM Tomasz Urbaszek <tu...@apache.org>
>> wrote:
>>
>>> > Timetable is a synonym of ’Schedule’
>>>
>>> I'm not a native speaker, and it doesn't immediately register as a synonym for me.
>>>
>>> To be honest, "timetable" sounds like a complex piece of software.
>>> In my experience, people use schedule and
>>> schedule_interval interchangeably. Additionally, schedule being more closely
>>> linked to scheduler is imho an advantage, because it suggests some connection
>>> between the two.
>>>
>>> I feel that by introducing timetable we will bring yet more complexity
>>> to airflow vocabulary. And I personally would treat it as yet another
>>> moving part of an already complex system.
>>>
>>> I think we can move forward with this feature. We renamed "functional
>>> DAGs" to "Taskflow API" so, naming is not a blocker. If we can't get
>>> consensus we can always ask the users - they will use the feature.
>>>
>>> Best,
>>> Tomek
>>>
>>>
>>> On Fri, 12 Mar 2021 at 01:15, James Timmins <ja...@astronomer.io.invalid>
>>> wrote:
>>>
>>>> *Timetable vs Schedule*
>>>> Re Timetable. I agree that if this was a greenfield project, it might
>>>> make sense to use Schedule. But as it stands, we need to find the right
>>>> balance between the most specific name and being sufficiently unique that
>>>> it’s easy to work with in code and, perhaps most importantly, easy to find
>>>> when searching on Google and in the Airflow Docs.
>>>>
>>>> There are more than 10,000 references to `schedule*` in the Airflow
>>>> codebase. `schedule` and `scheduler` are also identical to most search
>>>> engines/libraries, since they have the same stem, `schedule`. This means
>>>> that when a user Googles `Airflow Schedule`, they will get back intermixed
>>>> results of the Schedule class and the Scheduler.
>>>>
>>>> Timetable is a synonym of ’Schedule’, so it passes the accuracy test,
>>>> won’t ever be ambiguous in code, and is distinct in search results.
>>>>
>>>> *Should "interval-less DAGs” have data_interval_start and end available
>>>> in the context?*
>>>> I think they should be present so it’s consistent across DAGs. Let’s
>>>> not make users think too hard about what values are available in what
>>>> context. What if someone sets the interval to 0? What if sometimes the
>>>> interval is 0, and sometimes it’s 1 hour? Rather than changing the rules
>>>> depending on usage, it’s easiest to have one rule that the users can depend
>>>> upon.
>>>>
>>>> *Re set_context_variables()*
>>>> What context is being defined here? The code comment says "Update or
>>>> set new context variables to become available in task templates and
>>>> operators.” The Timetable seems like the wrong place for variables that
>>>> will get passed into task templates/operators, unless this is actually a
>>>> way to pass Airflow macros into the Timetable context. In which case I
>>>> fully support this. If not, we may want to add this functionality.
>>>>
>>>> James
>>>> On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <ka...@gmail.com>,
>>>> wrote:
>>>>
>>>> Yup, I have no strong opinions either way, so happy to keep it TimeTable
>>>> or to consider another suggestion.
>>>>
>>>> Regards,
>>>> Kaxil
>>>>
>>>> On Thu, Mar 11, 2021 at 5:00 PM James Timmins
>>>> <ja...@astronomer.io.invalid> wrote:
>>>>
>>>>> Respectfully, I strongly disagree with the renaming of Timetable to
>>>>> Schedule. Schedule and Scheduler aren't meaningfully different, which can
>>>>> lead to a lot of confusion. Even as a native English speaker, and someone
>>>>> who works on Airflow full time, I routinely need to ask for clarification
>>>>> about what schedule-related concept someone is referring to. I foresee
>>>>> Schedule and Scheduler as two distinct yet closely related concepts
>>>>> becoming a major source of confusion.
>>>>>
>>>>> If folks dislike Timetable, we could certainly change to something
>>>>> else, but let's not use something so similar to existing Airflow classes.
>>>>>
>>>>> -James
>>>>>
>>>>> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <as...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Summary of changes so far on the AIP:
>>>>>>
>>>>>> My proposed rename of DagRun.execution_date is now
>>>>>> DagRun.schedule_date (previously I had proposed run_date. Thanks dstandish!)
>>>>>>
>>>>>> Timetable classes are renamed to Schedule classes (CronSchedule etc),
>>>>>> similarly the DAG argument is now schedule (reminder: schedule_interval
>>>>>> will not be removed or deprecated, and will still be the way to use
>>>>>> "simple" expressions)
>>>>>>
>>>>>> -ash
>>>>>>
>>>>>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <as...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Could change Timetable to Schedule -- that would mean the DAG arg
>>>>>> becomes `schedule=CronSchedule(...)` -- a bit close to the current
>>>>>> `schedule_interval`, but I think a clear enough difference.
>>>>>>
>>>>>> I do like the name, but my one worry with "schedule" is that Scheduler
>>>>>> and Schedule are very similar, and might be confused with each other by
>>>>>> non-native English speakers? (I defer to others' judgment here, as this is
>>>>>> not something I can experience myself.)
>>>>>>
>>>>>> @Kevin Yang <yr...@gmail.com> @Daniel Standish
>>>>>> <dp...@gmail.com> any final input on this AIP?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <ka...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Ash and all,
>>>>>>
>>>>>>
>>>>>> What do people think of this? Worth it? Too complex to reason about
>>>>>>> what context variables might exist as a result?
>>>>>>
>>>>>>
>>>>>> I think I wouldn't worry about it right now or maybe not as part of
>>>>>> this AIP. Currently, in one of the GitHub issues, a user mentioned that it
>>>>>> is not straightforward to know what is inside the context dictionary-
>>>>>> https://github.com/apache/airflow/issues/14396. So maybe we can
>>>>>> tackle this issue separately once the AbstractTimetable is built.
>>>>>>
>>>>>> Should "interval-less DAGs" (ones using "CronTimetable" in my
>>>>>>> proposal vs "DataTimetable") have data_interval_start and end available in
>>>>>>> the context?
>>>>>>
>>>>>>
>>>>>> hmm.. I would say no, but then it contradicts my suggestion to remove
>>>>>> the context dict from this AIP. If we are going to use it in the scheduler
>>>>>> then yes, where data_interval_start = data_interval_end from CronTimetable.
>>>>>>
>>>>>> Does anyone have any better names than TimeDeltaTimetable,
>>>>>>> DataTimetable, and CronTimetable? (We can probably change these names right
>>>>>>> up until release, so not important to get this correct *now*.)
>>>>>>
>>>>>>
>>>>>> No strong opinion here. Just as an alternate suggestion, these could
>>>>>> be TimeDeltaSchedule, DataSchedule and CronSchedule.
>>>>>>
>>>>>>
>>>>>>> Should I try to roll AIP-30
>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> in
>>>>>>> to this, or should we make that a future addition? (My vote is for future
>>>>>>> addition)
>>>>>>
>>>>>>
>>>>>> I would vote for Future addition too.
>>>>>>
>>>>>> Regards,
>>>>>> Kaxil
>>>>>>
>>>>>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <as...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I think, yes, AIP-35 or something like it would happily co-exist
>>>>>>> with this proposal.
>>>>>>>
>>>>>>> @Daniel <dp...@gmail.com> and I have been discussing this a
>>>>>>> bit on Slack, and one of the questions he asked was if the concept of
>>>>>>> data_interval should be moved from DagRun as James and I suggested down on
>>>>>>> to the individual task:
>>>>>>>
>>>>>>> suppose i have a new dag hitting 5 api endpoints and pulling data to
>>>>>>> s3. suppose that yesterday 4 of them succeeded but one failed. today, 4 of
>>>>>>> them should pull from yesterday. but the one that failed should pull from 2
>>>>>>> days back. so even though these normally have the same interval, today they
>>>>>>> should not.
>>>>>>>
>>>>>>>
>>>>>>> My view on this is twofold: one, this should primarily be handled
>>>>>>> by retries on the task; and secondly, having different TaskInstances in
>>>>>>> the same DagRun have different data intervals would be much harder to
>>>>>>> reason about/design the UI around, so for those reasons I still think
>>>>>>> interval should be a DagRun-level concept.
>>>>>>>
>>>>>>> (He has a stalled AIP-30 where he proposed something to address this
>>>>>>> kind of "watermark" case, which we might pick up next after this AIP is
>>>>>>> complete)
>>>>>>>
>>>>>>> One thing we might want to do is extend the interface of
>>>>>>> AbstractTimetable to be able to add/update parameters in the context dict,
>>>>>>> so the interface could become this:
>>>>>>>
>>>>>>> class AbstractTimetable(ABC):
>>>>>>>     @abstractmethod
>>>>>>>     def next_dagrun_info(
>>>>>>>         self,
>>>>>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>>>>>         session: Session,
>>>>>>>     ) -> Optional[DagRunInfo]:
>>>>>>>         """
>>>>>>>         Get information about the next DagRun of this dag after
>>>>>>>         ``date_last_automated_dagrun`` -- the execution date, and the
>>>>>>>         earliest it could be scheduled.
>>>>>>>
>>>>>>>         :param date_last_automated_dagrun: The max(execution_date) of
>>>>>>>             existing "automated" DagRuns for this dag (scheduled or
>>>>>>>             backfill, but not manual)
>>>>>>>         """
>>>>>>>
>>>>>>>     @abstractmethod
>>>>>>>     def set_context_variables(
>>>>>>>         self, dagrun: DagRun, context: Dict[str, Any]
>>>>>>>     ) -> None:
>>>>>>>         """
>>>>>>>         Update or set new context variables to become available in
>>>>>>>         task templates and operators.
>>>>>>>         """
>>>>>>>
>>>>>>>
>>>>>>> What do people think of this? Worth it? Too complex to reason about
>>>>>>> what context variables might exist as a result?
>>>>>>>
>>>>>>> *Outstanding question*:
>>>>>>>
>>>>>>>    - Should "interval-less DAGs" (ones using "CronTimetable" in my
>>>>>>>    proposal vs "DataTimetable") have data_interval_start and end available in
>>>>>>>    the context?
>>>>>>>    - Does anyone have any better names than TimeDeltaTimetable,
>>>>>>>    DataTimetable, and CronTimetable? (We can probably change these names right
>>>>>>>    up until release, so not important to get this correct *now*.)
>>>>>>>    - Should I try to roll AIP-30
>>>>>>>    <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>>>>>    in to this, or should we make that a future addition? (My vote is for
>>>>>>>    future addition)
>>>>>>>
>>>>>>>
>>>>>>> I'd like to start voting on this AIP next week (probably on Tuesday)
>>>>>>> as I think this will be a powerful feature that eases confusion for new
>>>>>>> users.
>>>>>>>
>>>>>>> -Ash
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <al...@yandex.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based
>>>>>>> Scheduling To Airflow"?
>>>>>>> I think streaming was also discussed there (though it wasn't really
>>>>>>> the use case).
>>>>>>>
>>>>>>>
>>>>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:
>>>>>>>
>>>>>>> Hi Kevin,
>>>>>>>
>>>>>>> Interesting idea. My original idea was actually that "interval-less
>>>>>>> DAGs" (i.e. ones where it's just "run at this time") would not have
>>>>>>> data_interval_start or end, but (while drafting the AIP) we decided that it
>>>>>>> was probably "easier" if those values were always datetimes.
>>>>>>>
>>>>>>> That said, I think having the DB model have those values be nullable
>>>>>>> would future-proof it without needing another migration to change it. Do
>>>>>>> you think this is worth doing now?
>>>>>>>
>>>>>>> I haven't (yet! It's on my list) spent any significant time thinking
>>>>>>> about how to make Airflow play nicely with streaming jobs. If anyone else
>>>>>>> has ideas here please share them
>>>>>>>
>>>>>>> -ash
>>>>>>>
>>>>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yr...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Ash and James,
>>>>>>>
>>>>>>> This is an exciting move. What do you think about using this
>>>>>>> opportunity to extend Airflow's support to streaming-like use cases, i.e.
>>>>>>> DAGs/tasks that want to run forever like a service? For such use cases, a
>>>>>>> schedule interval might not be meaningful, so do we want to make the date
>>>>>>> interval param optional on DagRun and task instances? That sounds like a
>>>>>>> pretty major change to the underlying model of Airflow, but this AIP is so
>>>>>>> far the best opportunity I have seen to level up Airflow's support for
>>>>>>> streaming/service use cases.
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kevin Y
>>>>>>>
>>>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <
>>>>>>> dpstandish@gmail.com> wrote:
>>>>>>>
>>>>>>> Very excited to see this proposal come through and love the
>>>>>>> direction this has gone.
>>>>>>>
>>>>>>> Couple comments...
>>>>>>>
>>>>>>> *Tree view / Data completeness view*
>>>>>>>
>>>>>>> When you design your tasks with the canonical idempotence pattern,
>>>>>>> the tree view shows you both data completeness and task execution history
>>>>>>> (success / failure etc).
>>>>>>>
>>>>>>> When you don't use that pattern (which is my general preference),
>>>>>>> tree view is only task execution history.
>>>>>>>
>>>>>>> This change has the potential to unlock a data completeness view for
>>>>>>> canonical tasks.  It's possible that the "data completeness view" can
>>>>>>> simply be the tree view.  I.e. somehow it can use these new classes to know
>>>>>>> what data was successfully filled and what data wasn't.
>>>>>>>
>>>>>>> To the extent we like the idea of either extending / plugging /
>>>>>>> modifying tree view, or adding a distinct data completeness view, we might
>>>>>>> want to anticipate the needs of that in this change.  And maybe no
>>>>>>> alteration to the proposal would be needed but just want to throw the idea
>>>>>>> out there.
>>>>>>>
>>>>>>> *Watermark workflow / incremental processing*
>>>>>>>
>>>>>>> A common pattern in data warehousing is pulling data incrementally
>>>>>>> from a source.
>>>>>>>
>>>>>>> A standard way to achieve this is at the start of the task, select
>>>>>>> max `updated_at` in source table and hold on to that value for a minute.
>>>>>>> This is your tentative new high watermark.
>>>>>>> Now it's time to pull your data.  If your task ran before, grab last
>>>>>>> high watermark.  If not, use initial load value.
>>>>>>> If successful, update high watermark.
>>>>>>>
>>>>>>> On my team we implemented this with a stateful tasks / stateful
>>>>>>> processes concept (there's a dormant draft AIP here
>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>>>>>>> and a WatermarkOperator that handled the boilerplate*.
>>>>>>>
>>>>>>> Again here, I don't have a specific suggestion at this moment.  But
>>>>>>> I wanted to articulate this workflow because it is common and it wasn't
>>>>>>> immediately obvious to me in reading AIP-39 how I would use it to implement
>>>>>>> it.
>>>>>>>
>>>>>>> AIP-39 makes Airflow more data-aware.  So if it can support this
>>>>>>> kind of workflow that's great.  @Ash Berlin-Taylor
>>>>>>> <as...@astronomer.io> do you have thoughts on how it might be
>>>>>>> compatible with this kind of thing as is?
>>>>>>>
>>>>>>> ---
>>>>>>>
>>>>>>> * The base operator is designed so that subclasses only need to
>>>>>>> implement two methods:
>>>>>>>     - `get_high_watermark`: produce the tentative new high watermark
>>>>>>>     - `watermark_execute`: analogous to implementing poke in a
>>>>>>> sensor, this is where your work is done. `execute` is left to the base
>>>>>>> class, and it orchestrates (1) getting the last high watermark or initial
>>>>>>> load value and (2) updating the new high watermark if the job succeeded.
>>>>>>>
>>>>>>>

-- 
+48 660 796 129
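
To make the AbstractTimetable interface quoted above concrete, here is a
minimal sketch of what a data-interval implementation might look like.
DagRunInfo is stubbed as a named tuple because the real type is only proposed
in the AIP; the croniter call is real, but the class itself illustrates the
idea rather than the AIP's reference implementation:

from datetime import datetime
from typing import NamedTuple, Optional

from croniter import croniter


class DagRunInfo(NamedTuple):  # stand-in for the type proposed in the AIP
    data_interval_start: datetime
    data_interval_end: datetime


class CronDataTimetable:  # would subclass AbstractTimetable
    """Each run covers the data interval between two cron ticks."""

    def __init__(self, cron: str, start_date: datetime):
        self.cron = cron
        self.start_date = start_date

    def next_dagrun_info(
        self, date_last_automated_dagrun: Optional[datetime], session=None
    ) -> Optional[DagRunInfo]:
        if date_last_automated_dagrun is None:
            start = self.start_date  # brand-new DAG
        else:
            # The previous interval started at max(execution_date); the
            # next one starts at the tick after it.
            start = croniter(self.cron, date_last_automated_dagrun).get_next(datetime)
        end = croniter(self.cron, start).get_next(datetime)
        return DagRunInfo(data_interval_start=start, data_interval_end=end)


# For a brand-new hourly DAG starting 2021-05-01, the first data interval
# is 00:00-01:00:
info = CronDataTimetable("0 * * * *", datetime(2021, 5, 1)).next_dagrun_info(None)
assert info == DagRunInfo(datetime(2021, 5, 1, 0), datetime(2021, 5, 1, 1))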

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Malthe <mb...@gmail.com>.
I'm a bit late to the discussion, but it might be interesting to explore a
simpler approach, cutting back on the number of changes.

As a general remark, "execution_date" may be a slightly difficult concept
to grasp, but changing it now is perhaps counterproductive.

From the AIP examples:

1. The MON-FRI problem could be handled using an optional keyword argument
"execution_interval" which defaults to `None` (meaning automatic – this is
the current behavior). But instead a `timedelta` could be provided, e.g.
`timedelta(days=1)`.

2. This could easily be supported
<https://github.com/kiorky/croniter/pull/46#issuecomment-838544908> in
croniter.

3. The trading days only (or business days, etc) problem is handled in
other systems using a calendar option. It might be that an optional keyword
argument "calendar" could be used to automatically skip days that are not
included in the calendar – the default calendar would include all days. If
`calendar` is some calendar object, `~calendar` could be the inverse,
allowing a DAG to be easily scheduled on holidays. A requirement to
schedule on the nth day of the calendar (e.g. 10th business day of the
month) could be implemented using a derived calendar
`calendar.nth_day_of_month(10)` which would further filter down the number
of included days based on an existing calendar.

4. The cron- vs data scheduling seems to come down to whether the dag run
is kicked off at the start of the period or immediately after. This could
be handled using an optional keyword argument "execution_plan" which
defaults to INTERVAL_END (the current behavior), but can be optionally set
to INTERVAL_START. The "execution_date" column then remains unchanged, but
the actual dag run time will vary according to which execution plan was
specified (a rough sketch of these keyword arguments follows below).
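
A rough sketch of how points 1, 3 and 4 might read on a DAG. None of these
keyword arguments or classes exist in Airflow today; ExecutionPlan and
BusinessDayCalendar are stubbed here purely to make the proposal concrete:

import enum
from datetime import timedelta


class ExecutionPlan(enum.Enum):  # hypothetical
    INTERVAL_START = "interval_start"
    INTERVAL_END = "interval_end"  # the current behavior


class BusinessDayCalendar:  # hypothetical rule-based calendar
    def __invert__(self):
        # `~calendar` would schedule on exactly the excluded days,
        # e.g. holidays.
        return self

    def nth_day_of_month(self, n):
        # Would derive a calendar keeping only the nth included day of
        # each month, e.g. the 10th business day.
        return self


# What the DAG arguments could look like (DAG() does not accept these today):
proposed_dag_kwargs = dict(
    schedule_interval="0 6 * * MON-FRI",
    execution_interval=timedelta(days=1),                 # point 1
    calendar=BusinessDayCalendar().nth_day_of_month(10),  # point 3
    execution_plan=ExecutionPlan.INTERVAL_START,          # point 4
)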

Cheers


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Xinbin Huang <bi...@gmail.com>.
I agree with Tomek.

TBH, *Timetable* to me does seem to be a complex concept, and I can't quite
understand what it is at first sight.

I think *Schedule* does convey the message better - consider the
sentence: "*the Scheduler arranges jobs to run based on some _______.*"
Here, *"schedules"* seems to fit better than *"timetables"*.

As for search results on schedule vs scheduler, I wonder if reorganizing
the docs to have `schedule` in the top section and have `scheduler` under
operation::architecture will help with the search result? (I don't know
much about SEO)

Nevertheless, the naming shouldn't be a blocker for this feature to move
forward.

Best
Bin


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Tomasz Urbaszek <tu...@apache.org>.
> Timetable is a synonym of ’Schedule’

I'm not a native speaker, and it doesn't immediately register as a synonym for me.

To be honest, "timetable" sounds like a complex piece of software.
In my experience, people use schedule and
schedule_interval interchangeably. Additionally, schedule being more closely
linked to scheduler is imho an advantage, because it suggests some connection
between the two.

I feel that by introducing timetable we will bring yet more complexity to
airflow vocabulary. And I personally would treat it as yet another moving
part of an already complex system.

I think we can move forward with this feature. We renamed "functional DAGs"
to "Taskflow API" so, naming is not a blocker. If we can't get consensus we
can always ask the users - they will use the feature.

Best,
Tomek


On Fri, 12 Mar 2021 at 01:15, James Timmins <ja...@astronomer.io.invalid>
wrote:

> *Timetable vs Schedule*
> Re Timetable. I agree that if this was a greenfield project, it might make
> sense to use Schedule. But as it stands, we need to find the right balance
> between the most specific name and being sufficiently unique that it’s easy
> to work with in code and, perhaps most importantly, easy to find when
> searching on Google and in the Airflow Docs.
>
> There are more than 10,000 references to `schedule*` in the Airflow
> codebase. `schedule` and `scheduler` are also identical to most search
> engines/libraries, since they have the same stem, `schedule`. This means
> that when a user Googles `Airflow Schedule`, they will get back intermixed
> results of the Schedule class and the Scheduler.
>
> Timetable is a synonym of ’Schedule’, so it passes the accuracy test,
> won’t ever be ambiguous in code, and is distinct in search results.
>
> *Should "interval-less DAGs” have data_interval_start and end available in
> the context?*
> I think they should be present so it’s consistent across DAGs. Let’s not
> make users think too hard about what values are available in what context.
> What if someone sets the interval to 0? What if sometimes the interval is
> 0, and sometimes it’s 1 hour? Rather than changing the rules depending on
> usage, it’s easiest to have one rule that the users can depend upon.
>
> *Re set_context_variables()*
> What context is being defined here? The code comment says "Update or set
> new context variables to become available in task templates and operators.”
> The Timetable seems like the wrong place for variables that will get passed
> into task templates/operators, unless this is actually a way to pass
> Airflow macros into the Timetable context. In which case I fully support
> this. If not, we may want to add this functionality.
>
> James
> On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <ka...@gmail.com>, wrote:
>
> Yup, I have no strong opinions either way, so happy to keep it TimeTable or
> to consider another suggestion.
>
> Regards,
> Kaxil
>
> On Thu, Mar 11, 2021 at 5:00 PM James Timmins <ja...@astronomer.io.invalid>
> wrote:
>
>> Respectfully, I strongly disagree with the renaming of Timetable to
>> Schedule. Schedule and Scheduler aren't meaningfully different, which can
>> lead to a lot of confusion. Even as a native English speaker, and someone
>> who works on Airflow full time, I routinely need to ask for clarification
>> about what schedule-related concept someone is referring to. I foresee
>> Schedule and Scheduler as two distinct yet closely related concepts
>> becoming a major source of confusion.
>>
>> If folks dislike Timetable, we could certainly change to something else,
>> but let's not use something so similar to existing Airflow classes.
>>
>> -James
>>
>> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <as...@apache.org> wrote:
>>
>>> Summary of changes so far on the AIP:
>>>
>>> My proposed rename of DagRun.execution_date is now DagRun.schedule_date
>>> (previously I had proposed run_date. Thanks dstandish!)
>>>
>>> Timetable classes are renamed to Schedule classes (CronSchedule etc),
>>> similarly the DAG argument is now schedule (reminder: schedule_interval
>>> will not be removed or deprecated, and will still be the way to use
>>> "simple" expressions)
>>>
>>> -ash
>>>
>>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>>>
>>> Could change Timetable To Schedule -- that would mean the DAG arg
>>> becomes `schedule=CronSchedule(...)` -- a bit close to the current
>>> `schedule_interval` but I think clear enough difference.
>>>
>>> I do like the name but my one worry with "schedule" is that Scheduler
>>> and Schedule are very similar, and might be confused with each other for
>>> non-native English speakers? (I defer to others' judgment here, as this is
>>> not something I can experience myself.)
>>>
>>> @Kevin Yang <yr...@gmail.com> @Daniel Standish <dp...@gmail.com> any
>>> final input on this AIP?
>>>
>>>
>>>
>>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <ka...@gmail.com> wrote:
>>>
>>> Hi Ash and all,
>>>
>>>
>>> What do people think of this? Worth it? Too complex to reason about what
>>>> context variables might exist as a result?
>>>
>>>
>>> I think I wouldn't worry about it right now or maybe not as part of this
>>> AIP. Currently, in one of the GitHub issues, a user mentioned that it is not
>>> straightforward to know what is inside the context dictionary -
>>> https://github.com/apache/airflow/issues/14396. So maybe we can tackle
>>> this issue separately once the AbstractTimetable is built.
>>>
>>> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal
>>>> vs "DataTimetable") have data_interval_start and end available in the
>>>> context?
>>>
>>>
>>> hmm.. I would say No but then it contradicts my suggestion to remove
>>> context dict from this AIP. If we are going to use it in scheduler then
>>> yes, where data_interval_start = data_interval_end from CronTimetable.
>>>
>>> Does anyone have any better names than TimeDeltaTimetable,
>>>> DataTimetable, and CronTimetable? (We can probably change these names right
>>>> up until release, so not important to get this correct *now*.)
>>>
>>>
>>> No strong opinion here. Just an alternate suggestion could be
>>> TimeDeltaSchedule, DataSchedule and CronSchedule.
>>>
>>>
>>>> Should I try to roll AIP-30
>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> into
>>>> this, or should we make that a future addition? (My vote is for future
>>>> addition)
>>>
>>>
>>> I would vote for Future addition too.
>>>
>>> Regards,
>>> Kaxil
>>>
>>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <as...@apache.org>
>>> wrote:
>>>
>>>> I think, yes, AIP-35 or something like it would happily co-exist with
>>>> this proposal.
>>>>
>>>> @Daniel <dp...@gmail.com> and I have been discussing this a bit
>>>> on Slack, and one of the questions he asked was whether the concept of
>>>> data_interval should be moved from DagRun, as James and I suggested, down
>>>> onto the individual task:
>>>>
>>>> suppose i have a new dag hitting 5 api endpoints and pulling data to
>>>> s3. suppose that yesterday 4 of them succeeded but one failed. today, 4 of
>>>> them should pull from yesterday. but the one that failed should pull from 2
>>>> days back. so even though these normally have the same interval, today they
>>>> should not.
>>>>
>>>>
>>>> My view on this is twofold: one, this should primarily be handled by
>>>> retries on the task; and two, having different TaskInstances in the
>>>> same DagRun have different data intervals would be much harder to reason
>>>> about/design the UI around, so for those reasons I still think interval
>>>> should be a DagRun-level concept.
>>>>
>>>> (He has a stalled AIP-30 where he proposed something to address this
>>>> kind of "watermark" case, which we might pick up next after this AIP is
>>>> complete)
>>>>
>>>> One thing we might want to do is extend the interface of
>>>> AbstractTimetable to be able to add/update parameters in the context dict,
>>>> so the interface could become this:
>>>>
>>>> from abc import ABC, abstractmethod
>>>> from typing import Any, Dict, Optional
>>>>
>>>> import pendulum
>>>> from sqlalchemy.orm import Session
>>>>
>>>> from airflow.models import DagRun
>>>>
>>>>
>>>> class AbstractTimetable(ABC):
>>>>     @abstractmethod
>>>>     def next_dagrun_info(
>>>>         self,
>>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>>         session: Session,
>>>>     ) -> Optional["DagRunInfo"]:  # DagRunInfo is the new type proposed here
>>>>         """
>>>>         Get information about the next DagRun of this dag after
>>>>         ``date_last_automated_dagrun`` -- the execution date, and the
>>>>         earliest it could be scheduled.
>>>>
>>>>         :param date_last_automated_dagrun: The max(execution_date) of
>>>>             existing "automated" DagRuns for this dag (scheduled or
>>>>             backfill, but not manual)
>>>>         """
>>>>
>>>>     @abstractmethod
>>>>     def set_context_variables(self, dagrun: DagRun,
>>>>                               context: Dict[str, Any]) -> None:
>>>>         """
>>>>         Update or set new context variables to become available in
>>>>         task templates and operators.
>>>>         """
>>>>
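>>>> (To give a feel for the shape, a delta-based implementation might look
>>>> roughly like this -- DagRunInfo's constructor here is an assumption,
>>>> not settled API:)
>>>>
>>>> class TimeDeltaTimetable(AbstractTimetable):
>>>>     """Sketch only: run every ``delta``, anchored at ``start_date``."""
>>>>
>>>>     def __init__(self, delta, start_date):
>>>>         self.delta = delta
>>>>         self.start_date = start_date
>>>>
>>>>     def next_dagrun_info(self, date_last_automated_dagrun, session):
>>>>         # First run starts at the anchor; later runs step by the delta.
>>>>         if date_last_automated_dagrun is None:
>>>>             next_date = self.start_date
>>>>         else:
>>>>             next_date = date_last_automated_dagrun + self.delta
>>>>         return DagRunInfo(schedule_date=next_date)  # field name assumed
>>>>
>>>>     def set_context_variables(self, dagrun, context):
>>>>         pass  # nothing beyond the default context
>>>>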
>>>>
>>>> What do people think of this? Worth it? Too complex to reason about
>>>> what context variables might exist as a result?
>>>>
>>>> *Outstanding questions*:
>>>>
>>>>    - Should "interval-less DAGs" (ones using "CronTimetable" in my
>>>>    proposal vs "DataTimetable") have data_interval_start and end available in
>>>>    the context?
>>>>    - Does anyone have any better names than TimeDeltaTimetable,
>>>>    DataTimetable, and CronTimetable? (We can probably change these names right
>>>>    up until release, so not important to get this correct *now*.)
>>>>    - Should I try to roll AIP-30
>>>>    <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>>    into this, or should we make that a future addition? (My vote is for
>>>>    future addition)
>>>>
>>>>
>>>> I'd like to start voting on this AIP next week (probably on Tuesday) as
>>>> I think this will be a powerful feature that eases confusion for new users.
>>>>
>>>> -Ash
>>>>
>>>>
>>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <al...@yandex.com>
>>>> wrote:
>>>>
>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling
>>>> To Airflow"?
>>>> I think streaming was also discussed there (though it wasn't really the
>>>> use case).
>>>>
>>>>
>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:
>>>>
>>>> Hi Kevin,
>>>>
>>>> Interesting idea. My original idea was actually that "interval-less
>>>> DAGs" (i.e. ones where it's just "run at this time") would not have
>>>> data_interval_start or end, but (while drafting the AIP) we decided that it
>>>> was probably "easier" if those values were always datetimes.
>>>>
>>>> That said, I think having the DB model have those values be nullable
>>>> would future-proof it without needing another migration to change it. Do
>>>> you think this is worth doing now?
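>>>>
>>>> Concretely that would just be something like this on the DagRun model
>>>> (column type matching the existing date columns):
>>>>
>>>> from sqlalchemy import Column
>>>> from airflow.utils.sqlalchemy import UtcDateTime
>>>>
>>>> # New columns, nullable from day one:
>>>> data_interval_start = Column(UtcDateTime, nullable=True)
>>>> data_interval_end = Column(UtcDateTime, nullable=True)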
>>>>
>>>> I haven't (yet! It's on my list) spent any significant time thinking
>>>> about how to make Airflow play nicely with streaming jobs. If anyone else
>>>> has ideas here, please share them.
>>>>
>>>> -ash
>>>>
>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yr...@gmail.com> wrote:
>>>>
>>>> Hi Ash and James,
>>>>
>>>> This is an exciting move. What do you think about using this
>>>> opportunity to extend Airflow's support to streaming-like use cases, i.e.
>>>> DAGs/tasks that want to run forever like a service. For such use cases, a
>>>> schedule interval might not be meaningful, so do we want to make the date
>>>> interval param optional for DagRuns and task instances? That sounds like a
>>>> pretty major change to the underlying model of Airflow, but this AIP is so
>>>> far the best opportunity I've seen to level up Airflow's support for
>>>> streaming/service use cases.
>>>>
>>>>
>>>> Cheers,
>>>> Kevin Y
>>>>
>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dp...@gmail.com>
>>>> wrote:
>>>>
>>>> Very excited to see this proposal come through and love the direction
>>>> this has gone.
>>>>
>>>> Couple comments...
>>>>
>>>> *Tree view / Data completeness view*
>>>>
>>>> When you design your tasks with the canonical idempotence pattern, the
>>>> tree view shows you both data completeness and task execution history
>>>> (success / failure etc).
>>>>
>>>> When you don't use that pattern (which is my general preference), tree
>>>> view is only task execution history.
>>>>
>>>> This change has the potential to unlock a data completeness view for
>>>> canonical tasks.  It's possible that the "data completeness view" can
>>>> simply be the tree view.  I.e. somehow it can use these new classes to know
>>>> what data was successfully filled and what data wasn't.
>>>>
>>>> To the extent we like the idea of either extending / plugging /
>>>> modifying tree view, or adding a distinct data completeness view, we might
>>>> want to anticipate the needs of that in this change.  And maybe no
>>>> alteration to the proposal would be needed but just want to throw the idea
>>>> out there.
>>>>
>>>> *Watermark workflow / incremental processing*
>>>>
>>>> A common pattern in data warehousing is pulling data incrementally from
>>>> a source.
>>>>
>>>> A standard way to achieve this is, at the start of the task, to select max
>>>> `updated_at` from the source table and hold on to that value for a minute.  This
>>>> is your tentative new high watermark.
>>>> Now it's time to pull your data.  If your task ran before, grab last
>>>> high watermark.  If not, use initial load value.
>>>> If successful, update high watermark.
>>>>
>>>> On my team we implemented this with a stateful tasks / stateful
>>>> processes concept (there's a dormant draft AIP here
>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>>>> and a WatermarkOperator that handled the boilerplate*.
>>>>
>>>> Again here, I don't have a specific suggestion at this moment.  But I
>>>> wanted to articulate this workflow because it is common and it wasn't
>>>> immediately obvious to me in reading AIP-39 how I would use it to implement
>>>> it.
>>>>
>>>> AIP-39 makes airflow more data-aware.  So if it can support this kind
>>>> of workflow that's great.  @Ash Berlin-Taylor <as...@astronomer.io> do
>>>> you have thoughts on how it might be compatible with this kind of thing as
>>>> is?
>>>>
>>>> ---
>>>>
>>>> * The base operator is designed so that subclasses only need to
>>>> implement two methods:
>>>>     - `get_high_watermark`: produce the tentative new high watermark
>>>>     - `watermark_execute`: analogous to implementing poke in a sensor,
>>>> this is where your work is done. `execute` is left to the base class, and
>>>> it orchestrates (1) getting the last high watermark or initial load value and
>>>> (2) updating the new high watermark if the job was successful.
>>>>
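>>>> In rough sketch form the base class boils down to this (all names are
>>>> illustrative, and get_state/set_state stand in for whatever AIP-30-style
>>>> persistence we end up with):
>>>>
>>>> from airflow.models import BaseOperator
>>>>
>>>> def get_state(key):
>>>>     ...  # read the last stored watermark, if any
>>>>
>>>> def set_state(key, value):
>>>>     ...  # persist the new watermark
>>>>
>>>> class WatermarkOperator(BaseOperator):
>>>>     """Subclasses implement get_high_watermark() and watermark_execute()."""
>>>>
>>>>     initial_load_value = None
>>>>
>>>>     def execute(self, context):
>>>>         new_high = self.get_high_watermark()
>>>>         low = get_state(self.task_id) or self.initial_load_value
>>>>         self.watermark_execute(low=low, high=new_high)
>>>>         set_state(self.task_id, new_high)  # only reached on success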
>>>>

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by James Timmins <ja...@astronomer.io.INVALID>.
Timetable vs Schedule
Re Timetable. I agree that if this was a greenfield project, it might make sense to use Schedule. But as it stands, we need to find the right balance between the most specific name and being sufficiently unique that it’s easy to work with in code and, perhaps most importantly, easy to find when searching on Google and in the Airflow Docs.

There are more than 10,000 references to `schedule*` in the Airflow codebase. `schedule` and `scheduler` are also identical to most search engines/libraries, since they have the same stem, `schedule`. This means that when a user Googles `Airflow Schedule`, they will get back intermixed results of the Schedule class and the Scheduler.

Timetable is a synonym of 'Schedule', so it passes the accuracy test, won’t ever be ambiguous in code, and is distinct in search results.

Should "interval-less DAGs” have data_interval_start and end available in the context?
I think they should be present so it’s consistent across DAGs. Let’s not make users think too hard about what values are available in what context. What if someone sets the interval to 0? What if sometimes the interval is 0, and sometimes it’s 1 hour? Rather than changing the rules depending on usage, it’s easiest to have one rule that the users can depend upon.
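
A sketch of what the single rule buys us (key names as proposed in the AIP; the task callable itself is illustrative):

def extract(data_interval_start, data_interval_end, **context):
    # Both keys are always present, so there is no existence check --
    # a task only ever has to care about the width of the interval.
    if data_interval_start == data_interval_end:
        print(f"point-in-time run at {data_interval_start}")
    else:
        print(f"window {data_interval_start} to {data_interval_end}")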

Re set_context_variables()
What context is being defined here? The code comment says "Update or set new context variables to become available in task templates and operators." The Timetable seems like the wrong place for variables that will get passed into task templates/operators, unless this is actually a way to pass Airflow macros into the Timetable context. In which case I fully support this. If not, we may want to add this functionality.
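
For concreteness, the first reading would have a Timetable do something like this (the subclass and variable here are illustrative, not from the AIP):

class MonthEndAwareTimetable(DataTimetable):  # DataTimetable per the AIP
    def set_context_variables(self, dagrun, context):
        # Expose an extra template variable computed by the timetable.
        context["is_month_end"] = dagrun.data_interval_end.day == 1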

James
On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <ka...@gmail.com>, wrote:
> Yup, I have no strong opinions on either, so happy to keep it TimeTable or to go with another suggestion if there is one.
>
> Regards,
> Kaxil
>
> > On Thu, Mar 11, 2021 at 5:00 PM James Timmins <ja...@astronomer.io.invalid> wrote:
> > > Respectfully, I strongly disagree with the renaming of Timetable to Schedule. Schedule and Scheduler aren't meaningfully different, which can lead to a lot of confusion. Even as a native English speaker, and someone who works on Airflow full time, I routinely need to ask for clarification about what schedule-related concept someone is referring to. I foresee Schedule and Scheduler as two distinct yet closely related concepts becoming a major source of confusion.
> > >
> > > If folks dislike Timetable, we could certainly change to something else, but let's not use something so similar to existing Airflow classes.
> > >
> > > -James
> > >
> > > > On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <as...@apache.org> wrote:
> > > > > Summary of changes so far on the AIP:
> > > > >
> > > > > My proposed rename of DagRun.execution_date is now DagRun.schedule_date (previously I had proposed run_date. Thanks dstandish!)
> > > > >
> > > > > Timetable classes are renamed to Schedule classes (CronSchedule etc), similarly the DAG argument is now schedule (reminder: schedule_interval will not be removed or deprecated, and will still be the way to use "simple" expressions)
> > > > >
> > > > > -ash
> > > > >
> > > > > On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <as...@apache.org> wrote:
> > > > > > Could change Timetable to Schedule -- that would mean the DAG arg becomes `schedule=CronSchedule(...)` -- a bit close to the current `schedule_interval` but I think clear enough difference.
> > > > > >
> > > > > > I do like the name but my one worry with "schedule" is that Scheduler and Schedule are very similar, and might be confused with each other for non-native English speakers? (I defer to others' judgment here, as this is not something I can experience myself.)
> > > > > >
> > > > > > @Kevin Yang @Daniel Standish any final input on this AIP?
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <ka...@gmail.com> wrote:
> > > > > > > Hi Ash and all,
> > > > > > >
> > > > > > >
> > > > > > > > What do people think of this? Worth it? Too complex to reason about what context variables might exist as a result?
> > > > > > >
> > > > > > > I think I wouldn't worry about it right now or maybe not as part of this AIP. Currently, in one of the GitHub issues, a user mentioned that it is not straightforward to know what is inside the context dictionary - https://github.com/apache/airflow/issues/14396. So maybe we can tackle this issue separately once the AbstractTimetable is built.
> > > > > > >
> > > > > > > > Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs "DataTimetable") have data_interval_start and end available in the context?
> > > > > > >
> > > > > > > hmm.. I would say No but then it contradicts my suggestion to remove context dict from this AIP. If we are going to use it in scheduler then yes, where data_interval_start = data_interval_end from CronTimetable.
> > > > > > >
> > > > > > > > Does anyone have any better names than TimeDeltaTimetable, DataTimetable, and CronTimetable? (We can probably change these names right up until release, so not important to get this correct now.)
> > > > > > >
> > > > > > > No strong opinion here. Just an alternate suggestion could be TimeDeltaSchedule, DataSchedule and CronSchedule.
> > > > > > >
> > > > > > > > Should I try to roll AIP-30 into this, or should we make that a future addition? (My vote is for future addition)
> > > > > > >
> > > > > > > I would vote for Future addition too.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Kaxil
> > > > > > >
> > > > > > > > On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <as...@apache.org> wrote:
> > > > > > > > > I think, yes, AIP-35 or something like it would happily co-exist with this proposal.
> > > > > > > > >
> > > > > > > > > @Daniel and I have been discussing this a bit on Slack, and one of the questions he asked was whether the concept of data_interval should be moved from DagRun, as James and I suggested, down onto the individual task:
> > > > > > > > >
> > > > > > > > > > suppose i have a new dag hitting 5 api endpoints and pulling data to s3. suppose that yesterday 4 of them succeeded but one failed. today, 4 of them should pull from yesterday. but the one that failed should pull from 2 days back. so even though these normally have the same interval, today they should not.
> > > > > > > > >
> > > > > > > > > My view on this is twofold: one, this should primarily be handled by retries on the task; and two, having different TaskInstances in the same DagRun have different data intervals would be much harder to reason about/design the UI around, so for those reasons I still think interval should be a DagRun-level concept.
> > > > > > > > >
> > > > > > > > > (He has a stalled AIP-30 where he proposed something to address this kind of "watermark" case, which we might pick up next after this AIP is complete)
> > > > > > > > >
> > > > > > > > > One thing we might want to do is extend the interface of AbstractTimetable to be able to add/update parameters in the context dict, so the interface could become this:
> > > > > > > > >
> > > > > > > > > class AbstractTimetable(ABC):
> > > > > > > > >     @abstractmethod
> > > > > > > > >     def next_dagrun_info(
> > > > > > > > >         self,
> > > > > > > > >         date_last_automated_dagrun: Optional[pendulum.DateTime],
> > > > > > > > >
> > > > > > > > >         session: Session,
> > > > > > > > >     ) -> Optional[DagRunInfo]:
> > > > > > > > >         """
> > > > > > > > >         Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the
> > > > > > > > >         execution date, and the earliest it could be scheduled
> > > > > > > > >
> > > > > > > > >         :param date_last_automated_dagrun: The max(execution_date) of existing
> > > > > > > > >             "automated" DagRuns for this dag (scheduled or backfill, but not
> > > > > > > > >             manual)
> > > > > > > > >         """
> > > > > > > > >
> > > > > > > > >     @abstractmethod
> > > > > > > > >     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
> > > > > > > > >         """
> > > > > > > > >         Update or set new context variables to become available in task templates and operators.
> > > > > > > > >         """
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > What do people think of this? Worth it? Too complex to reason about what context variables might exist as a result?
> > > > > > > > >
> > > > > > > > > Outstanding questions:
> > > > > > > > >
> > > > > > > > > • Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs "DataTimetable") have data_interval_start and end available in the context?
> > > > > > > > > • Does anyone have any better names than TimeDeltaTimetable, DataTimetable, and CronTimetable? (We can probably change these names right up until release, so not important to get this correct now.)
> > > > > > > > > • Should I try to roll AIP-30 into this, or should we make that a future addition? (My vote is for future addition)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I'd like to start voting on this AIP next week (probably on Tuesday) as I think this will be a powerful feature that eases confusion for new users.
> > > > > > > > >
> > > > > > > > > -Ash
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <al...@yandex.com> wrote:
> > > > > > > > > > Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To Airflow"?
> > > > > > > > > > I think streaming was also discussed there (though it wasn't really the use case).
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > 02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:
> > > > > > > > > > > Hi Kevin,
> > > > > > > > > > >
> > > > > > > > > > > Interesting idea. My original idea was actually that "interval-less DAGs" (i.e. ones where it's just "run at this time") would not have data_interval_start or end, but (while drafting the AIP) we decided that it was probably "easier" if those values were always datetimes.
> > > > > > > > > > >
> > > > > > > > > > > That said, I think having the DB model have those values be nullable would future-proof it without needing another migration to change it. Do you think this is worth doing now?
> > > > > > > > > > >
> > > > > > > > > > > I haven't (yet! It's on my list) spent any significant time thinking about how to make Airflow play nicely with streaming jobs. If anyone else has ideas here, please share them.
> > > > > > > > > > >
> > > > > > > > > > > -ash
> > > > > > > > > > >
> > > > > > > > > > > On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yr...@gmail.com> wrote:
> > > > > > > > > > > > Hi Ash and James,
> > > > > > > > > > > >
> > > > > > > > > > > > This is an exciting move. What do you think about using this opportunity to extend Airflow's support to streaming-like use cases, i.e. DAGs/tasks that want to run forever like a service. For such use cases, a schedule interval might not be meaningful, so do we want to make the date interval param optional for DagRuns and task instances? That sounds like a pretty major change to the underlying model of Airflow, but this AIP is so far the best opportunity I've seen to level up Airflow's support for streaming/service use cases.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Kevin Y
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dp...@gmail.com> wrote:
> > > > > > > > > > > > > Very excited to see this proposal come through and love the direction this has gone.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Couple comments...
> > > > > > > > > > > > >
> > > > > > > > > > > > > Tree view / Data completeness view
> > > > > > > > > > > > >
> > > > > > > > > > > > > When you design your tasks with the canonical idempotence pattern, the tree view shows you both data completeness and task execution history (success / failure etc).
> > > > > > > > > > > > >
> > > > > > > > > > > > > When you don't use that pattern (which is my general preference), tree view is only task execution history.
> > > > > > > > > > > > >
> > > > > > > > > > > > > This change has the potential to unlock a data completeness view for canonical tasks.  It's possible that the "data completeness view" can simply be the tree view.  I.e. somehow it can use these new classes to know what data was successfully filled and what data wasn't.
> > > > > > > > > > > > >
> > > > > > > > > > > > > To the extent we like the idea of either extending / plugging / modifying tree view, or adding a distinct data completeness view, we might want to anticipate the needs of that in this change.  And maybe no alteration to the proposal would be needed but just want to throw the idea out there.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Watermark workflow / incremental processing
> > > > > > > > > > > > >
> > > > > > > > > > > > > A common pattern in data warehousing is pulling data incrementally from a source.
> > > > > > > > > > > > >
> > > > > > > > > > > > > A standard way to achieve this is, at the start of the task, to select max `updated_at` from the source table and hold on to that value for a minute.  This is your tentative new high watermark.
> > > > > > > > > > > > > Now it's time to pull your data.  If your task ran before, grab last high watermark.  If not, use initial load value.
> > > > > > > > > > > > > If successful, update high watermark.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On my team we implemented this with a stateful tasks / stateful processes concept (there's a dormant draft AIP here) and a WatermarkOperator that handled the boilerplate*.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Again here, I don't have a specific suggestion at this moment.  But I wanted to articulate this workflow because it is common and it wasn't immediately obvious to me in reading AIP-39 how I would use it to implement it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > AIP-39 makes airflow more data-aware.  So if it can support this kind of workflow that's great.  @Ash Berlin-Taylor do you have thoughts on how it might be compatible with this kind of thing as is?
> > > > > > > > > > > > >
> > > > > > > > > > > > > ---
> > > > > > > > > > > > >
> > > > > > > > > > > > > * The base operator is designed so that subclasses only need to implement two methods:
> > > > > > > > > > > > >     - `get_high_watermark`: produce the tentative new high watermark
> > > > > > > > > > > > >     - `watermark_execute`: analogous to implementing poke in a sensor, this is where your work is done. `execute` is left to the base class, and it orchestrates (1) getting the last high watermark or initial load value and (2) updating the new high watermark if the job was successful.
> > > > > > > > > > > > >

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Kaxil Naik <ka...@gmail.com>.
Yup, I have no strong opinions on either, so happy to keep it TimeTable or
to go with another suggestion if there is one.

Regards,
Kaxil

On Thu, Mar 11, 2021 at 5:00 PM James Timmins <ja...@astronomer.io.invalid>
wrote:

> Respectfully, I strongly disagree with the renaming of Timetable to
> Schedule. Schedule and Scheduler aren't meaningfully different, which can
> lead to a lot of confusion. Even as a native English speaker, and someone
> who works on Airflow full time, I routinely need to ask for clarification
> about what schedule-related concept someone is referring to. I foresee
> Schedule and Scheduler as two distinct yet closely related concepts
> becoming a major source of confusion.
>
> If folks dislike Timetable, we could certainly change to something else,
> but let's not use something so similar to existing Airflow classes.
>
> -James
>
> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <as...@apache.org> wrote:
>
>> Summary of changes so far on the AIP:
>>
>> My proposed rename of DagRun.execution_date is now DagRun.schedule_date
>> (previously I had proposed run_date. Thanks dstandish!)
>>
>> Timetable classes are renamed to Schedule classes (CronSchedule etc),
>> similarly the DAG argument is now schedule (reminder: schedule_interval
>> will not be removed or deprecated, and will still be the way to use
>> "simple" expressions)
>>
>> -ash
>>
>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>>
>> Could change Timetable to Schedule -- that would mean the DAG arg becomes
>> `schedule=CronSchedule(...)` -- a bit close to the current
>> `schedule_interval` but I think clear enough difference.
>>
>> I do like the name but my one worry with "schedule" is that Scheduler and
>> Schedule are very similar, and might be confused with each other for
>> non-native English speakers? (I defer to others' judgment here, as this is
>> not something I can experience myself.)
>>
>> @Kevin Yang <yr...@gmail.com> @Daniel Standish <dp...@gmail.com> any
>> final input on this AIP?
>>
>>
>>
>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <ka...@gmail.com> wrote:
>>
>> Hi Ash and all,
>>
>>
>> What do people think of this? Worth it? Too complex to reason about what
>>> context variables might exist as a result?
>>
>>
>> I think I wouldn't worry about it right now or maybe not as part of this
>> AIP. Currently, in one of the GitHub issues, a user mentioned that it is not
>> straightforward to know what is inside the context dictionary -
>> https://github.com/apache/airflow/issues/14396. So maybe we can tackle
>> this issue separately once the AbstractTimetable is built.
>>
>> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs
>>> "DataTimetable") have data_interval_start and end available in the context?
>>
>>
>> hmm.. I would say No but then it contradicts my suggestion to remove
>> context dict from this AIP. If we are going to use it in scheduler then
>> yes, where data_interval_start = data_interval_end from CronTimetable.
>>
>> Does anyone have any better names than TimeDeltaTimetable, DataTimetable,
>>> and CronTimetable? (We can probably change these names right up until
>>> release, so not important to get this correct *now*.)
>>
>>
>> No strong opinion here. Just an alternate suggestion could be
>> TimeDeltaSchedule, DataSchedule and CronSchedule.
>>
>>
>>> Should I try to roll AIP-30
>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> into
>>> this, or should we make that a future addition? (My vote is for future
>>> addition)
>>
>>
>> I would vote for Future addition too.
>>
>> Regards,
>> Kaxil
>>
>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <as...@apache.org> wrote:
>>
>>> I think, yes, AIP-35 or something like it would happily co-exist with
>>> this proposal.
>>>
>>> @Daniel <dp...@gmail.com> and I have been discussing this a bit on
>> Slack, and one of the questions he asked was whether the concept of
>> data_interval should be moved from DagRun, as James and I suggested, down
>> onto the individual task:
>>>
>>> suppose i have a new dag hitting 5 api endpoints and pulling data to s3.
>>> suppose that yesterday 4 of them succeeded but one failed. today, 4 of them
>>> should pull from yesterday. but the one that failed should pull from 2 days
>>> back. so even though these normally have the same interval, today they
>>> should not.
>>>
>>>
>>> My view on this is twofold: one, this should primarily be handled by
>>> retries on the task; and two, having different TaskInstances in the
>>> same DagRun have different data intervals would be much harder to reason
>>> about/design the UI around, so for those reasons I still think interval
>>> should be a DagRun-level concept.
>>>
>>> (He has a stalled AIP-30 where he proposed something to address this
>>> kind of "watermark" case, which we might pick up next after this AIP is
>>> complete)
>>>
>>> One thing we might want to do is extend the interface of
>>> AbstractTimetable to be able to add/update parameters in the context dict,
>>> so the interface could become this:
>>>
>>> class AbstractTimetable(ABC):
>>>     @abstractmethod
>>>     def next_dagrun_info(
>>>         self,
>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>
>>>         session: Session,
>>>     ) -> Optional[DagRunInfo]:
>>>         """
>>>         Get information about the next DagRun of this dag after
>>> ``date_last_automated_dagrun`` -- the
>>>         execution date, and the earliest it could be scheduled
>>>
>>>         :param date_last_automated_dagrun: The max(execution_date) of
>>> existing
>>>             "automated" DagRuns for this dag (scheduled or backfill,
>>> but not
>>>             manual)
>>>         """
>>>
>>>     @abstractmethod
>>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str,
>>> Any]) -> None:
>>>         """
>>>         Update or set new context variables to become available in task
>>> templates and operators.
>>>         """
>>>
>>>
>>> What do people think of this? Worth it? Too complex to reason about what
>>> context variables might exist as a result?
>>>
>>> *Outstanding questions*:
>>>
>>>    - Should "interval-less DAGs" (ones using "CronTimetable" in my
>>>    proposal vs "DataTimetable") have data_interval_start and end available in
>>>    the context?
>>>    - Does anyone have any better names than TimeDeltaTimetable,
>>>    DataTimetable, and CronTimetable? (We can probably change these names right
>>>    up until release, so not important to get this correct *now*.)
>>>    - Should I try to roll AIP-30
>>>    <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>    into this, or should we make that a future addition? (My vote is for
>>>    future addition)
>>>
>>>
>>> I'd like to start voting on this AIP next week (probably on Tuesday) as
>>> I think this will be a powerful feature that eases confusion for new users.
>>>
>>> -Ash
>>>
>>>
>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <al...@yandex.com> wrote:
>>>
>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling
>>> To Airflow"?
>>> I think streaming was also discussed there (though it wasn't really the
>>> use case).
>>>
>>>
>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:
>>>
>>> Hi Kevin,
>>>
>>> Interesting idea. My original idea was actually that "interval-less DAGs"
>>> (i.e. ones where it's just "run at this time") would not have
>>> data_interval_start or end, but (while drafting the AIP) we decided that it
>>> was probably "easier" if those values were always datetimes.
>>>
>>> That said, I think having the DB model have those values be nullable
>>> would future-proof it without needing another migration to change it. Do
>>> you think this is worth doing now?
>>>
>>> I haven't (yet! It's on my list) spent any significant time thinking
>>> about how to make Airflow play nicely with streaming jobs. If anyone else
>>> has ideas here, please share them.
>>>
>>> -ash
>>>
>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yr...@gmail.com> wrote:
>>>
>>> Hi Ash and James,
>>>
>>> This is an exciting move. What do you think about using this opportunity
>>> to extend Airflow's support to streaming-like use cases, i.e. DAGs/tasks
>>> that want to run forever like a service. For such use cases, a schedule
>>> interval might not be meaningful, so do we want to make the date interval
>>> param optional for DagRuns and task instances? That sounds like a pretty
>>> major change to the underlying model of Airflow, but this AIP is so far the
>>> best opportunity I've seen to level up Airflow's support for
>>> streaming/service use cases.
>>>
>>>
>>> Cheers,
>>> Kevin Y
>>>
>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dp...@gmail.com>
>>> wrote:
>>>
>>> Very excited to see this proposal come through and love the direction
>>> this has gone.
>>>
>>> Couple comments...
>>>
>>> *Tree view / Data completeness view*
>>>
>>> When you design your tasks with the canonical idempotence pattern, the
>>> tree view shows you both data completeness and task execution history
>>> (success / failure etc).
>>>
>>> When you don't use that pattern (which is my general preference), tree
>>> view is only task execution history.
>>>
>>> This change has the potential to unlock a data completeness view for
>>> canonical tasks.  It's possible that the "data completeness view" can
>>> simply be the tree view.  I.e. somehow it can use these new classes to know
>>> what data was successfully filled and what data wasn't.
>>>
>>> To the extent we like the idea of either extending / plugging /
>>> modifying tree view, or adding a distinct data completeness view, we might
>>> want to anticipate the needs of that in this change.  And maybe no
>>> alteration to the proposal would be needed but just want to throw the idea
>>> out there.
>>>
>>> *Watermark workflow / incremental processing*
>>>
>>> A common pattern in data warehousing is pulling data incrementally from
>>> a source.
>>>
>>> A standard way to achieve this is, at the start of the task, to select max
>>> `updated_at` from the source table and hold on to that value for a minute.  This
>>> is your tentative new high watermark.
>>> Now it's time to pull your data.  If your task ran before, grab last
>>> high watermark.  If not, use initial load value.
>>> If successful, update high watermark.
>>>
>>> On my team we implemented this with a stateful tasks / stateful
>>> processes concept (there's a dormant draft AIP here
>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>>> and a WatermarkOperator that handled the boilerplate*.
>>>
>>> Again here, I don't have a specific suggestion at this moment.  But I
>>> wanted to articulate this workflow because it is common and it wasn't
>>> immediately obvious to me in reading AIP-39 how I would use it to implement
>>> it.
>>>
>>> AIP-39 makes airflow more data-aware.  So if it can support this kind of
>>> workflow that's great.  @Ash Berlin-Taylor <as...@astronomer.io> do you
>>> have thoughts on how it might be compatible with this kind of thing as is?
>>>
>>> ---
>>>
>>> * The base operator is designed so that subclasses only need to
>>> implement two methods:
>>>     - `get_high_watermark`: produce the tentative new high watermark
>>>     - `watermark_execute`: analogous to implementing poke in a sensor,
>>> this is where your work is done. `execute` is left to the base class, and
>>> it orchestrates (1) getting the last high watermark or initial load value and
>>> (2) updating the new high watermark if the job was successful.
>>>
>>>

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by James Timmins <ja...@astronomer.io.INVALID>.
Respectfully, I strongly disagree with the renaming of Timetable to
Schedule. Schedule and Scheduler aren't meaningfully different, which can
lead to a lot of confusion. Even as a native English speaker, and someone
who works on Airflow full time, I routinely need to ask for clarification
about what schedule-related concept someone is referring to. I foresee
Schedule and Scheduler as two distinct yet closely related concepts
becoming a major source of confusion.

If folks dislike Timetable, we could certainly change to something else,
but let's not use something so similar to existing Airflow classes.

-James

On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <as...@apache.org> wrote:

> Summary of changes so far on the AIP:
>
> My proposed rename of DagRun.execution_date is now DagRun.schedule_date
> (previously I had proposed run_date. Thanks dstandish!)
>
> Timetable classes are renamed to Schedule classes (CronSchedule etc),
> similarly the DAG argument is now schedule (reminder: schedule_interval
> will not be removed or deprecated, and will still be the way to use
> "simple" expressions)
>
> -ash
>
> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <as...@apache.org> wrote:
>
> Could change Timetable to Schedule -- that would mean the DAG arg becomes
> `schedule=CronSchedule(...)` -- a bit close to the current
> `schedule_interval` but I think clear enough difference.
>
> I do like the name but my one worry with "schedule" is that Scheduler and
> Schedule are very similar, and might be confused with each other for
> non-native English speakers? (I defer to others' judgment here, as this is
> not something I can experience myself.)
>
> @Kevin Yang <yr...@gmail.com> @Daniel Standish <dp...@gmail.com> any
> final input on this AIP?
>
>
>
> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <ka...@gmail.com> wrote:
>
> Hi Ash and all,
>
>
> What do people think of this? Worth it? Too complex to reason about what
>> context variables might exist as a result?
>
>
> I think I wouldn't worry about it right now or maybe not as part of this
> AIP. Currently, in one of the GitHub issues, a user mentioned that it is not
> straightforward to know what is inside the context dictionary -
> https://github.com/apache/airflow/issues/14396. So maybe we can tackle
> this issue separately once the AbstractTimetable is built.
>
> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs
>> "DataTimetable") have data_interval_start and end available in the context?
>
>
> hmm.. I would say No but then it contradicts my suggestion to remove
> context dict from this AIP. If we are going to use it in scheduler then
> yes, where data_interval_start = data_interval_end from CronTimetable.
>
> Does anyone have any better names than TimeDeltaTimetable, DataTimetable,
>> and CronTimetable? (We can probably change these names right up until
>> release, so not important to get this correct *now*.)
>
>
> No strong opinion here. Just an alternate suggestion could be
> TimeDeltaSchedule, DataSchedule and CronSchedule.
>
>
>> Should I try to roll AIP-30
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> into
>> this, or should we make that a future addition? (My vote is for future
>> addition)
>
>
> I would vote for Future addition too.
>
> Regards,
> Kaxil
>
> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <as...@apache.org> wrote:
>
>> I think, yes, AIP-35 or something like it would happily co-exist with
>> this proposal.
>>
>> @Daniel <dp...@gmail.com> and I have been discussing this a bit on
>> Slack, and one of the questions he asked was whether the concept of
>> data_interval should be moved from DagRun, as James and I suggested, down
>> onto the individual task:
>>
>> suppose i have a new dag hitting 5 api endpoints and pulling data to s3.
>> suppose that yesterday 4 of them succeeded but one failed. today, 4 of them
>> should pull from yesterday. but the one that failed should pull from 2 days
>> back. so even though these normally have the same interval, today they
>> should not.
>>
>>
>> My view on this is twofold: one, this should primarily be handled by
>> retries on the task; and two, having different TaskInstances in the
>> same DagRun have different data intervals would be much harder to reason
>> about/design the UI around, so for those reasons I still think interval
>> should be a DagRun-level concept.
>>
>> (He has a stalled AIP-30 where he proposed something to address this kind
>> of "watermark" case, which we might pick up next after this AIP is complete)
>>
>> One thing we might want to do is extend the interface of
>> AbstractTimetable to be able to add/update parameters in the context dict,
>> so the interface could become this:
>>
>> class AbstractTimetable(ABC):
>>     @abstractmethod
>>     def next_dagrun_info(
>>         self,
>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>
>>         session: Session,
>>     ) -> Optional[DagRunInfo]:
>>         """
>>         Get information about the next DagRun of this dag after
>> ``date_last_automated_dagrun`` -- the
>>         execution date, and the earliest it could be scheduled
>>
>>         :param date_last_automated_dagrun: The max(execution_date) of
>> existing
>>             "automated" DagRuns for this dag (scheduled or backfill, but
>> not
>>             manual)
>>         """
>>
>>     @abstractmethod
>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str,
>> Any]) -> None:
>>         """
>>         Update or set new context variables to become available in task
>> templates and operators.
>>         """
>>
>>
>> What do people think of this? Worth it? Too complex to reason about what
>> context variables might exist as a result?
>>
>> *Outstanding questions*:
>>
>>    - Should "interval-less DAGs" (ones using "CronTimetable" in my
>>    proposal vs "DataTimetable") have data_interval_start and end available in
>>    the context?
>>    - Does anyone have any better names than TimeDeltaTimetable,
>>    DataTimetable, and CronTimetable? (We can probably change these names right
>>    up until release, so not important to get this correct *now*.)
>>    - Should I try to roll AIP-30
>>    <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>    into this, or should we make that a future addition? (My vote is for
>>    future addition)
>>
>>
>> I'd like to start voting on this AIP next week (probably on Tuesday) as I
>> think this will be a powerful feature that eases confusion for new users.
>>
>> -Ash
>>
>>
>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <al...@yandex.com> wrote:
>>
>> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To
>> Airflow"?
>> I think streaming was also discussed there (though it wasn't really the
>> use case).
>>
>>
>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:
>>
>> Hi Kevin,
>>
>> Interesting idea. My original idea was actually that "interval-less DAGs"
>> (i.e. ones where it's just "run at this time") would not have
>> data_interval_start or end, but (while drafting the AIP) we decided that it
>> was probably "easier" if those values were always datetimes.
>>
>> That said, I think having the DB model have those values be nullable
>> would future-proof it without needing another migration to change it. Do
>> you think this is worth doing now?
>>
>> I haven't (yet! It's on my list) spent any significant time thinking
>> about how to make Airflow play nicely with streaming jobs. If anyone else
>> has ideas here, please share them.
>>
>> -ash
>>
>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yr...@gmail.com> wrote:
>>
>> Hi Ash and James,
>>
>> This is an exciting move. What do you think about using this opportunity
>> to extend Airflow's support to streaming-like use cases, i.e. DAGs/tasks
>> that want to run forever like a service. For such use cases, a schedule
>> interval might not be meaningful, so do we want to make the date interval
>> param optional for DagRuns and task instances? That sounds like a pretty
>> major change to the underlying model of Airflow, but this AIP is so far the
>> best opportunity I've seen to level up Airflow's support for
>> streaming/service use cases.
>>
>>
>> Cheers,
>> Kevin Y
>>
>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dp...@gmail.com>
>> wrote:
>>
>> Very excited to see this proposal come through and love the direction
>> this has gone.
>>
>> Couple comments...
>>
>> *Tree view / Data completeness view*
>>
>> When you design your tasks with the canonical idempotence pattern, the
>> tree view shows you both data completeness and task execution history
>> (success / failure etc).
>>
>> When you don't use that pattern (which is my general preference), tree
>> view is only task execution history.
>>
>> This change has the potential to unlock a data completeness view for
>> canonical tasks.  It's possible that the "data completeness view" can
>> simply be the tree view.  I.e. somehow it can use these new classes to know
>> what data was successfully filled and what data wasn't.
>>
>> To the extent we like the idea of either extending / plugging / modifying
>> tree view, or adding a distinct data completeness view, we might want to
>> anticipate the needs of that in this change.  And maybe no alteration to
>> the proposal would be needed but just want to throw the idea out there.
>>
>> *Watermark workflow / incremental processing*
>>
>> A common pattern in data warehousing is pulling data incrementally from a
>> source.
>>
>> A standard way to achieve this is, at the start of the task, to select max
>> `updated_at` from the source table and hold on to that value for a minute.  This
>> is your tentative new high watermark.
>> Now it's time to pull your data.  If your task ran before, grab last high
>> watermark.  If not, use initial load value.
>> If successful, update high watermark.
>>
>> On my team we implemented this with a stateful tasks / stateful processes
>> concept (there's a dormant draft AIP here
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>> and a WatermarkOperator that handled the boilerplate*.
>>
>> Again here, I don't have a specific suggestion at this moment.  But I
>> wanted to articulate this workflow because it is common and it wasn't
>> immediately obvious to me in reading AIP-39 how I would use it to implement
>> it.
>>
>> AIP-39 makes airflow more data-aware.  So if it can support this kind of
>> workflow that's great.  @Ash Berlin-Taylor <as...@astronomer.io> do you
>> have thoughts on how it might be compatible with this kind of thing as is?
>>
>> ---
>>
>> * The base operator is designed so that subclasses only need to implement
>> two methods:
>>     - `get_high_watermark`: produce the tentative new high watermark
>>     - `watermark_execute`: analogous to implementing poke in a sensor,
>> this is where your work is done. `execute` is left to the base class, and
>> it orchestrates (1) getting the last high watermark or initial load value and
>> (2) updating the new high watermark if the job was successful.
>>
>>

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Ash Berlin-Taylor <as...@apache.org>.
Summary of changes so far on the AIP:

My proposed rename of DagRun.execution_date is now DagRun.schedule_date 
(previously I had proposed run_date. Thanks dstandish!)

Timetable classes are renamed to Schedule classes (CronSchedule etc), 
similarly the DAG argument is now schedule (reminder: schedule_interval 
will not be removed or deprecated, and will still be the way to use 
"simple" expressions)

-ash

On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <as...@apache.org> wrote:
> Could change Timetable to Schedule -- that would mean the DAG arg 
> becomes `schedule=CronSchedule(...)` -- a bit close to the current 
> `schedule_interval` but I think clear enough difference.
> 
> I do like the name but my one worry with "schedule" is that Scheduler 
> and Schedule are very similar, and might be confused with each 
> other for non-native English speakers? (I defer to others' judgment 
> here, as this is not something I can experience myself.)
> 
> @Kevin Yang <ma...@gmail.com> @Daniel Standish 
> <ma...@gmail.com> any final input on this AIP?
> 
> 
> 
> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <ka...@gmail.com> wrote:
>> Hi Ash and all,
>> 
>> 
>>> What do people think of this? Worth it? Too complex to reason about 
>>> what context variables might exist as a result?
>> 
>> I think I wouldn't worry about it right now or maybe not as part of 
>> this AIP. Currently, in one of the GitHub issues, a user mentioned 
>> that it is not straightforward to know what is inside the context 
>> dictionary - <https://github.com/apache/airflow/issues/14396>. So 
>> maybe we can tackle this issue separately once the AbstractTimetable 
>> is built.
>> 
>>> Should "interval-less DAGs" (ones using "CronTimetable" in my 
>>> proposal vs "DataTimetable") have data_interval_start and end 
>>> available in the context?
>> 
>> hmm.. I would say No but then it contradicts my suggestion to remove 
>> context dict from this AIP. If we are going to use it in scheduler 
>> then yes, where data_interval_start = data_interval_end from 
>> CronTimetable.
>> 
>>> Does anyone have any better names than TimeDeltaTimetable, 
>>> DataTimetable, and CronTimetable? (We can probably change these 
>>> names right up until release, so not important to get this correct 
>>> /now/.)
>> 
>> No strong opinion here. Just an alternate suggestion could be 
>> TimeDeltaSchedule, DataSchedule and CronSchedule.
>> 
>>> Should I try to roll AIP-30 
>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> 
>>> into this, or should we make that a future addition? (My vote is 
>>> for future addition)
>> 
>> I would vote for Future addition too.
>> 
>> Regards,
>> Kaxil
>> 
>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <ash@apache.org 
>> <ma...@apache.org>> wrote:
>>> I think, yes, AIP-35 or something like it would happily co-exist 
>>> with this proposal.
>>> 
>>> @Daniel <ma...@gmail.com> and I have been discussing 
>>> this a bit on Slack, and one of the questions he asked was whether 
>>> the concept of data_interval should be moved from DagRun, as James 
>>> and I suggested, down onto the individual task:
>>> 
>>>> suppose i have a new dag hitting 5 api endpoints and pulling data 
>>>> to s3. suppose that yesterday 4 of them succeeded but one failed. 
>>>> today, 4 of them should pull from yesterday. but the one that 
>>>> failed should pull from 2 days back. so even though these normally 
>>>> have the same interval, today they should not.
>>> 
>>> My view on this is twofold: one, this should primarily be handled 
>>> by retries on the task; and two, having different TaskInstances 
>>> in the same DagRun have different data intervals would be much 
>>> harder to reason about/design the UI around, so for those reasons I 
>>> still think interval should be a DagRun-level concept.
>>> 
>>> (He has a stalled AIP-30 where he proposed something to address 
>>> this kind of "watermark" case, which we might pick up next after 
>>> this AIP is complete)
>>> 
>>> One thing we might want to do is extend the interface of 
>>> AbstractTimetable to be able to add/update parameters in the 
>>> context dict, so the interface could become this:
>>> 
>>> class AbstractTimetable(ABC):
>>>     @abstractmethod
>>>     def next_dagrun_info(
>>>         self,
>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>         session: Session,
>>>     ) -> Optional[DagRunInfo]:
>>>         """
>>>         Get information about the next DagRun of this dag after 
>>> ``date_last_automated_dagrun`` -- the
>>>         execution date, and the earliest it could be scheduled
>>> 
>>>         :param date_last_automated_dagrun: The max(execution_date) 
>>> of existing
>>>             "automated" DagRuns for this dag (scheduled or 
>>> backfill, but not
>>>             manual)
>>>         """
>>> 
>>>     @abstractmethod
>>>     def set_context_variables(self, dagrun: DagRun,
>>>                               context: Dict[str, Any]) -> None:
>>>         """
>>>         Update or set new context variables to become available in 
>>> task templates and operators.
>>>         """
>>> 
>>> 
>>> What do people think of this? Worth it? Too complex to reason about 
>>> what context variables might exist as a result?
>>> 
>>> *Outstanding questions*:
>>> - Should "interval-less DAGs" (ones using "CronTimetable" in my 
>>>   proposal vs "DataTimetable") have data_interval_start and end 
>>>   available in the context?
>>> - Does anyone have any better names than TimeDeltaTimetable, 
>>>   DataTimetable, and CronTimetable? (We can probably change these 
>>>   names right up until release, so not important to get this 
>>>   correct /now/.)
>>> - Should I try to roll AIP-30 
>>>   <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> 
>>>   into this, or should we make that a future addition? (My vote is 
>>>   for future addition)
>>> 
>>> I'd like to start voting on this AIP next week (probably on 
>>> Tuesday) as I think this will be a powerful feature that eases 
>>> confusion for new users.
>>> 
>>> -Ash
>>> 
>>> 
>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <alexinhert@yandex.com 
>>> <ma...@yandex.com>> wrote:
>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based 
>>>> Scheduling To Airflow"?
>>>> I think streaming was also discussed there (though it wasn't 
>>>> really the use case).
>>>> 
>>>> 
>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <ash@apache.org 
>>>> <ma...@apache.org>>:
>>>>> Hi Kevin,
>>>>> 
>>>>> Interesting idea. My original idea was actually that 
>>>>> "interval-less DAGs" (i.e. ones where it's just "run at this 
>>>>> time") would not have data_interval_start or end, but (while 
>>>>> drafting the AIP) we decided that it was probably "easier" if 
>>>>> those values were always datetimes.
>>>>> 
>>>>> That said, I think having the DB model have those values be 
>>>>> nullable would future-proof it without needing another migration 
>>>>> to change it. Do you think this is worth doing now?
>>>>> 
>>>>> I haven't (yet! It's on my list) spent any significant time 
>>>>> thinking about how to make Airflow play nicely with streaming 
>>>>> jobs. If anyone else has ideas here, please share them.
>>>>> 
>>>>> -ash
>>>>> 
>>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrqls21@gmail.com 
>>>>> <ma...@gmail.com>> wrote:
>>>>>> Hi Ash and James,
>>>>>> 
>>>>>> This is an exciting move. What do you think about using this 
>>>>>> opportunity to extend Airflow's support to streaming-like use 
>>>>>> cases, i.e. DAGs/tasks that want to run forever like a service. 
>>>>>> For such use cases, a schedule interval might not be meaningful, 
>>>>>> so do we want to make the date interval param optional for 
>>>>>> DagRuns and task instances? That sounds like a pretty major 
>>>>>> change to the underlying model of Airflow, but this AIP is so 
>>>>>> far the best opportunity I've seen to level up Airflow's 
>>>>>> support for streaming/service use cases.
>>>>>> 
>>>>>> 
>>>>>> Cheers,
>>>>>> Kevin Y
>>>>>> 
>>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish 
>>>>>> <dpstandish@gmail.com <ma...@gmail.com>> wrote:
>>>>>>> Very excited to see this proposal come through and love the 
>>>>>>> direction this has gone.
>>>>>>> 
>>>>>>> Couple comments...
>>>>>>> 
>>>>>>> *Tree view / Data completeness view*
>>>>>>> 
>>>>>>> When you design your tasks with the canonical idempotence 
>>>>>>> pattern, the tree view shows you both data completeness and 
>>>>>>> task execution history (success / failure etc).
>>>>>>> 
>>>>>>> When you don't use that pattern (which is my general 
>>>>>>> preference), tree view is only task execution history.
>>>>>>> 
>>>>>>> This change has the potential to unlock a data completeness 
>>>>>>> view for canonical tasks.  It's possible that the "data 
>>>>>>> completeness view" can simply be the tree view.  I.e. somehow 
>>>>>>> it can use these new classes to know what data was successfully 
>>>>>>> filled and what data wasn't.
>>>>>>> 
>>>>>>> To the extent we like the idea of either extending / plugging / 
>>>>>>> modifying tree view, or adding a distinct data completeness 
>>>>>>> view, we might want to anticipate the needs of that in this 
>>>>>>> change.  And maybe no alteration to the proposal would be 
>>>>>>> needed but just want to throw the idea out there.
>>>>>>> 
>>>>>>> *Watermark workflow / incremental processing*
>>>>>>> 
>>>>>>> A common pattern in data warehousing is pulling data 
>>>>>>> incrementally from a source.
>>>>>>> 
>>>>>>> A standard way to achieve this is at the start of the task, 
>>>>>>> select max `updated_at` in source table and hold on to that 
>>>>>>> value for a minute.  This is your tentative new high watermark.
>>>>>>> Now it's time to pull your data.  If your task ran before, grab 
>>>>>>> last high watermark.  If not, use initial load value.
>>>>>>> If successful, update high watermark.
>>>>>>> 
>>>>>>> On my team we implemented this with a stateful tasks / stateful 
>>>>>>> processes concept (there's a dormant draft AIP here 
>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) 
>>>>>>> and a WatermarkOperator that handled the boilerplate*.
>>>>>>> 
>>>>>>> Again here, I don't have a specific suggestion at this moment.  
>>>>>>> But I wanted to articulate this workflow because it is common 
>>>>>>> and it wasn't immediately obvious to me in reading AIP-39 how I 
>>>>>>> would use it to implement it.
>>>>>>> 
>>>>>>> AIP-39 makes Airflow more data-aware.  So if it can support 
>>>>>>> this kind of workflow that's great.  @Ash Berlin-Taylor 
>>>>>>> <ma...@astronomer.io> do you have thoughts on how it might 
>>>>>>> be compatible with this kind of thing as is?
>>>>>>> 
>>>>>>> ---
>>>>>>> 
>>>>>>> * The base operator is designed so that subclasses only need to 
>>>>>>> implement two methods:
>>>>>>>     - `get_high_watermark`: produce the tentative new high 
>>>>>>> watermark
>>>>>>>     - `watermark_execute`: analogous to implementing poke in a 
>>>>>>> sensor, this is where your work is done. `execute` is left to 
>>>>>>> the base class, and it orchestrates (1) getting last high 
>>>>>>> watermark or initial load value and (2) updating new high 
>>>>>>> watermark if job successful.
>>>>>>> 


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Ash Berlin-Taylor <as...@apache.org>.
Could change Timetable to Schedule -- that would mean the DAG arg 
becomes `schedule=CronSchedule(...)`. That's a bit close to the current 
`schedule_interval`, but I think the difference is clear enough.
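
For illustration, a DAG definition might then read something like this 
(just a sketch -- `CronSchedule` here is the proposed rename of 
`CronTimetable`, not an existing class):

from airflow import DAG

with DAG(
    dag_id="example",
    # Proposed spelling; today this would be schedule_interval="0 4 * * *".
    schedule=CronSchedule("0 4 * * *"),
) as dag:
    ...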

I do like the name, but my one worry with "schedule" is that Scheduler 
and Schedule are very similar, and might be confused with each other 
by non-native English speakers. (I defer to others' judgment here, as 
this is not something I can experience myself.)

@Kevin Yang <ma...@gmail.com> @Daniel Standish 
<ma...@gmail.com> any final input on this AIP?



On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <ka...@gmail.com> wrote:
> Hi Ash and all,
> 
> 
>> What do people think of this? Worth it? Too complex to reason about 
>> what context variables might exist as a result?
> 
> I think I wouldn't worry about it right now, or maybe not as part of 
> this AIP. Currently, in one of the GitHub issues, a user mentioned 
> that it is not straightforward to know what is inside the context 
> dictionary: <https://github.com/apache/airflow/issues/14396>. So 
> maybe we can tackle this issue separately once the AbstractTimetable 
> is built.
> 
>> Should "interval-less DAGs" (ones using "CronTimetable" in my 
>> proposal vs "DataTimetable") have data_interval_start and end 
>> available in the context?
> 
> Hmm, I would say no, but then it contradicts my suggestion to remove 
> the context dict from this AIP. If we are going to use it in the 
> scheduler then yes, where data_interval_start = data_interval_end 
> from CronTimetable.
> 
>> Does anyone have any better names than TimeDeltaTimetable, 
>> DataTimetable, and CronTimetable? (We can probably change these 
>> names right up until release, so not important to get this correct 
>> /now/.)
> 
> No strong opinion here. Just an alternate suggestion could be 
> TimeDeltaSchedule, DataSchedule, and CronSchedule.
> 
>> Should I try to roll AIP-30 
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> 
>> in to this, or should we make that a future addition? (My vote is 
>> for future addition)
> 
> I would vote for future addition too.
> 
> Regards,
> Kaxil
> 
> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <ash@apache.org 
> <ma...@apache.org>> wrote:
>> I think, yes, AIP-35 or something like it would happily co-exist 
>> with this proposal.
>> 
>> @Daniel <ma...@gmail.com> and I have been discussing 
>> this a bit on Slack, and one of the questions he asked was if the 
>> concept of data_interval should be moved from DagRun as James and I 
>> suggested down on to the individual task:
>> 
>>> suppose i have a new dag hitting 5 api endpoints and pulling data 
>>> to s3. suppose that yesterday 4 of them succeeded but one failed. 
>>> today, 4 of them should pull from yesterday. but the one that 
>>> failed should pull from 2 days back. so even though these normally 
>>> have the same interval, today they should not.
>> 
>> My view on this is twofold: first, this should primarily be handled 
>> by retries on the task, and second, having different TaskInstances 
>> in the same DagRun have different data intervals would be much 
>> harder to reason about/design the UI around, so for those reasons I 
>> still think the interval should be a DagRun-level concept.
>> 
>> (He has a stalled AIP-30 where he proposed something to address this 
>> kind of "watermark" case, which we might pick up next after this AIP 
>> is complete)
>> 
>> One thing we might want to do is extend the interface of 
>> AbstractTimetable to be able to add/update parameters in the context 
>> dict, so the interface could become this:
>> 
>> class AbstractTimetable(ABC):
>>     @abstractmethod
>>     def next_dagrun_info(
>>         self,
>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>         session: Session,
>>     ) -> Optional[DagRunInfo]:
>>         """
>>         Get information about the next DagRun of this dag after
>>         ``date_last_automated_dagrun`` -- the execution date, and the
>>         earliest it could be scheduled.
>> 
>>         :param date_last_automated_dagrun: The max(execution_date) of
>>             existing "automated" DagRuns for this dag (scheduled or
>>             backfill, but not manual)
>>         """
>> 
>>     @abstractmethod
>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
>>         """
>>         Update or set new context variables to become available in
>>         task templates and operators.
>>         """
>> 
>> 
>> What do people think of this? Worth it? Too complex to reason about 
>> what context variables might exist as a result?
>> 
>> *Outstanding questions*:
>>    - Should "interval-less DAGs" (ones using "CronTimetable" in my 
>> proposal vs "DataTimetable") have data_interval_start and end 
>> available in the context?
>>    - Does anyone have any better names than TimeDeltaTimetable, 
>> DataTimetable, and CronTimetable? (We can probably change these names 
>> right up until release, so not important to get this correct /now/.)
>>    - Should I try to roll AIP-30 
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> 
>> into this, or should we make that a future addition? (My vote is for 
>> future addition)
>> 
>> I'd like to start voting on this AIP next week (probably on Tuesday) 
>> as I think this will be a powerful feature that eases confusion for 
>> new users.
>> 
>> -Ash
>> 
>> 
>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <alexinhert@yandex.com 
>> <ma...@yandex.com>> wrote:
>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based 
>>> Scheduling To Airflow"?
>>> I think streaming was also discussed there (though it wasn't really 
>>> the use case).
>>> 
>>> 
>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <ash@apache.org 
>>> <ma...@apache.org>>:
>>>> Hi Kevin,
>>>> 
>>>> Interesting idea. My original idea was actually that "interval-less 
>>>> DAGs" (i.e. ones where it's just "run at this time") would not 
>>>> have data_interval_start or end, but (while drafting the AIP) we 
>>>> decided that it was probably "easier" if those values were always 
>>>> datetimes.
>>>> 
>>>> That said, I think having the DB model have those values be 
>>>> nullable would future proof it without needing another migration 
>>>> to change it. Do you think this is worth doing now?
>>>> 
>>>> I haven't (yet! It's on my list) spent any significant time 
>>>> thinking about how to make Airflow play nicely with streaming 
>>>> jobs. If anyone else has ideas here please share them.
>>>> 
>>>> -ash
>>>> 
>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrqls21@gmail.com 
>>>> <ma...@gmail.com>> wrote:
>>>>> Hi Ash and James,
>>>>> 
>>>>> This is an exciting move. What do you think about using this 
>>>>> opportunity to extend Airflow's support to streaming-like use 
>>>>> cases? I.e. DAGs/tasks that want to run forever like a service. 
>>>>> For such use cases, schedule interval might not be meaningful, 
>>>>> then do we want to make the date interval param optional to 
>>>>> DagRun and task instances? That sounds like a pretty major change 
>>>>> to the underlying model of Airflow, but this AIP is so far the 
>>>>> best opportunity I've seen that can level up Airflow's support for 
>>>>> streaming/service use cases.
>>>>> 
>>>>> 
>>>>> Cheers,
>>>>> Kevin Y
>>>>> 
>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish 
>>>>> <dpstandish@gmail.com <ma...@gmail.com>> wrote:
>>>>>> Very excited to see this proposal come through and love the 
>>>>>> direction this has gone.
>>>>>> 
>>>>>> Couple comments...
>>>>>> 
>>>>>> *Tree view / Data completeness view*
>>>>>> 
>>>>>> When you design your tasks with the canonical idempotence 
>>>>>> pattern, the tree view shows you both data completeness and task 
>>>>>> execution history (success / failure etc).
>>>>>> 
>>>>>> When you don't use that pattern (which is my general 
>>>>>> preference), tree view is only task execution history.
>>>>>> 
>>>>>> This change has the potential to unlock a data completeness view 
>>>>>> for canonical tasks.  It's possible that the "data completeness 
>>>>>> view" can simply be the tree view.  I.e. somehow it can use 
>>>>>> these new classes to know what data was successfully filled and 
>>>>>> what data wasn't.
>>>>>> 
>>>>>> To the extent we like the idea of either extending / plugging / 
>>>>>> modifying tree view, or adding a distinct data completeness 
>>>>>> view, we might want to anticipate the needs of that in this 
>>>>>> change.  And maybe no alteration to the proposal would be needed 
>>>>>> but just want to throw the idea out there.
>>>>>> 
>>>>>> *Watermark workflow / incremental processing*
>>>>>> 
>>>>>> A common pattern in data warehousing is pulling data 
>>>>>> incrementally from a source.
>>>>>> 
>>>>>> A standard way to achieve this is at the start of the task, 
>>>>>> select max `updated_at` in source table and hold on to that 
>>>>>> value for a minute.  This is your tentative new high watermark.
>>>>>> Now it's time to pull your data.  If your task ran before, grab 
>>>>>> last high watermark.  If not, use initial load value.
>>>>>> If successful, update high watermark.
>>>>>> 
>>>>>> On my team we implemented this with a stateful tasks / stateful 
>>>>>> processes concept (there's a dormant draft AIP here 
>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) 
>>>>>> and a WatermarkOperator that handled the boilerplate*.
>>>>>> 
>>>>>> Again here, I don't have a specific suggestion at this moment.  
>>>>>> But I wanted to articulate this workflow because it is common 
>>>>>> and it wasn't immediately obvious to me in reading AIP-39 how I 
>>>>>> would use it to implement it.
>>>>>> 
>>>>>> AIP-39 makes Airflow more data-aware.  So if it can support this 
>>>>>> kind of workflow that's great.  @Ash Berlin-Taylor 
>>>>>> <ma...@astronomer.io> do you have thoughts on how it might 
>>>>>> be compatible with this kind of thing as is?
>>>>>> 
>>>>>> ---
>>>>>> 
>>>>>> * The base operator is designed so that subclasses only need to 
>>>>>> implement two methods:
>>>>>>     - `get_high_watermark`: produce the tentative new high 
>>>>>> watermark
>>>>>>     - `watermark_execute`: analogous to implementing poke in a 
>>>>>> sensor, this is where your work is done. `execute` is left to 
>>>>>> the base class, and it orchestrates (1) getting last high 
>>>>>> watermark or initial load value and (2) updating new high 
>>>>>> watermark if job successful.
>>>>>> 


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Kaxil Naik <ka...@gmail.com>.
Hi Ash and all,


> What do people think of this? Worth it? Too complex to reason about what
> context variables might exist as a result?


I think I wouldn't worry about it right now, or maybe not as part of this
AIP. Currently, in one of the GitHub issues, a user mentioned that it is not
straightforward to know what is inside the context dictionary:
https://github.com/apache/airflow/issues/14396. So maybe we can tackle this
issue separately once the AbstractTimetable is built.

Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs
> "DataTimetable") have data_interval_start and end available in the context?


Hmm, I would say no, but then it contradicts my suggestion to remove the
context dict from this AIP. If we are going to use it in the scheduler then
yes, where data_interval_start = data_interval_end from CronTimetable.
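
Roughly, a sketch of what CronTimetable could return in that case, 
assuming the DagRunInfo fields from the draft (next_fire_time being a 
hypothetical name for the cron's next trigger):

# A point-in-time run gets a zero-width data interval.
return DagRunInfo(
    data_interval_start=next_fire_time,
    data_interval_end=next_fire_time,
)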

> Does anyone have any better names than TimeDeltaTimetable, DataTimetable,
> and CronTimetable? (We can probably change these names right up until
> release, so not important to get this correct *now*.)


No strong opinion here. Just an alternate suggestion could
be TimeDeltaSchedule, DataSchedule, and CronSchedule.


> Should I try to roll AIP-30
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> in
> to this, or should we make that a future addition? (My vote is for future
> addition)


I would vote for future addition too.

Regards,
Kaxil

On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <as...@apache.org> wrote:

> I think, yes, AIP-35 or something like it would happily co-exist with this
> proposal.
>
> @Daniel <dp...@gmail.com> and I have been discussing this a bit on
> Slack, and one of the questions he asked was if the concept of
> data_interval should be moved from DagRun as James and I suggested down on
> to the individual task:
>
> suppose i have a new dag hitting 5 api endpoints and pulling data to s3.
> suppose that yesterday 4 of them succeeded but one failed. today, 4 of them
> should pull from yesterday. but the one that failed should pull from 2 days
> back. so even though these normally have the same interval, today they
> should not.
>
>
> My view on this is twofold: first, this should primarily be handled by
> retries on the task, and second, having different TaskInstances in the
> same DagRun have different data intervals would be much harder to reason
> about/design the UI around, so for those reasons I still think the
> interval should be a DagRun-level concept.
>
> (He has a stalled AIP-30 where he proposed something to address this kind
> of "watermark" case, which we might pick up next after this AIP is complete)
>
> One thing we might want to do is extend the interface of AbstractTimetable
> to be able to add/update parameters in the context dict, so the interface
> could become this:
>
> class AbstractTimetable(ABC):
>     @abstractmethod
>     def next_dagrun_info(
>         self,
>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>         session: Session,
>     ) -> Optional[DagRunInfo]:
>         """
>         Get information about the next DagRun of this dag after
>         ``date_last_automated_dagrun`` -- the execution date, and the
>         earliest it could be scheduled.
>
>         :param date_last_automated_dagrun: The max(execution_date) of
>             existing "automated" DagRuns for this dag (scheduled or
>             backfill, but not manual)
>         """
>
>     @abstractmethod
>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
>         """
>         Update or set new context variables to become available in task
>         templates and operators.
>         """
>
>
> What do people think of this? Worth it? Too complex to reason about what
> context variables might exist as a result?
>
> *Outstanding questions*:
>
>    - Should "interval-less DAGs" (ones using "CronTimetable" in my
>    proposal vs "DataTimetable") have data_interval_start and end available in
>    the context?
>    - Does anyone have any better names than TimeDeltaTimetable,
>    DataTimetable, and CronTimetable? (We can probably change these names right
>    up until release, so not important to get this correct *now*.)
>    - Should I try to roll AIP-30
>    <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>    into this, or should we make that a future addition? (My vote is for
>    future addition)
>
>
> I'd like to start voting on this AIP next week (probably on Tuesday) as I
> think this will be a powerful feature that eases confusion for new users.
>
> -Ash
>
>
> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <al...@yandex.com> wrote:
>
> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To
> Airflow"?
> I think streaming was also discussed there (though it wasn't really the
> use case).
>
>
> 02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:
>
> Hi Kevin,
>
> Interesting idea. My original idea was actually that "interval-less DAGs"
> (i.e. ones where it's just "run at this time") would not have
> data_interval_start or end, but (while drafting the AIP) we decided that it
> was probably "easier" if those values were always datetimes.
>
> That said, I think having the DB model have those values be nullable would
> future proof it without needing another migration to change it. Do you
> think this is worth doing now?
>
> I haven't (yet! It's on my list) spent any significant time thinking about
> how to make Airflow play nicely with streaming jobs. If anyone else has
> ideas here please share them.
>
> -ash
>
> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yr...@gmail.com> wrote:
>
> Hi Ash and James,
>
> This is an exciting move. What do you think about using this opportunity
> to extend Airflow's support to streaming-like use cases? I.e. DAGs/tasks
> that want to run forever like a service. For such use cases, schedule
> interval might not be meaningful, then do we want to make the date interval
> param optional to DagRun and task instances? That sounds like a pretty
> major change to the underlying model of Airflow, but this AIP is so far the
> best opportunity I've seen that can level up Airflow's support for
> streaming/service use cases.
>
>
> Cheers,
> Kevin Y
>
> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dp...@gmail.com>
> wrote:
>
> Very excited to see this proposal come through and love the direction this
> has gone.
>
> Couple comments...
>
> *Tree view / Data completeness view*
>
> When you design your tasks with the canonical idempotence pattern, the
> tree view shows you both data completeness and task execution history
> (success / failure etc).
>
> When you don't use that pattern (which is my general preference), tree
> view is only task execution history.
>
> This change has the potential to unlock a data completeness view for
> canonical tasks.  It's possible that the "data completeness view" can
> simply be the tree view.  I.e. somehow it can use these new classes to know
> what data was successfully filled and what data wasn't.
>
> To the extent we like the idea of either extending / plugging / modifying
> tree view, or adding a distinct data completeness view, we might want to
> anticipate the needs of that in this change.  And maybe no alteration to
> the proposal would be needed but just want to throw the idea out there.
>
> *Watermark workflow / incremental processing*
>
> A common pattern in data warehousing is pulling data incrementally from a
> source.
>
> A standard way to achieve this is at the start of the task, select max
> `updated_at` in source table and hold on to that value for a minute.  This
> is your tentative new high watermark.
> Now it's time to pull your data.  If your task ran before, grab last high
> watermark.  If not, use initial load value.
> If successful, update high watermark.
>
> On my team we implemented this with a stateful tasks / stateful processes
> concept (there's a dormant draft AIP here
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
> and a WatermarkOperator that handled the boilerplate*.
>
> Again here, I don't have a specific suggestion at this moment.  But I
> wanted to articulate this workflow because it is common and it wasn't
> immediately obvious to me in reading AIP-39 how I would use it to implement
> it.
>
> AIP-39 makes Airflow more data-aware.  So if it can support this kind of
> workflow that's great.  @Ash Berlin-Taylor <as...@astronomer.io> do you
> have thoughts on how it might be compatible with this kind of thing as is?
>
> ---
>
> * The base operator is designed so that subclasses only need to implement
> two methods:
>     - `get_high_watermark`: produce the tentative new high watermark
>     - `watermark_execute`: analogous to implementing poke in a sensor,
> this is where your work is done. `execute` is left to the base class, and
> it orchestrates (1) getting last high watermark or initial load value and
> (2) updating new high watermark if job successful.
>
>

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Ash Berlin-Taylor <as...@apache.org>.
I think, yes, AIP-35 or something like it would happily co-exist with 
this proposal.

@Daniel <ma...@gmail.com> and I have been discussing this a 
bit on Slack, and one of the questions he asked was if the concept of 
data_interval should be moved from DagRun as James and I suggested down 
on to the individual task:

> suppose i have a new dag hitting 5 api endpoints and pulling data to 
> s3. suppose that yesterday 4 of them succeeded but one failed. today, 
> 4 of them should pull from yesterday. but the one that failed should 
> pull from 2 days back. so even though these normally have the same 
> interval, today they should not.

My view on this is twofold: first, this should primarily be handled by 
retries on the task, and second, having different TaskInstances in the 
same DagRun have different data intervals would be much harder to 
reason about/design the UI around, so for those reasons I still think 
the interval should be a DagRun-level concept.

(He has a stalled AIP-30 where he proposed something to address this 
kind of "watermark" case, which we might pick up next after this AIP is 
complete)

One thing we might want to do is extend the interface of 
AbstractTimetable to be able to add/update parameters in the context 
dict, so the interface could become this:

class AbstractTimetable(ABC):
    @abstractmethod
    def next_dagrun_info(
        self,
        date_last_automated_dagrun: Optional[pendulum.DateTime],
        session: Session,
    ) -> Optional[DagRunInfo]:
        """
        Get information about the next DagRun of this dag after
        ``date_last_automated_dagrun`` -- the execution date, and the
        earliest it could be scheduled.

        :param date_last_automated_dagrun: The max(execution_date) of
            existing "automated" DagRuns for this dag (scheduled or
            backfill, but not manual)
        """

    @abstractmethod
    def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
        """
        Update or set new context variables to become available in task
        templates and operators.
        """


What do people think of this? Worth it? Too complex to reason about 
what context variables might exist as a result?

*Outstanding questions*:
   - Should "interval-less DAGs" (ones using "CronTimetable" in my 
     proposal vs "DataTimetable") have data_interval_start and end 
     available in the context?
   - Does anyone have any better names than TimeDeltaTimetable, 
     DataTimetable, and CronTimetable? (We can probably change these 
     names right up until release, so not important to get this correct 
     /now/.)
   - Should I try to roll AIP-30 
     <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> 
     into this, or should we make that a future addition? (My vote is 
     for future addition)

I'd like to start voting on this AIP next week (probably on Tuesday) as 
I think this will be a powerful feature that eases confusion for new 
users.

-Ash


On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <al...@yandex.com> wrote:
> Is this AIP going to co-exist with AIP-35 "Add Signal Based 
> Scheduling To Airflow"?
> I think streaming was also discussed there (though it wasn't really 
> the use case).
> 
> 
> 02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:
>> Hi Kevin,
>> 
>> Interesting idea. My original idea was actually that "interval-less 
>> DAGs" (i.e. ones where it's just "run at this time") would not have 
>> data_interval_start or end, but (while drafting the AIP) we decided 
>> that it was probably "easier" if those values were always datetimes.
>> 
>> That said, I think having the DB model have those values be nullable 
>> would future proof it without needing another migration to change 
>> it. Do you think this is worth doing now?
>> 
>> I haven't (yet! It's on my list) spent any significant time thinking 
>> about how to make Airflow play nicely with streaming jobs. If anyone 
>> else has ideas here please share them.
>> 
>> -ash
>> 
>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrqls21@gmail.com 
>> <ma...@gmail.com>> wrote:
>>> Hi Ash and James,
>>> 
>>> This is an exciting move. What do you think about using this 
>>> opportunity to extend Airflow's support to streaming-like use 
>>> cases? I.e. DAGs/tasks that want to run forever like a service. For 
>>> such use cases, schedule interval might not be meaningful, then do 
>>> we want to make the date interval param optional to DagRun and task 
>>> instances? That sounds like a pretty major change to the underlying 
>>> model of Airflow, but this AIP is so far the best opportunity I've seen 
>>> that can level up Airflow's support for streaming/service use cases.
>>> 
>>> 
>>> Cheers,
>>> Kevin Y
>>> 
>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish 
>>> <dpstandish@gmail.com <ma...@gmail.com>> wrote:
>>>> Very excited to see this proposal come through and love the 
>>>> direction this has gone.
>>>> 
>>>> Couple comments...
>>>> 
>>>> *Tree view / Data completeness view*
>>>> 
>>>> When you design your tasks with the canonical idempotence pattern, 
>>>> the tree view shows you both data completeness and task execution 
>>>> history (success / failure etc).
>>>> 
>>>> When you don't use that pattern (which is my general preference), 
>>>> tree view is only task execution history.
>>>> 
>>>> This change has the potential to unlock a data completeness view 
>>>> for canonical tasks.  It's possible that the "data completeness 
>>>> view" can simply be the tree view.  I.e. somehow it can use these 
>>>> new classes to know what data was successfully filled and what 
>>>> data wasn't.
>>>> 
>>>> To the extent we like the idea of either extending / plugging / 
>>>> modifying tree view, or adding a distinct data completeness view, 
>>>> we might want to anticipate the needs of that in this change.  And 
>>>> maybe no alteration to the proposal would be needed but just want 
>>>> to throw the idea out there.
>>>> 
>>>> *Watermark workflow / incremental processing*
>>>> 
>>>> A common pattern in data warehousing is pulling data incrementally 
>>>> from a source.
>>>> 
>>>> A standard way to achieve this is at the start of the task, select 
>>>> max `updated_at` in source table and hold on to that value for a 
>>>> minute.  This is your tentative new high watermark.
>>>> Now it's time to pull your data.  If your task ran before, grab 
>>>> last high watermark.  If not, use initial load value.
>>>> If successful, update high watermark.
>>>> 
>>>> On my team we implemented this with a stateful tasks / stateful 
>>>> processes concept (there's a dormant draft AIP here 
>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) 
>>>> and a WatermarkOperator that handled the boilerplate*.
>>>> 
>>>> Again here, I don't have a specific suggestion at this moment.  
>>>> But I wanted to articulate this workflow because it is common and 
>>>> it wasn't immediately obvious to me in reading AIP-39 how I would 
>>>> use it to implement it.
>>>> 
>>>> AIP-39 makes Airflow more data-aware.  So if it can support this 
>>>> kind of workflow that's great.  @Ash Berlin-Taylor 
>>>> <ma...@astronomer.io> do you have thoughts on how it might be 
>>>> compatible with this kind of thing as is?
>>>> 
>>>> ---
>>>> 
>>>> * The base operator is designed so that subclasses only need to 
>>>> implement two methods:
>>>>     - `get_high_watermark`: produce the tentative new high 
>>>> watermark
>>>>     - `watermark_execute`: analogous to implementing poke in a 
>>>> sensor, this is where your work is done. `execute` is left to the 
>>>> base class, and it orchestrates (1) getting last high watermark or 
>>>> initial load value and (2) updating new high watermark if job 
>>>> successful.
>>>> 


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Alex Inhert <al...@yandex.com>.
Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To
Airflow"?  
I think streaming was also discussed there (though it wasn't really the use
case).

02.03.2021, 22:10, "Ash Berlin-Taylor" <as...@apache.org>:

> Hi Kevin,
> 
> Interesting idea. My original idea was actually that "interval-less DAGs" 
> (i.e. ones where it's just "run at this time") would not have 
> data_interval_start or end, but (while drafting the AIP) we decided that 
> it was probably "easier" if those values were always datetimes.
> 
> That said, I think having the DB model have those values be nullable 
> would future proof it without needing another migration to change it. Do 
> you think this is worth doing now?
> 
> I haven't (yet! It's on my list) spent any significant time thinking 
> about how to make Airflow play nicely with streaming jobs. If anyone else 
> has ideas here please share them.
> 
> -ash
> 
> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrqls21@gmail.com> wrote:
> 
>> Hi Ash and James,
>> 
>> This is an exciting move. What do you think about using this opportunity 
>> to extend Airflow's support to streaming-like use cases? I.e. DAGs/tasks 
>> that want to run forever like a service. For such use cases, schedule 
>> interval might not be meaningful, then do we want to make the date 
>> interval param optional to DagRun and task instances? That sounds like a 
>> pretty major change to the underlying model of Airflow, but this AIP is 
>> so far the best opportunity I've seen that can level up Airflow's 
>> support for streaming/service use cases.
>> 
>> Cheers,
>> Kevin Y
>> 
>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dpstandish@gmail.com> wrote:
>> 
>>> Very excited to see this proposal come through and love the direction 
>>> this has gone.
>>> 
>>> Couple comments...
>>> 
>>> *Tree view / Data completeness view*
>>> 
>>> When you design your tasks with the canonical idempotence pattern, the 
>>> tree view shows you both data completeness and task execution history 
>>> (success / failure etc).
>>> 
>>> When you don't use that pattern (which is my general preference), tree 
>>> view is only task execution history.
>>> 
>>> This change has the potential to unlock a data completeness view for 
>>> canonical tasks. It's possible that the "data completeness view" can 
>>> simply be the tree view. I.e. somehow it can use these new classes to 
>>> know what data was successfully filled and what data wasn't.
>>> 
>>> To the extent we like the idea of either extending / plugging / 
>>> modifying tree view, or adding a distinct data completeness view, we 
>>> might want to anticipate the needs of that in this change. And maybe no 
>>> alteration to the proposal would be needed but just want to throw the 
>>> idea out there.
>>> 
>>> *Watermark workflow / incremental processing*
>>> 
>>> A common pattern in data warehousing is pulling data incrementally from 
>>> a source.
>>> 
>>> A standard way to achieve this is at the start of the task, select max 
>>> `updated_at` in source table and hold on to that value for a minute. 
>>> This is your tentative new high watermark.
>>> Now it's time to pull your data. If your task ran before, grab last 
>>> high watermark. If not, use initial load value.
>>> If successful, update high watermark.
>>> 
>>> On my team we implemented this with a stateful tasks / stateful 
>>> processes concept (there's a dormant draft AIP here 
>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) 
>>> and a WatermarkOperator that handled the boilerplate*.
>>> 
>>> Again here, I don't have a specific suggestion at this moment. But I 
>>> wanted to articulate this workflow because it is common and it wasn't 
>>> immediately obvious to me in reading AIP-39 how I would use it to 
>>> implement it.
>>> 
>>> AIP-39 makes Airflow more data-aware. So if it can support this kind of 
>>> workflow that's great. @Ash Berlin-Taylor <ash@astronomer.io> do you 
>>> have thoughts on how it might be compatible with this kind of thing as 
>>> is?
>>> 
>>> ---
>>> 
>>> * The base operator is designed so that subclasses only need to 
>>> implement two methods:
>>>     - `get_high_watermark`: produce the tentative new high watermark
>>>     - `watermark_execute`: analogous to implementing poke in a sensor, 
>>> this is where your work is done. `execute` is left to the base class, 
>>> and it orchestrates (1) getting last high watermark or initial load 
>>> value and (2) updating new high watermark if job successful.

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Ash Berlin-Taylor <as...@apache.org>.
Hi Kevin,

Interesting idea. My original idea was actually that "interval-less 
DAGs" (i.e. ones where it's just "run at this time") would not have 
data_interval_start or end, but (while drafting the AIP) we decided 
that it was probably "easier" if those values were always datetimes.

That said, I think having the DB model have those values be nullable 
would future proof it without needing another migration to change it. 
Do you think this is worth doing now?
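
On the DagRun model that would look something like the following -- a 
sketch; UtcDateTime is the column type Airflow uses elsewhere for 
datetimes, and the column names are taken from the AIP:

data_interval_start = Column(UtcDateTime, nullable=True)
data_interval_end = Column(UtcDateTime, nullable=True)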

I haven't (yet! It's on my list) spent any significant time thinking 
about how to make Airflow play nicely with streaming jobs. If anyone 
else has ideas here please share them.

-ash

On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yr...@gmail.com> wrote:
> Hi Ash and James,
> 
> This is an exciting move. What do you think about using this 
> opportunity to extend Airflow's support to streaming-like use cases? 
> I.e. DAGs/tasks that want to run forever like a service. For such use 
> cases, schedule interval might not be meaningful, then do we want to 
> make the date interval param optional to DagRun and task instances? 
> That sounds like a pretty major change to the underlying model of 
> Airflow, but this AIP is so far the best opportunity I've seen that can 
> level up Airflow's support for streaming/service use cases.
> 
> 
> Cheers,
> Kevin Y
> 
> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dpstandish@gmail.com 
> <ma...@gmail.com>> wrote:
>> Very excited to see this proposal come through and love the 
>> direction this has gone.
>> 
>> Couple comments...
>> 
>> *Tree view / Data completeness view*
>> 
>> When you design your tasks with the canonical idempotence pattern, 
>> the tree view shows you both data completeness and task execution 
>> history (success / failure etc).
>> 
>> When you don't use that pattern (which is my general preference), 
>> tree view is only task execution history.
>> 
>> This change has the potential to unlock a data completeness view for 
>> canonical tasks.  It's possible that the "data completeness view" 
>> can simply be the tree view.  I.e. somehow it can use these new 
>> classes to know what data was successfully filled and what data 
>> wasn't.
>> 
>> To the extent we like the idea of either extending / plugging / 
>> modifying tree view, or adding a distinct data completeness view, we 
>> might want to anticipate the needs of that in this change.  And 
>> maybe no alteration to the proposal would be needed but just want to 
>> throw the idea out there.
>> 
>> *Watermark workflow / incremental processing*
>> 
>> A common pattern in data warehousing is pulling data incrementally 
>> from a source.
>> 
>> A standard way to achieve this is at the start of the task, select 
>> max `updated_at` in source table and hold on to that value for a 
>> minute.  This is your tentative new high watermark.
>> Now it's time to pull your data.  If your task ran before, grab last 
>> high watermark.  If not, use initial load value.
>> If successful, update high watermark.
>> 
>> On my team we implemented this with a stateful tasks / stateful 
>> processes concept (there's a dormant draft AIP here 
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) 
>> and a WatermarkOperator that handled the boilerplate*.
>> 
>> Again here, I don't have a specific suggestion at this moment.  But 
>> I wanted to articulate this workflow because it is common and it 
>> wasn't immediately obvious to me in reading AIP-39 how I would use 
>> it to implement it.
>> 
>> AIP-39 makes Airflow more data-aware.  So if it can support this 
>> kind of workflow that's great.  @Ash Berlin-Taylor 
>> <ma...@astronomer.io> do you have thoughts on how it might be 
>> compatible with this kind of thing as is?
>> 
>> ---
>> 
>> * The base operator is designed so that subclasses only need to 
>> implement two methods:
>>     - `get_high_watermark`: produce the tentative new high watermark
>>     - `watermark_execute`: analogous to implementing poke in a 
>> sensor, this is where your work is done. `execute` is left to the 
>> base class, and it orchestrates (1) getting last high watermark or 
>> initial load value and (2) updating new high watermark if job 
>> successful.
>> 


Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Kevin Yang <yr...@gmail.com>.
Hi Ash and James,

This is an exciting move. What do you think about using this opportunity to
extend Airflow's support to streaming-like use cases? I.e. DAGs/tasks that
want to run forever like a service. For such use cases, schedule interval
might not be meaningful, then do we want to make the date interval param
optional to DagRun and task instances? That sounds like a pretty major
change to the underlying model of Airflow, but this AIP is so far the best
opportunity I've seen that can level up Airflow's support for streaming/service
use cases.


Cheers,
Kevin Y

On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dp...@gmail.com>
wrote:

> Very excited to see this proposal come through and love the direction this
> has gone.
>
> Couple comments...
>
> *Tree view / Data completeness view*
>
> When you design your tasks with the canonical idempotence pattern, the
> tree view shows you both data completeness and task execution history
> (success / failure etc).
>
> When you don't use that pattern (which is my general preference), tree
> view is only task execution history.
>
> This change has the potential to unlock a data completeness view for
> canonical tasks.  It's possible that the "data completeness view" can
> simply be the tree view.  I.e. somehow it can use these new classes to know
> what data was successfully filled and what data wasn't.
>
> To the extent we like the idea of either extending / plugging / modifying
> tree view, or adding a distinct data completeness view, we might want to
> anticipate the needs of that in this change.  And maybe no alteration to
> the proposal would be needed but just want to throw the idea out there.
>
> *Watermark workflow / incremental processing*
>
> A common pattern in data warehousing is pulling data incrementally from a
> source.
>
> A standard way to achieve this is at the start of the task, select max
> `updated_at` in source table and hold on to that value for a minute.  This
> is your tentative new high watermark.
> Now it's time to pull your data.  If your task ran before, grab last high
> watermark.  If not, use initial load value.
> If successful, update high watermark.
>
> On my team we implemented this with a stateful tasks / stateful processes
> concept (there's a dormant draft AIP here
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
> and a WatermarkOperator that handled the boilerplate*.
>
> Again here, I don't have a specific suggestion at this moment.  But I
> wanted to articulate this workflow because it is common and it wasn't
> immediately obvious to me in reading AIP-39 how I would use it to implement
> it.
>
> AIP-39 makes Airflow more data-aware.  So if it can support this kind of
> workflow that's great.  @Ash Berlin-Taylor <as...@astronomer.io> do you
> have thoughts on how it might be compatible with this kind of thing as is?
>
> ---
>
> * The base operator is designed so that subclasses only need to implement
> two methods:
>     - `get_high_watermark`: produce the tentative new high watermark
>     - `watermark_execute`: analogous to implementing poke in a sensor,
> this is where your work is done. `execute` is left to the base class, and
> it orchestrates (1) getting last high watermark or initial load value and
> (2) updating new high watermark if job successful.
>
>

Re: [DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs

Posted by Daniel Standish <dp...@gmail.com>.
Very excited to see this proposal come through and love the direction this
has gone.

Couple comments...

*Tree view / Data completeness view*

When you design your tasks with the canonical idempotence pattern, the tree
view shows you both data completeness and task execution history (success /
failure etc).

When you don't use that pattern (which is my general preference), tree view
is only task execution history.

This change has the potential to unlock a data completeness view for
canonical tasks.  It's possible that the "data completeness view" can
simply be the tree view.  I.e. somehow it can use these new classes to know
what data was successfully filled and what data wasn't.

To the extent we like the idea of either extending / plugging / modifying
tree view, or adding a distinct data completeness view, we might want to
anticipate the needs of that in this change.  And maybe no alteration to
the proposal would be needed, but I just want to throw the idea out there.

*Watermark workflow / incremental processing*

A common pattern in data warehousing is pulling data incrementally from a
source.

A standard way to achieve this: at the start of the task, select max
`updated_at` in the source table and hold on to that value for a minute.  This
is your tentative new high watermark.
Now it's time to pull your data.  If your task ran before, grab last high
watermark.  If not, use initial load value.
If successful, update high watermark.

On my team we implemented this with a stateful tasks / stateful processes
concept (there's a dormant draft AIP here
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
and a WatermarkOperator that handled the boilerplate*.

Again here, I don't have a specific suggestion at this moment.  But I
wanted to articulate this workflow because it is common and it wasn't
immediately obvious to me in reading AIP-39 how I would use it to implement
it.

AIP-39 makes Airflow more data-aware.  So if it can support this kind of
workflow that's great.  @Ash Berlin-Taylor <as...@astronomer.io> do you have
thoughts on how it might be compatible with this kind of thing as is?

---

* The base operator is designed so that subclasses only need to implement
two methods:
    - `get_high_watermark`: produce the tentative new high watermark
    - `watermark_execute`: analogous to implementing poke in a sensor, this
is where your work is done. `execute` is left to the base class, and it
orchestrates (1) getting last high watermark or initial load value and (2)
updating new high watermark if job successful.
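
A minimal sketch of that base class, to make the shape concrete (this is
not our actual implementation -- the Variable-based state below is a
stand-in for the AIP-30 state persistence concept):

from abc import abstractmethod
from typing import Any

from airflow.models import BaseOperator, Variable


class WatermarkOperator(BaseOperator):
    def __init__(self, *, initial_load_value: Any, **kwargs) -> None:
        super().__init__(**kwargs)
        self.initial_load_value = initial_load_value

    @abstractmethod
    def get_high_watermark(self, context) -> Any:
        """Produce the tentative new high watermark, e.g. max(updated_at)."""

    @abstractmethod
    def watermark_execute(self, context, low: Any, high: Any) -> None:
        """Do the actual pull, e.g. rows where low < updated_at <= high."""

    def execute(self, context) -> None:
        # Last committed watermark, or the initial load value on first run.
        key = f"watermark__{self.dag_id}__{self.task_id}"
        low = Variable.get(key, default_var=self.initial_load_value)
        high = self.get_high_watermark(context)
        self.watermark_execute(context, low, high)
        # Only committed after the work above has succeeded.
        Variable.set(key, high)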