Posted to commits@airflow.apache.org by "Valeriys Soloviov (Jira)" <ji...@apache.org> on 2019/11/25 13:26:00 UTC

[jira] [Updated] (AIRFLOW-6061) xcom was not created for a task

     [ https://issues.apache.org/jira/browse/AIRFLOW-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Valeriys Soloviov updated AIRFLOW-6061:
---------------------------------------
    Summary: xcom was not created for a task  (was: xcom is not created)

> xcom was not created for a task
> -------------------------------
>
>                 Key: AIRFLOW-6061
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6061
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: xcom
>    Affects Versions: 1.10.3
>            Reporter: Valeriys Soloviov
>            Priority: Major
>         Attachments: image-2019-11-25-15-09-52-646.png, image-2019-11-25-15-24-20-610.png
>
>
> I faced a strange bug:
> We are using:
>  * Airflow *1.10.3* [all_dbs,crypto,async,jdbc,hdfs,hive,ldap,mssql,mysql,password,s3,slack,gcp,google_auth,stats]
>  * mysql: 5.7.22 
> {code:java}
> mysql> select @@version;
> +------------+
> | @@version  |
> +------------+
> | 5.7.22-log |
> +------------+
> {code}
>  * *LocalExecutor*
> I have a DAG with one main task that creates the EMR job flow via EmrCreateJobFlowOperator,
> then 12 tasks (EmrJobFlowSensor) that wait for completion, then 12 more step tasks, and finally tasks that read the EMR log from S3 in any case (trigger_rule=TriggerRule.ALL_DONE).
> !image-2019-11-25-15-09-52-646.png!
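> A simplified sketch of the wiring (one of the 12 parallel branches; task ids are illustrative and a DummyOperator stands in for the S3 log reader, so this is not the real DAG file):
> {code:python}
> # Rough sketch of the DAG described above, following the order given in the text.
> from datetime import datetime
> from airflow import DAG
> from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
> from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.trigger_rule import TriggerRule
> from flow.operators.emr import SparkStepOperator  # project-internal operator, attached below
>
> with DAG('api', start_date=datetime(2019, 11, 1), schedule_interval=None) as dag:
>     create_cluster = EmrCreateJobFlowOperator(task_id='create_cluster')  # cluster config omitted
>     wait_for_cluster = EmrJobFlowSensor(
>         task_id='wait_for_cluster',
>         job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
>     )
>     # one of the 12 step tasks; it pulls the cluster id from create_cluster's XCom
>     step = SparkStepOperator(
>         step_name='stage_cs_ext_api_myheritage_tickets_volume',
>         source_path='s3://releases/airflow/parquet_to_csv.py',
>     )
>     # reads the EMR log from S3 whatever happened upstream
>     read_emr_log = DummyOperator(task_id='read_emr_log',
>                                  trigger_rule=TriggerRule.ALL_DONE)
>     create_cluster >> wait_for_cluster >> step >> read_emr_log
> {code}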
> For some reason 3 of the 12 tasks failed and no XCom was written.
>   !image-2019-11-25-15-24-20-610.png!
> The log
> {noformat}
> -------------------------------------------------------------------------------- 
> [2019-11-25 11:09:18,434] {__init__.py:1354} INFO - Starting attempt 3 of 3 
> [2019-11-25 11:09:18,434] {__init__.py:1355} INFO - -------------------------------------------------------------------------------- 
> [2019-11-25 11:09:18,448] {__init__.py:1374} INFO - Executing <Task(SparkStepOperator): stage_cs_ext_api_myheritage_tickets_volume> on 2019-11-25T09:46:10.998426+00:00 
> [2019-11-25 11:09:18,449] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'api', 'api_myheritage_tickets_volume', '2019-11-25T09:46:10.998426+00:00', '--job_id', '98486', '--raw', '-sd', 'DAGS_FOLDER/api_export.py', '--cfg_path', '/tmp/tmp4tfkxfsq'] 
> [2019-11-25 11:09:18,963] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume 
> [2019-11-25 11:09:18,963] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=26857 
> [2019-11-25 11:09:19,929] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume 
> [2019-11-25 11:09:19,929] {__init__.py:51} INFO - Using executor LocalExecutor 
> [2019-11-25 11:09:20,091] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume 
> [2019-11-25 11:09:20,091] {__init__.py:305} INFO - Filling up the DagBag from /home/airflow/dags/export.py 
> [2019-11-25 11:09:20,219] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume 
> [2019-11-25 11:09:20,218] {cli.py:517} INFO - Running <TaskInstance: wap-cs-external-api.stage_cs_ext_api_myheritage_tickets_volume 2019-11-25T09:46:10.998426+00:00 [running]> on host airflow 
> [2019-11-25 11:09:20,312] {emr.py:348} INFO - Adding step [{'Name': 'stage_cs_ext_api_myheritage_tickets_volume', 'ActionOnFailure': 'CONTINUE', 'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': ['spark-submit', '--deploy-mode', 'client', 's3://releases/airflow/parquet_to_csv.py', 'prod', 'marketing', 'cs_ext_api_myheritage_tickets_volume', 's3://data/cs_ext_api_myheritage_tickets_volume.csv', 'false', 'true']}}] to the cluster [j-1UVOKFN0F1QC2] 
> [2019-11-25 11:09:20,315] {logging_mixin.py:95} INFO - 
> [2019-11-25 11:09:20,315] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): elasticmapreduce.us-east-1.amazonaws.com 
> [2019-11-25 11:11:20,485] {logging_mixin.py:95} INFO - 
> [2019-11-25 11:11:20,484] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com 
> [2019-11-25 11:11:20,534] {__init__.py:1580} ERROR - Waiter StepComplete failed: Waiter encountered a terminal failure state
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/models/__init__.py", line 1436, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.6/site-packages/flow/operators/emr.py", line 350, in execute
>     hook.wait_for_step_completion(cluster_id, step_id)
>   File "/usr/local/lib/python3.6/site-packages/wixflow/aws/emr_lib.py", line 55, in wait_for_step_completion
>     'MaxAttempts': 60
>   File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 53, in wait
>     Waiter.wait(self, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 323, in wait
>     last_response=response,
> botocore.exceptions.WaiterError: Waiter StepComplete failed: Waiter encountered a terminal failure state
> {noformat}
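> The exception comes from the project-internal wixflow/aws/emr_lib.py; judging by the 'MaxAttempts': 60 in the traceback it presumably wraps boto3's built-in "step_complete" waiter. A minimal sketch of such a helper (an assumption, the real implementation is not attached):
> {code:python}
> # Hypothetical wait_for_step_completion, assuming it delegates to boto3's
> # built-in "step_complete" waiter. The waiter polls DescribeStep and raises
> # WaiterError ("terminal failure state") as soon as the step reaches FAILED
> # or CANCELLED, i.e. the error above means the EMR step itself failed, not
> # that the waiter timed out.
> import boto3
> from botocore.exceptions import WaiterError
>
> def wait_for_step_completion(cluster_id, step_id, region='us-east-1'):
>     emr = boto3.client('emr', region_name=region)
>     waiter = emr.get_waiter('step_complete')
>     try:
>         waiter.wait(
>             ClusterId=cluster_id,
>             StepId=step_id,
>             WaiterConfig={'Delay': 30, 'MaxAttempts': 60},
>         )
>     except WaiterError:
>         # surfaces in the task log as "Waiter StepComplete failed: ..."
>         raise
> {code}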
>   
>  flow/operators/emr.py
> {code:python}
> class SparkStepOperator(BaseOperator):
>     ui_color = '#cce6ff'
>     template_fields = ['app_args']
>
>     def __init__(self,
>                  step_name,
>                  source_path,
>                  deploy_mode='client',
>                  main_class=None,
>                  app_args: List[str] = None,
>                  spark_submit_args: List[str] = None,
>                  action_on_failure='CONTINUE',
>                  cluster_id=None,
>                  emr_conn_id=default_emr_conn_id,
>                  *args, **kwargs):
>         """
>         :param step_name: name of the step to submit
>         :param source_path: path to jar or python file on s3
>         :param action_on_failure: 'TERMINATE_JOB_FLOW'|'TERMINATE_CLUSTER'|'CANCEL_AND_WAIT'|'CONTINUE'
>         :param emr_conn_id: emr connection id
>         """
>         super(SparkStepOperator, self).__init__(task_id=step_name, *args, **kwargs)
>         self.emr_conn_id = emr_conn_id
>         self.cluster_id = cluster_id
>         self.action_on_failure = action_on_failure
>         self.step_name = step_name
>         self.source_path = source_path
>         self.deploy_mode = deploy_mode
>         self.spark_submit_args = spark_submit_args if spark_submit_args else []
>         self.app_args = app_args if app_args else []
>         self.main_class_args = ['--class', main_class] if main_class else []
>
>     def execute(self, context):
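>         # if no cluster_id was passed in, fall back to the XCom pushed by the create_cluster task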
>         cluster_id = self.cluster_id if self.cluster_id else self.xcom_pull(context, task_ids='create_cluster')
>         hook = EmrHook(self.emr_conn_id)
>         args = ['spark-submit', '--deploy-mode', self.deploy_mode] + self.spark_submit_args + self.main_class_args + [
>             self.source_path] + self.app_args
>         step = {
>             'Name': self.step_name,
>             'ActionOnFailure': self.action_on_failure,
>             'HadoopJarStep': {
>                 'Jar': 'command-runner.jar',
>                 'Args': args
>             }
>         }
>         self.log.info(f"Adding step [{step}] to the cluster [{cluster_id}]")
>         step_id = hook.add_step(cluster_id, step)
>         hook.wait_for_step_completion(cluster_id, step_id)
>         self.log.info(f"Step [{step_id}] finished on the cluster [{cluster_id}]")
>         return step_id
> {code}
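> This matches the missing XCom: Airflow pushes an operator's return value (here step_id) to XCom only after execute() completes, so when wait_for_step_completion() raises the WaiterError shown above, the task fails before anything is stored. A possible mitigation (a sketch only, not the current operator code) is to push the step id explicitly right after add_step(), so the downstream trigger_rule=ALL_DONE tasks can still pull it even when the step fails:
> {code:python}
>     # Sketch of an alternative execute() for SparkStepOperator; behaviour
>     # beyond the explicit xcom_push() is unchanged from the code above.
>     def execute(self, context):
>         cluster_id = self.cluster_id or self.xcom_pull(context, task_ids='create_cluster')
>         hook = EmrHook(self.emr_conn_id)
>         args = ['spark-submit', '--deploy-mode', self.deploy_mode] + self.spark_submit_args \
>                + self.main_class_args + [self.source_path] + self.app_args
>         step = {'Name': self.step_name,
>                 'ActionOnFailure': self.action_on_failure,
>                 'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': args}}
>         step_id = hook.add_step(cluster_id, step)
>         # explicit push survives a later WaiterError, unlike the return value
>         self.xcom_push(context, key='step_id', value=step_id)
>         hook.wait_for_step_completion(cluster_id, step_id)
>         self.log.info(f"Step [{step_id}] finished on the cluster [{cluster_id}]")
>         return step_id
> {code}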
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)