Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/15 08:47:43 UTC

[GitHub] [airflow] linyfei commented on issue #13322: Backfill crashes with "KeyError: TaskInstanceKey" when task has retries

linyfei commented on issue #13322:
URL: https://github.com/apache/airflow/issues/13322#issuecomment-820243081


   I got the same error. Let me explain my workflow: I ran the Airflow job with the DebugExecutor on my Mac, and it submitted the steps to Amazon EMR.
   The weird thing is that all steps of my Airflow job were submitted successfully, and they were confirmed to run successfully on Amazon EMR.
   
   I tried googling for relevant information, but didn't find anything.
   
   I sincerely hope this problem will be taken seriously and resolved as soon as possible. Thanks a lot!
   
   Here is my error:
   `Traceback (most recent call last):
     File "/Users/anker/PycharmProjects/Airflow2.0_test/dags/EMRS3FileRepartitionTest.py", line 82, in <module>
       dag.run()
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/models/dag.py", line 1706, in run
       job.run()
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 237, in run
       self._execute()
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 805, in _execute
       session=session,
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 727, in _execute_for_run_dates
       session=session,
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 620, in _process_backfill_task_instances
       self._update_counters(ti_status=ti_status)
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 231, in _update_counters
       ti_status.running.pop(reduced_key)
   KeyError: TaskInstanceKey(dag_id='emr_sync_ea_apply_l_dag', task_id='watch_steps', execution_date=datetime.datetime(2021, 4, 10, 0, 0, tzinfo=Timezone('UTC')), try_number=1)
   `
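   
   From the traceback, the crash appears to happen in the backfill job's `_update_counters`, which pops the map of running task instances with a "reduced" key; my reading is that once the task has retried, the `try_number` in that key no longer matches the key the task instance was stored under. Below is a minimal standalone sketch of that kind of mismatch (illustrative only, with a simplified stand-in for `TaskInstanceKey`, not the actual Airflow code):
   
   `from typing import NamedTuple
   
   class SimpleTaskInstanceKey(NamedTuple):
       dag_id: str
       task_id: str
       execution_date: str
       try_number: int
   
   # The backfill job tracks running task instances in a dict keyed by their full key.
   running = {
       SimpleTaskInstanceKey('apply_2_s_dag', 'watch_steps', '2021-04-10', 2): 'running ti',
   }
   
   # After a retry, the try_number used to build the lookup key differs from the stored one ...
   lookup_key = SimpleTaskInstanceKey('apply_2_s_dag', 'watch_steps', '2021-04-10', 1)
   
   # ... so pop() raises KeyError, which is the crash shown in the traceback above.
   running.pop(lookup_key)
   `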
   
   Here is my Code:
   `from airflow import DAG
   from datetime import timedelta
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
   from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
   from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
   from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
   
   default_args = {
       'owner': 'stella',
       'depends_on_past': True,
       'wait_for_downstream': True,
       'email': ['stella.wu@**.com'],
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=1),
   }
   
   Job_Flow_Overrides = {
       'Name': 'apply_2_s_dag',
   }
   with DAG(
       dag_id='apply_2_s_dag',
       default_args=default_args,
       catchup=True,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(5),
       # schedule_interval='@once',
   ) as dag:
       PYSPARK_STEPS = [
           {
               'Name': 'sync_apply_2s',
               'ActionOnFailure': 'TERMINATE_CLUSTER',
               'HadoopJarStep': {
                   'Jar': 'command-runner.jar',
                   'Args': ['spark-submit',
                            's3://******/apply_2_s.py',
                            's3://******-partitioned',
                            's3://*****-dev',
                            '/*****/apply_s/',
                            '{{ task_instance.execution_date }}'],
               }
           },
       ]
       # create the EMR job flow first
       cluster_creator = EmrCreateJobFlowOperator(
           task_id='create_job_flow',
           aws_conn_id='aws_default',
           emr_conn_id='emr_default',
           job_flow_overrides=Job_Flow_Overrides,
       )
   
       step_adder = EmrAddStepsOperator(
           task_id='add_steps',
           job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow',key='return_value') }}",
           aws_conn_id='aws_default',
           steps=PYSPARK_STEPS,
   
       )
       # up to 256 steps can be added
   
       # Asks for the state of the step until it reaches a terminal state. If it fails the sensor errors, failing the task
       step_checker = EmrStepSensor(
           task_id='watch_steps',
           job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
           step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
           aws_conn_id='aws_default',
       )
   
       # finally, terminate the EMR job flow
       cluster_remover = EmrTerminateJobFlowOperator(
           task_id='remove_cluster',
           job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
           aws_conn_id='aws_default',
       )
   
       cluster_creator >> step_adder >> step_checker >> cluster_remover
   
       if __name__ == '__main__':
           dag.clear()
           dag.run()
   `
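   
   For context, this is roughly how I execute the file locally so that `dag.run()` picks up the DebugExecutor (a sketch of my setup; the environment-variable way of selecting the executor is standard Airflow configuration, but the exact invocation here is an illustration, not copied from the DAG above):
   
   `import os
   
   # Select the DebugExecutor before anything from Airflow is imported; this is
   # equivalent to exporting AIRFLOW__CORE__EXECUTOR=DebugExecutor in the shell.
   os.environ['AIRFLOW__CORE__EXECUTOR'] = 'DebugExecutor'
   
   # With the executor set, running the DAG file directly (python <dag_file>.py)
   # reaches the __main__ block above: dag.clear() followed by dag.run(), which
   # starts the backfill that eventually raises the KeyError.
   `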
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org