You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@airflow.apache.org by Prasad Bhalerao <pr...@gmail.com> on 2021/06/11 13:20:51 UTC

Capture DAG states

Hi,

I want to capture the following events. I want to capture these states and
generate an event for each.
As part of this I want to capture DAG ID and DAG Name etc.

1) DAG\pipeline started
2) DAG\pipeline halted
3) DAG\pipeline failed
4) DAG\pipeline success.

Can someone please help me with this?


Thanks,
Prasad

Re: Capture DAG states

Posted by Bruno Gonzalez <br...@homelight.com>.
Yes, you can use both of them using simple Python functions you define
inside the DAG file or from other Python libraries accessible by the DAG.
You can check the Airflow code here
<https://github.com/apache/airflow/blob/304e174674ff6921cb7ed79c0158949b50eff8fe/airflow/models/dag.py#L839>.
As you can see in the code I shared, it passes the context including all
the normal context variables (see here
<https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html#default-variables>
).

On Fri, Jun 11, 2021 at 2:58 PM Prasad Bhalerao <
prasadbhalerao1983@gmail.com> wrote:

> Hi,
>
> It means I have to call these apis explicitly right!
> But that is not my requirement, I want to fire my custom events when DAG
> state changes.
>
> As per my initial analysis I could think of using on_failure_callback and on_success_callback
> while creating DAGs. These callbacks will trigger my custom events. What I
> understand is that these callbacks will be executed on the scheduler node.
> So I have to use some mechanism like rest end pt or web hooks to post these
> events to my eventing module.
> Does this callback provide DAG_ID in context object?
>
> Please correct me if I am wrong.
>
> Thanks,
> Prasad
>
> On Fri, Jun 11, 2021 at 10:58 PM Bruno Gonzalez <br...@homelight.com>
> wrote:
>
>> Prasad, if you are using Airflow 2 you can use the new stable API. The
>> state of the DAG will be a combination of List DAG runs
>> <https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_runs> to
>> get the *dag_run_id* and Get a DAG run
>> <https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_run> to
>> get the *state*, that is the one that you're looking for.
>>
>>  Hope this helps.
>>
>> On Fri, Jun 11, 2021 at 11:50 AM Prasad Bhalerao <
>> prasadbhalerao1983@gmail.com> wrote:
>>
>>> Let me rephrase my question.
>>>
>>> Is there anyway to capture DAG states?
>>>
>>> Based on these state I want to trigger the events.
>>>
>>>
>>>
>>> On Fri, 11 Jun 2021 at 8:12 PM, Ash Berlin-Taylor <as...@apache.org>
>>> wrote:
>>>
>>>> What do you mean by "halted"? That isn't the name of any state in
>>>> Airflow.
>>>>
>>>> -ash
>>>>
>>>> On Fri, Jun 11 2021 at 18:50:51 +0530, Prasad Bhalerao <
>>>> prasadbhalerao1983@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I want to capture the following events. I want to capture these states
>>>> and generate an event for each.
>>>> As part of this I want to capture DAG ID and DAG Name etc.
>>>>
>>>> 1) DAG\pipeline started
>>>> 2) DAG\pipeline halted
>>>> 3) DAG\pipeline failed
>>>> 4) DAG\pipeline success.
>>>>
>>>> Can someone please help me with this?
>>>>
>>>>
>>>> Thanks,
>>>> Prasad
>>>>
>>>>

Re: Capture DAG states

Posted by Prasad Bhalerao <pr...@gmail.com>.
Hi,

It means I have to call these apis explicitly right!
But that is not my requirement, I want to fire my custom events when DAG
state changes.

As per my initial analysis I could think of using on_failure_callback
and on_success_callback
while creating DAGs. These callbacks will trigger my custom events. What I
understand is that these callbacks will be executed on the scheduler node.
So I have to use some mechanism like rest end pt or web hooks to post these
events to my eventing module.
Does this callback provide DAG_ID in context object?

Please correct me if I am wrong.

Thanks,
Prasad

On Fri, Jun 11, 2021 at 10:58 PM Bruno Gonzalez <br...@homelight.com> wrote:

