Posted to commits@airflow.apache.org by "deepaktripathi1997 (via GitHub)" <gi...@apache.org> on 2023/02/15 19:37:49 UTC
[GitHub] [airflow] deepaktripathi1997 opened a new issue, #29557: Execution Timeout is not working properly on airflow 2.5.0
deepaktripathi1997 opened a new issue, #29557:
URL: https://github.com/apache/airflow/issues/29557
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
We occasionally see problems with tasks that set an `execution_timeout`: even though the process times out, the task remains stuck in the 'running' state for several hours.
The task looks like this:
```python
import json
import os
from urllib.parse import urlparse

import MySQLdb
import pendulum
from botocore.exceptions import ClientError

from airflow.decorators import task
from airflow.exceptions import AirflowFailException
from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.trigger_rule import TriggerRule

# Not shown in the report (assumed to be defined elsewhere in the DAG file):
# execution_timeout, table, destination_table_name, mysql_hook_nrt,
# nrt_variables, CDC, bucket, ingestion_schema, data_dumps_schema.


@task(task_id='save_job_details', retries=1, retry_delay=5, execution_timeout=execution_timeout,
      pool='nrt_save_job_details', trigger_rule=TriggerRule.NONE_FAILED)
def save_job_details(**context):
    ti = context['ti']
    connection_details = Connection.get_connection_from_secrets(conn_id='redshift_etl_flack')
    # Pulling all the XCOMS
    job_status = ti.xcom_pull(task_ids='running_redshift_queries', key='job_status')
    last_run_epoch = ti.xcom_pull(task_ids='list_running_epochs', key='last_run_epoch')
    schema_change = ti.xcom_pull(task_ids='list_running_epochs', key='schema_change')
    schema_epoch = ti.xcom_pull(task_ids='list_running_epochs', key='schema_epoch')
    before_count = ti.xcom_pull(task_ids='running_redshift_queries', key='before_count')
    after_count = ti.xcom_pull(task_ids='running_redshift_queries', key='after_count')
    max_kt = ti.xcom_pull(task_ids='running_redshift_queries', key='max_kt')
    print(f"Job status of previous task -> {job_status}")
    print(f"last run epoch received of previous task -> {last_run_epoch}")
    # Creating DAG URL
    dag_id = context['dag'].dag_id
    dag_run_context = context["dag_run"]
    run_id = dag_run_context.run_id
    dag_url = f"""http://10.249.5.183:3000/airflow/dags/{dag_id}/graph?root=&dag_run_id={run_id}"""
    table_id = table['table_id']
    # Finding event_type
    conf = dag_run_context.conf
    event_type = "all" if 'event_type' not in conf else conf['event_type']
    # Saving Job details in RDS
    # Case if no epoch was processed
    if job_status is None:
        job_status = 2
    if job_status == 1 and last_run_epoch is None:
        job_status = 0
    if job_status in (0, 2, 3) or last_run_epoch is None or event_type != 'all':
        cmd = f"""insert into nrt_jobs (table_id, status, table_name)
        values ({table_id}, {job_status}, '{destination_table_name}')
        """
    else:
        cmd = f"""insert into nrt_jobs (table_id, status, table_name, last_run_epoch)
        values ({table_id}, {job_status}, '{destination_table_name}', {last_run_epoch})
        """
    print(cmd)
    connection = mysql_hook_nrt.get_conn()
    # saving the stats
    stats_path = f"{nrt_variables['stats_path_s3'].rstrip('/')}/{table['source']}/{table['db']}/{table['table_name']}/ingestion.json"
    stats_parsed = urlparse(stats_path)
    stats_bucket = stats_parsed.netloc
    stats_prefix = stats_parsed.path.lstrip('/')
    stats = {
        "source": table['source'],
        "db": table['db'],
        "table_name": table['table_name'],
        "before_count": before_count,
        "after_count": after_count,
        "max_kafka_timestamp": max_kt,
        "addedon": pendulum.now().to_datetime_string()
    }
    try:
        cur = connection.cursor()
        cur.execute(cmd)
        stats_s3_hook = S3Hook()
        stats_s3_hook.load_string(bucket_name=stats_bucket, key=stats_prefix,
                                  string_data=json.dumps(stats, indent=2, default=str), replace=True)
    except (ClientError, MySQLdb.Error) as e:
        connection.rollback()
        raise AirflowFailException(f"Not able to save stats due to {e}")
    finally:
        connection.commit()
        connection.close()
    print("Stats saved")
    print("Job details saved")
    # CDC code
    if schema_change:
        try:
            cdc_s3_hook = S3Hook()
            CDC(table_name=destination_table_name, ingestion_schema=ingestion_schema,
                data_dumps_schema=data_dumps_schema).driver_function()
            cdc_s3_hook.load_string(bucket_name=bucket, key=f"{schema_epoch}_CDC_DONE", string_data='')
        except (ClientError, MySQLdb.Error) as e:
            print(f"Outer: Failed CDC due to {e}")
            raise AirflowFailException(f"Outer: Failed CDC due to {e}")
    if os.path.exists(f"redshift_queries_{destination_table_name}"):
        os.remove(f"redshift_queries_{destination_table_name}")
    if os.path.exists(f"epoch_list_{destination_table_name}"):
        os.remove(f"epoch_list_{destination_table_name}")
```
The value of `execution_timeout` is `pendulum.duration(minutes=3)`.
**Task Logs:**
```
*** Falling back to local log
*** Reading local file: /home/deploy/ssot-airflow/logs/dag_id=realtime_payout_payout_Beneficiary/run_id=scheduled__2023-02-15T13:17:00+00:00/task_id=save_job_details/attempt=1.log
--------------------------------------------------------------------------------
[2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - Job status of previous task -> 1
[2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - last run epoch received of previous task -> 1676474400
[2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - insert into ***
[2023-02-15, 20:55:21 IST] {base.py:73} INFO - Using connection ID 'nrt_rds' for task execution.
[2023-02-15, 20:55:21 IST] {base.py:73} INFO - Using connection ID 'aws_default' for task execution.
[2023-02-15, 20:55:21 IST] {credentials.py:1049} INFO - Found credentials from IAM Role: ssot-prod-role
[2023-02-15, 20:58:20 IST] {timeout.py:68} ERROR - Process timed out, PID: 11392
```
The task has been in the running state for the past 1.5 hours and is not failing.
**Operating System**
Virtualization: kvm
Operating System: CentOS Linux 7 (Core)
CPE OS Name: cpe:/o:centos:centos:7
Kernel: Linux 6.0.10-1.el7.elrepo.x86_64
Architecture: x86-64
<img width="1385" alt="image" src="https://user-images.githubusercontent.com/25430062/219133170-ffd654da-b70d-4dad-9747-55563e7554b7.png">
This happens only when this task hits a timeout.
### What you think should happen instead
_No response_
### How to reproduce
_No response_
### Operating System
CentOS Linux 7 (Core)
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==6.2.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-common-sql==1.3.1
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-mysql==3.4.0
apache-airflow-providers-postgres==5.0.0
apache-airflow-providers-sftp==4.2.0
apache-airflow-providers-sqlite==3.2.1
apache-airflow-providers-ssh==3.3.0
### Deployment
Virtualenv installation
### Deployment details
- CPU: 64 cores
- Memory: 256 GB
- `worker_autoscale`: 1024, 256
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] hussein-awala commented on issue #29557: Execution Timeout is not working properly on airflow 2.5.0
Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29557:
URL: https://github.com/apache/airflow/issues/29557#issuecomment-1436106024
Airflow raises an `AirflowTaskTimeout` exception when a task times out, and your code can catch this exception and handle it if needed.
Since your log contains `[2023-02-15, 20:58:20 IST] {timeout.py:68} ERROR - Process timed out, PID: 11392`, the timeout exception was raised, but it seems your code is stuck in one of the `finally` blocks: Python runs a `finally` block before the exception propagates. I cannot check what's wrong with your operator, but I can recommend some steps to debug the problem.
First, here is a simple example:
```python
import datetime
import time
from typing import Any

import pendulum

from airflow.models import BaseOperator
from airflow.models.dag import dag
from airflow.utils.context import Context


class MyOperator(BaseOperator):
    def execute(self, context: Context) -> Any:
        try:
            print("try")
            time.sleep(120)
        except Exception as e:
            print(e)
            raise e
        finally:
            print("finally")
            time.sleep(120)


@dag(
    schedule=None,
    start_date=pendulum.yesterday(),
)
def timeout_dag():
    MyOperator(task_id="test_task", execution_timeout=datetime.timedelta(seconds=30))


timeout_dag()
```
And here is the log:
```
[2023-02-19, 21:27:29 UTC] {logging_mixin.py:149} INFO - try
[2023-02-19, 21:27:59 UTC] {timeout.py:68} ERROR - Process timed out, PID: 98045
[2023-02-19, 21:27:59 UTC] {logging_mixin.py:149} INFO - Timeout, PID: 98045
[2023-02-19, 21:27:59 UTC] {logging_mixin.py:149} INFO - finally
[2023-02-19, 21:29:59 UTC] {taskinstance.py:1837} ERROR - Task failed with exception
Traceback (most recent call last):
File "/files/dags/dag18.py", line 19, in execute
raise e
File "/files/dags/dag18.py", line 16, in execute
time.sleep(120)
File "/opt/airflow/airflow/utils/timeout.py", line 69, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 98045
```
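The same ordering can be reproduced in plain Python, which may make it easier to see why a task can stay in `running` after the timeout is logged. The sketch below is hypothetical: `Timeout` and `TaskTimeout` are simplified stand-ins, not Airflow's API, loosely modelled on a SIGALRM-based timeout like the one behind `timeout.py` in the log above. It assumes a Unix system and the main thread, since Python only runs signal handlers there.

```python
import signal
import time


class TaskTimeout(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowTaskTimeout."""


class Timeout:
    """Minimal SIGALRM-based timeout (sketch; Unix only, main thread only)."""

    def __init__(self, seconds: float) -> None:
        self.seconds = seconds
        self._previous = None

    def _handle(self, signum, frame):
        # Raised inside whatever Python code the main thread is running.
        raise TaskTimeout(f"Timed out after {self.seconds}s")

    def __enter__(self):
        self._previous = signal.signal(signal.SIGALRM, self._handle)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)  # one-shot alarm
        return self

    def __exit__(self, exc_type, exc, tb):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel a pending alarm
        if self._previous is not None:
            signal.signal(signal.SIGALRM, self._previous)  # restore old handler
        return False  # never swallow the exception


def task_body(events: list, cleanup_seconds: float) -> None:
    try:
        events.append("try")
        time.sleep(5)  # interrupted when the alarm fires
    finally:
        events.append("finally start")
        time.sleep(cleanup_seconds)  # slow cleanup still runs to completion
        events.append("finally end")


def run(timeout_seconds: float = 0.2, cleanup_seconds: float = 0.5):
    """Return the recorded events and the total elapsed time in seconds."""
    events = []
    start = time.monotonic()
    try:
        with Timeout(timeout_seconds):
            task_body(events, cleanup_seconds)
    except TaskTimeout:
        events.append("timeout propagated")
    return events, time.monotonic() - start
```

With the defaults, `run()` records `try`, `finally start`, `finally end` and only then `timeout propagated`, after roughly 0.7 seconds rather than 0.2: the exception raised by the alarm handler cannot leave the function until the `finally` block returns. If the `finally` block never returns (for example, a `commit()` on a connection that is not responding), the exception never reaches the caller, which would match a task that stays in `running`.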
To find the problem, you can write a simple test that executes your operator in your IDE in debug mode, and set breakpoints at each step to follow the call stack. If you find this complicated, you can add a `print` after each line and read the log in the UI to see where the task is stuck.
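If you go the `print` route, a small context manager can save some typing. This is a hypothetical helper (`step` and `unfinished` are not part of Airflow): it prints `START`/`END` markers around each block, and the last `START` without a matching `END` or `FAILED` in the task log points at the call that is hanging.

```python
import contextlib
import time


@contextlib.contextmanager
def step(name: str, trail: list):
    """Print START/END markers around a block and record them in `trail`."""
    start = time.monotonic()
    trail.append(f"START {name}")
    print(f"START {name}", flush=True)
    try:
        yield
    except BaseException:
        # Also covers timeout exceptions; re-raise so behaviour is unchanged.
        trail.append(f"FAILED {name}")
        print(f"FAILED {name} after {time.monotonic() - start:.2f}s", flush=True)
        raise
    else:
        trail.append(f"END {name}")
        print(f"END {name} after {time.monotonic() - start:.2f}s", flush=True)


def unfinished(trail: list) -> list:
    """Return the steps that printed START but neither END nor FAILED."""
    open_steps = []
    for entry in trail:
        kind, name = entry.split(" ", 1)
        if kind == "START":
            open_steps.append(name)
        elif name in open_steps:
            open_steps.remove(name)
    return open_steps
```

For example, in `save_job_details` you could wrap `cur.execute(cmd)`, `stats_s3_hook.load_string(...)` and `connection.commit()` each in its own `with step(...)` block. The `trail` list only exists to make the helper easy to test; in a real task you would read the printed markers in the UI log.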
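Also, if you need to do something specific when the task times out, you can handle it in a new `except` block. Since the example above shows that `except Exception` also catches the timeout, place this block before any broader clause: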
```python
from airflow.exceptions import AirflowTaskTimeout
try:
    ...  # your task logic
except AirflowTaskTimeout as timeout_exception:
    do_something()
    raise timeout_exception
```
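Applied to your task, one way to structure this is to commit only on the success path, handle the timeout in its own clause, and keep the `finally` block to quick cleanup; in the original code, `finally` calls `connection.commit()` even after a timeout or a rollback. The sketch below is an illustration under assumptions, not a fix verified against your setup: it uses the standard-library `sqlite3` module as a stand-in for the MySQL connection, and `TaskTimeout` and `save_job_row` are hypothetical names.

```python
import sqlite3


class TaskTimeout(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowTaskTimeout."""


def save_job_row(conn: sqlite3.Connection, cmd: str, upload) -> None:
    """Insert a job row, run the upload, and commit only if both succeed."""
    try:
        conn.execute(cmd)
        upload()  # e.g. an S3 upload; may be interrupted by a timeout
        conn.commit()  # commit on the success path only
    except TaskTimeout:
        conn.rollback()  # timeout-specific handling goes here
        raise  # re-raise so the task is still marked as failed
    except sqlite3.Error as e:
        conn.rollback()
        raise RuntimeError(f"Not able to save stats due to {e}") from e
    finally:
        conn.close()  # keep cleanup short; no commit here
```

Note that `rollback()` and `close()` on a connection whose server has stopped responding may still block, so a driver-level read/write timeout, if your MySQL client supports one, is still worth configuring.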
[GitHub] [airflow] eladkal closed issue #29557: Execution Timeout is not working properly on airflow 2.5.0
Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal closed issue #29557: Execution Timeout is not working properly on airflow 2.5.0
URL: https://github.com/apache/airflow/issues/29557
[GitHub] [airflow] boring-cyborg[bot] commented on issue #29557: Execution Timeout is not working properly on airflow 2.5.0
Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #29557:
URL: https://github.com/apache/airflow/issues/29557#issuecomment-1431913769
Thanks for opening your first issue here! Be sure to follow the issue template!