Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/06 05:27:31 UTC
[GitHub] [airflow] MartinKChen commented on issue #19043: Shard of SmartSensor keep terminating itself
MartinKChen commented on issue #19043:
URL: https://github.com/apache/airflow/issues/19043#issuecomment-1006295682
@xiaochong2dai I did not fix the issue by modifying Airflow's source code, because we are using MWAA (Amazon Managed Workflows for Apache Airflow), which makes it impossible to reach that code.
Instead, our workaround is to create our own operator and DAGs for the shard group.
1. Create an operator that inherits from SmartSensorOperator, which is the operator the shards use, and override the "execute" method, commenting out the code that causes the shard to fail. Sample code follows:
```
# import base class
# Airflow
from airflow.sensors.smart_sensor import SmartSensorOperator
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.net import get_hostname
from airflow.utils.decorators import apply_defaults
# (General) import modules/classes for processing
from time import sleep


class MySmartSensorOperator(SmartSensorOperator):
    @apply_defaults
    def __init__(
        self,
        poke_interval=60,  # overwrite the poke interval by our own requirement, which was not allowed by default
        smart_sensor_timeout=60 * 60 * 24 * 7,
        soft_fail=False,
        shard_min=0,
        shard_max=100000,
        poke_timeout=6.0,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.poke_interval = poke_interval
        self.soft_fail = soft_fail
        self.timeout = smart_sensor_timeout
        self._validate_input_values()
        self.hostname = ''
        self.sensor_works = []
        self.cached_dedup_works = {}
        self.cached_sensor_exceptions = {}
        self.max_tis_per_query = 50
        self.shard_min = shard_min
        self.shard_max = shard_max
        self.poke_timeout = poke_timeout

    def execute(self, context):
        # started_at = timezone.utcnow()
        self.hostname = get_hostname()
        while True:
            poke_start_time = timezone.utcnow()
            self.flush_cached_sensor_poke_results()
            self._load_sensor_works()
            self.log.info('Loaded %s sensor_works', len(self.sensor_works))
            Stats.gauge('smart_sensor_operator.loaded_tasks', len(self.sensor_works))
            for sensor_work in self.sensor_works:
                self._execute_sensor_work(sensor_work)
            duration = (timezone.utcnow() - poke_start_time).total_seconds()
            self.log.info('Taking %s to execute %s tasks.', duration, len(self.sensor_works))
            Stats.timing('smart_sensor_operator.loop_duration', duration)
            Stats.gauge('smart_sensor_operator.executed_tasks', len(self.sensor_works))
            self._emit_loop_stats()
            if duration < self.poke_interval:
                sleep(self.poke_interval - duration)
            # comment out the code that causes shard fail
            # if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
            #     self.log.info('Time is out for smart sensor.')
            #     return
```
2. Create shard DAGs that use the operator above. They are copied from "airflow-2.0.2\airflow\smart_sensor_dags\smart_sensor_group.py", with the operator swapped for our own.
```
"""Smart sensor DAGs managing all smart sensor tasks."""
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import DAG
from airflow.utils.dates import days_ago

from operators.mec_smartsensor_operator import MySmartSensorOperator

default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'start_date': days_ago(1),
}

schedule_interval = timedelta(minutes=5)
num_smart_sensor_shard = conf.getint('smart_sensor', 'shards')
shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')

for i in range(num_smart_sensor_shard):
    # each shard handles an equal slice of the shard code range
    shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
    shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard
    dag_id = f'admin_my_smartsensor_{i}'
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=schedule_interval,
        concurrency=1,
        max_active_runs=1,
        catchup=False,  # catchup is a DAG argument; it has no effect inside default_args
        dagrun_timeout=timedelta(hours=24),
    )
    MySmartSensorOperator(
        task_id='smart_sensor_task',
        dag=dag,
        retries=100,
        retry_delay=timedelta(seconds=10),
        priority_weight=999,
        shard_min=shard_min,
        shard_max=shard_max,
        poke_timeout=10,
        smart_sensor_timeout=timedelta(hours=1).total_seconds(),
    )
    # register the DAG at module level so Airflow can discover it
    globals()[dag_id] = dag
```
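To make the shard arithmetic in the loop above easier to follow, here is a minimal sketch that reproduces only the shard_min/shard_max calculation, without Airflow. The function name `shard_ranges` and the example values (5 shards, upper limit 10000) are assumptions chosen for illustration; the real values come from the `[smart_sensor]` section of your Airflow configuration.

```python
def shard_ranges(num_shards, upper_limit):
    """Return one (shard_min, shard_max) pair per shard index.

    Mirrors the arithmetic in the DAG loop: true division, so the
    bounds come out as floats.
    """
    return [
        ((i * upper_limit) / num_shards, ((i + 1) * upper_limit) / num_shards)
        for i in range(num_shards)
    ]


# Example values only (assumed, not read from any real configuration).
ranges = shard_ranges(num_shards=5, upper_limit=10000)
for i, (shard_min, shard_max) in enumerate(ranges):
    print(f"admin_my_smartsensor_{i}: shard_min={shard_min}, shard_max={shard_max}")
```

Each shard's shard_max equals the next shard's shard_min, so the slices cover the whole range without gaps as long as the operator treats each slice as half-open.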
The downside of this solution is that the smart_sensor_group_shard_* DAGs are still created automatically, since Airflow generates them itself. We just keep them paused and enable our own DAGs instead, which may cause confusion when operating.
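For anyone wondering what the commented-out check in step 1 changes, here is a rough, Airflow-free sketch of the loop. `FakeClock`, `run_poke_loop`, and `max_rounds` are hypothetical names made up for this illustration; the real operator uses `timezone.utcnow()` and `sleep`, and its loop has no round cap.

```python
class FakeClock:
    """Hypothetical clock that advances only when told to, so nothing really sleeps."""

    def __init__(self):
        self.seconds = 0.0

    def now(self):
        return self.seconds

    def advance(self, seconds):
        self.seconds += seconds


def run_poke_loop(clock, round_seconds, timeout, enforce_timeout, max_rounds=10):
    """Simplified stand-in for the shard's execute() loop.

    Returns the number of poke rounds completed before the loop stopped.
    max_rounds exists only so this demo terminates.
    """
    started_at = clock.now()
    rounds = 0
    while rounds < max_rounds:
        clock.advance(round_seconds)  # pretend one poke round plus sleep took this long
        rounds += 1
        # This is the check that the workaround comments out.
        if enforce_timeout and clock.now() - started_at > timeout:
            return rounds
    return rounds


with_timeout = run_poke_loop(FakeClock(), round_seconds=60, timeout=150, enforce_timeout=True)
without_timeout = run_poke_loop(FakeClock(), round_seconds=60, timeout=150, enforce_timeout=False)
print(with_timeout, without_timeout)
```

With the check in place the loop returns as soon as the elapsed time passes the timeout; without it, the loop keeps going until something outside the operator stops it (here, only the demo's round cap).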