Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/09 07:48:05 UTC
[GitHub] [airflow] gudata opened a new issue #15745: Task is in "running" state while the celery has raised exception
gudata opened a new issue #15745:
URL: https://github.com/apache/airflow/issues/15745
**Apache Airflow version**: 2.0.2, 2.0.1
**Environment**:
- **Cloud provider or hardware configuration**: aws
- **OS** (e.g. from /etc/os-release): Amazon Linux
- **Kernel** (e.g. `uname -a`): Linux 4.14.225-169.362.amzn2.x86_64 #1 SMP Mon Mar 22 20:14:50 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
- **Install tools**:
- **Others**:
**What happened**:
We ran a DAG to process one year of data. It starts with 16 running tasks, then the count drops to 15, 14, 10, ...
We see that some tasks are labelled as queued in Airflow while in Celery they are marked as failed.
We opened a [discussion](https://github.com/apache/airflow/discussions/15722), but this looks more like a bug.
**What you expected to happen**: I expect that when a task fails in Celery, the failure is bubbled up to Airflow and the task is rescheduled.
We have a DAG with two steps that runs on a dedicated queue/container (sensor_prediction):
```
start = DummyOperator(task_id='start')

t1 = ETLOperator(task_id='aggregate',
                 client_config=client_config,
                 queue='some_docker_container_name',
                 dag=dag)

start >> t1
```
It runs OK for a couple of minutes, and at some point it gets stuck between start and t1:
- start is labelled "success"
- t1 is labelled "queued"

When I check the Flower web UI, there is an error for that task and it is marked as "FAILURE":
```
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 87, in execute_command
    _execute_in_fork(command_to_exec)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 98, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: 192.168.96.3
```
The code that is failing is:
```
def _execute_in_fork(command_to_exec: CommandType) -> None:
    pid = os.fork()
    if pid:
        # In parent, wait for the child
        pid, ret = os.waitpid(pid, 0)
        if ret == 0:
            return

        raise AirflowException('Celery command failed on host: ' + get_hostname())  # <<< raised here

    from airflow.sentry import Sentry

    ret = 1
```
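The key detail is that `os.waitpid` returns the raw wait status, and any non-zero status in the parent turns into the generic `AirflowException` above, so the child's actual exit reason is lost. A standalone sketch of that parent/child protocol (not Airflow code; the helper name is ours):

```python
import os

def wait_status_for_exit_code(code: int) -> int:
    """Fork a child that exits with `code` and return the raw
    status os.waitpid() hands back to the parent."""
    pid = os.fork()
    if pid:
        # Parent: block until the child exits.
        _, status = os.waitpid(pid, 0)
        return status
    # Child: exit immediately, bypassing Python cleanup.
    os._exit(code)

ok = wait_status_for_exit_code(0)    # clean exit -> raw status 0
fail = wait_status_for_exit_code(1)  # any non-zero exit -> non-zero status

print(ok == 0, os.WEXITSTATUS(fail))
```

Only `ret == 0` returns cleanly; everything else (a missing DAG file, an import error, an OOM kill) collapses into the same one-line exception, which is why the worker log is the only place the real cause shows up.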
We can't see any other log messages.
In `docker stats` we see that 2/3 of the memory is free.
![Selection_048](https://user-images.githubusercontent.com/25951/117476841-f8766b00-af65-11eb-9b25-a3a85c61df62.png)
![Selection_047](https://user-images.githubusercontent.com/25951/117476854-fb715b80-af65-11eb-860b-135fd1e622c0.png)
docker-compose:
```
---
version: '3'

x-airflow-image:
  &airflow-image
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}

x-airflow-common:
  &airflow-common
  profiles: ["common"]
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgres+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}"
    AIRFLOW__CELERY__RESULT_BACKEND: "db+postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}"
    AIRFLOW__CELERY__BROKER_URL: "redis://${REDIS_HOST}:${REDIS_PORT}/${REDIS_DATABASE_BROKER}"
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__WEBSERVER__BASE_URL: "${AIRFLOW__WEBSERVER__BASE_URL}"
    AIRFLOW__CORE__DAGS_FOLDER: "${AIRFLOW__CORE__DAGS_FOLDER}"
    AIRFLOW__CORE__HOSTNAME_CALLABLE: "${AIRFLOW__CORE__HOSTNAME_CALLABLE}"
    AIRFLOW__CORE__FERNET_KEY: "${AIRFLOW__CORE__FERNET_KEY}"
    AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL: "${AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL}"
    AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: "${AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL}"
    AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: "${AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL}"
    AIRFLOW_UID: "${AIRFLOW_UID}"
    AIRFLOW_GID: "${AIRFLOW_GID}"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./config:/opt/airflow/config
    - ./plugins:/opt/airflow/plugins
    - ./seed:/opt/airflow/seed
    - ./sensor_prediction:/opt/airflow/sensor_prediction
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    redis:
      condition: service_healthy

services:
  airflow-webserver:
    <<: *airflow-common
    <<: *airflow-image
    command: webserver
    ports:
      - 8181:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 60s
      timeout: 60s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    <<: *airflow-image
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    <<: *airflow-image
    command: celery worker
    restart: always

  airflow-worker-superqueue:
    <<: *airflow-common
    <<: *airflow-image
    command: celery worker --queues superqueue
    restart: always

  airflow-worker-sensor_prediction:
    <<: *airflow-common
    build:
      context: images/sensor_prediction
      dockerfile: Dockerfile
    command: celery worker --queues sensor_prediction
    restart: always

  airflow-init:
    <<: *airflow-common
    <<: *airflow-image
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    <<: *airflow-image
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    # ports:
    #   - 6380:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    profiles: ["development", "common"]

  postgres:
    image: postgres
    restart: "no"
    profiles: ["development"]
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
```
airflow-image dockerfile:
```
FROM apache/airflow:2.0.2-python3.7
USER root
RUN /usr/local/bin/python -m pip install --upgrade pip
USER airflow
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ADD requirements.txt /opt/airflow/requirements.txt
RUN pip install --no-cache-dir --user -r requirements.txt
```
requirements.txt
```
numpy ==1.18.5
scipy ==1.6.1
scikit_learn ==0.24.1
tensorflow ==1.14.0
pymysql
pyarrow ==4.0.0
s3fs
fsspec
```
The queue container/image is the same.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] gudata commented on issue #15745: Task is in "running" state while the celery has raised exception
Posted by GitBox <gi...@apache.org>.
gudata commented on issue #15745:
URL: https://github.com/apache/airflow/issues/15745#issuecomment-837991710
When we sync the DAGs we first delete them, then copy the new versions, so there is a moment when the DAG is not on disk. That's why Celery fails with:
```
[2021-05-08 22:30:34,442: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/test.py
[2021-05-08 22:30:34,443: ERROR/ForkPoolWorker-15] Failed to execute task dag_id could not be found: point72_test. Either the dag did not exist or it failed to parse..
[2021-05-08 22:30:34,455: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[5c77d2c4-f6fa-4ce4-b520-939b3405722b] raised unexpected: AirflowException('Celery command failed on host: 192.168.96.3')
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 87, in execute_command
    _execute_in_fork(command_to_exec)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 98, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: 192.168.96.3
```
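One way to avoid this race is to stop deleting before copying and instead swap each DAG file into place atomically. A minimal sketch, assuming a plain file-based sync (the helper name is ours, not an Airflow API):

```python
import os
import tempfile

def write_dag_atomically(path: str, contents: str) -> None:
    """Write to a temp file in the same directory, then rename it
    over the target. os.replace() is atomic on POSIX, so a reader
    (e.g. the worker filling its DagBag) sees either the old file
    or the new one -- never a missing file."""
    dir_name = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(contents)
        os.replace(tmp, path)  # atomic swap; no delete-then-copy window
    except BaseException:
        os.unlink(tmp)
        raise
```

The temp file must live on the same filesystem as the target, which is why it is created in the DAG's own directory. For whole-tree syncs, `rsync --delete-after` gives a similar "copy first, remove last" behaviour.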
It would be great if that message could somehow be visible somewhere other than just the docker logs.
[GitHub] [airflow] gudata closed issue #15745: Task is in "running" state while the celery has raised exception
Posted by GitBox <gi...@apache.org>.
gudata closed issue #15745:
URL: https://github.com/apache/airflow/issues/15745