Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/07 16:59:47 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #5499: [AIRFLOW-3964][AIP-17] Build smart sensor

kaxil commented on a change in pull request #5499:
URL: https://github.com/apache/airflow/pull/5499#discussion_r467100430



##########
File path: airflow/config_templates/config.yml
##########
@@ -2331,3 +2331,35 @@
     to identify the task.
     Should be supplied in the format: ``key = value``
   options: []
+- name: smart_sensor
+  description: ~
+  options:
+    - name: use_smart_sensor
+      description: |
+        When use smart sensor, redirect multiple qualified sensor tasks to smart sensor task

Review comment:
       ```suggestion
            When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to the smart sensor task
   ```
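
    For reference, a minimal sketch of how this option would then be read at runtime; the `[smart_sensor]` section and option names come from the config.yml diff above, while the fallback default is only an illustrative assumption:

    ```python
    from airflow.configuration import conf

    # Section/option names taken from the config.yml diff; the fallback
    # value is an assumption, not taken from the PR.
    use_smart_sensor = conf.getboolean('smart_sensor', 'use_smart_sensor', fallback=False)
    ```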

##########
File path: airflow/models/sensorinstance.py
##########
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from sqlalchemy import BigInteger, Column, Index, Integer, String, Text
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+from airflow.models.base import ID_LEN, Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.state import State
+
+
+class SensorInstance(Base):
+    """
+    SensorInstance support the smart sensor service. It stores the sensor task states
+    and context that required for poking include poke context and execution context.
+    In sensor_instance table we also save the sensor operator classpath so that inside
+    smart sensor there is no need to import the dagbag and create task object for each
+    sensor task.
+
+    SensorInstance include another set of columns to support the smart sensor shard on
+    large number of sensor instance. By hashcode generated from poke_contex and shardcode
+    the distributed smart sensor processes can work on different shards.
+
+    """
+
+    __tablename__ = "sensor_instance"
+
+    id = Column(Integer, primary_key=True)
+    task_id = Column(String(ID_LEN), nullable=False)
+    dag_id = Column(String(ID_LEN), nullable=False)
+    execution_date = Column(UtcDateTime, nullable=False)
+    state = Column(String(20))
+    _try_number = Column('try_number', Integer, default=0)
+    start_date = Column(UtcDateTime)
+    operator = Column(String(1000), nullable=False)
+    op_classpath = Column(String(1000), nullable=False)
+    hashcode = Column(BigInteger, nullable=False)
+    shardcode = Column(Integer, nullable=False)
+    poke_context = Column(Text, nullable=False)
+    execution_context = Column(Text)
+    created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False)
+    updated_at = Column(UtcDateTime,
+                        default=timezone.utcnow(),
+                        onupdate=timezone.utcnow(),
+                        nullable=False)
+
+    __table_args__ = (
+        Index('ti_primary_key', dag_id, task_id, execution_date, unique=True),
+
+        Index('si_hashcode', hashcode),
+        Index('si_shardcode', shardcode),
+        Index('si_state_shard', state, shardcode),
+        Index('si_updated_at', updated_at),
+    )
+
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+
+    @staticmethod
+    def get_classpath(obj):
+        """
+        Get the object dotted class path. Used for getting operator classpath
+        :param obj:

Review comment:
       Needs a blank line between the description and the params for Sphinx rendering
   
   ```suggestion
   
           :param obj:
   ```
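
    For illustration, the docstring with the blank line the review asks for, so Sphinx renders the `:param:` fields correctly (the parameter description text is a placeholder, not from the PR):

    ```python
    @staticmethod
    def get_classpath(obj):
        """
        Get the object dotted class path. Used for getting operator classpath.

        :param obj: object whose dotted class path is wanted (placeholder text)
        :return: string
        """
    ```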

##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -66,6 +68,8 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
     """
     ui_color = '#e6f1f2'  # type: str
     valid_modes = ['poke', 'reschedule']  # type: Iterable[str]
+    execution_fields = ('poke_interval', 'retries', 'execution_timeout', 'timeout',

Review comment:
       Can we add a comment on what should go in `execution_fields`?
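
    Something along these lines would answer it; only the fields visible in the truncated diff line are shown, and the comment wording is illustrative:

    ```python
    # Fields that control *how* the sensor task is executed (timeout and retry
    # behaviour) rather than *what* it pokes for; they are serialized into the
    # execution_context handed to the smart sensor service.
    execution_fields = ('poke_interval', 'retries', 'execution_timeout', 'timeout')
    ```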

##########
File path: airflow/migrations/versions/e38be357a868_update_schema_for_smart_sensor.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add sensor_instance table
+
+Revision ID: e38be357a868
+Revises: 939bb1e647c8
+Create Date: 2019-06-07 04:03:17.003939
+
+"""
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import func
+
+# revision identifiers, used by Alembic.
+revision = 'e38be357a868'
+down_revision = '8f966b9c467a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():  # noqa: D103
+    op.create_table(
+        'sensor_instance',
+        sa.Column('id', sa.Integer(), nullable=False),
+        sa.Column('task_id', sa.String(length=250), nullable=False),
+        sa.Column('dag_id', sa.String(length=250), nullable=False),
+        sa.Column('execution_date', sa.TIMESTAMP(timezone=True), nullable=False),
+        sa.Column('state', sa.String(length=20), nullable=True),
+        sa.Column('try_number', sa.Integer(), nullable=True),
+        sa.Column('start_date', sa.TIMESTAMP(timezone=True), nullable=True),
+        sa.Column('operator', sa.String(length=1000), nullable=False),
+        sa.Column('op_classpath', sa.String(length=1000), nullable=False),
+        sa.Column('hashcode', sa.BigInteger(), nullable=False),
+        sa.Column('shardcode', sa.Integer(), nullable=False),
+        sa.Column('poke_context', sa.Text(), nullable=False),
+        sa.Column('execution_context', sa.Text(), nullable=True),
+        sa.Column('created_at', sa.TIMESTAMP(timezone=True), default=func.now(), nullable=False),
+        sa.Column('updated_at', sa.TIMESTAMP(timezone=True), default=func.now(), nullable=False),

Review comment:
       Any reason for having the defaults here? (Just curious)
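
    For context on the question: in SQLAlchemy, `default=func.now()` is applied by SQLAlchemy itself when it issues an INSERT, while `server_default` is emitted into the CREATE TABLE DDL so the database fills the column even for inserts that bypass SQLAlchemy. A minimal sketch of the distinction (column definitions only, for illustration):

    ```python
    import sqlalchemy as sa
    from sqlalchemy import func

    # Client-side default: SQLAlchemy injects now() into INSERTs it issues;
    # the column has no default in the schema itself.
    sa.Column('created_at', sa.TIMESTAMP(timezone=True), default=func.now())

    # Server-side default: part of the table DDL, honoured for all inserts.
    sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=func.now())
    ```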

##########
File path: airflow/models/taskinstance.py
##########
@@ -280,7 +281,8 @@ def try_number(self):
         database, in all other cases this will be incremented.
         """
         # This is designed so that task logs end up in the right file.
-        if self.state == State.RUNNING:
+        # TODO: whether we need sensing here or not (in sensor and task_instance state machine)
+        if self.state in State.running():

Review comment:
       Just to confirm: does the SENSING state just mean the "Sensor DAG" has this task, or does it mean this TI is currently being poked?
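
    For context, the diff implies `State.running()` returns the set of states treated as "actively executing" for try-number purposes; a sketch of what that helper presumably looks like (membership and string values are assumptions based on the diff):

    ```python
    class State:
        RUNNING = 'running'
        SENSING = 'sensing'

        @classmethod
        def running(cls):
            # A task in either state occupies a slot somewhere: a worker
            # process (RUNNING) or the smart sensor service (SENSING).
            return frozenset([cls.RUNNING, cls.SENSING])
    ```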

##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -106,6 +113,62 @@ def poke(self, context: Dict) -> bool:
         """
         raise AirflowException('Override me.')
 
+    def is_smart_sensor_compatible(self):
+        check_list = [not self.sensor_service_enabled,
+                      self.on_success_callback,
+                      self.on_retry_callback,
+                      self.on_failure_callback]
+        for status in check_list:
+            if status:
+                return False
+
+        operator = self.__class__.__name__
+        return operator in self.sensors_support_sensor_service
+
+    def register_in_sensor_service(self, ti, context):
+        """
+        Register ti in smart sensor service
+        :param ti:
+        :param context:
+        :return: boolean
+        """
+        poke_context = self.get_poke_context(context)
+        execution_context = self.get_execution_context(context)
+
+        return SensorInstance.register(ti, poke_context, execution_context)
+
+    def get_poke_context(self, context):
+        """
+        Return a dictionary with all attributes in poke_context_fields. The
+        poke_context with operator class can be used to identify a unique
+        sensor job.
+        :param context:

Review comment:
       Here too

##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -106,6 +113,62 @@ def poke(self, context: Dict) -> bool:
         """
         raise AirflowException('Override me.')
 
+    def is_smart_sensor_compatible(self):
+        check_list = [not self.sensor_service_enabled,
+                      self.on_success_callback,
+                      self.on_retry_callback,
+                      self.on_failure_callback]
+        for status in check_list:
+            if status:
+                return False
+
+        operator = self.__class__.__name__
+        return operator in self.sensors_support_sensor_service
+
+    def register_in_sensor_service(self, ti, context):
+        """
+        Register ti in smart sensor service
+        :param ti:
+        :param context:
+        :return: boolean
+        """
+        poke_context = self.get_poke_context(context)
+        execution_context = self.get_execution_context(context)
+
+        return SensorInstance.register(ti, poke_context, execution_context)
+
+    def get_poke_context(self, context):
+        """
+        Return a dictionary with all attributes in poke_context_fields. The
+        poke_context with operator class can be used to identify a unique
+        sensor job.
+        :param context:
+        :return:
+        """
+        if not context:
+            self.log.info("Function get_poke_context doesn't have a context input.")
+
+        poke_context_fields = getattr(self.__class__, "poke_context_fields", None)
+        result = {key: getattr(self, key, None) for key in poke_context_fields}
+        return result
+
+    def get_execution_context(self, context):
+        """
+        Return a dictionary with all attributes in execution_fields. The
+        execution_context include execution requirement for each sensor task
+        such as timeout setup, email_alert setup.
+        :param context:

Review comment:
       Same here

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),

Review comment:
       Will this fail if users have configured a different log-template-id?

##########
File path: airflow/smart_sensor_dags/smart_sensor_group.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+""" Smart sensor DAGs managing all smart sensor tasks """
+
+from builtins import range
+from datetime import timedelta
+
+from airflow.configuration import conf
+from airflow.models import DAG
+from airflow.sensors.smart_sensor_operator import SmartSensorOperator
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(2),

Review comment:
       ```suggestion
   ```

##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -71,15 +71,25 @@ def close(self):
 
     def _render_filename(self, ti, try_number):
         if self.filename_jinja_template:
-            jinja_context = ti.get_template_context()
-            jinja_context['try_number'] = try_number
+            if hasattr(ti, 'task'):
+                jinja_context = ti.get_template_context()
+                jinja_context['try_number'] = try_number
+            else:
+                jinja_context = {
+                    'ti': ti,
+                    'ts': ti.execution_date.isoformat(),
+                    'try_number': try_number,
+                }
             return self.filename_jinja_template.render(**jinja_context)
 
         return self.filename_template.format(dag_id=ti.dag_id,
                                              task_id=ti.task_id,
                                              execution_date=ti.execution_date.isoformat(),
                                              try_number=try_number)
 
+    def _read_grouped_logs(self):
+        return False

Review comment:
       Is it supposed to always return `False`?
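
    Presumably this is a hook for handler subclasses to override; a sketch of what an override might look like (the subclass is illustrative, not part of the PR):

    ```python
    from airflow.utils.log.file_task_handler import FileTaskHandler

    class GroupedLogHandler(FileTaskHandler):
        """Illustrative handler that serves logs as one grouped response."""

        def _read_grouped_logs(self):
            # Signal to the log-reading machinery that this handler returns
            # logs for several task instances (e.g. all sensors poked by one
            # smart sensor) in a single grouped read.
            return True
    ```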

##########
File path: airflow/models/sensorinstance.py
##########
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from sqlalchemy import BigInteger, Column, Index, Integer, String, Text
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+from airflow.models.base import ID_LEN, Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.state import State
+
+
+class SensorInstance(Base):
+    """
+    SensorInstance support the smart sensor service. It stores the sensor task states
+    and context that required for poking include poke context and execution context.
+    In sensor_instance table we also save the sensor operator classpath so that inside
+    smart sensor there is no need to import the dagbag and create task object for each
+    sensor task.
+
+    SensorInstance include another set of columns to support the smart sensor shard on
+    large number of sensor instance. By hashcode generated from poke_contex and shardcode
+    the distributed smart sensor processes can work on different shards.
+
+    """
+
+    __tablename__ = "sensor_instance"
+
+    id = Column(Integer, primary_key=True)
+    task_id = Column(String(ID_LEN), nullable=False)
+    dag_id = Column(String(ID_LEN), nullable=False)
+    execution_date = Column(UtcDateTime, nullable=False)
+    state = Column(String(20))
+    _try_number = Column('try_number', Integer, default=0)
+    start_date = Column(UtcDateTime)
+    operator = Column(String(1000), nullable=False)
+    op_classpath = Column(String(1000), nullable=False)
+    hashcode = Column(BigInteger, nullable=False)
+    shardcode = Column(Integer, nullable=False)
+    poke_context = Column(Text, nullable=False)
+    execution_context = Column(Text)
+    created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False)
+    updated_at = Column(UtcDateTime,
+                        default=timezone.utcnow(),
+                        onupdate=timezone.utcnow(),
+                        nullable=False)
+
+    __table_args__ = (
+        Index('ti_primary_key', dag_id, task_id, execution_date, unique=True),
+
+        Index('si_hashcode', hashcode),
+        Index('si_shardcode', shardcode),
+        Index('si_state_shard', state, shardcode),
+        Index('si_updated_at', updated_at),
+    )
+
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+
+    @staticmethod
+    def get_classpath(obj):
+        """
+        Get the object dotted class path. Used for getting operator classpath
+        :param obj:
+        :return: string
+        """
+        module_name, class_name = obj.__module__, obj.__class__.__name__
+
+        if '.' not in module_name:
+            # Fix if module name was broken by airflow importer

Review comment:
       When can this happen?

##########
File path: airflow/models/sensorinstance.py
##########
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from sqlalchemy import BigInteger, Column, Index, Integer, String, Text
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+from airflow.models.base import ID_LEN, Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.state import State
+
+
+class SensorInstance(Base):
+    """
+    SensorInstance support the smart sensor service. It stores the sensor task states
+    and context that required for poking include poke context and execution context.
+    In sensor_instance table we also save the sensor operator classpath so that inside
+    smart sensor there is no need to import the dagbag and create task object for each
+    sensor task.
+
+    SensorInstance include another set of columns to support the smart sensor shard on
+    large number of sensor instance. By hashcode generated from poke_contex and shardcode
+    the distributed smart sensor processes can work on different shards.
+
+    """
+
+    __tablename__ = "sensor_instance"
+
+    id = Column(Integer, primary_key=True)
+    task_id = Column(String(ID_LEN), nullable=False)
+    dag_id = Column(String(ID_LEN), nullable=False)
+    execution_date = Column(UtcDateTime, nullable=False)
+    state = Column(String(20))
+    _try_number = Column('try_number', Integer, default=0)
+    start_date = Column(UtcDateTime)
+    operator = Column(String(1000), nullable=False)
+    op_classpath = Column(String(1000), nullable=False)
+    hashcode = Column(BigInteger, nullable=False)
+    shardcode = Column(Integer, nullable=False)
+    poke_context = Column(Text, nullable=False)
+    execution_context = Column(Text)
+    created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False)
+    updated_at = Column(UtcDateTime,
+                        default=timezone.utcnow(),
+                        onupdate=timezone.utcnow(),
+                        nullable=False)
+
+    __table_args__ = (
+        Index('ti_primary_key', dag_id, task_id, execution_date, unique=True),
+
+        Index('si_hashcode', hashcode),
+        Index('si_shardcode', shardcode),
+        Index('si_state_shard', state, shardcode),
+        Index('si_updated_at', updated_at),
+    )
+
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+
+    @staticmethod
+    def get_classpath(obj):
+        """
+        Get the object dotted class path. Used for getting operator classpath
+        :param obj:

Review comment:
       Needs docstrings

##########
File path: airflow/models/baseoperator.py
##########
@@ -1338,6 +1338,13 @@ def get_serialized_fields(cls):
 
         return cls.__serialized_fields
 
+    def is_smart_sensor_compatible(self):
+        """
+        Return if this operator can use smart service. Default False.
+        :return:

Review comment:
       ```suggestion
   ```

##########
File path: airflow/providers/apache/hive/sensors/named_hive_partition.py
##########
@@ -104,3 +105,9 @@ def poke(self, context: Dict[str, Any]) -> bool:
 
         self.next_index_to_poke = 0
         return True
+
+    def is_smart_sensor_compatible(self):
+        result = not self.soft_fail and not self.hook and \
+            len(self.partition_names) <= 30 and \
+            super(NamedHivePartitionSensor, self).is_smart_sensor_compatible()

Review comment:
       ```suggestion
               super().is_smart_sensor_compatible()
   ```

##########
File path: airflow/models/sensorinstance.py
##########
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from sqlalchemy import BigInteger, Column, Index, Integer, String, Text
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+from airflow.models.base import ID_LEN, Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.state import State
+
+
+class SensorInstance(Base):
+    """
+    SensorInstance support the smart sensor service. It stores the sensor task states
+    and context that required for poking include poke context and execution context.
+    In sensor_instance table we also save the sensor operator classpath so that inside
+    smart sensor there is no need to import the dagbag and create task object for each
+    sensor task.
+
+    SensorInstance include another set of columns to support the smart sensor shard on
+    large number of sensor instance. By hashcode generated from poke_contex and shardcode
+    the distributed smart sensor processes can work on different shards.
+
+    """
+
+    __tablename__ = "sensor_instance"
+
+    id = Column(Integer, primary_key=True)
+    task_id = Column(String(ID_LEN), nullable=False)
+    dag_id = Column(String(ID_LEN), nullable=False)
+    execution_date = Column(UtcDateTime, nullable=False)
+    state = Column(String(20))
+    _try_number = Column('try_number', Integer, default=0)
+    start_date = Column(UtcDateTime)
+    operator = Column(String(1000), nullable=False)
+    op_classpath = Column(String(1000), nullable=False)
+    hashcode = Column(BigInteger, nullable=False)
+    shardcode = Column(Integer, nullable=False)
+    poke_context = Column(Text, nullable=False)
+    execution_context = Column(Text)
+    created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False)
+    updated_at = Column(UtcDateTime,
+                        default=timezone.utcnow(),
+                        onupdate=timezone.utcnow(),
+                        nullable=False)
+
+    __table_args__ = (
+        Index('ti_primary_key', dag_id, task_id, execution_date, unique=True),
+
+        Index('si_hashcode', hashcode),
+        Index('si_shardcode', shardcode),
+        Index('si_state_shard', state, shardcode),
+        Index('si_updated_at', updated_at),
+    )
+
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+
+    @staticmethod
+    def get_classpath(obj):
+        """
+        Get the object dotted class path. Used for getting operator classpath
+        :param obj:
+        :return: string
+        """
+        module_name, class_name = obj.__module__, obj.__class__.__name__
+
+        if '.' not in module_name:
+            # Fix if module name was broken by airflow importer
+            from airflow import operators as builtin_operators, sensors as builtin_sensors
+            stem = ''
+            if hasattr(builtin_operators, class_name):
+                stem = 'airflow.operators.'
+            elif hasattr(builtin_sensors, class_name):
+                stem = 'airflow.sensors.'
+            module_name = stem + module_name
+
+        return module_name + "." + class_name
+
+    @classmethod
+    @provide_session
+    def register(cls, ti, poke_context, execution_context, session=None):
+        """
+        Register task instance ti for a sensor in sensor_instance table. Persist the
+        context used for a sensor and set the sensor_instance table state to sensing.
+        :param ti: The task instance for the sensor to be registered.
+        :param poke_context: Context used for sensor poke function.
+        :param execution_context: Context used for execute sensor such as timeout
+        setting and email configuration.

Review comment:
       ```suggestion
       def register(cls, ti, poke_context, execution_context, session=None):
           """
           Register task instance ti for a sensor in sensor_instance table. Persist the
           context used for a sensor and set the sensor_instance table state to sensing.
           
           :param ti: The task instance for the sensor to be registered.
           :param poke_context: Context used for sensor poke function.
           :param execution_context: Context used for execute sensor such as timeout
               setting and email configuration.
   ```

##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -106,6 +113,62 @@ def poke(self, context: Dict) -> bool:
         """
         raise AirflowException('Override me.')
 
+    def is_smart_sensor_compatible(self):
+        check_list = [not self.sensor_service_enabled,
+                      self.on_success_callback,
+                      self.on_retry_callback,
+                      self.on_failure_callback]
+        for status in check_list:
+            if status:
+                return False
+
+        operator = self.__class__.__name__
+        return operator in self.sensors_support_sensor_service
+
+    def register_in_sensor_service(self, ti, context):
+        """
+        Register ti in smart sensor service
+        :param ti:
+        :param context:

Review comment:
       Needs to be filled in
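
    For instance (the parameter descriptions here are illustrative, not from the PR):

    ```python
    def register_in_sensor_service(self, ti, context):
        """
        Register ti in smart sensor service.

        :param ti: Task instance for the sensor to be registered.
        :param context: TaskInstance template context of the running sensor task.
        :return: boolean
        """
    ```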

##########
File path: airflow/migrations/versions/e38be357a868_update_schema_for_smart_sensor.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add sensor_instance table
+
+Revision ID: e38be357a868
+Revises: 939bb1e647c8
+Create Date: 2019-06-07 04:03:17.003939
+
+"""
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import func
+
+# revision identifiers, used by Alembic.
+revision = 'e38be357a868'
+down_revision = '8f966b9c467a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():  # noqa: D103
+    op.create_table(
+        'sensor_instance',
+        sa.Column('id', sa.Integer(), nullable=False),
+        sa.Column('task_id', sa.String(length=250), nullable=False),
+        sa.Column('dag_id', sa.String(length=250), nullable=False),
+        sa.Column('execution_date', sa.TIMESTAMP(timezone=True), nullable=False),
+        sa.Column('state', sa.String(length=20), nullable=True),
+        sa.Column('try_number', sa.Integer(), nullable=True),
+        sa.Column('start_date', sa.TIMESTAMP(timezone=True), nullable=True),
+        sa.Column('operator', sa.String(length=1000), nullable=False),
+        sa.Column('op_classpath', sa.String(length=1000), nullable=False),
+        sa.Column('hashcode', sa.BigInteger(), nullable=False),
+        sa.Column('shardcode', sa.Integer(), nullable=False),
+        sa.Column('poke_context', sa.Text(), nullable=False),
+        sa.Column('execution_context', sa.Text(), nullable=True),
+        sa.Column('created_at', sa.TIMESTAMP(timezone=True), default=func.now(), nullable=False),
+        sa.Column('updated_at', sa.TIMESTAMP(timezone=True), default=func.now(), nullable=False),

Review comment:
       Also I think the Timestamp needs to have precision for MySQL. Similar to https://github.com/apache/airflow/commit/9e6b5abea08eb35c48839e33a4518f292ef7564e#diff-b89252a0e7858101fe0bbd8bf89faf1aR40-R49
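
    Following the pattern in the linked commit, a sketch of how these two columns might carry fractional-second precision on MySQL via a dialect variant (the exact form the PR adopts may differ):

    ```python
    import sqlalchemy as sa
    from sqlalchemy import func
    from sqlalchemy.dialects import mysql

    # Generic timezone-aware TIMESTAMP, swapped for a 6-digit
    # fractional-second TIMESTAMP when running against MySQL.
    timestamp_type = sa.TIMESTAMP(timezone=True).with_variant(
        mysql.TIMESTAMP(fsp=6), 'mysql')

    sa.Column('created_at', timestamp_type, default=func.now(), nullable=False)
    sa.Column('updated_at', timestamp_type, default=func.now(), nullable=False)
    ```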

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside smart sensor. It saves
+    the sensor_task used to poke and recent poke result state

Review comment:
       i.e., please add info regarding the args

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside smart sensor. It saves
+    the sensor_task used to poke and recent poke result state
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For possible transient
+    infra failure, give the task more chance to retry before fail it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function set the latest exception information for sensor exception. If the exception
+        implies an infra failure, this function will check the recorded infra failure timeout
+        which was set at the first infra failure exception arrives. There is a 6 hours window
+        for retry without failing current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the time point when the sensor should be failed if it kept getting infra
+        failure.
+        :return:
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: Should the sensor fail
+        :type: boolean
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+
+        :return: If the exception is an infra failure
+        :type: boolean
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: If current exception need to be kept.
+        :type: boolean
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    Smart Sensor operators keep refresh a dictionary by visiting DB.
+    Taking qualified active sensor tasks. Different from sensor operator,
+    Smart sensor operators poke for all sensor tasks in the dictionary at
+    a time interval. When a criteria is met or fail by time out, it update
+    all sensor task state in task_instance table
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each tries
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    :type mode: str
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        # super(SmartSensorOperator, self).__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh sensor instances need to be handled by this operator. Put the context,
+        hashcode and sensor instance start_date in a wrapper class
+        :param session:
+        :return:
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()

Review comment:
       Do we need this? We are simply querying in this function, so I think we can safely remove it.
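
    Concretely, the read-only lookup could end without the commit; a trimmed sketch of the fragment under discussion (method body excerpt, not standalone code):

    ```python
    # SELECT-only lookup: nothing is mutated, so no session.commit() is needed.
    tis = (
        session.query(SensorInstance)
        .filter(SensorInstance.state == State.SENSING)
        .filter(SensorInstance.shardcode < self.shard_max,
                SensorInstance.shardcode >= self.shard_min)
        .all()
    )
    ```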

##########
File path: docs/smart-sensor.rst
##########
@@ -0,0 +1,86 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Smart Sensor
+============
+
+The smart sensor is a service which greatly reduces airflow’s infrastructure cost by consolidating
+some of the airflow long running light weight tasks.
+
+.. image:: img/smart_sensor_architecture.png
+
+Instead of using one process for each task, the main idea of the smart sensor service to improve the

Review comment:
       ```suggestion
   Instead of using one process for each task, the main idea of the smart sensor service is to improve the
   ```

##########
File path: airflow/utils/log/file_processor_handler.py
##########
@@ -83,7 +83,14 @@ def close(self):
             self.handler.close()
 
     def _render_filename(self, filename):
-        filename = os.path.relpath(filename, self.dag_dir)
+        # Fix for airflow native dag access right problem. If the filename is in airflow
+        # source, mock as the dag is under dag_dir/native_dags

Review comment:
       Can you expand on this a little bit? I am not sure I understand.

##########
File path: docs/smart-sensor.rst
##########
@@ -0,0 +1,86 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Smart Sensor
+============
+
+The smart sensor is a service which greatly reduces airflow’s infrastructure cost by consolidating

Review comment:
       ```suggestion
   The smart sensor is a service (run by a builtin DAG) which greatly reduces airflow’s infrastructure cost by consolidating
   ```

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """

Review comment:
       Can we add docstrings for the params?
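    
    A possible starting point (only a sketch; the description of `ti` is my reading of the `__init__` below):
    
    ```python
    class SensorWork:
        """
        This class stores a sensor work with decoded context value. It is only used
        inside of smart sensor.
    
        :param ti: the sensor_instance record this work is built from; its
            poke_context and execution_context JSON blobs are decoded and
            cached on the instance
        :type ti: SensorInstance
        """
    ```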

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?

Review comment:
       Did you all decide anything on this TODO?

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside smart sensor. It saves
+    the sensor_task used to poke and recent poke result state
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For possible transient
+    infra failure, give the task more chance to retry before fail it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function set the latest exception information for sensor exception. If the exception
+        implies an infra failure, this function will check the recorded infra failure timeout
+        which was set at the first infra failure exception arrives. There is a 6 hours window
+        for retry without failing current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the time point when the sensor should be failed if it kept getting infra
+        failure.
+        :return:
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: Should the sensor fail
+        :type: boolean
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+
+        :return: If the exception is an infra failure
+        :type: boolean
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: If current exception need to be kept.
+        :type: boolean
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    Smart Sensor operators keep refresh a dictionary by visiting DB.
+    Taking qualified active sensor tasks. Different from sensor operator,
+    Smart sensor operators poke for all sensor tasks in the dictionary at
+    a time interval. When a criteria is met or fail by time out, it update
+    all sensor task state in task_instance table
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each tries
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    :type mode: str

Review comment:
       Some of the params are missing from the docstring (a rough sketch follows the list):
   
   - shard_min
   - shard_max
   - poke_exception_cache_ttl
   - poke_timeout
   - poke_exception_to_fail_task_threshold
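    
    A rough sketch for the missing entries (descriptions inferred from the code, so please correct as needed):
    
    ```python
    :param shard_min: shard code lower bound (inclusive) handled by this smart sensor
    :type shard_min: int
    :param shard_max: shard code upper bound (exclusive) handled by this smart sensor
    :type shard_max: int
    :param poke_exception_cache_ttl: time to live, in seconds, for a cached poke exception
    :type poke_exception_cache_ttl: int
    :param poke_timeout: time, in seconds, allowed for a single poke call
    :type poke_timeout: int
    :param poke_exception_to_fail_task_threshold: number of poke exceptions after
        which the sensor task should be failed
    :type poke_exception_to_fail_task_threshold: int
    ```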
   

##########
File path: airflow/smart_sensor_dags/smart_sensor_group.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+""" Smart sensor DAGs managing all smart sensor tasks """
+
+from builtins import range
+from datetime import timedelta
+
+from airflow.configuration import conf
+from airflow.models import DAG
+from airflow.sensors.smart_sensor_operator import SmartSensorOperator
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(2),
+}
+
+num_smart_sensor_shard = conf.getint("smart_sensor", "shards")
+shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')
+
+for i in range(num_smart_sensor_shard):
+    shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
+    shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard
+
+    dag_id = 'smart_sensor_group_shard_{}'.format(i)
+    dag = DAG(
+        dag_id=dag_id,
+        default_args=args,
+        schedule_interval=timedelta(days=1),

Review comment:
       ```suggestion
           schedule_interval=timedelta(days=1),
           start_date=days_ago(2),
   ```

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside smart sensor. It saves
+    the sensor_task used to poke and recent poke result state

Review comment:
       This class needs docstrings for its attributes as well.
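    
    A draft, with the attribute descriptions inferred from the `__init__` and methods below:
    
    ```python
    class CachedPokeWork:
        """
        Wrapper class for the poke work inside smart sensor. It saves the
        sensor_task used to poke and the most recent poke result state.
    
        :ivar state: latest poke state reported for this work, or None
        :ivar sensor_task: the instantiated sensor operator used to poke
        :ivar last_poke_time: UTC time of the most recent poke
        :ivar to_flush: whether this entry should be popped from the cache
            after the current loop
        """
    ```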

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside smart sensor. It saves
+    the sensor_task used to poke and recent poke result state
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For possible transient
+    infra failure, give the task more chance to retry before fail it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function set the latest exception information for sensor exception. If the exception
+        implies an infra failure, this function will check the recorded infra failure timeout
+        which was set at the first infra failure exception arrives. There is a 6 hours window
+        for retry without failing current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the time point when the sensor should be failed if it kept getting infra
+        failure.
+        :return:
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: Should the sensor fail
+        :type: boolean
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+
+        :return: If the exception is an infra failure
+        :type: boolean
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: If current exception need to be kept.
+        :type: boolean
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    Smart Sensor operators keep refresh a dictionary by visiting DB.
+    Taking qualified active sensor tasks. Different from sensor operator,
+    Smart sensor operators poke for all sensor tasks in the dictionary at
+    a time interval. When a criteria is met or fail by time out, it update
+    all sensor task state in task_instance table
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each tries
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    :type mode: str
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        # super(SmartSensorOperator, self).__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh sensor instances need to be handled by this operator. Put the context,
+        hashcode and sensor instance start_date in a wrapper class
+        :param session:
+        :return:
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()
+        self.log.info("Performance query %s tis, time: %s", len(tis), time.time() - start_query_time)
+
+        # Query without checking dagrun state might keep some failed dag_run tasks alive.
+        # Join with DagRun table will be very slow based on the number of sensor tasks we
+        # need to handle. We query all smart tasks in this operator
+        # and expect scheduler correct the states in _change_state_for_tis_without_dagrun()
+
+        sensor_works = []
+        for ti in tis:
+            try:
+                sensor_works.append(SensorWork(ti))
+            except Exception as e:  # pylint: disable=broad-except
+                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception(e, exc_info=True)
+
+        self.log.info("%d tasks detected.", len(sensor_works))
+
+        new_sensor_works = [x for x in sensor_works if x not in self.sensor_works]
+
+        self._update_ti_hostname(new_sensor_works)
+
+        self.sensor_works = sensor_works
+
+    @provide_session
+    def _update_ti_hostname(self, sensor_works, session=None):
+        """
+        Update task instance hostname for new sensor works.
+        :param sensor_works:
+        :return:
+        """
+        TI = TaskInstance
+        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
+
+        def update_ti_hostname_with_count(count, ti_keys):
+            # Using or_ instead of in_ here to prevent from full table scan.
+            tis = session.query(TI) \
+                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key
+                            for ti_key in ti_keys)) \
+                .all()
+
+            for ti in tis:
+                ti.hostname = self.hostname
+            session.commit()
+
+            return count + len(ti_keys)
+
+        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query)
+        if count:
+            self.log.info("Updated hostname on %s tis.", count)
+
+    @provide_session
+    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None):
+        """
+        Mark state for multiple tasks that have hashcode=poke_hash.
+        :param poke_hash:
+        :param state:
+        :param session:
+        :return:
+        """
+
+        def mark_state(ti, sensor_instance):
+            ti.state = state
+            sensor_instance.state = state
+            if state in State.finished():
+                ti.end_date = end_date
+                ti.set_duration()
+
+        SI = SensorInstance
+        TI = TaskInstance
+
+        count_marked = 0
+        try:
+            query_result = session.query(TI, SI)\
+                .join(TI, and_(TI.dag_id == SI.dag_id,
+                               TI.task_id == SI.task_id,
+                               TI.execution_date == SI.execution_date)) \
+                .filter(SI.state == State.SENSING) \
+                .filter(SI.hashcode == poke_hash) \
+                .filter(SI.operator == operator) \
+                .with_for_update().all()

Review comment:
       I am concerned about any join that involves TaskInstance, as that table can contain a large number of records.
   
   Did you find any issues with this on your prod cluster?
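    
    If this join does turn out to be slow, one untested alternative is the two-step, join-free pattern this PR already uses in `_update_ti_hostname` (note the sketch below no longer locks the sensor_instance rows, only task_instance):
    
    ```python
    # Step 1: select only the matching sensor_instance rows (a much smaller table).
    sis = session.query(SI) \
        .filter(SI.state == State.SENSING,
                SI.hashcode == poke_hash,
                SI.operator == operator) \
        .all()
    ti_keys = [(si.dag_id, si.task_id, si.execution_date) for si in sis]
    
    # Step 2: fetch the matching task_instance rows by key tuples so the
    # composite primary-key index is used instead of a join over the whole table.
    tis = session.query(TI) \
        .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == key
                    for key in ti_keys)) \
        .with_for_update().all()
    ```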

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside smart sensor. It saves
+    the sensor_task used to poke and recent poke result state
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For possible transient
+    infra failure, give the task more chance to retry before fail it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function set the latest exception information for sensor exception. If the exception
+        implies an infra failure, this function will check the recorded infra failure timeout
+        which was set at the first infra failure exception arrives. There is a 6 hours window
+        for retry without failing current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the time point when the sensor should be failed if it kept getting infra
+        failure.
+        :return:
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: Should the sensor fail
+        :type: boolean
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+
+        :return: If the exception is an infra failure
+        :type: boolean
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: If current exception need to be kept.
+        :type: boolean
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    Smart Sensor operators keep refresh a dictionary by visiting DB.
+    Taking qualified active sensor tasks. Different from sensor operator,
+    Smart sensor operators poke for all sensor tasks in the dictionary at
+    a time interval. When a criteria is met or fail by time out, it update
+    all sensor task state in task_instance table
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each tries
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    :type mode: str
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        # super(SmartSensorOperator, self).__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh sensor instances need to be handled by this operator. Put the context,
+        hashcode and sensor instance start_date in a wrapper class
+        :param session:
+        :return:
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()
+        self.log.info("Performance query %s tis, time: %s", len(tis), time.time() - start_query_time)
+
+        # Query without checking dagrun state might keep some failed dag_run tasks alive.
+        # Join with DagRun table will be very slow based on the number of sensor tasks we
+        # need to handle. We query all smart tasks in this operator
+        # and expect scheduler correct the states in _change_state_for_tis_without_dagrun()
+
+        sensor_works = []
+        for ti in tis:
+            try:
+                sensor_works.append(SensorWork(ti))
+            except Exception as e:  # pylint: disable=broad-except
+                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception(e, exc_info=True)
+
+        self.log.info("%d tasks detected.", len(sensor_works))
+
+        new_sensor_works = [x for x in sensor_works if x not in self.sensor_works]
+
+        self._update_ti_hostname(new_sensor_works)
+
+        self.sensor_works = sensor_works
+
+    @provide_session
+    def _update_ti_hostname(self, sensor_works, session=None):
+        """
+        Update task instance hostname for new sensor works.
+        :param sensor_works:
+        :return:
+        """
+        TI = TaskInstance
+        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
+
+        def update_ti_hostname_with_count(count, ti_keys):
+            # Using or_ instead of in_ here to prevent from full table scan.
+            tis = session.query(TI) \
+                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key
+                            for ti_key in ti_keys)) \
+                .all()
+
+            for ti in tis:
+                ti.hostname = self.hostname
+            session.commit()
+
+            return count + len(ti_keys)
+
+        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query)
+        if count:
+            self.log.info("Updated hostname on %s tis.", count)
+
+    @provide_session
+    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None):
+        """
+        Mark state for multiple tasks that have hashcode=poke_hash.
+        :param poke_hash:
+        :param state:
+        :param session:
+        :return:
+        """
+
+        def mark_state(ti, sensor_instance):
+            ti.state = state
+            sensor_instance.state = state
+            if state in State.finished():
+                ti.end_date = end_date
+                ti.set_duration()
+
+        SI = SensorInstance
+        TI = TaskInstance
+
+        count_marked = 0
+        try:
+            query_result = session.query(TI, SI)\
+                .join(TI, and_(TI.dag_id == SI.dag_id,
+                               TI.task_id == SI.task_id,
+                               TI.execution_date == SI.execution_date)) \
+                .filter(SI.state == State.SENSING) \
+                .filter(SI.hashcode == poke_hash) \
+                .filter(SI.operator == operator) \
+                .with_for_update().all()
+
+            end_date = timezone.utcnow()
+            for ti, sensor_instance in query_result:
+                if sensor_instance.poke_context != encoded_poke_context:
+                    continue
+
+                ti.hostname = self.hostname
+                if ti.state == State.SENSING:
+                    mark_state(ti=ti, sensor_instance=sensor_instance)
+                    count_marked += 1
+                else:
+                    # ti.state != State.SENSING
+                    sensor_instance.state = ti.state
+
+            session.commit()
+
+        except Exception as e:  # pylint: disable=broad-except
+            self.log.exception("Exception _mark_multi_state in smart sensor for hashcode %s",
+                               str(poke_hash))
+            self.log.exception(e, exc_info=True)
+        self.log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state)

Review comment:
       https://docs.python.org/3/library/logging.html#logging.Logger.exception
   >exception(msg, *args, **kwargs)
   >Logs a message with level ERROR on this logger. The arguments are interpreted as for debug(). Exception info is added to the logging message. This method should only be called from an exception handler.
   
   So I think the 2nd `self.log.exception` can be removed
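
    For illustration, a minimal sketch of the handler with the duplicate call dropped (same `SensorWork`/`ti` context as in the diff above, not the committed code):

    ```python
    try:
        sensor_works.append(SensorWork(ti))
    except Exception:  # pylint: disable=broad-except
        # Logger.exception logs at ERROR level and attaches the active
        # exception's traceback on its own, so one call is enough here.
        self.log.exception("Exception at creating sensor work for ti %s", ti.key)
    ```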

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context values. It is only used
+    inside the smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside the smart sensor. It saves
+    the sensor_task used to poke and the most recent poke result state.
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For a possible
+    transient infra failure, give the task more chances to retry before failing it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function sets the latest exception information for the sensor. If the exception
+        implies an infra failure, this function will check the recorded infra failure timeout,
+        which was set when the first infra failure exception arrived. Within the configured
+        retry window the task keeps retrying without failing the current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the point in time at which the sensor should be failed if it keeps
+        getting infra failures.
+        :return:
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: Should the sensor fail
+        :type: boolean
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+        :return: If the exception is an infra failure
+        :type: boolean
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: Whether the current exception has expired and can be discarded.
+        :type: boolean
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    Smart sensor operators keep refreshing a dictionary of qualified active
+    sensor tasks by querying the DB. Unlike a regular sensor operator, a
+    smart sensor operator pokes for all sensor tasks in its dictionary at
+    a time interval. When a poke criterion is met, or a task fails by
+    timing out, it updates all the sensor task states in the task_instance table.
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each try
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        # super(SmartSensorOperator, self).__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh the sensor instances that need to be handled by this operator. Put the context,
+        hashcode and sensor instance start_date in a wrapper class.
+        :param session:
+        :return:
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()
+        self.log.info("Performance query %s tis, time: %s", len(tis), time.time() - start_query_time)
+
+        # Querying without checking dagrun state might keep some failed dag_run tasks alive.
+        # Joining with the DagRun table would be very slow given the number of sensor tasks we
+        # need to handle, so we query all smart sensor tasks in this operator
+        # and expect the scheduler to correct the states in _change_state_for_tis_without_dagrun()
+
+        sensor_works = []
+        for ti in tis:
+            try:
+                sensor_works.append(SensorWork(ti))
+            except Exception as e:  # pylint: disable=broad-except
+                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception(e, exc_info=True)
+
+        self.log.info("%d tasks detected.", len(sensor_works))
+
+        new_sensor_works = [x for x in sensor_works if x not in self.sensor_works]
+
+        self._update_ti_hostname(new_sensor_works)
+
+        self.sensor_works = sensor_works
+
+    @provide_session
+    def _update_ti_hostname(self, sensor_works, session=None):
+        """
+        Update task instance hostname for new sensor works.
+        :param sensor_works:
+        :return:
+        """
+        TI = TaskInstance
+        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
+
+        def update_ti_hostname_with_count(count, ti_keys):
+            # Using or_ instead of in_ here to prevent a full table scan.
+            tis = session.query(TI) \
+                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key
+                            for ti_key in ti_keys)) \
+                .all()
+
+            for ti in tis:
+                ti.hostname = self.hostname
+            session.commit()
+
+            return count + len(ti_keys)
+
+        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query)
+        if count:
+            self.log.info("Updated hostname on %s tis.", count)
+
+    @provide_session
+    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None):
+        """
+        Mark state for multiple tasks that have hashcode=poke_hash.
+        :param poke_hash:
+        :param state:
+        :param session:
+        :return:
+        """
+
+        def mark_state(ti, sensor_instance):
+            ti.state = state
+            sensor_instance.state = state
+            if state in State.finished():
+                ti.end_date = end_date
+                ti.set_duration()
+
+        SI = SensorInstance
+        TI = TaskInstance
+
+        count_marked = 0
+        try:
+            query_result = session.query(TI, SI)\
+                .join(TI, and_(TI.dag_id == SI.dag_id,
+                               TI.task_id == SI.task_id,
+                               TI.execution_date == SI.execution_date)) \
+                .filter(SI.state == State.SENSING) \
+                .filter(SI.hashcode == poke_hash) \
+                .filter(SI.operator == operator) \
+                .with_for_update().all()

Review comment:
       How about filtering on `State.SENSING` first to shrink the set we join on, WDYT?
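
    Something like this sketch (assuming the same `SI`/`TI` models; whether it pays off depends on how the database planner already handles the joined query). I left out `with_for_update()` since row locking through a subquery is dialect-dependent:

    ```python
    from sqlalchemy.orm import aliased

    # Narrow sensor_instance down to the SENSING rows first, then join the
    # smaller set against task_instance.
    sensing_si = session.query(SI) \
        .filter(SI.state == State.SENSING) \
        .filter(SI.hashcode == poke_hash) \
        .filter(SI.operator == operator) \
        .subquery()
    si = aliased(SI, sensing_si)

    query_result = session.query(TI, si) \
        .join(si, and_(TI.dag_id == si.dag_id,
                       TI.task_id == si.task_id,
                       TI.execution_date == si.execution_date)) \
        .all()
    ```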

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context values. It is only used
+    inside the smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside the smart sensor. It saves
+    the sensor_task used to poke and the most recent poke result state.
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For a possible
+    transient infra failure, give the task more chances to retry before failing it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function sets the latest exception information for the sensor. If the exception
+        implies an infra failure, this function will check the recorded infra failure timeout,
+        which was set when the first infra failure exception arrived. Within the configured
+        retry window the task keeps retrying without failing the current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the point in time at which the sensor should be failed if it keeps
+        getting infra failures.
+        :return:
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: Should the sensor fail
+        :type: boolean
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+        :return: If the exception is an infra failure
+        :type: boolean
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: Whether the current exception has expired and can be discarded.
+        :type: boolean
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    Smart sensor operators keep refreshing a dictionary of qualified active
+    sensor tasks by querying the DB. Unlike a regular sensor operator, a
+    smart sensor operator pokes for all sensor tasks in its dictionary at
+    a time interval. When a poke criterion is met, or a task fails by
+    timing out, it updates all the sensor task states in the task_instance table.
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each try
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        # super(SmartSensorOperator, self).__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh the sensor instances that need to be handled by this operator. Put the context,
+        hashcode and sensor instance start_date in a wrapper class.
+        :param session:
+        :return:
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()
+        self.log.info("Performance query %s tis, time: %s", len(tis), time.time() - start_query_time)
+
+        # Querying without checking dagrun state might keep some failed dag_run tasks alive.
+        # Joining with the DagRun table would be very slow given the number of sensor tasks we
+        # need to handle, so we query all smart sensor tasks in this operator
+        # and expect the scheduler to correct the states in _change_state_for_tis_without_dagrun()
+
+        sensor_works = []
+        for ti in tis:
+            try:
+                sensor_works.append(SensorWork(ti))
+            except Exception as e:  # pylint: disable=broad-except
+                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception(e, exc_info=True)
+
+        self.log.info("%d tasks detected.", len(sensor_works))
+
+        new_sensor_works = [x for x in sensor_works if x not in self.sensor_works]
+
+        self._update_ti_hostname(new_sensor_works)
+
+        self.sensor_works = sensor_works
+
+    @provide_session
+    def _update_ti_hostname(self, sensor_works, session=None):
+        """
+        Update task instance hostname for new sensor works.
+        :param sensor_works:
+        :return:
+        """
+        TI = TaskInstance
+        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
+
+        def update_ti_hostname_with_count(count, ti_keys):
+            # Using or_ instead of in_ here to prevent a full table scan.
+            tis = session.query(TI) \
+                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key
+                            for ti_key in ti_keys)) \
+                .all()
+
+            for ti in tis:
+                ti.hostname = self.hostname
+            session.commit()
+
+            return count + len(ti_keys)
+
+        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query)
+        if count:
+            self.log.info("Updated hostname on %s tis.", count)
+
+    @provide_session
+    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None):
+        """
+        Mark state for multiple tasks that have hashcode=poke_hash.
+        :param poke_hash:
+        :param state:
+        :param session:
+        :return:
+        """
+
+        def mark_state(ti, sensor_instance):
+            ti.state = state
+            sensor_instance.state = state
+            if state in State.finished():
+                ti.end_date = end_date
+                ti.set_duration()
+
+        SI = SensorInstance
+        TI = TaskInstance
+
+        count_marked = 0
+        try:
+            query_result = session.query(TI, SI)\
+                .join(TI, and_(TI.dag_id == SI.dag_id,
+                               TI.task_id == SI.task_id,
+                               TI.execution_date == SI.execution_date)) \
+                .filter(SI.state == State.SENSING) \
+                .filter(SI.hashcode == poke_hash) \
+                .filter(SI.operator == operator) \
+                .with_for_update().all()
+
+            end_date = timezone.utcnow()
+            for ti, sensor_instance in query_result:
+                if sensor_instance.poke_context != encoded_poke_context:
+                    continue
+
+                ti.hostname = self.hostname
+                if ti.state == State.SENSING:
+                    mark_state(ti=ti, sensor_instance=sensor_instance)
+                    count_marked += 1
+                else:
+                    # ti.state != State.SENSING

Review comment:
       Can this be removed?
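
    If the assignment stays, the comment could explain why instead of restating the condition, e.g. (just a sketch):

    ```python
    else:
        # The ti already left SENSING through some other path; sync the
        # sensor_instance record so the two tables stay consistent.
        sensor_instance.state = ti.state
    ```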

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context values. It is only used
+    inside the smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside the smart sensor. It saves
+    the sensor_task used to poke and the most recent poke result state.
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For a possible
+    transient infra failure, give the task more chances to retry before failing it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function sets the latest exception information for the sensor. If the exception
+        implies an infra failure, this function will check the recorded infra failure timeout,
+        which was set when the first infra failure exception arrived. Within the configured
+        retry window the task keeps retrying without failing the current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the point in time at which the sensor should be failed if it keeps
+        getting infra failures.
+        :return:
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: Should the sensor fail
+        :type: boolean
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+        :return: If the exception is an infra failure
+        :type: boolean
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: Whether the current exception has expired and can be discarded.
+        :type: boolean
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    Smart sensor operators keep refreshing a dictionary of qualified active
+    sensor tasks by querying the DB. Unlike a regular sensor operator, a
+    smart sensor operator pokes for all sensor tasks in its dictionary at
+    a time interval. When a poke criterion is met, or a task fails by
+    timing out, it updates all the sensor task states in the task_instance table.
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each try
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        # super(SmartSensorOperator, self).__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh the sensor instances that need to be handled by this operator. Put the context,
+        hashcode and sensor instance start_date in a wrapper class.
+        :param session:
+        :return:
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()
+        self.log.info("Performance query %s tis, time: %s", len(tis), time.time() - start_query_time)
+
+        # Querying without checking dagrun state might keep some failed dag_run tasks alive.
+        # Joining with the DagRun table would be very slow given the number of sensor tasks we
+        # need to handle, so we query all smart sensor tasks in this operator
+        # and expect the scheduler to correct the states in _change_state_for_tis_without_dagrun()
+
+        sensor_works = []
+        for ti in tis:
+            try:
+                sensor_works.append(SensorWork(ti))
+            except Exception as e:  # pylint: disable=broad-except
+                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception(e, exc_info=True)
+
+        self.log.info("%d tasks detected.", len(sensor_works))
+
+        new_sensor_works = [x for x in sensor_works if x not in self.sensor_works]
+
+        self._update_ti_hostname(new_sensor_works)
+
+        self.sensor_works = sensor_works
+
+    @provide_session
+    def _update_ti_hostname(self, sensor_works, session=None):
+        """
+        Update task instance hostname for new sensor works.
+        :param sensor_works:
+        :return:
+        """
+        TI = TaskInstance
+        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
+
+        def update_ti_hostname_with_count(count, ti_keys):
+            # Using or_ instead of in_ here to prevent a full table scan.
+            tis = session.query(TI) \
+                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key
+                            for ti_key in ti_keys)) \
+                .all()
+
+            for ti in tis:
+                ti.hostname = self.hostname
+            session.commit()
+
+            return count + len(ti_keys)
+
+        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query)
+        if count:
+            self.log.info("Updated hostname on %s tis.", count)
+
+    @provide_session
+    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None):
+        """
+        Mark state for multiple tasks that have hashcode=poke_hash.
+        :param poke_hash:
+        :param state:
+        :param session:
+        :return:
+        """
+
+        def mark_state(ti, sensor_instance):
+            ti.state = state
+            sensor_instance.state = state
+            if state in State.finished():
+                ti.end_date = end_date
+                ti.set_duration()
+
+        SI = SensorInstance
+        TI = TaskInstance
+
+        count_marked = 0
+        try:
+            query_result = session.query(TI, SI)\
+                .join(TI, and_(TI.dag_id == SI.dag_id,
+                               TI.task_id == SI.task_id,
+                               TI.execution_date == SI.execution_date)) \
+                .filter(SI.state == State.SENSING) \
+                .filter(SI.hashcode == poke_hash) \
+                .filter(SI.operator == operator) \
+                .with_for_update().all()
+
+            end_date = timezone.utcnow()
+            for ti, sensor_instance in query_result:
+                if sensor_instance.poke_context != encoded_poke_context:
+                    continue
+
+                ti.hostname = self.hostname
+                if ti.state == State.SENSING:
+                    mark_state(ti=ti, sensor_instance=sensor_instance)
+                    count_marked += 1
+                else:
+                    # ti.state != State.SENSING
+                    sensor_instance.state = ti.state
+
+            session.commit()
+
+        except Exception as e:  # pylint: disable=broad-except
+            self.log.exception("Exception _mark_multi_state in smart sensor for hashcode %s",
+                               str(poke_hash))
+            self.log.exception(e, exc_info=True)
+        self.log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state)
+
+    @provide_session
+    def _retry_or_fail_task(self, sensor_work, error, session=None):
+        """
+        Change the state of a single sensor task. For a final state, set the end_date.
+        Since the smart sensor takes care of all retries in one process, failed sensor tasks
+        logically experienced all retries and the try_nubmer should be set to max_tries.

Review comment:
       ```suggestion
           logically experienced all retries and the try_number should be set to max_tries.
   ```
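
    Typo aside, the behaviour described here might deserve a sketch. Hypothetically (attribute names taken from `TaskInstance`, not from this PR's code), failing a sensor task without further retries could look like:

    ```python
    def fail_without_retry(ti):
        # The smart sensor has logically consumed every retry for this ti,
        # so align try_number with max_tries before writing a final state.
        ti.try_number = ti.max_tries  # assumes TaskInstance's try_number setter
        ti.state = State.FAILED
        ti.end_date = timezone.utcnow()
        ti.set_duration()
    ```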

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context values. It is only used
+    inside the smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside the smart sensor. It saves
+    the sensor_task used to poke and the most recent poke result state.
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For a possible
+    transient infra failure, give the task more chances to retry before failing it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function sets the latest exception information for the sensor. If the exception
+        implies an infra failure, this function will check the recorded infra failure timeout,
+        which was set when the first infra failure exception arrived. Within the configured
+        retry window the task keeps retrying without failing the current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the point in time after which the sensor should be failed if it keeps
+        hitting infra failures.
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: whether the sensor should fail the current run
+        :rtype: bool
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+        :return: whether the exception is an infra failure
+        :rtype: bool
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: whether the current exception has expired and can be discarded
+        :rtype: bool
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    A smart sensor operator keeps a dictionary of qualified active sensor
+    tasks refreshed from the DB. Unlike a regular sensor operator, it pokes
+    all sensor tasks in the dictionary at a set interval. When a poke
+    criterion is met, or a task fails by timing out, it updates the state
+    of all matching sensor tasks in the task_instance table.
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait between
+        each try
+    :type poke_interval: int
+    :param smart_sensor_timeout: Time, in seconds, before the task times out
+        and fails.
+    :type smart_sensor_timeout: int
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh the sensor instances that need to be handled by this operator. Wrap the
+        poke context, hashcode and sensor instance start_date in a ``SensorWork`` wrapper.
+
+        :param session: SQLAlchemy ORM session
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()
+        self.log.info("Performance query %s tis, time: %s", len(tis), time.time() - start_query_time)
+
+        # Querying without checking the dagrun state might keep some tasks from failed
+        # dag_runs alive. Joining with the DagRun table would be very slow given the number
+        # of sensor tasks we need to handle, so we query all smart sensor tasks in this
+        # operator and expect the scheduler to correct the states in
+        # _change_state_for_tis_without_dagrun().
+
+        sensor_works = []
+        for ti in tis:
+            try:
+                sensor_works.append(SensorWork(ti))
+            except Exception as e:  # pylint: disable=broad-except
+                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception(e, exc_info=True)
+
+        self.log.info("%d tasks detected.", len(sensor_works))
+
+        new_sensor_works = [x for x in sensor_works if x not in self.sensor_works]
+
+        self._update_ti_hostname(new_sensor_works)
+
+        self.sensor_works = sensor_works
+
+    @provide_session
+    def _update_ti_hostname(self, sensor_works, session=None):
+        """
+        Update the task instance hostname for new sensor works.
+
+        :param sensor_works: the new SensorWork objects whose task instances should
+            record this smart sensor's hostname
+        """
+        TI = TaskInstance
+        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
+
+        def update_ti_hostname_with_count(count, ti_keys):
+            # Using or_ instead of in_ here to prevent a full table scan.
+            tis = session.query(TI) \
+                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key
+                            for ti_key in ti_keys)) \
+                .all()
+
+            for ti in tis:
+                ti.hostname = self.hostname
+            session.commit()
+
+            return count + len(ti_keys)
+
+        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query)
+        if count:
+            self.log.info("Updated hostname on %s tis.", count)
+
+    @provide_session
+    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None):
+        """
+        Mark the state for all sensor tasks that share hashcode=poke_hash.
+
+        :param operator: the sensor operator class name shared by the tasks
+        :param poke_hash: the hashcode generated from the shared poke context
+        :param encoded_poke_context: the raw poke context, checked to rule out hash collisions
+        :param state: the state to set
+        :param session: SQLAlchemy ORM session
+        """
+
+        def mark_state(ti, sensor_instance):
+            ti.state = state
+            sensor_instance.state = state
+            if state in State.finished():
+                ti.end_date = end_date
+                ti.set_duration()
+
+        SI = SensorInstance
+        TI = TaskInstance
+
+        count_marked = 0
+        query_result = []  # initialized here so the log line below is safe if the query raises
+        try:
+            query_result = session.query(TI, SI)\
+                .join(TI, and_(TI.dag_id == SI.dag_id,
+                               TI.task_id == SI.task_id,
+                               TI.execution_date == SI.execution_date)) \
+                .filter(SI.state == State.SENSING) \
+                .filter(SI.hashcode == poke_hash) \
+                .filter(SI.operator == operator) \
+                .with_for_update().all()
+
+            end_date = timezone.utcnow()
+            for ti, sensor_instance in query_result:
+                if sensor_instance.poke_context != encoded_poke_context:
+                    continue
+
+                ti.hostname = self.hostname
+                if ti.state == State.SENSING:
+                    mark_state(ti=ti, sensor_instance=sensor_instance)
+                    count_marked += 1
+                else:
+                    # ti.state != State.SENSING
+                    sensor_instance.state = ti.state
+
+            session.commit()
+
+        except Exception as e:  # pylint: disable=broad-except
+            self.log.exception("Exception _mark_multi_state in smart sensor for hashcode %s",
+                               str(poke_hash))
+            self.log.exception(e, exc_info=True)
+        self.log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state)
+
+    @provide_session
+    def _retry_or_fail_task(self, sensor_work, error, session=None):
+        """
+        Change the state of a single sensor task. For a final state, also set the end_date.
+        Since the smart sensor handles all retries in one process, a failed sensor task has
+        logically exhausted all retries and its try_number should be set to max_tries.
+        :param sensor_work:
+        :param error:
+        :param session:

Review comment:
       Needs info
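
       Perhaps something along these lines (wording is only a sketch, adjust as needed):
   ```suggestion
           :param sensor_work: the SensorWork whose task instance should be retried or failed
           :param error: the error information to record on the failed task
           :param session: SQLAlchemy ORM session
   ```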

##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with its decoded context values. It is only
+    used inside the smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context else {}
+        self.execution_context = json.loads(ti.execution_context) if ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: this should live somewhere other than this file; it has to use LOG_ID_TEMPLATE
+        # from the Elasticsearch handler, but what about other log file handlers?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside the smart sensor. It saves the
+    sensor_task used to poke and the most recent poke result state.
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set the state for the cached poke work and record the poke time.
+
+        :param state: the poke result state to cache
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if it is marked to flush or has not been
+        poked for 20 minutes.
+
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Holds sensor exception information and the type of exception. For a possibly
+    transient infra failure, it gives the task more chances to retry before failing it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        Set the latest exception information for the sensor exception. If the exception
+        implies an infra failure, check the recorded infra failure timeout, which was set
+        when the first infra failure exception arrived. Within the retry window
+        (``infra_failure_retry_window``, 130 minutes by default) the task is retried
+        without failing the current run.
+
+        :param exception_info: the formatted exception traceback
+        :param is_infra_failure: whether the exception indicates an infra failure
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the point in time after which the sensor should be failed if it keeps
+        hitting infra failures.
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: whether the sensor should fail the current run
+        :rtype: bool
+        """
+        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+        :return: whether the exception is an infra failure
+        :rtype: bool
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: whether the current exception has expired and can be discarded
+        :rtype: bool
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    A smart sensor operator keeps a dictionary of qualified active sensor
+    tasks refreshed from the DB. Unlike a regular sensor operator, it pokes
+    all sensor tasks in the dictionary at a set interval. When a poke
+    criterion is met, or a task fails by timing out, it updates the state
+    of all matching sensor tasks in the task_instance table.
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait between
+        each try
+    :type poke_interval: int
+    :param smart_sensor_timeout: Time, in seconds, before the task times out
+        and fails.
+    :type smart_sensor_timeout: int
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh the sensor instances that need to be handled by this operator. Wrap the
+        poke context, hashcode and sensor instance start_date in a ``SensorWork`` wrapper.
+
+        :param session: SQLAlchemy ORM session
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()
+        self.log.info("Performance query %s tis, time: %s", len(tis), time.time() - start_query_time)
+
+        # Querying without checking the dagrun state might keep some tasks from failed
+        # dag_runs alive. Joining with the DagRun table would be very slow given the number
+        # of sensor tasks we need to handle, so we query all smart sensor tasks in this
+        # operator and expect the scheduler to correct the states in
+        # _change_state_for_tis_without_dagrun().
+
+        sensor_works = []
+        for ti in tis:
+            try:
+                sensor_works.append(SensorWork(ti))
+            except Exception as e:  # pylint: disable=broad-except
+                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception(e, exc_info=True)
+
+        self.log.info("%d tasks detected.", len(sensor_works))
+
+        new_sensor_works = [x for x in sensor_works if x not in self.sensor_works]
+
+        self._update_ti_hostname(new_sensor_works)
+
+        self.sensor_works = sensor_works
+
+    @provide_session
+    def _update_ti_hostname(self, sensor_works, session=None):
+        """
+        Update the task instance hostname for new sensor works.
+
+        :param sensor_works: the new SensorWork objects whose task instances should
+            record this smart sensor's hostname
+        """
+        TI = TaskInstance
+        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
+
+        def update_ti_hostname_with_count(count, ti_keys):
+            # Using or_ instead of in_ here to prevent a full table scan.
+            tis = session.query(TI) \
+                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key
+                            for ti_key in ti_keys)) \
+                .all()
+
+            for ti in tis:
+                ti.hostname = self.hostname
+            session.commit()
+
+            return count + len(ti_keys)
+
+        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query)
+        if count:
+            self.log.info("Updated hostname on %s tis.", count)
+
+    @provide_session
+    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None):
+        """
+        Mark the state for all sensor tasks that share hashcode=poke_hash.
+
+        :param operator: the sensor operator class name shared by the tasks
+        :param poke_hash: the hashcode generated from the shared poke context
+        :param encoded_poke_context: the raw poke context, checked to rule out hash collisions
+        :param state: the state to set
+        :param session: SQLAlchemy ORM session
+        """
+
+        def mark_state(ti, sensor_instance):
+            ti.state = state
+            sensor_instance.state = state
+            if state in State.finished():
+                ti.end_date = end_date
+                ti.set_duration()
+
+        SI = SensorInstance
+        TI = TaskInstance
+
+        count_marked = 0
+        query_result = []  # initialized here so the log line below is safe if the query raises
+        try:
+            query_result = session.query(TI, SI)\
+                .join(TI, and_(TI.dag_id == SI.dag_id,
+                               TI.task_id == SI.task_id,
+                               TI.execution_date == SI.execution_date)) \
+                .filter(SI.state == State.SENSING) \
+                .filter(SI.hashcode == poke_hash) \
+                .filter(SI.operator == operator) \
+                .with_for_update().all()
+
+            end_date = timezone.utcnow()
+            for ti, sensor_instance in query_result:
+                if sensor_instance.poke_context != encoded_poke_context:
+                    continue
+
+                ti.hostname = self.hostname
+                if ti.state == State.SENSING:
+                    mark_state(ti=ti, sensor_instance=sensor_instance)
+                    count_marked += 1
+                else:
+                    # ti.state != State.SENSING
+                    sensor_instance.state = ti.state
+
+            session.commit()
+
+        except Exception as e:  # pylint: disable=broad-except
+            self.log.exception("Exception _mark_multi_state in smart sensor for hashcode %s",
+                               str(poke_hash))
+            self.log.exception(e, exc_info=True)
+        self.log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state)
+
+    @provide_session
+    def _retry_or_fail_task(self, sensor_work, error, session=None):
+        """
+        Change the state of a single sensor task. For a final state, also set the end_date.
+        Since the smart sensor handles all retries in one process, a failed sensor task has
+        logically exhausted all retries and its try_number should be set to max_tries.
+        :param sensor_work:
+        :param error:
+        :param session:
+        """
+        def email_alert(task_instance, error_info):
+            try:
+                subject, html_content, _ = task_instance.get_email_subject_content(error_info)
+                email = sensor_work.execution_context.get('email')
+
+                send_email(email, subject, html_content)
+            except Exception as e:  # pylint: disable=broad-except
+                sensor_work.log.warning("Exception alerting email.")
+                sensor_work.log.exception(e, exc_info=True)
+
+        def handle_failure(sensor_work, ti):
+            if sensor_work.execution_context.get('retries', None) and \
+                    ti.try_number <= ti.max_tries:
+                # retry
+                ti.state = State.UP_FOR_RETRY
+                if sensor_work.execution_context.get('email_on_retry', None) and \
+                        sensor_work.execution_context.get('email', None):
+                    sensor_work.log.info("%s sending email alert for retry", sensor_work.ti_key)
+                    email_alert(ti, error)
+            else:
+                ti.state = State.FAILED
+                if sensor_work.execution_context.get('email_on_failure', None) and \
+                        sensor_work.execution_context.get('email', None):
+                    sensor_work.log.info("%s sending email alert for failure", sensor_work.ti_key)
+                    email_alert(ti, error)
+
+        try:
+            dag_id, task_id, execution_date = sensor_work.ti_key
+            TI = TaskInstance
+            SI = SensorInstance
+            sensor_instance = session.query(SI).filter(
+                SI.dag_id == dag_id,
+                SI.task_id == task_id,
+                SI.execution_date == execution_date) \
+                .with_for_update() \
+                .first()
+
+            if sensor_instance.hashcode != sensor_work.hashcode:
+                # Return without setting state
+                return
+
+            ti = session.query(TI).filter(
+                TI.dag_id == dag_id,
+                TI.task_id == task_id,
+                TI.execution_date == execution_date) \
+                .with_for_update() \
+                .first()
+
+            if ti:
+                if ti.state == State.SENSING:
+                    ti.hostname = self.hostname
+                    handle_failure(sensor_work, ti)
+
+                    sensor_instance.state = State.FAILED
+                    ti.end_date = timezone.utcnow()
+                    ti.set_duration()
+                else:
+                    sensor_instance.state = ti.state
+                session.merge(sensor_instance)
+                session.merge(ti)
+                session.commit()
+
+                sensor_work.log.info("Task %s got an error: %s. Set the state to failed. Exit.",
+                                     str(sensor_work.ti_key), error)
+                sensor_work.close_sensor_logger()
+
+        except AirflowException as e:
+            sensor_work.log.warning("Exception on failing %s", sensor_work.ti_key)
+            sensor_work.log.exception(e, exc_info=True)
+
+    def _check_and_handle_ti_timeout(self, sensor_work):
+        """
+        Check whether a sensor task in the smart sensor has timed out. The limit can come
+        from either the sensor operator timeout or the general operator execution_timeout.
+
+        :param sensor_work: the SensorWork to check
+        """
+        task_timeout = sensor_work.execution_context.get('timeout', self.timeout)
+        task_execution_timeout = sensor_work.execution_context.get('execution_timeout', None)
+        if task_execution_timeout:
+            task_timeout = min(task_timeout, task_execution_timeout)
+
+        if (timezone.utcnow() - sensor_work.start_date).total_seconds() > task_timeout:
+            error = "Sensor Timeout"
+            sensor_work.log.exception(error)
+            self._retry_or_fail_task(sensor_work, error)
+
+    def _handle_poke_exception(self, sensor_work):
+        """
+        Fail the task if the accumulated exceptions exceed its retries.
+
+        :param sensor_work: SensorWork
+        """
+        sensor_exception = self.cached_sensor_exceptions.get(sensor_work.cache_key)
+        error = sensor_exception.exception_info
+        sensor_work.log.exception("Handling poke exception: %s", error)
+
+        if sensor_exception.fail_current_run:
+            if sensor_exception.is_infra_failure:
+                sensor_work.log.exception("Task %s failed by infra failure in smart sensor.",
+                                          sensor_work.ti_key)
+                # There is a risk that a sensor object cached in the smart sensor keeps
+                # throwing exceptions and causes an infra failure. To make sure that sensor
+                # tasks retried later do not hit the same cached object and fail endlessly,
+                # we mark the cached work after an infra failure so that it can be popped
+                # before the next poke loop.
+                cache_key = sensor_work.cache_key
+                self.cached_dedup_works[cache_key].set_to_flush()
+            else:
+                sensor_work.log.exception("Task %s failed by exceptions.", sensor_work.ti_key)
+            self._retry_or_fail_task(sensor_work, error)
+        else:
+            sensor_work.log.info("Exception detected, retrying without failing current run.")
+            self._check_and_handle_ti_timeout(sensor_work)
+
+    def _process_sensor_work_with_cached_state(self, sensor_work, state):
+        if state == PokeState.LANDED:
+            sensor_work.log.info("Task %s succeeded", str(sensor_work.ti_key))
+            sensor_work.close_sensor_logger()
+
+        if state == PokeState.NOT_LANDED:
+            # Handle timeout if connection valid but not landed yet
+            self._check_and_handle_ti_timeout(sensor_work)
+        elif state == PokeState.POKE_EXCEPTION:
+            self._handle_poke_exception(sensor_work)
+
+    def _execute_sensor_work(self, sensor_work):
+        ti_key = sensor_work.ti_key
+        log = sensor_work.log or self.log
+        log.info("Sensing ti: %s", str(ti_key))
+        log.info("Poking with arguments: %s", sensor_work.encoded_poke_context)
+
+        cache_key = sensor_work.cache_key
+        if cache_key not in self.cached_dedup_works:
+            # create an empty cached_work for a new cache_key
+            self.cached_dedup_works[cache_key] = CachedPokeWork()
+
+        cached_work = self.cached_dedup_works[cache_key]
+
+        if cached_work.state is not None:
+            # We have a valid cached state; don't poke twice within the same interval
+            self._process_sensor_work_with_cached_state(sensor_work, cached_work.state)
+            return
+
+        try:
+            with timeout(seconds=self.poke_timeout):
+                if self.poke(sensor_work):
+                    # Got a landed signal, mark all tasks waiting for this partition
+                    cached_work.set_state(PokeState.LANDED)
+
+                    self._mark_multi_state(sensor_work.operator,
+                                           sensor_work.hashcode,
+                                           sensor_work.encoded_poke_context,
+                                           State.SUCCESS)
+
+                    log.info("Task %s succeeded", str(ti_key))
+                    sensor_work.close_sensor_logger()
+                else:
+                    # Not landed yet. Handle possible timeout
+                    cached_work.set_state(PokeState.NOT_LANDED)
+                    self._check_and_handle_ti_timeout(sensor_work)
+
+                self.cached_sensor_exceptions.pop(cache_key, None)
+        except Exception as e:  # pylint: disable=broad-except
+            # The retry_infra_failure decorator inside hive_hooks will raise an exception
+            # with is_infra_failure == True. A long poking timeout here is also considered
+            # an infra failure. Any other exception should fail the task.
+            is_infra_failure = getattr(e, 'is_infra_failure', False) or isinstance(e, AirflowTaskTimeout)
+            exception_info = traceback.format_exc()
+            cached_work.set_state(PokeState.POKE_EXCEPTION)
+
+            if cache_key in self.cached_sensor_exceptions:
+                self.cached_sensor_exceptions[cache_key].set_latest_exception(
+                    exception_info,
+                    is_infra_failure=is_infra_failure)
+            else:
+                self.cached_sensor_exceptions[cache_key] = SensorExceptionInfo(
+                    exception_info,
+                    is_infra_failure=is_infra_failure)
+
+            self._handle_poke_exception(sensor_work)
+
+    def flush_cached_sensor_poke_results(self):
+        """
+        Flush outdated cached sensor states saved in the previous loop.
+        """
+        # Iterate over a copy, since popping from a dict while iterating it raises RuntimeError.
+        for key, cached_work in list(self.cached_dedup_works.items()):
+            if cached_work.is_expired():
+                self.cached_dedup_works.pop(key, None)
+            else:
+                cached_work.state = None
+
+        for ti_key, sensor_exception in list(self.cached_sensor_exceptions.items()):
+            if sensor_exception.fail_current_run or sensor_exception.is_expired():
+                self.cached_sensor_exceptions.pop(ti_key, None)
+
+    def poke(self, sensor_work):
+        """
+        Poke a sensor work by instantiating (and caching) the original sensor
+        operator and delegating to its ``poke`` method.
+        """
+        cached_work = self.cached_dedup_works[sensor_work.cache_key]
+        if not cached_work.sensor_task:
+            init_args = dict(list(sensor_work.poke_context.items())
+                             + [('task_id', sensor_work.task_id)])
+            operator_class = import_string(sensor_work.op_classpath)
+            cached_work.sensor_task = operator_class(**init_args)
+
+        return cached_work.sensor_task.poke(sensor_work.poke_context)
+
+    def _emit_loop_stats(self):
+        try:
+            count_poke = 0
+            count_poke_success = 0
+            count_poke_exception = 0
+            count_exception_failures = 0
+            count_infra_failure = 0
+            for cached_work in self.cached_dedup_works.values():
+                if cached_work.state is None:
+                    continue
+                count_poke += 1
+                if cached_work.state == PokeState.LANDED:
+                    count_poke_success += 1
+                elif cached_work.state == PokeState.POKE_EXCEPTION:
+                    count_poke_exception += 1
+            for cached_exception in self.cached_sensor_exceptions.values():
+                if cached_exception.is_infra_failure and cached_exception.fail_current_run:
+                    count_infra_failure += 1
+                if cached_exception.fail_current_run:
+                    count_exception_failures += 1
+
+            Stats.gauge("smart_sensor_operator.poked_tasks", count_poke)
+            Stats.gauge("smart_sensor_operator.poked_success", count_poke_success)
+            Stats.gauge("smart_sensor_operator.poked_exception", count_poke_exception)
+            Stats.gauge("smart_sensor_operator.exception_failures", count_exception_failures)
+            Stats.gauge("smart_sensor_operator.infra_failures", count_infra_failure)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log.exception("Exception at getting loop stats %s")
+            self.log.exception(e, exc_info=True)
+
+    def execute(self, context):
+        started_at = timezone.utcnow()
+
+        self.hostname = get_hostname()
+        while True:

Review comment:
       When there are no pokes needed, does the smart sensor DAG keep running, or is it marked complete?
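
       For reference, the behaviour I would expect is roughly the sketch below (hypothetical, not code from this PR): exit once the shard has no sensing work left, bounded by the overall timeout, rather than looping forever.
   ```python
   # Hypothetical sketch of the execute() loop, reusing names defined above.
   while (timezone.utcnow() - started_at).total_seconds() < self.timeout:
       self.flush_cached_sensor_poke_results()
       self._load_sensor_works()
       if not self.sensor_works:
           break  # nothing to sense in this shard; let the smart sensor task finish
       for sensor_work in self.sensor_works:
           self._execute_sensor_work(sensor_work)
       self._emit_loop_stats()
       sleep(self.poke_interval)
   ```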




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org