You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "oomsb (JIRA)" <ji...@apache.org> on 2019/01/29 07:42:00 UTC

[jira] [Created] (AIRFLOW-3781) job with one_failed trigger executed unwanted

oomsb created AIRFLOW-3781:
------------------------------

             Summary: job with one_failed trigger executed unwanted
                 Key: AIRFLOW-3781
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3781
             Project: Apache Airflow
          Issue Type: Bug
          Components: DagRun
    Affects Versions: 1.10.0
            Reporter: oomsb
         Attachments: airflow_one_failed_bug.png

I have several jobs with a trigger='one_failed'. If a process failes, only the direct downstream job with a 'one_failed' trigger should be executed. What I see is that all the downstream jobs, with no direct connection to it, and with a one_failed trigger_rule are executed. I think this is a bug.

As in my example. Job with name spark_tile_feeder failes and should trigger the 'set_status_seed_error' job only.

Job set_status_monitor_error and set_status_pyramid_error is wrongly executed too.

Is it a bug, or did I do something wrong?

 
{code:java}
 # -*- coding: utf-8 -*-

import airflow
import os
import datetime
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator

product_types = [ 
#1KM
                  "type1"
                ]

###################
# Extra functions #
###################

def get_date_now_formatted():
    d=datetime.datetime.now()
    return d.strftime("%Y-%m-%d %H:%M:%S")

def get_settings(layer, seed_date):
    f=open(PRESETS_FILE,'r')
    lines=f.readlines()
    f.close()
    settings={}
    for line in lines:
        if line.find(layer)>=0:
            lst=line.strip().split(',')
            settings = {
                'preset'  : lst[0],
                'level'   : "%02d" %  int(lst[2]),
                'zoom'    : "--zoom-from " + lst[2] + " --zoom-to " + lst[2],
                'folder'  : lst[4],
                'numexec' : lst[6],
                'minexec' : lst[7],
                'maxexec' : lst[8],
                'memexec' : lst[9],
                'startdate': "" 
                }
            preserveDate = lst[10]
            if preserveDate != 'true':
                settings['startdate']=seed_date[0:10]
            settings['GEOSERVER_DATA_DIR'] = lst[5]
            settings['GEOWEBCACHE_CACHE_DIR'] = 'gwc'
    
            if layer.find('CGS') >=0:
                settings['SPARK_EXTRA_JAVA_OPTIONS']="-DGEOSERVER_DATA_DIR=" + settings['GEOSERVER_DATA_DIR'] + " -DGEOWEBCACHE_CACHE_DIR=" + settings['GEOWEBCACHE_CACHE_DIR'] + " -Dvito.pdfcatalog.rest=true -Dorg.geotools.coverage.jaiext.enabled=true -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl -Denabledebug=false"
            else:
                settings['SPARK_EXTRA_JAVA_OPTIONS']="-DGEOSERVER_DATA_DIR=" + settings['GEOSERVER_DATA_DIR'] + " -DGEOWEBCACHE_CACHE_DIR=" + settings['GEOWEBCACHE_CACHE_DIR'] + " -Dvito.pdfcatalog.rest=true -Dorg.geotools.coverage.jaiext.enabled=false -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl -Denabledebug=false"

    return settings


