Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/27 15:05:37 UTC

[GitHub] [airflow] stoiandl commented on issue #15543: Smart Sensor Kubernetes Airflow 2.0.1

stoiandl commented on issue #15543:
URL: https://github.com/apache/airflow/issues/15543#issuecomment-827679722


   I figured out what is going on. My sensor checks whether a file was modified in the last 60 seconds, but the SmartSensorOperator ignores my `poke_interval` and pokes with its own default of 180 seconds:
   ```python
   class SmartSensorOperator(BaseOperator, SkipMixin):
       ui_color = '#e6f1f2'
   
       @apply_defaults
       def __init__(
           self,
           poke_interval=180,
           smart_sensor_timeout=60 * 60 * 24 * 7,
           soft_fail=False,
           shard_min=0,
           shard_max=100000,
           poke_timeout=6.0,
           *args,
           **kwargs,
       ):
   ```
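
To make the mismatch concrete, here is a minimal sketch that counts how many modification times a "modified in the last 60 seconds" check would catch at a given poke interval. The helper names `detected` and `coverage` are made up for illustration, and the model assumes pokes start at t=0 and fire at perfectly regular intervals, which the real smart sensor may not do.

```python
def detected(mod_time, poke_interval, window=60, horizon=3600):
    """Return True if some poke sees the file as modified within `window` seconds."""
    poke = 0
    while poke <= horizon:
        # A poke catches the change only if it fires 0..window seconds after it.
        if 0 <= poke - mod_time <= window:
            return True
        poke += poke_interval
    return False


def coverage(poke_interval, window=60, horizon=3600):
    """Fraction of modification times (one per second) that some poke catches."""
    mod_times = range(horizon)
    hits = sum(detected(t, poke_interval, window, horizon) for t in mod_times)
    return hits / len(mod_times)


print(coverage(60))   # every modification is caught
print(coverage(180))  # only about a third are caught
```

Under these assumptions, a 180-second poke interval with a 60-second window misses roughly two thirds of the modifications, which matches what I am seeing.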
   
   Also, the Smart Sensor DAG definition does not override this default:
   ```python
   num_smart_sensor_shard = conf.getint("smart_sensor", "shards")
   shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')
   
   for i in range(num_smart_sensor_shard):
       shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
       shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard
   
       # One DAG and one SmartSensorOperator are created per shard,
       # so both belong inside the loop.
       dag = DAG(
           dag_id=dag_id,
           default_args=args,
           schedule_interval=timedelta(minutes=5),
           concurrency=1,
           max_active_runs=1,
           catchup=False,
           dagrun_timeout=timedelta(hours=24),
           start_date=days_ago(2),
       )
   
       SmartSensorOperator(
           task_id='smart_sensor_task',
           dag=dag,
           retries=100,
           retry_delay=timedelta(seconds=10),
           priority_weight=999,
           shard_min=shard_min,
           shard_max=shard_max,
           poke_timeout=10,
           smart_sensor_timeout=timedelta(hours=24).total_seconds(),
       )
   ```
   
   Can we add one more config option, `poke_interval`, under `smart_sensor` in airflow.cfg? It would take an `int` giving the interval in seconds.
   
   The Smart Sensor DAG definition could then read it and pass it to the operator:
   ```python
   num_smart_sensor_shard = conf.getint("smart_sensor", "shards")
   shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')
   poke_interval = conf.getint('smart_sensor', 'poke_interval')
   
   for i in range(num_smart_sensor_shard):
       shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
       shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard
   
       # One DAG and one SmartSensorOperator are created per shard,
       # so both belong inside the loop.
       dag = DAG(
           dag_id=dag_id,
           default_args=args,
           schedule_interval=timedelta(minutes=5),
           concurrency=1,
           max_active_runs=1,
           catchup=False,
           dagrun_timeout=timedelta(hours=24),
           start_date=days_ago(2),
       )
   
       SmartSensorOperator(
           task_id='smart_sensor_task',
           dag=dag,
           retries=100,
           retry_delay=timedelta(seconds=10),
           priority_weight=999,
           shard_min=shard_min,
           shard_max=shard_max,
           poke_timeout=10,
           smart_sensor_timeout=timedelta(hours=24).total_seconds(),
           poke_interval=poke_interval,  # proposed: value read from airflow.cfg
       )
   ```
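
One detail worth considering: existing airflow.cfg files will not have the new option, so the lookup probably needs a fallback to the current 180-second default. The sketch below shows that behaviour with the standard library `configparser` as a stand-in for Airflow's `conf` object. The helper `read_poke_interval` and the sample config text are hypothetical. I believe Airflow's `conf.getint` accepts a `fallback` keyword in the same way, but that is an assumption to verify.

```python
from configparser import ConfigParser

# Matches the SmartSensorOperator default quoted earlier in this comment.
DEFAULT_POKE_INTERVAL = 180


def read_poke_interval(cfg_text):
    """Read [smart_sensor] poke_interval, falling back to the current default."""
    parser = ConfigParser()
    parser.read_string(cfg_text)
    return parser.getint(
        "smart_sensor", "poke_interval", fallback=DEFAULT_POKE_INTERVAL
    )


# Hypothetical config that sets the proposed option.
with_option = """
[smart_sensor]
shards = 5
poke_interval = 60
"""

# Hypothetical existing config that predates the option.
without_option = """
[smart_sensor]
shards = 5
"""

print(read_poke_interval(with_option))     # 60
print(read_poke_interval(without_option))  # 180
```

With a fallback like this, deployments that never set `poke_interval` would keep today's behaviour.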

