You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/09 00:09:59 UTC

[airflow] branch master updated: Rename remaining Sensors to match AIP-21 (#12927)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new a075b6d  Rename remaining Sensors to match AIP-21 (#12927)
a075b6d is described below

commit a075b6df99a4f5e21d198f7be56b577432e6f9db
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Dec 9 00:09:08 2020 +0000

    Rename remaining Sensors to match AIP-21 (#12927)
    
    As discussed in AIP-21
    
    * Rename airflow.sensors.external_task_sensor to airflow.sensors.external_task
    * Rename airflow.sensors.smart_sensor_operator to airflow.sensors.smart_sensor
    * Rename airflow.sensors.sql_sensor to airflow.sensors.sql
    * Rename airflow.contrib.sensors.weekday_sensor to airflow.sensors.weekday
---
 UPDATING.md                                        |   4 +
 airflow/contrib/sensors/weekday_sensor.py          |   2 +-
 .../example_external_task_marker_dag.py            |   2 +-
 airflow/models/dag.py                              |   2 +-
 .../apache/hive/sensors/metastore_partition.py     |   2 +-
 .../{external_task_sensor.py => external_task.py}  |   0
 airflow/sensors/external_task_sensor.py            | 298 +--------------------
 .../{smart_sensor_operator.py => smart_sensor.py}  |   0
 airflow/sensors/{sql_sensor.py => sql.py}          |   0
 airflow/sensors/sql_sensor.py                      | 105 +-------
 airflow/sensors/{weekday_sensor.py => weekday.py}  |   0
 airflow/serialization/serialized_objects.py        |   2 +-
 airflow/smart_sensor_dags/smart_sensor_group.py    |   2 +-
 .../refactor_provider_packages.py                  |   3 +
 .../howto/operator/external_task_sensor.rst        |   2 +-
 docs/apache-airflow/operators-and-hooks-ref.rst    |   4 +-
 tests/deprecated_classes.py                        |   2 +-
 tests/sensors/test_external_task_sensor.py         |   2 +-
 tests/sensors/test_smart_sensor_operator.py        |   2 +-
 tests/sensors/test_sql_sensor.py                   |  18 +-
 tests/sensors/test_weekday_sensor.py               |   2 +-
 21 files changed, 47 insertions(+), 407 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 8e356e8..1088e41 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -281,7 +281,11 @@ The following table shows changes in import paths.
 | airflow.operators.subdag_operator.SubDagOperator | airflow.operators.subdag.SubDagOperator |
 | airflow.sensors.base_sensor_operator.BaseSensorOperator | airflow.sensors.base.BaseSensorOperator |
 | airflow.sensors.date_time_sensor.DateTimeSensor | airflow.sensors.date_time.DateTimeSensor |
+| airflow.sensors.external_task_sensor.ExternalTaskMarker | airflow.sensors.external_task.ExternalTaskMarker |
+| airflow.sensors.external_task_sensor.ExternalTaskSensor | airflow.sensors.external_task.ExternalTaskSensor |
+| airflow.sensors.sql_sensor.SqlSensor | airflow.sensors.sql.SqlSensor |
 | airflow.sensors.time_delta_sensor.TimeDeltaSensor | airflow.sensors.time_delta.TimeDeltaSensor |
+| airflow.contrib.sensors.weekday_sensor.DayOfWeekSensor | airflow.sensors.weekday.DayOfWeekSensor |
 
 
 ### Database schema changes
diff --git a/airflow/contrib/sensors/weekday_sensor.py b/airflow/contrib/sensors/weekday_sensor.py
index 8f462bb..697f8f4 100644
--- a/airflow/contrib/sensors/weekday_sensor.py
+++ b/airflow/contrib/sensors/weekday_sensor.py
@@ -20,7 +20,7 @@
 import warnings
 
 # pylint: disable=unused-import
-from airflow.sensors.weekday_sensor import DayOfWeekSensor  # noqa
+from airflow.sensors.weekday import DayOfWeekSensor  # noqa
 
 warnings.warn(
     "This module is deprecated. Please use `airflow.sensors.weekday_sensor`.",
diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py
index effb509..9476080 100644
--- a/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow/example_dags/example_external_task_marker_dag.py
@@ -41,7 +41,7 @@ import datetime
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
-from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
+from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
 
 start_date = datetime.datetime(2015, 1, 1)
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index be6403c..1b639d5 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1224,7 +1224,7 @@ class DAG(LoggingMixin):
             tis = tis.filter(TI.state == State.RUNNING)
 
         if include_subdags:
-            from airflow.sensors.external_task_sensor import ExternalTaskMarker
+            from airflow.sensors.external_task import ExternalTaskMarker
 
             # Recursively find external tasks indicated by ExternalTaskMarker
             instances = tis.all()
diff --git a/airflow/providers/apache/hive/sensors/metastore_partition.py b/airflow/providers/apache/hive/sensors/metastore_partition.py
index 0955291..2333ef5 100644
--- a/airflow/providers/apache/hive/sensors/metastore_partition.py
+++ b/airflow/providers/apache/hive/sensors/metastore_partition.py
@@ -17,7 +17,7 @@
 # under the License.
 from typing import Any, Dict
 
-from airflow.sensors.sql_sensor import SqlSensor
+from airflow.sensors.sql import SqlSensor
 from airflow.utils.decorators import apply_defaults
 
 
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task.py
similarity index 100%
copy from airflow/sensors/external_task_sensor.py
copy to airflow/sensors/external_task.py
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index 20b6f90..2842822 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -15,293 +15,17 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""This module is deprecated. Please use `airflow.sensors.external_task`."""
 
-import datetime
-import os
-from typing import Any, Callable, FrozenSet, Iterable, Optional, Union
+import warnings
 
-from sqlalchemy import func
+# pylint: disable=unused-import
+from airflow.sensors.external_task import (  # noqa
+    ExternalTaskMarker,
+    ExternalTaskSensor,
+    ExternalTaskSensorLink,
+)
 
-from airflow.exceptions import AirflowException
-from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
-from airflow.operators.dummy import DummyOperator
-from airflow.sensors.base import BaseSensorOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.helpers import build_airflow_url_with_query
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
-
-
-class ExternalTaskSensorLink(BaseOperatorLink):
-    """
-    Operator link for ExternalTaskSensor. It allows users to access
-    DAG waited with ExternalTaskSensor.
-    """
-
-    name = 'External DAG'
-
-    def get_link(self, operator, dttm):
-        query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()}
-        return build_airflow_url_with_query(query)
-
-
-class ExternalTaskSensor(BaseSensorOperator):
-    """
-    Waits for a different DAG or a task in a different DAG to complete for a
-    specific execution_date
-
-    :param external_dag_id: The dag_id that contains the task you want to
-        wait for
-    :type external_dag_id: str
-    :param external_task_id: The task_id that contains the task you want to
-        wait for. If ``None`` (default value) the sensor waits for the DAG
-    :type external_task_id: str or None
-    :param allowed_states: Iterable of allowed states, default is ``['success']``
-    :type allowed_states: Iterable
-    :param failed_states: Iterable of failed or dis-allowed states, default is ``None``
-    :type failed_states: Iterable
-    :param execution_delta: time difference with the previous execution to
-        look at, the default is the same execution_date as the current task or DAG.
-        For yesterday, use [positive!] datetime.timedelta(days=1). Either
-        execution_delta or execution_date_fn can be passed to
-        ExternalTaskSensor, but not both.
-    :type execution_delta: Optional[datetime.timedelta]
-    :param execution_date_fn: function that receives the current execution date as the first
-        positional argument and optionally any number of keyword arguments available in the
-        context dictionary, and returns the desired execution dates to query.
-        Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor,
-        but not both.
-    :type execution_date_fn: Optional[Callable]
-    :param check_existence: Set to `True` to check if the external task exists (when
-        external_task_id is not None) or check if the DAG to wait for exists (when
-        external_task_id is None), and immediately cease waiting if the external task
-        or DAG does not exist (default value: False).
-    :type check_existence: bool
-    """
-
-    template_fields = ['external_dag_id', 'external_task_id']
-    ui_color = '#19647e'
-
-    @property
-    def operator_extra_links(self):
-        """Return operator extra links"""
-        return [ExternalTaskSensorLink()]
-
-    @apply_defaults
-    def __init__(
-        self,
-        *,
-        external_dag_id: str,
-        external_task_id: Optional[str] = None,
-        allowed_states: Optional[Iterable[str]] = None,
-        failed_states: Optional[Iterable[str]] = None,
-        execution_delta: Optional[datetime.timedelta] = None,
-        execution_date_fn: Optional[Callable] = None,
-        check_existence: bool = False,
-        **kwargs,
-    ):
-        super().__init__(**kwargs)
-        self.allowed_states = list(allowed_states) if allowed_states else [State.SUCCESS]
-        self.failed_states = list(failed_states) if failed_states else []
-
-        total_states = self.allowed_states + self.failed_states
-        total_states = set(total_states)
-
-        if set(self.failed_states).intersection(set(self.allowed_states)):
-            raise AirflowException(
-                "Duplicate values provided as allowed "
-                "`{}` and failed states `{}`".format(self.allowed_states, self.failed_states)
-            )
-
-        if external_task_id:
-            if not total_states <= set(State.task_states):
-                raise ValueError(
-                    f'Valid values for `allowed_states` and `failed_states` '
-                    f'when `external_task_id` is not `None`: {State.task_states}'
-                )
-        elif not total_states <= set(State.dag_states):
-            raise ValueError(
-                f'Valid values for `allowed_states` and `failed_states` '
-                f'when `external_task_id` is `None`: {State.dag_states}'
-            )
-
-        if execution_delta is not None and execution_date_fn is not None:
-            raise ValueError(
-                'Only one of `execution_delta` or `execution_date_fn` may '
-                'be provided to ExternalTaskSensor; not both.'
-            )
-
-        self.execution_delta = execution_delta
-        self.execution_date_fn = execution_date_fn
-        self.external_dag_id = external_dag_id
-        self.external_task_id = external_task_id
-        self.check_existence = check_existence
-        self._has_checked_existence = False
-
-    @provide_session
-    def poke(self, context, session=None):
-        if self.execution_delta:
-            dttm = context['execution_date'] - self.execution_delta
-        elif self.execution_date_fn:
-            dttm = self._handle_execution_date_fn(context=context)
-        else:
-            dttm = context['execution_date']
-
-        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
-        serialized_dttm_filter = ','.join(dt.isoformat() for dt in dttm_filter)
-
-        self.log.info(
-            'Poking for %s.%s on %s ... ', self.external_dag_id, self.external_task_id, serialized_dttm_filter
-        )
-
-        # In poke mode this will check dag existence only once
-        if self.check_existence and not self._has_checked_existence:
-            self._check_for_existence(session=session)
-
-        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
-
-        count_failed = -1
-        if self.failed_states:
-            count_failed = self.get_count(dttm_filter, session, self.failed_states)
-
-        if count_failed == len(dttm_filter):
-            if self.external_task_id:
-                raise AirflowException(
-                    f'The external task {self.external_task_id} in DAG {self.external_dag_id} failed.'
-                )
-            else:
-                raise AirflowException(f'The external DAG {self.external_dag_id} failed.')
-
-        return count_allowed == len(dttm_filter)
-
-    def _check_for_existence(self, session) -> None:
-        dag_to_wait = session.query(DagModel).filter(DagModel.dag_id == self.external_dag_id).first()
-
-        if not dag_to_wait:
-            raise AirflowException(f'The external DAG {self.external_dag_id} does not exist.')
-
-        if not os.path.exists(dag_to_wait.fileloc):
-            raise AirflowException(f'The external DAG {self.external_dag_id} was deleted.')
-
-        if self.external_task_id:
-            refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
-            if not refreshed_dag_info.has_task(self.external_task_id):
-                raise AirflowException(
-                    f'The external task {self.external_task_id} in '
-                    f'DAG {self.external_dag_id} does not exist.'
-                )
-        self._has_checked_existence = True
-
-    def get_count(self, dttm_filter, session, states) -> int:
-        """
-        Get the count of records against dttm filter and states
-
-        :param dttm_filter: date time filter for execution date
-        :type dttm_filter: list
-        :param session: airflow session object
-        :type session: SASession
-        :param states: task or dag states
-        :type states: list
-        :return: count of record against the filters
-        """
-        TI = TaskInstance
-        DR = DagRun
-        if self.external_task_id:
-            count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
-                .scalar()
-            )
-        else:
-            count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
-                .scalar()
-            )
-        return count
-
-    def _handle_execution_date_fn(self, context) -> Any:
-        """
-        This function is to handle backwards compatibility with how this operator was
-        previously where it only passes the execution date, but also allow for the newer
-        implementation to pass all context variables as keyword arguments, to allow
-        for more sophisticated returns of dates to return.
-        """
-        from airflow.utils.operator_helpers import make_kwargs_callable
-
-        # Remove "execution_date" because it is already a mandatory positional argument
-        execution_date = context["execution_date"]
-        kwargs = {k: v for k, v in context.items() if k != "execution_date"}
-        # Add "context" in the kwargs for backward compatibility (because context used to be
-        # an acceptable argument of execution_date_fn)
-        kwargs["context"] = context
-        kwargs_callable = make_kwargs_callable(self.execution_date_fn)
-        return kwargs_callable(execution_date, **kwargs)
-
-
-class ExternalTaskMarker(DummyOperator):
-    """
-    Use this operator to indicate that a task on a different DAG depends on this task.
-    When this task is cleared with "Recursive" selected, Airflow will clear the task on
-    the other DAG and its downstream tasks recursively. Transitive dependencies are followed
-    until the recursion_depth is reached.
-
-    :param external_dag_id: The dag_id that contains the dependent task that needs to be cleared.
-    :type external_dag_id: str
-    :param external_task_id: The task_id of the dependent task that needs to be cleared.
-    :type external_task_id: str
-    :param execution_date: The execution_date of the dependent task that needs to be cleared.
-    :type execution_date: str or datetime.datetime
-    :param recursion_depth: The maximum level of transitive dependencies allowed. Default is 10.
-        This is mostly used for preventing cyclic dependencies. It is fine to increase
-        this number if necessary. However, too many levels of transitive dependencies will make
-        it slower to clear tasks in the web UI.
-    """
-
-    template_fields = ['external_dag_id', 'external_task_id', 'execution_date']
-    ui_color = '#19647e'
-
-    # The _serialized_fields are lazily loaded when get_serialized_fields() method is called
-    __serialized_fields: Optional[FrozenSet[str]] = None
-
-    @apply_defaults
-    def __init__(
-        self,
-        *,
-        external_dag_id: str,
-        external_task_id: str,
-        execution_date: Optional[Union[str, datetime.datetime]] = "{{ execution_date.isoformat() }}",
-        recursion_depth: int = 10,
-        **kwargs,
-    ):
-        super().__init__(**kwargs)
-        self.external_dag_id = external_dag_id
-        self.external_task_id = external_task_id
-        if isinstance(execution_date, datetime.datetime):
-            self.execution_date = execution_date.isoformat()
-        elif isinstance(execution_date, str):
-            self.execution_date = execution_date
-        else:
-            raise TypeError(
-                f'Expected str or datetime.datetime type for execution_date. Got {type(execution_date)}'
-            )
-
-        if recursion_depth <= 0:
-            raise ValueError("recursion_depth should be a positive integer")
-        self.recursion_depth = recursion_depth
-
-    @classmethod
-    def get_serialized_fields(cls):
-        """Serialized ExternalTaskMarker contain exactly these fields + templated_fields ."""
-        if not cls.__serialized_fields:
-            cls.__serialized_fields = frozenset(super().get_serialized_fields() | {"recursion_depth"})
-        return cls.__serialized_fields
+warnings.warn(
+    "This module is deprecated. Please use `airflow.sensors.external_task`.", DeprecationWarning, stacklevel=2
+)
diff --git a/airflow/sensors/smart_sensor_operator.py b/airflow/sensors/smart_sensor.py
similarity index 100%
rename from airflow/sensors/smart_sensor_operator.py
rename to airflow/sensors/smart_sensor.py
diff --git a/airflow/sensors/sql_sensor.py b/airflow/sensors/sql.py
similarity index 100%
copy from airflow/sensors/sql_sensor.py
copy to airflow/sensors/sql.py
diff --git a/airflow/sensors/sql_sensor.py b/airflow/sensors/sql_sensor.py
index 573c7cd..da41ab2 100644
--- a/airflow/sensors/sql_sensor.py
+++ b/airflow/sensors/sql_sensor.py
@@ -15,104 +15,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""This module is deprecated. Please use `airflow.sensors.sql`."""
 
-from typing import Iterable
+import warnings
 
-from airflow.exceptions import AirflowException
-from airflow.hooks.base import BaseHook
-from airflow.sensors.base import BaseSensorOperator
-from airflow.utils.decorators import apply_defaults
+# pylint: disable=unused-import
+from airflow.sensors.sql import SqlSensor  # noqa
 
-
-class SqlSensor(BaseSensorOperator):
-    """
-    Runs a sql statement repeatedly until a criteria is met. It will keep trying until
-    success or failure criteria are met, or if the first cell is not in (0, '0', '', None).
-    Optional success and failure callables are called with the first cell returned as the argument.
-    If success callable is defined the sensor will keep retrying until the criteria is met.
-    If failure callable is defined and the criteria is met the sensor will raise AirflowException.
-    Failure criteria is evaluated before success criteria. A fail_on_empty boolean can also
-    be passed to the sensor in which case it will fail if no rows have been returned
-
-    :param conn_id: The connection to run the sensor against
-    :type conn_id: str
-    :param sql: The sql to run. To pass, it needs to return at least one cell
-        that contains a non-zero / empty string value.
-    :type sql: str
-    :param parameters: The parameters to render the SQL query with (optional).
-    :type parameters: dict or iterable
-    :param success: Success criteria for the sensor is a Callable that takes first_cell
-        as the only argument, and returns a boolean (optional).
-    :type: success: Optional<Callable[[Any], bool]>
-    :param failure: Failure criteria for the sensor is a Callable that takes first_cell
-        as the only argument and return a boolean (optional).
-    :type: failure: Optional<Callable[[Any], bool]>
-    :param fail_on_empty: Explicitly fail on no rows returned.
-    :type: fail_on_empty: bool
-    """
-
-    template_fields: Iterable[str] = ('sql',)
-    template_ext: Iterable[str] = (
-        '.hql',
-        '.sql',
-    )
-    ui_color = '#7c7287'
-
-    @apply_defaults
-    def __init__(
-        self, *, conn_id, sql, parameters=None, success=None, failure=None, fail_on_empty=False, **kwargs
-    ):
-        self.conn_id = conn_id
-        self.sql = sql
-        self.parameters = parameters
-        self.success = success
-        self.failure = failure
-        self.fail_on_empty = fail_on_empty
-        super().__init__(**kwargs)
-
-    def _get_hook(self):
-        conn = BaseHook.get_connection(self.conn_id)
-
-        allowed_conn_type = {
-            'google_cloud_platform',
-            'jdbc',
-            'mssql',
-            'mysql',
-            'odbc',
-            'oracle',
-            'postgres',
-            'presto',
-            'snowflake',
-            'sqlite',
-            'vertica',
-        }
-        if conn.conn_type not in allowed_conn_type:
-            raise AirflowException(
-                "The connection type is not supported by SqlSensor. "
-                + "Supported connection types: {}".format(list(allowed_conn_type))
-            )
-        return conn.get_hook()
-
-    def poke(self, context):
-        hook = self._get_hook()
-
-        self.log.info('Poking: %s (with parameters %s)', self.sql, self.parameters)
-        records = hook.get_records(self.sql, self.parameters)
-        if not records:
-            if self.fail_on_empty:
-                raise AirflowException("No rows returned, raising as per fail_on_empty flag")
-            else:
-                return False
-        first_cell = records[0][0]
-        if self.failure is not None:
-            if callable(self.failure):
-                if self.failure(first_cell):
-                    raise AirflowException(f"Failure criteria met. self.failure({first_cell}) returned True")
-            else:
-                raise AirflowException(f"self.failure is present, but not callable -> {self.failure}")
-        if self.success is not None:
-            if callable(self.success):
-                return self.success(first_cell)
-            else:
-                raise AirflowException(f"self.success is present, but not callable -> {self.success}")
-        return bool(first_cell)
+warnings.warn(
+    "This module is deprecated. Please use `airflow.sensors.sql`.", DeprecationWarning, stacklevel=2
+)
diff --git a/airflow/sensors/weekday_sensor.py b/airflow/sensors/weekday.py
similarity index 100%
rename from airflow/sensors/weekday_sensor.py
rename to airflow/sensors/weekday.py
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 4039d41..4f39114 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -66,7 +66,7 @@ log = logging.getLogger(__name__)
 
 _OPERATOR_EXTRA_LINKS: Set[str] = {
     "airflow.operators.dagrun_operator.TriggerDagRunLink",
-    "airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
+    "airflow.sensors.external_task.ExternalTaskSensorLink",
 }
 
 
diff --git a/airflow/smart_sensor_dags/smart_sensor_group.py b/airflow/smart_sensor_dags/smart_sensor_group.py
index fcd166a..696cf57 100644
--- a/airflow/smart_sensor_dags/smart_sensor_group.py
+++ b/airflow/smart_sensor_dags/smart_sensor_group.py
@@ -21,7 +21,7 @@ from datetime import timedelta
 
 from airflow.configuration import conf
 from airflow.models import DAG
-from airflow.sensors.smart_sensor_operator import SmartSensorOperator
+from airflow.sensors.smart_sensor import SmartSensorOperator
 from airflow.utils.dates import days_ago
 
 args = {
diff --git a/dev/provider_packages/refactor_provider_packages.py b/dev/provider_packages/refactor_provider_packages.py
index f6cca97..ea7b3fc 100755
--- a/dev/provider_packages/refactor_provider_packages.py
+++ b/dev/provider_packages/refactor_provider_packages.py
@@ -147,7 +147,10 @@ class RefactorBackportPackages:
             ("airflow.operators.python", "airflow.operators.python_operator"),
             ("airflow.sensors.base", "airflow.sensors.base_sensor_operator"),
             ("airflow.sensors.date_time", "airflow.sensors.date_time_sensor"),
+            ("airflow.sensors.external_task", "airflow.sensors.external_task_sensor"),
+            ("airflow.sensors.sql", "airflow.sensors.sql_sensor"),
             ("airflow.sensors.time_delta", "airflow.sensors.time_delta_sensor"),
+            ("airflow.sensors.weekday", "airflow.contrib.sensors.weekday_sensor"),
             ("airflow.utils.session", "airflow.utils.db"),
         ]
         for new, old in changes:
diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst
index 697e41a..eec8074 100644
--- a/docs/apache-airflow/howto/operator/external_task_sensor.rst
+++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst
@@ -38,7 +38,7 @@ DAGs.
 ExternalTaskSensor
 ^^^^^^^^^^^^^^^^^^
 
-Use the :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor` to make tasks on a DAG
+Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
 wait for another task on a different DAG for a specific ``execution_date``.
 
 ExternalTaskSensor also provide options to set if the Task on a remote DAG succeeded or failed
diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst
index ff45837..1d6d7f4 100644
--- a/docs/apache-airflow/operators-and-hooks-ref.rst
+++ b/docs/apache-airflow/operators-and-hooks-ref.rst
@@ -94,7 +94,7 @@ Airflow has many more integrations available for separate installation as a prov
    * - :mod:`airflow.sensors.date_time`
      -
 
-   * - :mod:`airflow.sensors.external_task_sensor`
+   * - :mod:`airflow.sensors.external_task`
      - :doc:`How to use <howto/operator/external_task_sensor>`
 
    * - :mod:`airflow.sensors.filesystem`
@@ -103,7 +103,7 @@ Airflow has many more integrations available for separate installation as a prov
    * - :mod:`airflow.sensors.python`
      -
 
-   * - :mod:`airflow.sensors.sql_sensor`
+   * - :mod:`airflow.sensors.sql`
      -
 
    * - :mod:`airflow.sensors.time_delta`
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 0463e0c..132986a 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1456,7 +1456,7 @@ SENSORS = [
         'airflow.sensors.hdfs_sensor.HdfsSensor',
     ),
     (
-        'airflow.sensors.weekday_sensor.DayOfWeekSensor',
+        'airflow.sensors.weekday.DayOfWeekSensor',
         'airflow.contrib.sensors.weekday_sensor.DayOfWeekSensor',
     ),
     (
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 9542507..afa30e2 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -26,7 +26,7 @@ from airflow.models import DagBag, TaskInstance
 from airflow.models.dag import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
-from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
+from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
 from airflow.sensors.time_sensor import TimeSensor
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.state import State
diff --git a/tests/sensors/test_smart_sensor_operator.py b/tests/sensors/test_smart_sensor_operator.py
index cb8295e..5b798cf 100644
--- a/tests/sensors/test_smart_sensor_operator.py
+++ b/tests/sensors/test_smart_sensor_operator.py
@@ -30,7 +30,7 @@ from airflow.configuration import conf
 from airflow.models import DagRun, SensorInstance, TaskInstance
 from airflow.operators.dummy import DummyOperator
 from airflow.sensors.base import BaseSensorOperator
-from airflow.sensors.smart_sensor_operator import SmartSensorOperator
+from airflow.sensors.smart_sensor import SmartSensorOperator
 from airflow.utils import timezone
 from airflow.utils.state import State
 
diff --git a/tests/sensors/test_sql_sensor.py b/tests/sensors/test_sql_sensor.py
index 1dce2ed..88a1856 100644
--- a/tests/sensors/test_sql_sensor.py
+++ b/tests/sensors/test_sql_sensor.py
@@ -23,7 +23,7 @@ import pytest
 
 from airflow.exceptions import AirflowException
 from airflow.models.dag import DAG
-from airflow.sensors.sql_sensor import SqlSensor
+from airflow.sensors.sql import SqlSensor
 from airflow.utils.timezone import datetime
 from tests.providers.apache.hive import TestHiveEnvironment
 
@@ -86,7 +86,7 @@ class TestSqlSensor(TestHiveEnvironment):
         )
         op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-    @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+    @mock.patch('airflow.sensors.sql.BaseHook')
     def test_sql_sensor_postgres_poke(self, mock_hook):
         op = SqlSensor(
             task_id='sql_sensor_check',
@@ -118,7 +118,7 @@ class TestSqlSensor(TestHiveEnvironment):
         mock_get_records.return_value = [['1']]
         self.assertTrue(op.poke(None))
 
-    @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+    @mock.patch('airflow.sensors.sql.BaseHook')
     def test_sql_sensor_postgres_poke_fail_on_empty(self, mock_hook):
         op = SqlSensor(
             task_id='sql_sensor_check', conn_id='postgres_default', sql="SELECT 1", fail_on_empty=True
@@ -130,7 +130,7 @@ class TestSqlSensor(TestHiveEnvironment):
         mock_get_records.return_value = []
         self.assertRaises(AirflowException, op.poke, None)
 
-    @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+    @mock.patch('airflow.sensors.sql.BaseHook')
     def test_sql_sensor_postgres_poke_success(self, mock_hook):
         op = SqlSensor(
             task_id='sql_sensor_check', conn_id='postgres_default', sql="SELECT 1", success=lambda x: x in [1]
@@ -148,7 +148,7 @@ class TestSqlSensor(TestHiveEnvironment):
         mock_get_records.return_value = [['1']]
         self.assertFalse(op.poke(None))
 
-    @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+    @mock.patch('airflow.sensors.sql.BaseHook')
     def test_sql_sensor_postgres_poke_failure(self, mock_hook):
         op = SqlSensor(
             task_id='sql_sensor_check', conn_id='postgres_default', sql="SELECT 1", failure=lambda x: x in [1]
@@ -163,7 +163,7 @@ class TestSqlSensor(TestHiveEnvironment):
         mock_get_records.return_value = [[1]]
         self.assertRaises(AirflowException, op.poke, None)
 
-    @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+    @mock.patch('airflow.sensors.sql.BaseHook')
     def test_sql_sensor_postgres_poke_failure_success(self, mock_hook):
         op = SqlSensor(
             task_id='sql_sensor_check',
@@ -185,7 +185,7 @@ class TestSqlSensor(TestHiveEnvironment):
         mock_get_records.return_value = [[2]]
         self.assertTrue(op.poke(None))
 
-    @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+    @mock.patch('airflow.sensors.sql.BaseHook')
     def test_sql_sensor_postgres_poke_failure_success_same(self, mock_hook):
         op = SqlSensor(
             task_id='sql_sensor_check',
@@ -204,7 +204,7 @@ class TestSqlSensor(TestHiveEnvironment):
         mock_get_records.return_value = [[1]]
         self.assertRaises(AirflowException, op.poke, None)
 
-    @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+    @mock.patch('airflow.sensors.sql.BaseHook')
     def test_sql_sensor_postgres_poke_invalid_failure(self, mock_hook):
         op = SqlSensor(
             task_id='sql_sensor_check',
@@ -221,7 +221,7 @@ class TestSqlSensor(TestHiveEnvironment):
             op.poke(None)
         self.assertEqual("self.failure is present, but not callable -> [1]", str(e.exception))
 
-    @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+    @mock.patch('airflow.sensors.sql.BaseHook')
     def test_sql_sensor_postgres_poke_invalid_success(self, mock_hook):
         op = SqlSensor(
             task_id='sql_sensor_check',
diff --git a/tests/sensors/test_weekday_sensor.py b/tests/sensors/test_weekday_sensor.py
index 26b65a0..f632e82 100644
--- a/tests/sensors/test_weekday_sensor.py
+++ b/tests/sensors/test_weekday_sensor.py
@@ -24,7 +24,7 @@ from parameterized import parameterized
 from airflow.exceptions import AirflowSensorTimeout
 from airflow.models import DagBag
 from airflow.models.dag import DAG
-from airflow.sensors.weekday_sensor import DayOfWeekSensor
+from airflow.sensors.weekday import DayOfWeekSensor
 from airflow.utils.timezone import datetime
 from airflow.utils.weekday import WeekDay
 from tests.test_utils import db