Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/09 07:48:05 UTC

[GitHub] [airflow] gudata opened a new issue #15745: Task is in "running" state while the celery has raised exception

gudata opened a new issue #15745:
URL: https://github.com/apache/airflow/issues/15745


   **Apache Airflow version**: 2.0.2, 2.0.1
   **Environment**: 
   
   - **Cloud provider or hardware configuration**: aws
   - **OS** (e.g. from /etc/os-release): Amazon Linux
   - **Kernel** (e.g. `uname -a`): Linux 4.14.225-169.362.amzn2.x86_64 #1 SMP Mon Mar 22 20:14:50 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
   
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   
   We run a DAG to process one year of data. It starts with 16 working tasks, which then drop to 15, 14, 10, and so on.
   We see that some tasks are labelled as queued in Airflow, but in Celery they are marked as failed.
   We opened a [discussion](https://github.com/apache/airflow/discussions/15722), but it looks more like a bug.
   
   **What you expected to happen**: I expect that when a task fails in Celery, the failure is bubbled up to Airflow and the task is rescheduled.
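
   For context, rescheduling on failure is normally driven by the operator's `retries`/`retry_delay` arguments; below is a minimal self-contained sketch (the dag_id and values are illustrative, not our production settings), which of course only helps once the failure actually reaches Airflow:
   ```
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.operators.dummy import DummyOperator

   # retries/retry_delay are standard BaseOperator arguments: when a task
   # instance fails, the scheduler re-queues it up to `retries` times.
   with DAG(
       dag_id='retry_sketch',
       start_date=datetime(2021, 1, 1),
       schedule_interval=None,
   ) as dag:
       t1 = DummyOperator(
           task_id='aggregate',
           retries=3,
           retry_delay=timedelta(minutes=5),
           queue='sensor_prediction',
       )
   ```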
   
   
   We have a DAG with two steps, which runs on a dedicated queue/container (sensor_prediction):
   ```
   start = DummyOperator(task_id='start')
   
   t1 = ETLOperator(
       task_id='aggregate',
       client_config=client_config,
       queue='some_docker_container_name',
       dag=dag,
   )
   
   start >> t1
   ```
   
   It runs OK for a couple of minutes, and at some point it gets stuck between `start` and `t1`:
   
   - start is labelled "success"
   - t1 is labelled "queued"
   
   When I checked the Flower web UI, there was an error for that task and it was marked as "FAILURE":
   
   ```
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/app/trace.py", line 412, in trace_task
       R = retval = fun(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
       return self.run(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 87, in execute_command
       _execute_in_fork(command_to_exec)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 98, in _execute_in_fork
       raise AirflowException('Celery command failed on host: ' + get_hostname())
   airflow.exceptions.AirflowException: Celery command failed on host: 192.168.96.3
   ```
   
   The code that is failing is:
   ```
   def _execute_in_fork(command_to_exec: CommandType) -> None:
       pid = os.fork()
       if pid:
           # In parent, wait for the child
           pid, ret = os.waitpid(pid, 0)
           if ret == 0:
               return
   
           raise AirflowException('Celery command failed on host: ' + get_hostname()) <<<
   
       from airflow.sentry import Sentry
   
       ret = 1
   ```
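
   The parent only sees the raw status returned by `os.waitpid`, so any non-zero exit of the forked task process ends up as this one generic exception. A minimal sketch of what that status contains (plain Python stdlib, not Airflow code):
   ```
   import os

   pid = os.fork()
   if pid == 0:
       # Child: simulate a task process that dies with a non-zero exit code.
       os._exit(3)

   _, status = os.waitpid(pid, 0)       # same call Airflow's parent makes
   print(status == 0)                   # False -> Airflow raises AirflowException
   if os.WIFEXITED(status):
       print(os.WEXITSTATUS(status))    # 3 -> the child's real exit code
   elif os.WIFSIGNALED(status):
       print(os.WTERMSIG(status))       # signal number if the child was killed
   ```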
   
   We can't see any other log messages.
   
   In `docker stats` we see that 2/3 of the memory is free.
   
   
   ![Selection_048](https://user-images.githubusercontent.com/25951/117476841-f8766b00-af65-11eb-9b25-a3a85c61df62.png)
   ![Selection_047](https://user-images.githubusercontent.com/25951/117476854-fb715b80-af65-11eb-860b-135fd1e622c0.png)
   
   docker-compose:
   ```
   ---
   version: '3'
   
   x-airflow-image:
     &airflow-image
     image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
   
   x-airflow-common:
     &airflow-common
     profiles: ["common"]
   
     environment:
       &airflow-common-env
       AIRFLOW__CORE__EXECUTOR: CeleryExecutor
       AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgres+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}"
       AIRFLOW__CELERY__RESULT_BACKEND: "db+postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}"
       AIRFLOW__CELERY__BROKER_URL: "redis://${REDIS_HOST}:${REDIS_PORT}/${REDIS_DATABASE_BROKER}"
       AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
       AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
   
       AIRFLOW__WEBSERVER__BASE_URL: "${AIRFLOW__WEBSERVER__BASE_URL}"
       AIRFLOW__CORE__DAGS_FOLDER: "${AIRFLOW__CORE__DAGS_FOLDER}"
       AIRFLOW__CORE__HOSTNAME_CALLABLE: "${AIRFLOW__CORE__HOSTNAME_CALLABLE}"
       AIRFLOW__CORE__FERNET_KEY: "${AIRFLOW__CORE__FERNET_KEY}"
       AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL: "${AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL}"
       AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: "${AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL}"
       AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: "${AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL}"
       AIRFLOW_UID: "${AIRFLOW_UID}"
       AIRFLOW_GID: "${AIRFLOW_GID}"
   
     volumes:
       - ./dags:/opt/airflow/dags
       - ./logs:/opt/airflow/logs
       - ./config:/opt/airflow/config
       - ./plugins:/opt/airflow/plugins
       - ./seed:/opt/airflow/seed
       - ./sensor_prediction:/opt/airflow/sensor_prediction
   
   
   
     user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
     depends_on:
       redis:
         condition: service_healthy
   
   services:
     airflow-webserver:
       <<: *airflow-common
       <<: *airflow-image
       command: webserver
       ports:
         - 8181:8080
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
         interval: 60s
         timeout: 60s
         retries: 5
       restart: always
   
     airflow-scheduler:
       <<: *airflow-common
       <<: *airflow-image
       command: scheduler
       restart: always
   
     airflow-worker:
       <<: *airflow-common
       <<: *airflow-image
       command: celery worker
       restart: always
   
     airflow-worker-superqueue:
       <<: *airflow-common
       <<: *airflow-image
       command: celery worker --queues superqueue
       restart: always
   
   
     airflow-worker-sensor_prediction:
       <<: *airflow-common
       build:
         context: images/sensor_prediction
         dockerfile: Dockerfile
       command: celery worker --queues sensor_prediction
   
       restart: always
   
   
     airflow-init:
       <<: *airflow-common
       <<: *airflow-image
       command: version
       environment:
         <<: *airflow-common-env
         <<: *airflow-image
         _AIRFLOW_DB_UPGRADE: 'true'
         _AIRFLOW_WWW_USER_CREATE: 'true'
         _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
         _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
   
     flower:
       <<: *airflow-common
       <<: *airflow-image
       command: celery flower
       ports:
         - 5555:5555
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
         interval: 10s
         timeout: 10s
         retries: 5
       restart: always
   
     redis:
       image: redis:latest
       # ports:
       #   - 6380:6379
       healthcheck:
         test: ["CMD", "redis-cli", "ping"]
         interval: 5s
         timeout: 30s
         retries: 50
       restart: always
       profiles: ["development", "common"]
   
     postgres:
       image: postgres
       restart: "no"
       profiles: ["development"]
       environment:
           - POSTGRES_USER=airflow
           - POSTGRES_PASSWORD=airflow
           - POSTGRES_DB=airflow
   ```
   
   airflow-image dockerfile:
   ```
   FROM apache/airflow:2.0.2-python3.7
   
   USER root
   RUN /usr/local/bin/python -m pip install --upgrade pip
   
   USER airflow
   
   ENV PYTHONDONTWRITEBYTECODE 1
   ENV PYTHONUNBUFFERED 1
   
   ADD requirements.txt /opt/airflow/requirements.txt
   RUN pip install --no-cache-dir --user -r requirements.txt
   ```
   requirements.txt 
   ```
   numpy ==1.18.5
   scipy ==1.6.1
   scikit_learn ==0.24.1
   tensorflow ==1.14.0
   pymysql
   pyarrow ==4.0.0 
   s3fs 
   fsspec
   ```
   The queue container/image is the same.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] gudata commented on issue #15745: Task is in "running" state while the celery has raised exception

Posted by GitBox <gi...@apache.org>.
gudata commented on issue #15745:
URL: https://github.com/apache/airflow/issues/15745#issuecomment-837991710


   When we sync the DAGs, we first delete them and then copy the new versions.
   So there is a moment when the DAG file is not on disk. That is why Celery fails with:
   
   ```
   [2021-05-08 22:30:34,442: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/test.py
   [2021-05-08 22:30:34,443: ERROR/ForkPoolWorker-15] Failed to execute task dag_id could not be found: point72_test. Either the dag did not exist or it failed to parse..
   [2021-05-08 22:30:34,455: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[5c77d2c4-f6fa-4ce4-b520-939b3405722b] raised unexpected: AirflowException('Celery command failed on host: 192.168.96.3')
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/app/trace.py", line 412, in trace_task
       R = retval = fun(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
       return self.run(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 87, in execute_command
       _execute_in_fork(command_to_exec)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 98, in _execute_in_fork
       raise AirflowException('Celery command failed on host: ' + get_hostname())
   airflow.exceptions.AirflowException: Celery command failed on host: 192.168.96.3
   ```
   
   It would be great if that message could somehow be visible not only in the docker logs.
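
   A minimal sketch of a delete-free sync step that avoids the window entirely, assuming a single-file deploy and illustrative paths (stage the new file next to the target and swap it in with an atomic rename):
   ```
   import os
   import shutil
   import tempfile

   def atomic_copy(src: str, dst: str) -> None:
       """Copy src over dst so readers never see a missing or half-written file."""
       dst_dir = os.path.dirname(dst) or "."
       # Stage in the same directory as the target: os.replace is only atomic
       # within a single filesystem.
       fd, tmp_path = tempfile.mkstemp(dir=dst_dir, suffix=".tmp")
       try:
           with os.fdopen(fd, "wb") as tmp, open(src, "rb") as source:
               shutil.copyfileobj(source, tmp)
           os.replace(tmp_path, dst)  # atomic swap, no delete-then-copy gap
       except BaseException:
           os.unlink(tmp_path)
           raise

   atomic_copy("build/test.py", "/opt/airflow/dags/test.py")
   ```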





[GitHub] [airflow] gudata closed issue #15745: Task is in "running" state while the celery has raised exception

Posted by GitBox <gi...@apache.org>.
gudata closed issue #15745:
URL: https://github.com/apache/airflow/issues/15745


   

