Posted to commits@airflow.apache.org by "Matt Inwood (JIRA)" <ji...@apache.org> on 2017/02/23 15:57:44 UTC
[jira] [Created] (AIRFLOW-894) Trigger Rules not functioning
Matt Inwood created AIRFLOW-894:
-----------------------------------
Summary: Trigger Rules not functioning
Key: AIRFLOW-894
URL: https://issues.apache.org/jira/browse/AIRFLOW-894
Project: Apache Airflow
Issue Type: Bug
Components: dependencies
Affects Versions: Airflow 1.7.1.3
Reporter: Matt Inwood
Priority: Blocker
Attachments: airflow_error.PNG
The code below fails to schedule the join task. This happens with both the all_done and one_success trigger rules. It seems to occur only when tasks are generated dynamically.
from airflow import DAG
from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
from datetime import datetime, timedelta
from slackclient import SlackClient

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    #'start_date': sixty_days_ago,
    'start_date': datetime(2017, 2, 22),
    'retries': 0
    # 'retry_delay': timedelta(seconds=30),
}

dag = DAG(
    'Valet_Data',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    dagrun_timeout=timedelta(seconds=60))


def valet_function(locdata, ds, **kwargs):
    if locdata == 'D':
        # Undefined name: raises NameError, so the 'D' task fails.
        print(fuckme)


def on_success(kwargs):
    slack_message("On Success - Test")


def on_failure(kwargs):
    for arg in kwargs:
        print(arg, " - ", kwargs[arg])
    slack_message("On Failure - Test - Taskname = {0}".format("Something"))


def slack_message(body):
    token = 'xoxp-59581512119-59573138118-125681286114-e24e7f4d73c8efefa50bdfcbd4b9c6f5'
    sc = SlackClient(token)
    sc.api_call(
        "chat.postMessage",
        channel='#airflow',
        text=body,
        username='airflow',
        icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png'
    )


join = DummyOperator(
    task_id='join',
    trigger_rule='all_done',
    dag=dag
)

list = ['A','B','C','D','E','F','G','H','I','J','Z']
for l in list:
    task = PythonOperator(
        task_id='{0}_PANTS'.format(l),
        provide_context=True,
        python_callable=valet_function,
        op_kwargs={'locdata': l},
        # on_failure_callback=on_failure,
        # on_success_callback=on_success,
        dag=dag,
    )
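
For reference, the documented meaning of the two trigger rules named above can be sketched in plain Python, with no Airflow dependency. This is only a simplified illustration and not Airflow's scheduler logic: rule_is_met and FINISHED_STATES are hypothetical names made up for this example, and the set of finished states is an assumption.

```python
# Illustrative only: a simplified model of the all_done and one_success
# trigger rules. Not Airflow code; all names here are hypothetical.

# Assumed set of states that count as "finished" for an upstream task.
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def rule_is_met(trigger_rule, upstream_states):
    """Return True if a task with this trigger rule would be eligible to run."""
    if trigger_rule == "all_done":
        # Every upstream task has finished, whatever its outcome.
        return all(state in FINISHED_STATES for state in upstream_states)
    if trigger_rule == "one_success":
        # At least one upstream task succeeded; the others may still be running.
        return any(state == "success" for state in upstream_states)
    raise ValueError("rule not modelled in this sketch: {0}".format(trigger_rule))


# Mirrors the report: task 'D' fails, the other ten generated tasks succeed.
states = ["success"] * 10 + ["failed"]
print(rule_is_met("all_done", states))     # True
print(rule_is_met("one_success", states))  # True
```

Under this model, a join task using either rule would become eligible once the eleven generated tasks finish, even though the 'D' task fails.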