Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/14 07:32:13 UTC

[GitHub] [airflow] Shadowsong27 commented on issue #10194: Ability to better support odd scheduling time

Shadowsong27 commented on issue #10194:
URL: https://github.com/apache/airflow/issues/10194#issuecomment-673933756


   I have just hacked together a workaround since I really need this feature. It is a bit raw but usable, and it follows the first approach I mentioned above, with some abstraction. Basically, I wrote a function that takes any Airflow operator class and returns a subclass of it that accepts an additional argument listing the days of the month on which the task may run; on any other day the task returns without doing its work.
   
   Code here:
   
   ```py
   import datetime as dt
   from typing import Any, Tuple

   from airflow.utils.decorators import apply_defaults


   def flexify_operator(operator: Any) -> Any:
       """Return a subclass of `operator` that accepts an `allowed_dates` argument."""

       class FlexibleOperator(operator):

           @apply_defaults
           def __init__(
                   self,
                   allowed_dates: Tuple[int, ...] = (),
                   *args,
                   **kwargs,
           ):
               super().__init__(*args, **kwargs)
               # days of the month (1-31) on which the task is allowed to run
               self.allowed_dates = allowed_dates

           def execute(self, context):
               execution_date = context.get('ds')  # 'YYYY-MM-DD' string
               if self.allowed_dates and self.is_skipping_task(execution_date):
                   self.log.warning('Skipping the task since the date is not found in the allowed dates.')
                   return None

               # no allowed dates passed, or not skipping: treat as a normal operator
               return super().execute(context)

           def is_skipping_task(self, execution_date: str) -> bool:
               # True when the day of the month is not in the allowed list
               date_obj = dt.datetime.strptime(execution_date, '%Y-%m-%d')
               return date_obj.day not in self.allowed_dates

       return FlexibleOperator
   
   ```
   
   Usage in a DAG file:
   
   ```py
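   from airflow.operators.postgres_operator import PostgresOperator

   import utils  # the module where flexify_operator is defined

   # wrap the stock operator so it accepts `allowed_dates`
   PostgresOperator = utils.flexify_operator(PostgresOperator)

   PostgresOperator(
       task_id="test",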
       postgres_conn_id="conn_id",
       sql="SELECT * FROM public.test_demo",
       dag=dag,
       allowed_dates=(12, 13, 14)
   )
   ```
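
The skip behaviour can be checked without a running Airflow instance. The following is only a minimal sketch: `FakeOperator` is a hypothetical stand-in and not an Airflow class, and the factory is simplified (no `apply_defaults`, no logging) so that the block runs on its own.

```python
from typing import Any, Tuple


def flexify_operator(operator: Any) -> Any:
    """Simplified factory: same skip logic as above, without Airflow decorators."""

    class FlexibleOperator(operator):
        def __init__(self, *args, allowed_dates: Tuple[int, ...] = (), **kwargs):
            super().__init__(*args, **kwargs)
            self.allowed_dates = allowed_dates

        def execute(self, context):
            # 'ds' is a 'YYYY-MM-DD' string; take the day of the month
            day = int(context["ds"].split("-")[2])
            if self.allowed_dates and day not in self.allowed_dates:
                return None  # skip: the wrapped execute() is never called
            return super().execute(context)

    return FlexibleOperator


class FakeOperator:
    """Hypothetical stand-in for an Airflow operator (assumption, for illustration)."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.calls = []

    def execute(self, context):
        self.calls.append(context["ds"])
        return "ran"


FlexibleFake = flexify_operator(FakeOperator)
task = FlexibleFake(task_id="test", allowed_dates=(12, 13, 14))

result_allowed = task.execute({"ds": "2020-08-13"})  # day 13 is allowed
result_skipped = task.execute({"ds": "2020-08-20"})  # day 20 is not
```

Note that returning early leaves the task marked as successful in Airflow. If it should show up as skipped instead, raising `airflow.exceptions.AirflowSkipException` at that point is one option.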
   
   Note that this only works at day-of-month granularity, but it can easily be adapted to fit your needs. Hope this helps.
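
As one possible way to go beyond day-of-month granularity, the date check could be pulled out into a standalone function that supports several filters. This is a rough sketch under assumptions: the function name `should_run` and the parameters `allowed_days`, `allowed_weekdays` and `allowed_dates` are made up for illustration and are not part of Airflow or of the workaround above.

```python
import datetime as dt
from typing import Tuple


def should_run(
    ds: str,
    allowed_days: Tuple[int, ...] = (),      # days of the month, 1-31
    allowed_weekdays: Tuple[int, ...] = (),  # Monday == 0 ... Sunday == 6
    allowed_dates: Tuple[str, ...] = (),     # exact 'YYYY-MM-DD' strings
) -> bool:
    """Return True if the 'YYYY-MM-DD' date passes every filter that is set."""
    date = dt.datetime.strptime(ds, "%Y-%m-%d").date()
    checks = []
    if allowed_days:
        checks.append(date.day in allowed_days)
    if allowed_weekdays:
        checks.append(date.weekday() in allowed_weekdays)
    if allowed_dates:
        checks.append(ds in allowed_dates)
    # with no filters set, all([]) is True and the task always runs
    return all(checks)
```

Inside `execute`, the operator would then return early whenever `should_run(context['ds'], ...)` is false. Filters are combined with AND here; combining them with OR would be an equally reasonable design choice depending on the schedule.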
   

