Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/01 20:11:58 UTC
[GitHub] [airflow] seunggabi commented on pull request #28020: [#20815] DAG.default_args.catchup is not working...
seunggabi commented on PR #28020:
URL: https://github.com/apache/airflow/pull/28020#issuecomment-1368525855
I'm sorry for the lack of information.
Let me explain in more detail.
My focus is on `default_args` as a place for common code.
We want to reuse the same defaults from `DAG` to `DAG`.
```
DEFAULT_ARGS = {
    "_slack_channel": DefaultVariable.get("slack")["channel"],
    "catchup": False,
    "owner": "",
    "depends_on_past": False,
    "email": [],
    "email_on_failure": True,
    "email_on_retry": True,
    "is_paused_upon_creation": False,
    "retries": 1,
    "max_active_runs": 1,
    "retry_delay": timedelta(minutes=1),
}
```
So I made `DefaultAirflowDAG`. (See #28015, "DAG.default_args.catchup is not working...")
```python
from datetime import datetime, timedelta

import pendulum
from airflow import DAG

from batch.core.default_variable import DefaultVariable
from batch.utils.log_utils import LogUtils
from batch.utils.slack_utils import SlackUtils

# Defaults shared by every DAG. "_slack_channel" is a custom key read by the wrapper below.
DEFAULT_ARGS = {
    "_slack_channel": DefaultVariable.get("slack")["channel"],
    "catchup": False,
    "owner": "",
    "depends_on_past": False,
    "email": [],
    "email_on_failure": True,
    "email_on_retry": True,
    "is_paused_upon_creation": False,
    "retries": 1,
    "max_active_runs": 1,
    "retry_delay": timedelta(minutes=1),
}


def extends(default_args):
    # Fill in every key the caller did not set (mutates and returns default_args).
    for k in DEFAULT_ARGS:
        if k in default_args:
            continue
        default_args[k] = DEFAULT_ARGS[k]
    return default_args


class DefaultAirflowDAG(DAG):
    LOG = LogUtils.log()
    ZFILL = 4

    def __init__(
        self,
        dag_id,
        schedule_interval=None,
        description=None,
        default_args=None,
        history=None,
        start_date_args=None,
        **kwargs,
    ):
        default_args = default_args or {}

        # Fall back to a fixed start date in the Asia/Seoul timezone.
        if "start_date" not in default_args:
            start_date_args = start_date_args or [2022, 1, 1]
            default_args["start_date"] = datetime(
                *start_date_args, tzinfo=pendulum.timezone("Asia/Seoul")
            )

        kwargs["default_args"] = extends(default_args)
        # catchup is a DAG constructor argument, so copy it out of default_args.
        kwargs["catchup"] = kwargs["default_args"]["catchup"]

        slack = SlackUtils(kwargs["default_args"]["_slack_channel"], kwargs["default_args"]["owner"])
        kwargs["on_failure_callback"] = slack.on_failure_callback

        super().__init__(dag_id=dag_id, schedule_interval=schedule_interval, **kwargs)

        self.doc_md = __doc__
        self.doc_md = DefaultAirflowDAG.make_doc_md(
            dag_id, description, schedule_interval, history
        )

    @staticmethod
    def make_doc_md(dag_id, description, schedule_interval, history):
        # Build the DAG documentation: a header block, then one row per history entry.
        doc_md = f"""
=================================================================================================<br>
ID: {dag_id}<br>
-------------------------------------------------------------------------------------------------<br>
DESCRIPTION: {description}<br>
-------------------------------------------------------------------------------------------------<br>
SCHEDULE: {schedule_interval}<br>
-------------------------------------------------------------------------------------------------<br>
"""

        if history is not None:
            doc_md += """
NO | DATE | AUTH | COMMENT<br>
"""
            for i, o in enumerate(history):
                doc_md += f"""
-------------------------------------------------------------------------------------------------<br>
{str(i).zfill(DefaultAirflowDAG.ZFILL)} | {o.date} | {o.author} | {o.comment}<br>
"""

        doc_md += """
=================================================================================================<br>
"""
        return doc_md


class DagHistory:
    # One change-log row rendered by make_doc_md.
    def __init__(self, date, author, comment):
        self.date = date
        self.author = author
        self.comment = comment
```
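The merge step performed by `extends` can also be written without mutating the caller's dictionary. The following is only a minimal sketch of that idea: `SKETCH_DEFAULTS` and `merged_args` are hypothetical names introduced for illustration and are not part of the code above or of Airflow.

```python
from datetime import timedelta

# Hypothetical, trimmed-down defaults used only for this illustration.
SKETCH_DEFAULTS = {
    "catchup": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}


def merged_args(overrides=None, defaults=SKETCH_DEFAULTS):
    """Return a new dict holding the defaults, with the caller's keys taking precedence."""
    return {**defaults, **(overrides or {})}


caller_args = {"retries": 3}
result = merged_args(caller_args)
# caller_args is left untouched; result holds the defaults plus the override.
```

Like `extends`, the caller's values win over the defaults; the difference is that a fresh dictionary is returned, so a shared `default_args` dict is not changed as a side effect.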
This is just an idea.
Still, I would like to suggest adding this behavior to `DAG` itself.
That is why I opened this PR.
---
If we use `default_args` to hold the code that DAGs have in common,
I think each DAG becomes simpler.
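As a rough illustration of the idea, a wrapper could separate the DAG-level keys from the task-level ones before calling the `DAG` constructor. This is a sketch under assumptions, not the change proposed in the PR: `DAG_LEVEL_KEYS` and `split_dag_kwargs` are hypothetical names, and the key list simply assumes that `catchup`, `max_active_runs` and `is_paused_upon_creation` are `DAG` constructor arguments rather than operator arguments.

```python
# Assumption: these keys belong to the DAG constructor, not to operators.
DAG_LEVEL_KEYS = ("catchup", "max_active_runs", "is_paused_upon_creation")


def split_dag_kwargs(default_args, dag_level_keys=DAG_LEVEL_KEYS):
    """Split a mixed dict into (dag_kwargs, task_default_args) without mutating it."""
    dag_kwargs = {k: v for k, v in default_args.items() if k in dag_level_keys}
    task_args = {k: v for k, v in default_args.items() if k not in dag_level_keys}
    return dag_kwargs, task_args


mixed = {"catchup": False, "max_active_runs": 1, "retries": 1, "owner": ""}
dag_kwargs, task_args = split_dag_kwargs(mixed)
# A wrapper could then call, for example:
#   DAG(dag_id, default_args=task_args, **dag_kwargs)
```

Keeping the split in one helper means a shared defaults dictionary can list both kinds of settings, while only the task-level ones are forwarded to operators.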
I know you're busy, but I would appreciate a favorable review.
Thank you, as always.
Happy New Year~