Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/23 06:24:36 UTC

[GitHub] [airflow] lionheart106008 opened a new issue #20475: Custom Timetable error with CeleryExecutor

lionheart106008 opened a new issue #20475:
URL: https://github.com/apache/airflow/issues/20475


   ### Apache Airflow version
   
   2.2.3 (latest released)
   
   ### What happened
   
   I'm really new to Airflow, and I have an error when using a custom Timetable with the CeleryExecutor.
   
   ### What you expected to happen
   
   For the custom timetable to be implemented and used by the DAG.

   Instead, the error log is as follows:
   
   ```
   [2021-12-23 05:44:30,843: WARNING/ForkPoolWorker-2] Running <TaskInstance: example_cron_trivial_dag.dummy scheduled__2021-12-23T05:44:00+00:00 [queued]> on host 310bbd362d25
   [2021-12-23 05:44:30,897: ERROR/ForkPoolWorker-2] Failed to execute task Not a valid timetable: <cron_trivial_timetable.CronTrivialTimetable object at 0x7fc15a0a4100>.
   Traceback (most recent call last):
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
       run_job.run()
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 78, in _execute
       self.task_runner = get_task_runner(self)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/task/task_runner/__init__.py", line 63, in get_task_runner
       task_runner = task_runner_class(local_task_job)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 35, in __init__
       super().__init__(local_task_job)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/task/task_runner/base_task_runner.py", line 48, in __init__
       super().__init__(local_task_job.task_instance)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 40, in __init__
       self._set_context(context)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 54, in _set_context
       set_context(self.log, context)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 178, in set_context
       handler.set_context(value)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 59, in set_context
       local_loc = self._init_file(ti)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 264, in _init_file
       relative_path = self._render_filename(ti, ti.try_number)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 80, in _render_filename
       context = ti.get_template_context()
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1912, in get_template_context
       'next_ds': get_next_ds(),
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1868, in get_next_ds
       execution_date = get_next_execution_date()
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1862, in get_next_execution_date
       next_execution_date = dag.following_schedule(self.execution_date)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/models/dag.py", line 595, in following_schedule
       data_interval = self.infer_automated_data_interval(timezone.coerce_datetime(dttm))
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/models/dag.py", line 678, in infer_automated_data_interval
       raise ValueError(f"Not a valid timetable: {self.timetable!r}")
   ValueError: Not a valid timetable: <cron_trivial_timetable.CronTrivialTimetable object at 0x7fc15a0a4100>
   [2021-12-23 05:44:30,936: ERROR/ForkPoolWorker-2] Task airflow.executors.celery_executor.execute_command[7c76904d-1b61-4441-b5f0-96ef2ba7c3b7] raised unexpected: AirflowException('Celery command failed on host: 310bbd362d25')
   Traceback (most recent call last):
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 451, in trace_task
       R = retval = fun(*args, **kwargs)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 734, in __protected_call__
       return self.run(*args, **kwargs)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 90, in execute_command
       _execute_in_fork(command_to_exec, celery_task_id)
     File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 101, in _execute_in_fork
       raise AirflowException('Celery command failed on host: ' + get_hostname())
   airflow.exceptions.AirflowException: Celery command failed on host: 310bbd362d25
   ```
   
   This seems like the same problem mentioned here, but for different reasons that I don't really understand:
   https://github.com/apache/airflow/issues/19578#issuecomment-974654209
   
   ### How to reproduce
   
   Before the 2.2.3 release I was having DAG import errors with custom timetables, and since I only want the Next Dag Run shown on the UI to be an exact time, I simply changed the code of CronDataIntervalTimetable to return DagRunInfo.exact(end) from next_dagrun_info and DataInterval.exact(end) from infer_manual_data_interval.
   
   Here's the new timetable plugin file that I created (simply copied and tweaked from CronDataIntervalTimetable):
   
   ```
   import datetime
   from typing import Any, Dict, Optional, Union
   
   from croniter import CroniterBadCronError, CroniterBadDateError, croniter
   from dateutil.relativedelta import relativedelta
   from pendulum import DateTime
   from pendulum.tz.timezone import Timezone
   
   from airflow.plugins_manager import AirflowPlugin
   from airflow.compat.functools import cached_property
   from airflow.exceptions import AirflowTimetableInvalid
   from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
   from airflow.utils.dates import cron_presets
   from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
   
   Delta = Union[datetime.timedelta, relativedelta]
   
   class _TrivialTimetable(Timetable):
       """Basis for timetable implementations that schedule data intervals.
   
       Timetables of this kind create periodic data intervals (exact times) from an
       underlying schedule representation (e.g. a cron expression, or a timedelta
       instance), and schedule a DagRun at the end of each interval (exact time).
       """
   
       def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
           """Bound the earliest time a run can be scheduled.
   
           This is called when ``catchup=False``.
           """
           raise NotImplementedError()
   
       def _align(self, current: DateTime) -> DateTime:
           """Align given time to the scheduled.
   
           For fixed schedules (e.g. every midnight); this finds the next time that
           aligns to the declared time, if the given time does not align. If the
           schedule is not fixed (e.g. every hour), the given time is returned.
           """
           raise NotImplementedError()
   
       def _get_next(self, current: DateTime) -> DateTime:
           """Get the first schedule after the current time."""
           raise NotImplementedError()
   
       def _get_prev(self, current: DateTime) -> DateTime:
           """Get the last schedule before the current time."""
           raise NotImplementedError()
   
       def next_dagrun_info(
           self,
           *,
           last_automated_data_interval: Optional[DataInterval],
           restriction: TimeRestriction,
       ) -> Optional[DagRunInfo]:
           earliest = restriction.earliest
           if not restriction.catchup:
               earliest = self._skip_to_latest(earliest)
           if last_automated_data_interval is None:
               # First run; schedule the run at the first available time matching
               # the schedule, and retrospectively create a data interval for it.
               if earliest is None:
                   return None
               start = self._align(earliest)
           else:
               # There's a previous run. Create a data interval starting from
               # the end of the previous interval.
               start = last_automated_data_interval.end
           if restriction.latest is not None and start > restriction.latest:
               return None
           end = self._get_next(start)
           return DagRunInfo.exact(end)
   
   def _is_schedule_fixed(expression: str) -> bool:
       """Figures out if the schedule has a fixed time (e.g. 3 AM every day).
   
       :return: True if the schedule has a fixed time, False if not.
   
       Detection is done by "peeking" the next two cron trigger time; if the
       two times have the same minute and hour value, the schedule is fixed,
       and we *don't* need to perform the DST fix.
   
       This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
       """
       cron = croniter(expression)
       next_a = cron.get_next(datetime.datetime)
       next_b = cron.get_next(datetime.datetime)
       return next_b.minute == next_a.minute and next_b.hour == next_a.hour
   
   class CronTrivialTimetable(_TrivialTimetable):
       """Timetable that schedules data intervals with a cron expression.
   
       This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either
       a five/six-segment representation, or one of ``cron_presets``.
   
       The implementation extends on croniter to add timezone awareness. This is
       because croniter works only with naive timestamps, and cannot consider DST
       when determining the next/previous time.
   
       Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
       """
   
       def __init__(self, cron: str, timezone: Timezone) -> None:
           self._expression = cron_presets.get(cron, cron)
           self._timezone = timezone
   
       @classmethod
       def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
           from airflow.serialization.serialized_objects import decode_timezone
   
           return cls(data["expression"], decode_timezone(data["timezone"]))
   
       def __eq__(self, other: Any) -> bool:
           """Both expression and timezone should match.
   
           This is only for testing purposes and should not be relied on otherwise.
           """
           if not isinstance(other, CronTrivialTimetable):
               return NotImplemented
           return self._expression == other._expression and self._timezone == other._timezone
   
       @property
       def summary(self) -> str:
           return self._expression
      
       def serialize(self) -> Dict[str, Any]:
           from airflow.serialization.serialized_objects import encode_timezone
   
           return {"expression": self._expression, "timezone": encode_timezone(self._timezone)}
      
       def validate(self) -> None:
           try:
               croniter(self._expression)
           except (CroniterBadCronError, CroniterBadDateError) as e:
               raise AirflowTimetableInvalid(str(e))
      
       @cached_property
       def _should_fix_dst(self) -> bool:
           # This is lazy so instantiating a schedule does not immediately raise
           # an exception. Validity is checked with validate() during DAG-bagging.
           return not _is_schedule_fixed(self._expression)
      
       def _get_next(self, current: DateTime) -> DateTime:
           """Get the first schedule after specified time, with DST fixed."""
           naive = make_naive(current, self._timezone)
           cron = croniter(self._expression, start_time=naive)
           scheduled = cron.get_next(datetime.datetime)
           if not self._should_fix_dst:
               return convert_to_utc(make_aware(scheduled, self._timezone))
           delta = scheduled - naive
           return convert_to_utc(current.in_timezone(self._timezone) + delta)
   
       def _get_prev(self, current: DateTime) -> DateTime:
           """Get the first schedule before specified time, with DST fixed."""
           naive = make_naive(current, self._timezone)
           cron = croniter(self._expression, start_time=naive)
           scheduled = cron.get_prev(datetime.datetime)
           if not self._should_fix_dst:
               return convert_to_utc(make_aware(scheduled, self._timezone))
           delta = naive - scheduled
           return convert_to_utc(current.in_timezone(self._timezone) - delta)
   
       def _align(self, current: DateTime) -> DateTime:
           """Get the next scheduled time.
   
           This is ``current + interval``, unless ``current`` falls exactly on the
           schedule, in which case ``current`` is returned.
           """
           next_time = self._get_next(current)
           if self._get_prev(next_time) != current:
               return next_time
           return current
   
       def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
           """Bound the earliest time a run can be scheduled.
   
           The logic is that we move start_date up until one period before, so the
           current time is AFTER the period end, and the job can be created...
           """
           current_time = DateTime.utcnow()
           next_start = self._get_next(current_time)
           last_start = self._get_prev(current_time)
           if next_start == current_time:
               new_start = last_start
           elif next_start > current_time:
               new_start = self._get_prev(last_start)
           else:
               raise AssertionError("next schedule shouldn't be earlier")
           if earliest is None:
               return new_start
           return max(new_start, earliest)
   
       def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
           # Get the last complete period before run_after, e.g. if a DAG run is
           # scheduled at each midnight, the data interval of a manually triggered
           # run at 1am 25th is between 0am 24th and 0am 25th.
           end = self._get_prev(self._align(run_after))
           return DataInterval.exact(end)
   
   class CronTrivialTimetablePlugin(AirflowPlugin):
       name = "cron_trivial_timetable_plugin"
       timetables = [CronTrivialTimetable]
   ```
   
   the dag file i'm using/testing:
   
   ```
   import pendulum
   from datetime import datetime, timedelta
   from typing import Dict
   
   from airflow.decorators import task
   from airflow.models import DAG
   from airflow.operators.bash import BashOperator
   from airflow.operators.dummy import DummyOperator
   
   from cron_trivial_timetable import CronTrivialTimetable
   
   with DAG(
       dag_id="example_cron_trivial_dag",
       start_date=datetime(2021, 11, 14, 12, 0, tzinfo=pendulum.timezone('Asia/Tokyo')),
       max_active_runs=1,
       timetable=CronTrivialTimetable('*/2 * * * *', pendulum.timezone('Asia/Tokyo')),
       default_args={
           'owner': 'HashDash',
           'depends_on_past': False,
           'email': [**************],
           'email_on_failure': False,
           'email_on_retry': False,
           'retries': 0,
           'retry_delay': timedelta(seconds=1),
           'end_date': datetime(2101, 1, 1),
       },
       tags=['testing'],
       catchup=False
   ) as dag:
   
       dummy = BashOperator(task_id='dummy', queue='daybatch', bash_command="date")
   ```
   
   ### Operating System
   
   CentOS-7
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   My scheduler and workers run in separate Docker containers, hence I'm using the CeleryExecutor.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   



[GitHub] [airflow] uranusjr commented on issue #20475: Custom Timetable error with CeleryExecutor

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #20475:
URL: https://github.com/apache/airflow/issues/20475#issuecomment-1000668394


   It is still possible to generate a useful date for the timetable case, I think. This is more of a code path we did not correctly anticipate, and thus did not convert correctly.



[GitHub] [airflow] boring-cyborg[bot] commented on issue #20475: Custom Timetable error with CeleryExecutor

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #20475:
URL: https://github.com/apache/airflow/issues/20475#issuecomment-1000073232


   Thanks for opening your first issue here! Be sure to follow the issue template!
   



[GitHub] [airflow] potiuk commented on issue #20475: Custom Timetable error with CeleryExecutor

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #20475:
URL: https://github.com/apache/airflow/issues/20475#issuecomment-1000269635


   I looked at it, and it seems this is simply triggered by `get_template_context()` in the file task handler - it tries to retrieve the whole context (including the deprecated `next_ds` -> `get_next_execution_date` -> `following_schedule` -> `infer_automated_data_interval`) and that triggers the exception.

   I'd say we should simply set `next_ds` to some value (possibly None) rather than raise the exception, because this will always happen with any custom timetable not deriving from one of "our" built-in timetables.

   Did I get it right, @uranusjr?

   @lionheart106008 -> I think a good workaround for you would be to derive your timetable from `NullTimetable` - this should make `next_ds` equal to `logical_date`. That likely makes little sense for your case, but the `next_ds` field is deprecated so you should not use it anyway (and you don't in your DAG) - but at least it will not crash.
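   
   Roughly, that change could look like the minimal sketch below (assuming Airflow 2.2.x, where `NullTimetable` lives in `airflow.timetables.simple`); only the base class of the plugin's timetable changes, and the rest of the plugin file posted above stays the same:
   
   ```
   # Sketch of the suggested workaround (assumes Airflow 2.2.x).
   # Deriving from NullTimetable lets DAG.infer_automated_data_interval()
   # recognize the custom timetable and fall back to an exact data interval
   # at the logical date, instead of raising "Not a valid timetable".
   from airflow.timetables.simple import NullTimetable


   class _TrivialTimetable(NullTimetable):  # previously: Timetable
       """Same body as _TrivialTimetable in the plugin file above."""
       # _skip_to_latest, _align, _get_next, _get_prev and next_dagrun_info
       # stay exactly as posted in the issue description.
   ```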
   
   



[GitHub] [airflow] uranusjr edited a comment on issue #20475: Custom Timetable error with CeleryExecutor

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on issue #20475:
URL: https://github.com/apache/airflow/issues/20475#issuecomment-1000668394


   It is still possible to generate a useful date for the timetable case, I think. This is more of a code path we did not correctly anticipate, and thus did not convert correctly.
   
   Edit: More specifically, `get_next_execution_date` should not rely on `following_schedule`.
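   
   Roughly, such a change could look like the sketch below (not an actual patch; `next_logical_date` is a hypothetical helper), using only the public `Timetable` API:
   
   ```
   # Hypothetical sketch: derive the "next" logical date from the DAG's timetable
   # instead of the legacy dag.following_schedule() path, which is what raises
   # ValueError for custom timetables. Uses only the Airflow 2.2 Timetable API.
   from typing import Optional

   from pendulum import DateTime

   from airflow.models.dag import DAG
   from airflow.timetables.base import DataInterval, TimeRestriction


   def next_logical_date(dag: DAG, data_interval: DataInterval) -> Optional[DateTime]:
       """Ask the DAG's timetable for the run that follows the given data interval."""
       info = dag.timetable.next_dagrun_info(
           last_automated_data_interval=data_interval,
           restriction=TimeRestriction(earliest=None, latest=None, catchup=True),
       )
       return None if info is None else info.logical_date
   ```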




[GitHub] [airflow] lionheart106008 commented on issue #20475: Custom Timetable error with CeleryExecutor

Posted by GitBox <gi...@apache.org>.
lionheart106008 commented on issue #20475:
URL: https://github.com/apache/airflow/issues/20475#issuecomment-1000624076


   @potiuk 
   
   Thank you very much!
   The workaround works like a charm, although it's not really intuitive.
   
   I suppose I'll just leave this issue open?



[GitHub] [airflow] potiuk closed issue #20475: Custom Timetable error with CeleryExecutor

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #20475:
URL: https://github.com/apache/airflow/issues/20475


   


