Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/22 02:46:47 UTC

[GitHub] [airflow] MrManicotti opened a new issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

MrManicotti opened a new issue #13824:
URL: https://github.com/apache/airflow/issues/13824


   
   **Apache Airflow version**:
   2.0.0
   
   **Environment**:
   Docker Stack
   Celery Executor w/ Redis
   3 Workers, Scheduler + Webserver
   Cloudwatch remote config turned on
   
   **What happened**:
   Following execution of a DAG when using CloudWatch integration, the state of the Task Instance is being externally set, causing SIGTERM/SIGKILL signals to be sent. This causes error logs in Workers, which is a nuisance for alert monitoring, and seems to prevent workers from picking up more work.
   
   ```
   *** Reading remote log from Cloudwatch log_group: dev1-airflow-task log_stream: xxxx/task/2021-01-21T17_59_19.643994+00_00/1.log.
   Dependencies all met for <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [queued]>
   Dependencies all met for <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [queued]>
   --------------------------------------------------------------------------------
   Starting attempt 1 of 1
   --------------------------------------------------------------------------------
   Executing <Task(TaskVerificationOperator): task > on 2021-01-21T17:59:19.643994+00:00
   Started process 654 to run task
   Running <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [running]> on host 88f99fbc97a8
   Exporting the following env vars:
   AIRFLOW_CTX_DAG_EMAIL=xxxxxxxx
   AIRFLOW_CTX_DAG_OWNER=xxxxxxxx
   AIRFLOW_CTX_DAG_ID=xxxxxxxx
   AIRFLOW_CTX_TASK_ID=xxxxxx
   AIRFLOW_CTX_EXECUTION_DATE=2021-01-21T17:59:19.643994+00:00
   AIRFLOW_CTX_DAG_RUN_ID=85
   Set new audit correlation_id xxxxxxxxxx-xxxxxx-xxxxxxxxx
   Using connection to: id: xxxxx. Host: xxxxxxx, Port: 5432, Schema: xxxxxx, Login: xxxxxx, Password: XXXXXXXX, extra: None
   Marking task as SUCCESS. dag_id=xxxxxx, task_id=xxxxxx, execution_date=20210121T175919, start_date=20210121T175936, end_date=20210121T175938
   1 downstream tasks scheduled from follow-on schedule check
   ```
   
   However, following the completion of the DAG, the following is appended to the logs:
   ```
   State of this instance has been externally set to success. Terminating instance.
   Sending Signals.SIGTERM to GPID 654
   process psutil.Process(pid=654, name='xxxxx', status='sleeping', started='17:59:36') did not respond to SIGTERM. Trying SIGKILL
   Process psutil.Process(pid=654, name='xxxxx', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='17:59:36') (654) terminated with exit code Negsignal.SIGKILL
   Task exited with return code Negsignal.SIGKILL
   ```
   
   This is a problem, because it causes the following to appear in Worker logs:
   ```
   [2021-01-21 15:00:01,102: WARNING/ForkPoolWorker-8] Running <TaskInstance: xxxx.task 2021-01-21T14:00:00+00:00 [queued]> on host ip-172-31-3-210.ec2.internal
   ...
   [2021-01-21 15:00:06,599: ERROR/ForkPoolWorker-8] Failed to execute task Task received SIGTERM signal.
   ```
   
   **What you expected to happen**:
   No errors should appear in Worker logs, if this SIGTERM/SIGKILL is intended.
   
   **How to reproduce it**:
   Use Airflow w/ Celery Executor and Cloudwatch Remote Logging
   
   
   **Anything else we need to know**:
   Occurs every time, every task in DAG
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1058386842


   @rafidka, I think you have misunderstood point 4 in my setup guide. I meant: leave the default environment variables as they are, just change `AIRFLOW__CORE__LOAD_EXAMPLES` from 'true' to 'false', and extend them with the new ones. I have edited that comment.
   
   
   `simple_dag.py` is the [same](https://github.com/apache/airflow/issues/13824#issuecomment-1012838195) as before.
   
   ```python
   """Sample DAG."""
   
   import time
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "start_date": datetime(2020, 1, 1),
       "email": ["support@airflow.com"],
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 1,
       "retry_delay": timedelta(minutes=5),
   }
   
   
   def sleep() -> bool:
       """Sleep.
   
       Returns:
           bool: True
       """
       time.sleep(20)
       return True
   
   
   with DAG("simple_dag_1", default_args=default_args, schedule_interval="* * * * *", catchup=False) as dag:
       t1 = PythonOperator(task_id="sleep", python_callable=sleep)
   
   ```
     
   
   





[GitHub] [airflow] mtraynham edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
mtraynham edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-766093578


   One thing to note with this issue: I believe it is also preventing workers from consuming more work, because of a 60 second sleep at the end of every task.  We noticed a ~30-40 second delay between 2 workers running tasks, as if all workers were busy doing something, but nothing was actually happening.  It seemed work had been queued, but was not being performed.
   
   After looking at the logs for a given task in CloudWatch (which has the added benefit of including times in the log), there's a 60 second sleep after SIGTERM to when the process is finally killed with SIGKILL.
   
   ```
   2021-01-23T15:00:02.682Z | Marking task as SUCCESS. dag_id=xxxxxxxx, task_id=xxxxxxx, execution_date=20210123T140000, start_date=20210123T150001, end_date=20210123T150002
   -- | --
     | 2021-01-23T15:00:02.715Z | 0 downstream tasks scheduled from follow-on schedule check
     | 2021-01-23T15:00:06.606Z | State of this instance has been externally set to success. Terminating instance.
     | 2021-01-23T15:00:06.607Z | Sending Signals.SIGTERM to GPID 155
     | 2021-01-23T15:01:06.617Z | process psutil.Process(pid=155, name='airflow task ru', status='sleeping', started='15:00:01') did not respond to SIGTERM. Trying SIGKILL
     | 2021-01-23T15:01:06.625Z | Process psutil.Process(pid=155, name='airflow task ru', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='15:00:01') (155) terminated with exit code Negsignal.SIGKILL
     | 2021-01-23T15:01:06.625Z | Task exited with return code Negsignal.SIGKILL
   ```
   
   I imagine it's the configuration parameter [`KILLED_TASK_CLEANUP_TIME`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#killed-task-cleanup-time) and its default value of 60 that influences that sleep.
   
   https://github.com/apache/airflow/blob/master/airflow/utils/process_utils.py#L42-L52
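
   For illustration, here is a rough Python sketch of that SIGTERM -> wait -> SIGKILL escalation (a hypothetical re-implementation, not the actual `reap_process_group` code; the `timeout` argument plays the role of `killed_task_cleanup_time`, which would explain the ~60 second gap between the SIGTERM and SIGKILL lines above):
   
   ```python
   import signal
   
   import psutil
   
   
   def reap_group_sketch(pid: int, timeout: int = 60) -> None:
       """Send SIGTERM to a process and its children, escalating to SIGKILL after `timeout`."""
       parent = psutil.Process(pid)
       procs = [parent] + parent.children(recursive=True)
       for proc in procs:
           proc.send_signal(signal.SIGTERM)
       # Wait up to `timeout` seconds (the killed_task_cleanup_time analogue)...
       _, alive = psutil.wait_procs(procs, timeout=timeout)
       # ...then force-kill anything that ignored SIGTERM, e.g. a deadlocked task runner.
       for proc in alive:
           proc.send_signal(signal.SIGKILL)
   ```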


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-870569793


   Set it to false and see if that fixes the problem. Sadly we missed that in 2.1.1, so it would have to wait for 2.1.2 which would be at least a few weeks.
   
   And if setting that config doesn't help, then there is something else at fault here.
   
   https://airflow.apache.org/docs/apache-airflow/2.0.2/configurations-ref.html#schedule-after-task-execution for reference.





[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-883591250


   Hi @ephraimbuddy 
   
   I'm still having the same issue. The logs are the same.
   
   airflow.cfg
   ```bash
   [scheduler]
   dag_dir_list_interval = 60
   orphaned_tasks_check_interval = 84600
   parsing_processes = 4
   run_duration = 41460
   schedule_after_task_execution = False
   statsd_host = airflow-statsd
   statsd_on = True
   statsd_port = 9125
   statsd_prefix = airflow
   ```
   





[GitHub] [airflow] rafidka edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055701303


   I did a deep dive on this issue and root-caused it.
   
   **TL;DR** It is related to `watchtower` and has been fixed in version 2.0.0 and later. Airflow 2.2.4 now uses that version (see #19907), so if you are using it, you should be good. If not, you need to force install `watchtower` version 2.0.0 or later. Notice, however, that `watchtower` made some changes to the `CloudWatchLogHandler` `__init__` method, so you need to update the relevant code accordingly.
    
   ---
   
   The issue is caused by a combination of three factors: forking + threading + logging. This combination can lead to a deadlock when logs are being flushed after a task finishes execution, which means the StandardTaskRunner gets stuck at [this line](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/task/task_runner/standard_task_runner.py#L92). Now, since the task has actually finished (thus its state is success), but the process hasn't exited yet (and never will, since it is deadlocked), the `heartbeat_callback` will end up [thinking](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/local_task_job.py#L186) that the task state was set externally, issue the following warning, and then SIGTERM the task:
   
   ```
   State of this instance has been externally set to success. Terminating instance.
   ```
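
   To make the failure mode concrete, here is a minimal, self-contained Python sketch of the fork + background-thread + queue pattern described above (an illustrative toy, not watchtower or Airflow code): a worker thread drains a `queue.Queue`, the process forks while an item is still unfinished, and the child then blocks forever in `Queue.join()`, just like `logging.shutdown()` blocking in the handler's `flush()`:
   
   ```python
   import os
   import queue
   import signal
   import threading
   import time
   
   q = queue.Queue()
   
   
   def consumer() -> None:
       while True:
           q.get()
           time.sleep(1)  # simulate a slow PutLogEvents call
           q.task_done()
   
   
   threading.Thread(target=consumer, daemon=True).start()
   q.put("log record")  # one unfinished item is now in flight
   
   pid = os.fork()
   if pid == 0:
       # Child: the consumer thread is not copied by fork(), so nothing will ever
       # call task_done() for the inherited item -- this join() hangs forever,
       # analogous to the StandardTaskRunner stuck in logging.shutdown().
       q.join()
       os._exit(0)
   else:
       time.sleep(3)
       # The child is still alive but doing nothing: the same symptom the
       # heartbeat_callback then misreads as "state set externally".
       print("child still running:", os.waitpid(pid, os.WNOHANG) == (0, 0))
       os.kill(pid, signal.SIGKILL)
       os.waitpid(pid, 0)
   ```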
   
   ---
   
   ## References
   
   - https://github.com/kislyuk/watchtower/pull/139
   - https://github.com/kislyuk/watchtower/issues/141
   - https://bugs.python.org/issue6721
   - https://bugs.python.org/issue40089
   - https://bugs.python.org/issue874900
   
   





[GitHub] [airflow] potiuk commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-870049246


   Maybe this is something the Amazon team can take a look at, @subashcanapathy? I think that's one of the obvious candidates that someone from AWS could help with, since this is a CloudWatch integration problem.





[GitHub] [airflow] density edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
density edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-848400425


   We're also experiencing this issue with ECSOperator and CloudWatch logging on Airflow 2.0.2. Changing `stopTimeout` in our ECS config didn't fix the issue and it doesn't seem to always set the DAG run to failed for us like it does for @Miksu82.








[GitHub] [airflow] john-jac commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
john-jac commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-975750605


   Is it possible we're running into a CloudWatch quota?
   
   Per the docs below: `There is a quota of 5 requests per second per log stream. Additional requests are throttled. This quota can't be changed.`
   
   https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html











[GitHub] [airflow] ephraimbuddy edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-883431785


   @andormarkus can you set this:
   `AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL=84600` and let me know the result?
   
   If this works for you, you should remove it when 2.1.3 is out.





[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055799980


   Hi @rafidka,
   
   I'm so sorry, I was looking for this warning in Kubernetes, not in CloudWatch.
   
   Here is the warning in CloudWatch:
   <img width="1428" alt="Screen Shot 2022-03-01 at 20 43 43" src="https://user-images.githubusercontent.com/51825189/156237489-b40e4cd1-658c-42f7-ad8e-4b6f0bb4d11f.png">
   
   Here is an exported log stream. It is very interesting: CloudWatch does not store the log level (info/warning/error), just the log message.
   ```csv
   timestamp,message
   1646162883080,Dependencies all met for <TaskInstance: simple_dag_1.sleep scheduled__2022-03-01T19:27:00+00:00 [queued]>
   1646162883380,Dependencies all met for <TaskInstance: simple_dag_1.sleep scheduled__2022-03-01T19:27:00+00:00 [queued]>
   1646162883380,"
   --------------------------------------------------------------------------------"
   1646162883380,Starting attempt 1 of 2
   1646162883380,"
   --------------------------------------------------------------------------------"
   1646162883612,Executing <Task(PythonOperator): sleep> on 2022-03-01 19:27:00+00:00
   1646162883618,Started process 163492 to run task
   1646162883624,"Running: ['airflow', 'tasks', 'run', 'simple_dag_1', 'sleep', 'scheduled__2022-03-01T19:27:00+00:00', '--job-id', '70863', '--raw', '--subdir', 'DAGS_FOLDER/simple_dag_1.py', '--cfg-path', '/tmp/tmpth9fhum_', '--error-file', '/tmp/tmp4e258sys']"
   1646162883625,Job 70863: Subtask sleep
   1646162883853,Running <TaskInstance: simple_dag_1.sleep scheduled__2022-03-01T19:27:00+00:00 [running]> on host airflow-worker-58b8d8789b-w7jwv
   1646162883926,"Exporting the following env vars:
   AIRFLOW_CTX_DAG_EMAIL=support@airflow.com
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=simple_dag_1
   AIRFLOW_CTX_TASK_ID=sleep
   AIRFLOW_CTX_EXECUTION_DATE=2022-03-01T19:27:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-03-01T19:27:00+00:00"
   1646162903946,Done. Returned value was: True
   1646162903969,"Marking task as SUCCESS. dag_id=simple_dag_1, task_id=sleep, execution_date=20220301T192700, start_date=20220301T192803, end_date=20220301T192823"
   1646162904149,State of this instance has been externally set to success. Terminating instance.
   1646162904152,Received SIGTERM. Terminating subprocesses.
   1646162904152,Sending Signals.SIGTERM to group 163492. PIDs of all processes in the group: [163492]
   1646162904152,Sending the signal Signals.SIGTERM to group 163492
   1646162904292,"Process psutil.Process(pid=163492, status='terminated', exitcode=1, started='19:28:03') (163492) terminated with exit code 1"
   
   ```
   
   





[GitHub] [airflow] rafidka edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055926552


   I just tried a sleep DAG (similar to yours) and it also succeeded on Airflow 2.2.4:
   
   ```
   [2022-03-01 14:25:43,323: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[9f11d899-b415-4541-8e4f-1221fa7b6b09] received
   [2022-03-01 14:25:43,394: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'sleep', 'execute_fn', 'scheduled__2022-03-01T22:24:29.637891+00:00', '--local', '--subdir', 'DAGS_FOLDER/sleep.py']
   [2022-03-01 14:25:43,394: INFO/ForkPoolWorker-16] Celery task ID: 9f11d899-b415-4541-8e4f-1221fa7b6b09
   [2022-03-01 14:25:43,438: INFO/ForkPoolWorker-16] Filling up the DagBag from /root/airflow/dags/sleep.py
   [2022-03-01 14:25:43,498: WARNING/ForkPoolWorker-16] Running <TaskInstance: sleep.execute_fn scheduled__2022-03-01T22:24:29.637891+00:00 [queued]> on host 1161269d3561
   [2022-03-01 14:25:54,463: INFO/ForkPoolWorker-16] Task airflow.executors.celery_executor.execute_command[9f11d899-b415-4541-8e4f-1221fa7b6b09] succeeded in 11.133019998000236s: None
   ```
   
   This is my DAG:
   
   ```Python
   import time
   
   from datetime import timedelta
   
   from airflow.decorators import dag, task
   from airflow.utils.dates import days_ago
   
   import os
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @dag(dag_id=DAG_ID, schedule_interval=timedelta(minutes=1), catchup=False, start_date=days_ago(0), tags=['test'])
   def sleep_dag():
       @task()
       def execute_fn():
           time.sleep(10)
   
       execute_fn_t = execute_fn()
   
   test_dag_d = sleep_dag()
   ```
   
   I suspect your setup has some issue (perhaps a stale configuration or package). I would start clean or, even better, use Docker if you aren't already.











[GitHub] [airflow] rafidka commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055898977


   @andormarkus , what you are reporting above matches exactly the symptoms I've seen when there is a problem with logging. Could you please do a `pip freeze` and check the reported watchtower version?
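
   For example, something like this quick check (using only the standard library, equivalent to grepping the `pip freeze` output) should print the installed version; per the analysis above, anything below 2.0.0 is affected:
   
   ```python
   from importlib.metadata import version
   
   # Versions of watchtower below 2.0.0 still have the flush() deadlock.
   print("watchtower", version("watchtower"))
   ```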








[GitHub] [airflow] ephraimbuddy closed issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed issue #13824:
URL: https://github.com/apache/airflow/issues/13824


   





[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-884683321


   When I turned on remote logging to CloudWatch, it caused stability issues as well.
   We are running this DAG hourly.
   
   ![Screen Shot 2021-07-22 at 08 21 27](https://user-images.githubusercontent.com/51825189/126597489-95ea6aaa-d52d-497e-843f-f6227aea32d4.png)
   











[GitHub] [airflow] ashb commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-870563386


   This might be fixed by https://github.com/apache/airflow/pull/16289 -- one thing to try to see if that is the case is to disable the mini scheduler run by setting `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False`





[GitHub] [airflow] rafidka commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055908220


   I just tried Airflow 2.2.4 and it is working fine for me. On Airflow 2.2.4:
   
   ```
   [2022-03-01 14:05:19,784: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'print', 'execute_fn', 'scheduled__2022-03-01T22:04:03.337259+00:00', '--local', '--subdir', 'DAGS_FOLDER/print.py']
   [2022-03-01 14:05:19,784: INFO/ForkPoolWorker-16] Celery task ID: fd4cedea-bbaa-4d92-a536-e19aa0dfc34d
   [2022-03-01 14:05:19,831: INFO/ForkPoolWorker-16] Filling up the DagBag from /root/airflow/dags/print.py
   [2022-03-01 14:05:19,889: WARNING/ForkPoolWorker-16] Running <TaskInstance: print.execute_fn scheduled__2022-03-01T22:04:03.337259+00:00 [queued]> on host 000464fbb146
   [2022-03-01 14:05:21,598: INFO/ForkPoolWorker-16] Task airflow.executors.celery_executor.execute_command[fd4cedea-bbaa-4d92-a536-e19aa0dfc34d] succeeded in 1.8837955820199568s: None
   ```
   
   On Airflow 2.2.3:
   
   ```
   [2022-03-01 14:07:55,928: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'print', 'execute_fn', 'scheduled__2022-03-01T22:06:39.485677+00:00', '--local', '--subdir', 'DAGS_FOLDER/print.py']
   [2022-03-01 14:07:55,928: INFO/ForkPoolWorker-16] Celery task ID: c646cb26-e28a-4b33-8346-0e4ca4060232
   [2022-03-01 14:07:55,978: INFO/ForkPoolWorker-16] Filling up the DagBag from /root/airflow/dags/print.py
   [2022-03-01 14:07:56,049: WARNING/ForkPoolWorker-16] Running <TaskInstance: print.execute_fn scheduled__2022-03-01T22:06:39.485677+00:00 [queued]> on host e2196b74de7f
   [2022-03-01 14:08:01,439: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
       run_job.run()
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 103, in _execute
       self.task_runner.start()
     File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
       self.process = self._start_by_fork()
     File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 97, in _start_by_fork
       logging.shutdown()
     File "/usr/lib64/python3.7/logging/__init__.py", line 2036, in shutdown
       h.flush()
     File "/usr/local/lib/python3.7/site-packages/watchtower/__init__.py", line 297, in flush
       q.join()
     File "/usr/lib64/python3.7/queue.py", line 89, in join
       self.all_tasks_done.wait()
     File "/usr/lib64/python3.7/threading.py", line 296, in wait
       waiter.acquire()
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1410, in signal_handler
       raise AirflowException("Task received SIGTERM signal")
   airflow.exceptions.AirflowException: Task received SIGTERM signal
   ```
   
   For reference, this is the DAG I am testing with:
   
   ```python
   from datetime import timedelta
   
   from airflow.decorators import dag, task
   from airflow.utils.dates import days_ago
   
   import os
   from datetime import datetime
   
   NUM_LINES = 10000
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @dag(dag_id=DAG_ID, schedule_interval=timedelta(minutes=1), catchup=False, start_date=days_ago(0), tags=['test'])
   def print_dag():
       @task()
       def execute_fn():
           for i in range(0, NUM_LINES):
               print(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
   
       execute_fn_t = execute_fn()
   
   test_dag_d = print_dag()
   ```
   





[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-973286690


   Hi @ephraimbuddy 
   
   I have tested it with Airflow 2.2.2 and the problem still exists.
   
   Example logs 1:
   ```shell
   [2021-11-18 21:19:01,060: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[53ca9d3f-b59e-4a3f-9f21-009c32db5473] received
   [2021-11-18 21:19:01,082: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'simple_dag', 'sleep', 'scheduled__2021-11-18T21:18:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/simple_dag.py']
   [2021-11-18 21:19:01,082: INFO/ForkPoolWorker-16] Celery task ID: 53ca9d3f-b59e-4a3f-9f21-009c32db5473
   [2021-11-18 21:19:01,123: INFO/ForkPoolWorker-16] Filling up the DagBag from /opt/airflow/dags/repo/dags/simple_dag.py
   [2021-11-18 21:19:01,234: WARNING/ForkPoolWorker-16] Running <TaskInstance: simple_dag.sleep scheduled__2021-11-18T21:18:00+00:00 [queued]> on host airflow-worker-5997488b78-t2ftx
   [2021-11-18 21:19:17,631: INFO/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[4ccd30db-274f-4f2d-a750-3df8ec6e856c] succeeded in 76.68239335156977s: None
   [2021-11-18 21:19:17,852: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
       run_job.run()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 103, in _execute
       self.task_runner.start()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
       self.process = self._start_by_fork()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 97, in _start_by_fork
       logging.shutdown()
     File "/usr/local/lib/python3.9/logging/__init__.py", line 2141, in shutdown
       h.flush()
     File "/home/airflow/.local/lib/python3.9/site-packages/watchtower/__init__.py", line 297, in flush
       q.join()
     File "/usr/local/lib/python3.9/queue.py", line 90, in join
       self.all_tasks_done.wait()
     File "/usr/local/lib/python3.9/threading.py", line 312, in wait
       waiter.acquire()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1413, in signal_handler
       raise AirflowException("Task received SIGTERM signal")
   airflow.exceptions.AirflowException: Task received SIGTERM signal
   
   ```








[GitHub] [airflow] rafidka commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1058363246


   @andormarkus , to avoid making assumptions, could you please share the exact code of `simple_dag.py` you used in your reproduction?
   
   Also, it is worth noting that, based on the setup you mentioned above, the local executor will be used instead of the Celery executor. The testing I did was on the Celery executor.








[GitHub] [airflow] rafidka edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055787983


   @andormarkus , hmm this is interesting. I didn't particularly test Airflow 2.2.4, but I tested 2.0.2 + watchtower 2.0.0 and beyond and I couldn't reproduce the issue, so I assumed that since Airflow 2.2.4 is using watchtower 2.0.1, the issue should be resolved. I will see if I can do some testing and try to reproduce this issue.
   
   > Where should I find the State of this instance has been externally set to success. Terminating instance. warning Message? I can not find in the worker or scheduler logs.
   
   This is a bit tricky, unfortunately. The thing is that when you configure Airflow to use CloudWatch logging, but the logging itself has issues, you are likely to miss some logs. On the other hand, when you stop using CloudWatch logging, the issue itself disappears (i.e. this is a Heisenbug situation). In my case, I had to modify the Airflow source code locally and then use file-based logging like this:
   
   ```python
               # Added locally in Airflow's local_task_job.py, where the "externally
               # set" warning is raised; requires `import inspect, traceback` there.
               with open('/tmp/local_task_job.py.log', 'a') as f:
                   print(f"State of this instance has been externally set to {ti.state}. ", file=f)
                   print(f'Dumping stack trace:', file=f)
                   frame = inspect.currentframe()
                   stack_trace = traceback.format_stack(frame)
                   print('\n'.join(stack_trace), file=f)
   ```
   
   


[GitHub] [airflow] density commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
density commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-848400425


   We're also experiencing this issue with ECSOperator and CloudWatch logging. Changing `stopTimeout` in our ECS config didn't fix the issue and it doesn't seem to always set the DAG run to failed for us like it does for @Miksu82.


[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-870034864


   Hi @potiuk, Who can help us with this bug? 


[GitHub] [airflow] ephraimbuddy commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-955644568


   We have fixed this on https://github.com/apache/airflow/pull/18269 released in 2.2.0


[GitHub] [airflow] mtraynham edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
mtraynham edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-766093578


   One thing to note with this issue: I believe it is also preventing workers from consuming more work, because of a 60-second sleep at the end of every task. We noticed a ~30-40 second delay between 2 workers running tasks, as if all workers were busy doing something, but nothing was actually happening. It seemed work had been queued but was not being performed.
   
   After looking at the logs for a given task in CloudWatch (which has the added benefit of including timestamps), there is a 60-second gap between the SIGTERM and when the process is finally killed with SIGKILL.
   
   ```
   2021-01-23T15:00:02.682Z | Marking task as SUCCESS. dag_id=xxxxxxxx, task_id=xxxxxxx, execution_date=20210123T140000, start_date=20210123T150001, end_date=20210123T150002
   2021-01-23T15:00:02.715Z | 0 downstream tasks scheduled from follow-on schedule check
   2021-01-23T15:00:06.606Z | State of this instance has been externally set to success. Terminating instance.
   2021-01-23T15:00:06.607Z | Sending Signals.SIGTERM to GPID 155
   2021-01-23T15:01:06.617Z | process psutil.Process(pid=155, name='airflow task ru', status='sleeping', started='15:00:01') did not respond to SIGTERM. Trying SIGKILL
   2021-01-23T15:01:06.625Z | Process psutil.Process(pid=155, name='airflow task ru', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='15:00:01') (155) terminated with exit code Negsignal.SIGKILL
   2021-01-23T15:01:06.625Z | Task exited with return code Negsignal.SIGKILL
   ```
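   
   The timestamps suggest the generic terminate-wait-kill pattern sketched below. This is only a rough illustration, not Airflow's actual reaping code; the `reap` helper and the 60-second default are assumptions for the sketch:
   
   ```python
   import psutil
   
   def reap(pid: int, timeout: int = 60) -> None:
       # Send SIGTERM to the process and its children, wait, then escalate to SIGKILL.
       proc = psutil.Process(pid)
       procs = [proc, *proc.children(recursive=True)]
       for p in procs:
           p.terminate()                        # SIGTERM
       gone, alive = psutil.wait_procs(procs, timeout=timeout)
       for p in alive:                          # still alive after `timeout` seconds
           p.kill()                             # SIGKILL -- matches the ~60s gap above
   ```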


[GitHub] [airflow] Miksu82 commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
Miksu82 commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-779759752


   It also seems that this issue sets all the JobRuns to a `failed` state, although the TaskInstance is correctly marked as `success`.


[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-883239510


   Hi @ashb
   
   I set `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False` in Airflow 2.1.2 which does not have #16289.
   
   All the tasks are successful on the Airflow side; however, in Flower they all show as failed.
   The logs are showing up in CloudWatch as expected.
   
   My airflow.cfg
   ```bash
   [scheduler]
   dag_dir_list_interval = 60
   parsing_processes = 4
   run_duration = 41460
   schedule_after_task_execution = False
   statsd_host = airflow-statsd
   statsd_on = True
   statsd_port = 9125
   statsd_prefix = airflow
   ```
   
   
   This is what the Celery logs look like for a given worker:
   ```bash
   [2021-07-20 09:02:49,057: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'tasks', 'run', 'XXXXXXXX', 'eks.sensor', '2021-07-20T08:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/XXXXXXXX.py']
   [2021-07-20 09:02:49,086: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/XXXXXXXX.py
   [2021-07-20 09:02:51,442: INFO/ForkPoolWorker-15] Datasets List: 2
   [2021-07-20 09:02:51,442: INFO/ForkPoolWorker-15] Start getting tables list from dataset: XXXXXXXX
   [2021-07-20 09:02:51,585: INFO/ForkPoolWorker-15] Start getting tables list from dataset: XXXXXXXX
   [2021-07-20 09:02:52,380: WARNING/ForkPoolWorker-15] Running <TaskInstance: XXXXXXXX.eks.sensor 2021-07-20T08:00:00+00:00 [queued]> on host airflow-worker-7c6b4f75f9-x67r9
   
   [2021-07-20 09:03:18,710: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 117, in _execute_in_fork
       args.func(args)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
       run_job.run()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 100, in _execute
       self.task_runner.start()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
       self.process = self._start_by_fork()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 92, in _start_by_fork
       logging.shutdown()
     File "/usr/local/lib/python3.8/logging/__init__.py", line 2126, in shutdown
       h.flush()
     File "/home/airflow/.local/lib/python3.8/site-packages/watchtower/__init__.py", line 297, in flush
       q.join()
     File "/usr/local/lib/python3.8/queue.py", line 89, in join
       self.all_tasks_done.wait()
     File "/usr/local/lib/python3.8/threading.py", line 302, in wait
       waiter.acquire()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1286, in signal_handler
       raise AirflowException("Task received SIGTERM signal")
   airflow.exceptions.AirflowException: Task received SIGTERM signal
   
   [2021-07-20 09:04:18,738: ERROR/ForkPoolWorker-15] Failed to execute task [Errno 2] No such file or directory: '/tmp/tmpbwn0h8za'.
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 117, in _execute_in_fork
       args.func(args)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
       run_job.run()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 145, in _execute
       self.on_kill()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 166, in on_kill
       self.task_runner.on_finish()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/task/task_runner/base_task_runner.py", line 178, in on_finish
       self._error_file.close()
     File "/usr/local/lib/python3.8/tempfile.py", line 499, in close
       self._closer.close()
     File "/usr/local/lib/python3.8/tempfile.py", line 436, in close
       unlink(self.name)
   FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmpbwn0h8za'
   [2021-07-20 09:04:18,834: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[0a8ca64c-01df-4868-a21d-b369d3f7a6cd] raised unexpected: AirflowException('Celery command failed on host: airflow-worker-7c6b4f75f9-x67r9')
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
       R = retval = fun(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
       return self.run(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command
       _execute_in_fork(command_to_exec)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
       raise AirflowException('Celery command failed on host: ' + get_hostname())
   airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-7c6b4f75f9-x67r9
   
   ```


[GitHub] [airflow] rafidka edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055701303


   I did a deep dive on this issue and root-caused it.
   
   **TL;DR** It is related to `watchtower` and has been fixed in version 2.0.0 and later. Airflow 2.2.4 now uses that version (see #19907), so if you are on it, you should be good. If not, you need to force-install `watchtower` version 2.0.0 or later. Notice, however, that `watchtower` made some changes to the `CloudWatchLogHandler` `__init__` method, so you need to update the relevant code accordingly.
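   
   If you want to verify the fix is actually in place, a hypothetical start-up guard like the following (not something Airflow ships; it assumes Python 3.8+ for `importlib.metadata` and the `packaging` library) could refuse to run with an affected release installed:
   
   ```python
   from importlib.metadata import version
   from packaging.version import Version
   
   # watchtower < 2.0.0 can deadlock in flush() after fork(); see the analysis below.
   if Version(version("watchtower")) < Version("2.0.0"):
       raise RuntimeError(
           "watchtower < 2.0.0 installed; force-install watchtower>=2.0.0 "
           "before enabling CloudWatch remote logging."
       )
   ```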
    
   ---
   
   The issue is related to a combination of three factors: forking + threading + logging. This combination can lead to a deadlock when logs are being flushed after a task finishes execution, which means the StandardTaskRunner gets stuck at [this line](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/task/task_runner/standard_task_runner.py#L92). Now, since the task has actually finished (so its state is success) but the process hasn't exited yet (and never will, since it is deadlocked), the `heartbeat_callback` ends up [thinking](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/local_task_job.py#L186) that the task state was set externally, issues the following warning, and then SIGTERMs the task:
   
   ```
   State of this instance has been externally set to success. Terminating instance.
   ```
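   
   Below is a minimal, self-contained sketch (POSIX-only, not code from Airflow or watchtower) of the forking + threading + queue pattern described above: the child inherits a queue with an unfinished item but not the background thread that would drain it, so a post-fork `q.join()` -- the analogue of `logging.shutdown()` calling the handler's `flush()` -- hangs until something kills the process from outside:
   
   ```python
   import os
   import queue
   import signal
   import threading
   import time
   
   q = queue.Queue()
   
   def shipper():
       # Stand-in for the background thread that ships log batches to CloudWatch.
       while True:
           q.get()
           time.sleep(0.2)              # pretend to call the CloudWatch API
           q.task_done()
   
   threading.Thread(target=shipper, daemon=True).start()
   q.put("log line")                    # enqueued, but not yet marked done
   
   pid = os.fork()
   if pid == 0:
       q.join()                         # child has no shipper thread: blocks forever
       os._exit(0)                      # never reached
   else:
       time.sleep(2)
       os.kill(pid, signal.SIGKILL)     # external kill, like the heartbeat's SIGTERM/SIGKILL
       os.waitpid(pid, 0)
       print("child deadlocked in q.join() and had to be killed")
   ```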
   
   Notice that this could cause further issues:
   
   - **Could not read remote logs from log_group**: This error happens when we don't have the necessary log data in CloudWatch. This can easily happen with a task that writes logs at the end and gets interrupted by the SIGTERM, so no log is ever published.
   - **Celery command failed on host**: Obviously, when a SIGTERM is sent, the process will exit with a non-zero code, and Airflow ends up generating this error [here](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/executors/celery_executor.py#L98).
   - **Tasks executing multiple times**: When the Celery Executor + SQS is used (as in Amazon MWAA, for example), and since Airflow uses Celery's [`ack_late` feature](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late) (see [here](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/config_templates/default_celery.py#L43)), a SIGTERM signal results in the task message not being deleted from the SQS queue, so after its visibility timeout it goes back to the queue and is picked up again by another worker (see the short sketch after this list).
   
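   A tiny illustration of that last point (not Airflow's actual `default_celery.py`; the app name and broker URL are placeholders): with `task_acks_late` enabled, the broker message is acknowledged, and therefore removed from SQS, only after the task function returns, so a SIGTERM before that point leaves the message to reappear once its visibility timeout expires:
   
   ```python
   import time
   
   from celery import Celery
   
   app = Celery("repro", broker="sqs://")   # placeholder app; real broker config comes from the environment
   app.conf.task_acks_late = True           # ack only after the task completes, as Airflow does
   
   @app.task
   def sleepy():
       time.sleep(30)                       # killed here => message is redelivered later
   ```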
   ---
   
   ## References
   
   - https://github.com/kislyuk/watchtower/pull/139
   - https://github.com/kislyuk/watchtower/issues/141
   - https://bugs.python.org/issue6721
   - https://bugs.python.org/issue40089
   - https://bugs.python.org/issue874900
   
   


[GitHub] [airflow] rafidka commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055787983


   @andormarkus, hmm, this is interesting. I didn't specifically test Airflow 2.2.4, but I tested 2.0.2 + watchtower 2.0.0 and later and couldn't reproduce the issue, so I assumed that since Airflow 2.2.4 uses watchtower 2.0.1, the issue should be resolved. I will see if I can do some testing and try to reproduce it.
   
   > Where should I find the State of this instance has been externally set to success. Terminating instance. warning Message? I can not find in the worker or scheduler logs.
   
   This is a bit tricky, unfortunately. The thing is that when you configure Airflow to use CloudWatch logging but the logging itself has issues, you are likely to miss some logs. On the other hand, when you stop using CloudWatch logging, the issue itself disappears (i.e. this is a Heisenbug situation). In my case, I had to modify the Airflow source code locally and use file-based logging like this:
   
   ```python
               # Added locally in Airflow's local_task_job.py, where the "externally
               # set" warning is raised; requires `import inspect, traceback` there.
               with open('/tmp/local_task_job.py.log', 'a') as f:
                   print(f"State of this instance has been externally set to {ti.state}. ", file=f)
                   print(f"State before refresh_from_db() call was {state_before_refresh}. ", file=f)
                   print(f"State after refresh_from_db() call was {state_after_refresh}. ", file=f)
                   print(f'Dumping stack trace:', file=f)
                   frame = inspect.currentframe()
                   stack_trace = traceback.format_stack(frame)
                   print('\n'.join(stack_trace), file=f)
   ```
   
   


[GitHub] [airflow] kdickinson87 commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
kdickinson87 commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-954000078


   Has there been any movement on this, or any proposed fixes? I'm still getting the temp file location error. We are on 2.0.1, though I spun up a local container of 2.2.0 and still experienced this error, which leads me to believe it still exists. Any updates?


[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-870566777


   Should I set `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False` in 2.0.2 or wait till #16289 is released?
   Is #16289 part of 2.1.1, or do we have to wait for 2.1.2?


[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055914397


   Hi @rafidka,
   
   I'm running 5 copies of `simple_dag.py` in parallel and I get one error every 5-10 minutes.
   
   I will deploy your dag tomorrow.
   
   I'm running my code on Airflow 2.2.4 with watchtower 2.0.1.


[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1057844236


   Hello,
   
   We are running our Airflow on AWS EKS with the latest Helm chart and official Docker image. I don't think it is a setup issue.
   
   I was able to reproduce it locally as well with docker compose:
   
   1. Download the official Docker Compose file: `curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'`.
   2. Create a test CloudWatch log group.
   3. Create a `dags` directory containing 5 copies of `simple_dag.py`.
   4. Edit the `x-airflow-common.environment` section in the `docker-compose.yaml` file the following way:  
      Disable example dags
      ```yaml
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      ```
      Add the following environment variables. Adjust your `region` according to your preferences.
       ```yaml
       AWS_DEFAULT_REGION: 'eu-central-1'
       AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
       AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
       AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN}
       AIRFLOW_CONN_AWS_DEFAULT: "aws://?region_name=eu-central-1"
      
       AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
       AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: AWS_DEFAULT
       AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: cloudwatch://<cloudwatch-log-group-arn>
   
       ```
   5. Insert your AWS environment variables into the terminal. (We are using AWS SSO to log in, etc.)
       ```shell
       export AWS_ACCESS_KEY_ID="########"
       export AWS_SECRET_ACCESS_KEY="########"
       export AWS_SESSION_TOKEN="########"
       ```
   6. Set the right Airflow user: `echo -e "AIRFLOW_UID=$(id -u)" > .env`
   7. Start the Docker Compose environment: `docker compose up -d`
   8. Log in to the web UI at <http://localhost:8080/> and enable the dags.
   ![Screen Shot 2022-03-03 at 09 29 51](https://user-images.githubusercontent.com/51825189/156529851-1e8a1877-b348-41fd-967b-88db35b1131f.png)
   
   9. Check the worker logs after a minimum of 60 minutes. **It happens in only 1-2% of the DAG runs.**
       ```shell
       ▶ docker logs airflow_dag_test-airflow-worker-1 --since=60m 2>&1 | grep ERROR
       [2022-03-03 08:34:21,817: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 08:38:22,608: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:01:21,553: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:11:21,887: ERROR/ForkPoolWorker-3] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:11:21,898: ERROR/ForkPoolWorker-2] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:15:21,876: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:15:21,876: ERROR/ForkPoolWorker-1] Failed to execute task Task received SIGTERM signal.
       ```


[GitHub] [airflow] andormarkus edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1057844236


   Hello,
   
   We are running our Airflow on AWS EKS with the latest Helm chart and official Docker image. I don't think it is a setup issue.
   
   I was able to reproduce it locally as well with docker compose:
   
   1. Download the official Docker Compose file: `curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'`.
   2. Create a test CloudWatch log group.
   3. Create a `dags` directory containing 5 copies of `simple_dag.py`.
   4. Edit the `x-airflow-common.environment` section in the `docker-compose.yaml` file the following way:  
      Leave the default environment variables as they are. Disable the loading of example dags by changing the following environment variable from `true` to `false`.
      ```yaml
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      ```
      Add the following environment variables. Adjust your `region` according to your preferences.
       ```yaml
       AWS_DEFAULT_REGION: 'eu-central-1'
       AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
       AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
       AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN}
       AIRFLOW_CONN_AWS_DEFAULT: "aws://?region_name=eu-central-1"
      
       AIRFLOW__LOGGING__REMOTE_LOGGING: 'true'
       AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: AWS_DEFAULT
       AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: cloudwatch://<cloudwatch-log-group-arn>
   
       ```
   
      In the end it should look like this:
      ```yaml
      environment:
       &airflow-common-env
       AIRFLOW__CORE__EXECUTOR: CeleryExecutor
       AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
       AIRFLOW__CORE__FERNET_KEY: ''
       AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
       AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
       AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
       _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
   
       AWS_DEFAULT_REGION: 'eu-central-1'
       AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
       AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
       AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN}
       AIRFLOW_CONN_AWS_DEFAULT: "aws://?region_name=eu-central-1"
      
       AIRFLOW__LOGGING__REMOTE_LOGGING: 'true'
       AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: AWS_DEFAULT
       AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: cloudwatch://<cloudwatch-log-group-arn>
      ```
   
   
   6. Insert your AWS environment variables into the terminal. (We are using AWS SSO to log in, etc.)
       ```shell
       export AWS_ACCESS_KEY_ID="########"
       export AWS_SECRET_ACCESS_KEY="########"
       export AWS_SESSION_TOKEN="########"
       ```
   7. Set the right Airflow user: `echo -e "AIRFLOW_UID=$(id -u)" > .env`
   8. Start the Docker Compose environment: `docker compose up -d`
   9. Log in to the web UI at <http://localhost:8080/> and enable the dags.
   ![Screen Shot 2022-03-03 at 09 29 51](https://user-images.githubusercontent.com/51825189/156529851-1e8a1877-b348-41fd-967b-88db35b1131f.png)
   
   10. Check the worker logs after a minimum of 60 minutes. **It happens in only 1-2% of the DAG runs.**
       ```shell
       ▶ docker logs airflow_dag_test-airflow-worker-1 --since=60m 2>&1 | grep ERROR
       [2022-03-03 08:34:21,817: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 08:38:22,608: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:01:21,553: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:11:21,887: ERROR/ForkPoolWorker-3] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:11:21,898: ERROR/ForkPoolWorker-2] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:15:21,876: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
       [2022-03-03 09:15:21,876: ERROR/ForkPoolWorker-1] Failed to execute task Task received SIGTERM signal.
       ```


[GitHub] [airflow] john-jac commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
john-jac commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-918539921


   Hi Folks...have there been any new insights into this issue? 


[GitHub] [airflow] ephraimbuddy commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-883431785


   @andormarkus can you set this:
   `AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL=84600` and let me know the result?


[GitHub] [airflow] rafidka edited a comment on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
rafidka edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055701303


   I did a deep dive on this issue and root-caused it.
   
   **TL;DR** It is related to `watchtower` and has been fixed in version 2.0.0 and later. Airflow 2.2.4 now uses that version (see #19907), so if you are on it, you should be good. If not, you need to force-install `watchtower` version 2.0.0 or later. Notice, however, that `watchtower` made some changes to the `CloudWatchLogHandler` `__init__` method, so you need to update the relevant code accordingly.
    
   ---
   
   The issue is related to a combination of three factors: forking + threading + logging. This combination can lead to a deadlock when logs are being flushed after a task finishes execution, which means the StandardTaskRunner gets stuck at [this line](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/task/task_runner/standard_task_runner.py#L92). Now, since the task has actually finished (so its state is success) but the process hasn't exited yet (and never will, since it is deadlocked), the `heartbeat_callback` ends up thinking that the task state was set externally, issues the following warning, and then SIGTERMs the task:
   
   ```
   State of this instance has been externally set to success. Terminating instance.
   ```
   
   ---
   
   ## References
   
   - https://github.com/kislyuk/watchtower/pull/139
   - https://github.com/kislyuk/watchtower/issues/141
   - https://bugs.python.org/issue6721
   - https://bugs.python.org/issue40089
   - https://bugs.python.org/issue874900
   
   


[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055782143


   Hi @rafidka, thanks for the update.
   
   Here are my current findings:
   
   Used versions:
   ```yaml
   airflow: 2.2.4
   watchtower: 2.0.1
   ```
   
   Test setup: I have been running 5 copies of the [simple dag](https://github.com/apache/airflow/issues/13824#issuecomment-1012838195) over the past few days:
   
   <img width="1731" alt="Screen Shot 2022-03-01 at 19 31 50" src="https://user-images.githubusercontent.com/51825189/156227973-6f3bba96-9581-4b9c-bda4-07d389c97550.png">
   
   From flower perspective everything looks good:
   <img width="1727" alt="Screen Shot 2022-03-01 at 19 33 12" src="https://user-images.githubusercontent.com/51825189/156228089-070f61cf-03d4-4cbe-b7c4-61cdecedd66e.png">
   
   
   I have checked the worker logs, and a few times per hour I get the following error messages:
   ```bash
   [2022-03-01 18:21:24,954: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
       run_job.run()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 246, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 103, in _execute
       self.task_runner.start()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
       self.process = self._start_by_fork()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 97, in _start_by_fork
       logging.shutdown()
     File "/usr/local/lib/python3.9/logging/__init__.py", line 2141, in shutdown
       h.flush()
     File "/home/airflow/.local/lib/python3.9/site-packages/watchtower/__init__.py", line 432, in flush
       q.join()
     File "/usr/local/lib/python3.9/queue.py", line 90, in join
       self.all_tasks_done.wait()
     File "/usr/local/lib/python3.9/threading.py", line 312, in wait
       waiter.acquire()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1415, in signal_handler
   ```
   
   
   Worker logs for a failed task looks like this:
   ```bash
   ▶ kubectl -n airflow logs airflow-worker worker | grep ForkPoolWorker-16
   
   [2022-03-01 19:08:00,654: INFO/ForkPoolWorker-16] Celery task ID: fee17d13-2423-4ed1-ab2f-3f1a3fd34551
   [2022-03-01 19:08:00,709: INFO/ForkPoolWorker-16] Filling up the DagBag from /opt/airflow/dags/repo/dags/simple_dag_2.py
   [2022-03-01 19:08:00,842: WARNING/ForkPoolWorker-16] Running <TaskInstance: simple_dag_2.sleep scheduled__2022-03-01T19:07:00+00:00 [queued]> on host airflow-worker-58b8d8789b-w7jwv
   [2022-03-01 19:08:24,702: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
   [2022-03-01 19:08:25,170: INFO/ForkPoolWorker-16] Task airflow.executors.celery_executor.execute_command[fee17d13-2423-4ed1-ab2f-3f1a3fd34551] succeeded in 24.535810169007163s: None
   ```
   
   @rafidka Where should I find the `State of this instance has been externally set to success. Terminating instance.` warning message? I cannot find it in the worker or scheduler logs.
   
   Based on my testing, the `Celery command failed on host` error was fixed in Airflow 2.2.0.


[GitHub] [airflow] john-jac commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
john-jac commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1059316054


   Thanks @andormarkus! Are you able to reproduce this when using the Celery executor?


[GitHub] [airflow] andormarkus commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
andormarkus commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1012838195


   Hi @o-nikolas 
   
   We are on Helm chart `1.3.0` and Airflow `2.2.2-python3.9`. Airflow is running on AWS EKS with the Celery executor (KEDA enabled). The logs attached in my previous comment were from the Celery worker logs.
   
   Please let me know if you need more information.
   
   Relevant section from helm chart configuration
   ```yaml
   config:
     logging:
       colored_console_log: "False"
       remote_logging: "True"
       remote_log_conn_id: aws_default
       remote_base_log_folder: "cloudwatch://${log_group_arn}"
   ```
   
   `simple_dag`:
   ```python
   """Sample DAG."""
   
   import time
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "start_date": datetime(2020, 1, 1),
       "email": ["support@airflow.com"],
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 1,
       "retry_delay": timedelta(minutes=5),
   }
   
   
   def sleep() -> bool:
       """Sleep.
   
       Returns:
           bool: True
       """
       time.sleep(10)
       return True
   
   
   with DAG("simple_dag", default_args=default_args, schedule_interval="* * * * *", catchup=False) as dag:
       t1 = PythonOperator(task_id="sleep", python_callable=sleep)
   ```


[GitHub] [airflow] o-nikolas commented on issue #13824: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1012774813


   Hey folks,
   
   I was looking into this today, but I can't seem to reproduce it.
   
   @andormarkus I see in your last post that you're using 2.2.2 and running some `simple_dag` (specifically a `sleep` task). I'm assuming that's a test dag. Do you mind sharing it, along with any other relevant environment configuration you're using, to help me reproduce the error? Thanks!

