Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/27 02:58:23 UTC

[GitHub] [airflow] stoiandl opened a new issue #15543: Smart Sensor Kubernetes Airflow 2.0.1

stoiandl opened a new issue #15543:
URL: https://github.com/apache/airflow/issues/15543


   Apache Airflow version 2.0.1:
   
   Kubernetes version 1.18.14
   
   Environment: Azure - AKS
   
   What happened:
   I created a sensor:
   
   ```python
   from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor
   from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
   import datetime
   
   
   class ModifiedWasbBlobSensor(WasbBlobSensor):
       poke_context_fields = ('container_name', 'blob_name', 'delta_seconds', 'wasb_conn_id')
   
       def __init__(self, *, container_name: str, blob_name: str, delta_seconds: int, **kwargs):
           super().__init__(container_name=container_name, blob_name=blob_name, **kwargs)
           self.delta_seconds = delta_seconds
   
       def poke(self, context: dict):
           blob_exists = super().poke(context)
           if blob_exists:
               hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
               connection = hook.get_conn()
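               # Note: get_blob_to_bytes downloads the blob content; only its properties are used here
               blob = connection.get_blob_to_bytes(self.container_name, self.blob_name)
               # Normalize to UTC so the comparison below is timezone-safe
               last_modified = blob.properties.last_modified.astimezone(datetime.timezone.utc)
               # True only while the blob was modified within the last delta_seconds
               return last_modified + datetime.timedelta(seconds=self.delta_seconds) >= datetime.datetime.now(
                   last_modified.tzinfo)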
           return False
   ```
   
   and a DAG
   
   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from dags.plugins import wasb_modified_sensor
   from dags.dependencies import utils
   
   connections, paths, schema, tables = utils.load_config()
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'start_date': days_ago(0),
       'retries': 0,
   }
   
   dag = DAG(
       'test_dag',
       default_args=default_args,
       description='Just a test DAG',
       schedule_interval=None,
       tags=['test']
   )
   
   
   def say_hello() -> None:
       print('Hello there')
   
   
   task_sensor = wasb_modified_sensor.ModifiedWasbBlobSensor(
       task_id='sensor',
       container_name=paths['containers']['container_data']['name'],
       blob_name='test/hawaii.csv',
       wasb_conn_id=connections['azure_blob_data']['connection_id'],
       poke_interval=5,
       delta_seconds=60,
       dag=dag
   )
   
   task_say_hello = PythonOperator(
       task_id='say_hello',
       trigger_rule='all_success',
       python_callable=say_hello,
       op_args=[],
       op_kwargs={},
       dag=dag
   )
   
   task_sensor >> task_say_hello
   ```
   
   My airflow.cfg:
   
   ```cfg
   [core]
   colored_console_log = False
   dags_folder = {{.Values.dagsPath}}/repo/dags
   executor = KubernetesExecutor
   load_examples = False
   
   [smart_sensor]
   use_smart_sensor = True
   shard_code_upper_limit = 10000
   shards = 5
   sensors_enabled = ModifiedWasbBlobSensor
   
   ...
   ```
   
   I trigger my DAG and then upload the file so that the poke should return True. The sensor task is submitted to the smart sensor DAGs/service, but it never completes, and the DAG run stays stuck in the sensing state.
   
   I am running this on Kubernetes.
   
   `task_sensor` logs:
   ```log
   [2021-04-27 02:35:19,071] {taskinstance.py:1295} INFO - Submitting <TaskInstance: test_dag.sensor 2021-04-27T02:35:04.218394+00:00 [running]> to sensor service
   [2021-04-27 02:35:19,389] {taskinstance.py:1116} INFO - Task successfully registered in smart sensor.
   [2021-04-27 02:35:19,413] {local_task_job.py:146} INFO - Task exited with return code 0
   ```
   
   All my smart sensor DAGs have similar logs:
   ```log
   /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:336 DeprecationWarning: The colored_console_log option in [core] has been moved to the colored_console_log option in [logging] - the old setting has been used, but please update your config.
   /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:336 DeprecationWarning: The statsd_on option in [scheduler] has been moved to the statsd_on option in [metrics] - the old setting has been used, but please update your config.
   /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:336 DeprecationWarning: The statsd_host option in [scheduler] has been moved to the statsd_host option in [metrics] - the old setting has been used, but please update your config.
   /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:336 DeprecationWarning: The statsd_port option in [scheduler] has been moved to the statsd_port option in [metrics] - the old setting has been used, but please update your config.
   /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:336 DeprecationWarning: The statsd_prefix option in [scheduler] has been moved to the statsd_prefix option in [metrics] - the old setting has been used, but please update your config.
   [2021-04-27 02:34:11,420] {dagbag.py:448} INFO - Filling up the DagBag from /home/airflow/.local/lib/python3.8/site-packages/airflow/smart_sensor_dags/smart_sensor_group.py
   ```
   
   Scheduler logs:
   ```log
   [2021-04-27 02:35:05,572] {scheduler_job.py:941} INFO - 1 tasks up for execution:
           <TaskInstance: test_dag.sensor 2021-04-27 02:35:04.218394+00:00 [scheduled]>
   [2021-04-27 02:35:05,580] {scheduler_job.py:970} INFO - Figuring out tasks to run in Pool(name=default_pool) with 122 open slots and 1 task instances ready to be queued
   [2021-04-27 02:35:05,580] {scheduler_job.py:998} INFO - DAG test_dag has 0/16 running and queued tasks
   [2021-04-27 02:35:05,580] {scheduler_job.py:1063} INFO - Setting the following tasks to queued state:
           <TaskInstance: test_dag.sensor 2021-04-27 02:35:04.218394+00:00 [scheduled]>
   [2021-04-27 02:35:05,583] {scheduler_job.py:1105} INFO - Sending TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 2 and queue default
   [2021-04-27 02:35:05,583] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_dag', 'sensor', '2021-04-27T02:35:04.218394+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/test_dag.py']
   [2021-04-27 02:35:05,600] {kubernetes_executor.py:492} INFO - Add task TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run', 'test_dag', 'sensor', '2021-04-27T02:35:04.218394+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/test_dag.py'] with executor_config {}
   [2021-04-27 02:35:05,601] {kubernetes_executor.py:277} INFO - Kubernetes job is (TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run', 'test_dag', 'sensor', '2021-04-27T02:35:04.218394+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/test_dag.py'], None, None)
   [2021-04-27 02:35:05,717] {kubernetes_executor.py:147} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 had an event of type ADDED
   [2021-04-27 02:35:05,717] {kubernetes_executor.py:202} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 Pending
   [2021-04-27 02:35:05,718] {scheduler_job.py:1199} INFO - Executor reports execution of test_dag.sensor execution_date=2021-04-27 02:35:04.218394+00:00 exited with status queued for try_number 1
   [2021-04-27 02:35:05,726] {scheduler_job.py:1226} INFO - Setting external_id for <TaskInstance: test_dag.sensor 2021-04-27 02:35:04.218394+00:00 [queued]> to 4615
   [2021-04-27 02:35:05,728] {kubernetes_executor.py:147} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 had an event of type MODIFIED
   [2021-04-27 02:35:05,728] {kubernetes_executor.py:202} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 Pending
   [2021-04-27 02:35:05,756] {kubernetes_executor.py:147} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 had an event of type MODIFIED
   [2021-04-27 02:35:05,757] {kubernetes_executor.py:202} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 Pending
   [2021-04-27 02:35:07,490] {kubernetes_executor.py:147} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 had an event of type MODIFIED
   [2021-04-27 02:35:07,490] {kubernetes_executor.py:202} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 Pending
   [2021-04-27 02:35:08,525] {kubernetes_executor.py:147} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 had an event of type MODIFIED
   [2021-04-27 02:35:08,526] {kubernetes_executor.py:210} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 is Running
   [2021-04-27 02:35:20,861] {kubernetes_executor.py:147} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 had an event of type MODIFIED
   [2021-04-27 02:35:20,861] {kubernetes_executor.py:207} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 Succeeded
   [2021-04-27 02:35:21,116] {kubernetes_executor.py:353} INFO - Attempting to finish pod; pod_id: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4; state: None; annotations: {'dag_id': 'test_dag', 'task_id': 'sensor', 'execution_date': '2021-04-27T02:35:04.218394+00:00', 'try_number': '1'}
   [2021-04-27 02:35:21,118] {kubernetes_executor.py:528} INFO - Changing state of (TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=tzlocal()), try_number=1), None, 'testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4', 'airflow', '10427317') to None
   [2021-04-27 02:35:21,137] {kubernetes_executor.py:147} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 had an event of type MODIFIED
   [2021-04-27 02:35:21,137] {kubernetes_executor.py:207} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 Succeeded
   [2021-04-27 02:35:21,140] {kubernetes_executor.py:577} INFO - Deleted pod: TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=tzlocal()), try_number=1) in namespace airflow
   [2021-04-27 02:35:21,141] {scheduler_job.py:1199} INFO - Executor reports execution of test_dag.sensor execution_date=2021-04-27 02:35:04.218394+00:00 exited with status None for try_number 1
   [2021-04-27 02:35:21,143] {kubernetes_executor.py:147} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 had an event of type DELETED
   [2021-04-27 02:35:21,144] {kubernetes_executor.py:207} INFO - Event: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4 Succeeded
   [2021-04-27 02:35:21,294] {kubernetes_executor.py:353} INFO - Attempting to finish pod; pod_id: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4; state: None; annotations: {'dag_id': 'test_dag', 'task_id': 'sensor', 'execution_date': '2021-04-27T02:35:04.218394+00:00', 'try_number': '1'}
   [2021-04-27 02:35:21,296] {kubernetes_executor.py:353} INFO - Attempting to finish pod; pod_id: testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4; state: None; annotations: {'dag_id': 'test_dag', 'task_id': 'sensor', 'execution_date': '2021-04-27T02:35:04.218394+00:00', 'try_number': '1'}
   [2021-04-27 02:35:21,297] {kubernetes_executor.py:528} INFO - Changing state of (TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=tzlocal()), try_number=1), None, 'testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4', 'airflow', '10427318') to None
   [2021-04-27 02:35:21,304] {kubernetes_executor.py:577} INFO - Deleted pod: TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=tzlocal()), try_number=1) in namespace airflow
   [2021-04-27 02:35:21,305] {kubernetes_executor.py:528} INFO - Changing state of (TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=tzlocal()), try_number=1), None, 'testdagsensor.c91c2da85abc43a9b0f3c968b07f7af4', 'airflow', '10427320') to None
   [2021-04-27 02:35:21,311] {kubernetes_executor.py:577} INFO - Deleted pod: TaskInstanceKey(dag_id='test_dag', task_id='sensor', execution_date=datetime.datetime(2021, 4, 27, 2, 35, 4, 218394, tzinfo=tzlocal()), try_number=1) in namespace airflow
   [2021-04-27 02:35:21,312] {scheduler_job.py:1199} INFO - Executor reports execution of test_dag.sensor execution_date=2021-04-27 02:35:04.218394+00:00 exited with status None for try_number 1
   [2021-04-27 02:38:59,159] {scheduler_job.py:1834} INFO - Resetting orphaned tasks for active dag runs
   [2021-04-27 02:38:59,163] {scheduler_job.py:1856} INFO - Marked 1 SchedulerJob instances as failed
   [2021-04-27 02:43:59,307] {scheduler_job.py:1834} INFO - Resetting orphaned tasks for active dag runs
   [2021-04-27 02:48:59,472] {scheduler_job.py:1834} INFO - Resetting orphaned tasks for active dag runs
   [2021-04-27 02:53:59,612] {scheduler_job.py:1834} INFO - Resetting orphaned tasks for active dag runs
   ```
   
   Is the smart sensor supposed to work with Kubernetes Executor?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] stoiandl commented on issue #15543: Smart Sensor Kubernetes Airflow 2.0.1

Posted by GitBox <gi...@apache.org>.
stoiandl commented on issue #15543:
URL: https://github.com/apache/airflow/issues/15543#issuecomment-827679722


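   I figured out what is going on. My sensor checks whether the file was modified in the last 60 seconds, but the `SmartSensorOperator` ignores my `poke_interval` of 5 seconds and uses its own default of 180 seconds, so the 60-second window can pass entirely between two pokes.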
   ```python
   class SmartSensorOperator(BaseOperator, SkipMixin):
       ui_color = '#e6f1f2'
   
       @apply_defaults
       def __init__(
           self,
           poke_interval=180,
           smart_sensor_timeout=60 * 60 * 24 * 7,
           soft_fail=False,
           shard_min=0,
           shard_max=100000,
           poke_timeout=6.0,
           *args,
           **kwargs,
       ):
   ```
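
To make the timing problem concrete, here is a minimal simulation (plain Python, not Airflow code). It assumes pokes happen at fixed multiples of the interval and that a poke succeeds when `last_modified + delta_seconds >= now`, as in `ModifiedWasbBlobSensor.poke`. The helper name `first_successful_poke` and the timestamps are hypothetical.

```python
from typing import Optional


def first_successful_poke(
    modified_at: float,
    delta_seconds: float,
    poke_interval: float,
    horizon: float,
) -> Optional[float]:
    """Return the first poke time (in seconds) at which the blob counts as fresh.

    Pokes happen at 0, poke_interval, 2 * poke_interval, ... up to horizon.
    A poke succeeds when the blob already exists (modified_at <= now) and
    modified_at + delta_seconds >= now, mirroring the sensor's check.
    """
    now = 0.0
    while now <= horizon:
        if modified_at <= now <= modified_at + delta_seconds:
            return now
        now += poke_interval
    return None


# Blob uploaded 10 s after the run starts, freshness window of 60 s.
print(first_successful_poke(10, 60, poke_interval=5, horizon=600))    # 10.0
print(first_successful_poke(10, 60, poke_interval=180, horizon=600))  # None
```

With a 5-second interval the poke at 10 s lands inside the window. With a 180-second interval the pokes at 0 s and 180 s fall on either side of it, so the sensor never succeeds unless the upload happens shortly before a poke.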
   
   Also, this default is not overridden when the smart sensor DAGs are created:
   ```python
   num_smart_sensor_shard = conf.getint("smart_sensor", "shards")
   shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')
   
   for i in range(num_smart_sensor_shard):
       shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
       shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard
   
       # One DAG per shard; dag_id and args are defined elsewhere in the module (not shown in this excerpt)
       dag = DAG(
           dag_id=dag_id,
           default_args=args,
           schedule_interval=timedelta(minutes=5),
           concurrency=1,
           max_active_runs=1,
           catchup=False,
           dagrun_timeout=timedelta(hours=24),
           start_date=days_ago(2),
       )
   
       # poke_interval is not passed here, so the operator default of 180 seconds applies
       SmartSensorOperator(
           task_id='smart_sensor_task',
           dag=dag,
           retries=100,
           retry_delay=timedelta(seconds=10),
           priority_weight=999,
           shard_min=shard_min,
           shard_max=shard_max,
           poke_timeout=10,
           smart_sensor_timeout=timedelta(hours=24).total_seconds(),
       )
   ```
   
   Can we add one more config option under `[smart_sensor]` in airflow.cfg called `poke_interval`, which would take an `int` number of seconds?
   
   The smart sensor DAG file could then read it and pass it through:
   ```python
   num_smart_sensor_shard = conf.getint("smart_sensor", "shards")
   shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')
   poke_interval = conf.getint('smart_sensor', 'poke_interval')
   
   for i in range(num_smart_sensor_shard):
       shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
       shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard
   
       # One DAG per shard; dag_id and args are defined elsewhere in the module (not shown in this excerpt)
       dag = DAG(
           dag_id=dag_id,
           default_args=args,
           schedule_interval=timedelta(minutes=5),
           concurrency=1,
           max_active_runs=1,
           catchup=False,
           dagrun_timeout=timedelta(hours=24),
           start_date=days_ago(2),
       )
   
       SmartSensorOperator(
           task_id='smart_sensor_task',
           dag=dag,
           retries=100,
           retry_delay=timedelta(seconds=10),
           priority_weight=999,
           shard_min=shard_min,
           shard_max=shard_max,
           poke_timeout=10,
           smart_sensor_timeout=timedelta(hours=24).total_seconds(),
           # Proposed: forward the configured interval instead of relying on the default
           poke_interval=poke_interval,
       )
   ```
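
One detail such an option would probably need is a fallback, so that existing configs without the new key keep working. The sketch below uses the standard-library `configparser` as a stand-in for Airflow's `conf` object. The `poke_interval` key is the proposed option (it does not exist in Airflow 2.0.1), and `read_poke_interval` and `SAMPLE_CFG` are hypothetical names used only for illustration.

```python
import configparser

# Matches the SmartSensorOperator default quoted earlier in this comment.
DEFAULT_POKE_INTERVAL = 180

# Hypothetical config fragment containing the proposed option.
SAMPLE_CFG = """
[smart_sensor]
use_smart_sensor = True
shards = 5
poke_interval = 30
"""


def read_poke_interval(cfg_text: str) -> int:
    """Read [smart_sensor] poke_interval, falling back to the operator default."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.getint("smart_sensor", "poke_interval", fallback=DEFAULT_POKE_INTERVAL)


print(read_poke_interval(SAMPLE_CFG))                        # 30
print(read_poke_interval("[smart_sensor]\nshards = 5\n"))   # 180
```

The second call shows the fallback path: when the key is absent, the value stays at 180 seconds, which is the current behavior.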





[GitHub] [airflow] eladkal commented on issue #15543: Smart Sensor Kubernetes Airflow 2.0.1

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #15543:
URL: https://github.com/apache/airflow/issues/15543#issuecomment-1013868773


   Since Smart Sensors are now deprecated, I'm not sure it's worth investigating this issue.
   WDYT @xinbinhuang 





[GitHub] [airflow] xinbinhuang commented on issue #15543: Smart Sensor Kubernetes Airflow 2.0.1

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on issue #15543:
URL: https://github.com/apache/airflow/issues/15543#issuecomment-828172059


   Will take a look





[GitHub] [airflow] xinbinhuang edited a comment on issue #15543: Smart Sensor Kubernetes Airflow 2.0.1

Posted by GitBox <gi...@apache.org>.
xinbinhuang edited a comment on issue #15543:
URL: https://github.com/apache/airflow/issues/15543#issuecomment-828172059


   I will take a look

