Posted to commits@airflow.apache.org by "Jarek Potiuk (Jira)" <ji...@apache.org> on 2020/01/19 23:37:07 UTC

[jira] [Closed] (AIRFLOW-104) State of `ExternalTaskSensor` task when the external task is marked as "failure"

     [ https://issues.apache.org/jira/browse/AIRFLOW-104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jarek Potiuk closed AIRFLOW-104.
--------------------------------
    Resolution: Won't Fix

I am closing some old issues that are no longer relevant. Please let me know if you would like this one reopened.

> State of `ExternalTaskSensor` task when the external task is marked as "failure"
> --------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-104
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-104
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Sabeer Zaman
>            Priority: Minor
>
> Dear Airflow Maintainers,
> Before I tell you about my issue, let me describe my environment:
> h3. Environment
> * Version of Airflow: v1.6.2
> * Airflow components and configuration: Running with CeleryExecutor (separate docker containers running webserver, worker, rabbitmq and mysql db)
> * Operating System: {{Darwin Kernel Version 15.3.0: Thu Dec 10 18:40:58 PST 2015; root:xnu-3248.30.4~1/RELEASE_X86_64 x86_64}}
> * Python Version: 2.7.6
> Now that you know a little about me, let me tell you about the issue I am having:
> h3. Description of Issue
> I created two DAGs - let's call them {{dag_a}} and {{dag_b}}. One of the tasks in {{dag_b}} is an {{ExternalTaskSensor}} referencing a task with {{task_id="external_task"}} in {{dag_a}}. So the code looked as shown below:
> {code}
> # in DAG definition for "dag_a"
> # ... imports, boilerplate setup - e.g., defining `default_args`
> dag = DAG(dag_id="dag_a", default_args=default_args, schedule_interval="0 0 * * *",)
> external_task = DummyOperator(
>   task_id="external_task",
>   dag=dag,
> )
> {code}
> {code}
> # in DAG definition for "dag_b"
> # ... imports, boilerplate setup - e.g., defining `default_args`
> dag = DAG(dag_id="dag_b", default_args=default_args, schedule_interval="0 0 * * *",)
> task_sensor = ExternalTaskSensor(
>   task_id="dag_a.external_task",
>   external_dag_id="dag_a",
>   external_task_id="external_task",
>   dag=dag,
> )
> {code}
> To test failure behavior, I marked the task with {{task_id="external_task"}} in {{dag_a}} as "failed" (for a particular execution date). I then ran the backfill for the _same execution date_ for {{dag_b}}.
> * What did you expect to happen?
> ** I expected the task named {{"dag_a.external_task"}} in {{dag_b}} to be marked either as {{failed}} or {{upstream_failed}}, since the actual task it was referencing in {{dag_a}} failed.
> * What happened instead?
> ** The log for the task {{"dag_a.external_task"}} in {{dag_b}} showed that it kept poking {{external_task}} in {{dag_a}} every minute, indefinitely.
> h3. Requested Change 
> Looking at the logic in the [{{poke}} function for the {{ExternalTaskSensor}}|https://github.com/airbnb/airflow/blob/1.7.0/airflow/operators/sensors.py#L178-L200], it's evident that it acts as a regular Airflow Sensor, simply waiting for a condition to become true; it in no way couples the state of the current task to the state of the external task.
> That being said, is it reasonable to request such behavior (i.e., that the {{ExternalTaskSensor}}'s state be set to {{failed}} if the task it's waiting on is marked as {{failed}})? I'd be willing to take a stab at adding the logic, but I'd like to confirm that this is in line with the Sensor's intended behavior, or whether there's a suggested alternative way of achieving it.
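The fail-fast behavior requested above can be sketched in plain Python. This is a hypothetical illustration, not the real {{ExternalTaskSensor}} API: the names {{ExternalTaskFailedError}} and {{get_external_state}} are invented here, with the latter standing in for the database query the real sensor performs.

```python
# Hypothetical sketch of a fail-fast poke: instead of returning False
# forever while the external task sits in a failed state, the sensor
# raises, which would cause its own task instance to be marked failed.

FAILED_STATES = {"failed", "upstream_failed"}
SUCCESS_STATES = {"success"}


class ExternalTaskFailedError(Exception):
    """Raised when the external task the sensor is waiting on has failed."""


def poke(get_external_state):
    """Return True when the external task succeeded, False to keep
    poking, and raise when it has terminally failed.

    `get_external_state` is a stand-in for the real sensor's lookup of
    the external task instance's state (a DB query in Airflow itself).
    """
    state = get_external_state()
    if state in FAILED_STATES:
        raise ExternalTaskFailedError(
            "external task ended in state %r" % state
        )
    return state in SUCCESS_STATES
```

For what it's worth, later Airflow releases added an analogous {{failed_states}} parameter to {{ExternalTaskSensor}} (alongside {{allowed_states}}), which makes the sensor error out when the external task reaches one of the listed states.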



--
This message was sent by Atlassian Jira
(v8.3.4#803005)