Posted to commits@airflow.apache.org by "Matt Inwood (JIRA)" <ji...@apache.org> on 2017/02/17 20:50:41 UTC

[jira] [Commented] (AIRFLOW-884) SlackAPIPostOperator works inconsistently across two DAGs / Python scripts.

    [ https://issues.apache.org/jira/browse/AIRFLOW-884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872499#comment-15872499 ] 

Matt Inwood commented on AIRFLOW-884:
-------------------------------------

Full code of a test DAG file in which the downstream Slack tasks do not complete:

from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from datetime import datetime, timedelta
import pyodbc
import os
import urllib.parse


default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    #'start_date': sixty_days_ago,
    'start_date': datetime(2017, 2, 16),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'Valet_Data',
    default_args=default_args,
    schedule_interval='05 * * * *',
    dagrun_timeout=timedelta(seconds=60))

def valet_function(locdata, ds, **kwargs):
    print(locdata)

slack_success = SlackAPIPostOperator(
    task_id='slack_success',
    token='xoxp-59581512119-59573138118-125681286114-e24e7f4d73c8efefa50bdfcbd4b9c6f5',
    channel='#airflow',
    username='airflow',
    text="job {{ dag }} finished at {{ ts }} successfully 8)",
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS
)

slack_fail = SlackAPIPostOperator(
    task_id='slack_fail',
    token='xoxp-59581512119-59573138118-125681286114-e24e7f4d73c8efefa50bdfcbd4b9c6f5',
    channel='#airflow',
    username='airflow',
    text="job '{{ dag }}' finished at {{ ts }} in disgrace >:(",
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED
)

fac = {
    'LAX_S': [123, 0, 'LOREM'],
    'LAX_C': [234, 11, 'IPSUM'],
    'IAH_JFK': [345, 22, 'LOREM'],
    'STL1': [456, 33, 'IPSUM'],
    'EWR_H': [678, 44, 'LOREM'],
    'EWR_M': [789, 55, 'IPSUM'],
    'JFK': [890, 66, 'LOREM'],
    'LGA': [901, 77, 'IPSUM'],
    'PHL1': [12, 88, 'LOREM'],
    'BNA': [23, 99, 'IPSUM']
}

for loc in fac:
    task = PythonOperator(
        task_id='valet_{0}'.format(loc),
        provide_context=True,
        python_callable=valet_function,
        op_kwargs={'locdata': fac[loc]},
        dag=dag)
    slack_success.set_upstream(task)
    slack_fail.set_upstream(task)
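
For reference, the intent of the two trigger rules used above can be sketched in plain Python. This is only a simplified model under stated assumptions, not Airflow's actual scheduler code: the names UpstreamState, all_success and one_failed are hypothetical, and real task instances have more states (skipped, upstream_failed, and so on) than the two modelled here.

```python
from enum import Enum


class UpstreamState(Enum):
    """Hypothetical, reduced set of final states for an upstream task."""
    SUCCESS = "success"
    FAILED = "failed"


def all_success(upstream_states):
    """Model of ALL_SUCCESS: fire only if every upstream task succeeded."""
    return all(s is UpstreamState.SUCCESS for s in upstream_states)


def one_failed(upstream_states):
    """Model of ONE_FAILED: fire if at least one upstream task failed."""
    return any(s is UpstreamState.FAILED for s in upstream_states)


# Ten upstream tasks, mirroring the ten valet_* tasks in the test DAG.
all_ok = [UpstreamState.SUCCESS] * 10
one_bad = [UpstreamState.SUCCESS] * 9 + [UpstreamState.FAILED]

# Print which Slack task the model expects to fire in each scenario.
print("all upstream ok  -> slack_success:", all_success(all_ok),
      "slack_fail:", one_failed(all_ok))
print("one upstream bad -> slack_success:", all_success(one_bad),
      "slack_fail:", one_failed(one_bad))
```

Under this model, exactly one of slack_success and slack_fail should fire once all ten upstream tasks have finished, which is the behaviour the test DAG is expected to show.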


> SlackAPIPostOperator works inconsistently across two DAGs / Python scripts.
> ---------------------------------------------------------------------------
>
>                 Key: AIRFLOW-884
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-884
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: api
>    Affects Versions: Airflow 1.7.1.3
>            Reporter: Matt Inwood
>         Attachments: airflow_error.PNG
>
>
> I wrote a DAG file where the script executes and then sends a message to Slack. The code at the bottom is used to implement that.
> I used the same code in another DAG file. There, the upstream task completes successfully, but the slack_success task does not. If I force an error into my script, the slack_fail task does run.
> On success, both Slack operators indicate "Operator Undefined", per the attached screenshot.
> slack_success = SlackAPIPostOperator(
>     task_id='slack_success',
>     token=TOKEN,
>     channel='#airflow',
>     username='airflow',
>     text="job {{ dag }} finished at {{ ts }} successfully 8)",
>     dag=dag
> )
> slack_fail = SlackAPIPostOperator(
>     task_id='slack_fail',
>     token=TOKEN,
>     channel='#airflow',
>     username='airflow',
>     text="job '{{ dag }}' finished at {{ ts }} in disgrace >:(",
>     dag=dag,
>     trigger_rule=TriggerRule.ONE_FAILED
> )
> slack_success.set_upstream(sample_task)
> slack_fail.set_upstream(sample_task)


