Posted to dev@airflow.apache.org by Andrey Anshin <an...@taragol.is> on 2023/01/28 13:04:50 UTC

[Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Greetings!

While migrating our ORM syntax to be compatible with SQLAlchemy 2.0, I
probably found some skeletons in the closet.

Let's start from the beginning. Initially I got this warning:

airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY columns
added implicitly due to DISTINCT is deprecated and will be removed in
SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to
explicitly include the appropriate columns in the columns clause
(Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')

"OK let's fix it!", I thought at first and started to investigate
the RenderedTaskInstanceFields model.

*Skeleton #1:*

When I first looked at the code and comments, it got me thinking that the
part which keeps only the latest N Rendered Task Fields could potentially
lead to various kinds of performance degradation (locks, deadlocks, data
bloat); see the code:
https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245

Also, this historical part (from Airflow 1.10.10) generates this SQL
statement (pg backend):

DELETE FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = %(dag_id_1)s
  AND rendered_task_instance_fields.task_id = %(task_id_1)s
  AND (
    (
      rendered_task_instance_fields.dag_id,
      rendered_task_instance_fields.task_id,
      rendered_task_instance_fields.run_id
    ) NOT IN (
      SELECT
        anon_1.dag_id,
        anon_1.task_id,
        anon_1.run_id
      FROM
        (
          SELECT DISTINCT
            rendered_task_instance_fields.dag_id AS dag_id,
            rendered_task_instance_fields.task_id AS task_id,
            rendered_task_instance_fields.run_id AS run_id,
            dag_run.execution_date AS execution_date
          FROM rendered_task_instance_fields
            JOIN dag_run ON rendered_task_instance_fields.dag_id = dag_run.dag_id
            AND rendered_task_instance_fields.run_id = dag_run.run_id
          WHERE
            rendered_task_instance_fields.dag_id = %(dag_id_2)s
            AND rendered_task_instance_fields.task_id = %(task_id_2)s
          ORDER BY
            dag_run.execution_date DESC
          LIMIT %(param_1)s
        ) AS anon_1
    )
  )

This is especially inefficient in PostgreSQL. An IN subquery can easily be
transformed internally into a SEMI JOIN (aka an EXISTS clause), but that
does not work for a NOT IN subquery, because it is not transformed into an
ANTI JOIN (aka a NOT EXISTS clause) even when that is possible; see:
https://commitfest.postgresql.org/27/2023/
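
For illustration, the NOT IN filter above can be written as a NOT EXISTS
anti-join. What follows is only a sketch: it uses Python's built-in sqlite3
with simplified, hypothetical tables (rtif, dag_run) instead of Airflow's real
models or PostgreSQL, and it only shows that both forms keep the same rows. It
says nothing about how either form performs on a real backend.

```python
import sqlite3

# Simplified, hypothetical schema (not Airflow's real table definitions).
SCHEMA = """
CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, execution_date TEXT);
CREATE TABLE rtif (dag_id TEXT, task_id TEXT, run_id TEXT, rendered TEXT);
"""

# Shared subquery: the run_ids of the newest N runs of one task.
KEEP = """
SELECT DISTINCT r.run_id AS run_id, d.execution_date AS execution_date
FROM rtif AS r
JOIN dag_run AS d ON r.dag_id = d.dag_id AND r.run_id = d.run_id
WHERE r.dag_id = :dag_id AND r.task_id = :task_id
ORDER BY d.execution_date DESC
LIMIT :keep
"""

# Variant 1: the shape of the current query (NOT IN).
DELETE_NOT_IN = f"""
DELETE FROM rtif
WHERE dag_id = :dag_id AND task_id = :task_id
  AND run_id NOT IN (SELECT kept.run_id FROM ({KEEP}) AS kept)
"""

# Variant 2: the same filter expressed as an anti-join (NOT EXISTS).
DELETE_NOT_EXISTS = f"""
DELETE FROM rtif
WHERE dag_id = :dag_id AND task_id = :task_id
  AND NOT EXISTS (
    SELECT 1 FROM ({KEEP}) AS kept WHERE kept.run_id = rtif.run_id
  )
"""


def remaining_run_ids(delete_sql, keep=2):
    """Load five runs of one task, run the cleanup, return surviving run_ids."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    for day in range(1, 6):
        run_id = f"run_{day}"
        conn.execute(
            "INSERT INTO dag_run VALUES ('d', ?, ?)", (run_id, f"2023-01-0{day}")
        )
        conn.execute("INSERT INTO rtif VALUES ('d', 't', ?, '{}')", (run_id,))
    conn.execute(delete_sql, {"dag_id": "d", "task_id": "t", "keep": keep})
    rows = conn.execute("SELECT run_id FROM rtif ORDER BY run_id").fetchall()
    conn.close()
    return [row[0] for row in rows]
```

Calling remaining_run_ids(DELETE_NOT_IN) and
remaining_run_ids(DELETE_NOT_EXISTS) should leave the same two newest runs.
Whether the planner treats them differently is a separate question that has to
be checked with EXPLAIN on the actual database.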

I haven't done any performance benchmarks yet, but I guess that if users set
AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than the
default 30, it could improve performance and reduce the number of deadlocks.
The table size will increase, but I think we don't run any maintenance jobs
for other tables either.

Potentially it is a good idea to deprecate this option and recommend that
users set it to 0? WDYT? Maybe someone has already tried or investigated
this?


*Skeleton #2:*

We have a k8s_pod_yaml field which is exclusively used by K8S executors.

Should we also decouple this field as part of AIP-51?

----
Best Wishes
*Andrey Anshin*

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Kaxil Naik <ka...@gmail.com>.
>I think we should not deprecate it though, but find a more efficient
way of deleting the old keys. I think we could slightly denormalize
RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
to the RenderedTaskInstance table and that will be enough to optimise
it.

yeah I agree with that
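
The denormalised cleanup discussed in this thread could be sketched roughly as
follows. This is an illustration only: it runs on Python's built-in sqlite3,
the table rtif, the column dag_run_execution_date and the index name are
hypothetical, and it has not been benchmarked against any real backend. One
deliberate deviation from the quoted pseudo-SQL: with OFFSET equal to the
limit, the delete uses <= rather than <, so that exactly the newest N runs are
kept.

```python
import sqlite3


def make_db():
    """Create a hypothetical, simplified rendered-fields table with a cutoff index."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE rtif (
            dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
            dag_run_execution_date TEXT,  -- denormalised copy from dag_run
            rendered TEXT
        );
        CREATE INDEX idx_rtif_cutoff
            ON rtif (dag_id, task_id, dag_run_execution_date);
        """
    )
    return conn


def cleanup(conn, dag_id, task_id, keep):
    """Delete everything older than the newest `keep` runs; return rows deleted."""
    # Step 1: the newest execution date that falls outside the kept window.
    row = conn.execute(
        """
        SELECT DISTINCT dag_run_execution_date FROM rtif
        WHERE dag_id = ? AND task_id = ?
        ORDER BY dag_run_execution_date DESC
        LIMIT 1 OFFSET ?
        """,
        (dag_id, task_id, keep),
    ).fetchone()
    if row is None:
        return 0  # not more than `keep` runs stored, nothing to delete
    # Step 2: a plain range delete, no NOT IN subquery involved.
    cursor = conn.execute(
        """
        DELETE FROM rtif
        WHERE dag_id = ? AND task_id = ? AND dag_run_execution_date <= ?
        """,
        (dag_id, task_id, row[0]),
    )
    return cursor.rowcount
```

The sketch stores dates as ISO-8601 strings so that text ordering matches
chronological ordering in SQLite; a real schema would use a timestamp column.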

On Mon, 30 Jan 2023 at 19:51, Jarek Potiuk <ja...@potiuk.com> wrote:

> I think there is a good reason to clean those up automatically.
> rendered task instance fields are almost arbitrary in size. If we try
> to keep all historical values there by default, there are numerous
> cases it will grow very fast - far, far too quickly.
>
> And I am not worried at all about locks on this table if we do it the
> way I described it and it uses the indexes. The contention this way
> might only be between the two deleting tasks. and with the query I
> proposed, they will only last for a short time - the index will be
> locked when two DELETES  or SELECT DISTINCT - which should both be
> fast.
>
>
> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <an...@taragol.is>
> wrote:
> >
> > I guess two things have reduced the performance of this query over
> > time: Dynamic Task Mapping and the use of run_id instead of execution date.
> >
> > I still personally think that changing the default value from 30 to 0
> might improve performance of multiple concurrent tasks, just because this
> query does not run and there are no locks on multiple records/pages.
> >
> > I do not have any proof (yet?) other than simple DAGs. I think there
> > is some crossover point beyond which letting this table grow is worse
> > than cleaning it up on each TI run. But users are able to clean up the
> > table by executing airflow db clean, which should improve performance again.
> >
> > There is also an interesting behaviour with this query: if a user already
> > has more records than the value specified by
> > AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tries to run a backfill,
> > then the rendered templates are not written to the table (or maybe they are
> > inserted and then immediately deleted); the same is valid for cleanup of old tasks.
> >
> > ----
> > Best Wishes
> > Andrey Anshin
> >
> >
> >
> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com> wrote:
> >>
> >> Yep. Agree this is not an efficient query and dynamic task mapping
> >> makes the effect much worse. Generally speaking, selecting "what
> >> should be left" and then deleting stuff where the key is "not in" is
> >> never an efficient way of running an sql query.  And the query not
> >> using index at all makes it rather terrible.
> >>
> >> I think we should not deprecate it though, but find a more efficient
> >> way of deleting the old keys. I think we could slightly denormalize
> >> RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
> >> to the RenderedTaskInstance table and that will be enough to optimise
> >> it.
> >>
> >> Then we could have either:
> >>
> >> * a composite B-TREE indexed (non-unique) index on DAG_ID, TASK_ID,
> >> RUN_ID_EXECUTION_DATE
> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
> >>
> >> Probably the latter is better as I am not sure how < , > comparison
> >> looks like for composite B-TREE indexes when char + date columns are
> >> mixed. Also we could have hit the infamous MySQL index key length
> >> limit.
> >>
> >> Then deletion process would look roughly like:
> >>
> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
> >> TASK_ID = <TASK_ID> GROUP BY RUN_ID_EXECUTION_DATE ORDER BY
> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
> >> TASK_ID = <TASK_ID> AND RUN_ID_EXECUTION_DATE < dag_run_execution_date
> >>
> >> I believe that would be fast, and it would use the B-TREE index
> >> features nicely (ordering support)
> >>
> >> J
> >>
> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <an...@taragol.is>
> wrote:
> >> >
> >> > First of all, I want to highlight that this approach, I guess, worked
> >> > well until Dynamic Task Mapping was introduced.
> >> >
> >> > > The main reason for adding that cleanup was -- if you don't do
> >> > > that, you will have many rows, similar to the TaskInstance table
> >> >
> >> > The problem itself is not how big your table/indexes are, but rather
> >> > what kind of operations you run.
> >> >
> >> > > Do you have any data for locks or performance degradation?
> >> >
> >> > In this case if we try to clean up rendered_task_instance_fields
> table when a new TI is created/cleared we make almost two full/sequential
> scans (note: need to check) against the table without any index usage, so
> we pay here a couple times:
> >> > 1. We scan without indexes - not all parts of the composite key are
> included to query, plus we need to filter everything except 30 records with
> order and distinct
> >> > 2. After that we make another full scan for find 1 record or map_size
> records
> >> >
> >> > And I guess the situation becomes worse if you have a lot of tasks,
> even if we have a small table, we need to do ineffective operations.
> >> >
> >> > This is what the query plan looks like (please note that without committing the
> >> > transaction, the DELETE operation does not show all the information):
> https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
> >> >
> >> > In case if we do not clean up the table, we only use these operations:
> >> > 1. SELECT single record by index
> >> > 2. INSERT new record
> >> > 3. DELETE old record(s), which were found by index.
> >> >
> >> > I have not done any real tests yet, only synthetic DAGs (so we should
> not consider to use any findings as totally truth):
> https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
> >> > DAG with parallel tasks: degradation up to 2-3 times
> >> > DAG with single map tasks: degradation up to 7-10 times
> >> >
> >> > I plan to run more complex, closer-to-real use cases against a
> >> > database that does not have the almost-zero network latency I have locally.
> >> > But I will not refuse if someone also does their tests with
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
> >> >
> >> > About deadlock we know that it exists at least in MySQL:
> https://github.com/apache/airflow/pull/18616
> >> >
> >> > > And the larger tables create problems during database migrations.
> >> >
> >> > That is a very good point, so if we found that problem only related
> to migrations we could:
> >> > 1. Cleanup this table in migration
> >> > 2. Add cli command to airflow db which could cleanup only rendered
> fields, so it would be user's choice cleanup or not before migration, do
> periodical maintenance or not
> >> >
> >> >
> >> > ----
> >> > Best Wishes
> >> > Andrey Anshin
> >> >
> >> >
> >> >
> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com> wrote:
> >> >>>
> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >>
> >> >>
> >> >> The main reason for adding that cleanup was -- if you don't do that,
> you will have many rows, similar to the TaskInstance table. And the
> RenderedTIFields were mainly added for checking rendered TI fields on the
> Webserver only because after DAG Serialization, the webserver won't have
> access to DAG files.
> >> >>
> >> >> And the larger tables create problems during database migrations.
> >> >>
> >> >> Do you have any data for locks or performance degradation?
> >> >>
> >> >>
> >> >>
>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Andrey Anshin <an...@taragol.is>.
Just for the record, I am still trying to create representative experiments.

Last time, when I ran a DAG with pretty big rendered templates (about 10KB
per TI) and a total of 10k TIs, I found after a couple of days of execution
that Postgres compresses my dummy rendered templates very efficiently, so
they were not moved into the TOAST table as I initially expected, and all the
runs were useless for this kind of experiment (facepalm).

And only now I realised that my MX records were gone, so I didn't receive any
emails for a week or so, and all the emails which I sent went to /dev/null
(facepalm #2).

----
Best Wishes
*Andrey Anshin*



On Wed, 1 Feb 2023 at 01:46, Jarek Potiuk <ja...@potiuk.com> wrote:

> BTW, I would really love to hear what the original authors have to say
> here. I am merely trying to put myself in their shoes and guess what
> the reasoning is :).
>
> I think this is really a question of: do we want to keep all
> arbitrary-size rendered task instance fields in our database forever
> by default, same as other fields?
> I believe the original authors answered the question to be "no". And
> the "num_row" was a way to limit it.
>
> And I really do not want to "win" this argument, I just want to
> protect our users (and environment).
>
> There is (IMHO) currently a big difference between
> session/log/task_instance fields and rendered task_instance fields
> that justifies different behaviour.
>
> The former are generally fixed in max size of rows (this is one of the
> reasons we have limited string sizes in our DB) - to be able to limit
> them growing uncontrollably large. We simply do not keep arbitrary
> size data in those tables.
> On the other hand, the rendered task instance is arbitrary in size
> (JSONField) and the need for deletion accounts for the "worst" case
> scenario.
> Until we get rid of that "property" of the rendered task instance
> table, I think "just" skipping deletion of those fields without fixing
> the "worst" case scenario is not a good idea.
>
> Maybe in your test cases (and many others) those tables are not
> bigger, but I think the protection here is implemented to account for
> the cases where the rendered task instance fields are really "big".
>
> But very interestingly - if the rendered task instance is "big" then
> likely it is next to useless to be displayed in the Web UI in its
> entirety.
>
> So maybe you are actually right Andrey, Maybe we can skip deleting
> those and maybe we could solve it differently and apply the same rules
> as other tables?
>
> Let me then - constructively - propose another idea which actually
> might solve both yours and my own concerns. Maybe we can fix the
> "worst case" scenario differently? We do not have to limit the number
> of rows, we can limit the size of the row instead.
>
> Why don't we simply limit the size of the rendered task instance JSON
> and if they are too big (we can configure the maximum size), we will
> render something like (probably a bit more unique and not
> "accidentally triggerable"):
>
> {
>    "error": "This task instance has too large rendered task instances
> to display it"
> }
>
> And implement an escape hatch in the web server to handle it properly
> when displaying such "truncated" rendered task instance field.
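
The row-size limit proposed above might look roughly like the sketch below.
Everything named here is hypothetical (MAX_RENDERED_BYTES, TRUNCATED_KEY,
cap_rendered_fields, is_truncated); none of it is an existing Airflow setting
or API, and the 4096-byte limit is an arbitrary value chosen for the example.

```python
import json

# Hypothetical limit and marker key; neither exists in Airflow.
MAX_RENDERED_BYTES = 4096
TRUNCATED_KEY = "__rendered_fields_truncated__"


def cap_rendered_fields(rendered_fields, max_bytes=MAX_RENDERED_BYTES):
    """Return the fields unchanged if small enough, else a small placeholder."""
    encoded = json.dumps(rendered_fields).encode("utf-8")
    if len(encoded) <= max_bytes:
        return rendered_fields
    # Store a fixed-size marker instead of the arbitrarily large JSON.
    return {
        TRUNCATED_KEY: True,
        "error": "Rendered fields are too large to display",
        "original_size_bytes": len(encoded),
    }


def is_truncated(stored):
    """Escape hatch for the web server: detect the placeholder before rendering."""
    return isinstance(stored, dict) and stored.get(TRUNCATED_KEY) is True
```

A dedicated marker key, rather than a bare "error" entry, is one way to make
the placeholder hard to trigger accidentally with a task whose own templated
fields happen to contain an "error" key.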
>
> We should be able to come up with a sensible max size that we might think
> makes sense when rendering it in the web UI.  And we could make the
> max size configurable by the user if they have a lot of GB to spare.
> We could even apply it automatically. If there is a max_num_row_limit
> - we allow any size, if we have no limit on the number of rows, we
> limit the maximum row size.
>
> If we have such "upper-bounded" reasonable size of each row in that
> table - then I am perfectly happy with not deleting the rows
> automatically.
> But only if we limit that and handle the "worst" case properly.
>
> That would be my proposal how we can handle it to get both views taken
> into account.
>
> What do you Andrey (and others) think? Does it make sense? Or do we
> think we should not have any such protections in place ?
>
> J.
>
>
> On Tue, Jan 31, 2023 at 9:43 PM Andrey Anshin <an...@taragol.is>
> wrote:
> >
> > > I think that is all something to be tested with explaining plans.
> >
> > I would be really careful with these results. DELETE in Transaction with
> Rollback usually shows more optimistic than actually executed.
> >
> >
> > > I think we would not know before we try - and possibly there are other
> > optimisation approaches. The optimisation I proposed was only first
> > that came to my mind to avoid the "not in" query. The problem  with
> > "not in query" is that there is no way to optimise it by the DB.
> > Effectively you have to get every record (or index entry) and test it.
> > Maybe it can be done better :). And yes locking the index with
> > anti-insert locks and the need to rebalance trees during the delete is
> > a concern.
> >
> > My point is still the same, I would rather remove it in the future or
> make policy about maintenance more consistent: all or nothing. Right now we
> are close to nothing rather than all.
> >
> >
> > > It's not about index size or JSON access. It is about the size of the
> > actual rows and storage it takes - i.e. general size of the database.
> >
> > I'm tired, so I'm not sure that I understand the actual point.
> > The size of the table does not really matter if you always access it by
> > this pattern:
> > 1 request which returns exactly 1 record accessed by a unique index.
> > Basically, the query traverses the index and finds a reference to a single
> > CTID/rowid (or whatever name is used in other RDBMSs).
> > So at this point it matters how fast your index grows rather than the
> > table size.
> >
> >
> > > The problem with it is that (especially with dynamic task mapping), it
> > might grow really, really fast. Basically you have NUM_DAGS x
> > NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
> > of records there.
> > Back-of-the envelope calculation Assuming you have a
> > DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
> > fields, each field evaluating to 1K string.  Then you have 10 tasks *
> > 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
> > per one(!) run of one(1) DAG. Run it every 10 minutes and every day
> > your database from a single DAG grows by a whopping 1.44 GB of data
> > every single day (from single DAG).
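
The back-of-the-envelope figures in this exchange can be reproduced with a few
lines of arithmetic. This is a sketch under the same assumptions as the quoted
estimate, with 1K taken as 1,000 bytes and decimal MB/GB; the function and
parameter names are made up for the example, and compression or per-row
overhead in a real database is ignored.

```python
def rendered_fields_growth(
    tasks=10,
    map_indexes=100,
    fields=10,
    bytes_per_field=1_000,
    run_every_minutes=10,
    keep_runs=30,
):
    """Estimate raw rendered-field volume for a single DAG."""
    task_instances_per_run = tasks * map_indexes
    bytes_per_run = task_instances_per_run * fields * bytes_per_field
    runs_per_day = 24 * 60 // run_every_minutes
    return {
        "mb_per_run": bytes_per_run / 1e6,
        "runs_per_day": runs_per_day,
        "gb_per_day": bytes_per_run * runs_per_day / 1e9,
        "task_instances_per_day": task_instances_per_run * runs_per_day,
        # Rows retained when only the newest `keep_runs` runs are kept.
        "task_instances_kept": task_instances_per_run * keep_runs,
    }
```

With the defaults this gives 10 MB per run, 144 runs per day, 1.44 GB per day,
144,000 task instances per day, and 30,000 task instances retained at the
default limit of 30 runs.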
> >
> > Depends on DB. If we talk about Postgres you could easily miss up to 3-4
> times (thanks for inline compression before/instead of TOASTs).
> > I have a couple questions:
> > 1. Do you know how big would be indexes in rendered_task_instance_fields
> after one day? (Spoiler alert I could give estimation in the morning)
> > 2. In this scenario with default settings always would keep up to 30 000
> TI for this DAG.
> > Could someone who wants to optimize the query make it more optimal
> rather than access the table by index (Unique Index -> ctid/rowid - record)
> and where this crosspoint?
> >
> > > This is of course an estimation that assumes a lot, but it's not at
> all unrealistic.
> >
> > 144,000 TI per/day on single DAG (initially I want to put here some
> sarcastic message).
> > How would Airflow feel with 144k Task Instances per day? How
> > I asked because right now I've always had a task_instance table bigger
> than rendered_task_instance_fields.
> >
> >
> > > This table is very specific compared with the other tables. The only
> reason for it being here is to be able
> > to show the rendered fields in the UI if you go to the specific run of a
> task. If you clean-up other tables you basically lose the history of
> execution of the tasks and you cannot really know if the data has been
> > processed, you cannot do backfills effectively, you lose all the
> > context. Cleaning this table is merely about the values that have been
> > rendered for a specific run and the assumption there is that the older
> > it gets, the less interesting it is.
> >
> > What about these tables: session, log, job? I expected the answer would
> be "They are not so specific."
> > For me every table is specific for their purpose.
> > Users often asked (slack, issues, discussions) to give the ability to
> auto-maintain tables/logs and receive the usual answer: "No, we don't give
> you this opportunity, please use something for Airflow Ecosystem Page". But
> on the other hand we have auto-maintenance only for a single table.
> >
> > >> It is opposite of what we have right now, we scan tables (maybe
> multiple times), read all records tuples which contain JSON.
> > > Not sure if I get the point here :). Yes -in my proposal I think the
> records will not be touched - only indexes. So the cleanup should be way
> faster, contentions less of problem, due to the way the delete
> > uses < ordering, deadlocks will not be possible at all (as opposed to
> the current "not in" - there is a very easy way to get into deadlocks when
> two parallel deletes are trying to delete same rows in a different
> sequence. I think my proposal improves all the characteristics of the
> "cleanup" with very little extra penalty on record creation.
> >
> > I was talking about the current solution and why it is also slow (and if
> abstract users use some DBaaS where they also pay for IOPs then it is
> expensive). Let's talk about the benefits of optimised queries for 4
> different DB backends (3 if excluding SQLite) when we have it.
> >
> > > We pay for table/index size linearly: more records, more size. But other
> > > operations vary and depend on the B-Tree implementation, and usually they
> > > have logarithmic growth. Or do we worry only about table/toast/index size
> > > on disk?
> > >> Yep. I (and I believe the original author had the same worry) am
> worried a lot about the size of the table and the fact that this table will
> be by far the biggest table in our DB while most of the old records will
> never be touched. And by the fact that this is the only table that our
> users will have to know about to clean up separately from all others pretty
> much always.
> >
> > Same as previous.
> >
> > > If not even worrying about money spent by our users, and performance
> degradation that comes with databases that are bigger - that's a lot of
> environmental effects that we might incur. Airflow is heavily used, if
> suddenly all our users
> > will start having 10x bigger databases than they have now because we will
> deprecate the values and keep all the history, then we have a big number of
> extra disks that will have to be used. I'd strongly prefer a solution where
> we keep the data usage lower in this case.
> >
> > Am I right that this is all about "lets don't delete by default as we do
> for other tables" rather than the current default implementation?
> > Because I get the result which is opposite what you said. And
> rendered_task_instance_fields don't grow faster than other tables that what
> I got.
> > I would like to compare it with other findings and some reproducible
> metrics rather than with hypothetical things.
> >
> > ----
> > Best Wishes
> > Andrey Anshin
> >
> >
> >
> > On Tue, 31 Jan 2023 at 11:12, Jarek Potiuk <ja...@potiuk.com> wrote:
> >>
> >> COMMENT: While writing the answer here, I think I found a deeper
> >> problem (and an optimisation that is needed), i.e. I think the delete
> >> should be even more fine-grained than it is today and include
> >> map_index. Please take a look at the end (also, maybe TP might comment
> >> on that one).
> >>
> >> > 1. Additional indexes add additional performance degradation on
> Insert but gain potential improvements on delete and unknown on update,
> RDBMS still require rebalance index and make it consistent to the table.
> >> > 2. LIMIT x OFFSET y could easily become full seq scan, especially if
> the user set a huge number for offset (which? unknown).
> >> > 3. Mixing two indexes could improve performance in a single query but
> in concurrent execution might lead to degradation because it needs to
> create a bitmap table for comparison between these two indexes, as result
> it might lead different issues, such as OOM on DB backend, use swaps or
> optimiser decided that better not to use this indexes.
> >>
> >> I think that is all something to be tested with explain plans. I think
> >> we would not know before we try - and possibly there are other
> >> optimisation approaches. The optimisation I proposed was only first
> >> that came to my mind to avoid the "not in" query. The problem  with
> >> "not in query" is that there is no way to optimise it by the DB.
> >> Effectively you have to get every record (or index entry) and test it.
> >> Maybe it can be done better :). And yes locking the index with
> >> anti-insert locks and the need to rebalance trees during the delete is
> >> a concern.
> >>
> >> > Is it a real problem? Until we access only by indexes, which doesn't
> include this JSON, it really doesn't matter. I guess we almost always
> should make a UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a single
> record.
> >>
> >> Yes, I think so, and while I was not the author of this "cleanup"
> >> code, I believe I know the intention.
> >>
> >> It's not about index size or JSON access. It is about the size of the
> >> actual rows and storage it takes - i.e. general size of the database.
> >> The problem with it is that (especially with dynamic task mapping), it
> >> might grow really, really fast. Basically you have NUM_DAGS x
> >> NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
> >> of records there. Back-of-the envelope calculation Assuming you have a
> >> DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
> >> fields, each field evaluating to 1K string.  Then you have 10 tasks *
> >> 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
> >> per one(!) run of one(1) DAG. Run it every 10 minutes and every day
> >> your database from a single DAG grows by a whopping 1.44 GB of data
> >> every single day (from single DAG). This is of course an estimation
> >> that assumes a lot, but it's not at all unrealistic. That's a lot. And
> >> if we want the user to do the cleanup then a) they need to know it b)
> >> they need to specifically clean up this table only because all the
> >> other data is relatively small. This table is very specific compared
> >> with the other tables. The only reason for it being here is to be able
> >> to show the rendered fields in the UI if you go to the specific run of
> >> a task. If you clean-up other tables you basically lose the history of
> >> execution of the tasks and you cannot really know if the data has been
> >> processed, you cannot do backfills effectively, you lose all the
> >> context. Cleaning this table is merely about the values that have been
> >> rendered for a specific run and the assumption there is that the older
> >> it gets, the less interesting it is.
> >>
> >> > It is opposite of what we have right now, we scan tables (maybe
> multiple times), read all records tuples which contain JSON.
> >>
> >> Not sure if I get the point here :). Yes -in my proposal I think the
> >> records will not be touched - only indexes. So the cleanup should be
> >> way faster, contentions less of problem, due to the way the delete
> >> uses < ordering, deadlocks will not be possible at all (as opposed to
> >> the current "not in" - there is a very easy way to get into deadlocks
> >> when two parallel deletes are trying to delete same rows in a
> >> different sequence. I think my proposal improves all the
> >> characteristics of the "cleanup" with very little extra penalty on
> >> record creation.
> >>
> >> > We pay for table/index size linearly: more records, more size. But other
> >> > operations vary and depend on the B-Tree implementation, and usually they
> >> > have logarithmic growth. Or do we worry only about table/toast/index size
> >> > on disk?
> >>
> >> Yep. I (and I believe the original author had the same worry) am
> >> worried a lot about the size of the table and the fact that this table
> >> will be by far the biggest table in our DB while most of the old
> >> records will never be touched. And by the fact that this is the only
> >> table that our users will have to know about to clean up separately
> >> from all others pretty much always. If not even worrying about money
> >> spent by our users, and performance degradation that comes with
> >> databases that are bigger - that's a lot of environmental effects that
> >> we might incur. Airflow is heavily used, if suddenly all our users
> >> will start having 10x bigger databases than they have now because we
> >> will deprecate the values and keep all the history, then we have a big
> >> number of extra disks that will have to be used. I'd strongly prefer a
> >> solution where we keep the data usage lower in this case.
> >>
> >> > If we do not want to grant users the ability to clean up rendered
> templates tables, there could be another option:
> >> > - Do not delete records on every task instance run.
> >> > - Delete once per defined period (hourly, daily, weekly, monthly). In
> this case you really could not care about locks.
> >>
> >> Yes we could come up with a different strategy as to "when" run the
> >> cleanup. This is also a viable option. If you can propose one that
> >> will be equally adaptive as the current solution, I am all ears.
> >> Basically my goal is to keep the usage of the table low, possibly
> >> controlled by the same parameter we had. How we do it - this is a
> >> different story. If we - for example add a thread in the scheduler
> >> (for example) that performs such cleanup effectively in parallel and
> >> scales, I am happy with that.
> >>
> >> But I am trying to get into the head of the author trying to
> >> understand why the original implementation was done this way. I
> >> believe (and maybe those who remember it better could confirm it) that
> >> distributing the deletion to tasks to clean up after itself is a
> >> better idea than centralising the cleanup. This makes each cleanup
> >> smaller, locks are held for a shorter time (at least that was the
> >> assumption where no full table scan was used), it is more "immediate"
> >> and you do not have to decide upfront what should be the cleanup
> >> frequency. It seems this is the best logical approach to keep the
> >> "MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply after task is
> >> complete, you can be sure that there are no more than this number of
> >> fields per task in the DB. With a scheduled run, that would be a much
> >> more "eventual" consistency and it will be potentially fluctuating
> >> much more.
> >>
> >> But there are risks involved in having a single thread doing the
> >> cleanup. I think it has a huge risk of being a "stop-the world" and
> >> "deadlock-prone" kind of event - if in big instances there are a lot
> >> of rows to cleanup in a single pass. When you delete entries from a
> >> table, there are anti-insert locks created for existing index entries,
> >> which makes it possible to rollback the whole DELETE transaction.
> >> Which means that when you try to insert the row with the same index,
> >> the index will be held. And this means that when you run a single huge
> >> DELETE for multiple rows affected with multiple (all?) index keys
> >> matching select query, it will effectively prevent new rows with the
> >> same indexes that are matching the SELECT. It would mean that if you
> >> have some tasks running while deleting existing run_id rendered
> >> fields, then you could REALLY start having deadlocks on those tasks
> >> trying to insert rendered task instance rows. That's why I think the
> >> only viable strategy for single "cleanup" thread is to do such cleanup
> >> as separate DELETE for each of the "dag/task/map_index/run" - in order
> >> to avoid such deadlocks. Which effectively will turn into what we have
> >> currently - only that currently those transactions are done by tasks,
> >> not by a single cleanup thread.
> >>
> >> Also using tasks to delete old rows is more "effective" when you have
> >> vast differences in frequency of DAGs. Naturally - when you do it in
> >> task, you will only do it "when needed" for given DAG + Task. If you
> >> try to centralize the cleanup, unless you include somehow schedule and
> >> frequency of each dag, you are going to check every DAG every time
> >> you run the cleanup - no matter if that DAG is run daily or every
> >> minute, you will have to run the cleanup frequently enough to match
> >> your most frequent dags. If you have 1000 dags that run hourly and one
> >> DAG that runs every minute, then you have to run a cleanup job that
> >> scans all DAGs every few minutes. That's a big waste.
> >>
> >> So I am not sure if we gain anything by centralizing the cleanup.
> >> Decentralising it to Task seems to be a well thought and sound
> >> decision (but I think the problem we have now is that we need to
> >> optimize it after Dynamic Task Mapping has been added).
> >>
> >> ANOTHER FINDING:
> >>
> >> While looking at the code and discussing it and looking more closely I
> >> **think** there is another problem that we have to fix regardless of a
> >> solution. I THINK a problem we might have now is that we do not
> >> include map_index in this DELETE. Currently we delete all
> >> the rendered task fields without including map_index - and for big
> >> dynamic tasks, it means that exactly the same DELETE query is run by
> >> every single mapped instance of that task and that is where a lot of
> >> contention and locking might happen (basically when a single task
> >> instance does the delete, anti-insert locks hold the other mapped
> >> instances of the same task from inserting rendered fields).
> >>
> >> It does not change much in the optimisation proposal of mine, other
> >> than we should include map_index in those queries. But I think this
> >> might cause a lot of delays in the current implementation.
> >>
> >> J.
> >>
> >> > ----
> >> > Best Wishes
> >> > Andrey Anshin
> >> >
> >> >
> >> >
> >> > On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com> wrote:
> >> >>
> >> >> I think there is a good reason to clean those up automatically.
> >> >> rendered task instance fields are almost arbitrary in size. If we try
> >> >> to keep all historical values there by default, there are numerous
> >> >> cases it will grow very fast - far, far too quickly.
> >> >>
> >> >> And I am not worried at all about locks on this table if we do it the
> >> >> way I described it and it uses the indexes. The contention this way
> >> >> might only be between the two deleting tasks. and with the query I
> >> >> proposed, they will only last for a short time - the index will be
> >> >> locked when two DELETES  or SELECT DISTINCT - which should both be
> >> >> fast.
> >> >>
> >> >>
> >> >> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >
> >> >> > I guess two things have reduced the performance of this query
> over time: Dynamic Task Mapping and run_id instead of execution date.
> >> >> >
> >> >> > I still personally think that changing the default value from 30
> to 0 might improve performance of multiple concurrent tasks, just because
> this query does not run and there are no locks on multiple records/pages.
> >> >> >
> >> >> > I do not have any proof (yet?) other than simple DAGs. I think
> that there is some crossover point where letting this table grow becomes worse
> than cleaning up on each TI run. But users have the ability to clean up the
> table by executing airflow db clean, which should improve performance again.
> >> >> >
> >> >> > There is also an interesting behavior with this query: if the user
> already has more records than the value specified by
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tries to run a backfill,
> then the rendered templates are not written to the table (or maybe they are
> inserted and immediately deleted); the same is valid when cleaning up old tasks.
> >> >> >
> >> >> > ----
> >> >> > Best Wishes
> >> >> > Andrey Anshin
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com>
> wrote:
> >> >> >>
> >> >> >> Yep. Agree this is not an efficient query and dynamic task mapping
> >> >> >> makes the effect much worse. Generally speaking, selecting "what
> >> >> >> should be left" and then deleting stuff where the key is "not in"
> is
> >> >> >> never an efficient way of running an sql query.  And the query not
> >> >> >> using index at all makes it rather terrible.
> >> >> >>
> >> >> >> I think we should not deprecate it though, but find a more
> efficient
> >> >> >> way of deleting the old keys. I think we could slightly
> denormalize
> >> >> >> RenderedTaskInstance + DagRun tables, and add
> DAG_RUN_EXECUTION_DATE
> >> >> >> to the RenderedTaskInstance table and that will be enough to
> optimise
> >> >> >> it.
> >> >> >>
> >> >> >> Then we could have either:
> >> >> >>
> >> >> >> * a composite B-TREE indexed (non-unique) index on DAG_ID,
> TASK_ID,
> >> >> >> RUN_ID_EXECUTION_DATE
> >> >> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
> >> >> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
> >> >> >>
> >> >> >> Probably the latter is better as I am not sure how < , >
> comparison
> >> >> >> looks like for composite B-TREE indexes when char + date columns
> are
> >> >> >> mixed. Also we could have hit the infamous MySQL index key length
> >> >> >> limit.
> >> >> >>
> >> >> >> Then deletion process would look roughly like:
> >> >> >>
> >> >> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
> >> >> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
> >> >> >> TASK_ID = <TASK_ID> GROUP BY RUN_ID_EXECUTION_DATE ORDER BY
> >> >> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
> >> >> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
> >> >> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID
> = <DAG_ID> AND
> >> >> >> TASK_ID = <TASK_ID> AND RUN_ID_EXECUTION_DATE < dag_run_execution_date
> >> >> >>
> >> >> >> I believe that would be fast, and it would use the B-TREE index
> >> >> >> features nicely (ordering support)
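
The two-step deletion described above can be sketched as follows. This is a minimal illustration using Python's sqlite3 module, not Airflow code: the run_id_execution_date column, the index name, and the cleanup() helper are hypothetical, and the sketch uses OFFSET keep - 1 (rather than OFFSET keep as in the rough outline) so that exactly "keep" distinct execution dates remain.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE rendered_task_instance_fields (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL DEFAULT -1,
        run_id_execution_date TEXT NOT NULL,  -- hypothetical denormalized column
        rendered_fields TEXT,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    )
    """
)
# Hypothetical composite (non-unique) index supporting both steps below.
conn.execute(
    "CREATE INDEX idx_rtif_cleanup ON rendered_task_instance_fields "
    "(dag_id, task_id, run_id_execution_date)"
)

# Ten runs of one task, one per day.
rows = [
    ("dag", "task", f"run_{day:02d}", -1, f"2023-01-{day:02d}", "{}")
    for day in range(1, 11)
]
conn.executemany(
    "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?, ?, ?, ?)", rows
)


def cleanup(conn, dag_id, task_id, keep):
    """Delete rendered fields older than the `keep` newest execution dates."""
    if keep <= 0:
        return 0  # treat 0 as "cleanup disabled"
    # Step 1: find the oldest execution date that should be kept.
    cutoff = conn.execute(
        """
        SELECT DISTINCT run_id_execution_date
        FROM rendered_task_instance_fields
        WHERE dag_id = ? AND task_id = ?
        ORDER BY run_id_execution_date DESC
        LIMIT 1 OFFSET ?
        """,
        (dag_id, task_id, keep - 1),
    ).fetchone()
    if cutoff is None:
        return 0  # fewer than `keep` dates stored, nothing to delete
    # Step 2: range delete with "<" instead of "NOT IN (subquery)".
    cursor = conn.execute(
        """
        DELETE FROM rendered_task_instance_fields
        WHERE dag_id = ? AND task_id = ? AND run_id_execution_date < ?
        """,
        (dag_id, task_id, cutoff[0]),
    )
    return cursor.rowcount


deleted = cleanup(conn, "dag", "task", keep=3)
remaining = [
    r[0]
    for r in conn.execute(
        "SELECT run_id FROM rendered_task_instance_fields ORDER BY run_id"
    )
]
```

Whether a real backend actually uses the index for both statements would still have to be checked with explain plans on each supported database.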
> >> >> >>
> >> >> >> J
> >> >> >>
> >> >> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >> >
> >> >> >> > First of all I want to highlight that this approach I guess
> worked well until Dynamic Task Mappings introduced.
> >> >> >> >
> >> >> >> > > The main reason for adding that cleanup was -- if you don't
> do that, you will have many rows, similar to the TaskInstance table
> >> >> >> >
> >> >> >> > The problem itself is not how big your table/indexes are, but rather
> what kind of operations you run.
> >> >> >> >
> >> >> >> > > Do you have any data for locks or performance degradation?
> >> >> >> >
> >> >> >> > In this case if we try to clean up
> rendered_task_instance_fields table when a new TI is created/cleared we
> make almost two full/sequential scans (note: need to check) against the
> table without any index usage, so we pay here a couple times:
> >> >> >> > 1. We scan without indexes - not all parts of the composite key
> are included to query, plus we need to filter everything except 30 records
> with order and distinct
> >> >> >> > 2. After that we make another full scan for find 1 record or
> map_size records
> >> >> >> >
> >> >> >> > And I guess the situation becomes worse if you have a lot of
> tasks, even if we have a small table, we need to do ineffective operations.
> >> >> >> >
> >> >> >> > That how looks like Query Plan (please note without commit
> transaction DELETE operation doesn't have all information):
> https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
> >> >> >> >
> >> >> >> > In case if we do not clean up the table, we only use these
> operations:
> >> >> >> > 1. SELECT single record by index
> >> >> >> > 2. INSERT new record
> >> >> >> > 3. DELETE old record(s), which were found by index.
> >> >> >> >
> >> >> >> > I have not done any real tests yet, only synthetic DAGs (so we
> should not consider to use any findings as totally truth):
> https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
> >> >> >> > DAG with parallel tasks: degradation up to 2-3 times
> >> >> >> > DAG with single map tasks: degradation up to 7-10 times
> >> >> >> >
> >> >> >> > I have a plan for more complex and more close to real use cases
> with Database which do not have network latency almost 0 as I have in my
> local.
> >> >> >> > But I will not refuse if someone also does their tests with
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
> >> >> >> >
> >> >> >> > About deadlock we know that it exists at least in MySQL:
> https://github.com/apache/airflow/pull/18616
> >> >> >> >
> >> >> >> > > And the larger tables create problems during database
> migrations.
> >> >> >> >
> >> >> >> > That is a very good point, so if we found that problem only
> related to migrations we could:
> >> >> >> > 1. Cleanup this table in migration
> >> >> >> > 2. Add cli command to airflow db which could cleanup only
> rendered fields, so it would be user's choice cleanup or not before
> migration, do periodical maintenance or not
> >> >> >> >
> >> >> >> >
> >> >> >> > ----
> >> >> >> > Best Wishes
> >> >> >> > Andrey Anshin
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com>
> wrote:
> >> >> >> >>>
> >> >> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> The main reason for adding that cleanup was -- if you don't do
> that, you will have many rows, similar to the TaskInstance table. And the
> RenderedTIFields were mainly added for checking rendered TI fields on the
> Webserver only because after DAG Serialization, the webserver won't have
> access to DAG files.
> >> >> >> >>
> >> >> >> >> And the larger tables create problems during database
> migrations.
> >> >> >> >>
> >> >> >> >> Do you have any data for locks or performance degradation?
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >> >>>
> >> >> >> >>> Greetings!
> >> >> >> >>>
> >> >> >> >>> During migrating our ORM syntax to compatible with SQLAlchemy
> 2.0 I probably found skeletons in the closet.
> >> >> >> >>>
> >> >> >> >>> Let's start from the beginning, initially I got this warning
> >> >> >> >>>
> >> >> >> >>> airflow/models/renderedtifields.py:245
> RemovedIn20Warning('ORDER BY columns added implicitly due to DISTINCT is
> deprecated and will be removed in SQLAlchemy 2.0.  SELECT statements with
> DISTINCT should be written to explicitly include the appropriate columns in
> the columns clause (Background on SQLAlchemy 2.0 at:
> https://sqlalche.me/e/b8d9)')
> >> >> >> >>>
> >> >> >> >>> "OK let's fix it!", I thought at first and started to
> investigate RenderedTaskInstanceFields model
> >> >> >> >>>
> >> >> >> >>> Skeleton #1:
> >> >> >> >>>
> >> >> >> >>> When I first time look on the code and comments it got me to
> thinking that part which keep only latest N Rendered Task Fields
> potentially could lead different performance degradation (Locks, Dead
> Locks, Data Bloating): see code
> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
> >> >> >> >>>
> >> >> >> >>> Also this historical part (from Airflow 1.10.10) generate
> this SQL Statement (pg backend)
> >> >> >> >>>
> >> >> >> >>> DELETE FROM rendered_task_instance_fields
> >> >> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
> >> >> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
> >> >> >> >>>   AND (
> >> >> >> >>>     (
> >> >> >> >>>       rendered_task_instance_fields.dag_id,
> >> >> >> >>>       rendered_task_instance_fields.task_id,
> >> >> >> >>>       rendered_task_instance_fields.run_id
> >> >> >> >>>     ) NOT IN (
> >> >> >> >>>       SELECT
> >> >> >> >>>         anon_1.dag_id,
> >> >> >> >>>         anon_1.task_id,
> >> >> >> >>>         anon_1.run_id
> >> >> >> >>>       FROM
> >> >> >> >>>         (
> >> >> >> >>>           SELECT DISTINCT
> >> >> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
> >> >> >> >>>             rendered_task_instance_fields.task_id AS task_id,
> >> >> >> >>>             rendered_task_instance_fields.run_id AS run_id,
> >> >> >> >>>             dag_run.execution_date AS execution_date
> >> >> >> >>>           FROM rendered_task_instance_fields
> >> >> >> >>>             JOIN dag_run ON
> rendered_task_instance_fields.dag_id = dag_run.dag_id
> >> >> >> >>>             AND rendered_task_instance_fields.run_id =
> dag_run.run_id
> >> >> >> >>>           WHERE
> >> >> >> >>>             rendered_task_instance_fields.dag_id =
> %(dag_id_2) s
> >> >> >> >>>             AND rendered_task_instance_fields.task_id =
> %(task_id_2) s
> >> >> >> >>>           ORDER BY
> >> >> >> >>>             dag_run.execution_date DESC
> >> >> >> >>>           limit %(param_1) s
> >> >> >> >>>         ) AS anon_1
> >> >> >> >>>     )
> >> >> >> >>>   )
> >> >> >> >>>
> >> >> >> >>> Which is especially ineffective in PostgreSQL. An IN
> SUBQUERY can easily be transformed internally into a SEMI-JOIN (aka EXISTS
> clause), but this does not work for a NOT IN SUBQUERY because it is not
> transformed into an ANTI JOIN (aka NOT EXISTS clause) even when that is possible,
> see: https://commitfest.postgresql.org/27/2023/
> >> >> >> >>>
> >> >> >> >>> I didn't do any performance benchmarks yet but I guess if
> users set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather
> than default 30 it could improve performance and reduce number of
> DeadLocks, however the table size will increase but I think we don't do any
> maintenance job for other tables.
> >> >> >> >>>
> >> >> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> Skeleton #2:
> >> >> >> >>>
> >> >> >> >>> We have a k8s_pod_yaml field which is exclusively used by K8S
> executors.
> >> >> >> >>>
> >> >> >> >>> Should we also decouple this field as part of AIP-51?
> >> >> >> >>>
> >> >> >> >>> ----
> >> >> >> >>> Best Wishes
> >> >> >> >>> Andrey Anshin
> >> >> >> >>>
>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Andrey Anshin <an...@taragol.is>.
Just for the record, I am still trying to create representative experiments.
Last time I ran a DAG with pretty big rendered_templates (about 10KB per
TI) and a total of 10k TIs, I found after a couple of days of execution that
Postgres compressed my dummy rendered templates very efficiently, so they never
moved into the TOAST table (facepalm)
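
A minimal sketch of why dummy data behaves this way, under stated assumptions: zlib stands in for the compression PostgreSQL applies internally (which is a different algorithm), and the 2000-byte figure only approximates the usual TOAST threshold of about 2 kB. The field names and payloads are made up for illustration.

```python
import json
import random
import zlib

# Approximation of the row size above which PostgreSQL considers
# moving a value out of line (normally about 2 kB).
TOAST_THRESHOLD_BYTES = 2000


def compressed_size(rendered_fields: dict) -> int:
    """Size in bytes of the JSON-serialized fields after zlib compression."""
    raw = json.dumps(rendered_fields).encode("utf-8")
    return len(zlib.compress(raw))


# Highly repetitive "dummy" template of roughly 11 KB.
dummy = {"bash_command": "echo hello " * 1000}

# High-entropy payload of about 10 KB (deterministic for reproducibility).
rng = random.Random(0)
high_entropy = {
    "bash_command": "".join(rng.choice("0123456789abcdef") for _ in range(10_000))
}

dummy_size = compressed_size(dummy)
high_entropy_size = compressed_size(high_entropy)

print(f"dummy payload compresses to {dummy_size} bytes")
print(f"high-entropy payload compresses to {high_entropy_size} bytes")
```

The repetitive payload ends up far below the threshold after compression, while the high-entropy one stays well above it, so a representative experiment probably needs rendered fields that do not compress this well.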

----
Best Wishes
*Andrey Anshin*



On Thu, 2 Feb 2023 at 05:58, Kaxil Naik <ka...@gmail.com> wrote:

> Maybe the “auto-maintenance” is true. Hopefully for not long though :)
>
> On Thu, 2 Feb 2023 at 01:41, Kaxil Naik <ka...@gmail.com> wrote:
>
>>
>> >Users often asked (slack, issues, discussions) to give the ability to
>>> auto-maintain tables/logs and receive the usual answer: "No, we don't give
>>> you this opportunity, please use something for Airflow Ecosystem Page". But
>>> on the other hand we have auto-maintenance only for a single table.
>>
>>
>> That is not true, we added "airflow db clean" command to give this
>> control to users.
>>
>> What about these tables: session, log, job? I expected the answer would
>>> be "They are not so specific."
>>
>> The RTIF can vary in size based on what is stored and is really useless
>> beyond last X for each task. RTIF also has a foreign key constraints
>> <https://github.com/apache/airflow/blob/d80b583db07197c8c3d0549a805e83ceaaf10d52/airflow/models/renderedtifields.py#L63-L73> on
>> TI and that is the most common table that is affected during the migrations
>> and can affect scheduling decisions. Log and Session tables aren't affected
>> by it.
>>
>> I will have a detailed reply if I manage to find the time, it has just
>> been too difficult
>>
>> On Tue, 31 Jan 2023 at 21:46, Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> BTW, I would really love to hear what the original authors have to say
>>> here. I am merely trying to put myself in their shoes and guess what
>>> the reasoning is :).
>>>
>>> I think this is really a question of: Do we want to keep all rendered
>>> arbitrary size rendered task instance fields in our database forever
>>> by default, same as other fields.
>>> I believe the original authors answered the question to be "no". And
>>> the "num_row" was a way to limit it.
>>>
>>> And I really do not want to "win" this argument, I just want to
>>> protect our users (and environment).
>>>
>>> There is (IMHO) currently a big difference between
>>> session/log/task_instance fields and rendered task_instance fields
>>> that justify different behaviour.
>>>
>>> The former are generally fixed in max size of rows (this is one of the
>>> reasons we have limited string sizes in our DB) - to be able to limit
>>> them growing uncontrollably large. We simply do not keep arbitrary
>>> size data in those tables.
>>> On the other hand, the rendered task instance is arbitrary in size
>>> (JSONField) and the need for deletion accounts for the "worst" case
>>> scenario.
>>> Until we get rid of that "property" of the rendered task instance
>>> table, I think "just" skipping deletion of those fields without fixing
>>> the "worst" case scenario is not a good idea.
>>>
>>> Maybe in your test cases (and many others) those tables are not
>>> bigger, but I think the protection here is implemented to account for
>>> the cases where the rendered task instance fields are really "big".
>>>
>>> But very interestingly - if the rendered task instance is "big" then
>>> likely it is next to useless to be displayed in the Web UI in its
>>> entirety.
>>>
>>> So maybe you are actually right Andrey, Maybe we can skip deleting
>>> those and maybe we could solve it differently and apply the same rules
>>> as other tables?
>>>
>>> Let me then - constructively - propose another idea which actually
>>> might solve both yours and my own concerns. Maybe we can fix the
>>> "worst case" scenario differently? We do not have to limit the number
>>> of rows, we can limit the size of the row instead.
>>>
>>> Why don't we simply limit the size of the rendered task instance JSON
>>> and if they are too big (we can configure the maximum size), we will
>>> render something like (probably a bit more unique and not
>>> "accidentally triggerable"):
>>>
>>> {
>>>    "error": "This task instance has too large rendered task instances
>>> to display it"
>>> }
>>>
>>> And implement an escape hatch in the web server to handle it properly
>>> when displaying such "truncated" rendered task instance field.
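
The row-size limit proposed above could look roughly like the sketch below. It is only an illustration under assumptions: MAX_RENDERED_FIELDS_BYTES, limit_rendered_fields() and is_truncated() are hypothetical names that do not exist in Airflow, and the marker is the example message from the proposal rather than a more unique value.

```python
import json

# Hypothetical, user-configurable upper bound for one serialized row.
MAX_RENDERED_FIELDS_BYTES = 4096

# Placeholder stored instead of oversized rendered fields.
TRUNCATED_MARKER = {
    "error": "This task instance has too large rendered task instances to display it"
}


def limit_rendered_fields(rendered_fields, max_bytes=MAX_RENDERED_FIELDS_BYTES):
    """Return the fields unchanged, or the marker if their JSON is too big."""
    serialized = json.dumps(rendered_fields).encode("utf-8")
    if len(serialized) > max_bytes:
        return dict(TRUNCATED_MARKER)
    return rendered_fields


def is_truncated(stored_fields) -> bool:
    """Escape hatch for the web server: detect the placeholder."""
    return stored_fields == TRUNCATED_MARKER


small = {"bash_command": "echo 1"}
big = {"bash_command": "x" * 10_000}

stored_small = limit_rendered_fields(small)
stored_big = limit_rendered_fields(big)
```

With such a bound, the worst-case size of the table depends only on the number of rows, which is the property the other tables already have.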
>>>
>>> We should be able to come up with a sensible max size that we might think
>>> makes sense when rendering it in the web UI.  And we could make the
>>> max size configurable by the user if they have a lot of GB to spare.
>>> We could even apply it automatically. If there is a max_num_row_limit
>>> - we allow any size, if we have no limit on the number of rows, we
>>> limit the maximum row size.
>>>
>>> If we have such "upper-bounded" reasonable size of each row in that
>>> table - then I am perfectly happy with not deleting the rows
>>> automatically.
>>> But only if we limit that and handle the "worst" case properly.
>>>
>>> That would be my proposal how we can handle it to get both views taken
>>> into account.
>>>
>>> What do you Andrey (and others) think? Does it make sense? Or do we
>>> think we should not have any such protections in place ?
>>>
>>> J.
>>>
>>>
>>> On Tue, Jan 31, 2023 at 9:43 PM Andrey Anshin <an...@taragol.is>
>>> wrote:
>>> >
>>> > > I think that is all something to be tested with explaining plans.
>>> >
>>> > I would be really careful with these results. A DELETE in a transaction
>>> with rollback usually shows more optimistic results than one that is actually executed.
>>> >
>>> >
>>> > > I think we would not know before we try - and possibly there are
>>> other
>>> > optimisation approaches. The optimisation I proposed was only first
>>> > that came to my mind to avoid the "not in" query. The problem  with
>>> > "not in query" is that there is no way to optimise it by the DB.
>>> > Effectively you have to get every record (or index entry) and test it.
>>> > Maybe it can be done better :). And yes locking the index with
>>> > anti-insert locks and the need to rebalance trees during the delete is
>>> > a concern.
>>> >
>>> > My point is still the same, I would rather remove it in the future or
>>> make policy about maintenance more consistent: all or nothing. Right now we
>>> are close to nothing rather than all.
>>> >
>>> >
>>> > > It's not about index size or JSON access. It is about the size of the
>>> > actual rows and storage it takes - i.e. general size of the database.
>>> >
>>> > I'm tired, but I'm not sure that I understand the actual point.
>>> > Is it not really a matter of size of the table if you always access by
>>> pattern:
>>> > 1 request which returns exactly 1 record accessed by a unique index.
>>> > Basically, the query traverses the index and finds a reference to a single
>>> CTID/rowid (or whatever name is used in other RDBMS).
>>> > So at this point it really matters how fast your index grows rather
>>> than table size.
>>> >
>>> >
>>> > > The problem with it is that (especially with dynamic task mapping),
>>> it
>>> > might grow really, really fast. Basically you have NUM_DAGS x
>>> > NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
>>> > of records there.
>>> > Back-of-the-envelope calculation: assuming you have a
>>> > DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
>>> > fields, each field evaluating to 1K string.  Then you have 10 tasks *
>>> > 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
>>> > per one(!) run of one(1) DAG. Run it every 10 minutes and every day
>>> > your database from a single DAG grows by a whopping 1.44 GB of data
>>> > every single day (from single DAG).
>>> >
>>> > Depends on DB. If we talk about Postgres, this estimate could easily be off by
>>> 3-4 times (thanks to inline compression before/instead of TOAST).
>>> > I have a couple questions:
>>> > 1. Do you know how big would be indexes in
>>> rendered_task_instance_fields after one day? (Spoiler alert I could give
>>> estimation in the morning)
>>> > 2. In this scenario with default settings always would keep up to 30
>>> 000 TI for this DAG.
>>> > Could someone who wants to optimize the query make it more optimal
>>> rather than access the table by index (Unique Index -> ctid/rowid - record)
>>> and where this crosspoint?
>>> >
>>> > > This is of course an estimation that assumes a lot, but it's not at
>>> all unrealistic.
>>> >
>>> > 144,000 TI per/day on single DAG (initially I want to put here some
>>> sarcastic message).
>>> > How would Airflow feel with 144k Task Instances per day? How
>>> > I asked because right now I've always had a task_instance table bigger
>>> than rendered_task_instance_fields.
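
The figures used in this exchange (10MB per run, 1.44 GB per day, 144,000 TI per day, up to 30 000 TI kept with default settings) can be reproduced with a short calculation. This is only a sketch of the arithmetic: it takes "1K" as 1000 bytes, uses decimal MB/GB, and ignores compression and per-row overhead.

```python
TASKS = 10               # dynamically mapped tasks in the DAG
MAP_INDEXES = 100        # mapped indexes per task
TEMPLATED_FIELDS = 10    # templated fields per task instance
FIELD_BYTES = 1_000      # "1K" rendered string, taken as 1000 bytes
RUNS_PER_DAY = 24 * 60 // 10   # one run every 10 minutes
KEPT_RUNS = 30           # default MAX_NUM_RENDERED_TI_FIELDS_PER_TASK

ti_per_run = TASKS * MAP_INDEXES
bytes_per_run = ti_per_run * TEMPLATED_FIELDS * FIELD_BYTES
bytes_per_day = bytes_per_run * RUNS_PER_DAY
ti_per_day = ti_per_run * RUNS_PER_DAY
ti_kept_with_default = ti_per_run * KEPT_RUNS

print(f"{bytes_per_run / 1e6:.0f} MB per run")
print(f"{bytes_per_day / 1e9:.2f} GB per day")
print(f"{ti_per_day:,} task instances per day")
print(f"{ti_kept_with_default:,} rendered rows kept with the default limit")
```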
>>> >
>>> >
>>> > > This table is very specific compared with the other tables. The only
>>> reason for it being here is to be able
>>> > to show the rendered fields in the UI if you go to the specific run of
>>> a task. If you clean-up other tables you basically lose the history of
>>> execution of the tasks and you cannot really know if the data has been
>>> > processed, you cannot do backfills effectively, you lose all the
>>> > context. Cleaning this table is merely about the values that have been
>>> > rendered for a specific run and the assumption there is that the older
>>> > it gets, the less interesting it is.
>>> >
>>> > What about these tables: session, log, job? I expected the answer
>>> would be "They are not so specific."
>>> > For me every table is specific for their purpose.
>>> > Users often asked (slack, issues, discussions) to give the ability to
>>> auto-maintain tables/logs and receive the usual answer: "No, we don't give
>>> you this opportunity, please use something for Airflow Ecosystem Page". But
>>> on the other hand we have auto-maintenance only for a single table.
>>> >
>>> > >> It is opposite of what we have right now, we scan tables (maybe
>>> multiple times), read all records tuples which contain JSON.
>>> > > Not sure if I get the point here :). Yes -in my proposal I think the
>>> records will not be touched - only indexes. So the cleanup should be way
>>> faster, contentions less of problem, due to the way the delete
>>> > uses < ordering, deadlocks will not be possible at all (as opposed to
>>> the current "not in" - there is a very easy way to get into deadlocks when
>>> two parallel deletes are trying to delete same rows in a different
>>> sequence. I think my proposal improves all the characteristics of the
>>> "cleanup" with very little extra penalty on record creation.
>>> >
>>> > I was talking about the current solution and why it is also slow (and
>>> if abstract users use some DBaaS where they also pay for IOPs then it is
>>> expensive). Let's talk about the benefits of optimised queries for 4
>>> different DB backends (3 if excluding SQLite) when we have it.
>>> >
>>> > > We pay for table/index size linearly: more records, more size. But
>>> other operations vary and depend on B-Tree implementation and usually it
>>> has logarithmic growth. Or do we worry only about table/toast/index size on
>>> disk?
>>> > >> Yep. I (and I believe the original author had the same worry) am
>>> worried a lot about the size of the table and the fact that this table will
>>> be by far the biggest table in our DB while most of the old records will
>>> never be touched. And by the fact that this is the only table that our
>>> users will have to know about to clean up separately from all others pretty
>>> much always.
>>> >
>>> > Same as previous.
>>> >
>>> > > If not even worrying about money spent by our users, and performance
>>> degradation that comes with databases that are bigger - that's a lot of
>>> environmental effects that we might incur. Airflow is heavily used, if
>>> suddenly all our users
>>> > will start having 10x bigger databases than they have now because we
>>> will deprecate the values and keep all the history, then we have a big
>>> number of extra disks that will have to be used. I'd strongly prefer a
>>> solution where we keep the data usage lower in this case.
>>> >
>>> > Am I right that this is all about "lets don't delete by default as we
>>> do for other tables" rather than the current default implementation?
>>> > Because I get the result which is opposite what you said. And
>>> rendered_task_instance_fields don't grow faster than other tables that what
>>> I got.
>>> > I would like to compare it with other findings and some reproducible
>>> metrics rather than with hypothetical things.
>>> >
>>> > ----
>>> > Best Wishes
>>> > Andrey Anshin
>>> >
>>> >
>>> >
>>> > On Tue, 31 Jan 2023 at 11:12, Jarek Potiuk <ja...@potiuk.com> wrote:
>>> >>
>>> >> COMMENT: While writing the answer here, I think I found a deeper
>>> >> problem (and optimisation needed)  - i.e I think the delete should be
>>> >> even more fine-grained than it is today and include map_index) -
>>> >> please take a look at the end (Also maybe TP might comment on that
>>> >> one).
>>> >>
>>> >> > 1. Additional indexes add additional performance degradation on
>>> Insert but gain potential improvements on delete and unknown on update,
>>> RDBMS still require rebalance index and make it consistent to the table.
>>> >> > 2. LIMIT x OFFSET y could easily become full seq scan, especially
>>> if the user set a huge number for offset (which? unknown).
>>> >> > 3. Mixing two indexes could improve performance in a single query
>>> but in concurrent execution might lead to degradation because it needs to
>>> create a bitmap table for comparison between these two indexes, as result
>>> it might lead different issues, such as OOM on DB backend, use swaps or
>>> optimiser decided that better not to use this indexes.
>>> >>
>>> >> I think that is all something to be tested with explain plans. I think
>>> >> we would not know before we try - and possibly there are other
>>> >> optimisation approaches. The optimisation I proposed was only first
>>> >> that came to my mind to avoid the "not in" query. The problem  with
>>> >> "not in query" is that there is no way to optimise it by the DB.
>>> >> Effectively you have to get every record (or index entry) and test it.
>>> >> Maybe it can be done better :). And yes locking the index with
>>> >> anti-insert locks and the need to rebalance trees during the delete is
>>> >> a concern.
>>> >>
>>> >> > Is it a real problem? Until we access only by indexes, which
>>> doesn't include this JSON, it really doesn't matter. I guess we almost
>>> always should make a UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a
>>> single record.
>>> >>
>>> >> Yes I think so, and while I was not the author of this "cleanup"
>>> >> code, I believe I know the intention.
>>> >>
>>> >> It's not about index size or JSON access. It is about the size of the
>>> >> actual rows and storage it takes - i.e. general size of the database.
>>> >> The problem with it is that (especially with dynamic task mapping), it
>>> >> might grow really, really fast. Basically you have NUM_DAGS *
>>> >> NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS * NUM_RUNS number
>>> >> of records there. Back-of-the-envelope calculation: assume you have a
>>> >> DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
>>> >> fields, each field evaluating to a 1K string. Then you have 10 tasks *
>>> >> 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
>>> >> per one(!) run of one(1) DAG. Run it every 10 minutes (144 runs per
>>> >> day) and your database grows by a whopping 1.44 GB of data every
>>> >> single day (from a single DAG). This is of course an estimation
>>> >> that assumes a lot, but it's not at all unrealistic. That's a lot. And
>>> >> if we want the user to do the cleanup then a) they need to know it b)
>>> >> they need to specifically clean up this table only because all the
>>> >> other data is relatively small. This table is very specific compared
>>> >> with the other tables. The only reason for it being here is to be able
>>> >> to show the rendered fields in the UI if you go to the specific run of
>>> >> a task. If you clean-up other tables you basically lose the history of
>>> >> execution of the tasks and you cannot really know if the data has been
>>> >> processed, you cannot do backfills effectively, you lose all the
>>> >> context. Cleaning this table is merely about the values that have been
>>> >> rendered for a specific run and the assumption there is that the older
>>> >> it gets, the less interesting it is.
>>> >>
>>> >> > It is opposite of what we have right now, we scan tables (maybe
>>> multiple times), read all records tuples which contain JSON.
>>> >>
>>> >> Not sure if I get the point here :). Yes - in my proposal I think the
>>> >> records will not be touched - only indexes. So the cleanup should be
>>> >> way faster and contention less of a problem. Due to the way the delete
>>> >> uses < ordering, deadlocks will not be possible at all (as opposed to
>>> >> the current "not in", where there is a very easy way to get into
>>> >> deadlocks when two parallel deletes are trying to delete the same rows
>>> >> in a different sequence). I think my proposal improves all the
>>> >> characteristics of the "cleanup" with very little extra penalty on
>>> >> record creation.
>>> >>
>>> >> > We pay for table/index size linearly: more records, more size. But
>>> other operations vary and depend on the B-Tree implementation, and usually
>>> they have logarithmic growth. Or do we worry only about table/toast/index size on
>>> disk?
>>> >>
>>> >> Yep. I (and I believe the original author had the same worry) am
>>> >> worried a lot about the size of the table and the fact that this table
>>> >> will be by far the biggest table in our DB while most of the old
>>> >> records will never be touched. And by the fact that this is the only
>>> >> table that our users will have to know about to clean up separately
>>> >> from all others pretty much always. If not even worrying about money
>>> >> spent by our users, and performance degradation that comes with
>>> >> databases that are bigger - that's a lot of environmental effects that
>>> >> we might incur. Airflow is heavily used, if suddenly all our users
>>> >> will start having 10x bigger databases than they have now because we
>>> >> will deprecate the values and keep all the history, then we have a big
>>> >> number of extra disks that will have to be used. I'd strongly prefer a
>>> >> solution where we keep the data usage lower in this case.
>>> >>
>>> >> > If we do not want to grant users the ability to clean up rendered
>>> templates tables, there could be another option:
>>> >> > - Do not delete records on every task instance run.
>>> >> > - Delete once per defined period (hourly, daily, weekly, monthly).
>>> In this case you really could not care about locks.
>>> >>
>>> >> Yes we could come up with a different strategy as to "when" run the
>>> >> cleanup. This is also a viable option. If you can propose one that
>>> >> will be equally adaptive as the current solution, I am all ears.
>>> >> Basically my goal is to keep the usage of the table low, possibly
>>> >> controlled by the same parameter we had. How we do it - this is a
>>> >> different story. If we - for example add a thread in the scheduler
>>> >> (for example) that performs such cleanup effectively in parallel and
>>> >> scales, I am happy with that.
>>> >>
>>> >> But I am trying to get into the head of the author trying to
>>> >> understand why the original implementation was done this way. I
>>> >> believe (and maybe those who remember it better could confirm it) that
>>> >> distributing the deletion to tasks to clean up after itself is a
>>> >> better idea than centralising the cleanup. This makes each cleanup
>>> >> smaller, locks are held for a shorter time (at least that was the
>>> >> assumption where no full table scan was used), it is more "immediate"
>>> >> and you do not have to decide upfront what should be the cleanup
>>> >> frequency. It seems this is the best logical approach to keep the
>>> >> "MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply after task is
>>> >> complete, you can be sure that there are no more than this number of
>>> >> fields per task in the DB. With a scheduled run, that would be a much
>>> >> more "eventual" consistency and it will be potentially fluctuating
>>> >> much more.
>>> >>
>>> >> But there are risks involved in having a single thread doing the
>>> >> cleanup. I think it has a huge risk of being a "stop-the world" and
>>> >> "deadlock-prone" kind of event - if in big instances there are a lot
>>> >> of rows to cleanup in a single pass. When you delete entries from a
>>> >> table, there are anti-insert locks created for existing index entries,
>>> >> which makes it possible to rollback the whole DELETE transaction.
>>> >> Which means that when you try to insert the row with the same index,
>>> >> the index will be held. And this means that when you run a single huge
>>> >> DELETE for multiple rows affected with multiple (all?) index keys
>>> >> matching select query, it will effectively prevent new rows with the
>>> >> same indexes that are matching the SELECT. It would mean that if you
>>> >> have some tasks running while deleting existing run_id rendered
>>> >> fields, then you could REALLY start having deadlocks on those tasks
>>> >> trying to insert rendered task instance rows. That's why I think the
>>> >> only viable strategy for single "cleanup" thread is to do such cleanup
>>> >> as separate DELETE for each of the "dag/task/map_index/run" - in order
>>> >> to avoid such deadlocks. Which effectively will turn into what we have
>>> >> currently - only that currently those transactions are done by tasks,
>>> >> not by a single cleanup thread.
>>> >>
>>> >> Also using tasks to delete old rows is more "effective" when you have
>>> >> vast differences in frequency of DAGs. Naturally - when you do it in
>>> >> task, you will only do it "when needed" for given DAG + Task. If you
>>> >> try to centralize the cleanup, unless you include somehow schedule and
>>> >> frequency of each dag, you are going to check every DAG every time
>>> >> you run the cleanup - no matter if that DAG is run daily or every
>>> >> minute, you will have to run the cleanup frequently enough to match
>>> >> your most frequent dags. If you have 1000 dags that run hourly and one
>>> >> DAG that runs every minute, then you have to run a cleanup job that
>>> >> scans all DAGs every few minutes. That's a big waste.
>>> >>
>>> >> So I am not sure if we gain anything by centralizing the cleanup.
>>> >> Decentralising it to Task seems to be a well thought and sound
>>> >> decision (but I think the problem we have now is that we need to
>>> >> optimize it after Dynamic Task Mapping has been added).
>>> >>
>>> >> ANOTHER FINDING:
>>> >>
>>> >> While looking at the code and discussing it and looking more closely I
>>> >> **think** there is another problem that we have to fix regardless of a
>>> >> solution. I THINK a problem we might have now is that we do not
>>> >> include map_index in this DELETE. We currently delete all
>>> >> the rendered task fields without including map_index, and for big
>>> >> dynamic tasks it means that exactly the same DELETE query is run by
>>> >> every single mapped instance of that task, and that is where a lot of
>>> >> contention and locking might happen (basically when a single task
>>> >> instance does the delete, anti-insert locks hold back the other mapped
>>> >> instances of the same task from inserting rendered fields).
>>> >>
>>> >> It does not change much in the optimisation proposal of mine, other
>>> >> than we should include map_index in those queries. But I think this
>>> >> might cause a lot of delays in the current implementation.
>>> >>
>>> >> J.
>>> >>
>>> >> > ----
>>> >> > Best Wishes
>>> >> > Andrey Anshin
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com>
>>> wrote:
>>> >> >>
>>> >> >> I think there is a good reason to clean those up automatically.
>>> >> >> rendered task instance fields are almost arbitrary in size. If we
>>> try
>>> >> >> to keep all historical values there by default, there are numerous
>>> >> >> cases it will grow very fast - far, far too quickly.
>>> >> >>
>>> >> >> And I am not worried at all about locks on this table if we do it
>>> the
>>> >> >> way I described it and it uses the indexes. The contention this way
>>> >> >> might only be between the two deleting tasks. and with the query I
>>> >> >> proposed, they will only last for a short time - the index will be
>>> >> >> locked when two DELETES  or SELECT DISTINCT - which should both be
>>> >> >> fast.
>>> >> >>
>>> >> >>
>>> >> >> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <
>>> andrey.anshin@taragol.is> wrote:
>>> >> >> >
>>> >> >> > I guess two things involved to reduce performance on this query
>>> through the time: Dynamic Task Mapping and run_id instead of execution date.
>>> >> >> >
>>> >> >> > I still personally think that changing the default value from 30
>>> to 0 might improve performance of multiple concurrent tasks, just because
>>> this query does not run and there are no locks on multiple records/pages.
>>> >> >> >
>>> >> >> > I do not have any proof (yet?) other than simple DAGs. I think
>>> that there is some cross point exists when keeping this table growth worse
>>> rather than cleanup for each TI run. But users have ability to cleanup
>>> table by execute airflow db clean which should improve performance again
>>> >> >> >
>>> >> >> > And also there is interesting behavior with this query: if a user
>>> already has more than the value specified by
>>> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tries to run a backfill,
>>> then rendered templates are not written to the table (or maybe inserted and
>>> immediately deleted after that); the same is valid for cleanup of old tasks.
>>> >> >> >
>>> >> >> > ----
>>> >> >> > Best Wishes
>>> >> >> > Andrey Anshin
>>> >> >> >
>>> >> >> >
>>> >> >> >
>>> >> >> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com>
>>> wrote:
>>> >> >> >>
>>> >> >> >> Yep. Agree this is not an efficient query and dynamic task
>>> mapping
>>> >> >> >> makes the effect much worse. Generally speaking, selecting "what
>>> >> >> >> should be left" and then deleting stuff where the key is "not
>>> in" is
>>> >> >> >> never an efficient way of running an sql query.  And the query
>>> not
>>> >> >> >> using index at all makes it rather terrible.
>>> >> >> >>
>>> >> >> >> I think we should not deprecate it though, but find a more
>>> efficient
>>> >> >> >> way of deleting the old keys. I think we could slightly
>>> denormalize
>>> >> >> >> RenderedTaskInstance + DagRun tables, and add
>>> DAG_RUN_EXECUTION_DATE
>>> >> >> >> to the RenderedTaskInstance table and that will be enough to
>>> optimise
>>> >> >> >> it.
>>> >> >> >>
>>> >> >> >> Then we could have either:
>>> >> >> >>
>>> >> >> >> * a composite B-TREE indexed (non-unique) index on DAG_ID,
>>> TASK_ID,
>>> >> >> >> RUN_ID_EXECUTION_DATE
>>> >> >> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and
>>> separate
>>> >> >> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
>>> >> >> >>
>>> >> >> >> Probably the latter is better as I am not sure how < , >
>>> comparison
>>> >> >> >> looks like for composite B-TREE indexes when char + date
>>> columns are
>>> >> >> >> mixed. Also we could have hit the infamous MySQL index key
>>> length
>>> >> >> >> limit.
>>> >> >> >>
>>> >> >> >> Then deletion process would look roughly like:
>>> >> >> >>
>>> >> >> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
>>> >> >> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
>>> >> >> >> TASK_ID = <TASK_ID> GROUP BY RUN_ID_EXECUTION_DATE ORDER BY
>>> >> >> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
>>> >> >> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
>>> >> >> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID>
>>> >> >> >> AND TASK_ID = <TASK_ID> AND RUN_ID_EXECUTION_DATE <
>>> >> >> >> dag_run_execution_date
>>> >> >> >> I believe that would be fast, and it would use the B-TREE index
>>> >> >> >> features nicely (ordering support)
>>> >> >> >>
>>> >> >> >> J
>>> >> >> >>
>>> >> >> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <
>>> andrey.anshin@taragol.is> wrote:
>>> >> >> >> >
>>> >> >> >> > First of all I want to highlight that this approach I guess
>>> worked well until Dynamic Task Mappings introduced.
>>> >> >> >> >
>>> >> >> >> > > The main reason for adding that cleanup was -- if you don't
>>> do that, you will have many rows, similar to the TaskInstance table
>>> >> >> >> >
>>> >> >> >> > The problem itself is not how big your table/indexes are, but
>>> rather what kind of operations you run.
>>> >> >> >> >
>>> >> >> >> > > Do you have any data for locks or performance degradation?
>>> >> >> >> >
>>> >> >> >> > In this case if we try to clean up
>>> rendered_task_instance_fields table when a new TI is created/cleared we
>>> make almost two full/sequential scans (note: need to check) against the
>>> table without any index usage, so we pay here a couple times:
>>> >> >> >> > 1. We scan without indexes - not all parts of the composite
>>> key are included to query, plus we need to filter everything except 30
>>> records with order and distinct
>>> >> >> >> > 2. After that we make another full scan to find 1 record or
>>> map_size records
>>> >> >> >> >
>>> >> >> >> > And I guess the situation becomes worse if you have a lot of
>>> tasks, even if we have a small table, we need to do ineffective operations.
>>> >> >> >> >
>>> >> >> >> > This is how the query plan looks (please note that without
>>> committing the transaction the DELETE operation doesn't show all information):
>>> https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
>>> >> >> >> >
>>> >> >> >> > In case if we do not clean up the table, we only use these
>>> operations:
>>> >> >> >> > 1. SELECT single record by index
>>> >> >> >> > 2. INSERT new record
>>> >> >> >> > 3. DELETE old record(s), which were found by index.
>>> >> >> >> >
>>> >> >> >> > I have not done any real tests yet, only synthetic DAGs (so
>>> we should not consider to use any findings as totally truth):
>>> https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
>>> >> >> >> > DAG with parallel tasks: degradation up to 2-3 times
>>> >> >> >> > DAG with single map tasks: degradation up to 7-10 times
>>> >> >> >> >
>>> >> >> >> > I have a plan for more complex and more close to real use
>>> cases with Database which do not have network latency almost 0 as I have in
>>> my local.
>>> >> >> >> > But I will not refuse if someone also does their tests with
>>> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
>>> >> >> >> >
>>> >> >> >> > About deadlock we know that it exists at least in MySQL:
>>> https://github.com/apache/airflow/pull/18616
>>> >> >> >> >
>>> >> >> >> > > And the larger tables create problems during database
>>> migrations.
>>> >> >> >> >
>>> >> >> >> > That is a very good point, so if we found that problem only
>>> related to migrations we could:
>>> >> >> >> > 1. Cleanup this table in migration
>>> >> >> >> > 2. Add cli command to airflow db which could cleanup only
>>> rendered fields, so it would be user's choice cleanup or not before
>>> migration, do periodical maintenance or not
>>> >> >> >> >
>>> >> >> >> >
>>> >> >> >> > ----
>>> >> >> >> > Best Wishes
>>> >> >> >> > Andrey Anshin
>>> >> >> >> >
>>> >> >> >> >
>>> >> >> >> >
>>> >> >> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com>
>>> wrote:
>>> >> >> >> >>>
>>> >> >> >> >>> Potentially it is a good idea to deprecate this option and
>>> recommend for users to set it to 0? WDYT? Maybe someone has already tried
>>> or investigated this?
>>> >> >> >> >>
>>> >> >> >> >>
>>> >> >> >> >> The main reason for adding that cleanup was -- if you don't
>>> do that, you will have many rows, similar to the TaskInstance table. And
>>> the RenderedTIFields were mainly added for checking rendered TI fields on
>>> the Webserver only because after DAG Serialization, the webserver won't
>>> have access to DAG files.
>>> >> >> >> >>
>>> >> >> >> >> And the larger tables create problems during database
>>> migrations.
>>> >> >> >> >>
>>> >> >> >> >> Do you have any data for locks or performance degradation?
>>> >> >> >> >>
>>> >> >> >> >>
>>> >> >> >> >>
>>> >> >> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <
>>> andrey.anshin@taragol.is> wrote:
>>> >> >> >> >>>
>>> >> >> >> >>> Greetings!
>>> >> >> >> >>>
>>> >> >> >> >>> During migrating our ORM syntax to compatible with
>>> SQLAlchemy 2.0 I probably found skeletons in the closet.
>>> >> >> >> >>>
>>> >> >> >> >>> Let's start from the beginning, initially I got this warning
>>> >> >> >> >>>
>>> >> >> >> >>> airflow/models/renderedtifields.py:245
>>> RemovedIn20Warning('ORDER BY columns added implicitly due to DISTINCT is
>>> deprecated and will be removed in SQLAlchemy 2.0.  SELECT statements with
>>> DISTINCT should be written to explicitly include the appropriate columns in
>>> the columns clause (Background on SQLAlchemy 2.0 at:
>>> https://sqlalche.me/e/b8d9)')
>>> >> >> >> >>>
>>> >> >> >> >>> "OK let's fix it!", I thought at first and started to
>>> investigate RenderedTaskInstanceFields model
>>> >> >> >> >>>
>>> >> >> >> >>> Skeleton #1:
>>> >> >> >> >>>
>>> >> >> >> >>> When I first time look on the code and comments it got me
>>> to thinking that part which keep only latest N Rendered Task Fields
>>> potentially could lead different performance degradation (Locks, Dead
>>> Locks, Data Bloating): see code
>>> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
>>> >> >> >> >>>
>>> >> >> >> >>> Also this historical part (from Airflow 1.10.10) generate
>>> this SQL Statement (pg backend)
>>> >> >> >> >>>
>>> >> >> >> >>> DELETE FROM rendered_task_instance_fields
>>> >> >> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1)s
>>> >> >> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1)s
>>> >> >> >> >>>   AND (
>>> >> >> >> >>>     (
>>> >> >> >> >>>       rendered_task_instance_fields.dag_id,
>>> >> >> >> >>>       rendered_task_instance_fields.task_id,
>>> >> >> >> >>>       rendered_task_instance_fields.run_id
>>> >> >> >> >>>     ) NOT IN (
>>> >> >> >> >>>       SELECT
>>> >> >> >> >>>         anon_1.dag_id,
>>> >> >> >> >>>         anon_1.task_id,
>>> >> >> >> >>>         anon_1.run_id
>>> >> >> >> >>>       FROM
>>> >> >> >> >>>         (
>>> >> >> >> >>>           SELECT DISTINCT
>>> >> >> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
>>> >> >> >> >>>             rendered_task_instance_fields.task_id AS
>>> task_id,
>>> >> >> >> >>>             rendered_task_instance_fields.run_id AS run_id,
>>> >> >> >> >>>             dag_run.execution_date AS execution_date
>>> >> >> >> >>>           FROM rendered_task_instance_fields
>>> >> >> >> >>>             JOIN dag_run ON
>>> rendered_task_instance_fields.dag_id = dag_run.dag_id
>>> >> >> >> >>>             AND rendered_task_instance_fields.run_id =
>>> dag_run.run_id
>>> >> >> >> >>>           WHERE
>>> >> >> >> >>>             rendered_task_instance_fields.dag_id =
>>> %(dag_id_2)s
>>> >> >> >> >>>             AND rendered_task_instance_fields.task_id =
>>> %(task_id_2)s
>>> >> >> >> >>>           ORDER BY
>>> >> >> >> >>>             dag_run.execution_date DESC
>>> >> >> >> >>>           LIMIT %(param_1)s
>>> >> >> >> >>>         ) AS anon_1
>>> >> >> >> >>>     )
>>> >> >> >> >>>   )
>>> >> >> >> >>>
>>> >> >> >> >>> This is especially ineffective in PostgreSQL. An IN
>>> subquery can easily be transformed internally into a SEMI JOIN (aka EXISTS
>>> clause), but this does not work for a NOT IN subquery, because it is not
>>> transformed into an ANTI JOIN (aka NOT EXISTS clause) even when that is possible,
>>> see: https://commitfest.postgresql.org/27/2023/
>>> >> >> >> >>>
>>> >> >> >> >>> I didn't do any performance benchmarks yet but I guess if
>>> users set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather
>>> than default 30 it could improve performance and reduce number of
>>> DeadLocks, however the table size will increase but I think we don't do any
>>> maintenance job for other tables.
>>> >> >> >> >>>
>>> >> >> >> >>> Potentially it is a good idea to deprecate this option and
>>> recommend for users to set it to 0? WDYT? Maybe someone has already tried
>>> or investigated this?
>>> >> >> >> >>>
>>> >> >> >> >>>
>>> >> >> >> >>> Skeleton #2:
>>> >> >> >> >>>
>>> >> >> >> >>> We have a k8s_pod_yaml field which is exclusively used by
>>> K8S executors.
>>> >> >> >> >>>
>>> >> >> >> >>> Should we also decouple this field as part of AIP-51?
>>> >> >> >> >>>
>>> >> >> >> >>> ----
>>> >> >> >> >>> Best Wishes
>>> >> >> >> >>> Andrey Anshin
>>> >> >> >> >>>
>>>
>>
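
Editor's illustration of the two-step cleanup proposed earlier in this thread (find a cutoff date via LIMIT/OFFSET, then do a range DELETE). This is only a minimal sketch, not Airflow code. It assumes a hypothetical, simplified table named rtif with a denormalized execution_date column, a hypothetical composite index, and the SQLite backend from the Python standard library. Unlike the pseudo-SQL in the thread, it uses OFFSET keep - 1 so that exactly "keep" distinct execution dates survive.

```python
import sqlite3


def make_demo_db():
    """Build an in-memory table loosely modelled on rendered_task_instance_fields.

    The table name, columns and index are assumptions made for this sketch.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE rtif (dag_id TEXT, task_id TEXT, run_id TEXT, "
        "map_index INTEGER, execution_date TEXT, rendered_fields TEXT)"
    )
    # Non-unique composite index, as suggested in the thread.
    conn.execute(
        "CREATE INDEX idx_rtif_cleanup ON rtif (dag_id, task_id, execution_date)"
    )
    # 10 runs of task "t", each with 3 mapped instances.
    rows = [
        ("d", "t", f"run_{day:02d}", map_index, f"2023-01-{day:02d}", "{}")
        for day in range(1, 11)
        for map_index in range(3)
    ]
    # One row of an unrelated task that must not be touched.
    rows.append(("d", "other", "run_01", 0, "2023-01-01", "{}"))
    conn.executemany("INSERT INTO rtif VALUES (?, ?, ?, ?, ?, ?)", rows)
    return conn


def cleanup_old_rendered_fields(conn, dag_id, task_id, keep):
    """Delete rows older than the `keep` newest execution dates of one task."""
    if keep <= 0:
        return 0  # treat 0 as "cleanup disabled", as described in the thread
    # Step 1: execution date of the keep-th newest run (None if fewer exist).
    row = conn.execute(
        "SELECT DISTINCT execution_date FROM rtif "
        "WHERE dag_id = ? AND task_id = ? "
        "ORDER BY execution_date DESC LIMIT 1 OFFSET ?",
        (dag_id, task_id, keep - 1),
    ).fetchone()
    if row is None:
        return 0
    # Step 2: plain range delete instead of a NOT IN subquery.
    cursor = conn.execute(
        "DELETE FROM rtif "
        "WHERE dag_id = ? AND task_id = ? AND execution_date < ?",
        (dag_id, task_id, row[0]),
    )
    return cursor.rowcount


conn = make_demo_db()
deleted = cleanup_old_rendered_fields(conn, "d", "t", keep=4)
remaining = conn.execute(
    "SELECT COUNT(*) FROM rtif WHERE task_id = 't'"
).fetchone()[0]
```

Whether a real backend would actually use the index for both statements is exactly the open question in the thread and would need to be checked with query plans on PostgreSQL and MySQL.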

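Editor's illustration of the back-of-the-envelope estimate quoted in this thread. It is a sketch under the same assumptions the thread states (10 mapped tasks, 100 map indexes, 10 templated fields of about 1K each, one run every 10 minutes, 30 runs retained). The function name and the choice of 1K = 1000 bytes are assumptions of this sketch, and real on-disk size will differ because of compression and row overhead.

```python
def estimate_rtif_growth(
    tasks, map_indexes, fields, bytes_per_field, run_interval_minutes, keep_runs
):
    """Rough size estimate for rendered fields of a single DAG."""
    rows_per_run = tasks * map_indexes  # one row per mapped task instance
    bytes_per_run = rows_per_run * fields * bytes_per_field
    runs_per_day = (24 * 60) // run_interval_minutes
    return {
        "bytes_per_run": bytes_per_run,
        "bytes_per_day": bytes_per_run * runs_per_day,
        "task_instances_per_day": rows_per_run * runs_per_day,
        "rows_retained": rows_per_run * keep_runs,  # with cleanup enabled
    }


estimate = estimate_rtif_growth(
    tasks=10,
    map_indexes=100,
    fields=10,
    bytes_per_field=1000,
    run_interval_minutes=10,
    keep_runs=30,
)
```

With these inputs the result matches the figures used by both sides of the discussion: 10 MB per run, 1.44 GB per day, 144,000 task instances per day, and 30,000 rows retained when the last 30 runs are kept.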
Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Kaxil Naik <ka...@gmail.com>.
Maybe the “auto-maintenance” is true. Hopefully not for long though :)

On Thu, 2 Feb 2023 at 01:41, Kaxil Naik <ka...@gmail.com> wrote:

>
> >Users often asked (slack, issues, discussions) to give the ability to
>> auto-maintain tables/logs and receive the usual answer: "No, we don't give
>> you this opportunity, please use something from the Airflow Ecosystem Page". But
>> on the other hand we have auto-maintenance only for a single table.
>
>
> That is not true, we added "airflow db clean" command to give this control
> to users.
>
> What about these tables: session, log, job? I expected the answer would be
>> "They are not so specific."
>
> The RTIF can vary in size based on what is stored and is really useless
> beyond the last X for each task. RTIF also has a foreign key constraint
> <https://github.com/apache/airflow/blob/d80b583db07197c8c3d0549a805e83ceaaf10d52/airflow/models/renderedtifields.py#L63-L73> on
> TI, and TI is the most common table that is affected during migrations
> and can affect scheduling decisions. Log and Session tables aren't affected
> by it.
>
> I will have a detailed reply if I manage to find the time, it has just
> been too difficult
>
> On Tue, 31 Jan 2023 at 21:46, Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> BTW, I would really love to hear what the original authors have to say
>> here. I am merely trying to put myself in their shoes and guess what
>> the reasoning is :).
>>
>> I think this is really a question of: do we want to keep all
>> arbitrary-size rendered task instance fields in our database forever
>> by default, same as other fields?
>> I believe the original authors answered the question with "no". And
>> the "num_row" was a way to limit it.
>>
>> And I really do not want to "win" this argument, I just want to
>> protect our users (and environment).
>>
>> There is (IMHO) currently a big difference between
>> session/log/task_instance fields and rendered task_instance fields
>> that justifies different behaviour.
>>
>> The former are generally fixed in max size of rows (this is one of the
>> reasons we have limited string sizes in our DB) - to be able to limit
>> them growing uncontrollably large. We simply do not keep arbitrary
>> size data in those tables.
>> On the other hand, the rendered task instance is arbitrary in size
>> (JSONField) and the need for deletion accounts for the "worst" case
>> scenario.
>> Until we get rid of that "property" of the rendered task instance
>> table, I think "just" skipping deletion of those fields without fixing
>> the "worst" case scenario is not a good idea.
>>
>> Maybe in your test cases (and many others) those tables are not
>> bigger, but I think the protection here is implemented to account for
>> the cases where the rendered task instance fields are really "big".
>>
>> But very interestingly - if the rendered task instance is "big" then
>> likely it is next to useless to be displayed in the Web UI in its
>> entirety.
>>
>> So maybe you are actually right Andrey, Maybe we can skip deleting
>> those and maybe we could solve it differently and apply the same rules
>> as other tables?
>>
>> Let me then - constructively - propose another idea which actually
>> might solve both yours and my own concerns. Maybe we can fix the
>> "worst case" scenario differently? We do not have to limit the number
>> of rows, we can limit the size of the row instead.
>>
>> Why don't we simply limit the size of the rendered task instance JSON
>> and if they are too big (we can configure the maximum size), we will
>> render something like (probably a bit more unique and not
>> "accidentally triggerable"):
>>
>> {
>>    "error": "This task instance has too large rendered task instances
>> to display it"
>> }
>>
>> And implement an escape hatch in the web server to handle it properly
>> when displaying such "truncated" rendered task instance field.
>>
>> We should be able to come up with a sensible max size that we might think
>> makes sense when rendering it in the web UI.  And we could make the
>> max size configurable by the user if they have a lot of GB to spare.
>> We could even apply it automatically. If there is a max_num_row_limit
>> - we allow any size, if we have no limit on the number of rows, we
>> limit the maximum row size.
>>
>> If we have such "upper-bounded" reasonable size of each row in that
>> table - then I am perfectly happy with not deleting the rows
>> automatically.
>> But only if we limit that and handle the "worst" case properly.
>>
>> That would be my proposal how we can handle it to get both views taken
>> into account.
>>
>> What do you, Andrey (and others), think? Does it make sense? Or do we
>> think we should not have any such protections in place ?
>>
>> J.
>>
>>
>> On Tue, Jan 31, 2023 at 9:43 PM Andrey Anshin <an...@taragol.is>
>> wrote:
>> >
>> > > I think that is all something to be tested with explain plans.
>> >
>> > I would be really careful with these results. A DELETE in a transaction
>> with a rollback usually looks more optimistic than one that is actually executed.
>> >
>> >
>> > > I think we would not know before we try - and possibly there are other
>> > optimisation approaches. The optimisation I proposed was only first
>> > that came to my mind to avoid the "not in" query. The problem  with
>> > "not in query" is that there is no way to optimise it by the DB.
>> > Effectively you have to get every record (or index entry) and test it.
>> > Maybe it can be done better :). And yes locking the index with
>> > anti-insert locks and the need to rebalance trees during the delete is
>> > a concern.
>> >
>> > My point is still the same, I would rather remove it in the future or
>> make policy about maintenance more consistent: all or nothing. Right now we
>> are close to nothing rather than all.
>> >
>> >
>> > > It's not about index size or JSON access. It is about the size of the
>> > actual rows and storage it takes - i.e. general size of the database.
>> >
>> > I'm tired, but I'm not sure that I understand the actual point.
>> > The size of the table does not really matter if you always access it by
>> this pattern:
>> > 1 request which returns exactly 1 record accessed by a unique index.
>> > Basically the query traverses the index and finds a reference to a single CTID/rowid
>> (or whatever name is used in other RDBMS).
>> > So at this point what really matters is how fast your index grows rather
>> than the table size.
>> >
>> >
>> > > The problem with it is that (especially with dynamic task mapping), it
>> > might grow really, really fast. Basically you have NUM_DAGS x
>> > NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
>> > of records there.
>> > Back-of-the-envelope calculation: assume you have a
>> > DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
>> > fields, each field evaluating to a 1K string. Then you have 10 tasks *
>> > 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
>> > per one(!) run of one(1) DAG. Run it every 10 minutes and every day
>> > your database from a single DAG grows by a whopping 1.44 GB of data
>> > every single day (from a single DAG).
>> >
>> > Depends on the DB. If we talk about Postgres you could easily be off by up to
>> 3-4 times (thanks to inline compression before/instead of TOAST).
>> > I have a couple questions:
>> > 1. Do you know how big would be indexes in
>> rendered_task_instance_fields after one day? (Spoiler alert I could give
>> estimation in the morning)
>> > 2. In this scenario the default settings would always keep up to
>> 30,000 TIs for this DAG.
>> > Could someone who wants to optimize the query make it more optimal
>> than accessing the table by index (Unique Index -> ctid/rowid -> record),
>> and where is this cross point?
>> >
>> > > This is of course an estimation that assumes a lot, but it's not at
>> all unrealistic.
>> >
>> > 144,000 TIs per day on a single DAG (initially I wanted to put here some
>> sarcastic message).
>> > How would Airflow feel with 144k Task Instances per day? How
>> > I asked because right now I've always had a task_instance table bigger
>> than rendered_task_instance_fields.
>> >
>> >
>> > > This table is very specific compared with the other tables. The only
>> reason for it being here is to be able
>> > to show the rendered fields in the UI if you go to the specific run of
>> a task. If you clean-up other tables you basically lose the history of
>> execution of the tasks and you cannot really know if the data has been
>> > processed, you cannot do backfills effectively, you lose all the
>> > context. Cleaning this table is merely about the values that have been
>> > rendered for a specific run and the assumption there is that the older
>> > it gets, the less interesting it is.
>> >
>> > What about these tables: session, log, job? I expected the answer would
>> be "They are not so specific."
>> > For me every table is specific for their purpose.
>> > Users often ask (Slack, issues, discussions) for the ability to
>> auto-maintain tables/logs and receive the usual answer: "No, we don't give
>> you this opportunity, please use something from the Airflow Ecosystem page". But
>> on the other hand we have auto-maintenance for only a single table.
>> >
>> > >> It is opposite of what we have right now, we scan tables (maybe
>> multiple times), read all records tuples which contain JSON.
>> > > Not sure if I get the point here :). Yes - in my proposal I think the
>> records will not be touched - only indexes. So the cleanup should be way
>> faster and contention less of a problem. Due to the way the delete
>> > uses < ordering, deadlocks will not be possible at all (as opposed to
>> the current "not in", where it is very easy to get into deadlocks when
>> two parallel deletes are trying to delete the same rows in a different
>> sequence). I think my proposal improves all the characteristics of the
>> "cleanup" with very little extra penalty on record creation.
>> >
>> > I was talking about the current solution and why it is also slow (and
>> if abstract users use some DBaaS where they also pay for IOPs then it is
>> expensive). Let's talk about the benefits of optimised queries for 4
>> different DB backends (3 if excluding SQLite) when we have it.
>> >
>> > > We pay for table/index size linearly: more records, more size. But other
>> operations vary and depend on the B-Tree implementation, which usually has
>> logarithmic growth. Or do we worry only about table/toast/index size on
>> disk?
>> > >> Yep. I (and I believe the original author had the same worry) am
>> worried a lot about the size of the table and the fact that this table will
>> be by far the biggest table in our DB while most of the old records will
>> never be touched. And by the fact that this is the only table that our
>> users will have to know about to clean up separately from all others pretty
>> much always.
>> >
>> > Same as previous.
>> >
>> > > If not even worrying about money spent by our users, and performance
>> degradation that comes with databases that are bigger - that's a lot of
>> environmental effects that we might incur. Airflow is heavily used, if
>> suddenly all our users
>> > will start having 10x bigger databases than they have now because we
>> will deprecate the values and keep all the history, then we have a big
>> number of extra disks that will have to be used. I'd strongly prefer a
>> solution where we keep the data usage lower in this case.
>> >
>> > Am I right that this is all about "let's not delete by default, as we
>> do for other tables" rather than the current default implementation?
>> > Because I get a result which is the opposite of what you said:
>> rendered_task_instance_fields does not grow faster than other tables in
>> what I measured.
>> > I would like to compare it with other findings and some reproducible
>> metrics rather than with hypothetical things.
>> >
>> > ----
>> > Best Wishes
>> > Andrey Anshin
>> >
>> >
>> >
>> > On Tue, 31 Jan 2023 at 11:12, Jarek Potiuk <ja...@potiuk.com> wrote:
>> >>
>> >> COMMENT: While writing the answer here, I think I found a deeper
>> >> problem (and an optimisation that is needed) - i.e. I think the delete
>> >> should be even more fine-grained than it is today and include map_index -
>> >> please take a look at the end (also maybe TP might comment on that
>> >> one).
>> >>
>> >> > 1. Additional indexes add additional performance degradation on
>> insert, but give potential improvements on delete and an unknown effect on update;
>> the RDBMS still needs to rebalance the index and keep it consistent with the table.
>> >> > 2. LIMIT x OFFSET y could easily become a full seq scan, especially if
>> the user sets a huge number for the offset (which? unknown).
>> >> > 3. Mixing two indexes could improve performance in a single query,
>> but in concurrent execution it might lead to degradation because the DB needs to
>> create a bitmap table to compare the two indexes. As a result
>> it might lead to different issues, such as OOM on the DB backend, swap usage, or
>> the optimiser deciding that it is better not to use these indexes.
>> >>
>> >> I think that is all something to be tested with explain plans. I think
>> >> we would not know before we try - and possibly there are other
>> >> optimisation approaches. The optimisation I proposed was only the first
>> >> that came to my mind to avoid the "not in" query. The problem with the
>> >> "not in" query is that there is no way for the DB to optimise it.
>> >> Effectively you have to get every record (or index entry) and test it.
>> >> Maybe it can be done better :). And yes locking the index with
>> >> anti-insert locks and the need to rebalance trees during the delete is
>> >> a concern.
>> >>
>> >> > Is it a real problem? Until we access only by indexes, which doesn't
>> include this JSON, it really doesn't matter. I guess we almost always
>> should make a UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a single
>> record.
>> >>
>> >> Yes I think so, and while I was not the author of this "cleanup"
>> >> code, I believe I know the intention.
>> >>
>> >> It's not about index size or JSON access. It is about the size of the
>> >> actual rows and storage it takes - i.e. general size of the database.
>> >> The problem with it is that (especially with dynamic task mapping), it
>> >> might grow really, really fast. Basically you have NUM_DAGS x
>> >> NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
>> >> of records there. Back-of-the-envelope calculation: assuming you have a
>> >> DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
>> >> fields, each field evaluating to a 1K string, then you have 10 tasks *
>> >> 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
>> >> per one(!) run of one(1) DAG. Run it every 10 minutes and
>> >> your database grows by a whopping 1.44 GB of data
>> >> every single day (from a single DAG). This is of course an estimation
>> >> that assumes a lot, but it's not at all unrealistic. That's a lot. And
>> >> if we want the user to do the cleanup then a) they need to know it b)
>> >> they need to specifically clean up this table only because all the
>> >> other data is relatively small. This table is very specific compared
>> >> with the other tables. The only reason for it being here is to be able
>> >> to show the rendered fields in the UI if you go to the specific run of
>> >> a task. If you clean-up other tables you basically lose the history of
>> >> execution of the tasks and you cannot really know if the data has been
>> >> processed, you cannot do backfills effectively, you lose all the
>> >> context. Cleaning this table is merely about the values that have been
>> >> rendered for a specific run and the assumption there is that the older
>> >> it gets, the less interesting it is.
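
The back-of-the-envelope numbers above can be reproduced with a few lines of Python. This is only a sketch of the arithmetic: the constant and function names are made up for illustration, and "1K" is taken as 1000 bytes, which is an assumption.

```python
# Sketch of the growth estimate from the message above.
# All names here are illustrative; none of them exist in Airflow.
TASKS_PER_DAG = 10            # dynamically mapped tasks in the DAG
MAP_INDEXES_PER_TASK = 100    # mapped instances per task
TEMPLATED_FIELDS_PER_TI = 10  # rendered fields per task instance
BYTES_PER_FIELD = 1_000       # "1K string", assumed to mean 1000 bytes
RUN_INTERVAL_MINUTES = 10     # the DAG runs every 10 minutes


def bytes_per_run() -> int:
    """Rendered field data written by one run of the DAG."""
    return (
        TASKS_PER_DAG
        * MAP_INDEXES_PER_TASK
        * TEMPLATED_FIELDS_PER_TI
        * BYTES_PER_FIELD
    )


def runs_per_day() -> int:
    """Number of DAG runs in 24 hours at the given interval."""
    return 24 * 60 // RUN_INTERVAL_MINUTES


def task_instances_per_day() -> int:
    """Task instances created per day by this single DAG."""
    return TASKS_PER_DAG * MAP_INDEXES_PER_TASK * runs_per_day()


def bytes_per_day() -> int:
    """Rendered field data written per day by this single DAG."""
    return bytes_per_run() * runs_per_day()


if __name__ == "__main__":
    print(f"per run: {bytes_per_run() / 1e6:.2f} MB")
    print(f"per day: {bytes_per_day() / 1e9:.2f} GB")
    print(f"task instances per day: {task_instances_per_day()}")
```

The same inputs give the 144,000 task instances per day mentioned elsewhere in the thread, so both figures describe the same hypothetical DAG.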
>> >>
>> >> > It is opposite of what we have right now, we scan tables (maybe
>> multiple times), read all records tuples which contain JSON.
>> >>
>> >> Not sure if I get the point here :). Yes - in my proposal I think the
>> >> records will not be touched - only indexes. So the cleanup should be
>> >> way faster and contention less of a problem. Due to the way the delete
>> >> uses < ordering, deadlocks will not be possible at all (as opposed to
>> >> the current "not in", where it is very easy to get into deadlocks
>> >> when two parallel deletes are trying to delete the same rows in a
>> >> different sequence). I think my proposal improves all the
>> >> characteristics of the "cleanup" with very little extra penalty on
>> >> record creation.
>> >>
>> >> > We pay for table/index size linearly: more records, more size. But
>> other operations vary and depend on the B-Tree implementation, which usually
>> has logarithmic growth. Or do we worry only about table/toast/index size on
>> disk?
>> >>
>> >> Yep. I (and I believe the original author had the same worry) am
>> >> worried a lot about the size of the table and the fact that this table
>> >> will be by far the biggest table in our DB while most of the old
>> >> records will never be touched. And by the fact that this is the only
>> >> table that our users will have to know about to clean up separately
>> >> from all others pretty much always. If not even worrying about money
>> >> spent by our users, and performance degradation that comes with
>> >> databases that are bigger - that's a lot of environmental effects that
>> >> we might incur. Airflow is heavily used, if suddenly all our users
>> >> will start having 10x bigger databases than they have now because we
>> >> will deprecate the values and keep all the history, then we have a big
>> >> number of extra disks that will have to be used. I'd strongly prefer a
>> >> solution where we keep the data usage lower in this case.
>> >>
>> >> > If we do not want to grant users the ability to clean up rendered
>> templates tables, there could be another option:
>> >> > - Do not delete records on every task instance run.
>> >> > - Delete once per defined period (hourly, daily, weekly, monthly).
>> In this case you really could not care about locks.
>> >>
>> >> Yes we could come up with a different strategy as to "when" to run the
>> >> cleanup. This is also a viable option. If you can propose one that
>> >> will be equally adaptive as the current solution, I am all ears.
>> >> Basically my goal is to keep the usage of the table low, possibly
>> >> controlled by the same parameter we had. How we do it - this is a
>> >> different story. If we, for example, add a thread in the scheduler
>> >> that performs such cleanup effectively in parallel and
>> >> scales, I am happy with that.
>> >>
>> >> But I am trying to get into the head of the author trying to
>> >> understand why the original implementation was done this way. I
>> >> believe (and maybe those who remember it better could confirm it) that
>> >> distributing the deletion to tasks to clean up after itself is a
>> >> better idea than centralising the cleanup. This makes each cleanup
>> >> smaller, locks are held for a shorter time (at least that was the
>> >> assumption where no full table scan was used), it is more "immediate"
>> >> and you do not have to decide upfront what should be the cleanup
>> >> frequency. It seems this is the best logical approach to keep the
>> >> "MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply after task is
>> >> complete, you can be sure that there are no more than this number of
>> >> fields per task in the DB. With a scheduled run, that would be a much
>> >> more "eventual" consistency and it will be potentially fluctuating
>> >> much more.
>> >>
>> >> But there are risks involved in having a single thread doing the
>> >> cleanup. I think it has a huge risk of being a "stop-the world" and
>> >> "deadlock-prone" kind of event - if in big instances there are a lot
>> >> of rows to cleanup in a single pass. When you delete entries from a
>> >> table, there are anti-insert locks created for existing index entries,
>> >> which makes it possible to rollback the whole DELETE transaction.
>> >> Which means that when you try to insert the row with the same index,
>> >> the index will be held. And this means that when you run a single huge
>> >> DELETE for multiple rows affected with multiple (all?) index keys
>> >> matching select query, it will effectively prevent new rows with the
>> >> same indexes that are matching the SELECT. It would mean that if you
>> >> have some tasks running while deleting existing run_id rendered
>> >> fields, then you could REALLY start having deadlocks on those tasks
>> >> trying to insert rendered task instance rows. That's why I think the
>> >> only viable strategy for single "cleanup" thread is to do such cleanup
>> >> as separate DELETE for each of the "dag/task/map_index/run" - in order
>> >> to avoid such deadlocks. Which effectively will turn into what we have
>> >> currently - only that currently those transactions are done by tasks,
>> >> not by a single cleanup thread.
>> >>
>> >> Also using tasks to delete old rows is more "effective" when you have
>> >> vast differences in frequency of DAGs. Naturally - when you do it in
>> >> task, you will only do it "when needed" for given DAG + Task. If you
>> >> try to centralize the cleanup, unless you somehow include the schedule and
>> >> frequency of each DAG, you are going to check every DAG every time
>> >> you run the cleanup - no matter if that DAG is run daily or every
>> >> minute, you will have to run the cleanup frequently enough to match
>> >> your most frequent DAGs. If you have 1000 DAGs that run hourly and one
>> >> DAG that runs every minute, then you have to run a cleanup job that
>> >> scans all DAGs every few minutes. That's a big waste.
>> >>
>> >> So I am not sure if we gain anything by centralizing the cleanup.
>> >> Decentralising it to Task seems to be a well thought and sound
>> >> decision (but I think the problem we have now is that we need to
>> >> optimize it after Dynamic Task Mapping has been added).
>> >>
>> >> ANOTHER FINDING:
>> >>
>> >> While looking at the code and discussing it and looking more closely I
>> >> **think** there is another problem that we have to fix regardless of a
>> >> solution. I THINK a problem we might have now is that we do not
>> >> include map_index in this DELETE. Currently we delete all
>> >> the rendered task fields without including map_index - and for big
>> >> dynamic tasks, it means that exactly the same DELETE query is run by
>> >> every single mapped instance of that task and that is where a lot of
>> >> contention and locking might happen (basically when a single task
>> >> instance does the delete, anti-insert locks hold the other mapped
>> >> instances of the same task from inserting rendered fields).
>> >>
>> >> It does not change much in the optimisation proposal of mine, other
>> >> than we should include map_index in those queries. But I think this
>> >> might cause a lot of delays in the current implementation.
>> >>
>> >> J.
>> >>
>> >> > ----
>> >> > Best Wishes
>> >> > Andrey Anshin
>> >> >
>> >> >
>> >> >
>> >> > On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com> wrote:
>> >> >>
>> >> >> I think there is a good reason to clean those up automatically.
>> >> >> rendered task instance fields are almost arbitrary in size. If we
>> try
>> >> >> to keep all historical values there by default, there are numerous
>> >> >> cases it will grow very fast - far, far too quickly.
>> >> >>
>> >> >> And I am not worried at all about locks on this table if we do it
>> the
>> >> >> way I described it and it uses the indexes. The contention this way
>> >> >> might only be between the two deleting tasks, and with the query I
>> >> >> proposed, it will only last for a short time - the index will be
>> >> >> locked only during the two DELETEs or the SELECT DISTINCT - which should
>> >> >> both be fast.
>> >> >>
>> >> >>
>> >> >> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <
>> andrey.anshin@taragol.is> wrote:
>> >> >> >
>> >> >> > I guess two things have reduced the performance of this query
>> over time: Dynamic Task Mapping and run_id instead of execution date.
>> >> >> >
>> >> >> > I still personally think that changing the default value from 30
>> to 0 might improve performance of multiple concurrent tasks, just because
>> this query does not run and there are no locks on multiple records/pages.
>> >> >> >
>> >> >> > I do not have any proof (yet?) other than simple DAGs. I think
>> that there is some crossover point where letting this table grow becomes worse
>> than cleaning up on each TI run. But users have the ability to clean up the
>> table by executing airflow db clean, which should improve performance again.
>> >> >> >
>> >> >> > There is also interesting behavior with this query: if a user
>> already has more rows than the value specified by
>> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tries to run a backfill,
>> then the rendered templates are not written to the table (or maybe they are inserted
>> and immediately deleted afterwards); the same is valid when clearing old tasks.
>> >> >> >
>> >> >> > ----
>> >> >> > Best Wishes
>> >> >> > Andrey Anshin
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com>
>> wrote:
>> >> >> >>
>> >> >> >> Yep. Agree this is not an efficient query and dynamic task
>> mapping
>> >> >> >> makes the effect much worse. Generally speaking, selecting "what
>> >> >> >> should be left" and then deleting stuff where the key is "not
>> in" is
>> >> >> >> never an efficient way of running an sql query.  And the query
>> not
>> >> >> >> using index at all makes it rather terrible.
>> >> >> >>
>> >> >> >> I think we should not deprecate it though, but find a more
>> efficient
>> >> >> >> way of deleting the old keys. I think we could slightly
>> denormalize
>> >> >> >> RenderedTaskInstance + DagRun tables, and add
>> DAG_RUN_EXECUTION_DATE
>> >> >> >> to the RenderedTaskInstance table and that will be enough to
>> optimise
>> >> >> >> it.
>> >> >> >>
>> >> >> >> Then we could have either:
>> >> >> >>
>> >> >> >> * a composite (non-unique) B-TREE index on DAG_ID,
>> TASK_ID,
>> >> >> >> RUN_ID_EXECUTION_DATE
>> >> >> >> * or maybe even a regular HASH index on DAG_ID, TASK_ID and a
>> separate
>> >> >> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
>> >> >> >>
>> >> >> >> Probably the latter is better as I am not sure how < , >
>> comparison
>> >> >> >> works for composite B-TREE indexes when char + date columns
>> are
>> >> >> >> mixed. Also we could hit the infamous MySQL index key length
>> >> >> >> limit.
>> >> >> >>
>> >> >> >> Then deletion process would look roughly like:
>> >> >> >>
>> >> >> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
>> >> >> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
>> >> >> >> TASK_ID = <TASK_ID> GROUP BY RUN_ID_EXECUTION_DATE ORDER BY
>> >> >> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
>> >> >> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
>> >> >> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID
>> = <DAG_ID> AND
>> >> >> >> TASK_ID = <TASK_ID> AND RUN_ID_EXECUTION_DATE < dag_run_execution_date
>> >> >> >>
>> >> >> >> I believe that would be fast, and it would use the B-TREE index
>> >> >> >> features nicely (ordering support)
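
The two-step deletion described above can be tried out on a toy table. The following is a minimal sketch using Python's built-in sqlite3 module, not Airflow code: the denormalized `execution_date` column, the index name, and the helper names `make_demo_db` and `cleanup_old_rows` are all assumptions made for illustration, and SQLite's planner and locking differ from Postgres and MySQL. The sketch uses OFFSET keep together with `<=` so that exactly `keep` runs survive, which is a small deviation from the `<` in the message.

```python
import sqlite3

# Toy schema. The execution_date column is the hypothetical denormalized
# copy of dag_run.execution_date proposed in the message above.
TABLE_DDL = """
CREATE TABLE rendered_task_instance_fields (
    dag_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    map_index INTEGER NOT NULL DEFAULT -1,
    execution_date TEXT NOT NULL,
    rendered_fields TEXT,
    PRIMARY KEY (dag_id, task_id, run_id, map_index)
);
CREATE INDEX idx_rtif_dag_task_date
    ON rendered_task_instance_fields (dag_id, task_id, execution_date);
"""


def make_demo_db(runs=5, map_indexes=2):
    """Create an in-memory table with `runs` runs of one mapped task."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(TABLE_DDL)
    rows = [
        ("d", "t", f"run_{day}", idx, f"2023-01-{day:02d}", "{}")
        for day in range(1, runs + 1)
        for idx in range(map_indexes)
    ]
    conn.executemany(
        "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?, ?, ?, ?)",
        rows,
    )
    conn.commit()
    return conn


def cleanup_old_rows(conn, dag_id, task_id, keep):
    """Delete rows older than the newest `keep` runs; return rows deleted."""
    # Step 1: find the newest execution date that is NOT among the kept runs.
    cutoff = conn.execute(
        """
        SELECT DISTINCT execution_date
        FROM rendered_task_instance_fields
        WHERE dag_id = ? AND task_id = ?
        ORDER BY execution_date DESC
        LIMIT 1 OFFSET ?
        """,
        (dag_id, task_id, keep),
    ).fetchone()
    if cutoff is None:
        return 0  # there are at most `keep` runs, nothing to delete
    # Step 2: range delete on the ordered column, no NOT IN subquery.
    cur = conn.execute(
        """
        DELETE FROM rendered_task_instance_fields
        WHERE dag_id = ? AND task_id = ? AND execution_date <= ?
        """,
        (dag_id, task_id, cutoff[0]),
    )
    conn.commit()
    return cur.rowcount
```

With the demo data (5 runs with 2 mapped instances each), `cleanup_old_rows(make_demo_db(), "d", "t", keep=3)` removes the 4 rows of the two oldest runs. Whether a real backend actually uses the index for both statements would still need to be checked with explain plans.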
>> >> >> >>
>> >> >> >> J
>> >> >> >>
>> >> >> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <
>> andrey.anshin@taragol.is> wrote:
>> >> >> >> >
>> >> >> >> > First of all I want to highlight that this approach, I guess,
>> worked well until Dynamic Task Mapping was introduced.
>> >> >> >> >
>> >> >> >> > > The main reason for adding that cleanup was -- if you don't
>> do that, you will have many rows, similar to the TaskInstance table
>> >> >> >> >
>> >> >> >> > The problem itself is not how big your table/indexes are, but
>> rather what kind of operations you run.
>> >> >> >> >
>> >> >> >> > > Do you have any data for locks or performance degradation?
>> >> >> >> >
>> >> >> >> > In this case, when we try to clean up the
>> rendered_task_instance_fields table as a new TI is created/cleared, we
>> make almost two full/sequential scans (note: need to check) against the
>> table without any index usage, so we pay here a couple of times:
>> >> >> >> > 1. We scan without indexes - not all parts of the composite
>> key are included in the query, plus we need to filter out everything except 30
>> records with ORDER BY and DISTINCT
>> >> >> >> > 2. After that we make another full scan to find 1 record or
>> map_size records
>> >> >> >> >
>> >> >> >> > And I guess the situation becomes worse if you have a lot of
>> tasks, even if we have a small table, we need to do ineffective operations.
>> >> >> >> >
>> >> >> >> > This is how the query plan looks (please note that without
>> committing the transaction the DELETE operation doesn't have all the information):
>> https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
>> >> >> >> >
>> >> >> >> > If we do not clean up the table, we only use these
>> operations:
>> >> >> >> > 1. SELECT single record by index
>> >> >> >> > 2. INSERT new record
>> >> >> >> > 3. DELETE old record(s), which were found by index.
>> >> >> >> >
>> >> >> >> > I have not done any real tests yet, only synthetic DAGs (so we
>> should not treat any findings as the absolute truth):
>> https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
>> >> >> >> > DAG with parallel tasks: degradation up to 2-3 times
>> >> >> >> > DAG with single map tasks: degradation up to 7-10 times
>> >> >> >> >
>> >> >> >> > I plan to run more complex tests that are closer to real use
>> cases, with a database whose network latency is not almost 0 as it is in
>> my local setup.
>> >> >> >> > But I will not refuse if someone also runs their own tests with
>> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs the default value.
>> >> >> >> >
>> >> >> >> > About deadlock we know that it exists at least in MySQL:
>> https://github.com/apache/airflow/pull/18616
>> >> >> >> >
>> >> >> >> > > And the larger tables create problems during database
>> migrations.
>> >> >> >> >
>> >> >> >> > That is a very good point, so if we find that the problem is only
>> related to migrations we could:
>> >> >> >> > 1. Clean up this table in the migration
>> >> >> >> > 2. Add a CLI command to airflow db which cleans up only
>> rendered fields, so it would be the user's choice whether to clean up before a
>> migration and whether to do periodic maintenance
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > ----
>> >> >> >> > Best Wishes
>> >> >> >> > Andrey Anshin
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com>
>> wrote:
>> >> >> >> >>>
>> >> >> >> >>> Potentially it is a good idea to deprecate this option and
>> recommend for users to set it to 0? WDYT? Maybe someone has already tried
>> or investigated this?
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >> The main reason for adding that cleanup was -- if you don't
>> do that, you will have many rows, similar to the TaskInstance table. And
>> the RenderedTIFields were mainly added for checking rendered TI fields on
>> the Webserver only because after DAG Serialization, the webserver won't
>> have access to DAG files.
>> >> >> >> >>
>> >> >> >> >> And the larger tables create problems during database
>> migrations.
>> >> >> >> >>
>> >> >> >> >> Do you have any data for locks or performance degradation?
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <
>> andrey.anshin@taragol.is> wrote:
>> >> >> >> >>>
>> >> >> >> >>> Greetings!
>> >> >> >> >>>
>> >> >> >> >>> While migrating our ORM syntax to be compatible with
>> SQLAlchemy 2.0 I probably found skeletons in the closet.
>> >> >> >> >>>
>> >> >> >> >>> Let's start from the beginning, initially I got this warning
>> >> >> >> >>>
>> >> >> >> >>> airflow/models/renderedtifields.py:245
>> RemovedIn20Warning('ORDER BY columns added implicitly due to DISTINCT is
>> deprecated and will be removed in SQLAlchemy 2.0.  SELECT statements with
>> DISTINCT should be written to explicitly include the appropriate columns in
>> the columns clause (Background on SQLAlchemy 2.0 at:
>> https://sqlalche.me/e/b8d9)')
>> >> >> >> >>>
>> >> >> >> >>> "OK let's fix it!", I thought at first and started to
>> investigate RenderedTaskInstanceFields model
>> >> >> >> >>>
>> >> >> >> >>> Skeleton #1:
>> >> >> >> >>>
>> >> >> >> >>> When I first looked at the code and comments it got me
>> thinking that the part which keeps only the latest N Rendered Task Fields
>> could potentially lead to different kinds of performance degradation (Locks, Dead
>> Locks, Data Bloating): see code
>> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
>> >> >> >> >>>
>> >> >> >> >>> Also this historical part (from Airflow 1.10.10) generates
>> this SQL Statement (pg backend)
>> >> >> >> >>>
>> >> >> >> >>> DELETE FROM rendered_task_instance_fields
>> >> >> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1)s
>> >> >> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1)s
>> >> >> >> >>>   AND (
>> >> >> >> >>>     (
>> >> >> >> >>>       rendered_task_instance_fields.dag_id,
>> >> >> >> >>>       rendered_task_instance_fields.task_id,
>> >> >> >> >>>       rendered_task_instance_fields.run_id
>> >> >> >> >>>     ) NOT IN (
>> >> >> >> >>>       SELECT
>> >> >> >> >>>         anon_1.dag_id,
>> >> >> >> >>>         anon_1.task_id,
>> >> >> >> >>>         anon_1.run_id
>> >> >> >> >>>       FROM
>> >> >> >> >>>         (
>> >> >> >> >>>           SELECT DISTINCT
>> >> >> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
>> >> >> >> >>>             rendered_task_instance_fields.task_id AS task_id,
>> >> >> >> >>>             rendered_task_instance_fields.run_id AS run_id,
>> >> >> >> >>>             dag_run.execution_date AS execution_date
>> >> >> >> >>>           FROM rendered_task_instance_fields
>> >> >> >> >>>             JOIN dag_run ON
>> rendered_task_instance_fields.dag_id = dag_run.dag_id
>> >> >> >> >>>             AND rendered_task_instance_fields.run_id =
>> dag_run.run_id
>> >> >> >> >>>           WHERE
>> >> >> >> >>>             rendered_task_instance_fields.dag_id =
>> %(dag_id_2)s
>> >> >> >> >>>             AND rendered_task_instance_fields.task_id =
>> %(task_id_2)s
>> >> >> >> >>>           ORDER BY
>> >> >> >> >>>             dag_run.execution_date DESC
>> >> >> >> >>>           LIMIT %(param_1)s
>> >> >> >> >>>         ) AS anon_1
>> >> >> >> >>>     )
>> >> >> >> >>>   )
>> >> >> >> >>>
>> >> >> >> >>> This is especially ineffective in PostgreSQL. An IN
>> SUBQUERY can easily be transformed internally into a SEMI-JOIN (aka EXISTS
>> clause), but this does not work for a NOT IN SUBQUERY because it is not
>> transformed into an ANTI JOIN (aka NOT EXISTS clause) even if that is possible,
>> see: https://commitfest.postgresql.org/27/2023/
>> >> >> >> >>>
>> >> >> >> >>> I haven't done any performance benchmarks yet, but I guess if
>> users set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather
>> than the default 30 it could improve performance and reduce the number of
>> DeadLocks. The table size will increase, but I think we don't do any
>> maintenance job for other tables either.
>> >> >> >> >>>
>> >> >> >> >>> Potentially it is a good idea to deprecate this option and
>> recommend for users to set it to 0? WDYT? Maybe someone has already tried
>> or investigated this?
>> >> >> >> >>>
>> >> >> >> >>>
>> >> >> >> >>> Skeleton #2:
>> >> >> >> >>>
>> >> >> >> >>> We have a k8s_pod_yaml field which is exclusively used by
>> K8S executors.
>> >> >> >> >>>
>> >> >> >> >>> Should we also decouple this field as part of AIP-51?
>> >> >> >> >>>
>> >> >> >> >>> ----
>> >> >> >> >>> Best Wishes
>> >> >> >> >>> Andrey Anshin
>> >> >> >> >>>
>>
>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Kaxil Naik <ka...@gmail.com>.
> > Users often ask (Slack, issues, discussions) for the ability to
> auto-maintain tables/logs and receive the usual answer: "No, we don't give
> you this opportunity, please use something from the Airflow Ecosystem page". But
> on the other hand we have auto-maintenance for only a single table.


That is not true, we added the "airflow db clean" command to give this control
to users.

> What about these tables: session, log, job? I expected the answer would be
> "They are not so specific."

The RTIF can vary in size based on what is stored and is really useless
beyond the last X for each task. RTIF also has a foreign key constraint
<https://github.com/apache/airflow/blob/d80b583db07197c8c3d0549a805e83ceaaf10d52/airflow/models/renderedtifields.py#L63-L73>
on
TI, and that is the most common table that is affected during migrations
and can affect scheduling decisions. Log and Session tables aren't affected
by it.

I will have a detailed reply if I manage to find the time, it has just been
too difficult

On Tue, 31 Jan 2023 at 21:46, Jarek Potiuk <ja...@potiuk.com> wrote:

> BTW, I would really love to hear what the original authors have to say
> here. I am merely trying to put myself in their shoes and guess what
> the reasoning is :).
>
> I think this is really a question of: Do we want to keep all
> arbitrary size rendered task instance fields in our database forever
> by default, same as other fields?
> I believe the original authors answered the question to be "no". And
> the "num_row" was a way to limit it.
>
> And I really do not want to "win" this argument, I just want to
> protect our users (and environment).
>
> There is (IMHO) currently a big difference between
> session/log/task_instance fields and rendered task_instance fields
> that justifies different behaviour.
>
> The former are generally fixed in max size of rows (this is one of the
> reasons we have limited string sizes in our DB) - to be able to limit
> them growing uncontrollably large. We simply do not keep arbitrary
> size data in those tables.
> On the other hand, the rendered task instance is arbitrary in size
> (JSONField) and the need for deletion accounts for the "worst" case
> scenario.
> Until we get rid of that "property" of the rendered task instance
> table, I think "just" skipping deletion of those fields without fixing
> the "worst" case scenario is not a good idea.
>
> Maybe in your test cases (and many others) those tables are not
> bigger, but I think the protection here is implemented to account for
> the cases where the rendered task instance fields are really "big".
>
> But very interestingly - if the rendered task instance is "big" then
> likely it is next to useless to be displayed in the Web UI in its
> entirety.
>
> So maybe you are actually right, Andrey. Maybe we can skip deleting
> those and maybe we could solve it differently and apply the same rules
> as other tables?
>
> Let me then - constructively - propose another idea which actually
> might solve both yours and my own concerns. Maybe we can fix the
> "worst case" scenario differently? We do not have to limit the number
> of rows, we can limit the size of the row instead.
>
> Why don't we simply limit the size of the rendered task instance JSON
> and if they are too big (we can configure the maximum size), we will
> render something like (probably a bit more unique and not
> "accidentally triggerable"):
>
> {
>    "error": "The rendered fields of this task instance are too large to display"
> }
>
> And implement an escape hatch in the web server to handle it properly
> when displaying such "truncated" rendered task instance field.
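
A minimal sketch of the size cap proposed above, assuming the rendered fields are available as a plain dict before they are written. The names `MAX_RENDERED_FIELDS_BYTES`, `TRUNCATED_MARKER_KEY`, `cap_rendered_fields` and `is_truncated` are hypothetical and do not exist in Airflow; the 4096 byte limit is an arbitrary placeholder, not a recommendation.

```python
import json

# Hypothetical names and limit, chosen only for this illustration.
MAX_RENDERED_FIELDS_BYTES = 4096
TRUNCATED_MARKER_KEY = "__rendered_fields_truncated__"


def cap_rendered_fields(rendered_fields, max_bytes=MAX_RENDERED_FIELDS_BYTES):
    """Return the fields unchanged if small enough, else a small placeholder."""
    # Measure the size of the JSON that would be stored; default=str keeps
    # the measurement from failing on values json cannot serialize directly.
    size = len(json.dumps(rendered_fields, default=str).encode("utf-8"))
    if size <= max_bytes:
        return rendered_fields
    # A dedicated marker key makes the placeholder hard to trigger by accident.
    return {
        TRUNCATED_MARKER_KEY: True,
        "error": "The rendered fields of this task instance are too large to display",
        "original_size_bytes": size,
    }


def is_truncated(stored_fields):
    """Escape hatch for the web server: detect the placeholder record."""
    return stored_fields.get(TRUNCATED_MARKER_KEY) is True
```

With such a cap each row has an upper bound on its size, so the worst-case table size depends only on the number of rows. The web server would call something like `is_truncated` before rendering and show a message instead of the fields.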
>
> We should be able to come up with a sensible max size that we might think
> makes sense when rendering it in the web UI.  And we could make the
> max size configurable by the user if they have a lot of GB to spare.
> We could even apply it automatically. If there is a max_num_row_limit
> - we allow any size, if we have no limit on the number of rows, we
> limit the maximum row size.
>
> If we have such "upper-bounded" reasonable size of each row in that
> table - then I am perfectly happy with not deleting the rows
> automatically.
> But only if we limit that and handle the "worst" case properly.
>
> That would be my proposal how we can handle it to get both views taken
> into account.
>
> What do you, Andrey (and others), think? Does it make sense? Or do we
> think we should not have any such protections in place?
>
> J.
>
>
> On Tue, Jan 31, 2023 at 9:43 PM Andrey Anshin <an...@taragol.is>
> wrote:
> >
> > > I think that is all something to be tested with explaining plans.
> >
> > I would be really careful with these results. DELETE in Transaction with
> Rollback usually shows more optimistic than actually executed.
> >
> >
> > > I think we would not know before we try - and possibly there are other
> > optimisation approaches. The optimisation I proposed was only first
> > that came to my mind to avoid the "not in" query. The problem  with
> > "not in query" is that there is no way to optimise it by the DB.
> > Effectively you have to get every record (or index entry) and test it.
> > Maybe it can be done better :). And yes locking the index with
> > anti-insert locks and the need to rebalance trees during the delete is
> > a concern.
> >
> > My point is still the same, I would rather remove it in the future or
> make policy about maintenance more consistent: all or nothing. Right now we
> are close to nothing rather than all.
> >
> >
> > > It's not about index size or JSON access. It is about the size of the
> > actual rows and storage it takes - i.e. general size of the database.
> >
> > I'm tired, but I'm not sure that I understand the actual point.
> > Is it not really a matter of size of the table if you always access by
> pattern:
> > 1 request which returns exactly 1 record accessed by a unique index.
> > Basically query traversal by index find reference to single CTID/rowid
> (or whatever name used in other RDBMS).
> > So at this point it really matters how fast your index grows rather than
> table size.
> >
> >
> > > The problem with it is that (especially with dynamic task mapping), it
> > might grow really, really fast. Basically you have NUM_DAGS x
> > NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
> > of records there.
> > Back-of-the envelope calculation Assuming you have a
> > DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
> > fields, each field evaluating to 1K string.  Then you have 10 tasks *
> > 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
> > per one(!) run of one(1) DAG. Run it every 10 minutes and every day
> > your database from a single DAG grows by whooping 1.44 GB of data
> > every single day (from single DAG).
> >
> > Depends on DB. If we talk about Postgres you could easily miss up to 3-4
> times (thanks for inline compression before/instead of TOASTs).
> > I have a couple questions:
> > 1. Do you know how big would be indexes in rendered_task_instance_fields
> after one day? (Spoiler alert I could give estimation in the morning)
> > 2. In this scenario with default settings always would keep up to 30 000
> TI for this DAG.
> > Could someone who wants to optimize the query make it more optimal
> rather than access the table by index (Unique Index -> ctid/rowid - record)
> and where this crosspoint?
> >
> > > This is of course an estimation that assumes a lot, but it's not at
> all unrealistic.
> >
> > 144,000 TI per/day on single DAG (initially I want to put here some
> sarcastic message).
> > How would Airflow feel with 144k Task Instances per day? How
> > I asked because right now I've always had a task_instance table bigger
> than rendered_task_instance_fields.
> >
> >
> > > This table is very specific compared with the other tables. The only
> reason for it being here is to be able
> > to show the rendered fields in the UI if you go to the specific run of a
> task. If you clean-up other tables you basically lose the history of
> execution of the tasks and you cannot really know if the data has been
> > processed, you cannot do backfills effectively, you lose all the
> > context. Cleaning this table is merely about the values that have been
> > rendered for a specific run and the assumption there is that the older
> > it gets, the less interesting it is.
> >
> > What about these tables: session, log, job? I expected the answer would
> be "They are not so specific."
> > For me every table is specific for their purpose.
> > Users often asked (slack, issues, discussions) to give the ability to
> auto-maintain tables/logs and receive the usual answer: "No, we don't give
> you this opportunity, please use something for Airflow Ecosystem Page". But
> on the other hand we have auto-maintenance only for a single table.
> >
> > >> It is opposite of what we have right now, we scan tables (maybe
> multiple times), read all records tuples which contain JSON.
> > > Not sure if I get the point here :). Yes -in my proposal I think the
> records will not be touched - only indexes. So the cleanup should be way
> faster, contentions less of problem, due to the way the delete
> > uses < ordering, deadlocks will not be possible at all (as opposed to
> the current "not in" - there is a very easy way to get into deadlocks when
> two parallel deletes are trying to delete same rows in a different
> sequence. I think my proposal improves all the characteristics of the
> "cleanup" with very little extra penalty on record creation.
> >
> > I was talking about the current solution and why it is also slow (and if
> abstract users use some DBaaS where they also pay for IOPs then it is
> expensive). Let's talk about the benefits of optimised queries for 4
> different DB backends (3 if excluding SQLite) when we have it.
> >
> > > We pay for table/index size linearly: more records, more size. But other
> operations vary and depend on B-Tree implementation and usually it has
> logarithmic growth. Or do we worry only about table/toast/index size on
> disk?
> > >> Yep. I (and I believe the original author had the same worry) am
> worried a lot about the size of the table and the fact that this table will
> be by far the biggest table in our DB while most of the old records will
> never be touched. And by the fact that this is the only table that our
> users will have to know about to clean up separately from all others pretty
> much always.
> >
> > Same as previous.
> >
> > > If not even worrying about money spent by our users, and performance
> degradation that comes with databases that are bigger - that's a lot of
> environmental effects that we might incur. Airflow is heavily used, if
> suddenly all our users
> > will start having 10 bigger databases that they have now because we will
> deprecate the values and keep all the history, then we have a big number of
> extra disks that will have to be used. I'd strongly prefer a solution where
> we keep the data usage lower in this case.
> >
> > Am I right that this is all about "lets don't delete by default as we do
> for other tables" rather than the current default implementation?
> > Because I get the result which is opposite what you said. And
> rendered_task_instance_fields don't grow faster than other tables that what
> I got.
> > I would like to compare it with other findings and some reproducible
> metrics rather than with hypothetical things.
> >
> > ----
> > Best Wishes
> > Andrey Anshin
> >
> >
> >
> > On Tue, 31 Jan 2023 at 11:12, Jarek Potiuk <ja...@potiuk.com> wrote:
> >>
> >> COMMENT: While writing the answer here, I think I found a deeper
> >> problem (and optimisation needed)  - i.e I think the delete should be
> >> even more fine-grained than it is today and include map_index) -
> >> please take a look at the end (Also maybe TP might comment on that
> >> one).
> >>
> >> > 1. Additional indexes add additional performance degradation on
> Insert but gain potential improvements on delete and unknown on update,
> RDBMS still require rebalance index and make it consistent to the table.
> >> > 2. LIMIT x OFFSET y could easily become full seq scan, especially if
> the user set a huge number for offset (which? unknown).
> >> > 3. Mixing two indexes could improve performance in a single query but
> in concurrent execution might lead to degradation because it needs to
> create a bitmap table for comparison between these two indexes, as result
> it might lead different issues, such as OOM on DB backend, use swaps or
> optimiser decided that better not to use this indexes.
> >>
> >> I think that is all something to be tested with explain plans. I think
> >> we would not know before we try - and possibly there are other
> >> optimisation approaches. The optimisation I proposed was only first
> >> that came to my mind to avoid the "not in" query. The problem  with
> >> "not in query" is that there is no way to optimise it by the DB.
> >> Effectively you have to get every record (or index entry) and test it.
> >> Maybe it can be done better :). And yes locking the index with
> >> anti-insert locks and the need to rebalance trees during the delete is
> >> a concern.
> >>
> >> > Is it a real problem? Until we access only by indexes, which doesn't
> include this JSON, it really doesn't matter. I guess we almost always
> should make a UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a single
> record.
> >>
> >> Yes I think so, and while. I was not the author of this "cleanup"
> >> code, I believe I know the intention.
> >>
> >> It's not about index size or JSON access. It is about the size of the
> >> actual rows and storage it takes - i.e. general size of the database.
> >> The problem with it is that (especially with dynamic task mapping), it
> >> might grow really, really fast. Basically you have NUM_DAGS x
> >> NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
> >> of records there. Back-of-the envelope calculation Assuming you have a
> >> DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
> >> fields, each field evaluating to 1K string.  Then you have 10 tasks *
> >> 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
> >> per one(!) run of one(1) DAG. Run it every 10 minutes and every day
> >> your database from a single DAG grows by whooping 1.44 GB of data
> >> every single day (from single DAG).This is of course an estimation
> >> that assumes a lot, but it's not at all unrealistic. That's a lot. And
> >> if we want the user to do the cleanup then a) they need to know it b)
> >> they need to specifically clean up this table only because all the
> >> other data is relatively small. This table is very specific compared
> >> with the other tables. The only reason for it being here is to be able
> >> to show the rendered fields in the UI if you go to the specific run of
> >> a task. If you clean-up other tables you basically lose the history of
> >> execution of the tasks and you cannot really know if the data has been
> >> processed, you cannot do backfills effectively, you lose all the
> >> context. Cleaning this table is merely about the values that have been
> >> rendered for a specific run and the assumption there is that the older
> >> it gets, the less interesting it is.
> >>
> >> > It is opposite of what we have right now, we scan tables (maybe
> multiple times), read all records tuples which contain JSON.
> >>
> >> Not sure if I get the point here :). Yes -in my proposal I think the
> >> records will not be touched - only indexes. So the cleanup should be
> >> way faster, contentions less of problem, due to the way the delete
> >> uses < ordering, deadlocks will not be possible at all (as opposed to
> >> the current "not in" - there is a very easy way to get into deadlocks
> >> when two parallel deletes are trying to delete same rows in a
> >> different sequence. I think my proposal improves all the
> >> characteristics of the "cleanup" with very little extra penalty on
> >> record creation.
> >>
> >> > We pay for table/index size linearly: more records, more size. But other
> operations vary and depend on B-Tree implementation and usually it has
> logarithmic growth. Or do we worry only about table/toast/index size on
> disk?
> >>
> >> Yep. I (and I believe the original author had the same worry) am
> >> worried a lot about the size of the table and the fact that this table
> >> will be by far the biggest table in our DB while most of the old
> >> records will never be touched. And by the fact that this is the only
> >> table that our users will have to know about to clean up separately
> >> from all others pretty much always. If not even worrying about money
> >> spent by our users, and performance degradation that comes with
> >> databases that are bigger - that's a lot of environmental effects that
> >> we might incur. Airflow is heavily used, if suddenly all our users
> >> will start having 10 bigger databases that they have now because we
> >> will deprecate the values and keep all the history, then we have a big
> >> number of extra disks that will have to be used. I'd strongly prefer a
> >> solution where we keep the data usage lower in this case.
> >>
> >> > If we do not want to grant users the ability to clean up rendered
> templates tables, there could be another option:
> >> > - Do not delete records on every task instance run.
> >> > - Delete once per defined period (hourly, daily, weekly, monthly). In
> this case you really could not care about locks.
> >>
> >> Yes we could come up with a different strategy as to "when" run the
> >> cleanup. This is also a viable option. If you can propose one that
> >> will be equally adaptive as the current solution, I am all ears.
> >> Basically my goal is to keep the usage of the table low, possibly
> >> controlled by the same parameter we had. How we do it - this is a
> >> different story. If we - for example add a thread in the scheduler
> >> (for example) that performs such cleanup effectively in parallel and
> >> scales, I am happy with that.
> >>
> >> But I am trying to get into the head of the author trying to
> >> understand why the original implementation was done this way. I
> >> believe (and maybe those who remember it better could confirm it) that
> >> distributing the deletion to tasks to clean up after itself is a
> >> better idea than centralising the cleanup. This makes each cleanup
> >> smaller, locks are held for a shorter time (at least that was the
> >> assumption where no full table scan was used), it is more "immediate"
> >> and you do not have to decide upfront what should be the cleanup
> >> frequency. It seems this is the best logical approach to keep the
> >> "MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply after task is
> >> complete, you can be sure that there are no more than this number of
> >> fields per task in the DB. With a scheduled run, that would be a much
> >> more "eventual" consistency and it will be potentially fluctuating
> >> much more.
> >>
> >> But there are risks involved in having a single thread doing the
> >> cleanup. I think it has a huge risk of being a "stop-the world" and
> >> "deadlock-prone" kind of event - if in big instances there are a lot
> >> of rows to cleanup in a single pass. When you delete entries from a
> >> table, there are anti-insert locks created for existing index entries,
> >> which makes it possible to rollback the whole DELETE transaction.
> >> Which means that when you try to insert the row with the same index,
> >> the index will be held. And this means that when you run a single huge
> >> DELETE for multiple rows affected with multiple (all?) index keys
> >> matching select query, it will effectively prevent new rows with the
> >> same indexes that are matching the SELECT. It would mean that if you
> >> have some tasks running while deleting existing run_id rendered
> >> fields, then you could REALLY start having deadlocks on those tasks
> >> trying to insert rendered task instance rows. That's why I think the
> >> only viable strategy for single "cleanup" thread is to do such cleanup
> >> as separate DELETE for each of the "dag/task/map_index/run" - in order
> >> to avoid such deadlocks. Which effectively will turn into what have
> >> currently - only that currently those transactions are done by tasks,
> >> not by a single cleanup thread.
> >>
> >> Also using tasks to delete old rows is more "effective" when you have
> >> vast differences in frequency of DAGs. Naturally - when you do it in
> >> task, you will only do it "when needed" for given DAG + Task. If you
> >> try to centralize the cleanup, unless you include somehow schedule and
> >> frequency of each dag, you are going to check every DAG every time
> >> your run the cleanup - no matter if that DAG is run daily or every
> >> minute, you will have to run the cleanup frequently enough to match
> >> your most frequent dags. If you have 1000 dags that run hourly and one
> >> DAG that runs every minute, then you have to run a cleanup job that
> >> scans all DAGs every few minutes. That's a big waste.
> >>
> >> So I am not sure if we gain anything by centralizing the cleanup.
> >> Decentralising it to Task seems to be a well thought and sound
> >> decision (but I think the problem we have now is that we need to
> >> optimize it after Dynamic Task Mapping has been added).
> >>
> >> ANOTHER FINDING:
> >>
> >> While looking at the code and discussing it and looking more closely I
> >> **think** there is another problem that we have to fix regardless of a
> >> solution. I THINK a problem we might have now is that we do not
> >> include map_index in this DELETE. Currently we delete all
> >> the rendered task fields without including map_index - and for big
> >> dynamic tasks, it means that exactly the same DELETE query is run by
> >> every single mapped instance of that tasks and that is where a lot of
> >> contention and locking might happen (basically when single task
> >> instance does the delete, anti-insert locks held the other mapped
> >> instances of the same task from inserting rendered fields).
> >>
> >> It does not change much in the optimisation proposal of mine, other
> >> than we should include map_index in those queries. But I think this
> >> might cause a lot of delays in the current implementation.
> >>
> >> J.
> >>
> >> > ----
> >> > Best Wishes
> >> > Andrey Anshin
> >> >
> >> >
> >> >
> >> > On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com> wrote:
> >> >>
> >> >> I think there is a good reason to clean those up automatically.
> >> >> rendered task instance fields are almost arbitrary in size. If we try
> >> >> to keep all historical values there by default, there are numerous
> >> >> cases it will grow very fast - far, far too quickly.
> >> >>
> >> >> And I am not worried at all about locks on this table if we do it the
> >> >> way I described it and it uses the indexes. The contention this way
> >> >> might only be between the two deleting tasks. and with the query I
> >> >> proposed, they will only last for a short time - the index will be
> >> >> locked when two DELETES  or SELECT DISTINCT - which should both be
> >> >> fast.
> >> >>
> >> >>
> >> >> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >
> >> >> > I guess two things involved to reduce performance on this query
> through the time: Dynamic Task Mapping and run_id instead of execution date.
> >> >> >
> >> >> > I still personally think that changing the default value from 30
> to 0 might improve performance of multiple concurrent tasks, just because
> this query does not run and there are no locks on multiple records/pages.
> >> >> >
> >> >> > I do not have any proof (yet?) other than simple DAGs. I think
> that there is some cross point exists when keeping this table growth worse
> rather than cleanup for each TI run. But users have ability to cleanup
> table by execute airflow db clean which should improve performance again
> >> >> >
> >> >> > And also there is interesting behavior with this query: if user
> already have more that value specified by
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tried run backfill
> than rendered templates not written to table (or may be inserted and after
> that immediately deleted), the same is valid for cleanup old tasks.
> >> >> >
> >> >> > ----
> >> >> > Best Wishes
> >> >> > Andrey Anshin
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com>
> wrote:
> >> >> >>
> >> >> >> Yep. Agree this is not an efficient query and dynamic task mapping
> >> >> >> makes the effect much worse. Generally speaking, selecting "what
> >> >> >> should be left" and then deleting stuff where the key is "not in"
> is
> >> >> >> never an efficient way of running an sql query.  And the query not
> >> >> >> using index at all makes it rather terrible.
> >> >> >>
> >> >> >> I think we should not deprecate it though, but find a more
> efficient
> >> >> >> way of deleting the old keys. I think we could slightly
> denormalize
> >> >> >> RenderedTaskInstance + DagRun tables, and add
> DAG_RUN_EXECUTION_DATE
> >> >> >> to the RenderedTaskInstance table and that will be enough to
> optimise
> >> >> >> it.
> >> >> >>
> >> >> >> Then we could have either:
> >> >> >>
> >> >> >> * a composite B-TREE indexed (non-unique) index on DAG_ID,
> TASK_ID,
> >> >> >> RUN_ID_EXECUTION_DATE
> >> >> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
> >> >> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
> >> >> >>
> >> >> >> Probably the latter is better as I am not sure how < , >
> comparison
> >> >> >> looks like for composite B-TREE indexes when char + date columns
> are
> >> >> >> mixed. Also we could have hit the infamous MySQL index key length
> >> >> >> limit.
> >> >> >>
> >> >> >> Then deletion process would look roughly like:
> >> >> >>
> >> >> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
> >> >> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>,
> >> >> >> TASK_ID=<TASK_ID> ORDER BY RUN_ID_EXECUTION_DATE GROUP BY
> >> >> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
> >> >> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
> >> >> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID
> =<DAG_ID>,
> >> >> >> TASK_ID=<TASK_ID> AND RENDER_TIME < dag_run_execution_date
> >> >> >>
> >> >> >> I believe that would be fast, and it would use the B-TREE index
> >> >> >> features nicely (ordering support)
> >> >> >>
> >> >> >> J
> >> >> >>
> >> >> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >> >
> >> >> >> > First of all I want to highlight that this approach I guess
> worked well until Dynamic Task Mappings introduced.
> >> >> >> >
> >> >> >> > > The main reason for adding that cleanup was -- if you don't
> do that, you will have many rows, similar to the TaskInstance table
> >> >> >> >
> >> >> >> > The problem itself is not how big your table/indexes, rather
> then what kind of operation you run.
> >> >> >> >
> >> >> >> > > Do you have any data for locks or performance degradation?
> >> >> >> >
> >> >> >> > In this case if we try to clean up
> rendered_task_instance_fields table when a new TI is created/cleared we
> make almost two full/sequential scans (note: need to check) against the
> table without any index usage, so we pay here a couple times:
> >> >> >> > 1. We scan without indexes - not all parts of the composite key
> are included to query, plus we need to filter everything except 30 records
> with order and distinct
> >> >> >> > 2. After that we make another full scan for find 1 record or
> map_size records
> >> >> >> >
> >> >> >> > And I guess the situation becomes worse if you have a lot of
> tasks, even if we have a small table, we need to do ineffective operations.
> >> >> >> >
> >> >> >> > That how looks like Query Plan (please note without commit
> transaction DELETE operation doesn't have all information):
> https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
> >> >> >> >
> >> >> >> > In case if we do not clean up the table, we only use these
> operations:
> >> >> >> > 1. SELECT single record by index
> >> >> >> > 2. INSERT new record
> >> >> >> > 3. DELETE old record(s), which were found by index.
> >> >> >> >
> >> >> >> > I have not done any real tests yet, only synthetic DAGs (so we
> should not consider to use any findings as totally truth):
> https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
> >> >> >> > DAG with parallel tasks: degradation up to 2-3 times
> >> >> >> > DAG with single map tasks: degradation up to 7-10 times
> >> >> >> >
> >> >> >> > I have a plan for more complex and more close to real use cases
> with Database which do not have network latency almost 0 as I have in my
> local.
> >> >> >> > But I will not refuse if someone also does their tests with
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
> >> >> >> >
> >> >> >> > About deadlock we know that it exists at least in MySQL:
> https://github.com/apache/airflow/pull/18616
> >> >> >> >
> >> >> >> > > And the larger tables create problems during database
> migrations.
> >> >> >> >
> >> >> >> > That is a very good point, so if we found that problem only
> related to migrations we could:
> >> >> >> > 1. Cleanup this table in migration
> >> >> >> > 2. Add cli command to airflow db which could cleanup only
> rendered fields, so it would be user's choice cleanup or not before
> migration, do periodical maintenance or not
> >> >> >> >
> >> >> >> >
> >> >> >> > ----
> >> >> >> > Best Wishes
> >> >> >> > Andrey Anshin
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com>
> wrote:
> >> >> >> >>>
> >> >> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> The main reason for adding that cleanup was -- if you don't do
> that, you will have many rows, similar to the TaskInstance table. And the
> RenderedTIFields were mainly added for checking rendered TI fields on the
> Webserver only because after DAG Serialization, the webserver won't have
> access to DAG files.
> >> >> >> >>
> >> >> >> >> And the larger tables create problems during database
> migrations.
> >> >> >> >>
> >> >> >> >> Do you have any data for locks or performance degradation?
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >> >>>
> >> >> >> >>> Greetings!
> >> >> >> >>>
> >> >> >> >>> During migrating our ORM syntax to compatible with SQLAlchemy
> 2.0 I probably found skeletons in the closet.
> >> >> >> >>>
> >> >> >> >>> Let's start from the beginning, initially I got this warning
> >> >> >> >>>
> >> >> >> >>> airflow/models/renderedtifields.py:245
> RemovedIn20Warning('ORDER BY columns added implicitly due to DISTINCT is
> deprecated and will be removed in SQLAlchemy 2.0.  SELECT statements with
> DISTINCT should be written to explicitly include the appropriate columns in
> the columns clause (Background on SQLAlchemy 2.0 at:
> https://sqlalche.me/e/b8d9)')
> >> >> >> >>>
> >> >> >> >>> "OK let's fix it!", I thought at first and started to
> investigate RenderedTaskInstanceFields model
> >> >> >> >>>
> >> >> >> >>> Skeleton #1:
> >> >> >> >>>
> >> >> >> >>> When I first time look on the code and comments it got me to
> thinking that part which keep only latest N Rendered Task Fields
> potentially could lead different performance degradation (Locks, Dead
> Locks, Data Bloating): see code
> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
> >> >> >> >>>
> >> >> >> >>> Also this historical part (from Airflow 1.10.10) generate
> this SQL Statement (pg backend)
> >> >> >> >>>
> >> >> >> >>> DELETE FROM rendered_task_instance_fields
> >> >> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
> >> >> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
> >> >> >> >>>   AND (
> >> >> >> >>>     (
> >> >> >> >>>       rendered_task_instance_fields.dag_id,
> >> >> >> >>>       rendered_task_instance_fields.task_id,
> >> >> >> >>>       rendered_task_instance_fields.run_id
> >> >> >> >>>     ) NOT IN (
> >> >> >> >>>       SELECT
> >> >> >> >>>         anon_1.dag_id,
> >> >> >> >>>         anon_1.task_id,
> >> >> >> >>>         anon_1.run_id
> >> >> >> >>>       FROM
> >> >> >> >>>         (
> >> >> >> >>>           SELECT DISTINCT
> >> >> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
> >> >> >> >>>             rendered_task_instance_fields.task_id AS task_id,
> >> >> >> >>>             rendered_task_instance_fields.run_id AS run_id,
> >> >> >> >>>             dag_run.execution_date AS execution_date
> >> >> >> >>>           FROM rendered_task_instance_fields
> >> >> >> >>>             JOIN dag_run ON
> rendered_task_instance_fields.dag_id = dag_run.dag_id
> >> >> >> >>>             AND rendered_task_instance_fields.run_id =
> dag_run.run_id
> >> >> >> >>>           WHERE
> >> >> >> >>>             rendered_task_instance_fields.dag_id =
> %(dag_id_2) s
> >> >> >> >>>             AND rendered_task_instance_fields.task_id =
> %(task_id_2) s
> >> >> >> >>>           ORDER BY
> >> >> >> >>>             dag_run.execution_date DESC
> >> >> >> >>>           limit %(param_1) s
> >> >> >> >>>         ) AS anon_1
> >> >> >> >>>     )
> >> >> >> >>>   )
> >> >> >> >>>
> >> >> >> >>> Which is especially not effective in PostgreSQL. When IN
> SUBQUERY could be easily transformed internally into SEMI-JOIN (aka EXISTS
> clause), but it is not working for NOT IN SUBQUERY because it is not
> transformed into ANTI JOIN (aka NOT EXISTS clause) even if it possible,
> see: https://commitfest.postgresql.org/27/2023/
> >> >> >> >>>
> >> >> >> >>> I didn't do any performance benchmarks yet but I guess if
> users set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather
> than default 30 it could improve performance and reduce number of
> DeadLocks, however the table size will increase but I think we don't do any
> maintenance job for other tables.
> >> >> >> >>>
> >> >> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> Skeleton #2:
> >> >> >> >>>
> >> >> >> >>> We have a k8s_pod_yaml field which is exclusively used by K8S
> executors.
> >> >> >> >>>
> >> >> >> >>> Should we also decouple this field as part of AIP-51?
> >> >> >> >>>
> >> >> >> >>> ----
> >> >> >> >>> Best Wishes
> >> >> >> >>> Andrey Anshin
> >> >> >> >>>
>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Jarek Potiuk <ja...@potiuk.com>.
BTW, I would really love to hear what the original authors have to say
here. I am merely trying to put myself in their shoes and guess what
the reasoning is :).

I think this is really a question of: do we want to keep all
arbitrary-size rendered task instance fields in our database forever
by default, same as other fields?
I believe the original authors answered that question with "no", and
the "num_row" limit was their way to enforce it.

And I really do not want to "win" this argument, I just want to
protect our users (and environment).

There is (IMHO) currently a big difference between
session/log/task_instance fields and rendered task_instance fields
that justifies different behaviour.

The former generally have a fixed maximum row size (this is one of the
reasons we have limited string sizes in our DB), which keeps them from
growing uncontrollably large. We simply do not keep arbitrary
size data in those tables.
On the other hand, the rendered task instance is arbitrary in size
(JSONField) and the need for deletion accounts for the "worst" case
scenario.
Until we get rid of that "property" of the rendered task instance
table, I think "just" skipping deletion of those fields without fixing
the "worst" case scenario is not a good idea.

Maybe in your test cases (and many others) those tables are not
bigger, but I think the protection here is implemented to account for
the cases where the rendered task instance fields are really "big".

But very interestingly - if the rendered task instance is "big" then
likely it is next to useless to be displayed in the Web UI in its
entirety.

So maybe you are actually right, Andrey. Maybe we can skip deleting
those and maybe we could solve it differently and apply the same rules
as other tables?

Let me then - constructively - propose another idea which actually
might solve both your concerns and mine. Maybe we can fix the
"worst case" scenario differently? We do not have to limit the number
of rows, we can limit the size of the row instead.

Why don't we simply limit the size of the rendered task instance JSON
and if they are too big (we can configure the maximum size), we will
render something like (probably a bit more unique and not
"accidentally triggerable"):

{
   "error": "This task instance has too large rendered task instances to display it"
}

And implement an escape hatch in the web server to handle it properly
when displaying such "truncated" rendered task instance field.

We should be able to come up with a sensible max size that we think
makes sense when rendering it in the web UI.  And we could make the
max size configurable by the user if they have a lot of GB to spare.
We could even apply it automatically: if there is a max_num_row_limit,
we allow any size; if we have no limit on the number of rows, we
limit the maximum row size.
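
A minimal sketch of the size cap described above, in Python. The names
MAX_RENDERED_FIELDS_BYTES, TRUNCATED_MARKER, cap_rendered_fields and
is_truncated are hypothetical and do not exist in Airflow; the 4096-byte
default is an arbitrary assumption picked only for illustration.

```python
import json

# Hypothetical names and limit, for illustration only (not Airflow APIs).
MAX_RENDERED_FIELDS_BYTES = 4096
TRUNCATED_MARKER = {
    "error": "This task instance has too large rendered task instances to display it"
}


def cap_rendered_fields(rendered_fields: dict, max_bytes: int = MAX_RENDERED_FIELDS_BYTES) -> dict:
    """Return the fields unchanged if their JSON encoding fits, else the marker."""
    encoded = json.dumps(rendered_fields).encode("utf-8")
    if len(encoded) > max_bytes:
        # Store a small placeholder instead of the arbitrarily large payload.
        return dict(TRUNCATED_MARKER)
    return rendered_fields


def is_truncated(stored: dict) -> bool:
    """Escape hatch for the web server: detect the placeholder row."""
    return stored == TRUNCATED_MARKER


small = {"bash_command": "echo 1"}
big = {"bash_command": "x" * 10_000}
print(is_truncated(cap_rendered_fields(small)))
print(is_truncated(cap_rendered_fields(big)))
```

With a cap like this every row has a known upper bound, which is the
property the proposal relies on. A real marker would need to be more
unique than this one so that a task cannot produce it by accident.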

If we have such "upper-bounded" reasonable size of each row in that
table - then I am perfectly happy with not deleting the rows
automatically.
But only if we limit that and handle the "worst" case properly.

That would be my proposal for how we can handle it so that both views
are taken into account.

What do you, Andrey (and others), think? Does it make sense? Or do we
think we should not have any such protections in place?

J.


On Tue, Jan 31, 2023 at 9:43 PM Andrey Anshin <an...@taragol.is> wrote:
>
> > I think that is all something to be tested with explaining plans.
>
> I would be really careful with these results. DELETE in Transaction with Rollback usually shows more optimistic than actually executed.
>
>
> > I think we would not know before we try - and possibly there are other
> optimisation approaches. The optimisation I proposed was only first
> that came to my mind to avoid the "not in" query. The problem  with
> "not in query" is that there is no way to optimise it by the DB.
> Effectively you have to get every record (or index entry) and test it.
> Maybe it can be done better :). And yes locking the index with
> anti-insert locks and the need to rebalance trees during the delete is
> a concern.
>
> My point is still the same, I would rather remove it in the future or make policy about maintenance more consistent: all or nothing. Right now we are close to nothing rather than all.
>
>
> > It's not about index size or JSON access. It is about the size of the
> actual rows and storage it takes - i.e. general size of the database.
>
> I'm tired, but I'm not sure that I understand the actual point.
> Is it not really a matter of size of the table if you always access by pattern:
> 1 request which returns exactly 1 record accessed by a unique index.
> Basically query travercial by index find reference to single CTID/rowid (or whatever name used in other RDBMS).
> So at this point it really matters how fast your index grows rather than table size.
>
>
> > The problem with it is that (especially with dynamic task mapping), it
> might grow really, really fast. Basically you have NUM_DAGS x
> NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
> of records there.
> Back-of-the envelope calculation Assuming you have a
> DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
> fields, each field evaluating to 1K string.  Then you have 10 tasks *
> 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
> per one(!) run of one(1) DAG. Run it every 10 minutes and every day
> your database from a single DAG grows by whooping 1.44 GB of data
> every single day (from single DAG).
>
> Depends on DB. If we talk about Postgres you could easily miss up to 3-4 times (thanks for inline compression before/instead of TOASTs).
> I have a couple questions:
> 1. Do you know how big would be indexes in rendered_task_instance_fields after one day? (Spoiler alert I could give estimation in the morning)
> 2. In this scenario with default settings always would keep up to 30 000 TI for this DAG.
> Could someone who wants to optimize the query make it more optimal rather than access the table by index (Unique Index -> ctid/rowid - record) and where this crosspoint?
>
> > This is of course an estimation that assumes a lot, but it's not at all unrealistic.
>
> 144,000 TI per/day on single DAG (initially I want to put here some sarcastic message).
> How would Airflow feel with 144k Task Instances per day? How
> I asked because right now I've always had a task_instance table bigger than rendered_task_instance_fields.
>
>
> > This table is very specific compared with the other tables. The only reason for it being here is to be able
> to show the rendered fields in the UI if you go to the specific run of a task. If you clean-up other tables you basically lose the history of execution of the tasks and you cannot really know if the data has been
> processed, you cannot do backfills effectively, you lose all the
> context. Cleaning this table is merely about the values that have been
> rendered for a specific run and the assumption there is that the older
> it gets, the less interesting it is.
>
> What about these tables: session, log, job? I expected the answer would be "They are not so specific."
> For me every table is specific for their purpose.
> Users often asked (slack, issues, discussions) to give the ability to auto-maintain tables/logs and receive the usual answer: "No, we don't give you this opportunity, please use something for Airflow Ecosystem Page". But on the other hand we have auto-maintenance only for a single table.
>
> >> It is opposite of what we have right now, we scan tables (maybe multiple times), read all records tuples which contain JSON.
> > Not sure if I get the point here :). Yes -in my proposal I think the records will not be touched - only indexes. So the cleanup should be way faster, contentions less of problem, due to the way the delete
> uses < ordering, deadlocks will not be possible at all (as opposed to the current "not in" - there is a very easy way to get into deadlocks when two parallel deletes are trying to delete same rows in a different sequence. I think my proposal improves all the characteristics of the "cleanup" with very little extra penalty on record creation.
>
> I was talking about the current solution and why it is also slow (and if abstract users use some DBaaS where they also pay for IOPs then it is expensive). Let's talk about the benefits of optimised queries for 4 different DB backends (3 if excluding SQLite) when we have it.
>
> > We pay for table/index size linary more records, more size. But other operations vary and depend on B-Tree implementation and usually it has logarithmic growth. Or do we worry only about table/toast/index size on disk?
> >> Yep. I (and I believe the original author had the same worry) am worried a lot about the size of the table and the fact that this table will be by far the biggest table in our DB while most of the old records will never be touched. And by the fact that this is the only table that our users will have to know about to clean up separately from all others pretty much always.
>
> Same as previous.
>
> > If not even worrying about money spent by our users, and performance degradation that comes with databases that are bigger - that's a lot of environmental effects that we might incur. Airflow is heavily used, if suddenly all our users
> will start having 10 bigger databases that they have now because we will deprecate the values and keep all the history, then we have a big number of extra disks that will have to be used. I'd strongly prefer a solution where we keep the data usage lower in this case.
>
> Am I right that this is all about "lets don't delete by default as we do for other tables" rather than the current default implementation?
> Because I get the result which is opposite what you said. And rendered_task_instance_fields don't grow faster than other tables that what I got.
> I would like to compare it with other findings and some reproducible metrics rather than with hypothetical things.
>
> ----
> Best Wishes
> Andrey Anshin
>
>
>
> On Tue, 31 Jan 2023 at 11:12, Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>> COMMENT: While writing the answer here, I think I found a deeper
>> problem (and optimisation needed)  - i.e I think the delete should be
>> even more fine-grained than it is today and include map_index) -
>> please take a look at the end (Also maybe TP might comment on that
>> one).
>>
>> > 1. Additional indexes add additional performance degradation on Insert but gain potential improvements on delete and unknown on update, RDBMS still require rebalance index and make it consistent to the table.
>> > 2. LIMIT x OFFSET y could easily become full seq scan, especially if the user set a huge number for offset (which? unknown).
>> > 3. Mixing two indexes could improve performance in a single query but in concurrent execution might lead to degradation because it needs to create a bitmap table for comparison between these two indexes, as result it might lead different issues, such as OOM on DB backend, use swaps or optimiser decided that better not to use this indexes.
>>
>> I think that is all something to be tested with explain plans. I think
>> we would not know before we try - and possibly there are other
>> optimisation approaches. The optimisation I proposed was only first
>> that came to my mind to avoid the "not in" query. The problem  with
>> "not in query" is that there is no way to optimise it by the DB.
>> Effectively you have to get every record (or index entry) and test it.
>> Maybe it can be done better :). And yes locking the index with
>> anti-insert locks and the need to rebalance trees during the delete is
>> a concern.
>>
>> > Is it a real problem? Until we access only by indexes, which doesn't include this JSON, it really doesn't matter. I guess we almost always should make a UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a single record.
>>
>> Yes I think so, and while. I was not the author of this "cleanup"
>> code, I believe I know the intention.
>>
>> It's not about index size or JSON access. It is about the size of the
>> actual rows and storage it takes - i.e. general size of the database.
>> The problem with it is that (especially with dynamic task mapping), it
>> might grow really, really fast. Basically you have NUM_DAGS x
>> NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
>> of records there. Back-of-the envelope calculation Assuming you have a
>> DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
>> fields, each field evaluating to 1K string.  Then you have 10 tasks *
>> 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
>> per one(!) run of one(1) DAG. Run it every 10 minutes and every day
>> your database from a single DAG grows by whooping 1.44 GB of data
>> every single day (from single DAG).This is of course an estimation
>> that assumes a lot, but it's not at all unrealistic. That's a lot. And
>> if we want the user to do the cleanup then a) they need to know it b)
>> they need to specifically clean up this table only because all the
>> other data is relatively small. This table is very specific compared
>> with the other tables. The only reason for it being here is to be able
>> to show the rendered fields in the UI if you go to the specific run of
>> a task. If you clean-up other tables you basically lose the history of
>> execution of the tasks and you cannot really know if the data has been
>> processed, you cannot do backfills effectively, you lose all the
>> context. Cleaning this table is merely about the values that have been
>> rendered for a specific run and the assumption there is that the older
>> it gets, the less interesting it is.
>>
>> > It is opposite of what we have right now, we scan tables (maybe multiple times), read all records tuples which contain JSON.
>>
>> Not sure if I get the point here :). Yes -in my proposal I think the
>> records will not be touched - only indexes. So the cleanup should be
>> way faster, contentions less of problem, due to the way the delete
>> uses < ordering, deadlocks will not be possible at all (as opposed to
>> the current "not in" - there is a very easy way to get into deadlocks
>> when two parallel deletes are trying to delete same rows in a
>> different sequence. I think my proposal improves all the
>> characteristics of the "cleanup" with very little extra penalty on
>> record creation.
>>
>> > We pay for table/index size linary more records, more size. But other operations vary and depend on B-Tree implementation and usually it has logarithmic growth. Or do we worry only about table/toast/index size on disk?
>>
>> Yep. I (and I believe the original author had the same worry) am
>> worried a lot about the size of the table and the fact that this table
>> will be by far the biggest table in our DB while most of the old
>> records will never be touched. And by the fact that this is the only
>> table that our users will have to know about to clean up separately
>> from all others pretty much always. If not even worrying about money
>> spent by our users, and performance degradation that comes with
>> databases that are bigger - that's a lot of environmental effects that
>> we might incur. Airflow is heavily used, if suddenly all our users
>> will start having 10 bigger databases that they have now because we
>> will deprecate the values and keep all the history, then we have a big
>> number of extra disks that will have to be used. I'd strongly prefer a
>> solution where we keep the data usage lower in this case.
>>
>> > If we do not want to grant users the ability to clean up rendered templates tables, there could be another option:
>> > - Do not delete records on every task instance run.
>> > - Delete once per defined period (hourly, daily, weekly, monthly). In this case you really could not care about locks.
>>
>> Yes we could come up with a different strategy as to "when" run the
>> cleanup. This is also a viable option. If you can propose one that
>> will be equally adaptive as the current solution, I am all ears.
>> Basically my goal is to keep the usage of the table low, possibly
>> controlled by the same parameter we had. How we do it - this is a
>> different story. If we - for example add a thread in the scheduler
>> (for example) that performs such cleanup effectively in parallel and
>> scales, I am happy with that.
>>
>> But I am trying to get into the head of the author trying to
>> understand why the original implementation was done this way. I
>> believe (and maybe those who remember it better could confirm it) that
>> distributing the deletion to tasks to clean up after itself is a
>> better idea than centralising the cleanup. This makes each cleanup
>> smaller, locks are held for a shorter time (at least that was the
>> assumption where no full table scan was used), it is more "immediate"
>> and you do not have to decide upfront what should be the cleanup
>> frequency. It seems this is the best logical approach to keep the
>> "MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply after task is
>> complete, you can be sure that there are no more than this number of
>> fields per task in the DB. With a scheduled run, that would be a much
>> more "eventual" consistency and it will be potentially fluctuating
>> much more.
>>
>> But there are risks involved in having a single thread doing the
>> cleanup. I think it has a huge risk of being a "stop-the world" and
>> "deadlock-prone" kind of event - if in big instances there are a lot
>> of rows to cleanup in a single pass. When you delete entries from a
>> table, there are anti-insert locks created for existing index entries,
>> which makes it possible to rollback the whole DELETE transaction.
>> Which means that when you try to insert the row with the same index,
>> the index will be held. And this means that when you run a single huge
>> DELETE for multiple rows affected with multiple (all?) index keys
>> matching select query, it will effectively prevent new rows with the
>> same indexes that are matching the SELECT. It would mean that if you
>> have some tasks running while deleting existing run_id rendered
>> fields, then you could REALLY start having deadlocks on those tasks
>> trying to insert rendered task instance rows. That's why I think the
>> only viable strategy for single "cleanup" thread is to do such cleanup
>> as separate DELETE for each of the "dag/task/map_index/run" - in order
>> to avoid such deadlocks. Which effectively will turn into what have
>> currently - only that currently those transactions are done by tasks,
>> not by a single cleanup thread.
>>
>> Also using tasks to delete old rows is more "effective" when you have
>> vast differences in frequency of DAGs. Naturally - when you do it in
>> task, you will only do it "when needed" for given DAG + Task. If you
>> try to centralize the cleanup, unless you include somehow schedule and
>> frequency of each dag, you are going to check every DAG every time
>> your run the cleanup - no matter if that DAG is run daily or every
>> minute, you will have to run the cleanup frequently enough to match
>> your most frequent dags. If you have 1000 dags that run hourly and one
>> DAG that runs every minue, then you have to run a cleanup job that
>> scans all DAGs every few minutes. That's a big waste.
>>
>> So I am not sure if we gain anything by centralizing the cleanup.
>> Decentralising it to Task seems to be a well thought and sound
>> decision (but I think the problem we have now is that we need to
>> optimize it after Dynamic Task Mapping has been added).
>>
>> ANOTHER FINDING:
>>
>> While looking at the code and discussing it and looking more closely I
>> **think** there is another problem that we have to fix regardless of a
>> solution. I THINK a problem we might have now is that we do not
>> include map_index in this DELETE. While we are curreently delete all
>> the rendered task fields without including map_index - and for big
>> dynamic tasks, it means that exacly the same DELETE query is run by
>> every single mapped instance of that tasks and that is where a lot of
>> contention and locking might happen (basically when single task
>> instance does the delete, anti-insert locks held the other mapped
>> instances of the same task from inserting rendered fields).
>>
>> It does not change much in the optimisation proposal of mine, other
>> than we should include map_index in those queries. But I think this
>> might cause a lot of delays in the current implementation.
>>
>> J.
>>
>> > ----
>> > Best Wishes
>> > Andrey Anshin
>> >
>> >
>> >
>> > On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com> wrote:
>> >>
>> >> I think there is a good reason to clean those up automatically.
>> >> rendered task instance fields are almost arbitrary in size. If we try
>> >> to keep all historical values there by default, there are numerous
>> >> cases it will grow very fast - far, far too quickly.
>> >>
>> >> And I am not worried at all about locks on this table if we do it the
>> >> way I described it and it uses the indexes. The contention this way
>> >> might only be between the two deleting tasks. and with the query I
>> >> proposed, they will only last for a short time - the index will be
>> >> locked when two DELETES  or SELECT DISTINCT - which should both be
>> >> fast.
>> >>
>> >>
>> >> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <an...@taragol.is> wrote:
>> >> >
>> >> > I guess two things involved to reduce performance on this query through the time: Dynamic Task Mapping and run_id instead of execution date.
>> >> >
>> >> > I still personally think that changing the default value from 30 to 0 might improve performance of multiple concurrent tasks, just because this query does not run and there are no locks on multiple records/pages.
>> >> >
>> >> > I do not have any proof (yet?) other than simple DAGs. I think that there is some cross point exists when keeping this table growth worse rather than cleanup for each TI run. But users have ability to cleanup table by execute airflow db clean which should improve performance again
>> >> >
>> >> > And also there is interesting behavior with this query: if user already have more that value specified by AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tried run backfill than rendered templates not written to table (or may be inserted and after that immediately deleted), the same is valid for cleanup old tasks.
>> >> >
>> >> > ----
>> >> > Best Wishes
>> >> > Andrey Anshin
>> >> >
>> >> >
>> >> >
>> >> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com> wrote:
>> >> >>
>> >> >> Yep. Agree this is not an efficient query and dynamic task mapping
>> >> >> makes the effect much worse. Generally speaking, selecting "what
>> >> >> should be left" and then deleting stuff where the key is "not in" is
>> >> >> never an efficient way of running an sql query.  And the query not
>> >> >> using index at all makes it rather terrible.
>> >> >>
>> >> >> I think we should not deprecate it though, but find a more efficient
>> >> >> way of deleting the old keys. I think we could slightly denormalize
>> >> >> RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
>> >> >> to the RenderedTaskInstance table and that will be enough to optimise
>> >> >> it.
>> >> >>
>> >> >> Then we could have either:
>> >> >>
>> >> >> * a composite B-TREE indexed (non-unique) index on DAG_ID, TASK_ID,
>> >> >> RUN_ID_EXECUTION_DATE
>> >> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
>> >> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
>> >> >>
>> >> >> Probably the latter is better as I am not sure how < , > comparison
>> >> >> looks like for composite B-TREE indexes when char + date columns are
>> >> >> mixed. Also we could have hit the infamous MySQL index key length
>> >> >> limit.
>> >> >>
>> >> >> Then deletion process would look roughly like:
>> >> >>
>> >> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
>> >> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>,
>> >> >> TASK_ID=<TASK_ID> ORDER BY RUN_ID_EXECUTION_DATE GROUP BY
>> >> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
>> >> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
>> >> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>,
>> >> >> TASK_ID=<TASK_ID> AND RENDER_TIME < dag_run_execution_date
>> >> >>
>> >> >> I believe that would be fast, and it would use the B-TREE index
>> >> >> features nicely (ordering support)
>> >> >>
>> >> >> J
>> >> >>
>> >> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <an...@taragol.is> wrote:
>> >> >> >
>> >> >> > First of all I want to highlight that this approach I guess worked well until Dynamic Task Mappings introduced.
>> >> >> >
>> >> >> > > The main reason for adding that cleanup was -- if you don't do that, you will have many rows, similar to the TaskInstance table
>> >> >> >
>> >> >> > The problem itself is not how big your table/indexes, rather then what kind of operation you run.
>> >> >> >
>> >> >> > > Do you have any data for locks or performance degradation?
>> >> >> >
>> >> >> > In this case if we try to clean up rendered_task_instance_fields table when a new TI is created/cleared we make almost two full/sequential scans (note: need to check) against the table without any index usage, so we pay here a couple times:
>> >> >> > 1. We scan without indexes - not all parts of the composite key are included to query, plus we need to filter everything except 30 records with order and distinct
>> >> >> > 2. After that we make another full scan for find 1 record or map_size records
>> >> >> >
>> >> >> > And I guess the situation becomes worse if you have a lot of tasks, even if we have a small table, we need to do ineffective operations.
>> >> >> >
>> >> >> > That how looks like Query Plan (please note without commit transaction DELETE operation doesn't have all information): https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
>> >> >> >
>> >> >> > In case if we do not clean up the table, we only use these operations:
>> >> >> > 1. SELECT single record by index
>> >> >> > 2. INSERT new record
>> >> >> > 3. DELETE old record(s), which were found by index.
>> >> >> >
>> >> >> > I have not done any real tests yet, only synthetic DAGs (so we should not consider to use any findings as totally truth): https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
>> >> >> > DAG with parallel tasks: degradation up to 2-3 times
>> >> >> > DAG with single map tasks: degradation up to 7-10 times
>> >> >> >
>> >> >> > I have a plan for more complex and more close to real use cases with Database which do not have network latency almost 0 as I have in my local.
>> >> >> > But I will not refuse if someone also does their tests with AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
>> >> >> >
>> >> >> > About deadlock we know that it exists at least in MySQL: https://github.com/apache/airflow/pull/18616
>> >> >> >
>> >> >> > > And the larger tables create problems during database migrations.
>> >> >> >
>> >> >> > That is a very good point, so if we found that problem only related to migrations we could:
>> >> >> > 1. Cleanup this table in migration
>> >> >> > 2. Add cli command to airflow db which could cleanup only rendered fields, so it would be user's choice cleanup or not before migration, do periodical maintenance or not
>> >> >> >
>> >> >> >
>> >> >> > ----
>> >> >> > Best Wishes
>> >> >> > Andrey Anshin
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com> wrote:
>> >> >> >>>
>> >> >> >>> Potentially it is a good idea to deprecate this option and recommend for users to set it to 0? WDYT? Maybe someone has already tried or investigated this?
>> >> >> >>
>> >> >> >>
>> >> >> >> The main reason for adding that cleanup was -- if you don't do that, you will have many rows, similar to the TaskInstance table. And the RenderedTIFields were mainly added for checking rendered TI fields on the Webserver only because after DAG Serialization, the webserver won't have access to DAG files.
>> >> >> >>
>> >> >> >> And the larger tables create problems during database migrations.
>> >> >> >>
>> >> >> >> Do you have any data for locks or performance degradation?
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <an...@taragol.is> wrote:
>> >> >> >>>
>> >> >> >>> Greetings!
>> >> >> >>>
>> >> >> >>> During migrating our ORM syntax to compatible with SQLAlchemy 2.0 I probably found skeletons in the closet.
>> >> >> >>>
>> >> >> >>> Let's start from the beginning, initially I got this warning
>> >> >> >>>
>> >> >> >>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY columns added implicitly due to DISTINCT is deprecated and will be removed in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to explicitly include the appropriate columns in the columns clause (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
>> >> >> >>>
>> >> >> >>> "OK let's fix it!", I thought at first and started to investigate RenderedTaskInstanceFields model
>> >> >> >>>
>> >> >> >>> Skeleton #1:
>> >> >> >>>
>> >> >> >>> When I first time look on the code and comments it got me to thinking that part which keep only latest N Rendered Task Fields potentially could lead different performance degradation (Locks, Dead Locks, Data Bloating): see code https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
>> >> >> >>>
>> >> >> >>> Also this historical part (from Airflow 1.10.10) generate this SQL Statement (pg backend)
>> >> >> >>>
>> >> >> >>> DELETE FROM rendered_task_instance_fields
>> >> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
>> >> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
>> >> >> >>>   AND (
>> >> >> >>>     (
>> >> >> >>>       rendered_task_instance_fields.dag_id,
>> >> >> >>>       rendered_task_instance_fields.task_id,
>> >> >> >>>       rendered_task_instance_fields.run_id
>> >> >> >>>     ) NOT IN (
>> >> >> >>>       SELECT
>> >> >> >>>         anon_1.dag_id,
>> >> >> >>>         anon_1.task_id,
>> >> >> >>>         anon_1.run_id
>> >> >> >>>       FROM
>> >> >> >>>         (
>> >> >> >>>           SELECT DISTINCT
>> >> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
>> >> >> >>>             rendered_task_instance_fields.task_id AS task_id,
>> >> >> >>>             rendered_task_instance_fields.run_id AS run_id,
>> >> >> >>>             dag_run.execution_date AS execution_date
>> >> >> >>>           FROM rendered_task_instance_fields
>> >> >> >>>             JOIN dag_run ON rendered_task_instance_fields.dag_id = dag_run.dag_id
>> >> >> >>>             AND rendered_task_instance_fields.run_id = dag_run.run_id
>> >> >> >>>           WHERE
>> >> >> >>>             rendered_task_instance_fields.dag_id = %(dag_id_2) s
>> >> >> >>>             AND rendered_task_instance_fields.task_id = %(task_id_2) s
>> >> >> >>>           ORDER BY
>> >> >> >>>             dag_run.execution_date DESC
>> >> >> >>>           limit %(param_1) s
>> >> >> >>>         ) AS anon_1
>> >> >> >>>     )
>> >> >> >>>   )
>> >> >> >>>
>> >> >> >>> Which is especially not effective in PostgreSQL. When IN SUBQUERY could be easily transform internaly into SEMI-JOIN (aka EXISTS clause), but it is not working for NOT IN SUBQUERY because it is not transformed into ANTI JOIN (aka NOT EXISTS clause) even if it possible, see: https://commitfest.postgresql.org/27/2023/
>> >> >> >>>
>> >> >> >>> I didn't do any performance benchmarks yet but I guess if users set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than default 30 it could improve performance and reduce number of DeadLocks, however the table size will increase but I think we don't do any maintenance job for other tables.
>> >> >> >>>
>> >> >> >>> Potentially it is a good idea to deprecate this option and recommend for users to set it to 0? WDYT? Maybe someone has already tried or investigated this?
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> Skeleton #2:
>> >> >> >>>
>> >> >> >>> We have a k8s_pod_yaml field which is exclusively used by K8S executors.
>> >> >> >>>
>> >> >> >>> Should we also decouple this field as part of AIP-51?
>> >> >> >>>
>> >> >> >>> ----
>> >> >> >>> Best Wishes
>> >> >> >>> Andrey Anshin
>> >> >> >>>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Andrey Anshin <an...@taragol.is>.
> I think that is all something to be tested with explain plans.

I would be really careful with these results. A DELETE inside a transaction
that is rolled back usually looks more optimistic than one that is actually
executed and committed.


> I think we would not know before we try - and possibly there are other
optimisation approaches. The optimisation I proposed was only first
that came to my mind to avoid the "not in" query. The problem  with
"not in query" is that there is no way to optimise it by the DB.
Effectively you have to get every record (or index entry) and test it.
Maybe it can be done better :). And yes locking the index with
anti-insert locks and the need to rebalance trees during the delete is
a concern.

My point is still the same: I would rather remove it in the future or make
the maintenance policy more consistent: all or nothing. Right now we are
closer to nothing than to all.


> It's not about index size or JSON access. It is about the size of the
actual rows and storage it takes - i.e. general size of the database.

Maybe I'm tired, but I'm not sure that I understand the actual point.
Does the size of the table really matter if you always access it by the
same pattern:
1 request which returns exactly 1 record accessed by a unique index?
Basically the query traverses the index and finds a reference to a single
CTID/rowid (or whatever name is used in other RDBMSs).
So at this point what really matters is how fast your index grows rather
than the table size.
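
To illustrate the access pattern I mean, here is a minimal sketch using SQLite from the Python standard library (not Postgres, and with a simplified table layout that I made up for the example, so treat the schema as an assumption). It only shows that a lookup by the full unique key is resolved through the index rather than by scanning the table:

```python
import sqlite3

# Simplified, assumed layout: the real table has more columns.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE rendered_task_instance_fields (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL DEFAULT -1,
        rendered_fields TEXT NOT NULL,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    )
    """
)
conn.executemany(
    "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?, ?, ?)",
    [("dag", "task", f"run_{i}", -1, "{}") for i in range(1000)],
)

# Ask SQLite how it would execute a lookup by the full unique key.
plan = conn.execute(
    """
    EXPLAIN QUERY PLAN
    SELECT rendered_fields FROM rendered_task_instance_fields
    WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?
    """,
    ("dag", "task", "run_42", -1),
).fetchall()

# The fourth column of each plan row is the human-readable detail.
plan_detail = " ".join(row[3] for row in plan)
print(plan_detail)
```

The exact wording of the plan differs between SQLite versions, and other backends have their own planners, so this is only an illustration of the pattern, not a benchmark.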


> The problem with it is that (especially with dynamic task mapping), it
might grow really, really fast. Basically you have NUM_DAGS *
NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS * NUM_RUNS number
of records there.
Back-of-the-envelope calculation: assume you have a
DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
fields, each field evaluating to 1K string. Then you have 10 tasks *
100 map indexes * 10 fields * 1K rendered string size = 10MB to store
per one(!) run of one(1) DAG. Run it every 10 minutes and every day
your database from a single DAG grows by a whopping 1.44 GB of data
every single day (from single DAG).

That depends on the DB. If we talk about Postgres, the estimate could easily
be off by a factor of 3-4 (thanks to inline compression before/instead of
TOAST).
I have a couple of questions:
1. Do you know how big the indexes on rendered_task_instance_fields would be
after one day? (Spoiler alert: I could give an estimate in the morning.)
2. In this scenario the default settings would always keep up to 30 000 TI
for this DAG.
Could someone who wants to optimize the query make it more efficient than
accessing the table by index (unique index -> ctid/rowid -> record), and
where is the crossover point?

> This is of course an estimation that assumes a lot, but it's not at all
unrealistic.

144,000 TI per day on a single DAG (initially I wanted to put some
sarcastic message here).
How would Airflow feel with 144k Task Instances per day?
I ask because so far I've always had a task_instance table bigger than
rendered_task_instance_fields.
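
For anyone who wants to re-check the numbers used in this part of the thread, here is the arithmetic as a small Python sketch. The inputs are the ones from the quoted estimate; treating "1K" as 1000 bytes is my assumption, and the variable names are just for illustration:

```python
# Inputs taken from the back-of-the-envelope estimate quoted above.
TASKS = 10               # dynamically mapped tasks in the DAG
MAP_INDEXES = 100        # mapped indexes per task
FIELDS = 10              # templated fields per task instance
FIELD_BYTES = 1_000      # "1K" per rendered field, assumed to be 1000 bytes
RUN_EVERY_MINUTES = 10   # DAG schedule
KEEP_RUNS = 30           # default MAX_NUM_RENDERED_TI_FIELDS_PER_TASK

runs_per_day = 24 * 60 // RUN_EVERY_MINUTES
ti_per_run = TASKS * MAP_INDEXES
bytes_per_run = ti_per_run * FIELDS * FIELD_BYTES
bytes_per_day = bytes_per_run * runs_per_day
ti_per_day = ti_per_run * runs_per_day
ti_kept_with_default = ti_per_run * KEEP_RUNS

print(f"{bytes_per_run / 1e6:.0f} MB per run")
print(f"{bytes_per_day / 1e9:.2f} GB per day")
print(f"{ti_per_day} task instances per day")
print(f"{ti_kept_with_default} task instances kept with the default setting")
```

This ignores row overhead, indexes and any compression the backend applies, so it is raw payload only.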


> This table is very specific compared with the other tables. The only
reason for it being here is to be able
to show the rendered fields in the UI if you go to the specific run of a
task. If you clean-up other tables you basically lose the history of
execution of the tasks and you cannot really know if the data has been
processed, you cannot do backfills effectively, you lose all the
context. Cleaning this table is merely about the values that have been
rendered for a specific run and the assumption there is that the older
it gets, the less interesting it is.

What about these tables: session, log, job? I expect the answer would be
"They are not so specific."
For me every table is specific to its purpose.
Users have often asked (Slack, issues, discussions) for the ability to
auto-maintain tables/logs and received the usual answer: "No, we don't give
you this opportunity, please use something from the Airflow Ecosystem page". But
on the other hand we have auto-maintenance for only a single table.

>> It is opposite of what we have right now, we scan tables (maybe multiple
times), read all records tuples which contain JSON.
> Not sure if I get the point here :). Yes -in my proposal I think the
records will not be touched - only indexes. So the cleanup should be way
faster, contentions less of problem, due to the way the delete
uses < ordering, deadlocks will not be possible at all (as opposed to the
current "not in" - there is a very easy way to get into deadlocks when two
parallel deletes are trying to delete same rows in a different sequence. I
think my proposal improves all the characteristics of the "cleanup" with
very little extra penalty on record creation.

I was talking about the current solution and why it is also slow (and if
hypothetical users run on some DBaaS where they also pay for IOPS, then it
is expensive). Let's talk about the benefits of optimised queries for 4
different DB backends (3 if excluding SQLite) when we have them.
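
So that we compare the same things, here is a rough sketch of the two delete shapes discussed in this thread, written against SQLite from the Python standard library. Everything here is an assumption made for illustration: the table name rtif, the denormalised execution_date column (as in the proposal), the index name and the helper functions. It is not Airflow code and says nothing about performance on a real backend; it only checks that both shapes keep the same rows. The cutoff variant assumes keep >= 1.

```python
import sqlite3


def make_db():
    """Build a toy table: 5 runs of one task, 3 mapped instances per run."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE rtif (
            dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
            execution_date TEXT,
            PRIMARY KEY (dag_id, task_id, run_id, map_index)
        )
        """
    )
    conn.execute(
        "CREATE INDEX idx_rtif_date ON rtif (dag_id, task_id, execution_date)"
    )
    rows = [
        ("dag", "task", f"run_{day}", map_index, f"2023-01-{day:02d}")
        for day in range(1, 6)
        for map_index in range(3)
    ]
    conn.executemany("INSERT INTO rtif VALUES (?, ?, ?, ?, ?)", rows)
    return conn


def delete_not_in(conn, dag_id, task_id, keep):
    """Current shape: select what should stay, delete everything NOT IN it."""
    conn.execute(
        """
        DELETE FROM rtif
        WHERE dag_id = ? AND task_id = ?
          AND run_id NOT IN (
            SELECT run_id FROM (
              SELECT DISTINCT run_id, execution_date FROM rtif
              WHERE dag_id = ? AND task_id = ?
              ORDER BY execution_date DESC LIMIT ?
            )
          )
        """,
        (dag_id, task_id, dag_id, task_id, keep),
    )


def delete_by_cutoff(conn, dag_id, task_id, keep):
    """Proposed shape: find the oldest date to keep, delete older rows."""
    row = conn.execute(
        """
        SELECT DISTINCT execution_date FROM rtif
        WHERE dag_id = ? AND task_id = ?
        ORDER BY execution_date DESC LIMIT 1 OFFSET ?
        """,
        (dag_id, task_id, keep - 1),
    ).fetchone()
    if row is None:
        return  # fewer than `keep` runs stored, nothing to delete
    conn.execute(
        "DELETE FROM rtif WHERE dag_id = ? AND task_id = ? AND execution_date < ?",
        (dag_id, task_id, row[0]),
    )


def remaining_runs(conn):
    """Return the sorted list of run_ids still present in the table."""
    return sorted({r[0] for r in conn.execute("SELECT run_id FROM rtif")})


db_a = make_db()
delete_not_in(db_a, "dag", "task", keep=2)
db_b = make_db()
delete_by_cutoff(db_b, "dag", "task", keep=2)
print(remaining_runs(db_a), remaining_runs(db_b))
```

Note that I used OFFSET keep - 1 together with a strict "<" so that exactly keep runs survive; whether the real implementation should be written this way, and how it behaves on each backend, is exactly what would need measuring.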

> We pay for table/index size linearly: more records, more size. But other
operations vary and depend on B-Tree implementation and usually it has
logarithmic growth. Or do we worry only about table/toast/index size on
disk?
>> Yep. I (and I believe the original author had the same worry) am worried
a lot about the size of the table and the fact that this table will be by
far the biggest table in our DB while most of the old records will never be
touched. And by the fact that this is the only table that our users will
have to know about to clean up separately from all others pretty much
always.

Same as previous.

> If not even worrying about money spent by our users, and performance
degradation that comes with databases that are bigger - that's a lot of
environmental effects that we might incur. Airflow is heavily used, if
suddenly all our users
will start having 10x bigger databases than they have now because we will
deprecate the values and keep all the history, then we have a big number of
extra disks that will have to be used. I'd strongly prefer a solution where
we keep the data usage lower in this case.

Am I right that this is all about "let's not skip deletion by default as we
do for other tables" rather than about the current default implementation?
Because the results I get are the opposite of what you said:
rendered_task_instance_fields doesn't grow faster than the other tables,
that is what I measured.
I would like to compare it with other findings and some reproducible
metrics rather than with hypothetical things.

----
Best Wishes
*Andrey Anshin*



On Tue, 31 Jan 2023 at 11:12, Jarek Potiuk <ja...@potiuk.com> wrote:

> COMMENT: While writing the answer here, I think I found a deeper
> problem (and optimisation needed)  - i.e I think the delete should be
> even more fine-grained than it is today and include map_index) -
> please take a look at the end (Also maybe TP might comment on that
> one).
>
> > 1. Additional indexes add additional performance degradation on Insert
> but gain potential improvements on delete and unknown on update, RDBMS
> still require rebalance index and make it consistent to the table.
> > 2. LIMIT x OFFSET y could easily become full seq scan, especially if the
> user set a huge number for offset (which? unknown).
> > 3. Mixing two indexes could improve performance in a single query but in
> concurrent execution might lead to degradation because it needs to create a
> bitmap table for comparison between these two indexes, as result it might
> lead different issues, such as OOM on DB backend, use swaps or optimiser
> decided that better not to use this indexes.
>
> I think that is all something to be tested with explain plans. I think
> we would not know before we try - and possibly there are other
> optimisation approaches. The optimisation I proposed was only first
> that came to my mind to avoid the "not in" query. The problem  with
> "not in query" is that there is no way to optimise it by the DB.
> Effectively you have to get every record (or index entry) and test it.
> Maybe it can be done better :). And yes locking the index with
> anti-insert locks and the need to rebalance trees during the delete is
> a concern.
>
> > Is it a real problem? Until we access only by indexes, which doesn't
> include this JSON, it really doesn't matter. I guess we almost always
> should make a UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a single
> record.
>
> Yes I think so, and while I was not the author of this "cleanup"
> code, I believe I know the intention.
>
> It's not about index size or JSON access. It is about the size of the
> actual rows and storage it takes - i.e. general size of the database.
> The problem with it is that (especially with dynamic task mapping), it
> might grow really, really fast. Basically you have NUM_DAGS x
> NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
> of records there. Back-of-the-envelope calculation: assume you have a
> DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
> fields, each field evaluating to 1K string.  Then you have 10 tasks *
> 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
> per one(!) run of one(1) DAG. Run it every 10 minutes and every day
> your database from a single DAG grows by a whopping 1.44 GB of data
> every single day (from single DAG). This is of course an estimation
> that assumes a lot, but it's not at all unrealistic. That's a lot. And
> if we want the user to do the cleanup then a) they need to know it b)
> they need to specifically clean up this table only because all the
> other data is relatively small. This table is very specific compared
> with the other tables. The only reason for it being here is to be able
> to show the rendered fields in the UI if you go to the specific run of
> a task. If you clean-up other tables you basically lose the history of
> execution of the tasks and you cannot really know if the data has been
> processed, you cannot do backfills effectively, you lose all the
> context. Cleaning this table is merely about the values that have been
> rendered for a specific run and the assumption there is that the older
> it gets, the less interesting it is.
>
> > It is opposite of what we have right now, we scan tables (maybe multiple
> times), read all records tuples which contain JSON.
>
> Not sure if I get the point here :). Yes -in my proposal I think the
> records will not be touched - only indexes. So the cleanup should be
> way faster, contentions less of problem, due to the way the delete
> uses < ordering, deadlocks will not be possible at all (as opposed to
> the current "not in" - there is a very easy way to get into deadlocks
> when two parallel deletes are trying to delete same rows in a
> different sequence. I think my proposal improves all the
> characteristics of the "cleanup" with very little extra penalty on
> record creation.
>
> > We pay for table/index size linearly: more records, more size. But other
> operations vary and depend on B-Tree implementation and usually it has
> logarithmic growth. Or do we worry only about table/toast/index size on
> disk?
>
> Yep. I (and I believe the original author had the same worry) am
> worried a lot about the size of the table and the fact that this table
> will be by far the biggest table in our DB while most of the old
> records will never be touched. And by the fact that this is the only
> table that our users will have to know about to clean up separately
> from all others pretty much always. If not even worrying about money
> spent by our users, and performance degradation that comes with
> databases that are bigger - that's a lot of environmental effects that
> we might incur. Airflow is heavily used, if suddenly all our users
> will start having 10x bigger databases than they have now because we
> will deprecate the values and keep all the history, then we have a big
> number of extra disks that will have to be used. I'd strongly prefer a
> solution where we keep the data usage lower in this case.
>
> > If we do not want to grant users the ability to clean up rendered
> templates tables, there could be another option:
> > - Do not delete records on every task instance run.
> > - Delete once per defined period (hourly, daily, weekly, monthly). In
> this case you really could not care about locks.
>
> Yes we could come up with a different strategy as to "when" run the
> cleanup. This is also a viable option. If you can propose one that
> will be equally adaptive as the current solution, I am all ears.
> Basically my goal is to keep the usage of the table low, possibly
> controlled by the same parameter we had. How we do it - this is a
> different story. If we - for example add a thread in the scheduler
> (for example) that performs such cleanup effectively in parallel and
> scales, I am happy with that.
>
> But I am trying to get into the head of the author trying to
> understand why the original implementation was done this way. I
> believe (and maybe those who remember it better could confirm it) that
> distributing the deletion to tasks to clean up after itself is a
> better idea than centralising the cleanup. This makes each cleanup
> smaller, locks are held for a shorter time (at least that was the
> assumption where no full table scan was used), it is more "immediate"
> and you do not have to decide upfront what should be the cleanup
> frequency. It seems this is the best logical approach to keep the
> "MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply after task is
> complete, you can be sure that there are no more than this number of
> fields per task in the DB. With a scheduled run, that would be a much
> more "eventual" consistency and it will be potentially fluctuating
> much more.
>
> But there are risks involved in having a single thread doing the
> cleanup. I think it has a huge risk of being a "stop-the world" and
> "deadlock-prone" kind of event - if in big instances there are a lot
> of rows to cleanup in a single pass. When you delete entries from a
> table, there are anti-insert locks created for existing index entries,
> which makes it possible to rollback the whole DELETE transaction.
> Which means that when you try to insert the row with the same index,
> the index will be held. And this means that when you run a single huge
> DELETE for multiple rows affected with multiple (all?) index keys
> matching select query, it will effectively prevent new rows with the
> same indexes that are matching the SELECT. It would mean that if you
> have some tasks running while deleting existing run_id rendered
> fields, then you could REALLY start having deadlocks on those tasks
> trying to insert rendered task instance rows. That's why I think the
> only viable strategy for single "cleanup" thread is to do such cleanup
> as separate DELETE for each of the "dag/task/map_index/run" - in order
> to avoid such deadlocks. Which effectively will turn into what we have
> currently - only that currently those transactions are done by tasks,
> not by a single cleanup thread.
>
> Also using tasks to delete old rows is more "effective" when you have
> vast differences in frequency of DAGs. Naturally - when you do it in
> task, you will only do it "when needed" for given DAG + Task. If you
> try to centralize the cleanup, unless you include somehow schedule and
> frequency of each dag, you are going to check every DAG every time
> you run the cleanup - no matter if that DAG is run daily or every
> minute, you will have to run the cleanup frequently enough to match
> your most frequent dags. If you have 1000 dags that run hourly and one
> DAG that runs every minute, then you have to run a cleanup job that
> scans all DAGs every few minutes. That's a big waste.
>
> So I am not sure if we gain anything by centralizing the cleanup.
> Decentralising it to Task seems to be a well thought and sound
> decision (but I think the problem we have now is that we need to
> optimize it after Dynamic Task Mapping has been added).
>
> ANOTHER FINDING:
>
> While looking at the code and discussing it and looking more closely I
> **think** there is another problem that we have to fix regardless of a
> solution. I THINK a problem we might have now is that we do not
> include map_index in this DELETE. We currently delete all
> the rendered task fields without including map_index - and for big
> dynamic tasks, it means that exactly the same DELETE query is run by
> every single mapped instance of that task, and that is where a lot of
> contention and locking might happen (basically when a single task
> instance does the delete, anti-insert locks hold the other mapped
> instances of the same task from inserting rendered fields).
>
> It does not change much in the optimisation proposal of mine, other
> than we should include map_index in those queries. But I think this
> might cause a lot of delays in the current implementation.
>
> J.
>
> > ----
> > Best Wishes
> > Andrey Anshin
> >
> >
> >
> > On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com> wrote:
> >>
> >> I think there is a good reason to clean those up automatically.
> >> rendered task instance fields are almost arbitrary in size. If we try
> >> to keep all historical values there by default, there are numerous
> >> cases it will grow very fast - far, far too quickly.
> >>
> >> And I am not worried at all about locks on this table if we do it the
> >> way I described it and it uses the indexes. The contention this way
> >> might only be between the two deleting tasks. and with the query I
> >> proposed, they will only last for a short time - the index will be
> >> locked when two DELETES  or SELECT DISTINCT - which should both be
> >> fast.
> >>
> >>
> >> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <an...@taragol.is>
> wrote:
> >> >
> >> > I guess two things have reduced the performance of this query
> over time: Dynamic Task Mapping and run_id instead of execution date.
> >> >
> >> > I still personally think that changing the default value from 30 to 0
> might improve performance of multiple concurrent tasks, just because this
> query does not run and there are no locks on multiple records/pages.
> >> >
> >> > I do not have any proof (yet?) other than simple DAGs. I think that
> there is some crossover point beyond which letting this table grow is worse
> than cleaning up on each TI run. But users have the ability to clean up the
> table by executing airflow db clean, which should improve performance again.
> >> >
> >> > And also there is interesting behavior with this query: if the user
> already has more records than the value specified by
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tries to run a backfill,
> then the rendered templates are not written to the table (or maybe are inserted
> and then immediately deleted); the same applies when clearing old tasks.
> >> >
> >> > ----
> >> > Best Wishes
> >> > Andrey Anshin
> >> >
> >> >
> >> >
> >> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com> wrote:
> >> >>
> >> >> Yep. Agree this is not an efficient query and dynamic task mapping
> >> >> makes the effect much worse. Generally speaking, selecting "what
> >> >> should be left" and then deleting stuff where the key is "not in" is
> >> >> never an efficient way of running an sql query.  And the query not
> >> >> using index at all makes it rather terrible.
> >> >>
> >> >> I think we should not deprecate it though, but find a more efficient
> >> >> way of deleting the old keys. I think we could slightly denormalize
> >> >> RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
> >> >> to the RenderedTaskInstance table and that will be enough to optimise
> >> >> it.
> >> >>
> >> >> Then we could have either:
> >> >>
> >> >> * a composite B-TREE indexed (non-unique) index on DAG_ID, TASK_ID,
> >> >> RUN_ID_EXECUTION_DATE
> >> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
> >> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
> >> >>
> >> >> Probably the latter is better as I am not sure how < , > comparison
> >> >> looks like for composite B-TREE indexes when char + date columns are
> >> >> mixed. Also we could have hit the infamous MySQL index key length
> >> >> limit.
> >> >>
> >> >> Then deletion process would look roughly like:
> >> >>
> >> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
> >> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
> >> >> TASK_ID = <TASK_ID> GROUP BY RUN_ID_EXECUTION_DATE ORDER BY
> >> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
> >> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
> >> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
> >> >> TASK_ID = <TASK_ID> AND RUN_ID_EXECUTION_DATE < dag_run_execution_date
> >> >>
> >> >> I believe that would be fast, and it would use the B-TREE index
> >> >> features nicely (ordering support)
> >> >>
> >> >> J
> >> >>
> >> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >
> >> >> > First of all I want to highlight that this approach, I guess, worked
> well until Dynamic Task Mapping was introduced.
> >> >> >
> >> >> > > The main reason for adding that cleanup was -- if you don't do
> that, you will have many rows, similar to the TaskInstance table
> >> >> >
> >> >> > The problem itself is not how big your table/indexes are, but rather
> what kind of operations you run.
> >> >> >
> >> >> > > Do you have any data for locks or performance degradation?
> >> >> >
> >> >> > In this case, if we try to clean up the rendered_task_instance_fields
> table when a new TI is created/cleared, we make almost two full/sequential
> scans (note: need to check) against the table without any index usage, so
> we pay here a couple of times:
> >> >> > 1. We scan without indexes - not all parts of the composite key
> are included in the query, plus we need to filter out everything except 30
> records with ORDER BY and DISTINCT
> >> >> > 2. After that we make another full scan to find 1 record or
> map_size records
> >> >> >
> >> >> > And I guess the situation becomes worse if you have a lot of
> tasks: even if we have a small table, we need to do inefficient operations.
> >> >> >
> >> >> > This is what the query plan looks like (please note that without
> committing the transaction the DELETE operation doesn't have all information):
> https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
> >> >> >
> >> >> > In case if we do not clean up the table, we only use these
> operations:
> >> >> > 1. SELECT single record by index
> >> >> > 2. INSERT new record
> >> >> > 3. DELETE old record(s), which were found by index.
> >> >> >
> >> >> > I have not done any real tests yet, only synthetic DAGs (so we
> should not treat any findings as the absolute truth):
> https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
> >> >> > DAG with parallel tasks: degradation up to 2-3 times
> >> >> > DAG with single map tasks: degradation up to 7-10 times
> >> >> >
> >> >> > I plan to test more complex and closer to real use cases,
> with a database that does not have almost 0 network latency as I have in my
> local setup.
> >> >> > But I will not refuse if someone also does their tests with
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
> >> >> >
> >> >> > About deadlock we know that it exists at least in MySQL:
> https://github.com/apache/airflow/pull/18616
> >> >> >
> >> >> > > And the larger tables create problems during database migrations.
> >> >> >
> >> >> > That is a very good point, so if we find that the problem is only
> related to migrations we could:
> >> >> > 1. Clean up this table in the migration
> >> >> > 2. Add a CLI command to airflow db which could clean up only rendered
> fields, so it would be the user's choice whether to clean up before migration
> and whether to do periodic maintenance
> >> >> >
> >> >> >
> >> >> > ----
> >> >> > Best Wishes
> >> >> > Andrey Anshin
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com>
> wrote:
> >> >> >>>
> >> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >> >>
> >> >> >>
> >> >> >> The main reason for adding that cleanup was -- if you don't do
> that, you will have many rows, similar to the TaskInstance table. And the
> RenderedTIFields were mainly added for checking rendered TI fields on the
> Webserver only because after DAG Serialization, the webserver won't have
> access to DAG files.
> >> >> >>
> >> >> >> And the larger tables create problems during database migrations.
> >> >> >>
> >> >> >> Do you have any data for locks or performance degradation?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >>>
> >> >> >>> Greetings!
> >> >> >>>
> >> >> >>> During migrating our ORM syntax to compatible with SQLAlchemy
> 2.0 I probably found skeletons in the closet.
> >> >> >>>
> >> >> >>> Let's start from the beginning, initially I got this warning
> >> >> >>>
> >> >> >>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER
> BY columns added implicitly due to DISTINCT is deprecated and will be
> removed in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be
> written to explicitly include the appropriate columns in the columns clause
> (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
> >> >> >>>
> >> >> >>> "OK let's fix it!", I thought at first and started to
> investigate RenderedTaskInstanceFields model
> >> >> >>>
> >> >> >>> Skeleton #1:
> >> >> >>>
> >> >> >>> When I first time look on the code and comments it got me to
> thinking that part which keep only latest N Rendered Task Fields
> potentially could lead different performance degradation (Locks, Dead
> Locks, Data Bloating): see code
> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
> >> >> >>>
> >> >> >>> Also this historical part (from Airflow 1.10.10) generate this
> SQL Statement (pg backend)
> >> >> >>>
> >> >> >>> DELETE FROM rendered_task_instance_fields
> >> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
> >> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
> >> >> >>>   AND (
> >> >> >>>     (
> >> >> >>>       rendered_task_instance_fields.dag_id,
> >> >> >>>       rendered_task_instance_fields.task_id,
> >> >> >>>       rendered_task_instance_fields.run_id
> >> >> >>>     ) NOT IN (
> >> >> >>>       SELECT
> >> >> >>>         anon_1.dag_id,
> >> >> >>>         anon_1.task_id,
> >> >> >>>         anon_1.run_id
> >> >> >>>       FROM
> >> >> >>>         (
> >> >> >>>           SELECT DISTINCT
> >> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
> >> >> >>>             rendered_task_instance_fields.task_id AS task_id,
> >> >> >>>             rendered_task_instance_fields.run_id AS run_id,
> >> >> >>>             dag_run.execution_date AS execution_date
> >> >> >>>           FROM rendered_task_instance_fields
> >> >> >>>             JOIN dag_run ON rendered_task_instance_fields.dag_id
> = dag_run.dag_id
> >> >> >>>             AND rendered_task_instance_fields.run_id =
> dag_run.run_id
> >> >> >>>           WHERE
> >> >> >>>             rendered_task_instance_fields.dag_id = %(dag_id_2) s
> >> >> >>>             AND rendered_task_instance_fields.task_id =
> %(task_id_2) s
> >> >> >>>           ORDER BY
> >> >> >>>             dag_run.execution_date DESC
> >> >> >>>           limit %(param_1) s
> >> >> >>>         ) AS anon_1
> >> >> >>>     )
> >> >> >>>   )
> >> >> >>>
> >> >> >>> This is especially inefficient in PostgreSQL. An IN
> subquery can easily be transformed internally into a SEMI JOIN (aka an EXISTS
> clause), but that does not work for a NOT IN subquery, because it is not
> transformed into an ANTI JOIN (aka a NOT EXISTS clause) even when that is
> possible, see: https://commitfest.postgresql.org/27/2023/
> >> >> >>>
> >> >> >>> I didn't do any performance benchmarks yet, but I guess that if users
> set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than
> the default 30, it could improve performance and reduce the number of
> deadlocks. The table size will increase, but I think we don't run any
> maintenance job for other tables either.
> >> >> >>>
> >> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >> >>>
> >> >> >>>
> >> >> >>> Skeleton #2:
> >> >> >>>
> >> >> >>> We have a k8s_pod_yaml field which is exclusively used by K8S
> executors.
> >> >> >>>
> >> >> >>> Should we also decouple this field as part of AIP-51?
> >> >> >>>
> >> >> >>> ----
> >> >> >>> Best Wishes
> >> >> >>> Andrey Anshin
> >> >> >>>
>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Andrey Anshin <an...@taragol.is>.
Whoooa Jarek, really huge comments. I will try to answer in the evening
when I have free time.

Before I went to sleep I ran a sample with a pretty huge rendered template
field and with auto deletion disabled.
If someone is interested, you can find the result in this gist:
https://gist.github.com/Taragolis/df2e86ffcb3d6fc3edd490d60fcb758d

rendered_task_instance_fields is not the biggest table in the Airflow
database, at least in this dummy case. That could change with more than
one task in a single DAG run, but other tables should still be bigger. I
didn't run the same test with deletion enabled, just because previous tests
showed spikes with fewer DAG runs that were bigger than in the current one.

If someone has time it would be nice to compare more complicated cases, but
I bet that in more complicated cases there is a higher chance that the
database dies in torment with rendered templates deletion enabled. Or at
least provide a DAG for testing: we could collect different cases and then
use them for the final decision on which way is better: keep
rendered_task_instance_fields untouched (as other Airflow DB tables) or
auto-cleanup with queries optimised for the particular DB backend.

And still, IMHO, if a user doesn't have some extreme Airflow usage, then for
Airflow 2.3+ AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 is a
valid option, especially if quite a few dynamic tasks are used, because right
now the two conflict with each other.
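
In case the variable name looks cryptic: it follows the usual Airflow convention AIRFLOW__{SECTION}__{KEY} for overriding a config option from the environment. A tiny sketch of that mapping is below; the helper airflow_env_var is hypothetical and written only for this mail, and the dict stands in for whatever environment your deployment uses:

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name for a config option (hypothetical helper)."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


name = airflow_env_var("core", "max_num_rendered_ti_fields_per_task")

# Environment fragment that would switch the per-task cleanup off,
# as discussed in this thread (0 instead of the default 30).
env = {name: "0"}
print(env)
```

The same option can of course be set in the [core] section of airflow.cfg instead.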


----
Best Wishes
*Andrey Anshin*



On Tue, 31 Jan 2023 at 11:12, Jarek Potiuk <ja...@potiuk.com> wrote:

> COMMENT: While writing the answer here, I think I found a deeper
> problem (and optimisation needed)  - i.e I think the delete should be
> even more fine-grained than it is today and include map_index) -
> please take a look at the end (Also maybe TP might comment on that
> one).
>
> > 1. Additional indexes add additional performance degradation on Insert
> but gain potential improvements on delete and unknown on update, RDBMS
> still require rebalance index and make it consistent to the table.
> > 2. LIMIT x OFFSET y could easily become full seq scan, especially if the
> user set a huge number for offset (which? unknown).
> > 3. Mixing two indexes could improve performance in a single query but in
> concurrent execution might lead to degradation because it needs to create a
> bitmap table for comparison between these two indexes, as result it might
> lead different issues, such as OOM on DB backend, use swaps or optimiser
> decided that better not to use this indexes.
>
> I think that is all something to be tested with explain plans. I think
> we would not know before we try - and possibly there are other
> optimisation approaches. The optimisation I proposed was only first
> that came to my mind to avoid the "not in" query. The problem  with
> "not in query" is that there is no way to optimise it by the DB.
> Effectively you have to get every record (or index entry) and test it.
> Maybe it can be done better :). And yes locking the index with
> anti-insert locks and the need to rebalance trees during the delete is
> a concern.
>
> > Is it a real problem? Until we access only by indexes, which doesn't
> include this JSON, it really doesn't matter. I guess we almost always
> should make a UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a single
> record.
>
> Yes I think so, and while I was not the author of this "cleanup"
> code, I believe I know the intention.
>
> It's not about index size or JSON access. It is about the size of the
> actual rows and storage it takes - i.e. general size of the database.
> The problem with it is that (especially with dynamic task mapping), it
> might grow really, really fast. Basically you have NUM_DAGS x
> NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS  * NUM_RUNS number
> of records there. Back-of-the envelope calculation Assuming you have a
> DAG with 10 dynamically mapped tasks with 100 mapped indexes with 10
> fields, each field evaluating to 1K string.  Then you have 10 tasks *
> 100 map indexes * 10 fields * 1K rendered string size = 10MB to store
> per one(!) run of one(1) DAG. Run it every 10 minutes and every day
> your database from a single DAG grows by whooping 1.44 GB of data
> every single day (from single DAG).This is of course an estimation
> that assumes a lot, but it's not at all unrealistic. That's a lot. And
> if we want the user to do the cleanup then a) they need to know it b)
> they need to specifically clean up this table only because all the
> other data is relatively small. This table is very specific compared
> with the other tables. The only reason for it being here is to be able
> to show the rendered fields in the UI if you go to the specific run of
> a task. If you clean-up other tables you basically lose the history of
> execution of the tasks and you cannot really know if the data has been
> processed, you cannot do backfills effectively, you lose all the
> context. Cleaning this table is merely about the values that have been
> rendered for a specific run and the assumption there is that the older
> it gets, the less interesting it is.
>
> > It is opposite of what we have right now, we scan tables (maybe multiple
> times), read all records tuples which contain JSON.
>
> Not sure if I get the point here :). Yes -in my proposal I think the
> records will not be touched - only indexes. So the cleanup should be
> way faster, contentions less of problem, due to the way the delete
> uses < ordering, deadlocks will not be possible at all (as opposed to
> the current "not in" - there is a very easy way to get into deadlocks
> when two parallel deletes are trying to delete same rows in a
> different sequence. I think my proposal improves all the
> characteristics of the "cleanup" with very little extra penalty on
> record creation.
>
> > We pay for table/index size linary more records, more size. But other
> operations vary and depend on B-Tree implementation and usually it has
> logarithmic growth. Or do we worry only about table/toast/index size on
> disk?
>
> Yep. I (and I believe the original author had the same worry) am
> worried a lot about the size of the table and the fact that this table
> will be by far the biggest table in our DB while most of the old
> records will never be touched. And by the fact that this is the only
> table that our users will have to know about to clean up separately
> from all others pretty much always. If not even worrying about money
> spent by our users, and performance degradation that comes with
> databases that are bigger - that's a lot of environmental effects that
> we might incur. Airflow is heavily used, if suddenly all our users
> will start having 10 bigger databases that they have now because we
> will deprecate the values and keep all the history, then we have a big
> number of extra disks that will have to be used. I'd strongly prefer a
> solution where we keep the data usage lower in this case.
>
> > If we do not want to grant users the ability to clean up rendered
> templates tables, there could be another option:
> > - Do not delete records on every task instance run.
> > - Delete once per defined period (hourly, daily, weekly, monthly). In
> this case you really could not care about locks.
>
> Yes we could come up with a different strategy as to "when" run the
> cleanup. This is also a viable option. If you can propose one that
> will be equally adaptive as the current solution, I am all ears.
> Basically my goal is to keep the usage of the table low, possibly
> controlled by the same parameter we had. How we do it - this is a
> different story. If we - for example add a thread in the scheduler
> (for example) that performs such cleanup effectively in parallel and
> scales, I am happy with that.
>
> But I am trying to get into the head of the author trying to
> understand why the original implementation was done this way. I
> believe (and maybe those who remember it better could confirm it) that
> distributing the deletion to tasks to clean up after itself is a
> better idea than centralising the cleanup. This makes each cleanup
> smaller, locks are held for a shorter time (at least that was the
> assumption where no full table scan was used), it is more "immediate"
> and you do not have to decide upfront what should be the cleanup
> frequency. It seems this is the best logical approach to keep the
> "MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply after task is
> complete, you can be sure that there are no more than this number of
> fields per task in the DB. With a scheduled run, that would be a much
> more "eventual" consistency and it will be potentially fluctuating
> much more.
>
> But there are risks involved in having a single thread doing the
> cleanup. I think it has a huge risk of being a "stop-the world" and
> "deadlock-prone" kind of event - if in big instances there are a lot
> of rows to cleanup in a single pass. When you delete entries from a
> table, there are anti-insert locks created for existing index entries,
> which makes it possible to rollback the whole DELETE transaction.
> Which means that when you try to insert the row with the same index,
> the index will be held. And this means that when you run a single huge
> DELETE for multiple rows affected with multiple (all?) index keys
> matching select query, it will effectively prevent new rows with the
> same indexes that are matching the SELECT. It would mean that if you
> have some tasks running while deleting existing run_id rendered
> fields, then you could REALLY start having deadlocks on those tasks
> trying to insert rendered task instance rows. That's why I think the
> only viable strategy for single "cleanup" thread is to do such cleanup
> as separate DELETE for each of the "dag/task/map_index/run" - in order
> to avoid such deadlocks. Which effectively will turn into what have
> currently - only that currently those transactions are done by tasks,
> not by a single cleanup thread.
>
> Also using tasks to delete old rows is more "effective" when you have
> vast differences in frequency of DAGs. Naturally - when you do it in
> task, you will only do it "when needed" for given DAG + Task. If you
> try to centralize the cleanup, unless you include somehow schedule and
> frequency of each dag, you are going to check every DAG every time
> your run the cleanup - no matter if that DAG is run daily or every
> minute, you will have to run the cleanup frequently enough to match
> your most frequent dags. If you have 1000 dags that run hourly and one
> DAG that runs every minue, then you have to run a cleanup job that
> scans all DAGs every few minutes. That's a big waste.
>
> So I am not sure if we gain anything by centralizing the cleanup.
> Decentralising it to Task seems to be a well thought and sound
> decision (but I think the problem we have now is that we need to
> optimize it after Dynamic Task Mapping has been added).
>
> ANOTHER FINDING:
>
> While looking at the code and discussing it and looking more closely I
> **think** there is another problem that we have to fix regardless of a
> solution. I THINK a problem we might have now is that we do not
> include map_index in this DELETE. While we are curreently delete all
> the rendered task fields without including map_index - and for big
> dynamic tasks, it means that exacly the same DELETE query is run by
> every single mapped instance of that tasks and that is where a lot of
> contention and locking might happen (basically when single task
> instance does the delete, anti-insert locks held the other mapped
> instances of the same task from inserting rendered fields).
>
> It does not change much in the optimisation proposal of mine, other
> than we should include map_index in those queries. But I think this
> might cause a lot of delays in the current implementation.
>
> J.
>
> > ----
> > Best Wishes
> > Andrey Anshin
> >
> >
> >
> > On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com> wrote:
> >>
> >> I think there is a good reason to clean those up automatically.
> >> rendered task instance fields are almost arbitrary in size. If we try
> >> to keep all historical values there by default, there are numerous
> >> cases it will grow very fast - far, far too quickly.
> >>
> >> And I am not worried at all about locks on this table if we do it the
> >> way I described it and it uses the indexes. The contention this way
> >> might only be between the two deleting tasks. and with the query I
> >> proposed, they will only last for a short time - the index will be
> >> locked when two DELETES  or SELECT DISTINCT - which should both be
> >> fast.
> >>
> >>
> >> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <an...@taragol.is>
> wrote:
> >> >
> >> > I guess two things involved to reduce performance on this query
> through the time: Dynamic Task Mapping and run_id instead of execution date.
> >> >
> >> > I still personally think that changing the default value from 30 to 0
> might improve performance of multiple concurrent tasks, just because this
> query does not run and there are no locks on multiple records/pages.
> >> >
> >> > I do not have any proof (yet?) other than simple DAGs. I think that
> there is some cross point exists when keeping this table growth worse
> rather than cleanup for each TI run. But users have ability to cleanup
> table by execute airflow db clean which should improve performance again
> >> >
> >> > And also there is interesting behavior with this query: if user
> already have more that value specified by
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tried run backfill
> than rendered templates not written to table (or may be inserted and after
> that immediately deleted), the same is valid for cleanup old tasks.
> >> >
> >> > ----
> >> > Best Wishes
> >> > Andrey Anshin
> >> >
> >> >
> >> >
> >> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com> wrote:
> >> >>
> >> >> Yep. Agree this is not an efficient query and dynamic task mapping
> >> >> makes the effect much worse. Generally speaking, selecting "what
> >> >> should be left" and then deleting stuff where the key is "not in" is
> >> >> never an efficient way of running an sql query.  And the query not
> >> >> using index at all makes it rather terrible.
> >> >>
> >> >> I think we should not deprecate it though, but find a more efficient
> >> >> way of deleting the old keys. I think we could slightly denormalize
> >> >> RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
> >> >> to the RenderedTaskInstance table and that will be enough to optimise
> >> >> it.
> >> >>
> >> >> Then we could have either:
> >> >>
> >> >> * a composite B-TREE indexed (non-unique) index on DAG_ID, TASK_ID,
> >> >> RUN_ID_EXECUTION_DATE
> >> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
> >> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
> >> >>
> >> >> Probably the latter is better as I am not sure how < , > comparison
> >> >> looks like for composite B-TREE indexes when char + date columns are
> >> >> mixed. Also we could have hit the infamous MySQL index key length
> >> >> limit.
> >> >>
> >> >> Then deletion process would look roughly like:
> >> >>
> >> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
> >> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
> >> >> TASK_ID = <TASK_ID> GROUP BY RUN_ID_EXECUTION_DATE ORDER BY
> >> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
> >> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
> >> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID>
> >> >> AND TASK_ID = <TASK_ID> AND RUN_ID_EXECUTION_DATE < dag_run_execution_date
> >> >>
> >> >> I believe that would be fast, and it would use the B-TREE index
> >> >> features nicely (ordering support)
> >> >>
> >> >> J
> >> >>
> >> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >
> >> >> > First of all I want to highlight that this approach I guess worked
> well until Dynamic Task Mappings introduced.
> >> >> >
> >> >> > > The main reason for adding that cleanup was -- if you don't do
> that, you will have many rows, similar to the TaskInstance table
> >> >> >
> >> >> > The problem itself is not how big your table/indexes, rather then
> what kind of operation you run.
> >> >> >
> >> >> > > Do you have any data for locks or performance degradation?
> >> >> >
> >> >> > In this case if we try to clean up rendered_task_instance_fields
> table when a new TI is created/cleared we make almost two full/sequential
> scans (note: need to check) against the table without any index usage, so
> we pay here a couple times:
> >> >> > 1. We scan without indexes - not all parts of the composite key
> are included to query, plus we need to filter everything except 30 records
> with order and distinct
> >> >> > 2. After that we make another full scan for find 1 record or
> map_size records
> >> >> >
> >> >> > And I guess the situation becomes worse if you have a lot of
> tasks, even if we have a small table, we need to do ineffective operations.
> >> >> >
> >> >> > That how looks like Query Plan (please note without commit
> transaction DELETE operation doesn't have all information):
> https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
> >> >> >
> >> >> > In case if we do not clean up the table, we only use these
> operations:
> >> >> > 1. SELECT single record by index
> >> >> > 2. INSERT new record
> >> >> > 3. DELETE old record(s), which were found by index.
> >> >> >
> >> >> > I have not done any real tests yet, only synthetic DAGs (so we
> should not consider to use any findings as totally truth):
> https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
> >> >> > DAG with parallel tasks: degradation up to 2-3 times
> >> >> > DAG with single map tasks: degradation up to 7-10 times
> >> >> >
> >> >> > I have a plan for more complex and more close to real use cases
> with Database which do not have network latency almost 0 as I have in my
> local.
> >> >> > But I will not refuse if someone also does their tests with
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
> >> >> >
> >> >> > About deadlock we know that it exists at least in MySQL:
> https://github.com/apache/airflow/pull/18616
> >> >> >
> >> >> > > And the larger tables create problems during database migrations.
> >> >> >
> >> >> > That is a very good point, so if we found that problem only
> related to migrations we could:
> >> >> > 1. Cleanup this table in migration
> >> >> > 2. Add cli command to airflow db which could cleanup only rendered
> fields, so it would be user's choice cleanup or not before migration, do
> periodical maintenance or not
> >> >> >
> >> >> >
> >> >> > ----
> >> >> > Best Wishes
> >> >> > Andrey Anshin
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com>
> wrote:
> >> >> >>>
> >> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >> >>
> >> >> >>
> >> >> >> The main reason for adding that cleanup was -- if you don't do
> that, you will have many rows, similar to the TaskInstance table. And the
> RenderedTIFields were mainly added for checking rendered TI fields on the
> Webserver only because after DAG Serialization, the webserver won't have
> access to DAG files.
> >> >> >>
> >> >> >> And the larger tables create problems during database migrations.
> >> >> >>
> >> >> >> Do you have any data for locks or performance degradation?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <
> andrey.anshin@taragol.is> wrote:
> >> >> >>>
> >> >> >>> Greetings!
> >> >> >>>
> >> >> >>> During migrating our ORM syntax to compatible with SQLAlchemy
> 2.0 I probably found skeletons in the closet.
> >> >> >>>
> >> >> >>> Let's start from the beginning, initially I got this warning
> >> >> >>>
> >> >> >>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER
> BY columns added implicitly due to DISTINCT is deprecated and will be
> removed in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be
> written to explicitly include the appropriate columns in the columns clause
> (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
> >> >> >>>
> >> >> >>> "OK let's fix it!", I thought at first and started to
> investigate RenderedTaskInstanceFields model
> >> >> >>>
> >> >> >>> Skeleton #1:
> >> >> >>>
> >> >> >>> When I first time look on the code and comments it got me to
> thinking that part which keep only latest N Rendered Task Fields
> potentially could lead different performance degradation (Locks, Dead
> Locks, Data Bloating): see code
> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
> >> >> >>>
> >> >> >>> Also this historical part (from Airflow 1.10.10) generate this
> SQL Statement (pg backend)
> >> >> >>>
> >> >> >>> DELETE FROM rendered_task_instance_fields
> >> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
> >> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
> >> >> >>>   AND (
> >> >> >>>     (
> >> >> >>>       rendered_task_instance_fields.dag_id,
> >> >> >>>       rendered_task_instance_fields.task_id,
> >> >> >>>       rendered_task_instance_fields.run_id
> >> >> >>>     ) NOT IN (
> >> >> >>>       SELECT
> >> >> >>>         anon_1.dag_id,
> >> >> >>>         anon_1.task_id,
> >> >> >>>         anon_1.run_id
> >> >> >>>       FROM
> >> >> >>>         (
> >> >> >>>           SELECT DISTINCT
> >> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
> >> >> >>>             rendered_task_instance_fields.task_id AS task_id,
> >> >> >>>             rendered_task_instance_fields.run_id AS run_id,
> >> >> >>>             dag_run.execution_date AS execution_date
> >> >> >>>           FROM rendered_task_instance_fields
> >> >> >>>             JOIN dag_run ON rendered_task_instance_fields.dag_id
> = dag_run.dag_id
> >> >> >>>             AND rendered_task_instance_fields.run_id =
> dag_run.run_id
> >> >> >>>           WHERE
> >> >> >>>             rendered_task_instance_fields.dag_id = %(dag_id_2) s
> >> >> >>>             AND rendered_task_instance_fields.task_id =
> %(task_id_2) s
> >> >> >>>           ORDER BY
> >> >> >>>             dag_run.execution_date DESC
> >> >> >>>           limit %(param_1) s
> >> >> >>>         ) AS anon_1
> >> >> >>>     )
> >> >> >>>   )
> >> >> >>>
> >> >> >>> Which is especially not effective in PostgreSQL. When IN
> SUBQUERY could be easily transform internaly into SEMI-JOIN (aka EXISTS
> clause), but it is not working for NOT IN SUBQUERY because it is not
> transformed into ANTI JOIN (aka NOT EXISTS clause) even if it possible,
> see: https://commitfest.postgresql.org/27/2023/
> >> >> >>>
> >> >> >>> I didn't do any performance benchmarks yet but I guess if users
> set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than
> default 30 it could improve performance and reduce number of DeadLocks,
> however the table size will increase but I think we don't do any
> maintenance job for other tables.
> >> >> >>>
> >> >> >>> Potentially it is a good idea to deprecate this option and
> recommend for users to set it to 0? WDYT? Maybe someone has already tried
> or investigated this?
> >> >> >>>
> >> >> >>>
> >> >> >>> Skeleton #2:
> >> >> >>>
> >> >> >>> We have a k8s_pod_yaml field which is exclusively used by K8S
> executors.
> >> >> >>>
> >> >> >>> Should we also decouple this field as part of AIP-51?
> >> >> >>>
> >> >> >>> ----
> >> >> >>> Best Wishes
> >> >> >>> Andrey Anshin
> >> >> >>>
>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Jarek Potiuk <ja...@potiuk.com>.
COMMENT: While writing the answer here, I think I found a deeper
problem (and an optimisation that is needed), i.e. I think the delete
should be even more fine-grained than it is today and include
map_index. Please take a look at the end (maybe TP could also comment
on that one).

> 1. Additional indexes add additional performance degradation on Insert but gain potential improvements on delete and unknown on update, RDBMS still require rebalance index and make it consistent to the table.
> 2. LIMIT x OFFSET y could easily become full seq scan, especially if the user set a huge number for offset (which? unknown).
> 3. Mixing two indexes could improve performance in a single query but in concurrent execution might lead to degradation because it needs to create a bitmap table for comparison between these two indexes, as result it might lead different issues, such as OOM on DB backend, use swaps or optimiser decided that better not to use this indexes.

I think that is all something to be tested with explain plans. I think
we will not know before we try, and possibly there are other
optimisation approaches. The optimisation I proposed was only the first
one that came to my mind to avoid the "not in" query. The problem with
a "not in" query is that the DB has no way to optimise it.
Effectively it has to get every record (or index entry) and test it.
Maybe it can be done better :). And yes, locking the index with
anti-insert locks and the need to rebalance trees during the delete
are a concern.

> Is it a real problem? Until we access only by indexes, which doesn't include this JSON, it really doesn't matter. I guess we almost always should make a UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a single record.

Yes, I think so, and while I was not the author of this "cleanup"
code, I believe I know the intention.

It's not about index size or JSON access. It is about the size of the
actual rows and the storage they take, i.e. the general size of the
database. The problem is that (especially with dynamic task mapping)
it might grow really, really fast. Basically you have NUM_DAGS *
NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS * NUM_RUNS rendered
field values stored there.

Back-of-the-envelope calculation: assume you have a DAG with 10
dynamically mapped tasks, each with 100 mapped indexes and 10 fields,
each field evaluating to a 1K string. Then you have 10 tasks * 100 map
indexes * 10 fields * 1K rendered string size = 10MB to store per
one(!) run of one(1) DAG. Run it every 10 minutes (144 runs a day) and
your database grows by a whopping 1.44 GB of data every single day
(from a single DAG). This is of course an estimation that assumes a
lot, but it's not at all unrealistic. That's a lot.

And if we want the user to do the cleanup then a) they need to know
about it and b) they need to specifically clean up this table only,
because all the other data is relatively small. This table is very
specific compared with the other tables. The only reason for it being
here is to be able to show the rendered fields in the UI if you go to
the specific run of a task. If you clean up other tables you basically
lose the history of execution of the tasks: you cannot really know if
the data has been processed, you cannot do backfills effectively, you
lose all the context. Cleaning this table only removes the values that
have been rendered for a specific run, and the assumption there is
that the older they get, the less interesting they are.
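
For anyone who wants to plug in their own numbers, the estimate above
can be reproduced with a few lines of Python. This is only a sketch of
the arithmetic: the function names are made up for illustration, and it
assumes decimal units (1K = 1000 bytes) and a perfectly regular
schedule.

```python
# Back-of-the-envelope growth estimate for rendered task instance fields.
# Assumption: decimal units, so 1K = 1000 bytes.
KB = 1000


def bytes_per_run(tasks, map_indexes, fields, bytes_per_field):
    """Rendered data stored for one run of one DAG."""
    return tasks * map_indexes * fields * bytes_per_field


def bytes_per_day(per_run, interval_minutes):
    """Rendered data stored per day for a DAG run every `interval_minutes`."""
    runs_per_day = 24 * 60 // interval_minutes
    return per_run * runs_per_day


per_run = bytes_per_run(tasks=10, map_indexes=100, fields=10, bytes_per_field=1 * KB)
per_day = bytes_per_day(per_run, interval_minutes=10)
print(f"{per_run / 1e6:.0f} MB per run, {per_day / 1e9:.2f} GB per day")
# prints "10 MB per run, 1.44 GB per day"
```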

> It is opposite of what we have right now, we scan tables (maybe multiple times), read all records tuples which contain JSON.

Not sure if I get the point here :). Yes, in my proposal I think the
records will not be touched, only the indexes. So the cleanup should be
way faster and contention less of a problem. Due to the way the delete
uses < ordering, deadlocks should not be possible at all (as opposed to
the current "not in", where it is very easy to get into deadlocks
when two parallel deletes are trying to delete the same rows in a
different sequence). I think my proposal improves all the
characteristics of the "cleanup" with very little extra penalty on
record creation.
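
To make the idea concrete, here is a rough, self-contained sketch of
the two-step cleanup using SQLite from the Python standard library. It
is not Airflow code: the table name "rtif", the denormalized
"execution_date" column and the index name are assumptions for
illustration only, and real backends (Postgres, MySQL) would still
need their explain plans checked. It uses OFFSET keep - 1 so that
exactly "keep" runs survive the "<" delete.

```python
import sqlite3


def cleanup_old_rendered_fields(conn, dag_id, task_id, keep):
    """Keep rows for the `keep` newest execution dates, delete older ones."""
    if keep <= 0:
        return 0  # assumption: non-positive value means "cleanup disabled"
    # Step 1: find the oldest execution date that should survive.
    row = conn.execute(
        "SELECT DISTINCT execution_date FROM rtif "
        "WHERE dag_id = ? AND task_id = ? "
        "ORDER BY execution_date DESC LIMIT 1 OFFSET ?",
        (dag_id, task_id, keep - 1),
    ).fetchone()
    if row is None:
        return 0  # fewer than `keep` distinct runs, nothing to delete
    # Step 2: a plain range delete on the ordered column, no "NOT IN".
    cur = conn.execute(
        "DELETE FROM rtif "
        "WHERE dag_id = ? AND task_id = ? AND execution_date < ?",
        (dag_id, task_id, row[0]),
    )
    conn.commit()
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rtif (dag_id TEXT, task_id TEXT, run_id TEXT, "
    "map_index INTEGER, execution_date TEXT, rendered_fields TEXT)"
)
# Hypothetical composite index supporting both steps.
conn.execute(
    "CREATE INDEX idx_rtif_cleanup ON rtif (dag_id, task_id, execution_date)"
)
# 10 daily runs of one task with 3 mapped instances each = 30 rows.
rows = [
    ("d", "t", f"run_{day}", m, f"2023-01-{day:02d}", "{}")
    for day in range(1, 11)
    for m in range(3)
]
conn.executemany("INSERT INTO rtif VALUES (?, ?, ?, ?, ?, ?)", rows)
conn.commit()

deleted = cleanup_old_rendered_fields(conn, "d", "t", keep=4)
remaining = conn.execute(
    "SELECT COUNT(DISTINCT execution_date) FROM rtif"
).fetchone()[0]
print(deleted, remaining)  # prints "18 4"
```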

> We pay for table/index size linary more records, more size. But other operations vary and depend on B-Tree implementation and usually it has logarithmic growth. Or do we worry only about table/toast/index size on disk?

Yep. I (and I believe the original author had the same worry) am
worried a lot about the size of the table and the fact that this table
will be by far the biggest table in our DB while most of the old
records will never be touched. And by the fact that this is the only
table that our users will have to know about and clean up separately
from all others, pretty much always. Even leaving aside the money
spent by our users and the performance degradation that comes with
bigger databases, that's a lot of environmental effects that we might
incur. Airflow is heavily used; if suddenly all our users start having
10x bigger databases than they have now because we deprecate the value
and keep all the history, then a big number of extra disks will have
to be used. I'd strongly prefer a solution where we keep the data
usage lower in this case.

> If we do not want to grant users the ability to clean up rendered templates tables, there could be another option:
> - Do not delete records on every task instance run.
> - Delete once per defined period (hourly, daily, weekly, monthly). In this case you really could not care about locks.

Yes, we could come up with a different strategy as to "when" to run
the cleanup. This is also a viable option. If you can propose one that
will be as adaptive as the current solution, I am all ears.
Basically my goal is to keep the usage of the table low, possibly
controlled by the same parameter we had. How we do it is a different
story. If we, for example, add a thread in the scheduler that performs
such cleanup effectively in parallel and scales, I am happy with that.

But I am trying to get into the head of the author to understand why
the original implementation was done this way. I believe (and maybe
those who remember it better could confirm it) that distributing the
deletion, so that each task cleans up after itself, is a better idea
than centralising the cleanup. This makes each cleanup smaller, locks
are held for a shorter time (at least that was the assumption, where
no full table scan was used), it is more "immediate" and you do not
have to decide upfront what the cleanup frequency should be. It seems
this is the most logical approach to keep the
"MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply, after a task is
complete, you can be sure that there are no more than this number of
fields per task in the DB. With a scheduled run, that would be a much
more "eventual" consistency and the table size would potentially
fluctuate much more.

But there are risks involved in having a single thread doing the
cleanup. I think it has a huge risk of being a "stop-the-world" and
"deadlock-prone" kind of event if, in big instances, there are a lot
of rows to clean up in a single pass. When you delete entries from a
table, anti-insert locks are created for the existing index entries,
which makes it possible to roll back the whole DELETE transaction.
This means that when you try to insert a row with the same index key,
the insert has to wait for that lock. And this means that when you run
a single huge DELETE affecting multiple rows, with multiple (all?)
index keys matching the select query, it will effectively block
inserts of new rows whose index keys match the SELECT. It would mean
that if you have some tasks running while their existing run_id
rendered fields are being deleted, then you could REALLY start having
deadlocks on those tasks trying to insert rendered task instance rows.
That's why I think the only viable strategy for a single "cleanup"
thread is to do such cleanup as a separate DELETE for each
"dag/task/map_index/run", in order to avoid such deadlocks. This
effectively turns into what we have currently, only that currently
those transactions are done by tasks, not by a single cleanup thread.
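
A minimal sketch of what such a centralized cleanup could look like if
it issues one short transaction per key instead of one huge DELETE. It
uses SQLite from the Python standard library only so that it is
runnable; the table name "rtif", the "execution_date" column and both
function names are hypothetical, and SQLite's locking is much simpler
than that of Postgres or MySQL, so this shows the shape of the loop
rather than the locking behaviour.

```python
import sqlite3


def cleanup_one_key(conn, dag_id, task_id, map_index, keep):
    """Delete all but the `keep` newest rows for one dag/task/map_index."""
    cutoff = conn.execute(
        "SELECT execution_date FROM rtif "
        "WHERE dag_id = ? AND task_id = ? AND map_index = ? "
        "ORDER BY execution_date DESC LIMIT 1 OFFSET ?",
        (dag_id, task_id, map_index, keep - 1),
    ).fetchone()
    if cutoff is None:
        return 0  # fewer than `keep` rows for this key
    # One small transaction per key: commits on success, rolls back on error.
    with conn:
        cur = conn.execute(
            "DELETE FROM rtif "
            "WHERE dag_id = ? AND task_id = ? AND map_index = ? "
            "AND execution_date < ?",
            (dag_id, task_id, map_index, cutoff[0]),
        )
    return cur.rowcount


def centralized_cleanup(conn, keep):
    """Visit every key in a fixed order and clean each one separately."""
    keys = conn.execute(
        "SELECT DISTINCT dag_id, task_id, map_index FROM rtif "
        "ORDER BY dag_id, task_id, map_index"
    ).fetchall()
    return {key: cleanup_one_key(conn, *key, keep) for key in keys}


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rtif (dag_id TEXT, task_id TEXT, map_index INTEGER, "
    "run_id TEXT, execution_date TEXT)"
)
# 2 tasks x 2 mapped instances x 5 daily runs = 20 rows.
rows = [
    ("d", task, m, f"run_{day}", f"2023-01-{day:02d}")
    for task in ("t1", "t2")
    for m in (0, 1)
    for day in range(1, 6)
]
conn.executemany("INSERT INTO rtif VALUES (?, ?, ?, ?, ?)", rows)
conn.commit()

deleted_per_key = centralized_cleanup(conn, keep=2)
left = conn.execute("SELECT COUNT(*) FROM rtif").fetchone()[0]
print(sum(deleted_per_key.values()), left)  # prints "12 8"
```

Note that the loop still has to visit every key on every pass, which is
the cost discussed in the next paragraph.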

Also, using tasks to delete old rows is more "effective" when you have
vast differences in the frequency of DAGs. Naturally, when you do it in
the task, you will only do it "when needed" for the given DAG + Task.
If you try to centralize the cleanup, unless you somehow include the
schedule and frequency of each DAG, you are going to check every DAG
every time you run the cleanup. No matter if a DAG runs daily or every
minute, you will have to run the cleanup frequently enough to match
your most frequent DAGs. If you have 1000 DAGs that run hourly and one
DAG that runs every minute, then you have to run a cleanup job that
scans all DAGs every few minutes. That's a big waste.
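
A quick way to put numbers on that waste, as a sketch only: it counts
per-DAG cleanup checks per day, assumes one cleanup check per DAG run
in the task-driven case, and picks 5 minutes as an example of "every
few minutes". The function names are made up for illustration.

```python
MINUTES_PER_DAY = 24 * 60


def centralized_checks_per_day(num_dags, cleanup_interval_minutes):
    """Every cleanup pass has to look at every DAG."""
    passes = MINUTES_PER_DAY // cleanup_interval_minutes
    return num_dags * passes


def task_driven_checks_per_day(run_intervals_minutes):
    """Each DAG only triggers cleanup when it actually runs."""
    return sum(MINUTES_PER_DAY // interval for interval in run_intervals_minutes)


# 1000 hourly DAGs plus one DAG that runs every minute.
intervals = [60] * 1000 + [1]
central = centralized_checks_per_day(len(intervals), cleanup_interval_minutes=5)
per_task = task_driven_checks_per_day(intervals)
print(central, per_task)  # prints "288288 25440"
```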

So I am not sure if we gain anything by centralizing the cleanup.
Decentralising it to the Task seems to be a well-thought-out and sound
decision (but I think the problem we have now is that we need to
optimize it after Dynamic Task Mapping has been added).

ANOTHER FINDING:

While looking at the code more closely and discussing it, I
**think** there is another problem that we have to fix regardless of
the solution. I THINK a problem we might have now is that we do not
include map_index in this DELETE. We currently delete all the rendered
task fields without including map_index, and for big dynamic tasks it
means that exactly the same DELETE query is run by every single mapped
instance of that task. That is where a lot of contention and locking
might happen (basically, when a single task instance does the delete,
anti-insert locks hold back the other mapped instances of the same
task from inserting rendered fields).

It does not change much in the optimisation proposal of mine, other
than we should include map_index in those queries. But I think this
might cause a lot of delays in the current implementation.
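
To illustrate the overlap, here is a toy model in plain Python (no
database involved, so it says nothing about actual lock behaviour). It
only compares which rows each mapped instance would target with and
without map_index in the filter; the row layout and the helper name
are invented for the example.

```python
from itertools import combinations, product

RUNS, MAP_INDEXES, KEEP = 5, 3, 2

# Each row is modelled as (run_number, map_index); higher run_number is newer.
rows = set(product(range(RUNS), range(MAP_INDEXES)))


def stale_rows(candidates, keep):
    """Rows that do not belong to the `keep` newest runs among the candidates."""
    newest_runs = sorted({run for run, _ in candidates}, reverse=True)[:keep]
    return {row for row in candidates if row[0] not in newest_runs}


# Without map_index every mapped instance targets the same set of rows.
without_map_index = [stale_rows(rows, KEEP) for _ in range(MAP_INDEXES)]

# With map_index each mapped instance only targets its own rows.
with_map_index = [
    stale_rows({row for row in rows if row[1] == m}, KEEP)
    for m in range(MAP_INDEXES)
]

shared = set.intersection(*without_map_index)
disjoint = all(a.isdisjoint(b) for a, b in combinations(with_map_index, 2))
print(len(shared), disjoint)  # prints "9 True"
```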

J.

> ----
> Best Wishes
> Andrey Anshin
>
>
>
> On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>> I think there is a good reason to clean those up automatically.
>> rendered task instance fields are almost arbitrary in size. If we try
>> to keep all historical values there by default, there are numerous
>> cases it will grow very fast - far, far too quickly.
>>
>> And I am not worried at all about locks on this table if we do it the
>> way I described it and it uses the indexes. The contention this way
>> might only be between the two deleting tasks, and with the query I
>> proposed, the locks will only last for a short time - the index will
>> be locked only while the two DELETEs or the SELECT DISTINCT run,
>> which should both be fast.
>>
>>
>> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <an...@taragol.is> wrote:
>> >
>> > I guess two things involved to reduce performance on this query through the time: Dynamic Task Mapping and run_id instead of execution date.
>> >
>> > I still personally think that changing the default value from 30 to 0 might improve performance of multiple concurrent tasks, just because this query does not run and there are no locks on multiple records/pages.
>> >
>> > I do not have any proof (yet?) other than simple DAGs. I think that there is some cross point exists when keeping this table growth worse rather than cleanup for each TI run. But users have ability to cleanup table by execute airflow db clean which should improve performance again
>> >
>> > And also there is interesting behavior with this query: if user already have more that value specified by AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tried run backfill than rendered templates not written to table (or may be inserted and after that immediately deleted), the same is valid for cleanup old tasks.
>> >
>> > ----
>> > Best Wishes
>> > Andrey Anshin
>> >
>> >
>> >
>> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com> wrote:
>> >>
>> >> Yep. Agree this is not an efficient query and dynamic task mapping
>> >> makes the effect much worse. Generally speaking, selecting "what
>> >> should be left" and then deleting stuff where the key is "not in" is
>> >> never an efficient way of running an sql query.  And the query not
>> >> using index at all makes it rather terrible.
>> >>
>> >> I think we should not deprecate it though, but find a more efficient
>> >> way of deleting the old keys. I think we could slightly denormalize
>> >> RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
>> >> to the RenderedTaskInstance table and that will be enough to optimise
>> >> it.
>> >>
>> >> Then we could have either:
>> >>
>> >> * a composite B-TREE indexed (non-unique) index on DAG_ID, TASK_ID,
>> >> RUN_ID_EXECUTION_DATE
>> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
>> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
>> >>
>> >> Probably the latter is better as I am not sure how < , > comparison
>> >> looks like for composite B-TREE indexes when char + date columns are
>> >> mixed. Also we could have hit the infamous MySQL index key length
>> >> limit.
>> >>
>> >> Then deletion process would look roughly like:
>> >>
>> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
>> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>,
>> >> TASK_ID=<TASK_ID> ORDER BY RUN_ID_EXECUTION_DATE GROUP BY
>> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
>> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
>> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>,
>> >> TASK_ID=<TASK_ID> AND RENDER_TIME < dag_run_execution_date
>> >>
>> >> I believe that would be fast, and it would use the B-TREE index
>> >> features nicely (ordering support)
>> >>
>> >> J
>> >>
>> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <an...@taragol.is> wrote:
>> >> >
>> >> > First of all I want to highlight that this approach I guess worked well until Dynamic Task Mappings introduced.
>> >> >
>> >> > > The main reason for adding that cleanup was -- if you don't do that, you will have many rows, similar to the TaskInstance table
>> >> >
>> >> > The problem itself is not how big your table/indexes, rather then what kind of operation you run.
>> >> >
>> >> > > Do you have any data for locks or performance degradation?
>> >> >
>> >> > In this case if we try to clean up rendered_task_instance_fields table when a new TI is created/cleared we make almost two full/sequential scans (note: need to check) against the table without any index usage, so we pay here a couple times:
>> >> > 1. We scan without indexes - not all parts of the composite key are included to query, plus we need to filter everything except 30 records with order and distinct
>> >> > 2. After that we make another full scan for find 1 record or map_size records
>> >> >
>> >> > And I guess the situation becomes worse if you have a lot of tasks, even if we have a small table, we need to do ineffective operations.
>> >> >
>> >> > That how looks like Query Plan (please note without commit transaction DELETE operation doesn't have all information): https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
>> >> >
>> >> > In case if we do not clean up the table, we only use these operations:
>> >> > 1. SELECT single record by index
>> >> > 2. INSERT new record
>> >> > 3. DELETE old record(s), which were found by index.
>> >> >
>> >> > I have not done any real tests yet, only synthetic DAGs (so we should not consider to use any findings as totally truth): https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
>> >> > DAG with parallel tasks: degradation up to 2-3 times
>> >> > DAG with single map tasks: degradation up to 7-10 times
>> >> >
>> >> > I have a plan for more complex and more close to real use cases with Database which do not have network latency almost 0 as I have in my local.
>> >> > But I will not refuse if someone also does their tests with AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
>> >> >
>> >> > About deadlock we know that it exists at least in MySQL: https://github.com/apache/airflow/pull/18616
>> >> >
>> >> > > And the larger tables create problems during database migrations.
>> >> >
>> >> > That is a very good point, so if we found that problem only related to migrations we could:
>> >> > 1. Cleanup this table in migration
>> >> > 2. Add cli command to airflow db which could cleanup only rendered fields, so it would be user's choice cleanup or not before migration, do periodical maintenance or not
>> >> >
>> >> >
>> >> > ----
>> >> > Best Wishes
>> >> > Andrey Anshin
>> >> >
>> >> >
>> >> >
>> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com> wrote:
>> >> >>>
>> >> >>> Potentially it is a good idea to deprecate this option and recommend for users to set it to 0? WDYT? Maybe someone has already tried or investigated this?
>> >> >>
>> >> >>
>> >> >> The main reason for adding that cleanup was -- if you don't do that, you will have many rows, similar to the TaskInstance table. And the RenderedTIFields were mainly added for checking rendered TI fields on the Webserver only because after DAG Serialization, the webserver won't have access to DAG files.
>> >> >>
>> >> >> And the larger tables create problems during database migrations.
>> >> >>
>> >> >> Do you have any data for locks or performance degradation?
>> >> >>
>> >> >>
>> >> >>
>> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <an...@taragol.is> wrote:
>> >> >>>
>> >> >>> Greetings!
>> >> >>>
>> >> >>> During migrating our ORM syntax to compatible with SQLAlchemy 2.0 I probably found skeletons in the closet.
>> >> >>>
>> >> >>> Let's start from the beginning, initially I got this warning
>> >> >>>
>> >> >>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY columns added implicitly due to DISTINCT is deprecated and will be removed in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to explicitly include the appropriate columns in the columns clause (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
>> >> >>>
>> >> >>> "OK let's fix it!", I thought at first and started to investigate RenderedTaskInstanceFields model
>> >> >>>
>> >> >>> Skeleton #1:
>> >> >>>
>> >> >>> When I first time look on the code and comments it got me to thinking that part which keep only latest N Rendered Task Fields potentially could lead different performance degradation (Locks, Dead Locks, Data Bloating): see code https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
>> >> >>>
>> >> >>> Also this historical part (from Airflow 1.10.10) generate this SQL Statement (pg backend)
>> >> >>>
>> >> >>> DELETE FROM rendered_task_instance_fields
>> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
>> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
>> >> >>>   AND (
>> >> >>>     (
>> >> >>>       rendered_task_instance_fields.dag_id,
>> >> >>>       rendered_task_instance_fields.task_id,
>> >> >>>       rendered_task_instance_fields.run_id
>> >> >>>     ) NOT IN (
>> >> >>>       SELECT
>> >> >>>         anon_1.dag_id,
>> >> >>>         anon_1.task_id,
>> >> >>>         anon_1.run_id
>> >> >>>       FROM
>> >> >>>         (
>> >> >>>           SELECT DISTINCT
>> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
>> >> >>>             rendered_task_instance_fields.task_id AS task_id,
>> >> >>>             rendered_task_instance_fields.run_id AS run_id,
>> >> >>>             dag_run.execution_date AS execution_date
>> >> >>>           FROM rendered_task_instance_fields
>> >> >>>             JOIN dag_run ON rendered_task_instance_fields.dag_id = dag_run.dag_id
>> >> >>>             AND rendered_task_instance_fields.run_id = dag_run.run_id
>> >> >>>           WHERE
>> >> >>>             rendered_task_instance_fields.dag_id = %(dag_id_2) s
>> >> >>>             AND rendered_task_instance_fields.task_id = %(task_id_2) s
>> >> >>>           ORDER BY
>> >> >>>             dag_run.execution_date DESC
>> >> >>>           limit %(param_1) s
>> >> >>>         ) AS anon_1
>> >> >>>     )
>> >> >>>   )
>> >> >>>
>> >> >>> Which is especially not effective in PostgreSQL. When IN SUBQUERY could be easily transform internaly into SEMI-JOIN (aka EXISTS clause), but it is not working for NOT IN SUBQUERY because it is not transformed into ANTI JOIN (aka NOT EXISTS clause) even if it possible, see: https://commitfest.postgresql.org/27/2023/
>> >> >>>
>> >> >>> I didn't do any performance benchmarks yet but I guess if users set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than default 30 it could improve performance and reduce number of DeadLocks, however the table size will increase but I think we don't do any maintenance job for other tables.
>> >> >>>
>> >> >>> Potentially it is a good idea to deprecate this option and recommend for users to set it to 0? WDYT? Maybe someone has already tried or investigated this?
>> >> >>>
>> >> >>>
>> >> >>> Skeleton #2:
>> >> >>>
>> >> >>> We have a k8s_pod_yaml field which is exclusively used by K8S executors.
>> >> >>>
>> >> >>> Should we also decouple this field as part of AIP-51?
>> >> >>>
>> >> >>> ----
>> >> >>> Best Wishes
>> >> >>> Andrey Anshin
>> >> >>>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Andrey Anshin <an...@taragol.is>.
There are some potential drawbacks in your suggestion (you never know
until you try).

1. Additional indexes slow down INSERT, potentially speed up DELETE and
have an unknown effect on UPDATE; the RDBMS still has to rebalance each
index and keep it consistent with the table.
2. LIMIT x OFFSET y could easily become a full sequential scan, especially
if the user sets a huge offset (how huge is unknown).
3. Combining two indexes could improve performance for a single query, but
under concurrent execution it might lead to degradation, because the
database needs to build a bitmap to combine the two indexes. As a result it
might lead to different issues, such as OOM on the DB backend, swapping, or
the optimiser deciding that it is better not to use these indexes at all.
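
One way to "try" points 2 and 3 is to ask the database for its query plan
before and after adding an index. The sketch below is only an illustration:
it uses SQLite from the Python standard library rather than PostgreSQL or
MySQL, whose planners behave differently, and the table name rtif, the
denormalised execution_date column and the index name idx_rtif_cleanup are
all hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified stand-in for rendered_task_instance_fields.
conn.execute(
    """
    CREATE TABLE rtif (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL,
        execution_date TEXT NOT NULL,  -- assumed denormalised column
        rendered_fields TEXT
    )
    """
)

# The "find the cutoff" query with LIMIT/OFFSET discussed in this thread.
CUTOFF_QUERY = """
    SELECT execution_date FROM rtif
    WHERE dag_id = ? AND task_id = ?
    ORDER BY execution_date DESC
    LIMIT 1 OFFSET ?
"""


def query_plan(connection):
    """Return the human-readable plan steps SQLite reports for CUTOFF_QUERY."""
    rows = connection.execute(
        "EXPLAIN QUERY PLAN " + CUTOFF_QUERY,
        ("example_dag", "example_task", 30),
    ).fetchall()
    # The last column of each plan row is the textual description.
    return [row[3] for row in rows]


plan_without_index = query_plan(conn)
conn.execute(
    "CREATE INDEX idx_rtif_cleanup ON rtif (dag_id, task_id, execution_date)"
)
plan_with_index = query_plan(conn)

print("without index:", plan_without_index)
print("with index:   ", plan_with_index)
```

On a real deployment the equivalent check would be EXPLAIN (ANALYZE) on
PostgreSQL or EXPLAIN on MySQL against a realistically sized table, since
an empty in-memory table says nothing about the cost under load.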

> If we try to keep all historical values there by default, there are
> numerous cases it will grow very fast - far, far too quickly.

Is it a real problem? As long as we access the table only through indexes,
which do not include this JSON, it really doesn't matter. I guess we should
almost always do a UNIQUE INDEX SCAN to SELECT or DELETE (UPDATE) a single
record.

It is the opposite of what we have right now: we scan the table (maybe
multiple times) and read all row tuples, which contain the JSON.

We pay for table/index size linearly: more records, more size. But the cost
of other operations varies, depends on the B-Tree implementation and
usually grows logarithmically. Or do we worry only about table/TOAST/index
size on disk?
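
A rough worked example of the linear versus logarithmic point, under a
deliberately simplified model: every B-Tree page is assumed to hold a fixed
number of keys (the fanout of 100 is an arbitrary assumption, not a
measured value for any backend), and btree_levels is a hypothetical helper.

```python
def btree_levels(row_count: int, fanout: int = 100) -> int:
    """Number of B-Tree levels needed to address row_count keys.

    Simplified model: every page holds exactly `fanout` keys.
    """
    levels = 0
    capacity = 1
    while capacity < row_count:
        capacity *= fanout
        levels += 1
    return levels


for rows in (10_000, 1_000_000, 100_000_000):
    print(f"{rows:>11} rows -> {btree_levels(rows)} levels per index lookup")
```

In this model a table that is 10,000 times larger (10,000 rows versus
100,000,000 rows) only doubles the number of levels visited per lookup,
while the storage grows by the full factor of 10,000.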

If we do not want to grant users the ability to clean up the rendered
templates table, there could be another option:
- Do not delete records on every task instance run.
- Delete once per defined period (hourly, daily, weekly, monthly). In this
case you really would not need to care about locks.
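
The periodic option could be sketched roughly as below. This is only an
in-process illustration of the throttling idea: PeriodicCleanup and its
parameters are hypothetical names, and in a real multi-process deployment
the "last run" timestamp would have to live somewhere shared (for example
in the database) rather than in memory.

```python
import time
from typing import Callable, Optional


class PeriodicCleanup:
    """Run a cleanup callable at most once per interval."""

    def __init__(
        self,
        cleanup: Callable[[], None],
        interval_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._cleanup = cleanup
        self._interval = interval_seconds
        self._clock = clock
        self._last_run: Optional[float] = None

    def maybe_run(self) -> bool:
        """Run the cleanup if the interval has elapsed; report whether it ran."""
        now = self._clock()
        if self._last_run is not None and now - self._last_run < self._interval:
            return False  # too soon, skip the expensive DELETE entirely
        self._cleanup()
        self._last_run = now
        return True
```

A task run would then call maybe_run() instead of issuing the DELETE
directly, so most runs only pay for a timestamp comparison.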

WDYT?

----
Best Wishes
*Andrey Anshin*



On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <ja...@potiuk.com> wrote:

> I think there is a good reason to clean those up automatically.
> Rendered task instance fields are almost arbitrary in size. If we try
> to keep all historical values there by default, there are numerous
> cases where the table will grow far, far too quickly.
>
> And I am not worried at all about locks on this table if we do it the
> way I described and it uses the indexes. The contention this way
> might only be between two deleting tasks, and with the query I
> proposed it will only last for a short time: the index will be
> locked only while the two DELETEs or the SELECT DISTINCT run, and
> both should be fast.

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Jarek Potiuk <ja...@potiuk.com>.
I think there is a good reason to clean those up automatically.
Rendered task instance fields are almost arbitrary in size. If we try
to keep all historical values there by default, there are numerous
cases where the table will grow far, far too quickly.

And I am not worried at all about locks on this table if we do it the
way I described and it uses the indexes. The contention this way
might only be between two deleting tasks, and with the query I
proposed it will only last for a short time: the index will be
locked only while the two DELETEs or the SELECT DISTINCT run, and
both should be fast.
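
A minimal sketch of the two-step deletion referred to above (first find a
cutoff date, then delete everything at or before it), assuming a
denormalised execution_date column on the rendered fields table. It runs
against SQLite purely for illustration; the table name rtif, the index
name and the helper delete_old_rendered_fields are hypothetical, and using
"<=" with OFFSET keep is an adaptation so that exactly `keep` runs survive.

```python
import sqlite3


def delete_old_rendered_fields(conn, dag_id, task_id, keep):
    """Keep rows of the `keep` newest execution dates for one task."""
    if keep <= 0:
        return 0  # in this thread, 0 means the cleanup does not run at all
    # Step 1: newest execution date that should NOT be kept.
    cutoff_row = conn.execute(
        """
        SELECT DISTINCT execution_date FROM rtif
        WHERE dag_id = ? AND task_id = ?
        ORDER BY execution_date DESC
        LIMIT 1 OFFSET ?
        """,
        (dag_id, task_id, keep),
    ).fetchone()
    if cutoff_row is None:
        return 0  # fewer than keep + 1 runs stored, nothing to delete
    # Step 2: range delete; covers every map_index of the old runs.
    cursor = conn.execute(
        "DELETE FROM rtif"
        " WHERE dag_id = ? AND task_id = ? AND execution_date <= ?",
        (dag_id, task_id, cutoff_row[0]),
    )
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rtif (dag_id TEXT, task_id TEXT, run_id TEXT,"
    " map_index INTEGER, execution_date TEXT)"
)
conn.execute(
    "CREATE INDEX idx_rtif_cleanup ON rtif (dag_id, task_id, execution_date)"
)
# Five runs of one mapped task, two map indexes each.
rows = [
    ("d", "t", f"run_{day}", map_index, f"2023-01-0{day}")
    for day in range(1, 6)
    for map_index in range(2)
]
conn.executemany("INSERT INTO rtif VALUES (?, ?, ?, ?, ?)", rows)

deleted = delete_old_rendered_fields(conn, "d", "t", keep=3)
remaining = [
    row[0]
    for row in conn.execute(
        "SELECT DISTINCT execution_date FROM rtif ORDER BY execution_date"
    )
]
print(deleted, remaining)
```

Because the DELETE filters on execution_date rather than on a NOT IN
subquery, mapped task instances of the same run are removed together
without listing each map_index.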


On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <an...@taragol.is> wrote:
>
> I guess two things have reduced the performance of this query over time: Dynamic Task Mapping and the use of run_id instead of execution date.
>
> I still personally think that changing the default value from 30 to 0 might improve the performance of multiple concurrent tasks, simply because this query would not run and there would be no locks on multiple records/pages.
>
> I do not have any proof (yet?) other than simple DAGs. I think there is some crossover point at which letting this table grow becomes worse than cleaning up on each TI run. But users have the ability to clean up the table by executing airflow db clean, which should improve performance again.
>
> There is also interesting behavior with this query: if a user already has more rows than the value specified by AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tries to run a backfill, then the rendered templates are not written to the table (or maybe they are inserted and immediately deleted afterwards). The same applies to clearing old tasks.
>
> ----
> Best Wishes
> Andrey Anshin

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Andrey Anshin <an...@taragol.is>.
I guess two things have reduced the performance of this query over time:
Dynamic Task Mapping and the use of run_id instead of execution date.

I still personally think that changing the default value from 30 to 0 might
improve the performance of multiple concurrent tasks, simply because this
query would not run and there would be no locks on multiple records/pages.

I do not have any proof (yet?) other than simple DAGs. I think there is
some crossover point at which letting this table grow becomes worse than
cleaning up on each TI run. But users are able to clean up the table by
running airflow db clean, which should improve performance again.

There is also an interesting behavior with this query: if a user already
has more records than the value specified by
AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tries to run a
backfill, then the rendered templates are not written to the table (or
maybe they are inserted and immediately deleted afterwards). The same
applies when clearing old tasks.
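
The backfill behavior can be reproduced in miniature. This is only a
sketch, assuming a simplified two-table schema in an in-memory SQLite
database and a simplified keep-latest-N delete (not the exact statement
Airflow generates). MAX_KEPT, write_and_cleanup and remaining_run_ids are
hypothetical names used for illustration.

```python
import sqlite3

MAX_KEPT = 2  # hypothetical stand-in for max_num_rendered_ti_fields_per_task

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag_run (
    dag_id TEXT, run_id TEXT, execution_date TEXT,
    PRIMARY KEY (dag_id, run_id)
);
CREATE TABLE rendered_task_instance_fields (
    dag_id TEXT, task_id TEXT, run_id TEXT, rendered_fields TEXT,
    PRIMARY KEY (dag_id, task_id, run_id)
);
""")


def write_and_cleanup(run_id, execution_date):
    """Write rendered fields for one run, then keep only the newest MAX_KEPT."""
    conn.execute("INSERT INTO dag_run VALUES ('d', ?, ?)", (run_id, execution_date))
    conn.execute(
        "INSERT INTO rendered_task_instance_fields VALUES ('d', 't', ?, '{}')",
        (run_id,),
    )
    # Simplified version of the "delete everything NOT IN the latest N" query.
    conn.execute(
        """
        DELETE FROM rendered_task_instance_fields
        WHERE dag_id = 'd' AND task_id = 't'
          AND run_id NOT IN (
              SELECT rtif.run_id
              FROM rendered_task_instance_fields AS rtif
              JOIN dag_run AS dr
                ON rtif.dag_id = dr.dag_id AND rtif.run_id = dr.run_id
              WHERE rtif.dag_id = 'd' AND rtif.task_id = 't'
              ORDER BY dr.execution_date DESC
              LIMIT ?
          )
        """,
        (MAX_KEPT,),
    )


def remaining_run_ids():
    rows = conn.execute(
        "SELECT run_id FROM rendered_task_instance_fields ORDER BY run_id"
    ).fetchall()
    return [r[0] for r in rows]


# Two regular runs fill the table up to MAX_KEPT.
write_and_cleanup("run_2023_01_02", "2023-01-02")
write_and_cleanup("run_2023_01_03", "2023-01-03")
# A backfill for an older date is inserted and removed by the same cleanup.
write_and_cleanup("backfill_2022_12_01", "2022-12-01")
print(remaining_run_ids())
```

Because the cleanup orders by execution date, the row written by the
backfill is never among the newest MAX_KEPT rows and is deleted in the same
call that inserted it.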

----
Best Wishes
*Andrey Anshin*



On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com> wrote:

> Yep. Agree this is not an efficient query and dynamic task mapping
> makes the effect much worse. Generally speaking, selecting "what
> should be left" and then deleting stuff where the key is "not in" is
> never an efficient way of running an sql query.  And the query not
> using index at all makes it rather terrible.
>
> I think we should not deprecate it though, but find a more efficient
> way of deleting the old keys. I think we could slightly denormalize
> RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
> to the RenderedTaskInstance table and that will be enough to optimise
> it.
>
> Then we could have either:
>
> * a composite B-TREE indexed (non-unique) index on DAG_ID, TASK_ID,
> RUN_ID_EXECUTION_DATE
> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
>
> Probably the latter is better as I am not sure how < , > comparison
> looks like for composite B-TREE indexes when char + date columns are
> mixed. Also we could have hit the infamous MySQL index key length
> limit.
>
> Then deletion process would look roughly like:
>
> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>,
> TASK_ID=<TASK_ID> ORDER BY RUN_ID_EXECUTION_DATE GROUP BY
> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>,
> TASK_ID=<TASK_ID> AND RENDER_TIME < dag_run_execution_date
>
> I believe that would be fast, and it would use the B-TREE index
> features nicely (ordering support)
>
> J
>
> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <an...@taragol.is>
> wrote:
> >
> > First of all I want to highlight that this approach I guess worked well
> until Dynamic Task Mappings introduced.
> >
> > > The main reason for adding that cleanup was -- if you don't do that,
> you will have many rows, similar to the TaskInstance table
> >
> > The problem itself is not how big your table/indexes, rather then what
> kind of operation you run.
> >
> > > Do you have any data for locks or performance degradation?
> >
> > In this case if we try to clean up rendered_task_instance_fields table
> when a new TI is created/cleared we make almost two full/sequential scans
> (note: need to check) against the table without any index usage, so we pay
> here a couple times:
> > 1. We scan without indexes - not all parts of the composite key are
> included to query, plus we need to filter everything except 30 records with
> order and distinct
> > 2. After that we make another full scan for find 1 record or map_size
> records
> >
> > And I guess the situation becomes worse if you have a lot of tasks, even
> if we have a small table, we need to do ineffective operations.
> >
> > That how looks like Query Plan (please note without commit transaction
> DELETE operation doesn't have all information):
> https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
> >
> > In case if we do not clean up the table, we only use these operations:
> > 1. SELECT single record by index
> > 2. INSERT new record
> > 3. DELETE old record(s), which were found by index.
> >
> > I have not done any real tests yet, only synthetic DAGs (so we should
> not consider to use any findings as totally truth):
> https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
> > DAG with parallel tasks: degradation up to 2-3 times
> > DAG with single map tasks: degradation up to 7-10 times
> >
> > I have a plan for more complex and more close to real use cases with
> Database which do not have network latency almost 0 as I have in my local.
> > But I will not refuse if someone also does their tests with
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
> >
> > About deadlock we know that it exists at least in MySQL:
> https://github.com/apache/airflow/pull/18616
> >
> > > And the larger tables create problems during database migrations.
> >
> > That is a very good point, so if we found that problem only related to
> migrations we could:
> > 1. Cleanup this table in migration
> > 2. Add cli command to airflow db which could cleanup only rendered
> fields, so it would be user's choice cleanup or not before migration, do
> periodical maintenance or not
> >
> >
> > ----
> > Best Wishes
> > Andrey Anshin
> >
> >
> >
> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com> wrote:
> >>>
> >>> Potentially it is a good idea to deprecate this option and recommend
> for users to set it to 0? WDYT? Maybe someone has already tried or
> investigated this?
> >>
> >>
> >> The main reason for adding that cleanup was -- if you don't do that,
> you will have many rows, similar to the TaskInstance table. And the
> RenderedTIFields were mainly added for checking rendered TI fields on the
> Webserver only because after DAG Serialization, the webserver won't have
> access to DAG files.
> >>
> >> And the larger tables create problems during database migrations.
> >>
> >> Do you have any data for locks or performance degradation?
> >>
> >>
> >>
> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <an...@taragol.is>
> wrote:
> >>>
> >>> Greetings!
> >>>
> >>> During migrating our ORM syntax to compatible with SQLAlchemy 2.0 I
> probably found skeletons in the closet.
> >>>
> >>> Let's start from the beginning, initially I got this warning
> >>>
> >>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY
> columns added implicitly due to DISTINCT is deprecated and will be removed
> in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to
> explicitly include the appropriate columns in the columns clause
> (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
> >>>
> >>> "OK let's fix it!", I thought at first and started to investigate
> RenderedTaskInstanceFields model
> >>>
> >>> Skeleton #1:
> >>>
> >>> When I first time look on the code and comments it got me to thinking
> that part which keep only latest N Rendered Task Fields potentially could
> lead different performance degradation (Locks, Dead Locks, Data Bloating):
> see code
> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
> >>>
> >>> Also this historical part (from Airflow 1.10.10) generate this SQL
> Statement (pg backend)
> >>>
> >>> DELETE FROM rendered_task_instance_fields
> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
> >>>   AND (
> >>>     (
> >>>       rendered_task_instance_fields.dag_id,
> >>>       rendered_task_instance_fields.task_id,
> >>>       rendered_task_instance_fields.run_id
> >>>     ) NOT IN (
> >>>       SELECT
> >>>         anon_1.dag_id,
> >>>         anon_1.task_id,
> >>>         anon_1.run_id
> >>>       FROM
> >>>         (
> >>>           SELECT DISTINCT
> >>>             rendered_task_instance_fields.dag_id AS dag_id,
> >>>             rendered_task_instance_fields.task_id AS task_id,
> >>>             rendered_task_instance_fields.run_id AS run_id,
> >>>             dag_run.execution_date AS execution_date
> >>>           FROM rendered_task_instance_fields
> >>>             JOIN dag_run ON rendered_task_instance_fields.dag_id =
> dag_run.dag_id
> >>>             AND rendered_task_instance_fields.run_id = dag_run.run_id
> >>>           WHERE
> >>>             rendered_task_instance_fields.dag_id = %(dag_id_2) s
> >>>             AND rendered_task_instance_fields.task_id = %(task_id_2) s
> >>>           ORDER BY
> >>>             dag_run.execution_date DESC
> >>>           limit %(param_1) s
> >>>         ) AS anon_1
> >>>     )
> >>>   )
> >>>
> >>> Which is especially not effective in PostgreSQL. When IN SUBQUERY
> could be easily transform internaly into SEMI-JOIN (aka EXISTS clause), but
> it is not working for NOT IN SUBQUERY because it is not transformed into
> ANTI JOIN (aka NOT EXISTS clause) even if it possible, see:
> https://commitfest.postgresql.org/27/2023/
> >>>
> >>> I didn't do any performance benchmarks yet but I guess if users set
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than default
> 30 it could improve performance and reduce number of DeadLocks, however the
> table size will increase but I think we don't do any maintenance job for
> other tables.
> >>>
> >>> Potentially it is a good idea to deprecate this option and recommend
> for users to set it to 0? WDYT? Maybe someone has already tried or
> investigated this?
> >>>
> >>>
> >>> Skeleton #2:
> >>>
> >>> We have a k8s_pod_yaml field which is exclusively used by K8S
> executors.
> >>>
> >>> Should we also decouple this field as part of AIP-51?
> >>>
> >>> ----
> >>> Best Wishes
> >>> Andrey Anshin
> >>>
>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Jarek Potiuk <ja...@potiuk.com>.
Yep. Agreed, this is not an efficient query, and dynamic task mapping
makes the effect much worse. Generally speaking, selecting "what
should be left" and then deleting stuff where the key is "not in" is
never an efficient way of running an SQL query. And the query not
using an index at all makes it rather terrible.

I think we should not deprecate it though, but find a more efficient
way of deleting the old keys. I think we could slightly denormalize
RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
to the RenderedTaskInstance table and that will be enough to optimise
it.

Then we could have either:

* a composite (non-unique) B-TREE index on DAG_ID, TASK_ID,
RUN_ID_EXECUTION_DATE
* or maybe even a regular HASH index on DAG_ID, TASK_ID and a separate
(non-unique) B-TREE index on just RUN_ID_EXECUTION_DATE

Probably the latter is better, as I am not sure how < , > comparisons
work for composite B-TREE indexes when char + date columns are
mixed. Also, we could hit the infamous MySQL index key length
limit.

Then deletion process would look roughly like:

1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
TASK_ID = <TASK_ID> GROUP BY RUN_ID_EXECUTION_DATE ORDER BY
RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
<MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID> AND
TASK_ID = <TASK_ID> AND RUN_ID_EXECUTION_DATE < dag_run_execution_date

I believe that would be fast, and it would use the B-TREE index
features nicely (ordering support)
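
A minimal sketch of the two-step deletion, assuming an in-memory SQLite
database, a simplified table layout and the composite-index variant. The
run_id_execution_date column, the index name and the make_db/cleanup
helpers are hypothetical and do not exist in Airflow. The delete compares
against the same denormalized date column and uses <= rather than the
strict <, so that exactly `keep` distinct dates remain; that is a choice of
this sketch.

```python
import sqlite3


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
    CREATE TABLE rendered_task_instance_fields (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL DEFAULT -1,
        -- hypothetical denormalized copy of dag_run.execution_date
        run_id_execution_date TEXT NOT NULL,
        rendered_fields TEXT,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    );
    -- composite, non-unique index (first option from the list above)
    CREATE INDEX idx_rtif_dag_task_date
        ON rendered_task_instance_fields (dag_id, task_id, run_id_execution_date);
    """)
    return conn


def cleanup(conn, dag_id, task_id, keep):
    """Delete all but the `keep` newest execution dates; return rows deleted."""
    # Step 1: find the newest execution date that should NOT be kept.
    row = conn.execute(
        """
        SELECT run_id_execution_date
        FROM rendered_task_instance_fields
        WHERE dag_id = ? AND task_id = ?
        GROUP BY run_id_execution_date
        ORDER BY run_id_execution_date DESC
        LIMIT 1 OFFSET ?
        """,
        (dag_id, task_id, keep),
    ).fetchone()
    if row is None:
        return 0  # fewer than keep + 1 distinct dates, nothing to delete
    # Step 2: range delete on the denormalized date column.
    cur = conn.execute(
        """
        DELETE FROM rendered_task_instance_fields
        WHERE dag_id = ? AND task_id = ? AND run_id_execution_date <= ?
        """,
        (dag_id, task_id, row[0]),
    )
    return cur.rowcount


conn = make_db()
for day in range(1, 6):  # five runs, two mapped task instances each
    for map_index in (0, 1):
        conn.execute(
            "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?, ?, ?, ?)",
            ("d", "t", f"run_{day}", map_index, f"2023-01-0{day}", "{}"),
        )
conn.execute(
    "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?, ?, ?, ?)",
    ("d", "other", "run_1", -1, "2023-01-01", "{}"),
)
deleted = cleanup(conn, "d", "t", keep=3)
print(deleted)
```

Both statements filter on dag_id and task_id first and then on the date,
which is the column order of the composite index, so mapped task instances
of the same run are removed together without a NOT IN subquery.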

J

On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <an...@taragol.is> wrote:
>
> First of all I want to highlight that this approach I guess worked well until Dynamic Task Mappings introduced.
>
> > The main reason for adding that cleanup was -- if you don't do that, you will have many rows, similar to the TaskInstance table
>
> The problem itself is not how big your table/indexes, rather then what kind of operation you run.
>
> > Do you have any data for locks or performance degradation?
>
> In this case if we try to clean up rendered_task_instance_fields table when a new TI is created/cleared we make almost two full/sequential scans (note: need to check) against the table without any index usage, so we pay here a couple times:
> 1. We scan without indexes - not all parts of the composite key are included to query, plus we need to filter everything except 30 records with order and distinct
> 2. After that we make another full scan for find 1 record or map_size records
>
> And I guess the situation becomes worse if you have a lot of tasks, even if we have a small table, we need to do ineffective operations.
>
> That how looks like Query Plan (please note without commit transaction DELETE operation doesn't have all information): https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
>
> In case if we do not clean up the table, we only use these operations:
> 1. SELECT single record by index
> 2. INSERT new record
> 3. DELETE old record(s), which were found by index.
>
> I have not done any real tests yet, only synthetic DAGs (so we should not consider to use any findings as totally truth): https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
> DAG with parallel tasks: degradation up to 2-3 times
> DAG with single map tasks: degradation up to 7-10 times
>
> I have a plan for more complex and more close to real use cases with Database which do not have network latency almost 0 as I have in my local.
> But I will not refuse if someone also does their tests with AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value.
>
> About deadlock we know that it exists at least in MySQL: https://github.com/apache/airflow/pull/18616
>
> > And the larger tables create problems during database migrations.
>
> That is a very good point, so if we found that problem only related to migrations we could:
> 1. Cleanup this table in migration
> 2. Add cli command to airflow db which could cleanup only rendered fields, so it would be user's choice cleanup or not before migration, do periodical maintenance or not
>
>
> ----
> Best Wishes
> Andrey Anshin
>
>
>
> On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com> wrote:
>>>
>>> Potentially it is a good idea to deprecate this option and recommend for users to set it to 0? WDYT? Maybe someone has already tried or investigated this?
>>
>>
>> The main reason for adding that cleanup was -- if you don't do that, you will have many rows, similar to the TaskInstance table. And the RenderedTIFields were mainly added for checking rendered TI fields on the Webserver only because after DAG Serialization, the webserver won't have access to DAG files.
>>
>> And the larger tables create problems during database migrations.
>>
>> Do you have any data for locks or performance degradation?
>>
>>
>>
>> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <an...@taragol.is> wrote:
>>>
>>> Greetings!
>>>
>>> During migrating our ORM syntax to compatible with SQLAlchemy 2.0 I probably found skeletons in the closet.
>>>
>>> Let's start from the beginning, initially I got this warning
>>>
>>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY columns added implicitly due to DISTINCT is deprecated and will be removed in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to explicitly include the appropriate columns in the columns clause (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
>>>
>>> "OK let's fix it!", I thought at first and started to investigate RenderedTaskInstanceFields model
>>>
>>> Skeleton #1:
>>>
>>> When I first time look on the code and comments it got me to thinking that part which keep only latest N Rendered Task Fields potentially could lead different performance degradation (Locks, Dead Locks, Data Bloating): see code https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
>>>
>>> Also this historical part (from Airflow 1.10.10) generate this SQL Statement (pg backend)
>>>
>>> DELETE FROM rendered_task_instance_fields
>>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
>>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
>>>   AND (
>>>     (
>>>       rendered_task_instance_fields.dag_id,
>>>       rendered_task_instance_fields.task_id,
>>>       rendered_task_instance_fields.run_id
>>>     ) NOT IN (
>>>       SELECT
>>>         anon_1.dag_id,
>>>         anon_1.task_id,
>>>         anon_1.run_id
>>>       FROM
>>>         (
>>>           SELECT DISTINCT
>>>             rendered_task_instance_fields.dag_id AS dag_id,
>>>             rendered_task_instance_fields.task_id AS task_id,
>>>             rendered_task_instance_fields.run_id AS run_id,
>>>             dag_run.execution_date AS execution_date
>>>           FROM rendered_task_instance_fields
>>>             JOIN dag_run ON rendered_task_instance_fields.dag_id = dag_run.dag_id
>>>             AND rendered_task_instance_fields.run_id = dag_run.run_id
>>>           WHERE
>>>             rendered_task_instance_fields.dag_id = %(dag_id_2) s
>>>             AND rendered_task_instance_fields.task_id = %(task_id_2) s
>>>           ORDER BY
>>>             dag_run.execution_date DESC
>>>           limit %(param_1) s
>>>         ) AS anon_1
>>>     )
>>>   )
>>>
>>> Which is especially not effective in PostgreSQL. When IN SUBQUERY could be easily transform internaly into SEMI-JOIN (aka EXISTS clause), but it is not working for NOT IN SUBQUERY because it is not transformed into ANTI JOIN (aka NOT EXISTS clause) even if it possible, see: https://commitfest.postgresql.org/27/2023/
>>>
>>> I didn't do any performance benchmarks yet but I guess if users set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than default 30 it could improve performance and reduce number of DeadLocks, however the table size will increase but I think we don't do any maintenance job for other tables.
>>>
>>> Potentially it is a good idea to deprecate this option and recommend for users to set it to 0? WDYT? Maybe someone has already tried or investigated this?
>>>
>>>
>>> Skeleton #2:
>>>
>>> We have a k8s_pod_yaml field which is exclusively used by K8S executors.
>>>
>>> Should we also decouple this field as part of AIP-51?
>>>
>>> ----
>>> Best Wishes
>>> Andrey Anshin
>>>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Andrey Anshin <an...@taragol.is>.
First of all, I want to highlight that this approach, I guess, worked
well until Dynamic Task Mapping was introduced.

> The main reason for adding that cleanup was -- if you don't do that, you
will have many rows, similar to the TaskInstance table

The problem itself is not how big your table/indexes are, but rather what
kind of operation you run.

> Do you have any data for locks or performance degradation?

In this case, if we try to clean up the rendered_task_instance_fields table
when a new TI is created/cleared, we make almost two full/sequential scans
(*note: need to check*) against the table without any index usage, so we
pay a couple of times here:
1. We scan without indexes: not all parts of the composite key are
included in the query, plus we need to filter out everything except 30
records with ORDER BY and DISTINCT
2. After that we make another full scan to find 1 record or map_size
records

And I guess the situation becomes worse if you have a lot of tasks: even
with a small table, we need to do inefficient operations.

This is what the query plan looks like (please note that without
committing the transaction the DELETE operation doesn't have all the
information):
https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b

If we do not clean up the table, we only use these operations:
1. SELECT a single record by index
2. INSERT a new record
3. DELETE old record(s), which are found by index.
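
The three operations above can be sketched as follows, assuming an
in-memory SQLite database and a simplified table whose primary key is
(dag_id, task_id, run_id, map_index). SQLite's planner is not PostgreSQL's
or MySQL's, so the plan printed here only illustrates a lookup by the full
key and says nothing about the plans Airflow gets in production. KEY, WHERE
and plan_details are hypothetical names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE rendered_task_instance_fields (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL DEFAULT -1,
        rendered_fields TEXT,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    )
""")

KEY = ("d", "t", "run_1", -1)
WHERE = "dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?"


def plan_details(sql, params):
    """Return the human-readable detail column of EXPLAIN QUERY PLAN."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return [row[-1] for row in rows]


# 1. SELECT a single record: every key column is constrained.
select_plan = plan_details(
    f"SELECT rendered_fields FROM rendered_task_instance_fields WHERE {WHERE}",
    KEY,
)
print(select_plan)

# 2. INSERT a new record.
conn.execute(
    "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?, ?, ?)",
    KEY + ("{}",),
)

# 3. DELETE the old record, again addressed by the full key.
deleted = conn.execute(
    f"DELETE FROM rendered_task_instance_fields WHERE {WHERE}", KEY
).rowcount
print(deleted)
```

In SQLite the plan for step 1 is reported as a SEARCH using the primary
key index rather than a SCAN of the whole table.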

I have not done any real tests yet, only synthetic DAGs (*so we should not
treat any findings as the absolute truth*):
https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
DAG with parallel tasks: degradation up to 2-3 times
DAG with single map tasks: degradation up to 7-10 times

I plan to run more complex tests that are closer to real use cases, against
a database whose network latency is not almost 0 as it is on my local setup.
But I would not mind if someone else also ran their own tests with
AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs the default value.

As for deadlocks, we know that they occur at least in MySQL:
https://github.com/apache/airflow/pull/18616

> And the larger tables create problems during database migrations.

That is a very good point, so if we find that the problem is only related
to migrations, we could:
1. Clean up this table in a migration
2. Add a CLI command to airflow db that cleans up only rendered fields, so
it would be the user's choice whether to clean up before a migration and
whether to do periodic maintenance


----
Best Wishes
*Andrey Anshin*



On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <ka...@gmail.com> wrote:

> Potentially it is a good idea to deprecate this option and recommend for
>> users to set it to 0? WDYT? Maybe someone has already tried or investigated
>> this?
>
>
> The main reason for adding that cleanup was -- if you don't do that, you
> will have many rows, similar to the TaskInstance table. And the
> RenderedTIFields were mainly added for checking rendered TI fields on the
> Webserver only because after DAG Serialization, the webserver won't have
> access to DAG files.
>
> And the larger tables create problems during database migrations.
>
> Do you have any data for locks or performance degradation?
>
>
>
> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <an...@taragol.is>
> wrote:
>
>> Greetings!
>>
>> During migrating our ORM syntax to compatible with SQLAlchemy 2.0 I
>> probably found skeletons in the closet.
>>
>> Let's start from the beginning, initially I got this warning
>>
>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY
>> columns added implicitly due to DISTINCT is deprecated and will be removed
>> in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to
>> explicitly include the appropriate columns in the columns clause
>> (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
>>
>> "OK let's fix it!", I thought at first and started to investigate
>> RenderedTaskInstanceFields model
>>
>> *Skeleton #1:*
>>
>> When I first time look on the code and comments it got me to thinking
>> that part which keep only latest N Rendered Task Fields potentially
>> could lead different performance degradation (Locks, Dead Locks, Data
>> Bloating): see code
>> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
>>
>> Also this historical part (from Airflow 1.10.10) generate this SQL
>> Statement (pg backend)
>>
>> DELETE FROM rendered_task_instance_fields
>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
>>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
>>   AND (
>>     (
>>       rendered_task_instance_fields.dag_id,
>>       rendered_task_instance_fields.task_id,
>>       rendered_task_instance_fields.run_id
>>     ) NOT IN (
>>       SELECT
>>         anon_1.dag_id,
>>         anon_1.task_id,
>>         anon_1.run_id
>>       FROM
>>         (
>>           SELECT DISTINCT
>>             rendered_task_instance_fields.dag_id AS dag_id,
>>             rendered_task_instance_fields.task_id AS task_id,
>>             rendered_task_instance_fields.run_id AS run_id,
>>             dag_run.execution_date AS execution_date
>>           FROM rendered_task_instance_fields
>>             JOIN dag_run ON rendered_task_instance_fields.dag_id =
>> dag_run.dag_id
>>             AND rendered_task_instance_fields.run_id = dag_run.run_id
>>           WHERE
>>             rendered_task_instance_fields.dag_id = %(dag_id_2) s
>>             AND rendered_task_instance_fields.task_id = %(task_id_2) s
>>           ORDER BY
>>             dag_run.execution_date DESC
>>           limit %(param_1) s
>>         ) AS anon_1
>>     )
>>   )
>>
>> Which is especially not effective in PostgreSQL. When IN SUBQUERY could
>> be easily transform internaly into SEMI-JOIN (aka EXISTS clause), but it is
>> not working for NOT IN SUBQUERY because it is not transformed into ANTI
>> JOIN (aka NOT EXISTS clause) even if it possible, see:
>> https://commitfest.postgresql.org/27/2023/
>>
>> I didn't do any performance benchmarks yet but I guess if users set
>> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than
>> default 30 it could improve performance and reduce number of DeadLocks,
>> however the table size will increase but I think we don't do any
>> maintenance job for other tables.
>>
>> Potentially it is a good idea to deprecate this option and recommend for
>> users to set it to 0? WDYT? Maybe someone has already tried or investigated
>> this?
>>
>>
>> *Skeleton #2:*
>>
>> We have a k8s_pod_yaml field which is exclusively used by K8S executors.
>>
>> Should we also decouple this field as part of AIP-51?
>>
>> ----
>> Best Wishes
>> *Andrey Anshin*
>>
>>

Re: [Discussion] Deprecate auto cleanup RenderedTaskInstanceFields and decouple k8s_pod_yaml

Posted by Kaxil Naik <ka...@gmail.com>.
>
> Potentially it is a good idea to deprecate this option and recommend for
> users to set it to 0? WDYT? Maybe someone has already tried or investigated
> this?


The main reason for adding that cleanup was -- if you don't do that, you
will have many rows, similar to the TaskInstance table. And the
RenderedTIFields were mainly added for checking rendered TI fields on the
Webserver only because after DAG Serialization, the webserver won't have
access to DAG files.

And the larger tables create problems during database migrations.

Do you have any data for locks or performance degradation?



On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <an...@taragol.is>
wrote:

> Greetings!
>
> During migrating our ORM syntax to compatible with SQLAlchemy 2.0 I
> probably found skeletons in the closet.
>
> Let's start from the beginning, initially I got this warning
>
> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY
> columns added implicitly due to DISTINCT is deprecated and will be removed
> in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to
> explicitly include the appropriate columns in the columns clause
> (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
>
> "OK let's fix it!", I thought at first and started to investigate
> RenderedTaskInstanceFields model
>
> *Skeleton #1:*
>
> When I first time look on the code and comments it got me to thinking that
> part which keep only latest N Rendered Task Fields potentially could lead
> different performance degradation (Locks, Dead Locks, Data Bloating): see
> code
> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
>
> Also this historical part (from Airflow 1.10.10) generate this SQL
> Statement (pg backend)
>
> DELETE FROM rendered_task_instance_fields
> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s
>   AND rendered_task_instance_fields.task_id = %(task_id_1) s
>   AND (
>     (
>       rendered_task_instance_fields.dag_id,
>       rendered_task_instance_fields.task_id,
>       rendered_task_instance_fields.run_id
>     ) NOT IN (
>       SELECT
>         anon_1.dag_id,
>         anon_1.task_id,
>         anon_1.run_id
>       FROM
>         (
>           SELECT DISTINCT
>             rendered_task_instance_fields.dag_id AS dag_id,
>             rendered_task_instance_fields.task_id AS task_id,
>             rendered_task_instance_fields.run_id AS run_id,
>             dag_run.execution_date AS execution_date
>           FROM rendered_task_instance_fields
>             JOIN dag_run ON rendered_task_instance_fields.dag_id =
> dag_run.dag_id
>             AND rendered_task_instance_fields.run_id = dag_run.run_id
>           WHERE
>             rendered_task_instance_fields.dag_id = %(dag_id_2) s
>             AND rendered_task_instance_fields.task_id = %(task_id_2) s
>           ORDER BY
>             dag_run.execution_date DESC
>           limit %(param_1) s
>         ) AS anon_1
>     )
>   )
>
> Which is especially not effective in PostgreSQL. When IN SUBQUERY could be
> easily transform internaly into SEMI-JOIN (aka EXISTS clause), but it is
> not working for NOT IN SUBQUERY because it is not transformed into ANTI
> JOIN (aka NOT EXISTS clause) even if it possible, see:
> https://commitfest.postgresql.org/27/2023/
>
> I didn't do any performance benchmarks yet but I guess if users set
> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than
> default 30 it could improve performance and reduce number of DeadLocks,
> however the table size will increase but I think we don't do any
> maintenance job for other tables.
>
> Potentially it is a good idea to deprecate this option and recommend for
> users to set it to 0? WDYT? Maybe someone has already tried or investigated
> this?
>
>
> *Skeleton #2:*
>
> We have a k8s_pod_yaml field which is exclusively used by K8S executors.
>
> Should we also decouple this field as part of AIP-51?
>
> ----
> Best Wishes
> *Andrey Anshin*
>
>