You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "oomsb (JIRA)" <ji...@apache.org> on 2019/01/29 07:42:00 UTC
[jira] [Created] (AIRFLOW-3781) job with one_failed trigger
executed unwanted
oomsb created AIRFLOW-3781:
------------------------------
Summary: job with one_failed trigger executed unwanted
Key: AIRFLOW-3781
URL: https://issues.apache.org/jira/browse/AIRFLOW-3781
Project: Apache Airflow
Issue Type: Bug
Components: DagRun
Affects Versions: 1.10.0
Reporter: oomsb
Attachments: airflow_one_failed_bug.png
I have several jobs with a trigger='one_failed'. If a process fails, only the direct downstream job with a 'one_failed' trigger should be executed. What I see is that all the downstream jobs with no direct connection to it, but with a one_failed trigger_rule, are executed as well. I think this is a bug.
As in my example: the job named spark_tile_feeder fails and should trigger only the 'set_status_seed_error' job.
The jobs set_status_monitor_error and set_status_pyramid_error are wrongly executed too.
Is it a bug, or did I do something wrong?
{code:java}
# -*- coding: utf-8 -*-
import airflow
import os
import datetime
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
# Product types to seed; one DAG is generated per entry (see the loop below).
product_types = [
#1KM
"type1"
]
###################
# Extra functions #
###################
def get_date_now_formatted():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'.

    NOTE(review): this is concatenated into the set_status_done SQL string at
    DAG-definition time, so the timestamp reflects parse time, not run time.
    """
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def get_settings(layer, seed_date, presets_file=None):
    """Read the seeding presets for *layer* from the presets file.

    Each presets line is a comma-separated record; the last line containing
    *layer* as a substring wins. Fields used (by index): 0 preset name,
    2 zoom level, 4 tile folder, 5 GEOSERVER_DATA_DIR, 6-9 Spark executor
    numbers/memory, 10 preserve-date flag.

    :param layer: layer/product name, matched as a substring of each line.
    :param seed_date: seed date string; its first 10 chars (YYYY-MM-DD) become
        'startdate' unless the preserve-date flag is the string 'true'.
    :param presets_file: optional path to the presets file; defaults to the
        module-level PRESETS_FILE (kept for backward compatibility).
    :return: dict of settings, or {} when no line matches *layer*.
    """
    if presets_file is None:
        presets_file = PRESETS_FILE
    settings = {}
    # 'with' guarantees the handle is closed even if parsing below raises.
    with open(presets_file, 'r') as f:
        lines = f.readlines()
    for line in lines:
        if line.find(layer) >= 0:
            lst = line.strip().split(',')
            settings = {
                'preset': lst[0],
                'level': "%02d" % int(lst[2]),
                'zoom': "--zoom-from " + lst[2] + " --zoom-to " + lst[2],
                'folder': lst[4],
                'numexec': lst[6],
                'minexec': lst[7],
                'maxexec': lst[8],
                'memexec': lst[9],
                'startdate': ""
            }
            preserveDate = lst[10]
            if preserveDate != 'true':
                # Only the date part (YYYY-MM-DD) of the seed date.
                settings['startdate'] = seed_date[0:10]
            settings['GEOSERVER_DATA_DIR'] = lst[5]
            settings['GEOWEBCACHE_CACHE_DIR'] = 'gwc'
            # CGS layers enable the jaiext coverage processing; others disable it.
            if layer.find('CGS') >= 0:
                settings['SPARK_EXTRA_JAVA_OPTIONS'] = "-DGEOSERVER_DATA_DIR=" + settings['GEOSERVER_DATA_DIR'] + " -DGEOWEBCACHE_CACHE_DIR=" + settings['GEOWEBCACHE_CACHE_DIR'] + " -Dvito.pdfcatalog.rest=true -Dorg.geotools.coverage.jaiext.enabled=true -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl -Denabledebug=false"
            else:
                settings['SPARK_EXTRA_JAVA_OPTIONS'] = "-DGEOSERVER_DATA_DIR=" + settings['GEOSERVER_DATA_DIR'] + " -DGEOWEBCACHE_CACHE_DIR=" + settings['GEOWEBCACHE_CACHE_DIR'] + " -Dvito.pdfcatalog.rest=true -Dorg.geotools.coverage.jaiext.enabled=false -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl -Denabledebug=false"
    return settings
###################################
# start here
###############################
#create a dag for each producttype
# NOTE(review): the indentation of this paste was lost; everything from the
# line after the 'for' header down to the end of the file appears to be the
# body of this loop (one DAG built per product type).
dag={}
for product_type in product_types:
dag_id = 'geovwr_seed_' + product_type
#1) get settings to default_args dict
default_args = {
'owner': 'oomsb',
'start_date': airflow.utils.dates.days_ago(2),
'sparkTileFeeder' : 'hdfs:///tapworkflows/probavgeoviewerseed/lib/sparktilefeeder-1.5.41-shaded.jar',
'java_options' :'',
'max_tries' : 1,
'dag_dir' : os.path.join(os.environ.get('AIRFLOW_HOME'),'dags', 'seeding'),
}
# Connection id and helper-script paths used by the operators below.
POSTGRES_CONN_ID='postgresqldmz3_seedings'
PRESETS_FILE=os.path.join(default_args['dag_dir'], 'presets')
default_args['seeding_script'] = os.path.join(default_args['dag_dir'], 'spark_submit.sh')
default_args['pyramide_script'] = os.path.join(default_args['dag_dir'], 'generate_pyramide.sh')
# The DAG is stored in globals() so Airflow's module scan discovers it.
globals()[dag_id] = DAG(dag_id = dag_id, default_args=default_args,schedule_interval=None)
print (dag)
def get_parameters(**context):
    """Read the REST-trigger payload and the presets, publish both to XCom.

    Expects the dag_run conf to carry 'producttype', 'seed_date',
    'estimated_tile_nr' and 'seed_id'. All values downstream operators need
    are pushed under individual XCom keys.
    """
    conf = context['dag_run'].conf
    layer = conf['producttype']
    seed_date = conf['seed_date']
    # Presets parsed from the presets file for this layer.
    settings = get_settings(layer, seed_date)
    ti = context['task_instance']
    # Values coming straight from the trigger payload.
    ti.xcom_push(key='seedid', value=conf['seed_id'])
    ti.xcom_push(key='estimated_tile_nr', value=conf['estimated_tile_nr'])
    ti.xcom_push(key='layer', value=layer)
    ti.xcom_push(key='seed_date', value=seed_date)
    # Values taken from the parsed presets, pushed in the original order.
    for key in ('folder', 'level', 'numexec', 'minexec', 'maxexec', 'memexec',
                'SPARK_EXTRA_JAVA_OPTIONS', 'GEOSERVER_DATA_DIR', 'zoom'):
        ti.xcom_push(key=key, value=settings[key])
def process_result(**context):
    """Branch callable: return the task_id of the next task to run.

    Pulls the seeded-tile count from the 'get_seed_result' task and branches
    to 'set_status_pyramide' when at least one tile was produced, otherwise
    to 'set_status_seed_error2'.
    """
    ti = context['task_instance']
    seeded = ti.xcom_pull(task_ids='get_seed_result')
    estimated_tile_nr = ti.xcom_pull(task_ids='get_parameters', key='estimated_tile_nr')
    print ("Seed result : " + str(seeded))
    print ("Limit value : " + str(estimated_tile_nr))
    # The intended threshold check was left disabled in the original:
    # if int(seeded) < int(estimated_tile_nr):
    if int(seeded) >= 1:
        return 'set_status_pyramide'
    print ('Number of seeded tiles is under the limit')
    print ('Stop seeding now')
    # error2 runs with the all_success trigger rule (branch target).
    return 'set_status_seed_error2'
###########################
#Operators
###########################
# Pushes the trigger payload and parsed presets to XCom for all tasks below.
get_parameters_operator = PythonOperator(
task_id="get_parameters",
provide_context = True,
python_callable=get_parameters,
dag=globals()[dag_id])
# Builds the image pyramids via the shell helper; layer and seed date are
# pulled from XCom through Jinja templating.
spark_build_pyramides = BashOperator(
task_id='spark_build_pyramides',
bash_command = 'bash ' + default_args['pyramide_script'] + " {{ task_instance.xcom_pull(task_ids='get_parameters', key='layer') }}" + " {{ task_instance.xcom_pull(task_ids='get_parameters',key='seed_date') }}",
dag=globals()[dag_id])
# Counts the seeded .png tiles on disk; xcom_push=True publishes the command's
# stdout (the count) for log_seed_result and process_seed_result.
get_seed_result = BashOperator(
task_id='get_seed_result',
bash_command = 'find ' + os.path.join( "{{ task_instance.xcom_pull(task_ids='get_parameters',key='folder') }}" , "{{ task_instance.xcom_pull(task_ids='get_parameters',key='layer') }}", 'g', "{{ task_instance.xcom_pull(task_ids='get_parameters',key='seed_date') }}","{{ task_instance.xcom_pull(task_ids='get_parameters',key='level') }}") + ' -name *.png | wc -l',
xcom_push = True,
dag=globals()[dag_id])
# Records the tile count in the seeding queue table (status=4).
log_seed_result = PostgresOperator(task_id='log_seed_result',
sql="update seeding_queue_tbl set status=4, result={{ task_instance.xcom_pull(task_ids='get_seed_result') }} where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters',key='seedid') }}" +";",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='all_success',
dag=globals()[dag_id])
# Branch operator: process_result returns the task_id of the next task.
process_seed_result = BranchPythonOperator(
task_id='process_seed_result',
provide_context = True,
#op_kwargs={ 'seed_limit_value' : default_args['min_seed_limit']},
python_callable=process_result,
trigger_rule="all_success",
dag=globals()[dag_id])
# " " + "{{ task_instance.xcom_pull(task_ids='get_parameters') }}"[0] + \
# " " + "{{ task_instance.xcom_pull(task_ids='get_parameters') }}"[1] + " " +\
# Submits the Spark tile-seeding job; every argument is pulled from XCom.
spark_tile_feeder = BashOperator(
task_id='spark_tile_feeder',
bash_command = 'bash ' + default_args['seeding_script'] + \
" {{ task_instance.xcom_pull(task_ids='get_parameters', key='layer') }}" + \
" {{ task_instance.xcom_pull(task_ids='get_parameters', key='seed_date') }}" + ' ' + \
" {{ task_instance.xcom_pull(task_ids='get_parameters', key='memexec') }}" + ' ' + \
" {{ task_instance.xcom_pull(task_ids='get_parameters', key='numexec') }}" + ' ' + \
" {{ task_instance.xcom_pull(task_ids='get_parameters', key='minexec') }}" + ' ' + \
" {{ task_instance.xcom_pull(task_ids='get_parameters', key='maxexec') }}" + ' ' + \
" '{{ task_instance.xcom_pull(task_ids='get_parameters', key='SPARK_EXTRA_JAVA_OPTIONS') }}'" + ' ' + \
" {{ task_instance.xcom_pull(task_ids='get_parameters', key='GEOSERVER_DATA_DIR') }}" + ' ' + \
" '{{ task_instance.xcom_pull(task_ids='get_parameters', key='zoom') }}'" ,
dag=globals()[dag_id])
########################
#SET POSTGRES STATUSSES
#########################
# Each operator below writes a status code for this seed_id into
# seeding_queue_tbl. Status codes used: 1 started, 2 pyramide, 3 monitoring,
# 4 result logged, 0 done; negative values are the matching error states.
set_status_start_seeding = PostgresOperator(task_id='set_status_start_seeding',
sql="update seeding_queue_tbl set status=1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
#sql="update seeding_queue_tbl set status=1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters') }};",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='all_success',
dag=globals()[dag_id])
# status=-1: seeding failed; fires on failure of its upstream (one_failed).
set_status_seed_error= PostgresOperator(task_id='set_status_seed_error',
sql="update seeding_queue_tbl set status=-1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='one_failed',
dag=globals()[dag_id])
# status=-1 via the branch path (all_success) when too few tiles were seeded.
set_status_seed_error2= PostgresOperator(task_id='set_status_seed_error2',
sql="update seeding_queue_tbl set status=-1 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='all_success',
dag=globals()[dag_id])
# status=3: monitoring/tile-count phase started.
set_status_start_monitor= PostgresOperator(task_id='set_status_start_monitor',
sql="update seeding_queue_tbl set status=3 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='all_success',
dag=globals()[dag_id])
# status=-3: monitoring failed (one_failed on get_seed_result).
set_status_monitor_error= PostgresOperator(task_id='set_status_monitor_error',
sql="update seeding_queue_tbl set status=-3 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='one_failed',
dag=globals()[dag_id])
# status=2: pyramide build phase started (branch target of process_seed_result).
set_status_pyramide = PostgresOperator(task_id='set_status_pyramide',
sql="update seeding_queue_tbl set status=2 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='all_success',
dag=globals()[dag_id])
# status=0: done. NOTE(review): get_date_now_formatted() is evaluated when the
# DAG file is parsed, so modification_date is the parse time, not the run time.
set_status_done = PostgresOperator(task_id='set_status_done',
sql="update seeding_queue_tbl set status=0, modification_date='" + get_date_now_formatted() + "' where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='all_success',
dag=globals()[dag_id])
# status=-2: pyramide build failed (one_failed on spark_build_pyramides).
set_status_pyramide_error= PostgresOperator(task_id='set_status_pyramide_error',
sql="update seeding_queue_tbl set status=-2 where seed_id={{ task_instance.xcom_pull(task_ids='get_parameters', key='seedid') }};",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True,
database="cvbdmz3",
trigger_rule='one_failed',
dag=globals()[dag_id])
####################
####### CHAINING ###
####################
# Happy path: parameters -> mark started -> seed -> monitor -> count tiles.
get_parameters_operator >> set_status_start_seeding
set_status_start_seeding >> spark_tile_feeder
spark_tile_feeder >> set_status_start_monitor
# Direct one_failed handler for a failing spark_tile_feeder.
spark_tile_feeder >> set_status_seed_error
set_status_start_monitor >> get_seed_result
# NOTE(review): in Airflow 1.10.x a 'one_failed' task also fires when an
# upstream task ends in state 'upstream_failed'. A failure of
# spark_tile_feeder therefore cascades through set_status_start_monitor and
# get_seed_result and also triggers set_status_monitor_error and
# set_status_pyramide_error — presumably the behavior reported in this issue.
# TODO confirm against the trigger-rule docs for the exact Airflow version.
get_seed_result >> set_status_monitor_error
get_seed_result >> log_seed_result
get_seed_result >> process_seed_result
# Branch targets of process_seed_result (only one of the two runs).
process_seed_result >> set_status_pyramide
process_seed_result >> set_status_seed_error2
log_seed_result >> set_status_pyramide
set_status_pyramide >> spark_build_pyramides
spark_build_pyramides >> set_status_done
spark_build_pyramides >> set_status_pyramide_error{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)