Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/09 17:11:02 UTC

[GitHub] [airflow] BasPH opened a new issue #21471: Scheduled DAG runs fail using Timetable in Airflow 2.2.3

BasPH opened a new issue #21471:
URL: https://github.com/apache/airflow/issues/21471


   ### Apache Airflow version
   
   2.2.3 (latest released)
   
   ### What happened
   
   While testing custom Timetables, I found that scheduled task instances fail in Airflow 2.2.3 but succeed in versions prior to 2.2.3.
   
   Observed behaviour:
   
   - Have a DAG with custom timetable and `catchup=True`
   - I first tried my own Timetable, then copy-pasted the [example Timetable in the docs](https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html); either way, all scheduled task instances fail.
   - Manually triggered runs work fine.
   - DAG run info (start/end date) is accurate
   - Task instances, however, go straight to failed without ever running.
   - No errors appear in any log file.
   - See the screenshot: all failed TIs are scheduled, and the succeeded TI was manual. Hovering over the DAG run shows the correct start/end datetime, but the TIs are empty.
   - In the UI, TIs go from queued -> failed. No logs are displayed, and there are no log files for those TIs on disk.
   - In the DB, the scheduled TIs have no start_date, whereas the manual TIs do have a start_date.
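
The last bullet can be checked with a small query against the metadata database. This is a minimal sketch, not what I actually ran: it assumes the Airflow 2.2 schema, where `task_instance` joins to `dag_run` on `dag_id` and `run_id` and `dag_run.run_type` holds `scheduled` or `manual`. The helper name `count_missing_start_dates` and its `placeholder` parameter are hypothetical.

```python
from typing import Any, List, Tuple

# Counts task instances per run type and state, and how many of them
# never received a start_date. Schema names are assumed from Airflow 2.2.
QUERY_TEMPLATE = """
SELECT dr.run_type,
       ti.state,
       COUNT(*) AS total,
       SUM(CASE WHEN ti.start_date IS NULL THEN 1 ELSE 0 END) AS missing_start
FROM task_instance ti
JOIN dag_run dr
  ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
WHERE ti.dag_id = {ph}
GROUP BY dr.run_type, ti.state
ORDER BY dr.run_type, ti.state
"""


def count_missing_start_dates(
    conn: Any, dag_id: str, placeholder: str = "?"
) -> List[Tuple[str, str, int, int]]:
    """Return (run_type, state, total, missing_start) rows for one DAG.

    `conn` is any DB-API 2.0 connection. `placeholder` is the driver's
    parameter marker: "?" for sqlite3, "%s" for psycopg2.
    """
    cur = conn.cursor()
    cur.execute(QUERY_TEMPLATE.format(ph=placeholder), (dag_id,))
    return [tuple(row) for row in cur.fetchall()]
```

For the behaviour described above, I would expect the `scheduled` rows to show `missing_start` equal to `total`, and the `manual` rows to show `missing_start` of 0.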
   
   ![image](https://user-images.githubusercontent.com/6249654/153249070-f871442d-f93a-47bd-95f8-4bf3d6088fc2.png)
   
   
   ### What you expected to happen
   
   I expect a custom timetable to work for scheduled intervals.
   
   ### How to reproduce
   
   Run the Timetable in the docs on Airflow 2.2.3:
   
   ```python
   from datetime import timedelta
   from typing import Optional
   
   from airflow.plugins_manager import AirflowPlugin
   from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
   from pendulum import Date, DateTime, Time, timezone
   
   UTC = timezone("UTC")
   
   
   class AfterWorkdayTimetable(Timetable):
       def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
           weekday = run_after.weekday()
           if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
               days_since_friday = (run_after.weekday() - 4) % 7
               delta = timedelta(days=days_since_friday)
           else:  # Otherwise the interval is yesterday.
               delta = timedelta(days=1)
           start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
           return DataInterval(start=start, end=(start + timedelta(days=1)))
   
       def next_dagrun_info(
           self,
           *,
           last_automated_data_interval: Optional[DataInterval],
           restriction: TimeRestriction,
       ) -> Optional[DagRunInfo]:
           if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
               last_start = last_automated_data_interval.start
               last_start_weekday = last_start.weekday()
               if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                   delta = timedelta(days=1)
               else:  # Last run on Friday -- skip to next Monday.
                   delta = timedelta(days=(7 - last_start_weekday))
               next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
           else:  # This is the first ever run on the regular schedule.
               next_start = restriction.earliest
               if next_start is None:  # No start_date. Don't schedule.
                   return None
               if not restriction.catchup:
                   # If the DAG has catchup=False, today is the earliest to consider.
                   next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
               elif next_start.time() != Time.min:
                   # If earliest does not fall on midnight, skip to the next day.
                   next_day = next_start.date() + timedelta(days=1)
                   next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
               next_start_weekday = next_start.weekday()
               if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                   delta = timedelta(days=(7 - next_start_weekday))
                   next_start = next_start + delta
           if restriction.latest is not None and next_start > restriction.latest:
               return None  # Over the DAG's scheduled end; don't schedule.
           return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))
   
   
   class WorkdayTimetablePlugin(AirflowPlugin):
       name = "workday_timetable_plugin"
       timetables = [AfterWorkdayTimetable]
   ```
   
   DAG:
   ```python
   import datetime
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   from after_workday_timetable import AfterWorkdayTimetable
   
   
   def print_interval_info(data_interval_start, data_interval_end, **context):
       print("===================================================")
       print(f"START = {data_interval_start} ({data_interval_start.format('dddd')})")
       print(f"END = {data_interval_end} ({data_interval_end.format('dddd')})")
       print(context)
       print("===================================================")
   
   
   with DAG(
       dag_id="timetable_demo",
       start_date=datetime.datetime(2022, 1, 1),
       timetable=AfterWorkdayTimetable(),
       catchup=True,
   ) as dag:
       test = PythonOperator(task_id="test", python_callable=print_interval_info)
   ```
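
To make the expected schedule concrete, the weekday arithmetic of `next_dagrun_info` can be replayed without Airflow. This is a minimal sketch using only the standard library: `expected_interval_starts` is a hypothetical helper that mirrors the `catchup=True` branch of the timetable above, ignores `restriction.latest`, and does not call the real scheduler.

```python
from datetime import datetime, time, timedelta, timezone
from typing import List


def expected_interval_starts(start_date: datetime, count: int) -> List[datetime]:
    """Return the first `count` data interval starts for a UTC start_date."""
    next_start = start_date
    # If the start date is not at midnight, skip to the next day.
    if next_start.time() != time.min:
        next_day = next_start.date() + timedelta(days=1)
        next_start = datetime.combine(next_day, time.min, tzinfo=timezone.utc)
    # If that lands on a weekend, move to the following Monday.
    if next_start.weekday() in (5, 6):
        next_start += timedelta(days=7 - next_start.weekday())

    starts = []
    for _ in range(count):
        starts.append(next_start)
        weekday = next_start.weekday()
        # Monday through Thursday: next interval starts tomorrow.
        # Friday: skip the weekend and start on Monday.
        step = 1 if weekday < 4 else 7 - weekday
        next_start = next_start + timedelta(days=step)
    return starts


if __name__ == "__main__":
    for start in expected_interval_starts(datetime(2022, 1, 1, tzinfo=timezone.utc), 6):
        print(start.isoformat(), start.strftime("%A"))
```

With the `start_date` of 2022-01-01 (a Saturday) used in the DAG above, the first six interval starts are 3, 4, 5, 6, 7 and 10 January 2022, so those are the scheduled runs I would expect to execute rather than fail.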
   
   ### Operating System
   
   Debian Bullseye
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==2.4.0
   apache-airflow-providers-celery==2.1.0
   apache-airflow-providers-cncf-kubernetes==2.2.0
   apache-airflow-providers-ftp==2.0.1
   apache-airflow-providers-http==2.0.1
   apache-airflow-providers-imap==2.0.1
   apache-airflow-providers-postgres==2.4.0
   apache-airflow-providers-redis==2.0.1
   apache-airflow-providers-sqlite==2.0.1
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

