Posted to commits@airflow.apache.org by "Stefan Seelmann (JIRA)" <ji...@apache.org> on 2019/01/27 21:50:00 UTC

[jira] [Commented] (AIRFLOW-3590) In case of reschedule executor should not log success

    [ https://issues.apache.org/jira/browse/AIRFLOW-3590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16753603#comment-16753603 ] 

Stefan Seelmann commented on AIRFLOW-3590:
------------------------------------------

[~ashb] I dug deeper into this.

The state reported by the executor is not the actual task instance state. It is the result of running the executable "airflow run ...": when the exit code is 0, it logs success, otherwise failed. For the remote executors (Celery, Mesos, Kubernetes) it's similar, the executor-specific state is just translated to success or failed.
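
For illustration, a minimal sketch of that exit-code handling ({{run_and_report}} is a made-up name; the real logic lives in the executor's sync()):

{code}
# Illustrative sketch only, not the actual executor source: the reported
# state is derived solely from the exit code of the "airflow run ..."
# subprocess, never from the task instance state in the database.
import subprocess

from airflow.utils.state import State


def run_and_report(command):
    try:
        subprocess.check_call(command, close_fds=True)
        return State.SUCCESS   # exit code 0 -> reported as "success"
    except subprocess.CalledProcessError:
        return State.FAILED    # any non-zero exit code -> reported as "failed"
{code}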

Another case where the "wrong" state is logged is when a task is skipped by raising AirflowSkipException:

{code}
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.exceptions import AirflowSkipException
from datetime import datetime

def raise_airflow_skip_exception():
    raise AirflowSkipException()

dag = DAG(
    'skip',
    start_date=datetime(2019, 1, 1),
    schedule_interval=None)

with dag:
    skip = PythonOperator(
        task_id='skip',
        python_callable=raise_airflow_skip_exception)


{code}

The scheduler then logs:

{code}
[2019-01-27 21:05:01,966] {jobs.py:1455} INFO - Executor reports skip.skip execution_date=2019-01-27 21:04:47.115781+00:00 as success for try_number 1
{code}

The reason is clear: in {{TaskInstance._run_raw_task()}} the task instance state is set to skipped, but the Python process just exits with 0.
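
To make the mismatch explicit, a small standalone sketch of that control flow ({{run_raw_task_sketch}} is a made-up name; the real logic lives in {{TaskInstance._run_raw_task()}}):

{code}
# Standalone sketch: AirflowSkipException is caught, the state that would be
# recorded in the database is SKIPPED, but the process still exits with 0,
# which the executor then translates to "success".
import sys

from airflow.exceptions import AirflowSkipException
from airflow.utils.state import State


def run_raw_task_sketch(python_callable):
    try:
        python_callable()
    except AirflowSkipException:
        return State.SKIPPED   # what ends up in the task instance table
    return State.SUCCESS


if __name__ == '__main__':
    def callable_that_skips():
        raise AirflowSkipException()

    print(run_raw_task_sketch(callable_that_skips))  # prints "skipped"
    sys.exit(0)  # exit code 0 -> the executor reports "success"
{code}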

I'd not make the executors aware of the reschedule state. To me it seems executors are dumb and don't know about task instances and their states, except for reusing some of the states to report results (the exception seems to be the Kubernetes executor, which also queries the database).

Maybe we should just change the log message, e.g. from "Executor reports %s.%s execution_date=%s as %s for try_number %s" to "Executor reports execution of %s.%s execution_date=%s exited with status %s for try_number %s"?
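
For illustration, how the reworded message would render with the values from the skip example above (plain logging here, variable names assumed; the actual change would be the scheduler's log.info() call in jobs.py):

{code}
# Illustrative only: renders the proposed wording, not the scheduler code.
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('sketch')

dag_id, task_id = 'skip', 'skip'
execution_date = '2019-01-27 21:04:47.115781+00:00'
state, try_number = 'success', 1

log.info(
    "Executor reports execution of %s.%s execution_date=%s "
    "exited with status %s for try_number %s",
    dag_id, task_id, execution_date, state, try_number)
{code}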


> In case of reschedule executor should not log success
> -----------------------------------------------------
>
>                 Key: AIRFLOW-3590
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3590
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: executor
>            Reporter: Stefan Seelmann
>            Assignee: Stefan Seelmann
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Based on comment from [~ashb] https://github.com/apache/airflow/pull/3596#issuecomment-447590657
> The scheduler (when using SequentialExecutor, but that isn't relevant) logs this task as Success!
> {code}
> [2018-12-15 18:59:13,635] {jobs.py:1100} INFO - 1 tasks up for execution:
>      <TaskInstance: hello_world.wait 2018-12-15 18:50:00+00:00 [scheduled]>
> [2018-12-15 18:59:13,649] {jobs.py:1135} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
> [2018-12-15 18:59:13,656] {jobs.py:1171} INFO - DAG hello_world has 0/16 running and queued tasks
> [2018-12-15 18:59:13,656] {jobs.py:1209} INFO - Setting the follow tasks to queued state:
>      <TaskInstance: hello_world.wait 2018-12-15 18:50:00+00:00 [scheduled]>
> [2018-12-15 18:59:13,698] {jobs.py:1293} INFO - Setting the following 1 tasks to queued state:
>      <TaskInstance: hello_world.wait 2018-12-15 18:50:00+00:00 [queued]>
> [2018-12-15 18:59:13,699] {jobs.py:1335} INFO - Sending ('hello_world', 'wait', datetime.datetime(2018, 12, 15, 18, 50, tzinfo=<Timezone [UTC]>), 1) to executor with priority 2 and queue default
> [2018-12-15 18:59:13,701] {base_executor.py:56} INFO - Adding to queue: airflow run hello_world wait 2018-12-15T18:50:00+00:00 --local -sd /Users/ash/airflow/dags/foo.py
> [2018-12-15 18:59:13,742] {sequential_executor.py:45} INFO - Executing command: airflow run hello_world wait 2018-12-15T18:50:00+00:00 --local -sd /Users/ash/airflow/dags/foo.py
> [2018-12-15 18:59:15,558] {__init__.py:51} INFO - Using executor SequentialExecutor
> [2018-12-15 18:59:15,755] {models.py:273} INFO - Filling up the DagBag from /Users/ash/airflow/dags/foo.py
> [2018-12-15 18:59:15,833] {cli.py:530} INFO - Running <TaskInstance: hello_world.wait 2018-12-15T18:50:00+00:00 [queued]> on host themisto.localdomain
> [2018-12-15 18:59:21,427] {jobs.py:1439} INFO - Executor reports hello_world.wait execution_date=2018-12-15 18:50:00+00:00 as success for try_number 1
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)