Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/09 17:11:02 UTC
[GitHub] [airflow] BasPH opened a new issue #21471: Scheduled DAG runs fail using Timetable in Airflow 2.2.3
BasPH opened a new issue #21471:
URL: https://github.com/apache/airflow/issues/21471
### Apache Airflow version
2.2.3 (latest released)
### What happened
While testing custom Timetables, I found that scheduled task instances fail in Airflow 2.2.3 but succeed in versions prior to 2.2.3.
Observed behaviour:
- Have a DAG with custom timetable and `catchup=True`
- First tried my own Timetable, ended up copy-pasting the [example Timetable in the docs](https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html), but either way all scheduled task instances fail on me.
- Manually triggered runs work fine.
- DAG run info (start/end date) is accurate
- But task instances default to failed, without running
- No errors in any logfile
- See screenshot, all failed TIs are scheduled, the succeeded TI was manual. Hovering over the DAG run shows the correct start/end datetime, but the TIs are empty.
- In the UI, TIs go from queued -> failed. No logs are displayed, and there are no log files for those TIs on disk.
- In the DB, the scheduled TIs have no start_date, the manual TIs do have a start_date.
![image](https://user-images.githubusercontent.com/6249654/153249070-f871442d-f93a-47bd-95f8-4bf3d6088fc2.png)
### What you expected to happen
I expect a custom timetable to work for scheduled intervals.
### How to reproduce
Run the Timetable in the docs on Airflow 2.2.3:
```python
from datetime import timedelta
from typing import Optional

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from pendulum import Date, DateTime, Time, timezone

UTC = timezone("UTC")


class AfterWorkdayTimetable(Timetable):
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        weekday = run_after.weekday()
        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
            days_since_friday = (run_after.weekday() - 4) % 7
            delta = timedelta(days=days_since_friday)
        else:  # Otherwise the interval is yesterday.
            delta = timedelta(days=1)
        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            last_start_weekday = last_start.weekday()
            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                delta = timedelta(days=1)
            else:  # Last run on Friday -- skip to next Monday.
                delta = timedelta(days=(7 - last_start_weekday))
            next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
        else:  # This is the first ever run on the regular schedule.
            next_start = restriction.earliest
            if next_start is None:  # No start_date. Don't schedule.
                return None
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
            elif next_start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                next_day = next_start.date() + timedelta(days=1)
                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
            next_start_weekday = next_start.weekday()
            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                delta = timedelta(days=(7 - next_start_weekday))
                next_start = next_start + delta
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]
```
DAG:
```python
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from after_workday_timetable import AfterWorkdayTimetable


def print_interval_info(data_interval_start, data_interval_end, **context):
    print("===================================================")
    print(f"START = {data_interval_start} ({data_interval_start.format('dddd')})")
    print(f"END = {data_interval_end} ({data_interval_end.format('dddd')})")
    print(context)
    print("===================================================")


with DAG(
    dag_id="timetable_demo",
    start_date=datetime.datetime(2022, 1, 1),
    timetable=AfterWorkdayTimetable(),
    catchup=True,
) as dag:
    test = PythonOperator(task_id="test", python_callable=print_interval_info)
```
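To check the date arithmetic separately from the scheduler, the `catchup=True` path of `next_dagrun_info` can be mirrored using only the standard library. This is a rough sketch, not Airflow code: `next_interval` and `iter_intervals` are hypothetical helper names, plain `datetime` stands in for pendulum, and the `restriction.latest` check and the `catchup=False` branch are left out.

```python
from datetime import datetime, time, timedelta, timezone
from typing import Iterator, Optional, Tuple

UTC = timezone.utc


def next_interval(last_start: Optional[datetime], earliest: datetime) -> Tuple[datetime, datetime]:
    """Hypothetical stand-in for the catchup=True path of next_dagrun_info."""
    if last_start is not None:
        weekday = last_start.weekday()
        # Monday through Thursday: next interval starts tomorrow; Friday: skip to Monday.
        delta = timedelta(days=1) if 0 <= weekday < 4 else timedelta(days=7 - weekday)
        next_start = datetime.combine((last_start + delta).date(), time.min, tzinfo=UTC)
    else:
        next_start = earliest
        if next_start.time() != time.min:
            # Earliest is not at midnight: move to the start of the next day.
            next_day = next_start.date() + timedelta(days=1)
            next_start = datetime.combine(next_day, time.min, tzinfo=UTC)
        if next_start.weekday() in (5, 6):
            # Weekend: move to the following Monday.
            next_start += timedelta(days=7 - next_start.weekday())
    return next_start, next_start + timedelta(days=1)


def iter_intervals(earliest: datetime, count: int) -> Iterator[Tuple[datetime, datetime]]:
    """Yield the first `count` data intervals, feeding each start back in."""
    last_start = None
    for _ in range(count):
        start, end = next_interval(last_start, earliest)
        yield start, end
        last_start = start


if __name__ == "__main__":
    # Same start_date as the DAG above (2022-01-01 is a Saturday).
    for start, end in iter_intervals(datetime(2022, 1, 1, tzinfo=UTC), 6):
        print(start.strftime("%A %Y-%m-%d"), "->", end.strftime("%A %Y-%m-%d"))
```

With the DAG's `start_date`, the data intervals this produces start on workdays only, which is consistent with the DAG run start/end dates in the UI being accurate while the task instances still fail.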
### Operating System
Debian Bullseye
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==2.4.0
apache-airflow-providers-celery==2.1.0
apache-airflow-providers-cncf-kubernetes==2.2.0
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-postgres==2.4.0
apache-airflow-providers-redis==2.0.1
apache-airflow-providers-sqlite==2.0.1
### Deployment
Docker-Compose
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)