Posted to commits@airflow.apache.org by "deepaktripathi1997 (via GitHub)" <gi...@apache.org> on 2023/02/15 19:37:49 UTC

[GitHub] [airflow] deepaktripathi1997 opened a new issue, #29557: Execution Timeout is not working properly on airflow 2.5.0

deepaktripathi1997 opened a new issue, #29557:
URL: https://github.com/apache/airflow/issues/29557

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We're experiencing occasional issues with tasks that have an `execution_timeout` specified. Even though the process is timed out, the task remains stuck in the 'running' state for several hours.
   
   The task looks like this:
   
   ```python
       @task(task_id='save_job_details', retries=1, retry_delay=5, execution_timeout=execution_timeout,
             pool='nrt_save_job_details', trigger_rule=TriggerRule.NONE_FAILED)
       def save_job_details(**context):
           ti = context['ti']
           connection_details = Connection.get_connection_from_secrets(conn_id='redshift_etl_flack')
   
           # Pulling all the XCOMS
           job_status = ti.xcom_pull(task_ids='running_redshift_queries', key='job_status')
           last_run_epoch = ti.xcom_pull(task_ids='list_running_epochs', key='last_run_epoch')
           schema_change = ti.xcom_pull(task_ids='list_running_epochs', key='schema_change')
           schema_epoch = ti.xcom_pull(task_ids='list_running_epochs', key='schema_epoch')
           before_count = ti.xcom_pull(task_ids='running_redshift_queries', key='before_count')
           after_count = ti.xcom_pull(task_ids='running_redshift_queries', key='after_count')
           max_kt = ti.xcom_pull(task_ids='running_redshift_queries', key='max_kt')
           print(f"Job status of previous task -> {job_status}")
           print(f"last run epoch received of previous task -> {last_run_epoch}")
   
           # Creating DAG URL
           dag_id = context['dag'].dag_id
           dag_run_context = context["dag_run"]
           run_id = dag_run_context.run_id
           dag_url = f"""http://10.249.5.183:3000/airflow/dags/{dag_id}/graph?root=&dag_run_id={run_id}"""
           table_id = table['table_id']
   
           # Finding event_type
           conf = dag_run_context.conf
           event_type = "all" if 'event_type' not in conf else conf['event_type']
   
           # Saving Job details in RDS
   
           # Case if no epoch was processed
           if job_status is None:
               job_status = 2
           if job_status == 1 and last_run_epoch is None:
               job_status = 0
   
           if job_status in (0, 2, 3) or last_run_epoch is None or event_type != 'all':
               cmd = f"""insert into nrt_jobs (table_id, status, table_name)
                       values ({table_id}, {job_status}, '{destination_table_name}')
                        """
           else:
               cmd = f"""insert into nrt_jobs (table_id, status, table_name, last_run_epoch)
                                   values ({table_id}, {job_status}, '{destination_table_name}', {last_run_epoch})
                                    """
           print(cmd)
   
           connection = mysql_hook_nrt.get_conn()
   
           # saving the stats
           stats_path = f"{nrt_variables['stats_path_s3'].rstrip('/')}/{table['source']}/{table['db']}/{table['table_name']}/ingestion.json"
           stats_parsed = urlparse(stats_path)
           stats_bucket = stats_parsed.netloc
           stats_prefix = stats_parsed.path.lstrip('/')
   
           stats = {
               "source": table['source'],
               "db": table['db'],
               "table_name": table['table_name'],
               "before_count": before_count,
               "after_count": after_count,
               "max_kafka_timestamp": max_kt,
               "addedon": pendulum.now().to_datetime_string()
           }
   
           try:
               cur = connection.cursor()
               cur.execute(cmd)
               stats_s3_hook = S3Hook()
               stats_s3_hook.load_string(bucket_name=stats_bucket, key=stats_prefix,
                                         string_data=json.dumps(stats, indent=2, default=str), replace=True)
           except (ClientError, MySQLdb.Error) as e:
               connection.rollback()
               raise AirflowFailException(f"Not able to save stats due to {e}")
           finally:
               connection.commit()
               connection.close()
               print("Stats saved")
               print("Job details saved")
   
           # CDC code
           if schema_change:
               try:
                   cdc_s3_hook = S3Hook()
   
                   CDC(table_name=destination_table_name, ingestion_schema=ingestion_schema,
                       data_dumps_schema=data_dumps_schema).driver_function()
                   cdc_s3_hook.load_string(bucket_name=bucket, key=f"{schema_epoch}_CDC_DONE", string_data='')
               except (ClientError, MySQLdb.Error) as e:
                   print(f"Outer: Failed CDC due to {e}")
                   raise AirflowFailException(f"Outer: Failed CDC due to {e}")
   
           if os.path.exists(f"redshift_queries_{destination_table_name}"):
               os.remove(f"redshift_queries_{destination_table_name}")
   
           if os.path.exists(f"epoch_list_{destination_table_name}"):
               os.remove(f"epoch_list_{destination_table_name}")
   ```
   The value of `execution_timeout` is `pendulum.duration(minutes=3)`.
   
   **Task Logs:**
   ```
   *** Falling back to local log
   *** Reading local file: /home/deploy/ssot-airflow/logs/dag_id=realtime_payout_payout_Beneficiary/run_id=scheduled__2023-02-15T13:17:00+00:00/task_id=save_job_details/attempt=1.log

   --------------------------------------------------------------------------------
   [2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - Job status of previous task -> 1
   [2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - last run epoch received of previous task -> 1676474400
   [2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - insert into ***
   [2023-02-15, 20:55:21 IST] {base.py:73} INFO - Using connection ID 'nrt_rds' for task execution.
   [2023-02-15, 20:55:21 IST] {base.py:73} INFO - Using connection ID 'aws_default' for task execution.
   [2023-02-15, 20:55:21 IST] {credentials.py:1049} INFO - Found credentials from IAM Role: ssot-prod-role
   [2023-02-15, 20:58:20 IST] {timeout.py:68} ERROR - Process timed out, PID: 11392
   ```
   
   
   The task has been running for the past 1.5 hours and is not failing.
   
   **Operating System**
   ```
   Virtualization: kvm
   Operating System: CentOS Linux 7 (Core)
   CPE OS Name: cpe:/o:centos:centos:7
   Kernel: Linux 6.0.10-1.el7.elrepo.x86_64
   Architecture: x86-64
   ```
   
   <img width="1385" alt="image" src="https://user-images.githubusercontent.com/25430062/219133170-ffd654da-b70d-4dad-9747-55563e7554b7.png">
   
   This happens only when this task hits its execution timeout.
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   CentOS Linux 7 (Core)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==6.2.0
   apache-airflow-providers-celery==3.1.0
   apache-airflow-providers-common-sql==1.3.1
   apache-airflow-providers-ftp==3.1.0
   apache-airflow-providers-http==4.0.0
   apache-airflow-providers-imap==3.0.0
   apache-airflow-providers-mysql==3.4.0
   apache-airflow-providers-postgres==5.0.0
   apache-airflow-providers-sftp==4.2.0
   apache-airflow-providers-sqlite==3.2.1
   apache-airflow-providers-ssh==3.3.0
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   CPU: 64 core
   Mem: 256 GB
   worker_autoscale: 1024, 256
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   




[GitHub] [airflow] hussein-awala commented on issue #29557: Execution Timeout is not working properly on airflow 2.5.0

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29557:
URL: https://github.com/apache/airflow/issues/29557#issuecomment-1436106024

   Airflow raises an `AirflowTaskTimeout` exception when the task times out, and your code can catch this exception and handle it if needed.
   
   Since you have `[2023-02-15, 20:58:20 IST] {timeout.py:68} ERROR - Process timed out, PID: 11392` in the log, the timeout exception is raised, but it seems like your code is stuck in one of the `finally` blocks, since Python runs `finally` before the exception propagates out of the `try` block. I cannot check what's wrong with your operator, but I can recommend some steps to debug the problem.
   
   First, here is a simple example:
   ```python
   import datetime
   import time
   from typing import Any
   
   import pendulum
   
   from airflow.models import BaseOperator
   from airflow.models.dag import dag
   from airflow.utils.context import Context
   
   
   class MyOperator(BaseOperator):
       def execute(self, context: Context) -> Any:
           try:
               print("try")
               time.sleep(120)
           except Exception as e:
               print(e)
               raise e
           finally:
               print("finally")
               time.sleep(120)
   
   
   @dag(
       schedule=None,
       start_date=pendulum.yesterday(),
   )
   def timeout_dag():
       MyOperator(task_id="test_task", execution_timeout=datetime.timedelta(seconds=30))
   
   timeout_dag()
   ```
   And here is the log:
   ```
   [2023-02-19, 21:27:29 UTC] {logging_mixin.py:149} INFO - try
   [2023-02-19, 21:27:59 UTC] {timeout.py:68} ERROR - Process timed out, PID: 98045
   [2023-02-19, 21:27:59 UTC] {logging_mixin.py:149} INFO - Timeout, PID: 98045
   [2023-02-19, 21:27:59 UTC] {logging_mixin.py:149} INFO - finally
   [2023-02-19, 21:29:59 UTC] {taskinstance.py:1837} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/files/dags/dag18.py", line 19, in execute
       raise e
     File "/files/dags/dag18.py", line 16, in execute
       time.sleep(120)
     File "/opt/airflow/airflow/utils/timeout.py", line 69, in handle_timeout
       raise AirflowTaskTimeout(self.error_message)
   airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 98045
   ```
   To locate the problem, you can create a simple test that executes your operator in the IDE in debug mode, and use breakpoints at each step to follow the call stack. If you find this complicated, you can add a `print` after each line and read the log from the UI to find where the task is stuck.
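
   For instance, here is a minimal sketch of such a test (assuming the `MyOperator` class from the example above is importable; the module name `timeout_dag` is only a placeholder for wherever your DAG file lives):
   ```python
   # Call the operator's execute() directly, outside the scheduler and worker,
   # so IDE breakpoints stop inside your own code.
   from timeout_dag import MyOperator  # hypothetical import path

   op = MyOperator(task_id="debug_test")
   # MyOperator ignores the context, so an empty dict is enough here;
   # a real operator may need a populated context dict instead.
   op.execute(context={})  # set a breakpoint here and step through line by line
   ```
   Since nothing runs under the Airflow timeout handler here, whichever call blocks will simply hang in the debugger, which makes it easy to spot.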
   
   Also, if you have some special actions to perform when the task times out, you can add them in a new `except` block:
   ```python
   except AirflowTaskTimeout as timeout_exception:
       do_something()
       raise timeout_exception
   ```
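
   Putting this together with the example above, a sketch of the full structure could look like the following (`AirflowTaskTimeout` is importable from `airflow.exceptions`; the handling inside the new `except` block is only a placeholder):
   ```python
   import time
   from typing import Any

   from airflow.exceptions import AirflowTaskTimeout
   from airflow.models import BaseOperator
   from airflow.utils.context import Context


   class MyOperator(BaseOperator):
       def execute(self, context: Context) -> Any:
           try:
               print("try")
               time.sleep(120)
           except AirflowTaskTimeout as timeout_exception:
               # Timeout-specific handling goes here; keep it short so the task
               # actually fails instead of hanging in cleanup.
               print(f"handling timeout: {timeout_exception}")
               raise timeout_exception
           except Exception as e:
               print(e)
               raise e
           finally:
               # Avoid long-running or blocking calls here: finally runs before
               # the exception propagates, so anything slow here keeps the task
               # stuck in the "running" state.
               print("finally")
   ```
   The key point is that whatever runs in the `except` and `finally` blocks also counts against the time the task stays alive after the timeout fires, so it should return quickly.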




[GitHub] [airflow] eladkal closed issue #29557: Execution Timeout is not working properly on airflow 2.5.0

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal closed issue #29557: Execution Timeout is not working properly on airflow 2.5.0
URL: https://github.com/apache/airflow/issues/29557




[GitHub] [airflow] boring-cyborg[bot] commented on issue #29557: Execution Timeout is not working properly on airflow 2.5.0

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #29557:
URL: https://github.com/apache/airflow/issues/29557#issuecomment-1431913769

   Thanks for opening your first issue here! Be sure to follow the issue template!
   

