You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Matt Inwood (JIRA)" <ji...@apache.org> on 2017/02/23 15:57:44 UTC

[jira] [Created] (AIRFLOW-894) Trigger Rules not functioning

Matt Inwood created AIRFLOW-894:
-----------------------------------

             Summary: Trigger Rules not functioning
                 Key: AIRFLOW-894
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-894
             Project: Apache Airflow
          Issue Type: Bug
          Components: dependencies
    Affects Versions: Airflow 1.7.1.3
            Reporter: Matt Inwood
            Priority: Blocker
         Attachments: airflow_error.PNG

Code below fails to schedule the join task. This includes with trigger rules for all_done, and one_success. It seems to only occur when dynamically generating tasks. 

from airflow import DAG
from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
from datetime import datetime, timedelta
from datetime import datetime
from slackclient import SlackClient


default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    #'start_date': sixty_days_ago,
    'start_date': datetime(2017, 2, 22),
    'retries': 0
    # 'retry_delay': timedelta(seconds=30),
}

dag = DAG(
    'Valet_Data',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    dagrun_timeout=timedelta(seconds=60))

def valet_function(locdata, ds, **kwargs):
    if locdata == 'D':
        print(fuckme)

def on_success(kwargs):
    slack_message("On Success - Test")

def on_failure(kwargs):
    for arg in kwargs:
        print(arg ," - ", kwargs[arg])
    slack_message("On Failure - Test - Taskname = {0}".format("Something"))

def slack_message(body):
    token = 'xoxp-59581512119-59573138118-125681286114-e24e7f4d73c8efefa50bdfcbd4b9c6f5'
    sc = SlackClient(token)
    sc.api_call(
        "chat.postMessage",
        channel='#airflow',
        text=body,
        username = 'airflow',
        icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png'
    )

join = DummyOperator(
    task_id='join',
    trigger_rule='all_done',
    dag=dag
)


list = ['A','B','C','D','E','F','G','H','I','J','Z']

for l in list:

    task = PythonOperator(
        task_id='{0}_PANTS'.format(l),
        provide_context=True,
        python_callable=valet_function,
        op_kwargs={'locdata': l},
        # on_failure_callback=on_failure,
        # on_success_callback=on_success,
        dag=dag,
    )



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)