Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/27 15:05:37 UTC
[GitHub] [airflow] stoiandl commented on issue #15543: Smart Sensor Kubernetes Airflow 2.0.1
stoiandl commented on issue #15543:
URL: https://github.com/apache/airflow/issues/15543#issuecomment-827679722
I figured out what is going on. My sensor checks whether a file was modified in the last 60 seconds, but the SmartSensor ignores my `poke_interval` and pokes every 180 seconds instead, which is the default in its constructor:
```python
class SmartSensorOperator(BaseOperator, SkipMixin):
    ui_color = '#e6f1f2'

    @apply_defaults
    def __init__(
        self,
        poke_interval=180,
        smart_sensor_timeout=60 * 60 * 24 * 7,
        soft_fail=False,
        shard_min=0,
        shard_max=100000,
        poke_timeout=6.0,
        *args,
        **kwargs,
    ):
```
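To illustrate why the interval matters here, below is a minimal sketch, not Airflow code. It assumes pokes happen at fixed multiples of the interval and that the check is "modified within the last 60 seconds"; the function name `is_detected` and its parameters are hypothetical.

```python
def is_detected(modified_at, poke_interval, window=60):
    """Return True if a poke lands within `window` seconds after the modification.

    Assumption: pokes happen at t = 0, poke_interval, 2 * poke_interval, ...
    All times are whole seconds.
    """
    # Ceiling division: time of the first poke at or after the modification.
    next_poke = -(-modified_at // poke_interval) * poke_interval
    # The file counts as "recently modified" only if that poke is close enough.
    return next_poke - modified_at <= window


if __name__ == "__main__":
    # Count how many modification times within one 180-second cycle are seen.
    seen = sum(is_detected(t, poke_interval=180) for t in range(180))
    print(f"{seen} of 180 modification times detected with a 180-second interval")
```

Under these assumptions, a modification 30 seconds after a poke is missed with a 180-second interval but caught with a 60-second one, so most changes go unnoticed when the interval is longer than the check window.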
The `poke_interval` is also not overridden when the Smart Sensor DAGs are initialized:
```python
num_smart_sensor_shard = conf.getint("smart_sensor", "shards")
shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')

for i in range(num_smart_sensor_shard):
    shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
    shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard

    dag = DAG(
        dag_id=dag_id,
        default_args=args,
        schedule_interval=timedelta(minutes=5),
        concurrency=1,
        max_active_runs=1,
        catchup=False,
        dagrun_timeout=timedelta(hours=24),
        start_date=days_ago(2),
    )

    SmartSensorOperator(
        task_id='smart_sensor_task',
        dag=dag,
        retries=100,
        retry_delay=timedelta(seconds=10),
        priority_weight=999,
        shard_min=shard_min,
        shard_max=shard_max,
        poke_timeout=10,
        smart_sensor_timeout=timedelta(hours=24).total_seconds(),
    )
```
Can we add one more config option under `[smart_sensor]` in `airflow.cfg` called `poke_interval`, which would take an `int` value in seconds?
The DAG init could then read it and pass it to the operator:
```python
num_smart_sensor_shard = conf.getint("smart_sensor", "shards")
shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')
poke_interval = conf.getint('smart_sensor', 'poke_interval')

for i in range(num_smart_sensor_shard):
    shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
    shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard

    dag = DAG(
        dag_id=dag_id,
        default_args=args,
        schedule_interval=timedelta(minutes=5),
        concurrency=1,
        max_active_runs=1,
        catchup=False,
        dagrun_timeout=timedelta(hours=24),
        start_date=days_ago(2),
    )

    SmartSensorOperator(
        task_id='smart_sensor_task',
        dag=dag,
        retries=100,
        retry_delay=timedelta(seconds=10),
        priority_weight=999,
        shard_min=shard_min,
        shard_max=shard_max,
        poke_timeout=10,
        smart_sensor_timeout=timedelta(hours=24).total_seconds(),
        poke_interval=poke_interval,
    )
```
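As a rough illustration of how such an option might be read, here is a standalone sketch that uses Python's standard `configparser` instead of Airflow's `conf` object. The `poke_interval` option is only the proposal above and does not exist in Airflow 2.0.1; the helper `read_poke_interval` and the fallback of 180 seconds (chosen to mirror the operator's current default) are assumptions.

```python
from configparser import ConfigParser

# Assumption: fall back to the SmartSensorOperator default quoted above.
DEFAULT_POKE_INTERVAL = 180


def read_poke_interval(cfg_text):
    """Return the proposed [smart_sensor] poke_interval in seconds.

    Falls back to DEFAULT_POKE_INTERVAL when the section or option is absent,
    so existing configuration files keep today's behaviour.
    """
    parser = ConfigParser()
    parser.read_string(cfg_text)
    return parser.getint(
        "smart_sensor", "poke_interval", fallback=DEFAULT_POKE_INTERVAL
    )


if __name__ == "__main__":
    # Hypothetical airflow.cfg fragment with the proposed option set.
    sample = "[smart_sensor]\nshards = 5\npoke_interval = 60\n"
    print(read_poke_interval(sample))
```

Using a fallback rather than a required option would keep the change backwards compatible for deployments that never set the new key.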