Posted to commits@airflow.apache.org by "Valeriys Soloviov (Jira)" <ji...@apache.org> on 2019/11/25 13:26:00 UTC

[jira] [Created] (AIRFLOW-6061) xcom is not created

Valeriys Soloviov created AIRFLOW-6061:
------------------------------------------

             Summary: xcom is not created
                 Key: AIRFLOW-6061
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6061
             Project: Apache Airflow
          Issue Type: Bug
          Components: xcom
    Affects Versions: 1.10.3
            Reporter: Valeriys Soloviov
         Attachments: image-2019-11-25-15-09-52-646.png, image-2019-11-25-15-24-20-610.png

I faced a strange bug:

We are using:
 * Airflow *1.10.3* [all_dbs,crypto,async,jdbc,hdfs,hive,ldap,mssql,mysql,password,s3,slack,gcp,google_auth,stats]

 * mysql: 5.7.22 
{code:sql}
mysql> select @@version;
+------------+
| @@version  |
+------------+
| 5.7.22-log |
+------------+
{code}

 * *LocalExecutor*

I have a DAG with one main task that submits steps to EMR via EmrCreateJobFlowOperator, then 12 tasks (EmrJobFlowSensor) which wait for their completion, then 12 more steps, and finally a task that, regardless of the outcome (trigger_rule=TriggerRule.ALL_DONE), reads the EMR log from S3. A rough sketch of this layout follows the screenshot below.

!image-2019-11-25-15-09-52-646.png!
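
Roughly, the layout is wired like this. This is only a sketch: the dag id, task ids, table list and job_flow_overrides below are placeholders, and only SparkStepOperator comes from the code attached at the end of this issue.
{code:python}
# Hypothetical sketch of the DAG shape described above (Airflow 1.10.x APIs).
# The dag id, task ids, table list and job_flow_overrides are placeholders,
# not the real DAG; only SparkStepOperator is taken from the attached code.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

from flow.operators.emr import SparkStepOperator  # custom operator, code attached below


def read_emr_logs(**context):
    """Placeholder for the task that reads the EMR step logs from S3."""
    pass


with DAG('api_export_sketch',
         start_date=datetime(2019, 11, 1),
         schedule_interval='@daily') as dag:

    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster',       # step tasks xcom_pull the job flow id from here
        job_flow_overrides={},          # cluster configuration goes here
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    collect_logs = PythonOperator(
        task_id='collect_logs',
        python_callable=read_emr_logs,
        provide_context=True,
        trigger_rule=TriggerRule.ALL_DONE,  # runs even if some step tasks failed
    )

    for table in ['table_a', 'table_b']:    # 12 tables in the real DAG
        step = SparkStepOperator(
            step_name='stage_{}'.format(table),
            source_path='s3://releases/airflow/parquet_to_csv.py',
            app_args=['prod', 'marketing', table],
        )
        create_cluster >> step >> collect_logs
{code}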

For some reason 3 of the 12 tasks failed and no XCom was written.

  !image-2019-11-25-15-24-20-610.png!
The log:
{noformat}

-------------------------------------------------------------------------------- 
[2019-11-25 11:09:18,434] {__init__.py:1354} INFO - Starting attempt 3 of 3 
[2019-11-25 11:09:18,434] {__init__.py:1355} INFO - -------------------------------------------------------------------------------- 
[2019-11-25 11:09:18,448] {__init__.py:1374} INFO - Executing <Task(SparkStepOperator): stage_cs_ext_api_myheritage_tickets_volume> on 2019-11-25T09:46:10.998426+00:00 
[2019-11-25 11:09:18,449] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'api', 'api_myheritage_tickets_volume', '2019-11-25T09:46:10.998426+00:00', '--job_id', '98486', '--raw', '-sd', 'DAGS_FOLDER/api_export.py', '--cfg_path', '/tmp/tmp4tfkxfsq'] 
[2019-11-25 11:09:18,963] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume 
[2019-11-25 11:09:18,963] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=26857 
[2019-11-25 11:09:19,929] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume 
[2019-11-25 11:09:19,929] {__init__.py:51} INFO - Using executor LocalExecutor 
[2019-11-25 11:09:20,091] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume 
[2019-11-25 11:09:20,091] {__init__.py:305} INFO - Filling up the DagBag from /home/airflow/dags/export.py 
[2019-11-25 11:09:20,219] {base_task_runner.py:101} INFO - Job 98486: Subtask stage_cs_ext_api_myheritage_tickets_volume 
[2019-11-25 11:09:20,218] {cli.py:517} INFO - Running <TaskInstance: wap-cs-external-api.stage_cs_ext_api_myheritage_tickets_volume 2019-11-25T09:46:10.998426+00:00 [running]> on host airflow 
[2019-11-25 11:09:20,312] {emr.py:348} INFO - Adding step [{'Name': 'stage_cs_ext_api_myheritage_tickets_volume', 'ActionOnFailure': 'CONTINUE', 'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': ['spark-submit', '--deploy-mode', 'client', 's3://releases/airflow/parquet_to_csv.py', 'prod', 'marketing', 'cs_ext_api_myheritage_tickets_volume', 's3://data/cs_ext_api_myheritage_tickets_volume.csv', 'false', 'true']}}] to the cluster [j-1UVOKFN0F1QC2] 
[2019-11-25 11:09:20,315] {logging_mixin.py:95} INFO - 
[2019-11-25 11:09:20,315] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): elasticmapreduce.us-east-1.amazonaws.com 
[2019-11-25 11:11:20,485] {logging_mixin.py:95} INFO - 
[2019-11-25 11:11:20,484] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com 
[2019-11-25 11:11:20,534] {__init__.py:1580} ERROR - Waiter StepComplete failed: Waiter encountered a terminal failure state
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/__init__.py", line 1436, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/flow/operators/emr.py", line 350, in execute
    hook.wait_for_step_completion(cluster_id, step_id)
  File "/usr/local/lib/python3.6/site-packages/wixflow/aws/emr_lib.py", line 55, in wait_for_step_completion
    'MaxAttempts': 60
  File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 53, in wait
    Waiter.wait(self, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 323, in wait
    last_response=response,
botocore.exceptions.WaiterError: Waiter StepComplete failed: Waiter encountered a terminal failure state
{noformat}
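
Judging from the traceback, wait_for_step_completion in wixflow/aws/emr_lib.py wraps the boto3 StepComplete waiter. A minimal sketch of that pattern (not the actual code; every value except MaxAttempts is assumed):
{code:python}
# Hedged reconstruction of what wixflow/aws/emr_lib.py's wait_for_step_completion
# likely does, based only on the traceback; not the reporter's actual code.
import boto3


def wait_for_step_completion(cluster_id, step_id, region_name='us-east-1'):
    emr = boto3.client('emr', region_name=region_name)
    waiter = emr.get_waiter('step_complete')
    # The waiter polls DescribeStep and raises WaiterError ("terminal failure state")
    # as soon as the step reports FAILED or CANCELLED, which is what the log shows.
    waiter.wait(
        ClusterId=cluster_id,
        StepId=step_id,
        WaiterConfig={
            'Delay': 30,        # seconds between polls (assumed)
            'MaxAttempts': 60,  # value visible in the traceback
        },
    )
{code}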
  
flow/operators/emr.py:
{code:python}
from typing import List

from airflow.models import BaseOperator

# EmrHook and default_emr_conn_id come from project-specific helpers (the traceback
# points at wixflow/aws/emr_lib.py); their import is omitted in the original snippet.


class SparkStepOperator(BaseOperator):
    ui_color = '#cce6ff'
    template_fields = ['app_args']

    def __init__(self,
                 step_name,
                 source_path,
                 deploy_mode='client',
                 main_class=None,
                 app_args: List[str] = None,
                 spark_submit_args: List[str] = None,
                 action_on_failure='CONTINUE',
                 cluster_id=None,
                 emr_conn_id=default_emr_conn_id,
                 *args, **kwargs):
        """
        :param step_name: name of the step to submit
        :param source_path: path to jar or python file on s3
        :param action_on_failure: 'TERMINATE_JOB_FLOW'|'TERMINATE_CLUSTER'|'CANCEL_AND_WAIT'|'CONTINUE'
        :param emr_conn_id: emr connection id
        """
        super(SparkStepOperator, self).__init__(task_id=step_name, *args, **kwargs)
        self.emr_conn_id = emr_conn_id
        self.cluster_id = cluster_id
        self.action_on_failure = action_on_failure
        self.step_name = step_name
        self.source_path = source_path
        self.deploy_mode = deploy_mode
        self.spark_submit_args = spark_submit_args if spark_submit_args else []
        self.app_args = app_args if app_args else []
        self.main_class_args = ['--class', main_class] if main_class else []

    def execute(self, context):
        cluster_id = self.cluster_id if self.cluster_id else self.xcom_pull(context, task_ids='create_cluster')
        hook = EmrHook(self.emr_conn_id)
        args = ['spark-submit', '--deploy-mode', self.deploy_mode] + self.spark_submit_args + self.main_class_args + [
            self.source_path] + self.app_args
        step = {
            'Name': self.step_name,
            'ActionOnFailure': self.action_on_failure,
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': args
            }
        }
        self.log.info(f"Adding step [{step}] to the cluster [{cluster_id}]")
        step_id = hook.add_step(cluster_id, step)
        hook.wait_for_step_completion(cluster_id, step_id)
        self.log.info(f"Step [{step_id}] finished on the cluster [{cluster_id}]")
        return step_id  # the return value is what gets pushed to XCom as "return_value"
{code}
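
The step_id returned from execute() is what would normally show up as the task's XCom (key return_value). A downstream task would consume it roughly like this; the callable below is hypothetical, only the task id is taken from the log above:
{code:python}
# Hypothetical downstream consumer of the step task's XCom, for illustration only;
# the task id is taken from the log above. Airflow pushes execute()'s return value
# to XCom (key "return_value") only after execute() returns, so a step task whose
# waiter raised leaves no XCom row behind.
def check_step(**context):
    step_id = context['ti'].xcom_pull(
        task_ids='stage_cs_ext_api_myheritage_tickets_volume')
    if step_id is None:
        raise ValueError('no XCom found for the step task')
    return step_id
{code}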

--
This message was sent by Atlassian Jira
(v8.3.4#803005)