Posted to commits@airflow.apache.org by "Mubin Khalid (JIRA)" <ji...@apache.org> on 2017/04/26 19:47:04 UTC

[jira] [Comment Edited] (AIRFLOW-1147) airflow scheduler not working

    [ https://issues.apache.org/jira/browse/AIRFLOW-1147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985453#comment-15985453 ] 

Mubin Khalid edited comment on AIRFLOW-1147 at 4/26/17 7:46 PM:
----------------------------------------------------------------

[~dxhuang], yes, I tested it both by toggling it on from the UI and from the CLI:
{code}airflow unpause DAGID{code}

Here is a 5-second log window:

{code}
[2017-04-26 05:51:30,876] {jobs.py:343} DagFileProcessor2 INFO - Started process (PID=9434) to work on /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:30,880] {jobs.py:1521} DagFileProcessor2 INFO - Processing file /airflow/dags/etl_elastic/StandardizeDataDag.py for tasks to queue
[2017-04-26 05:51:30,880] {models.py:167} DagFileProcessor2 INFO - Filling up the DagBag from /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:31,067] {jobs.py:1535} DagFileProcessor2 INFO - DAG(s) dict_keys(['StandardizeDataDag']) retrieved from /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:31,094] {jobs.py:1169} DagFileProcessor2 INFO - Processing StandardizeDataDag
[2017-04-26 05:51:31,104] {jobs.py:566} DagFileProcessor2 INFO - Skipping SLA check for <DAG: StandardizeDataDag> because no tasks in DAG have SLAs
/anaconda3/lib/python3.5/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
[2017-04-26 05:51:31,112] {models.py:322} DagFileProcessor2 INFO - Finding 'running' jobs without a recent heartbeat
[2017-04-26 05:51:31,113] {models.py:328} DagFileProcessor2 INFO - Failing jobs without heartbeat after 2017-04-26 05:46:31.113179
[2017-04-26 05:51:31,118] {jobs.py:351} DagFileProcessor2 INFO - Processing /airflow/dags/etl_elastic/StandardizeDataDag.py took 0.243 seconds
[2017-04-26 05:51:32,925] {jobs.py:343} DagFileProcessor5 INFO - Started process (PID=9441) to work on /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:32,929] {jobs.py:1521} DagFileProcessor5 INFO - Processing file /airflow/dags/etl_elastic/StandardizeDataDag.py for tasks to queue
[2017-04-26 05:51:32,930] {models.py:167} DagFileProcessor5 INFO - Filling up the DagBag from /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:33,119] {jobs.py:1535} DagFileProcessor5 INFO - DAG(s) dict_keys(['StandardizeDataDag']) retrieved from /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:33,145] {jobs.py:1169} DagFileProcessor5 INFO - Processing StandardizeDataDag
[2017-04-26 05:51:33,155] {jobs.py:566} DagFileProcessor5 INFO - Skipping SLA check for <DAG: StandardizeDataDag> because no tasks in DAG have SLAs
/anaconda3/lib/python3.5/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
[2017-04-26 05:51:33,164] {models.py:322} DagFileProcessor5 INFO - Finding 'running' jobs without a recent heartbeat
[2017-04-26 05:51:33,164] {models.py:328} DagFileProcessor5 INFO - Failing jobs without heartbeat after 2017-04-26 05:46:33.164884
[2017-04-26 05:51:33,170] {jobs.py:351} DagFileProcessor5 INFO - Processing /airflow/dags/etl_elastic/StandardizeDataDag.py took 0.245 seconds
[2017-04-26 05:51:34,971] {jobs.py:343} DagFileProcessor8 INFO - Started process (PID=9447) to work on /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:34,975] {jobs.py:1521} DagFileProcessor8 INFO - Processing file /airflow/dags/etl_elastic/StandardizeDataDag.py for tasks to queue
[2017-04-26 05:51:34,975] {models.py:167} DagFileProcessor8 INFO - Filling up the DagBag from /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:35,150] {jobs.py:1535} DagFileProcessor8 INFO - DAG(s) dict_keys(['StandardizeDataDag']) retrieved from /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:35,178] {jobs.py:1169} DagFileProcessor8 INFO - Processing StandardizeDataDag
[2017-04-26 05:51:35,187] {jobs.py:566} DagFileProcessor8 INFO - Skipping SLA check for <DAG: StandardizeDataDag> because no tasks in DAG have SLAs
/anaconda3/lib/python3.5/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
[2017-04-26 05:51:35,196] {models.py:322} DagFileProcessor8 INFO - Finding 'running' jobs without a recent heartbeat
[2017-04-26 05:51:35,197] {models.py:328} DagFileProcessor8 INFO - Failing jobs without heartbeat after 2017-04-26 05:46:35.197177
[2017-04-26 05:51:35,201] {jobs.py:351} DagFileProcessor8 INFO - Processing /airflow/dags/etl_elastic/StandardizeDataDag.py took 0.231 seconds
[2017-04-26 05:51:37,033] {jobs.py:343} DagFileProcessor11 INFO - Started process (PID=9453) to work on /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:37,036] {jobs.py:1521} DagFileProcessor11 INFO - Processing file /airflow/dags/etl_elastic/StandardizeDataDag.py for tasks to queue
[2017-04-26 05:51:37,037] {models.py:167} DagFileProcessor11 INFO - Filling up the DagBag from /airflow/dags/etl_elastic/StandardizeDataDag.py
[2017-04-26 05:51:37,195] {jobs.py:1535} DagFileProcessor11 INFO - DAG(s) dict_keys(['StandardizeDataDag']) retrieved from /airflow/dags/etl_elastic/StandardizeDataDag.py
{code}



> airflow scheduler not working
> -----------------------------
>
>                 Key: AIRFLOW-1147
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1147
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.8
>         Environment: CentOS running on 128 GB ram
>            Reporter: Mubin Khalid
>            Priority: Critical
>              Labels: documentation, newbie
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> I've created some DAGs and tried to put them on the scheduler. I want to run all the tasks in the DAG every 24 hours exactly.
> I tried something like this:
> {code}
> from datetime import datetime, timedelta
> from airflow import DAG
>
> DEFAULT_ARGS = {
>     'owner'           : 'mubin',
>     'depends_on_past' : False,
>     'start_date'      : datetime(2017, 4, 24, 14, 30),
>     'retries'         : 5,
>     'retry_delay'     : timedelta(1),
> }
> SCHEDULE_INTERVAL     = timedelta(minutes=1440)
> # SCHEDULE_INTERVAL   = timedelta(hours=24)
> # SCHEDULE_INTERVAL   = timedelta(days=1)
> dag = DAG('StandardizeDataDag',
>     default_args      = DEFAULT_ARGS,
>     schedule_interval = SCHEDULE_INTERVAL
> )
> {code}
> I tried different intervals, but none of them work. However, if I reset the db with {code}airflow resetdb -y{code} and then run {code}airflow initdb{code}, it works once; after that, the scheduler isn't able to run it again.
> PS: {code}airflow scheduler{code} is executed as {code}root{code}.
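One thing worth ruling out here (an assumption on my part, not confirmed anywhere in this thread): Airflow creates a DAG run for a schedule period only after that whole period has elapsed, so with a 24-hour interval and start_date = 2017-04-24 14:30, the first run would not be triggered before 2017-04-25 14:30. A minimal sketch of that arithmetic, with a hypothetical helper name:

```python
from datetime import datetime, timedelta

# Assumption: the scheduler triggers a run for the period
# [start, start + interval) only once that period has ended.
START_DATE = datetime(2017, 4, 24, 14, 30)
INTERVAL = timedelta(days=1)

def first_run_trigger_time(start, interval):
    # Hypothetical helper: earliest time the first DAG run can fire.
    return start + interval

print(first_run_trigger_time(START_DATE, INTERVAL))
# 2017-04-25 14:30:00
```

If the scheduler is observed for less than that window (as in the 5-second log above), it is expected to see only file-processing messages and no task execution.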



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)