Posted to commits@airflow.apache.org by "Valeriys Soloviov (Jira)" <ji...@apache.org> on 2019/11/25 13:26:00 UTC
[jira] [Updated] (AIRFLOW-6061) xcom was not created for a task
[ https://issues.apache.org/jira/browse/AIRFLOW-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Valeriys Soloviov updated AIRFLOW-6061:
---------------------------------------
Summary: xcom was not created for a task (was: xcom is not created)
> xcom was not created for a task
> -------------------------------
>
> Key: AIRFLOW-6061
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6061
> Project: Apache Airflow
> Issue Type: Bug
> Components: xcom
> Affects Versions: 1.10.3
> Reporter: Valeriys Soloviov
> Priority: Major
> Attachments: image-2019-11-25-15-09-52-646.png, image-2019-11-25-15-24-20-610.png
>
>
> I faced a strange bug:
> We are using:
> * Airflow *1.10.3* [all_dbs,crypto,async,jdbc,hdfs,hive,ldap,mssql,mysql,password,s3,slack,gcp,google_auth,stats]
> * mysql: 5.7.22
> {code:java}
> mysql> select @@version;
> +------------+
> | @@version  |
> +------------+
> | 5.7.22-log |
> +------------+
> {code}
> * *LocalExecutor*
> I have a DAG with one main task that creates the EMR job flow via EmrCreateJobFlowOperator, then 12 tasks (EmrJobFlowSensor) that wait for its completion, then 12 more step tasks, and finally tasks that read the EMR log from S3 regardless of upstream outcome (trigger_rule=TriggerRule.ALL_DONE). A rough sketch of this wiring follows the screenshot below.
> !image-2019-11-25-15-09-52-646.png!
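> A minimal sketch of that wiring (names are illustrative except create_cluster, which matches the xcom_pull in the operator source further down; the 12 sensors and most branches are elided for brevity):
> {code:java}
> from airflow import DAG
> from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.utils.dates import days_ago
> from airflow.utils.trigger_rule import TriggerRule
> 
> from flow.operators.emr import SparkStepOperator  # in-house operator, source below
> 
> TABLES = ['cs_ext_api_myheritage_tickets_volume']  # 12 such tables in the real DAG
> 
> def read_emr_log(**context):
>     pass  # placeholder: fetch the step log from S3
> 
> dag = DAG('api', start_date=days_ago(1), schedule_interval=None)
> 
> create_cluster = EmrCreateJobFlowOperator(
>     task_id='create_cluster',  # the task id SparkStepOperator pulls from XCom
>     job_flow_overrides={},     # cluster config elided
>     dag=dag)
> 
> for table in TABLES:
>     step = SparkStepOperator(
>         step_name='stage_%s' % table,
>         source_path='s3://releases/airflow/parquet_to_csv.py',
>         dag=dag)
>     read_log = PythonOperator(
>         task_id='read_log_%s' % table,
>         python_callable=read_emr_log,
>         provide_context=True,
>         trigger_rule=TriggerRule.ALL_DONE,  # runs whether or not the step failed
>         dag=dag)
>     create_cluster >> step >> read_log
> {code}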
> For some reason, 3 of the 12 step tasks failed and no XCom was written:
> !image-2019-11-25-15-24-20-610.png!
> The task log:
> {noformat}
> --------------------------------------------------------------------------------
> [2019-11-25 11:09:18,434] {__init__.py:1354} INFO - Starting attempt 3 of 3
> [2019-11-25 11:09:18,434] {__init__.py:1355} INFO - --------------------------------------------------------------------------------
> [2019-11-25 11:09:18,448] {__init__.py:1374} INFO - Executing <Task(SparkStepOperator): stage_cs_ext_api_myheritage_tickets_volume> on 2019-11-25T09:46:10.998426+00:00
> [2019-11-25 11:09:18,449] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'api', 'api_myheritage_tickets_volume', '2019-11-25T09:46:10.998426+00:00', '--job_id', '98486', '--raw', '-sd', 'DAGS_FOLDER/api_export.py', '--cfg_path', '/tmp/tmp4tfkxfsq']
> [2019-11-25 11:09:18,963] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume
> [2019-11-25 11:09:18,963] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=26857
> [2019-11-25 11:09:19,929] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume
> [2019-11-25 11:09:19,929] {__init__.py:51} INFO - Using executor LocalExecutor
> [2019-11-25 11:09:20,091] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume
> [2019-11-25 11:09:20,091] {__init__.py:305} INFO - Filling up the DagBag from /home/airflow/dags/export.py
> [2019-11-25 11:09:20,219] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume
> [2019-11-25 11:09:20,218] {cli.py:517} INFO - Running <TaskInstance: wap-cs-external-api.stage_cs_ext_api_myheritage_tickets_volume 2019-11-25T09:46:10.998426+00:00 [running]> on host airflow
> [2019-11-25 11:09:20,312] {emr.py:348} INFO - Adding step [{'Name': 'stage_cs_ext_api_myheritage_tickets_volume', 'ActionOnFailure': 'CONTINUE', 'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': ['spark-submit', '--deploy-mode', 'client', 's3://releases/airflow/parquet_to_csv.py', 'prod', 'marketing', 'cs_ext_api_myheritage_tickets_volume', 's3://data/cs_ext_api_myheritage_tickets_volume.csv', 'false', 'true']}}] to the cluster [j-1UVOKFN0F1QC2]
> [2019-11-25 11:09:20,315] {logging_mixin.py:95} INFO -
> [2019-11-25 11:09:20,315] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): elasticmapreduce.us-east-1.amazonaws.com
> [2019-11-25 11:11:20,485] {logging_mixin.py:95} INFO -
> [2019-11-25 11:11:20,484] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
> [2019-11-25 11:11:20,534] {__init__.py:1580} ERROR - Waiter StepComplete failed: Waiter encountered a terminal failure state
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/models/__init__.py", line 1436, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.6/site-packages/flow/operators/emr.py", line 350, in execute
>     hook.wait_for_step_completion(cluster_id, step_id)
>   File "/usr/local/lib/python3.6/site-packages/wixflow/aws/emr_lib.py", line 55, in wait_for_step_completion
>     'MaxAttempts': 60
>   File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 53, in wait
>     Waiter.wait(self, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 323, in wait
>     last_response=response,
> botocore.exceptions.WaiterError: Waiter StepComplete failed: Waiter encountered a terminal failure state
> {noformat}
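> Judging from the traceback, wixflow/aws/emr_lib.py wraps botocore's StepComplete waiter; a sketch of what wait_for_step_completion presumably does (an assumption, not the actual source):
> {code:java}
> import boto3
> 
> def wait_for_step_completion(cluster_id, step_id):
>     """Block until the EMR step finishes; botocore raises WaiterError (as above)
>     when the step lands in a terminal failure state such as FAILED."""
>     emr = boto3.client('emr')
>     waiter = emr.get_waiter('step_complete')
>     waiter.wait(
>         ClusterId=cluster_id,
>         StepId=step_id,
>         WaiterConfig={
>             'Delay': 30,        # seconds between polls
>             'MaxAttempts': 60,  # matches the 'MaxAttempts': 60 in the traceback
>         })
> {code}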
>
> flow/operators/emr.py
> {code:java}
> from typing import List
> 
> from airflow.models import BaseOperator
> 
> # Import paths below are guessed from the traceback: EmrHook here is the
> # in-house hook in wixflow/aws/emr_lib.py (it has add_step and
> # wait_for_step_completion), not Airflow's contrib EmrHook.
> from wixflow.aws.emr_lib import EmrHook, default_emr_conn_id
> 
> 
> class SparkStepOperator(BaseOperator):
>     ui_color = '#cce6ff'
>     template_fields = ['app_args']
> 
>     def __init__(self,
>                  step_name,
>                  source_path,
>                  deploy_mode='client',
>                  main_class=None,
>                  app_args: List[str] = None,
>                  spark_submit_args: List[str] = None,
>                  action_on_failure='CONTINUE',
>                  cluster_id=None,
>                  emr_conn_id=default_emr_conn_id,
>                  *args, **kwargs):
>         """
>         :param step_name: name of the step to submit
>         :param source_path: path to the jar or python file on S3
>         :param action_on_failure: 'TERMINATE_JOB_FLOW'|'TERMINATE_CLUSTER'|'CANCEL_AND_WAIT'|'CONTINUE'
>         :param emr_conn_id: emr connection id
>         """
>         super(SparkStepOperator, self).__init__(task_id=step_name, *args, **kwargs)
>         self.emr_conn_id = emr_conn_id
>         self.cluster_id = cluster_id
>         self.action_on_failure = action_on_failure
>         self.step_name = step_name
>         self.source_path = source_path
>         self.deploy_mode = deploy_mode
>         self.spark_submit_args = spark_submit_args if spark_submit_args else []
>         self.app_args = app_args if app_args else []
>         self.main_class_args = ['--class', main_class] if main_class else []
> 
>     def execute(self, context):
>         # fall back to the cluster id pushed to XCom by the create_cluster task
>         cluster_id = self.cluster_id if self.cluster_id else self.xcom_pull(context, task_ids='create_cluster')
>         hook = EmrHook(self.emr_conn_id)
>         args = (['spark-submit', '--deploy-mode', self.deploy_mode]
>                 + self.spark_submit_args + self.main_class_args
>                 + [self.source_path] + self.app_args)
>         step = {
>             'Name': self.step_name,
>             'ActionOnFailure': self.action_on_failure,
>             'HadoopJarStep': {
>                 'Jar': 'command-runner.jar',
>                 'Args': args
>             }
>         }
>         self.log.info(f"Adding step [{step}] to the cluster [{cluster_id}]")
>         step_id = hook.add_step(cluster_id, step)
>         # if this raises (as in the traceback), execute() aborts and the
>         # return below never runs, so nothing is pushed to XCom
>         hook.wait_for_step_completion(cluster_id, step_id)
>         self.log.info(f"Step [{step_id}] finished on the cluster [{cluster_id}]")
>         return step_id
> {code}
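> Note that the traceback shows wait_for_step_completion raising before return step_id, so execute() never produced a value to push. A self-contained toy demonstrating that behaviour (my paraphrase of what Airflow 1.10.x appears to do, not its actual code):
> {code:java}
> # Toy paraphrase (assumption): the return value reaches XCom only after
> # execute() returns, so an exception inside execute() leaves no XCom row.
> 
> class FakeTaskInstance:
>     def __init__(self):
>         self.xcoms = {}
> 
>     def run(self, operator):
>         result = operator.execute(context={})    # raises -> nothing below runs
>         if result is not None:
>             self.xcoms['return_value'] = result  # never reached on failure
> 
> class FailingStep:
>     def execute(self, context):
>         raise RuntimeError('Waiter StepComplete failed')  # stands in for the WaiterError
> 
> ti = FakeTaskInstance()
> try:
>     ti.run(FailingStep())
> except RuntimeError:
>     pass
> print(ti.xcoms)  # {} -> no XCom, matching the screenshot above
> {code}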
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)