Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/15 08:47:43 UTC
[GitHub] [airflow] linyfei commented on issue #13322: Backfill crashes with "KeyError: TaskInstanceKey" when task has retries
linyfei commented on issue #13322:
URL: https://github.com/apache/airflow/issues/13322#issuecomment-820243081
I got the same error. Let me explain my workflow: I ran the Airflow job with DebugExecutor on my Mac, and it submitted the work to Amazon EMR.
The weird thing is that the Airflow job (all steps) was submitted successfully, and I confirmed that it ran successfully on Amazon EMR.
I searched Google for relevant information but found nothing.
I sincerely hope that this problem will be taken seriously and resolved as soon as possible. Thanks a lot!
Here is my error:
`Traceback (most recent call last):
  File "/Users/anker/PycharmProjects/Airflow2.0_test/dags/EMRS3FileRepartitionTest.py", line 82, in <module>
    dag.run()
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/models/dag.py", line 1706, in run
    job.run()
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 805, in _execute
    session=session,
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 727, in _execute_for_run_dates
    session=session,
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 620, in _process_backfill_task_instances
    self._update_counters(ti_status=ti_status)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 231, in _update_counters
    ti_status.running.pop(reduced_key)
KeyError: TaskInstanceKey(dag_id='emr_sync_ea_apply_l_dag', task_id='watch_steps', execution_date=datetime.datetime(2021, 4, 10, 0, 0, tzinfo=Timezone('UTC')), try_number=1)
`
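The last frame is a plain `dict.pop` on a key that is not in the `running` map. As a rough illustration only (this is not Airflow's code), the sketch below uses a hypothetical `DemoKey` named tuple to show how two keys that differ only in `try_number` count as different dictionary keys, so the pop raises `KeyError`. The `try_number=2` value stored in the map is an assumption made for the demo; I have not confirmed what the real map held.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class DemoKey(NamedTuple):
    """Hypothetical stand-in for a task-instance key; not Airflow's class."""
    dag_id: str
    task_id: str
    execution_date: datetime
    try_number: int


when = datetime(2021, 4, 10, tzinfo=timezone.utc)

# Assumption for illustration: the map was populated under try_number=2.
running = {
    DemoKey("emr_sync_ea_apply_l_dag", "watch_steps", when, 2): "task instance",
}

# The lookup key differs only in try_number, so it hashes to a different entry.
lookup = DemoKey("emr_sync_ea_apply_l_dag", "watch_steps", when, 1)

try:
    running.pop(lookup)
    popped = True
except KeyError:
    # Same failure mode as the final line of the traceback.
    popped = False

# Passing a default makes pop return it instead of raising when the key is absent.
missing = running.pop(lookup, None)
```

Whether a mismatch like this is what actually happens inside the backfill job is something I can only guess at from the traceback.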
Here is my code:
`from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

default_args = {
    'owner': 'stella',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'email': ['stella.wu@**.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'reties': 1,
    'retry_delay': timedelta(minutes=1),
}

Job_Flow_Overrides = {
    'Name': 'apply_2_s_dag',
}

with DAG(
    dag_id='apply_2_s_dag',
    default_args=default_args,
    catchup=True,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(5),
    # schedule_interval='@once',
) as dag:
    PYSPARK_STEPS = [
        {
            'Name': 'sync_apply_2s',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit',
                         's3://******/apply_2_s.py',
                         's3://******-partitioned',
                         's3://*****-dev',
                         '/*****/apply_s/',
                         '{{ task_instance.execution_date }}'],
            }
        },
    ]

    # create job flow at first
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
        job_flow_overrides=Job_Flow_Overrides,
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow',key='return_value') }}",
        aws_conn_id='aws_default',
        steps=PYSPARK_STEPS,
    )

    # max steps is 256 could be add
    # Asks for the state of the step until it reaches a terminal state. If it fails the sensor errors, failing the task
    step_checker = EmrStepSensor(
        task_id='watch_steps',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    # finally terminate job flow with emr
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover

if __name__ == '__main__':
    dag.clear()
    dag.run()
`