Posted to commits@airflow.apache.org by "ASF subversion and git services (JIRA)" <ji...@apache.org> on 2017/10/17 18:40:01 UTC

[jira] [Commented] (AIRFLOW-1631) LocalExecutor does not maintain contract of unbound parallelism (0 value)

    [ https://issues.apache.org/jira/browse/AIRFLOW-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208134#comment-16208134 ] 

ASF subversion and git services commented on AIRFLOW-1631:
----------------------------------------------------------

Commit cdfced3248c7f14b639919c093f4f3042deb754b in incubator-airflow's branch refs/heads/master from [~erod]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=cdfced3 ]

[AIRFLOW-1631] Fix local executor unbound parallelism

Previously, if unlimited parallelism was requested by passing `0` as
the parallelism value, the local executor would stall because no
workers were created, violating the BaseExecutor contract for the
parallelism option.

Now, if unbounded parallelism is used, a process is created on demand
for each task submitted for execution.

Closes #2658 from edgarRd/erod-localexecutor-fix


> LocalExecutor does not maintain contract of unbound parallelism (0 value)
> -------------------------------------------------------------------------
>
>                 Key: AIRFLOW-1631
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1631
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: executor
>    Affects Versions: 1.8.1
>            Reporter: Edgar Rodriguez
>            Assignee: Edgar Rodriguez
>
> *Location*
> {{airflow/executors/local_executor.py:LocalExecutor#start}}:
> {code}
> def start(self):
>     self.queue = multiprocessing.JoinableQueue()
>     self.result_queue = multiprocessing.Queue()
>     self.workers = [
>         LocalWorker(self.queue, self.result_queue)
>         for _ in range(self.parallelism)  # range(0) creates no workers
>     ]
>     for w in self.workers:
>         w.start()
> {code}
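To see why the loop above stalls, the sketch below reproduces the pattern in miniature. It is an illustration under assumptions, not Airflow code: `threading` and `queue.Queue` stand in for `multiprocessing`, and `start_workers` and `drains_within` are hypothetical names. With `parallelism` set to `0`, `range(0)` yields no consumers, so a submitted task is never taken off the queue and waiting on the queue never returns.

```python
import queue
import threading


def start_workers(parallelism, task_queue):
    """Hypothetical stand-in for LocalExecutor.start: one consumer per slot."""
    def consume():
        while True:
            task_queue.get()        # take a task (the actual work is elided)
            task_queue.task_done()  # mark it finished so join() can return

    workers = [threading.Thread(target=consume, daemon=True)
               for _ in range(parallelism)]  # range(0) builds no workers
    for w in workers:
        w.start()
    return workers


def drains_within(parallelism, timeout=1.0):
    """True if one submitted task is consumed within `timeout` seconds."""
    q = queue.Queue()
    start_workers(parallelism, q)
    q.put("task")
    # Queue.join() takes no timeout, so wait on it from a helper thread.
    waiter = threading.Thread(target=q.join, daemon=True)
    waiter.start()
    waiter.join(timeout)
    return not waiter.is_alive()


print(drains_within(2))       # True
print(drains_within(0, 0.2))  # False: nothing ever calls task_done()
```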
> *Description*
> When the *{{PARALLELISM}}* configuration value is set to {{0}}, the local executor stalls because it creates no workers. As described in {{base_executor}}:
> {code}
> :param parallelism: how many jobs should run at one time. Set to
>             ``0`` for infinity
> :type parallelism: int
> {code}
> Hence, this contract is not maintained in {{LocalExecutor}}.
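Read literally, the docstring means an executor should treat `0` as "no cap" when deciding how many queued tasks to start. A hypothetical helper makes that reading concrete; the name `open_slots` and its arguments are illustrative and not part of Airflow's API.

```python
def open_slots(parallelism, running, queued):
    """Number of queued tasks that may start now (hypothetical helper).

    parallelism == 0 is read as "unbounded", per the BaseExecutor docstring,
    so every queued task may start; otherwise only free slots are used.
    """
    if parallelism < 0:
        raise ValueError("parallelism must be >= 0")
    if parallelism == 0:
        return queued
    return max(0, min(queued, parallelism - running))


print(open_slots(0, running=50, queued=7))  # 7
print(open_slots(4, running=3, queued=7))   # 1
print(open_slots(4, running=4, queued=7))   # 0
```

An executor that sizes a fixed worker pool from `parallelism`, as the quoted `start()` does, has to special-case `0` the same way instead of building a pool of size zero.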
> *Remediation*
> In the context of the local executor, if parallelism is unbounded then in theory every submitted task should run immediately in its own process, so we could spawn a process for each submitted task without using a worker queue. When the task completes, its worker process can be terminated.
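A minimal sketch of that remediation, under stated assumptions: each command is a list of program arguments launched with `subprocess.Popen` (Airflow's own worker runs shell commands, and the actual commit may be structured differently), and the class and method names are hypothetical, chosen only to echo the executor vocabulary above. There is no queue or pool; a process is created when a task is submitted and is gone once it exits.

```python
import subprocess
import sys
import time


class OnDemandProcessExecutor:
    """Hypothetical unbounded executor: one OS process per submitted task."""

    def __init__(self):
        self.running = {}  # task key -> subprocess.Popen handle
        self.results = {}  # task key -> process exit code

    def execute_async(self, key, command):
        # Unbounded parallelism: nothing waits in a queue, start right away.
        self.running[key] = subprocess.Popen(command)

    def sync(self):
        # Record tasks whose process has exited and stop tracking them.
        for key, proc in list(self.running.items()):
            code = proc.poll()
            if code is not None:
                self.results[key] = code
                del self.running[key]

    def end(self, poll_interval=0.05):
        # Wait until every spawned process has finished.
        while True:
            self.sync()
            if not self.running:
                return
            time.sleep(poll_interval)


ex = OnDemandProcessExecutor()
ex.execute_async("ok", [sys.executable, "-c", "pass"])
ex.execute_async("fail", [sys.executable, "-c", "raise SystemExit(3)"])
ex.end()
print(sorted(ex.results.items()))  # [('fail', 3), ('ok', 0)]
```

Because each task owns its process, the number of concurrent processes equals the number of tasks in flight. That is exactly what `0` promises, and it is also why unbounded parallelism deserves care on a single machine.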



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)