###################################
# start here
###############################
#create a dag for each producttype
dag={}
for product_type in product_types:
    dag_id = 'geovwr_seed_' + product_type
    #1) get settings to default_args dict
    default_args = {
        'owner': 'oomsb',
        'start_date': airflow.utils.dates.days_ago(2),
        'sparkTileFeeder' : 'hdfs:///tapworkflows/probavgeoviewerseed/lib/sparktilefeeder-1.5.41-shaded.jar',
        'java_options' :'',
        'max_tries' : 1,
        'dag_dir' : os.path.join(os.environ.get('AIRFLOW_HOME'),'dags',  'seeding'),
        }
    
    
    POSTGRES_CONN_ID='postgresqldmz3_seedings'
    PRESETS_FILE=os.path.join(default_args['dag_dir'], 'presets')
    default_args['seeding_script'] = os.path.join(default_args['dag_dir'], 'spark_submit.sh')
    default_args['pyramide_script'] = os.path.join(default_args['dag_dir'], 'generate_pyramide.sh')
    
    globals()[dag_id] = DAG(dag_id = dag_id, default_args=default_args,schedule_interval=None)
    print (dag)
    def get_parameters(**context):
        #read the rest parameters
        layer =  context['dag_run'].conf['producttype']
        seed_date = context['dag_run'].conf['seed_date']
        estimated_tile_nr = context['dag_run'].conf['estimated_tile_nr']
        seedid = context['dag_run'].conf['seed_id']
    
        #read presets
        settings = get_settings(layer, seed_date) 
    
        #push them to xcom to have access in other operators
        task_instance = context['task_instance']
        task_instance.xcom_push(key='seedid', value=seedid)
        task_instance.xcom_push(key='estimated_tile_nr', value=estimated_tile_nr)
        task_instance.xcom_push(key='layer', value=layer)
        task_instance.xcom_push(key='seed_date', value=seed_date)
        task_instance.xcom_push(key='folder', value=settings['folder'])
        task_instance.xcom_push(key='level', value=settings['level'])
        task_instance.xcom_push(key='numexec', value=settings['numexec'])
        task_instance.xcom_push(key='minexec', value=settings['minexec'])
        task_instance.xcom_push(key='maxexec', value=settings['maxexec'])
        task_instance.xcom_push(key='memexec', value=settings['memexec'])
        task_instance.xcom_push(key='SPARK_EXTRA_JAVA_OPTIONS', value=settings['SPARK_EXTRA_JAVA_OPTIONS'])
        task_instance.xcom_push(key='GEOSERVER_DATA_DIR', value=settings['GEOSERVER_DATA_DIR'])
        task_instance.xcom_push(key='zoom', value=settings['zoom'])
     
    def process_result(**context):
        #run conditional jobs, the jobname is returned in this procedure
        result = context['task_instance'].xcom_pull(task_ids='get_seed_result')
        estimated_tile_nr = context['task_instance'].xcom_pull(task_ids='get_parameters', key='estimated_tile_nr')
        print ("Seed result : " +str(result))
        print ("Limit value : " + str(estimated_tile_nr)) 
        
        #load job on the following condition
        #if int(result) < int(estimated_tile_nr):
        if int(result) < 1:
           print ('Number of seeded tiles is under the limit')
           print ('Stop seeding now')

           # call error2 with the all_succees_trigger
           return 'set_status_seed_error2'
        else:
           return 'set_status_pyramide'
    
    ###########################
    #Operators
    ###########################
    get_parameters_operator = PythonOperator(
        task_id="get_parameters",
        provide_context = True,
        python_callable=get_parameters,
        dag=globals()[dag_id])
    
    
    spark_build_pyramides = BashOperator(
        task_id='spark_build_pyramides',
        bash_command = 'bash ' + default_args['pyramide_script'] + " {{ task_instance.xcom_pull(task_ids='get_parameters', key='layer') }}" + " {{ task_instance.xcom_pull(task_ids='get_parameters',key='seed_date') }}", 
        dag=globals()[dag_id])
    
    get_seed_result = BashOperator(
        task_id='get_seed_result',
        bash_command = 'find ' + os.path.join( "{{ task_instance.xcom_pull(task_ids='get_parameters',key='folder') }}"  , "{{ task_instance.xcom_pull(task_ids='get_parameters',key='layer') }}", 'g', "{{ task_instance.xcom_pull(task_ids='get_parameters',key='seed_date') }}","{{ task_instance.xcom_pull(task_ids='get_parameters',key='level') }}") + ' -name *.png | wc -l',
        xcom_push = True,
        dag=globals()[dag_id])
    
    log_seed_result = PostgresOperator(task_id='log_seed_result',
        sql="update seeding_queue_tbl set status=4, result={{ task_instance.xcom_pull(task_ids='get_seed_result') }} where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters',key='seedid') }}" +";",
        postgres_conn_id=POSTGRES_CONN_ID,
        autocommit=True,
        database="cvbdmz3",
        trigger_rule='all_success',
        dag=globals()[dag_id])
    
    process_seed_result = BranchPythonOperator(
        task_id='process_seed_result',
        provide_context = True,
        #op_kwargs={ 'seed_limit_value' : default_args['min_seed_limit']},
        python_callable=process_result,
        trigger_rule="all_success",
        dag=globals()[dag_id])
    
    
    #            " " + "{{ task_instance.xcom_pull(task_ids='get_parameters') }}"[0] + \
    #            " " + "{{ task_instance.xcom_pull(task_ids='get_parameters') }}"[1] + " " +\
    spark_tile_feeder = BashOperator(
        task_id='spark_tile_feeder',
        bash_command = 'bash ' + default_args['seeding_script'] + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='layer') }}" + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='seed_date') }}" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='memexec') }}" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='numexec') }}" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='minexec') }}" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='maxexec') }}" + ' ' + \
                " '{{ task_instance.xcom_pull(task_ids='get_parameters', key='SPARK_EXTRA_JAVA_OPTIONS') }}'" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='GEOSERVER_DATA_DIR') }}" + ' ' + \
                " '{{ task_instance.xcom_pull(task_ids='get_parameters', key='zoom') }}'" ,
        dag=globals()[dag_id])
    
    
    ########################
    #SET POSTGRES STATUSSES
    #########################
    set_status_start_seeding = PostgresOperator(task_id='set_status_start_seeding',
                          sql="update seeding_queue_tbl set status=1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          #sql="update seeding_queue_tbl set status=1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    
    set_status_seed_error= PostgresOperator(task_id='set_status_seed_error',
                          sql="update seeding_queue_tbl set status=-1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='one_failed',
                          dag=globals()[dag_id])
    set_status_seed_error2= PostgresOperator(task_id='set_status_seed_error2',
                          sql="update seeding_queue_tbl set status=-1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    
    set_status_start_monitor= PostgresOperator(task_id='set_status_start_monitor',
                          sql="update seeding_queue_tbl set status=3 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    
    set_status_monitor_error= PostgresOperator(task_id='set_status_monitor_error',
                          sql="update seeding_queue_tbl set status=-3 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='one_failed',
                          dag=globals()[dag_id])
    
    set_status_pyramide = PostgresOperator(task_id='set_status_pyramide',
                          sql="update seeding_queue_tbl set status=2 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    set_status_done     = PostgresOperator(task_id='set_status_done',
                          sql="update seeding_queue_tbl set status=0, modification_date='" + get_date_now_formatted() + "' where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    
    set_status_pyramide_error= PostgresOperator(task_id='set_status_pyramide_error',
                          sql="update seeding_queue_tbl set status=-2 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='one_failed',
                          dag=globals()[dag_id])
    
    
    
    ####################
    ####### CHAINING ###
    ####################
    get_parameters_operator   >> set_status_start_seeding
    set_status_start_seeding  >> spark_tile_feeder
    
    spark_tile_feeder         >> set_status_start_monitor
    spark_tile_feeder         >> set_status_seed_error
    
    
    set_status_start_monitor  >> get_seed_result
    
    get_seed_result           >> set_status_monitor_error
    get_seed_result	      >> log_seed_result
    get_seed_result           >> process_seed_result
    
    process_seed_result       >> set_status_pyramide
    process_seed_result       >> set_status_seed_error2
    log_seed_result           >> set_status_pyramide
    
    set_status_pyramide       >> spark_build_pyramides
    
    spark_build_pyramides     >> set_status_done
    spark_build_pyramides     >> set_status_pyramide_error{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)