Posted to commits@airflow.apache.org by "Edgar Rodriguez (JIRA)" <ji...@apache.org> on 2017/09/21 17:30:02 UTC

[jira] [Updated] (AIRFLOW-1631) LocalExecutor does not maintain contract of unbound parallelism (0 value)

     [ https://issues.apache.org/jira/browse/AIRFLOW-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Edgar Rodriguez updated AIRFLOW-1631:
-------------------------------------
    Description: 
*Location*
{{airflow/executors/local_executor.py:LocalExecutor#start}}:
{code}
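def start(self):
    self.queue = multiprocessing.JoinableQueue()
    self.result_queue = multiprocessing.Queue()
    # One LocalWorker process per unit of parallelism. When
    # self.parallelism == 0, range(0) is empty, so no workers are created
    # and nothing ever consumes self.queue.
    self.workers = [
        LocalWorker(self.queue, self.result_queue)
        for _ in range(self.parallelism)
    ]

    for w in self.workers:
        w.start()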
{code}

*Description*
When the *{{PARALLELISM}}* configuration value is set to {{0}}, {{LocalExecutor}} creates no workers, so queued tasks are never picked up and computation stalls. The {{base_executor}} docstring, however, defines {{0}} as unbounded parallelism:
{code}
:param parallelism: how many jobs should run at one time. Set to
            ``0`` for infinity
:type parallelism: int
{code}
Hence, this contract is not maintained in {{LocalExecutor}}.
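The stall can be reproduced outside Airflow with a minimal sketch. It keeps the shape of {{LocalExecutor#start}} (one consumer per unit of parallelism, built from {{range(parallelism)}}), but swaps {{multiprocessing}} workers for {{threading}} threads so it runs in a single process; the function names {{start}} and {{_worker_loop}} are hypothetical and are not Airflow code.

{code:python}
import queue
import threading
from typing import List


def _worker_loop(task_queue: "queue.Queue[str]", done: List[str]) -> None:
    # Consume tasks forever, marking each one done after "running" it.
    while True:
        task = task_queue.get()
        done.append(task)
        task_queue.task_done()


def start(task_queue: "queue.Queue[str]", done: List[str],
          parallelism: int) -> List[threading.Thread]:
    # Same shape as LocalExecutor.start: one worker per unit of parallelism.
    # With parallelism == 0, range(0) is empty and no worker is created.
    workers = [
        threading.Thread(target=_worker_loop, args=(task_queue, done), daemon=True)
        for _ in range(parallelism)
    ]
    for w in workers:
        w.start()
    return workers


# parallelism = 0: the task is enqueued but nothing will ever consume it,
# so calling stalled_queue.join() here would block forever.
stalled_queue: "queue.Queue[str]" = queue.Queue()
stalled_done: List[str] = []
print(len(start(stalled_queue, stalled_done, 0)))  # 0
stalled_queue.put("task-a")
print(stalled_queue.qsize())  # 1

# parallelism = 2: the same task is consumed and join() returns.
ok_queue: "queue.Queue[str]" = queue.Queue()
ok_done: List[str] = []
start(ok_queue, ok_done, 2)
ok_queue.put("task-a")
ok_queue.join()
print(ok_done)  # ['task-a']
{code}

With zero workers the queue can only grow, which matches the stalled computation described above.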

*Remediation*
For {{LocalExecutor}}, unbounded parallelism means that every submitted task should start immediately in its own process. Instead of using a worker queue, the executor could therefore spawn a new process for each submitted task and terminate that process once its task completes.
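A minimal sketch of this remediation, assuming each task is an external command run in a child process. The class {{UnboundedLocalExecutor}} and its methods are hypothetical and not Airflow's API; the sketch only illustrates the "one process per task, no worker queue" idea for the {{parallelism == 0}} case, not the project's actual fix.

{code:python}
import subprocess
import sys
from typing import Dict, List, Tuple


class UnboundedLocalExecutor:
    """Hypothetical sketch: one OS process per submitted task, no worker queue.

    Models only the parallelism == 0 ("infinity") case.
    """

    def __init__(self) -> None:
        self.running: Dict[str, subprocess.Popen] = {}
        self.results: List[Tuple[str, int]] = []

    def execute_async(self, key: str, command: List[str]) -> None:
        # Start the task immediately in its own process; nothing is queued.
        self.running[key] = subprocess.Popen(command)

    def sync(self) -> None:
        # Record the exit code of every finished process and forget it, so
        # each per-task process lives only as long as its task.
        for key, proc in list(self.running.items()):
            returncode = proc.poll()
            if returncode is not None:
                self.results.append((key, returncode))
                del self.running[key]

    def end(self) -> None:
        # Wait for every spawned process to exit, then collect the results.
        for proc in self.running.values():
            proc.wait()
        self.sync()


executor = UnboundedLocalExecutor()
for i in range(3):
    executor.execute_async(
        f"task-{i}", [sys.executable, "-c", "import time; time.sleep(0.1)"]
    )
print(len(executor.running))  # 3 processes started at once
executor.end()
print(sorted(executor.results))  # [('task-0', 0), ('task-1', 0), ('task-2', 0)]
{code}

Because no process is created until a task arrives, the number of concurrent processes tracks the number of submitted tasks, which is what "{{0}} for infinity" implies. A real executor would still need to bound resource use in practice; that trade-off is outside this sketch.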


> LocalExecutor does not maintain contract of unbound parallelism (0 value)
> -------------------------------------------------------------------------
>
>                 Key: AIRFLOW-1631
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1631
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: executor
>    Affects Versions: 1.8.1
>            Reporter: Edgar Rodriguez
>            Assignee: Edgar Rodriguez
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)