Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/01 20:11:58 UTC

[GitHub] [airflow] seunggabi commented on pull request #28020: [#20815] DAG.default_args.catchup is not working...

seunggabi commented on PR #28020:
URL: https://github.com/apache/airflow/pull/28020#issuecomment-1368525855

   Sorry for not giving enough information earlier.
   
   Let me explain in more detail.
   
   My focus is on `default_args` as the place for common settings.
   We want to reuse the same configuration from one `DAG` to the next.
   ```python
   DEFAULT_ARGS = {
       "_slack_channel": DefaultVariable.get("slack")["channel"],
   
       "catchup": False,
   
       "owner": "",
       "depends_on_past": False,
       "email": [],
       "email_on_failure": True,
       "email_on_retry": True,
       "is_paused_upon_creation": False,
       "retries": 1,
       "max_active_runs": 1,
       "retry_delay": timedelta(minutes=1),
   }
   ```
   
   So I wrote `DefaultAirflowDAG` (see #28015, "DAG.default_args.catchup is not working...").
   
   ```python
   from datetime import datetime, timedelta
   
   import pendulum
   from airflow import DAG
   
   from batch.core.default_variable import DefaultVariable
   from batch.utils.log_utils import LogUtils
   from batch.utils.slack_utils import SlackUtils
   
   DEFAULT_ARGS = {
       "_slack_channel": DefaultVariable.get("slack")["channel"],
   
       "catchup": False,
   
       "owner": "",
       "depends_on_past": False,
       "email": [],
       "email_on_failure": True,
       "email_on_retry": True,
       "is_paused_upon_creation": False,
       "retries": 1,
       "max_active_runs": 1,
       "retry_delay": timedelta(minutes=1),
   }
   
   
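   def extends(default_args):
       """Fill in keys missing from default_args with the DEFAULT_ARGS values."""
       # Caller-supplied values win; the dict is updated in place and returned.
       for k, v in DEFAULT_ARGS.items():
           default_args.setdefault(k, v)
   
       return default_args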
   
   
   class DefaultAirflowDAG(DAG):
       LOG = LogUtils.log()
       ZFILL = 4
   
       def __init__(
               self,
               dag_id,
               schedule_interval=None,
               description=None,
               default_args=None,
               history=None,
               start_date_args=None,
               **kwargs,
       ):
           default_args = default_args or {}
   
           if "start_date" not in default_args:
               start_date_args = start_date_args or [2022, 1, 1]
   
               default_args["start_date"] = datetime(
                   *start_date_args, tzinfo=pendulum.timezone("Asia/Seoul")
               )
   
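           kwargs["default_args"] = extends(default_args)
           # `catchup` is a DAG argument, not an operator argument, so copy it
           # out of default_args and pass it to DAG explicitly.
           kwargs["catchup"] = kwargs["default_args"]["catchup"]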
   
           slack = SlackUtils(kwargs["default_args"]["_slack_channel"], kwargs["default_args"]["owner"])
           kwargs["on_failure_callback"] = slack.on_failure_callback
   
           super().__init__(dag_id=dag_id, schedule_interval=schedule_interval, **kwargs)
   
           # Build the DAG's Markdown documentation from its metadata.
           self.doc_md = DefaultAirflowDAG.make_doc_md(
               dag_id, description, schedule_interval, history
           )
   
       @staticmethod
       def make_doc_md(dag_id, description, schedule_interval, history):
           doc_md = f"""
   =================================================================================================<br>
   ID: {dag_id}<br>
   -------------------------------------------------------------------------------------------------<br>
   DESCRIPTION: {description}<br>
   -------------------------------------------------------------------------------------------------<br>
   SCHEDULE: {schedule_interval}<br>
   -------------------------------------------------------------------------------------------------<br>
   """
           if history is not None:
               doc_md += """
   NO | DATE | AUTH | COMMENT<br>
   """
               for i, o in enumerate(history):
                   doc_md += f"""
   -------------------------------------------------------------------------------------------------<br>
   {str(i).zfill(DefaultAirflowDAG.ZFILL)} | {o.date} | {o.author} | {o.comment}<br>
   """
           doc_md += """
   =================================================================================================<br>
   """
           return doc_md
   
   
   class DagHistory:
       def __init__(self, date, author, comment):
           self.date = date
           self.author = author
           self.comment = comment
   
   ```
   
   This is just an idea,
   but I would like to suggest adding this behavior to `DAG` itself.
   That is why I opened this PR.
   
   ---
   
   If `default_args` can carry the settings that our DAGs have in common,
   I think each DAG definition becomes simpler.
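
To make the idea concrete without depending on Airflow itself, here is a minimal sketch of the split I have in mind. The names `COMMON_ARGS`, `DAG_LEVEL_KEYS`, and `split_dag_kwargs` are hypothetical and exist only for this illustration. It assumes that `catchup` and `max_active_runs` should end up as `DAG` constructor arguments, while everything else stays in `default_args` for the operators.

```python
from datetime import timedelta

# Hypothetical shared defaults; the key names mirror the ones used above.
COMMON_ARGS = {
    "catchup": False,
    "max_active_runs": 1,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

# Assumption: these keys belong to the DAG constructor, not to operators.
DAG_LEVEL_KEYS = ("catchup", "max_active_runs")


def split_dag_kwargs(default_args=None, **dag_kwargs):
    """Return (dag_kwargs, default_args) with the shared defaults applied."""
    # Merge into a new dict so the caller's default_args is not modified.
    merged = {**COMMON_ARGS, **(default_args or {})}

    for key in DAG_LEVEL_KEYS:
        if key in merged:
            value = merged.pop(key)
            # An explicit keyword argument takes priority over the default.
            dag_kwargs.setdefault(key, value)

    return dag_kwargs, merged
```

With these assumptions, `split_dag_kwargs({"retries": 3}, catchup=True)` keeps the explicit `catchup=True`, fills in `max_active_runs=1`, and leaves only `retries` and `retry_delay` in the operator defaults.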
   
   I know you're busy, but I would appreciate a favorable review.
   Thank you, as always.
   Happy New Year!

