Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:32 UTC

[airflow] 12/38: Set Process title for Worker when using ``LocalExecutor`` (#16623)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 600309b5f623fa41203d982c88bfeb33f3ef51e9
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu Jun 24 09:18:52 2021 +0100

    Set Process title for Worker when using ``LocalExecutor`` (#16623)
    
    This has annoyed me for a long time. When using ``LocalExecutor``, it was difficult to tell which process was a worker, because workers showed up as below, with the same title as the parent scheduler process. This PR/commit adds a title for idle workers, and while a task is running the title also contains the "command" being run, similar to our supervising process.
    
    Before:
    
    ```
    root       124  0.0  0.0   6676  4636 pts/1    Ss   Jun23   0:00  \_ -bash
    root      1449  0.8  2.6 988356 326312 pts/1   Sl+  Jun23   0:16  |   \_ /usr/local/bin/python /usr/local/bin/airflow webserver
    root      1584  0.0  0.4 121068 56864 pts/1    S+   Jun23   0:01  |       \_ gunicorn: master [airflow-webserver]
    root      1587  0.6  2.5 986144 318712 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1588  0.6  2.5 984776 317672 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1589  0.6  2.5 985688 318148 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1590  0.6  2.5 985200 317776 pts/1   Sl+  Jun23   0:11  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root       128  0.0  0.0   6676  4552 pts/2    Ss   Jun23   0:00  \_ -bash
    root     13933 31.0  0.9 466596 117656 pts/2   S+   00:22   0:01      \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13941  0.0  0.7 466340 97988 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13942  3.2  0.8 1392072 100136 pts/2  Sl+  00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13950  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13952  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13955  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13958  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13962  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13966  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13969  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13975  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13979  6.5  0.8 466596 99956 pts/2    S    00:22   0:00          \_ airflow scheduler -- DagFileProcessorManager
    ```
    
    After (with no running tasks - idle workers):
    
    ```
    root       124  0.0  0.0   6676  4636 pts/1    Ss   Jun23   0:00  \_ -bash
    root      1449  0.8  2.6 988356 326312 pts/1   Sl+  Jun23   0:16  |   \_ /usr/local/bin/python /usr/local/bin/airflow webserver
    root      1584  0.0  0.4 121068 56864 pts/1    S+   Jun23   0:01  |       \_ gunicorn: master [airflow-webserver]
    root      1587  0.6  2.5 985752 318184 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1588  0.6  2.5 984776 317672 pts/1   Sl+  Jun23   0:11  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1589  0.6  2.5 985688 318148 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1590  0.6  2.5 985200 317776 pts/1   Sl+  Jun23   0:11  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root       128  0.0  0.0   6676  4552 pts/2    Ss   Jun23   0:00  \_ -bash
    root     13237 25.7  0.9 466596 117692 pts/2   S+   00:20   0:02      \_ airflow worker -- LocalExecutor
    root     13245  0.1  0.7 466340 97804 pts/2    S+   00:20   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13246  2.1  0.8 1318340 100104 pts/2  Sl+  00:20   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13254  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13256  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13259  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13263  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13267  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13271  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13274  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13276  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13282  4.1  0.8 466596 99952 pts/2    S    00:20   0:00          \_ airflow scheduler -- DagFileProcessorManager
    ```
    
    After (with running tasks):
    
    ```
    root@a7c8aa590704:/opt/airflow# ps auxf
    USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
    root      6434  0.0  0.0   6652  4584 pts/3    Ss   00:01   0:00 /bin/bash
    root     19250  0.0  0.0   9556  3064 pts/3    R+   00:39   0:00  \_ ps auxf
    root         1  0.0  0.0   2148   720 ?        Ss   Jun23   0:00 /usr/bin/dumb-init -- /entrypoint
    root         7  0.0  0.0   6656  4400 pts/0    Ss   Jun23   0:00 /bin/bash
    root       121  0.0  0.0   8220  3228 pts/0    S+   Jun23   0:00  \_ tmux
    root       101  0.0  0.0  15856  4272 ?        Ss   Jun23   0:00 /usr/sbin/sshd
    root       123  0.0  0.0  10176  5148 ?        Ss   Jun23   0:00 tmux
    root       124  0.0  0.0   6676  4636 pts/1    Ss   Jun23   0:00  \_ -bash
    root      1449  0.6  2.6 988356 326312 pts/1   Sl+  Jun23   0:20  |   \_ /usr/local/bin/python /usr/local/bin/airflow webserver
    root      1584  0.0  0.4 121068 56864 pts/1    S+   Jun23   0:01  |       \_ gunicorn: master [airflow-webserver]
    root      1587  0.4  2.5 986144 318712 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1588  0.4  2.5 984776 317672 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1589  0.4  2.5 985848 318600 pts/1   Sl+  Jun23   0:13  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1590  0.4  2.5 985628 318424 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root       128  0.0  0.0   6676  4552 pts/2    Ss   Jun23   0:00  \_ -bash
    root     19030 17.9  0.9 467108 118628 pts/2   S+   00:38   0:02      \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     19038  0.0  0.7 466084 97776 pts/2    S+   00:38   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     19039  1.4  0.8 1318084 99804 pts/2   Sl+  00:38   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     19047  0.0  0.8 466084 98692 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2021-06-24T00:39:06.539715+00:00', '--local', '
    root     19240 25.3  0.8 470820 104400 pts/2   S+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2021-06-24T00:39:06.539715+00:00', '--local', '--po
    root     19246  0.0  0.8 470956 103980 pts/2   S    00:39   0:00          |       \_ airflow task runner: example_bash_operator runme_2 2021-06-24T00:39:06.539715+00:00 91
    root     19049  0.1  0.8 466084 98696 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2021-06-24T00:39:06.539715+00:00', '--local', '
    root     19241 26.0  0.8 470824 104408 pts/2   S+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2021-06-24T00:39:06.539715+00:00', '--local', '--po
    root     19248  0.0  0.8 470824 103720 pts/2   S    00:39   0:00          |       \_ airflow task runner: example_bash_operator runme_1 2021-06-24T00:39:06.539715+00:00 93
    root     19052  0.1  0.8 466084 98760 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2021-06-24T00:39:06.539715+00:00', '--local', '
    root     19244 26.0  0.8 470824 104404 pts/2   S+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2021-06-24T00:39:06.539715+00:00', '--local', '--po
    root     19245  0.0  0.8 471212 104032 pts/2   S    00:39   0:00          |       \_ airflow task runner: example_bash_operator runme_0 2021-06-24T00:39:06.539715+00:00 90
    root     19056  0.1  0.8 466084 98760 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'this_will_skip', '2021-06-24T00:39:06.539715+00:00', '--lo
    root     19243 24.6  0.8 470824 104400 pts/2   S+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'this_will_skip', '2021-06-24T00:39:06.539715+00:00', '--local'
    root     19247  0.0  0.8 470956 103712 pts/2   S    00:39   0:00          |       \_ airflow task runner: example_bash_operator this_will_skip 2021-06-24T00:39:06.539715+00:00 92
    root     19057  0.1  0.8 466084 98760 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-06-24T00:39:06.539715+00:00', '--loc
    root     19242 26.6  0.8 470824 104404 pts/2   R+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-06-24T00:39:06.539715+00:00', '--local',
    root     19249  0.0  0.8 470824 101976 pts/2   S    00:39   0:00          |       \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-06-24T00:39:06.539715+00:00', '--loc
    root     19062  0.0  0.8 466084 98300 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor
    root     19066  0.0  0.8 466084 98300 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor
    root     19069  0.0  0.8 466084 98300 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor
    root     19075  2.7  0.8 466596 100144 pts/2   S    00:38   0:00          \_ airflow scheduler -- DagFileProcessorManager
    ```
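The titles in the listings above make it possible to tell idle workers from busy ones. The helper below is a hypothetical illustration, not part of Airflow: it assumes the exact title strings introduced by this commit and counts a line as busy when a command follows the title (`airflow worker -- LocalExecutor: [...]`) and as idle when the line ends with the bare title. All other processes, including task supervisors and runners, are ignored.

```python
"""Hypothetical helper: count idle vs. busy LocalExecutor workers from
`ps`-style lines, using the process titles introduced by this commit."""
from typing import Dict, Iterable

# Assumed to match the title string set in local_executor.py by this commit.
IDLE_TITLE = "airflow worker -- LocalExecutor"


def classify_workers(ps_lines: Iterable[str]) -> Dict[str, int]:
    """Return {"idle": n, "busy": m}; non-worker lines are skipped."""
    counts = {"idle": 0, "busy": 0}
    for line in ps_lines:
        if f"{IDLE_TITLE}:" in line:
            # A command follows the title, so this worker is running a task.
            counts["busy"] += 1
        elif line.rstrip().endswith(IDLE_TITLE):
            # Bare title: the worker is waiting for work.
            counts["idle"] += 1
    return counts


if __name__ == "__main__":
    sample = [
        "root 19047 ... airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run'",
        "root 19062 ... airflow worker -- LocalExecutor",
        "root 19075 ... airflow scheduler -- DagFileProcessorManager",
    ]
    print(classify_workers(sample))
```

Matching on the title prefix rather than on PIDs keeps the sketch independent of how the process tree is laid out; note that truncated `ps` lines still classify correctly as long as the title and the following colon are visible.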
    
    Once the worker is done executing a task, its title is set back to `airflow worker -- LocalExecutor`.
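The title lifecycle described in this commit (idle title when the worker starts, the command in the title while a task runs, then back to the idle title) can be sketched as follows. This is a minimal sketch, not Airflow's code: `TitledWorker`, `on_start` and `IDLE_TITLE` are hypothetical names, the third-party `setproctitle` package is used only if it is installed (otherwise titles are just recorded), and the `try`/`finally` reset is a choice of this sketch. The diff itself resets the title right after putting the result on `result_queue`.

```python
"""Sketch of the LocalExecutor worker title lifecycle (hypothetical names)."""
from typing import Callable, List

try:
    # Third-party package, as used by the diff; optional for this sketch.
    from setproctitle import setproctitle as _set_os_title
except ImportError:
    def _set_os_title(title: str) -> None:
        """No-op fallback when setproctitle is not installed."""

IDLE_TITLE = "airflow worker -- LocalExecutor"


class TitledWorker:
    """Hypothetical stand-in for LocalWorkerBase; not Airflow's real class."""

    def __init__(self, execute: Callable[[List[str]], str]) -> None:
        self._execute = execute      # runs a command and returns a state string
        self.titles: List[str] = []  # every title set, kept for inspection

    def _set_title(self, title: str) -> None:
        _set_os_title(title)
        self.titles.append(title)

    def on_start(self) -> None:
        # Like the commit's change to run(): mark a freshly started worker as idle.
        self._set_title(IDLE_TITLE)

    def execute_work(self, command: List[str]) -> str:
        # While the task runs, the title includes the command (str() of the list).
        self._set_title(f"{IDLE_TITLE}: {command}")
        try:
            return self._execute(command)
        finally:
            # Back to the idle title once the task has finished, even on error.
            self._set_title(IDLE_TITLE)


if __name__ == "__main__":
    worker = TitledWorker(execute=lambda command: "success")
    worker.on_start()
    worker.execute_work(["airflow", "tasks", "run", "example_bash_operator", "runme_0"])
    for title in worker.titles:
        print(title)
```

Formatting the list with an f-string produces the same `['airflow', 'tasks', ...]` form seen in the `ps` listing above.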
    
    (cherry picked from commit c8a628abf484f0bd9805f44dd37e284d2b5ee7db)
---
 airflow/executors/local_executor.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index ab9356f..7789f28 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -63,6 +63,7 @@ class LocalWorkerBase(Process, LoggingMixin):
         # We know we've just started a new process, so lets disconnect from the metadata db now
         settings.engine.pool.dispose()
         settings.engine.dispose()
+        setproctitle("airflow worker -- LocalExecutor")
         return super().run()
 
     def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
@@ -76,12 +77,15 @@ class LocalWorkerBase(Process, LoggingMixin):
             return
 
         self.log.info("%s running %s", self.__class__.__name__, command)
+        setproctitle(f"airflow worker -- LocalExecutor: {command}")
         if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
             state = self._execute_work_in_subprocess(command)
         else:
             state = self._execute_work_in_fork(command)
 
         self.result_queue.put((key, state))
+        # Remove the command since the worker is done executing the task
+        setproctitle("airflow worker -- LocalExecutor")
 
     def _execute_work_in_subprocess(self, command: CommandType) -> str:
         try: