You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Matt Inwood (JIRA)" <ji...@apache.org> on 2017/02/23 15:58:44 UTC

[jira] [Updated] (AIRFLOW-894) Trigger Rules not functioning

     [ https://issues.apache.org/jira/browse/AIRFLOW-894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Inwood updated AIRFLOW-894:
--------------------------------
    Description: 
Code below fails to schedule the join task. This includes with trigger rules for all_done, and one_success. It seems to only occur when dynamically generating tasks. 

from airflow import DAG
from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
from datetime import datetime, timedelta
from datetime import datetime
from slackclient import SlackClient


default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    #'start_date': sixty_days_ago,
    'start_date': datetime(2017, 2, 22),
    'retries': 0
    # 'retry_delay': timedelta(seconds=30),
}

dag = DAG(
    'Valet_Data',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    dagrun_timeout=timedelta(seconds=60))

def valet_function(locdata, ds, **kwargs):
    if locdata == 'D':
        print(fuckme)

def on_success(kwargs):
    slack_message("On Success - Test")

def on_failure(kwargs):
    for arg in kwargs:
        print(arg ," - ", kwargs[arg])
    slack_message("On Failure - Test - Taskname = {0}".format("Something"))

def slack_message(body):
    token = dummy_token
    sc = SlackClient(token)
    sc.api_call(
        "chat.postMessage",
        channel='#airflow',
        text=body,
        username = 'airflow',
        icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png'
    )

join = DummyOperator(
    task_id='join',
    trigger_rule='all_done',
    dag=dag
)


list = ['A','B','C','D','E','F','G','H','I','J','Z']

for l in list:

    task = PythonOperator(
        task_id='{0}_PANTS'.format(l),
        provide_context=True,
        python_callable=valet_function,
        op_kwargs={'locdata': l},
        # on_failure_callback=on_failure,
        # on_success_callback=on_success,
        dag=dag,
    )

  was:
Code below fails to schedule the join task. This includes with trigger rules for all_done, and one_success. It seems to only occur when dynamically generating tasks. 

from airflow import DAG
from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
from datetime import datetime, timedelta
from datetime import datetime
from slackclient import SlackClient


default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    #'start_date': sixty_days_ago,
    'start_date': datetime(2017, 2, 22),
    'retries': 0
    # 'retry_delay': timedelta(seconds=30),
}

dag = DAG(
    'Valet_Data',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    dagrun_timeout=timedelta(seconds=60))

def valet_function(locdata, ds, **kwargs):
    if locdata == 'D':
        print(fuckme)

def on_success(kwargs):
    slack_message("On Success - Test")

def on_failure(kwargs):
    for arg in kwargs:
        print(arg ," - ", kwargs[arg])
    slack_message("On Failure - Test - Taskname = {0}".format("Something"))

def slack_message(body):
    token = 'xoxp-59581512119-59573138118-125681286114-e24e7f4d73c8efefa50bdfcbd4b9c6f5'
    sc = SlackClient(token)
    sc.api_call(
        "chat.postMessage",
        channel='#airflow',
        text=body,
        username = 'airflow',
        icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png'
    )

join = DummyOperator(
    task_id='join',
    trigger_rule='all_done',
    dag=dag
)


list = ['A','B','C','D','E','F','G','H','I','J','Z']

for l in list:

    task = PythonOperator(
        task_id='{0}_PANTS'.format(l),
        provide_context=True,
        python_callable=valet_function,
        op_kwargs={'locdata': l},
        # on_failure_callback=on_failure,
        # on_success_callback=on_success,
        dag=dag,
    )


> Trigger Rules not functioning
> -----------------------------
>
>                 Key: AIRFLOW-894
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-894
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: dependencies
>    Affects Versions: Airflow 1.7.1.3
>            Reporter: Matt Inwood
>            Priority: Blocker
>         Attachments: airflow_error.PNG
>
>
> Code below fails to schedule the join task. This includes with trigger rules for all_done, and one_success. It seems to only occur when dynamically generating tasks. 
> from airflow import DAG
> from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
> from datetime import datetime, timedelta
> from datetime import datetime
> from slackclient import SlackClient
> default_args = {
>     'owner': 'analytics',
>     'depends_on_past': False,
>     #'start_date': sixty_days_ago,
>     'start_date': datetime(2017, 2, 22),
>     'retries': 0
>     # 'retry_delay': timedelta(seconds=30),
> }
> dag = DAG(
>     'Valet_Data',
>     default_args=default_args,
>     schedule_interval='*/5 * * * *',
>     dagrun_timeout=timedelta(seconds=60))
> def valet_function(locdata, ds, **kwargs):
>     if locdata == 'D':
>         print(fuckme)
> def on_success(kwargs):
>     slack_message("On Success - Test")
> def on_failure(kwargs):
>     for arg in kwargs:
>         print(arg ," - ", kwargs[arg])
>     slack_message("On Failure - Test - Taskname = {0}".format("Something"))
> def slack_message(body):
>     token = dummy_token
>     sc = SlackClient(token)
>     sc.api_call(
>         "chat.postMessage",
>         channel='#airflow',
>         text=body,
>         username = 'airflow',
>         icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png'
>     )
> join = DummyOperator(
>     task_id='join',
>     trigger_rule='all_done',
>     dag=dag
> )
> list = ['A','B','C','D','E','F','G','H','I','J','Z']
> for l in list:
>     task = PythonOperator(
>         task_id='{0}_PANTS'.format(l),
>         provide_context=True,
>         python_callable=valet_function,
>         op_kwargs={'locdata': l},
>         # on_failure_callback=on_failure,
>         # on_success_callback=on_success,
>         dag=dag,
>     )



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)