You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Matt Inwood (JIRA)" <ji...@apache.org> on 2017/02/17 20:50:41 UTC
[jira] [Commented] (AIRFLOW-884) SlackAPIPostOperator works
inconsistently across two DAGs / Python scripts.
[ https://issues.apache.org/jira/browse/AIRFLOW-884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872499#comment-15872499 ]
Matt Inwood commented on AIRFLOW-884:
-------------------------------------
Full code of a test file where it's not completing downstream slack tasks.
from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from datetime import datetime, timedelta
import pyodbc
from datetime import datetime
import os
import urllib.parse
default_args = {
'owner': 'analytics',
'depends_on_past': False,
#'start_date': sixty_days_ago,
'start_date': datetime(2017, 2, 16),
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'Valet_Data',
default_args=default_args,
schedule_interval='05 * * * *',
dagrun_timeout=timedelta(seconds=60))
def valet_function(locdata, ds, **kwargs):
print(locdata)
slack_success = SlackAPIPostOperator(
task_id='slack_success',
token='xoxp-59581512119-59573138118-125681286114-e24e7f4d73c8efefa50bdfcbd4b9c6f5',
channel='#airflow',
username='airflow',
text="job {{ dag }} finished at {{ ts }} successfully 8)",
dag=dag,
trigger_rule=TriggerRule.ALL_SUCCESS
)
slack_fail = SlackAPIPostOperator(
task_id='slack_fail',
token='xoxp-59581512119-59573138118-125681286114-e24e7f4d73c8efefa50bdfcbd4b9c6f5',
channel='#airflow',
username='airflow',
text="job '{{ dag }}' finished at {{ ts }} in disgrace >:(",
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED
)
fac = {
'LAX_S': [123, 0, 'LOREM'],
'LAX_C': [234, 11, 'IPSUM'],
'IAH_JFK': [345, 22, 'LOREM'],
'STL1': [456, 33, 'IPSUM'],
'EWR_H': [678, 44, 'LOREM'],
'EWR_M': [789, 55, 'IPSUM'],
'JFK': [890, 66, 'LOREM'],
'LGA': [901, 77, 'IPSUM'],
'PHL1': [12, 88, 'LOREM'],
'BNA': [23, 99, 'IPSUM']
}
for loc in fac:
task = PythonOperator(
task_id='valet_{0}'.format(loc),
provide_context=True,
python_callable=valet_function,
op_kwargs={'locdata': fac[loc]},
dag=dag)
slack_success.set_upstream(task)
slack_fail.set_upstream(task)
> SlackAPIPostOperator works inconsistently across two DAGs / Python scripts.
> ---------------------------------------------------------------------------
>
> Key: AIRFLOW-884
> URL: https://issues.apache.org/jira/browse/AIRFLOW-884
> Project: Apache Airflow
> Issue Type: Bug
> Components: api
> Affects Versions: Airflow 1.7.1.3
> Reporter: Matt Inwood
> Attachments: airflow_error.PNG
>
>
> Wrote a DAG file where the script executes and then sends a message to slack. The code at the bottom is used to implement that.
> I used the same code in another DAG file. The upstream task completes successfully, but the slack_success task does not. If I force an error into my script, it does return the slack_fail task.
> On success, both Slack operators indicate Operater Undefined, per attached screenshot
> slack_success = SlackAPIPostOperator(
> task_id='slack_success',
> token=TOKEN,
> channel='#airflow',
> username='airflow',
> text="job {{ dag }} finished at {{ ts }} successfully 8)",
> dag=dag
> )
> slack_fail = SlackAPIPostOperator(
> task_id='slack_fail',
> token=TOKEN,
> channel='#airflow',
> username='airflow',
> text="job '{{ dag }}' finished at {{ ts }} in disgrace >:(",
> dag=dag,
> trigger_rule=TriggerRule.ONE_FAILED
> )
> slack_success.set_upstream(sample_task)
> slack_fail.set_upstream(sample_task)
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)