Posted to commits@airflow.apache.org by "Valeriys Soloviov (Jira)" <ji...@apache.org> on 2019/11/25 13:26:00 UTC
[jira] [Created] (AIRFLOW-6061) xcom is not created
Valeriys Soloviov created AIRFLOW-6061:
------------------------------------------
Summary: xcom is not created
Key: AIRFLOW-6061
URL: https://issues.apache.org/jira/browse/AIRFLOW-6061
Project: Apache Airflow
Issue Type: Bug
Components: xcom
Affects Versions: 1.10.3
Reporter: Valeriys Soloviov
Attachments: image-2019-11-25-15-09-52-646.png, image-2019-11-25-15-24-20-610.png
I ran into a strange bug.
We are using:
* Airflow *1.10.3* [all_dbs,crypto,async,jdbc,hdfs,hive,ldap,mssql,mysql,password,s3,slack,gcp,google_auth,stats]
* mysql: 5.7.22
{code:java}
mysql> select @@version;
+------------+
| @@version |
+------------+
| 5.7.22-log |
+------------+
{code}
* *LocalExecutor*
I have a DAG with one main task that submits steps to EMR via EmrCreateJobFlowOperator, followed by 12 tasks (EmrJobFlowSensor) that wait for completion, then 12 more tasks that run further steps, and finally tasks that read the EMR logs from S3 regardless of the upstream outcome (trigger_rule=TriggerRule.ALL_DONE):
!image-2019-11-25-15-09-52-646.png!
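The ALL_DONE trigger rule is what lets the S3 log readers run even when their upstream step failed. Below is a simplified, stdlib-only model of that rule next to the default ALL_SUCCESS; it is not Airflow's own code, and the set of states it treats as "done" is an assumption of this sketch.

```python
from enum import Enum
from typing import Iterable


class State(str, Enum):
    # Subset of Airflow task-instance states relevant here (simplified model)
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    RUNNING = "running"
    QUEUED = "queued"


# Assumed: states this model counts as "done" for the ALL_DONE rule
DONE_STATES = {State.SUCCESS, State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED}


def all_done(upstream_states: Iterable[State]) -> bool:
    """True when every upstream task has finished, whatever its outcome."""
    return all(state in DONE_STATES for state in upstream_states)


def all_success(upstream_states: Iterable[State]) -> bool:
    """The default rule, for comparison: every upstream task must have succeeded."""
    return all(state == State.SUCCESS for state in upstream_states)
```

For example, with nine successful and three failed upstream tasks, `all_done` returns True while `all_success` returns False.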
For some reason, 3 of the 12 tasks failed and no XCom was written:
!image-2019-11-25-15-24-20-610.png!
The log of one of the failed tasks (attempt 3 of 3):
{noformat}
--------------------------------------------------------------------------------
[2019-11-25 11:09:18,434] {__init__.py:1354} INFO - Starting attempt 3 of 3
[2019-11-25 11:09:18,434] {__init__.py:1355} INFO - --------------------------------------------------------------------------------
[2019-11-25 11:09:18,448] {__init__.py:1374} INFO - Executing <Task(SparkStepOperator): stage_cs_ext_api_myheritage_tickets_volume> on 2019-11-25T09:46:10.998426+00:00
[2019-11-25 11:09:18,449] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'api', 'api_myheritage_tickets_volume', '2019-11-25T09:46:10.998426+00:00', '--job_id', '98486', '--raw', '-sd', 'DAGS_FOLDER/api_export.py', '--cfg_path', '/tmp/tmp4tfkxfsq']
[2019-11-25 11:09:18,963] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume
[2019-11-25 11:09:18,963] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=26857
[2019-11-25 11:09:19,929] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume
[2019-11-25 11:09:19,929] {__init__.py:51} INFO - Using executor LocalExecutor
[2019-11-25 11:09:20,091] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume
[2019-11-25 11:09:20,091] {__init__.py:305} INFO - Filling up the DagBag from /home/airflow/dags/export.py
[2019-11-25 11:09:20,219] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume
[2019-11-25 11:09:20,218] {cli.py:517} INFO - Running <TaskInstance: wap-cs-external-api.stage_cs_ext_api_myheritage_tickets_volume 2019-11-25T09:46:10.998426+00:00 [running]> on host airflow
[2019-11-25 11:09:20,312] {emr.py:348} INFO - Adding step [{'Name': 'stage_cs_ext_api_myheritage_tickets_volume', 'ActionOnFailure': 'CONTINUE', 'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': ['spark-submit', '--deploy-mode', 'client', 's3://releases/airflow/parquet_to_csv.py', 'prod', 'marketing', 'cs_ext_api_myheritage_tickets_volume', 's3://data/cs_ext_api_myheritage_tickets_volume.csv', 'false', 'true']}}] to the cluster [j-1UVOKFN0F1QC2]
[2019-11-25 11:09:20,315] {logging_mixin.py:95} INFO -
[2019-11-25 11:09:20,315] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): elasticmapreduce.us-east-1.amazonaws.com
[2019-11-25 11:11:20,485] {logging_mixin.py:95} INFO -
[2019-11-25 11:11:20,484] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2019-11-25 11:11:20,534] {__init__.py:1580} ERROR - Waiter StepComplete failed: Waiter encountered a terminal failure state
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models/__init__.py", line 1436, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/flow/operators/emr.py", line 350, in execute
hook.wait_for_step_completion(cluster_id, step_id)
File "/usr/local/lib/python3.6/site-packages/wixflow/aws/emr_lib.py", line 55, in wait_for_step_completion
'MaxAttempts': 60
File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 53, in wait
Waiter.wait(self, **kwargs)
File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 323, in wait
last_response=response,
botocore.exceptions.WaiterError: Waiter StepComplete failed: Waiter encountered a terminal failure state
{noformat}
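The traceback shows that the failure was raised by botocore's waiter, not by Airflow itself: "terminal failure state" means the DescribeStep polling returned a state that the StepComplete waiter treats as a failure, which suggests the EMR step itself failed or was cancelled (the step's stderr in the EMR logs should confirm this). The sketch below is a simplified model of that polling loop, not botocore's code; `WaiterError` is a local stand-in, and the acceptor states and the default `delay` are assumptions modelled on the StepComplete waiter (only `MaxAttempts: 60` appears in the traceback).

```python
import time
from typing import Callable, Dict


class WaiterError(Exception):
    """Local stand-in for botocore.exceptions.WaiterError (botocore is not imported)."""


# Assumed acceptors: COMPLETED is success, CANCELLED/FAILED are terminal failures,
# any other state (PENDING, RUNNING, ...) keeps polling.
SUCCESS_STATES = {"COMPLETED"}
FAILURE_STATES = {"CANCELLED", "FAILED"}


def wait_for_step(describe_step: Callable[[], Dict], delay: float = 30.0,
                  max_attempts: int = 60) -> None:
    """Poll describe_step() until the step succeeds, fails, or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        state = describe_step()["Step"]["Status"]["State"]
        if state in SUCCESS_STATES:
            return
        if state in FAILURE_STATES:
            # A terminal failure stops polling immediately and propagates to the caller
            raise WaiterError(
                "Waiter StepComplete failed: Waiter encountered a terminal failure state")
        if attempt < max_attempts:
            time.sleep(delay)
    raise WaiterError("Waiter StepComplete failed: Max attempts exceeded")
```

Because the exception propagates out of `wait_for_step_completion`, the operator's `execute()` never reaches `return step_id`.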
flow/operators/emr.py
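{code:python}
from typing import List, Optional

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# Project-specific hook exposing add_step() and wait_for_step_completion().
# The module path is assumed from the traceback (wixflow/aws/emr_lib.py).
from wixflow.aws.emr_lib import EmrHook

# Assumed value; the original module defines default_emr_conn_id elsewhere.
default_emr_conn_id = 'emr_default'


class SparkStepOperator(BaseOperator):
    """Add a spark-submit step to an EMR cluster and block until it finishes."""

    ui_color = '#cce6ff'
    template_fields = ['app_args']

    @apply_defaults  # lets default_args also fill this operator's own parameters
    def __init__(self,
                 step_name,
                 source_path,
                 deploy_mode='client',
                 main_class=None,
                 app_args: Optional[List[str]] = None,
                 spark_submit_args: Optional[List[str]] = None,
                 action_on_failure='CONTINUE',
                 cluster_id=None,
                 emr_conn_id=default_emr_conn_id,
                 *args, **kwargs):
        """
        :param step_name: name of the step to submit; also used as the task_id
        :param source_path: path to jar or python file on s3
        :param deploy_mode: spark-submit --deploy-mode, 'client' or 'cluster'
        :param main_class: optional --class for jar applications
        :param app_args: application arguments (templated)
        :param spark_submit_args: extra arguments passed to spark-submit
        :param action_on_failure: 'TERMINATE_JOB_FLOW'|'TERMINATE_CLUSTER'|'CANCEL_AND_WAIT'|'CONTINUE'
        :param cluster_id: EMR cluster id; if None, pulled from the create_cluster task's XCom
        :param emr_conn_id: emr connection id
        """
        super(SparkStepOperator, self).__init__(task_id=step_name, *args, **kwargs)
        self.emr_conn_id = emr_conn_id
        self.cluster_id = cluster_id
        self.action_on_failure = action_on_failure
        self.step_name = step_name
        self.source_path = source_path
        self.deploy_mode = deploy_mode
        self.spark_submit_args = spark_submit_args if spark_submit_args else []
        self.app_args = app_args if app_args else []
        self.main_class_args = ['--class', main_class] if main_class else []

    def execute(self, context):
        # Use the explicit cluster id, else the one returned by the create_cluster task
        cluster_id = self.cluster_id or self.xcom_pull(context, task_ids='create_cluster')
        hook = EmrHook(self.emr_conn_id)
        args = (['spark-submit', '--deploy-mode', self.deploy_mode]
                + self.spark_submit_args
                + self.main_class_args
                + [self.source_path]
                + self.app_args)
        step = {
            'Name': self.step_name,
            'ActionOnFailure': self.action_on_failure,
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': args
            }
        }
        self.log.info(f"Adding step [{step}] to the cluster [{cluster_id}]")
        step_id = hook.add_step(cluster_id, step)
        # Raises botocore's WaiterError if the step ends in a failure state; execute()
        # then never returns, so no return-value XCom is pushed for this task.
        hook.wait_for_step_completion(cluster_id, step_id)
        self.log.info(f"Step [{step_id}] finished on the cluster [{cluster_id}]")
        return step_id
{code}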
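The missing XCom is consistent with how Airflow 1.10 handles return values: the value returned by `execute()` is pushed under the `return_value` key only after `execute()` returns normally, so when `wait_for_step_completion` raises, `step_id` is never stored. The toy model below (a hypothetical `ToyTaskInstance` and a simplified `run_task`, not Airflow's API) contrasts the operator as written with a variant that pushes the step id before the blocking wait. In the real operator that variant would be a call such as `self.xcom_push(context, key='step_id', value=step_id)` right after `hook.add_step(...)`, assuming downstream tasks need the id even when the step fails.

```python
from typing import Any, Callable, Dict

XCOM_RETURN_KEY = "return_value"  # key Airflow uses for return-value XComs


class ToyTaskInstance:
    """Hypothetical stand-in for a task instance: just an in-memory XCom store."""

    def __init__(self) -> None:
        self.xcoms: Dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self.xcoms[key] = value


def run_task(ti: ToyTaskInstance, execute: Callable[[ToyTaskInstance], Any]) -> str:
    """Simplified model: push the return value only if execute() returns normally."""
    try:
        result = execute(ti)
    except Exception:
        return "failed"  # no return-value XCom is written on this path
    if result is not None:
        ti.xcom_push(XCOM_RETURN_KEY, result)
    return "success"


def execute_as_written(ti: ToyTaskInstance, wait_succeeds: bool) -> str:
    step_id = "s-EXAMPLE"  # hypothetical id returned by add_step
    if not wait_succeeds:
        raise RuntimeError("Waiter StepComplete failed")  # stand-in for WaiterError
    return step_id


def execute_push_first(ti: ToyTaskInstance, wait_succeeds: bool) -> str:
    step_id = "s-EXAMPLE"
    ti.xcom_push("step_id", step_id)  # explicit push before the blocking wait
    if not wait_succeeds:
        raise RuntimeError("Waiter StepComplete failed")
    return step_id
```

With the explicit push, a failed attempt still leaves `step_id` in XCom, while the return-value XCom is written only on success.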
--
This message was sent by Atlassian Jira
(v8.3.4#803005)