Posted to commits@airflow.apache.org by "Jarek Potiuk (Jira)" <ji...@apache.org> on 2020/01/19 23:37:06 UTC

[jira] [Closed] (AIRFLOW-1653) distributed schedulers with dedicated dags folders

     [ https://issues.apache.org/jira/browse/AIRFLOW-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jarek Potiuk closed AIRFLOW-1653.
---------------------------------
    Resolution: Won't Fix

I am closing some old issues that are no longer relevant. Please let me know if you want this one reopened.

> distributed schedulers with dedicated dags folders
> --------------------------------------------------
>
>                 Key: AIRFLOW-1653
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1653
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: scheduler
>    Affects Versions: 1.8.1
>         Environment: Linux
>            Reporter: JELEZ
>            Priority: Major
>
> We schedule many thousands of tasks per job, and we run multiple jobs. We are considering running multiple distributed schedulers with the Celery executor, each processing its own dedicated segment of the dags directory, as configured in its own airflow.cfg.
> One thing I am concerned about is the section of jobs.py where orphaned tasks are reset (quoted below). This section is executed at the start of each scheduler instance.
> My question is: does this section create some sort of conflict? What is its impact in this setup?
> {quote}
>         self.logger.info("Resetting state for orphaned tasks")
>         # grab orphaned tasks and make sure to reset their state
>         active_runs = DagRun.find(
>             state=State.RUNNING,
>             external_trigger=False,
>             session=session,
>             no_backfills=True,
>         )
>         for dr in active_runs:
>             self.logger.info("Resetting {} {}".format(dr.dag_id,
>                                                       dr.execution_date))
>             self.reset_state_for_orphaned_tasks(dr, session=session)
> {quote}
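
The concern in the quoted description can be illustrated with a toy model. This is only a sketch, not Airflow code: `ToyTask`, `reset_orphaned`, and the `owned_dags` filter are hypothetical names, and the model assumes that a reset clears the state of queued tasks which the calling scheduler's executor is not tracking. The quoted `DagRun.find(...)` call passes no dag_id filter, which is modelled here by `owned_dags=None`.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Set


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task instance row in the shared metadata DB."""
    dag_id: str
    task_id: str
    state: Optional[str]  # e.g. "queued", "running", or None after a reset


def reset_orphaned(tasks: List[ToyTask], in_flight: Set[str],
                   owned_dags: Optional[Iterable[str]] = None) -> List[str]:
    """Reset queued tasks that the calling scheduler's executor is not tracking.

    owned_dags=None models an unscoped query (no dag_id filter); passing a
    collection models a hypothetical per-scheduler filter.
    Returns the keys of the tasks that were reset.
    """
    owned = None if owned_dags is None else set(owned_dags)
    reset = []
    for t in tasks:
        key = f"{t.dag_id}.{t.task_id}"
        if owned is not None and t.dag_id not in owned:
            continue  # another scheduler's segment: leave it alone
        if t.state == "queued" and key not in in_flight:
            t.state = None
            reset.append(key)
    return reset


def make_tasks() -> List[ToyTask]:
    # dag_a belongs to scheduler A's segment, dag_b to scheduler B's.
    return [ToyTask("dag_a", "t1", "queued"), ToyTask("dag_b", "t1", "queued")]


# Scheduler A restarts with nothing in flight while scheduler B has dag_b.t1 queued.
unscoped = reset_orphaned(make_tasks(), in_flight=set())
scoped = reset_orphaned(make_tasks(), in_flight=set(), owned_dags={"dag_a"})

print(unscoped)  # ['dag_a.t1', 'dag_b.t1']
print(scoped)    # ['dag_a.t1']
```

Under these assumptions, an unscoped reset on one scheduler's startup also touches tasks queued by the other scheduler, while a reset limited to that scheduler's own DAGs does not. Whether the real scheduler behaves this way depends on the Airflow version and would need to be checked against its jobs.py.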



--
This message was sent by Atlassian Jira
(v8.3.4#803005)