Posted to commits@airflow.apache.org by "JELEZ (JIRA)" <ji...@apache.org> on 2017/09/28 18:17:00 UTC

[jira] [Updated] (AIRFLOW-1653) distributed schedulers with dedicated dags folders

     [ https://issues.apache.org/jira/browse/AIRFLOW-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

JELEZ updated AIRFLOW-1653:
---------------------------
    Description: 
The number of tasks we schedule reaches many thousands per job, and we run multiple jobs. We are considering running multiple distributed schedulers, each configured in its own airflow.cfg to process a dedicated segment of the dags directory. We use the Celery executor.
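A minimal sketch of how a dags directory could be split into dedicated segments, one per scheduler, so that each scheduler's airflow.cfg points `dags_folder` at a different subfolder. The helpers `segment_for` and `assign_segments` and the hash-based assignment are hypothetical illustrations, not part of Airflow. A stable hash (md5) is used because Python's built-in `hash()` on strings is salted per process, which would move files between segments on every restart.

```python
import hashlib
from collections import defaultdict


def segment_for(dag_file: str, num_schedulers: int) -> int:
    """Map a DAG file name to a scheduler index (hypothetical helper).

    md5 is used instead of hash() so the assignment stays the same
    across processes and restarts.
    """
    digest = hashlib.md5(dag_file.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_schedulers


def assign_segments(dag_files, num_schedulers):
    """Group DAG files into one dedicated segment per scheduler (hypothetical helper)."""
    segments = defaultdict(list)
    for dag_file in sorted(dag_files):
        segments[segment_for(dag_file, num_schedulers)].append(dag_file)
    return dict(segments)


if __name__ == "__main__":
    files = ["etl_orders.py", "etl_users.py", "reports.py", "ml_train.py"]
    for idx, members in sorted(assign_segments(files, 2).items()):
        # Each segment would be placed in its own folder, e.g. dags/segment_<idx>/
        print(idx, members)
```

Each scheduler's `dags_folder` setting in the `[core]` section of its airflow.cfg would then point at exactly one segment folder, so no DAG file is parsed by two schedulers.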

One thing I am concerned about is the section of jobs.py where orphaned tasks are reset (quoted at the end of this message). This section is executed at the start of each scheduler instance.

My question is: does this section create some sort of conflict, and what is its impact in this setup?

{code:python}
# Excerpt from jobs.py; runs once when a scheduler instance starts.
self.logger.info("Resetting state for orphaned tasks")
# grab orphaned tasks and make sure to reset their state
active_runs = DagRun.find(
    state=State.RUNNING,
    external_trigger=False,
    session=session,
    no_backfills=True,
)
for dr in active_runs:
    self.logger.info("Resetting {} {}".format(dr.dag_id,
                                              dr.execution_date))
    self.reset_state_for_orphaned_tasks(dr, session=session)
{code}
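The concern can be illustrated with a small in-memory simulation. This is not Airflow code. It assumes that `reset_state_for_orphaned_tasks` sets a task instance's state back to None when the task is SCHEDULED or QUEUED but unknown to the calling scheduler's own executor; verify this against jobs.py for the version in use. Because the quoted `DagRun.find` call filters only by state, `external_trigger` and backfills, not by `dag_id`, a freshly started scheduler whose executor knows about no tasks would also reset a task that another scheduler has already queued. The `owned_dag_ids` filter is a hypothetical mitigation, not an existing Airflow option, and the task key is simplified (Airflow's key also includes the execution date).

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set

RUNNING, SCHEDULED, QUEUED, NONE = "running", "scheduled", "queued", None


@dataclass
class TaskInstance:
    dag_id: str
    task_id: str
    state: Optional[str]

    @property
    def key(self):
        # Simplified key: (dag_id, task_id), without the execution date.
        return (self.dag_id, self.task_id)


@dataclass
class DagRun:
    dag_id: str
    state: str
    task_instances: List[TaskInstance] = field(default_factory=list)


@dataclass
class Scheduler:
    name: str
    # Task keys that this scheduler's own executor knows about.
    known_keys: Set[tuple] = field(default_factory=set)

    def reset_orphans(self, dag_runs, owned_dag_ids=None):
        """Reset SCHEDULED/QUEUED tasks unknown to this scheduler's executor.

        owned_dag_ids=None mimics the quoted loop (every RUNNING run);
        passing a set restricts the reset to this scheduler's own DAGs
        (hypothetical mitigation).
        """
        reset = []
        for dr in dag_runs:
            if dr.state != RUNNING:
                continue
            if owned_dag_ids is not None and dr.dag_id not in owned_dag_ids:
                continue
            for ti in dr.task_instances:
                if ti.state in (SCHEDULED, QUEUED) and ti.key not in self.known_keys:
                    ti.state = NONE
                    reset.append(ti.key)
        return reset


def make_runs():
    # Scheduler A owns dag_a and has already queued one of its tasks.
    return [
        DagRun("dag_a", RUNNING, [TaskInstance("dag_a", "extract", QUEUED)]),
        DagRun("dag_b", RUNNING, [TaskInstance("dag_b", "load", SCHEDULED)]),
    ]


if __name__ == "__main__":
    # A freshly started scheduler B has an empty executor.
    print(Scheduler("B").reset_orphans(make_runs()))
    # [('dag_a', 'extract'), ('dag_b', 'load')]
    print(Scheduler("B").reset_orphans(make_runs(), owned_dag_ids={"dag_b"}))
    # [('dag_b', 'load')]
```

In the unfiltered case scheduler B also resets `dag_a.extract`, which scheduler A had already handed to its executor; restricting the reset to the DAGs in a scheduler's own segment leaves that task alone.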

  was:
The number of tasks we schedule reaches many thousands per job, and we run multiple jobs. We are considering running multiple distributed schedulers, each configured in its own airflow.cfg to process a dedicated segment of the dags directory. We use the Celery executor.

One thing I am concerned about is the section of jobs.py where orphaned tasks are reset (quoted at the end of this message). This section is executed at the start of each scheduler instance.

My question is: does this section create some sort of conflict, and what is its impact in this setup?

{quote}        self.logger.info("Resetting state for orphaned tasks")
        # grab orphaned tasks and make sure to reset their state
        active_runs = DagRun.find(
            state=State.RUNNING,
            external_trigger=False,
            session=session,
            no_backfills=True,
        )
        for dr in active_runs:
            self.logger.info("Resetting {} {}".format(dr.dag_id,
                                                      dr.execution_date))
            self.reset_state_for_orphaned_tasks(dr, session=session)
{quote}


> distributed schedulers with dedicated dags folders
> --------------------------------------------------
>
>                 Key: AIRFLOW-1653
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1653
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: scheduler
>    Affects Versions: 1.8.1
>         Environment: Linux
>            Reporter: JELEZ
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)