> Prasad, if you are using Airflow 2 you can use the new stable API. The
> state of the DAG will be a combination of List DAG runs
> <https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_runs> to
> get the *dag_run_id* and Get a DAG run
> <https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_run> to
> get the *state*, that is the one that you're looking for.
>
>  Hope this helps.
>
> On Fri, Jun 11, 2021 at 11:50 AM Prasad Bhalerao <
> prasadbhalerao1983@gmail.com> wrote:
>
>> Let me rephrase my question.
>>
>> Is there anyway to capture DAG states?
>>
>> Based on these state I want to trigger the events.
>>
>>
>>
>> On Fri, 11 Jun 2021 at 8:12 PM, Ash Berlin-Taylor <as...@apache.org> wrote:
>>
>>> What do you mean by "halted"? That isn't the name of any state in
>>> Airflow.
>>>
>>> -ash
>>>
>>> On Fri, Jun 11 2021 at 18:50:51 +0530, Prasad Bhalerao <
>>> prasadbhalerao1983@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I want to capture the following events. I want to capture these states
>>> and generate an event for each.
>>> As part of this I want to capture DAG ID and DAG Name etc.
>>>
>>> 1) DAG\pipeline started
>>> 2) DAG\pipeline halted
>>> 3) DAG\pipeline failed
>>> 4) DAG\pipeline success.
>>>
>>> Can someone please help me with this?
>>>
>>>
>>> Thanks,
>>> Prasad
>>>
>>>

Re: Capture DAG states

Posted by Bruno Gonzalez <br...@homelight.com>.
Prasad, if you are using Airflow 2 you can use the new stable API. The
state of the DAG will be a combination of List DAG runs
<https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_runs>
to
get the *dag_run_id* and Get a DAG run
<https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_run>
to
get the *state*, that is the one that you're looking for.

 Hope this helps.

On Fri, Jun 11, 2021 at 11:50 AM Prasad Bhalerao <
prasadbhalerao1983@gmail.com> wrote:

> Let me rephrase my question.
>
> Is there anyway to capture DAG states?
>
> Based on these state I want to trigger the events.
>
>
>
> On Fri, 11 Jun 2021 at 8:12 PM, Ash Berlin-Taylor <as...@apache.org> wrote:
>
>> What do you mean by "halted"? That isn't the name of any state in Airflow.
>>
>> -ash
>>
>> On Fri, Jun 11 2021 at 18:50:51 +0530, Prasad Bhalerao <
>> prasadbhalerao1983@gmail.com> wrote:
>>
>> Hi,
>>
>> I want to capture the following events. I want to capture these states
>> and generate an event for each.
>> As part of this I want to capture DAG ID and DAG Name etc.
>>
>> 1) DAG\pipeline started
>> 2) DAG\pipeline halted
>> 3) DAG\pipeline failed
>> 4) DAG\pipeline success.
>>
>> Can someone please help me with this?
>>
>>
>> Thanks,
>> Prasad
>>
>>

Re: Capture DAG states

Posted by Prasad Bhalerao <pr...@gmail.com>.
Let me rephrase my question.

Is there anyway to capture DAG states?

Based on these state I want to trigger the events.



On Fri, 11 Jun 2021 at 8:12 PM, Ash Berlin-Taylor <as...@apache.org> wrote:

> What do you mean by "halted"? That isn't the name of any state in Airflow.
>
> -ash
>
> On Fri, Jun 11 2021 at 18:50:51 +0530, Prasad Bhalerao <
> prasadbhalerao1983@gmail.com> wrote:
>
> Hi,
>
> I want to capture the following events. I want to capture these states and
> generate an event for each.
> As part of this I want to capture DAG ID and DAG Name etc.
>
> 1) DAG\pipeline started
> 2) DAG\pipeline halted
> 3) DAG\pipeline failed
> 4) DAG\pipeline success.
>
> Can someone please help me with this?
>
>
> Thanks,
> Prasad
>
>

Re: Capture DAG states

Posted by Ash Berlin-Taylor <as...@apache.org>.
What do you mean by "halted"? That isn't the name of any state in 
Airflow.

-ash

On Fri, Jun 11 2021 at 18:50:51 +0530, Prasad Bhalerao 
<pr...@gmail.com> wrote:
> Hi,
> 
> I want to capture the following events. I want to capture these 
> states and generate an event for each.
> As part of this I want to capture DAG ID and DAG Name etc.
> 
> 1) DAG\pipeline started
> 2) DAG\pipeline halted
> 3) DAG\pipeline failed
> 4) DAG\pipeline success.
> 
> Can someone please help me with this?
> 
> 
> Thanks,
> Prasad