Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/06 05:27:52 UTC

[GitHub] [airflow] MartinKChen edited a comment on issue #19043: Shard of SmartSensor keep terminating itself

MartinKChen edited a comment on issue #19043:
URL: https://github.com/apache/airflow/issues/19043#issuecomment-1006295682


   @xiaochong2dai I did not fix the issue by modifying Airflow's source code, as we are using MWAA (Amazon Managed Workflows for Apache Airflow), which makes it impossible to reach the code.
   
   Instead, a workaround for the issue is to create our own operator and DAGs for the shard group.
   
   1. Create an operator that inherits from SmartSensorOperator (the class used by the shards) and override the "execute" function, commenting out the code that causes the shard to fail. Sample code below:
   ```
   # import base class
   # Airflow
   from airflow.sensors.smart_sensor import SmartSensorOperator
   from airflow.stats import Stats
   from airflow.utils import timezone
   from airflow.utils.net import get_hostname
   from airflow.utils.decorators import apply_defaults
   
   # (General) import modules/classes for processing
   from time import sleep
   
   class MySmartSensorOperator(SmartSensorOperator):
       @apply_defaults
       def __init__(
           self,
           poke_interval=60,  # override the poke interval with our own value, which is not configurable by default
           smart_sensor_timeout=60 * 60 * 24 * 7,
           soft_fail=False,
           shard_min=0,
           shard_max=100000,
           poke_timeout=6.0,
           *args,
           **kwargs,
       ):
           # pass the arguments through explicitly; a bare super().__init__(*args, **kwargs)
           # would silently fall back to the base class defaults (e.g. poke_interval=180)
           super().__init__(
               *args,
               poke_interval=poke_interval,
               smart_sensor_timeout=smart_sensor_timeout,
               soft_fail=soft_fail,
               shard_min=shard_min,
               shard_max=shard_max,
               poke_timeout=poke_timeout,
               **kwargs,
           )
   
       def execute(self, context):
           # started_at = timezone.utcnow()
   
           self.hostname = get_hostname()
           
           while True:
               poke_start_time = timezone.utcnow()
   
               self.flush_cached_sensor_poke_results()
   
               self._load_sensor_works()
               self.log.info('Loaded %s sensor_works', len(self.sensor_works))
               Stats.gauge('smart_sensor_operator.loaded_tasks', len(self.sensor_works))
   
               for sensor_work in self.sensor_works:
                   self._execute_sensor_work(sensor_work)
   
               duration = (timezone.utcnow() - poke_start_time).total_seconds()
   
               self.log.info('Taking %s to execute %s tasks.', duration, len(self.sensor_works))
   
               Stats.timing('smart_sensor_operator.loop_duration', duration)
               Stats.gauge('smart_sensor_operator.executed_tasks', len(self.sensor_works))
               self._emit_loop_stats()
   
               if duration < self.poke_interval:
                   sleep(self.poke_interval - duration)
               
               # comment out the code that causes shard fail
               # if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
               #     self.log.info('Time is out for smart sensor.')
               #     return
   ```
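   
   As a quick sanity check (my own sketch, not part of the original workaround), instantiating the operator outside MWAA should show the overridden defaults reaching the base class, assuming SmartSensorOperator stores smart_sensor_timeout as self.timeout the way the 2.0.2 source does:
   ```
   # hypothetical sanity check, run anywhere the custom operator is importable
   op = MySmartSensorOperator(task_id='sanity_check')
   assert op.poke_interval == 60          # our default, not the base default of 180
   assert op.timeout == 60 * 60 * 24 * 7  # smart_sensor_timeout maps to self.timeout
   ```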
   2. Create shard DAGs that use the operator above. This is copied from "airflow-2.0.2\airflow\smart_sensor_dags\smart_sensor_group.py", with our operator swapped in.
   ```
   """Smart sensor DAGs managing all smart sensor tasks."""
   from datetime import timedelta
   
   from airflow.configuration import conf
   from airflow.models import DAG
   from airflow.utils.dates import days_ago
   
   
   from operators.mec_smartsensor_operator import MySmartSensorOperator
   
   default_args = {
       'owner': 'admin',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 0,
       'start_date': days_ago(1),
   }
   
   schedule_interval = timedelta(minutes=5)
   
   num_smart_sensor_shard = conf.getint('smart_sensor', 'shards')
   shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')
   
   for i in range(num_smart_sensor_shard):
       shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
       shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard
       
       dag_id = f'admin_my_smartsensor_{i}'
       
       dag = DAG(
           dag_id=dag_id,
           default_args=default_args,
           schedule_interval=schedule_interval,
           concurrency=1,
           max_active_runs=1,
           catchup=False,  # catchup is a DAG argument; it is ignored inside default_args
           dagrun_timeout=timedelta(hours=24),
       )
       
       MySmartSensorOperator(
           task_id='smart_sensor_task',
           dag=dag,
           retries=100,
           retry_delay=timedelta(seconds=10),
           priority_weight=999,
           shard_min=shard_min,
           shard_max=shard_max,
           poke_timeout=10,
           smart_sensor_timeout=timedelta(hours=1).total_seconds(),
       )
   
       globals()[dag_id] = dag
   ```
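   
   For clarity (my own illustration, not from the original file), the loop partitions the shardcode space evenly across the shard DAGs. With the Airflow defaults of shards = 5 and shard_code_upper_limit = 10000, the ranges work out as:
   ```
   # illustration of the shard range computation above,
   # assuming shards=5 and shard_code_upper_limit=10000 (the Airflow defaults)
   for i in range(5):
       print(f'shard {i}: [{i * 10000 / 5}, {(i + 1) * 10000 / 5})')
   # shard 0: [0.0, 2000.0)
   # shard 1: [2000.0, 4000.0)
   # shard 2: [4000.0, 6000.0)
   # shard 3: [6000.0, 8000.0)
   # shard 4: [8000.0, 10000.0)
   ```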
   
   
   One downside of this solution is that the smart_sensor_group_shard_* DAGs are still created automatically, since they are generated by Airflow itself. We just keep them paused and enable our own DAGs instead, which may cause confusion during operation.
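   
   Where the metadata database is reachable (it is not on MWAA, where the UI pause toggle does the same job), a sketch like the following could keep the built-in shards paused, assuming the default of 5 shards:
   ```
   # a sketch, assuming direct metadata DB access (not available on MWAA):
   # mark the built-in shard DAGs paused so only the custom ones run
   from airflow.models import DagModel
   from airflow.utils.session import create_session
   
   with create_session() as session:
       for i in range(5):  # number of shards, per the 'shards' config
           dag_model = (
               session.query(DagModel)
               .filter(DagModel.dag_id == f'smart_sensor_group_shard_{i}')
               .one_or_none()
           )
           if dag_model:
               dag_model.is_paused = True
   ```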
   

