You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by Ash Berlin-Taylor <as...@apache.org> on 2019/12/13 15:42:22 UTC

Test fixtures for repeatable (performance) tests?

As part of my performance testing work I created a simple driver script, one of the steps it did was to "reset" the DB (at least for a given dag) so that the code was operating on the same rows in the same state each time -- I ran all my tests about 10 times to get mean and variance.

My "reset" function looks like this:


--

def reset_dag(dag, num_runs=NUM_RUNS):
    DR = airflow.models.DagRun
    TI = airflow.models.TaskInstance
    TF = airflow.models.TaskFail
    dag_id = dag.dag_id
    with create_session() as session:
        dag.sync_to_db(session=session)
        session.query(DR).filter(DR.dag_id == dag_id).delete()
        session.query(TI).filter(TI.dag_id == dag_id).delete()
        session.query(TF).filter(TF.dag_id == dag_id).delete()

        airflow.models.DagModel.get_dagmodel(dag.dag_id).set_is_paused(is_paused=False)

        next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks))

        for _ in range(num_runs):
            next_run = dag.create_dagrun(
                run_id=DR.ID_PREFIX + next_run_date.isoformat(),
                execution_date=next_run_date,
                start_date=timezone.utcnow(),
                state=state.State.RUNNING,
                external_trigger=False
            )
            next_run_date = dag.following_schedule(next_run_date)
        return next_run
--

This works well enough (at least for the case where I only want N dag runs created), but it has a couple of limitations:

- It's *slow*. 54% of the test runtime is from resetting between tests (not included in measurement, just affects me waiting)
- It's not that flexible; for instance if I wanted to put some task instances in certain states it's not easy.

Its main advantage is that it uses our model code, so is less likely to get out of date as the model evolves over time.

Does anyone have an opinions about test fixtures (good or bad), espeically with the somewhat complex interaction we have between DAG<->DagRun<->TI<->TaskFail etc (without any explicit FKs) and recommendations for modules to use or avoid?

-ash


Re: Test fixtures for repeatable (performance) tests?

Posted by Kaxil Naik <ka...@gmail.com>.
https://github.com/FactoryBoy/factory_boy looks good for creating Fixtures.

Good example and article about it here:
https://medium.com/@vittorio.camisa/agile-database-integration-tests-with-python-sqlalchemy-and-factory-boy-6824e8fe33a1

PS: I have tried or tested it yet

On Fri, Dec 13, 2019 at 3:42 PM Ash Berlin-Taylor <as...@apache.org> wrote:

> As part of my performance testing work I created a simple driver script,
> one of the steps it did was to "reset" the DB (at least for a given dag) so
> that the code was operating on the same rows in the same state each time --
> I ran all my tests about 10 times to get mean and variance.
>
> My "reset" function looks like this:
>
>
> --
>
> def reset_dag(dag, num_runs=NUM_RUNS):
>     DR = airflow.models.DagRun
>     TI = airflow.models.TaskInstance
>     TF = airflow.models.TaskFail
>     dag_id = dag.dag_id
>     with create_session() as session:
>         dag.sync_to_db(session=session)
>         session.query(DR).filter(DR.dag_id == dag_id).delete()
>         session.query(TI).filter(TI.dag_id == dag_id).delete()
>         session.query(TF).filter(TF.dag_id == dag_id).delete()
>
>
> airflow.models.DagModel.get_dagmodel(dag.dag_id).set_is_paused(is_paused=False)
>
>         next_run_date = dag.normalize_schedule(dag.start_date or
> min(t.start_date for t in dag.tasks))
>
>         for _ in range(num_runs):
>             next_run = dag.create_dagrun(
>                 run_id=DR.ID_PREFIX + next_run_date.isoformat(),
>                 execution_date=next_run_date,
>                 start_date=timezone.utcnow(),
>                 state=state.State.RUNNING,
>                 external_trigger=False
>             )
>             next_run_date = dag.following_schedule(next_run_date)
>         return next_run
> --
>
> This works well enough (at least for the case where I only want N dag runs
> created), but it has a couple of limitations:
>
> - It's *slow*. 54% of the test runtime is from resetting between tests
> (not included in measurement, just affects me waiting)
> - It's not that flexible; for instance if I wanted to put some task
> instances in certain states it's not easy.
>
> Its main advantage is that it uses our model code, so is less likely to
> get out of date as the model evolves over time.
>
> Does anyone have an opinions about test fixtures (good or bad), espeically
> with the somewhat complex interaction we have between
> DAG<->DagRun<->TI<->TaskFail etc (without any explicit FKs) and
> recommendations for modules to use or avoid?
>
> -ash
>
>