Posted to commits@airflow.apache.org by "oomsb (JIRA)" <ji...@apache.org> on 2019/01/29 07:43:00 UTC

[jira] [Updated] (AIRFLOW-3781) job with one_failed trigger executed unwanted

     [ https://issues.apache.org/jira/browse/AIRFLOW-3781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

oomsb updated AIRFLOW-3781:
---------------------------
    Description: 
I have several jobs with trigger_rule='one_failed'. If a task fails, only its direct downstream job with a 'one_failed' trigger rule should be executed. What I see instead is that all downstream jobs with a 'one_failed' trigger_rule are executed, even those with no direct connection to the failed task. I think this is a bug.

In my example, the job named 'spark_tile_feeder' fails and should trigger the 'set_status_seed_error' job +only+.

The jobs set_status_monitor_error and set_status_pyramide_error are wrongly executed too.

Is it a bug, or did I do something wrong?
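
For reference, here is a stripped-down sketch of the same trigger-rule layout (hypothetical task ids, reduced to plain BashOperators), which should show the behaviour I describe; the full DAG follows below.

{code:python}
# Minimal sketch (hypothetical task ids) of the trigger-rule layout described above.
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='one_failed_minimal_repro',
    default_args={'owner': 'oomsb', 'start_date': airflow.utils.dates.days_ago(2)},
    schedule_interval=None)

# Fails on purpose, like spark_tile_feeder in the full DAG below.
failing_task = BashOperator(task_id='failing_task', bash_command='exit 1', dag=dag)

# Direct downstream error handler: expected to run when failing_task fails.
direct_error = BashOperator(task_id='direct_error', bash_command='echo direct error',
                            trigger_rule='one_failed', dag=dag)

# Regular downstream task (default all_success trigger rule).
monitor = BashOperator(task_id='monitor', bash_command='echo monitor', dag=dag)

# Indirect downstream error handler: expected NOT to run, but it is executed as well.
indirect_error = BashOperator(task_id='indirect_error', bash_command='echo indirect error',
                              trigger_rule='one_failed', dag=dag)

failing_task >> direct_error
failing_task >> monitor
monitor >> indirect_error
{code}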

 
{code:python}
 # -*- coding: utf-8 -*-

import airflow
import os
import datetime
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator

product_types = [ 
#1KM
                  "type1"
                ]

###################
# Extra functions #
###################

def get_date_now_formatted():
    d=datetime.datetime.now()
    return d.strftime("%Y-%m-%d %H:%M:%S")

def get_settings(layer, seed_date):
    with open(PRESETS_FILE, 'r') as f:
        lines = f.readlines()
    settings={}
    for line in lines:
        if line.find(layer)>=0:
            lst=line.strip().split(',')
            settings = {
                'preset'  : lst[0],
                'level'   : "%02d" %  int(lst[2]),
                'zoom'    : "--zoom-from " + lst[2] + " --zoom-to " + lst[2],
                'folder'  : lst[4],
                'numexec' : lst[6],
                'minexec' : lst[7],
                'maxexec' : lst[8],
                'memexec' : lst[9],
                'startdate': "" 
                }
            preserveDate = lst[10]
            if preserveDate != 'true':
                settings['startdate']=seed_date[0:10]
            settings['GEOSERVER_DATA_DIR'] = lst[5]
            settings['GEOWEBCACHE_CACHE_DIR'] = 'gwc'
    
            if layer.find('CGS') >=0:
                settings['SPARK_EXTRA_JAVA_OPTIONS']="-DGEOSERVER_DATA_DIR=" + settings['GEOSERVER_DATA_DIR'] + " -DGEOWEBCACHE_CACHE_DIR=" + settings['GEOWEBCACHE_CACHE_DIR'] + " -Dvito.pdfcatalog.rest=true -Dorg.geotools.coverage.jaiext.enabled=true -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl -Denabledebug=false"
            else:
                settings['SPARK_EXTRA_JAVA_OPTIONS']="-DGEOSERVER_DATA_DIR=" + settings['GEOSERVER_DATA_DIR'] + " -DGEOWEBCACHE_CACHE_DIR=" + settings['GEOWEBCACHE_CACHE_DIR'] + " -Dvito.pdfcatalog.rest=true -Dorg.geotools.coverage.jaiext.enabled=false -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl -Denabledebug=false"

    return settings
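# Hypothetical example of a line in PRESETS_FILE, inferred from the indices used
# above (fields 1 and 3 are not read by this function):
#   type1,unused,08,unused,/path/to/tiles,/path/to/geoserver_data,4,2,8,4g,false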


###################################
# start here
###############################
#create a dag for each producttype
dag={}
for product_type in product_types:
    dag_id = 'geovwr_seed_' + product_type
    #1) get settings to default_args dict
    default_args = {
        'owner': 'oomsb',
        'start_date': airflow.utils.dates.days_ago(2),
        'sparkTileFeeder' : 'hdfs:///tapworkflows/probavgeoviewerseed/lib/sparktilefeeder-1.5.41-shaded.jar',
        'java_options' :'',
        'max_tries' : 1,
        'dag_dir' : os.path.join(os.environ.get('AIRFLOW_HOME'),'dags',  'seeding'),
        }
    
    
    POSTGRES_CONN_ID='postgresqldmz3_seedings'
    PRESETS_FILE=os.path.join(default_args['dag_dir'], 'presets')
    default_args['seeding_script'] = os.path.join(default_args['dag_dir'], 'spark_submit.sh')
    default_args['pyramide_script'] = os.path.join(default_args['dag_dir'], 'generate_pyramide.sh')
    
    globals()[dag_id] = DAG(dag_id = dag_id, default_args=default_args,schedule_interval=None)
    print (dag)
    def get_parameters(**context):
        #read the REST trigger parameters from dag_run.conf
        layer =  context['dag_run'].conf['producttype']
        seed_date = context['dag_run'].conf['seed_date']
        estimated_tile_nr = context['dag_run'].conf['estimated_tile_nr']
        seedid = context['dag_run'].conf['seed_id']
    
        #read presets
        settings = get_settings(layer, seed_date) 
    
        #push them to xcom to have access in other operators
        task_instance = context['task_instance']
        task_instance.xcom_push(key='seedid', value=seedid)
        task_instance.xcom_push(key='estimated_tile_nr', value=estimated_tile_nr)
        task_instance.xcom_push(key='layer', value=layer)
        task_instance.xcom_push(key='seed_date', value=seed_date)
        task_instance.xcom_push(key='folder', value=settings['folder'])
        task_instance.xcom_push(key='level', value=settings['level'])
        task_instance.xcom_push(key='numexec', value=settings['numexec'])
        task_instance.xcom_push(key='minexec', value=settings['minexec'])
        task_instance.xcom_push(key='maxexec', value=settings['maxexec'])
        task_instance.xcom_push(key='memexec', value=settings['memexec'])
        task_instance.xcom_push(key='SPARK_EXTRA_JAVA_OPTIONS', value=settings['SPARK_EXTRA_JAVA_OPTIONS'])
        task_instance.xcom_push(key='GEOSERVER_DATA_DIR', value=settings['GEOSERVER_DATA_DIR'])
        task_instance.xcom_push(key='zoom', value=settings['zoom'])
     
    def process_result(**context):
        #decide which conditional job to run; this function returns its task_id
        result = context['task_instance'].xcom_pull(task_ids='get_seed_result')
        estimated_tile_nr = context['task_instance'].xcom_pull(task_ids='get_parameters', key='estimated_tile_nr')
        print ("Seed result : " +str(result))
        print ("Limit value : " + str(estimated_tile_nr)) 
        
        #load job on the following condition
        #if int(result) < int(estimated_tile_nr):
        if int(result) < 1:
           print ('Number of seeded tiles is under the limit')
           print ('Stop seeding now')

           # call error2 with the all_success trigger
           return 'set_status_seed_error2'
        else:
           return 'set_status_pyramide'
    
    ###########################
    #Operators
    ###########################
    get_parameters_operator = PythonOperator(
        task_id="get_parameters",
        provide_context = True,
        python_callable=get_parameters,
        dag=globals()[dag_id])
    
    
    spark_build_pyramides = BashOperator(
        task_id='spark_build_pyramides',
        bash_command = 'bash ' + default_args['pyramide_script'] + " {{ task_instance.xcom_pull(task_ids='get_parameters', key='layer') }}" + " {{ task_instance.xcom_pull(task_ids='get_parameters',key='seed_date') }}", 
        dag=globals()[dag_id])
    
    get_seed_result = BashOperator(
        task_id='get_seed_result',
        bash_command = 'find ' + os.path.join("{{ task_instance.xcom_pull(task_ids='get_parameters',key='folder') }}", "{{ task_instance.xcom_pull(task_ids='get_parameters',key='layer') }}", 'g', "{{ task_instance.xcom_pull(task_ids='get_parameters',key='seed_date') }}", "{{ task_instance.xcom_pull(task_ids='get_parameters',key='level') }}") + " -name '*.png' | wc -l",
        xcom_push = True,
        dag=globals()[dag_id])
    
    log_seed_result = PostgresOperator(task_id='log_seed_result',
        sql="update seeding_queue_tbl set status=4, result={{ task_instance.xcom_pull(task_ids='get_seed_result') }} where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters',key='seedid') }}" +";",
        postgres_conn_id=POSTGRES_CONN_ID,
        autocommit=True,
        database="cvbdmz3",
        trigger_rule='all_success',
        dag=globals()[dag_id])
    
    process_seed_result = BranchPythonOperator(
        task_id='process_seed_result',
        provide_context = True,
        #op_kwargs={ 'seed_limit_value' : default_args['min_seed_limit']},
        python_callable=process_result,
        trigger_rule="all_success",
        dag=globals()[dag_id])
    
    
    #            " " + "{{ task_instance.xcom_pull(task_ids='get_parameters') }}"[0] + \
    #            " " + "{{ task_instance.xcom_pull(task_ids='get_parameters') }}"[1] + " " +\
    spark_tile_feeder = BashOperator(
        task_id='spark_tile_feeder',
        bash_command = 'bash ' + default_args['seeding_script'] + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='layer') }}" + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='seed_date') }}" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='memexec') }}" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='numexec') }}" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='minexec') }}" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='maxexec') }}" + ' ' + \
                " '{{ task_instance.xcom_pull(task_ids='get_parameters', key='SPARK_EXTRA_JAVA_OPTIONS') }}'" + ' ' + \
                " {{ task_instance.xcom_pull(task_ids='get_parameters', key='GEOSERVER_DATA_DIR') }}" + ' ' + \
                " '{{ task_instance.xcom_pull(task_ids='get_parameters', key='zoom') }}'" ,
        dag=globals()[dag_id])
    
    
    ########################
    # SET POSTGRES STATUSES
    ########################
    set_status_start_seeding = PostgresOperator(task_id='set_status_start_seeding',
                          sql="update seeding_queue_tbl set status=1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          #sql="update seeding_queue_tbl set status=1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    
    set_status_seed_error= PostgresOperator(task_id='set_status_seed_error',
                          sql="update seeding_queue_tbl set status=-1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='one_failed',
                          dag=globals()[dag_id])
    set_status_seed_error2= PostgresOperator(task_id='set_status_seed_error2',
                          sql="update seeding_queue_tbl set status=-1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    
    set_status_start_monitor= PostgresOperator(task_id='set_status_start_monitor',
                          sql="update seeding_queue_tbl set status=3 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    
    set_status_monitor_error= PostgresOperator(task_id='set_status_monitor_error',
                          sql="update seeding_queue_tbl set status=-3 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='one_failed',
                          dag=globals()[dag_id])
    
    set_status_pyramide = PostgresOperator(task_id='set_status_pyramide',
                          sql="update seeding_queue_tbl set status=2 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    set_status_done     = PostgresOperator(task_id='set_status_done',
                          sql="update seeding_queue_tbl set status=0, modification_date='" + get_date_now_formatted() + "' where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='all_success',
                          dag=globals()[dag_id])
    
    set_status_pyramide_error= PostgresOperator(task_id='set_status_pyramide_error',
                          sql="update seeding_queue_tbl set status=-2 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
                          postgres_conn_id=POSTGRES_CONN_ID,
                          autocommit=True,
                          database="cvbdmz3",
                          trigger_rule='one_failed',
                          dag=globals()[dag_id])
    
    
    
    ####################
    ####### CHAINING ###
    ####################
    get_parameters_operator   >> set_status_start_seeding
    set_status_start_seeding  >> spark_tile_feeder
    
    spark_tile_feeder         >> set_status_start_monitor
    spark_tile_feeder         >> set_status_seed_error
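    # Expected: when spark_tile_feeder fails, only its direct downstream
    # set_status_seed_error (trigger_rule='one_failed') should run; the indirect
    # downstream tasks set_status_monitor_error and set_status_pyramide_error
    # (also 'one_failed') should stay untriggered. In practice all three run.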
    
    
    set_status_start_monitor  >> get_seed_result
    
    get_seed_result           >> set_status_monitor_error
    get_seed_result           >> log_seed_result
    get_seed_result           >> process_seed_result
    
    process_seed_result       >> set_status_pyramide
    process_seed_result       >> set_status_seed_error2
    log_seed_result           >> set_status_pyramide
    
    set_status_pyramide       >> spark_build_pyramides
    
    spark_build_pyramides     >> set_status_done
    spark_build_pyramides     >> set_status_pyramide_error{code}


> job with one_failed trigger executed unwanted
> ---------------------------------------------
>
>                 Key: AIRFLOW-3781
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3781
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: 1.10.0
>            Reporter: oomsb
>            Priority: Major
>         Attachments: airflow_one_failed_bug.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)