Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/09 00:09:59 UTC
[airflow] branch master updated: Rename remaining Sensors to match AIP-21 (#12927)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new a075b6d Rename remaining Sensors to match AIP-21 (#12927)
a075b6d is described below
commit a075b6df99a4f5e21d198f7be56b577432e6f9db
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Dec 9 00:09:08 2020 +0000
Rename remaining Sensors to match AIP-21 (#12927)
As discussed in AIP-21:
* Rename airflow.sensors.external_task_sensor to airflow.sensors.external_task
* Rename airflow.sensors.sql_sensor to airflow.sensors.sql
* Rename airflow.contrib.sensors.weekday_sensor to airflow.sensors.weekday
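The old import paths keep working for now through deprecation shims (added in the diff below), so existing DAGs do not break immediately. As a hedged migration sketch — the try/except fallback is an illustrative pattern, not part of this commit — user code can support both layouts at once:

    # Hypothetical user DAG snippet: prefer the new AIP-21 module paths and
    # fall back to the pre-rename ones on older Airflow versions.
    try:
        from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
        from airflow.sensors.sql import SqlSensor
        from airflow.sensors.weekday import DayOfWeekSensor
    except ImportError:
        from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
        from airflow.sensors.sql_sensor import SqlSensor
        from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor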
---
UPDATING.md | 4 +
airflow/contrib/sensors/weekday_sensor.py | 2 +-
.../example_external_task_marker_dag.py | 2 +-
airflow/models/dag.py | 2 +-
.../apache/hive/sensors/metastore_partition.py | 2 +-
.../{external_task_sensor.py => external_task.py} | 0
airflow/sensors/external_task_sensor.py | 298 +--------------------
.../{smart_sensor_operator.py => smart_sensor.py} | 0
airflow/sensors/{sql_sensor.py => sql.py} | 0
airflow/sensors/sql_sensor.py | 105 +-------
airflow/sensors/{weekday_sensor.py => weekday.py} | 0
airflow/serialization/serialized_objects.py | 2 +-
airflow/smart_sensor_dags/smart_sensor_group.py | 2 +-
.../refactor_provider_packages.py | 3 +
.../howto/operator/external_task_sensor.rst | 2 +-
docs/apache-airflow/operators-and-hooks-ref.rst | 4 +-
tests/deprecated_classes.py | 2 +-
tests/sensors/test_external_task_sensor.py | 2 +-
tests/sensors/test_smart_sensor_operator.py | 2 +-
tests/sensors/test_sql_sensor.py | 18 +-
tests/sensors/test_weekday_sensor.py | 2 +-
21 files changed, 47 insertions(+), 407 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 8e356e8..1088e41 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -281,7 +281,11 @@ The following table shows changes in import paths.
| airflow.operators.subdag_operator.SubDagOperator | airflow.operators.subdag.SubDagOperator |
| airflow.sensors.base_sensor_operator.BaseSensorOperator | airflow.sensors.base.BaseSensorOperator |
| airflow.sensors.date_time_sensor.DateTimeSensor | airflow.sensors.date_time.DateTimeSensor |
+| airflow.sensors.external_task_sensor.ExternalTaskMarker | airflow.sensors.external_task.ExternalTaskMarker |
+| airflow.sensors.external_task_sensor.ExternalTaskSensor | airflow.sensors.external_task.ExternalTaskSensor |
+| airflow.sensors.sql_sensor.SqlSensor | airflow.sensors.sql.SqlSensor |
| airflow.sensors.time_delta_sensor.TimeDeltaSensor | airflow.sensors.time_delta.TimeDeltaSensor |
+| airflow.contrib.sensors.weekday_sensor.DayOfWeekSensor | airflow.sensors.weekday.DayOfWeekSensor |
### Database schema changes
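Each shim in the table above emits a DeprecationWarning at import time, which makes stale imports easy to surface mechanically. A minimal sketch, assuming Airflow is importable in the environment; the reload is only there to re-trigger the module-level warning if the module was already imported:

    import importlib
    import warnings

    # Verify (e.g. in a test) that the deprecated path still warns as intended.
    with warnings.catch_warnings(record=True) as captured:
        warnings.simplefilter("always", DeprecationWarning)
        import airflow.sensors.sql_sensor  # the deprecated path
        importlib.reload(airflow.sensors.sql_sensor)  # re-emit the warning if cached
    assert any(issubclass(w.category, DeprecationWarning) for w in captured)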
diff --git a/airflow/contrib/sensors/weekday_sensor.py b/airflow/contrib/sensors/weekday_sensor.py
index 8f462bb..697f8f4 100644
--- a/airflow/contrib/sensors/weekday_sensor.py
+++ b/airflow/contrib/sensors/weekday_sensor.py
@@ -20,7 +20,7 @@
import warnings
# pylint: disable=unused-import
-from airflow.sensors.weekday_sensor import DayOfWeekSensor # noqa
+from airflow.sensors.weekday import DayOfWeekSensor # noqa
warnings.warn(
"This module is deprecated. Please use `airflow.sensors.weekday_sensor`.",
diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py
index effb509..9476080 100644
--- a/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow/example_dags/example_external_task_marker_dag.py
@@ -41,7 +41,7 @@ import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
-from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
+from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
start_date = datetime.datetime(2015, 1, 1)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index be6403c..1b639d5 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1224,7 +1224,7 @@ class DAG(LoggingMixin):
tis = tis.filter(TI.state == State.RUNNING)
if include_subdags:
- from airflow.sensors.external_task_sensor import ExternalTaskMarker
+ from airflow.sensors.external_task import ExternalTaskMarker
# Recursively find external tasks indicated by ExternalTaskMarker
instances = tis.all()
diff --git a/airflow/providers/apache/hive/sensors/metastore_partition.py b/airflow/providers/apache/hive/sensors/metastore_partition.py
index 0955291..2333ef5 100644
--- a/airflow/providers/apache/hive/sensors/metastore_partition.py
+++ b/airflow/providers/apache/hive/sensors/metastore_partition.py
@@ -17,7 +17,7 @@
# under the License.
from typing import Any, Dict
-from airflow.sensors.sql_sensor import SqlSensor
+from airflow.sensors.sql import SqlSensor
from airflow.utils.decorators import apply_defaults
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task.py
similarity index 100%
copy from airflow/sensors/external_task_sensor.py
copy to airflow/sensors/external_task.py
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index 20b6f90..2842822 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -15,293 +15,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""This module is deprecated. Please use `airflow.sensors.external_task`."""
-import datetime
-import os
-from typing import Any, Callable, FrozenSet, Iterable, Optional, Union
+import warnings
-from sqlalchemy import func
+# pylint: disable=unused-import
+from airflow.sensors.external_task import ( # noqa
+ ExternalTaskMarker,
+ ExternalTaskSensor,
+ ExternalTaskSensorLink,
+)
-from airflow.exceptions import AirflowException
-from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
-from airflow.operators.dummy import DummyOperator
-from airflow.sensors.base import BaseSensorOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.helpers import build_airflow_url_with_query
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
-
-
-class ExternalTaskSensorLink(BaseOperatorLink):
- """
- Operator link for ExternalTaskSensor. It allows users to access
- the DAG waited on by ExternalTaskSensor.
- """
-
- name = 'External DAG'
-
- def get_link(self, operator, dttm):
- query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()}
- return build_airflow_url_with_query(query)
-
-
-class ExternalTaskSensor(BaseSensorOperator):
- """
- Waits for a different DAG or a task in a different DAG to complete for a
- specific execution_date
-
- :param external_dag_id: The dag_id that contains the task you want to
- wait for
- :type external_dag_id: str
- :param external_task_id: The task_id that contains the task you want to
- wait for. If ``None`` (default value) the sensor waits for the DAG
- :type external_task_id: str or None
- :param allowed_states: Iterable of allowed states, default is ``['success']``
- :type allowed_states: Iterable
- :param failed_states: Iterable of failed or disallowed states, default is ``None``
- :type failed_states: Iterable
- :param execution_delta: time difference with the previous execution to
- look at, the default is the same execution_date as the current task or DAG.
- For yesterday, use [positive!] datetime.timedelta(days=1). Either
- execution_delta or execution_date_fn can be passed to
- ExternalTaskSensor, but not both.
- :type execution_delta: Optional[datetime.timedelta]
- :param execution_date_fn: function that receives the current execution date as the first
- positional argument and optionally any number of keyword arguments available in the
- context dictionary, and returns the desired execution dates to query.
- Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor,
- but not both.
- :type execution_date_fn: Optional[Callable]
- :param check_existence: Set to `True` to check if the external task exists (when
- external_task_id is not None) or check if the DAG to wait for exists (when
- external_task_id is None), and immediately cease waiting if the external task
- or DAG does not exist (default value: False).
- :type check_existence: bool
- """
-
- template_fields = ['external_dag_id', 'external_task_id']
- ui_color = '#19647e'
-
- @property
- def operator_extra_links(self):
- """Return operator extra links"""
- return [ExternalTaskSensorLink()]
-
- @apply_defaults
- def __init__(
- self,
- *,
- external_dag_id: str,
- external_task_id: Optional[str] = None,
- allowed_states: Optional[Iterable[str]] = None,
- failed_states: Optional[Iterable[str]] = None,
- execution_delta: Optional[datetime.timedelta] = None,
- execution_date_fn: Optional[Callable] = None,
- check_existence: bool = False,
- **kwargs,
- ):
- super().__init__(**kwargs)
- self.allowed_states = list(allowed_states) if allowed_states else [State.SUCCESS]
- self.failed_states = list(failed_states) if failed_states else []
-
- total_states = self.allowed_states + self.failed_states
- total_states = set(total_states)
-
- if set(self.failed_states).intersection(set(self.allowed_states)):
- raise AirflowException(
- "Duplicate values provided as allowed "
- "`{}` and failed states `{}`".format(self.allowed_states, self.failed_states)
- )
-
- if external_task_id:
- if not total_states <= set(State.task_states):
- raise ValueError(
- f'Valid values for `allowed_states` and `failed_states` '
- f'when `external_task_id` is not `None`: {State.task_states}'
- )
- elif not total_states <= set(State.dag_states):
- raise ValueError(
- f'Valid values for `allowed_states` and `failed_states` '
- f'when `external_task_id` is `None`: {State.dag_states}'
- )
-
- if execution_delta is not None and execution_date_fn is not None:
- raise ValueError(
- 'Only one of `execution_delta` or `execution_date_fn` may '
- 'be provided to ExternalTaskSensor; not both.'
- )
-
- self.execution_delta = execution_delta
- self.execution_date_fn = execution_date_fn
- self.external_dag_id = external_dag_id
- self.external_task_id = external_task_id
- self.check_existence = check_existence
- self._has_checked_existence = False
-
- @provide_session
- def poke(self, context, session=None):
- if self.execution_delta:
- dttm = context['execution_date'] - self.execution_delta
- elif self.execution_date_fn:
- dttm = self._handle_execution_date_fn(context=context)
- else:
- dttm = context['execution_date']
-
- dttm_filter = dttm if isinstance(dttm, list) else [dttm]
- serialized_dttm_filter = ','.join(dt.isoformat() for dt in dttm_filter)
-
- self.log.info(
- 'Poking for %s.%s on %s ... ', self.external_dag_id, self.external_task_id, serialized_dttm_filter
- )
-
- # In poke mode this will check dag existence only once
- if self.check_existence and not self._has_checked_existence:
- self._check_for_existence(session=session)
-
- count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
-
- count_failed = -1
- if self.failed_states:
- count_failed = self.get_count(dttm_filter, session, self.failed_states)
-
- if count_failed == len(dttm_filter):
- if self.external_task_id:
- raise AirflowException(
- f'The external task {self.external_task_id} in DAG {self.external_dag_id} failed.'
- )
- else:
- raise AirflowException(f'The external DAG {self.external_dag_id} failed.')
-
- return count_allowed == len(dttm_filter)
-
- def _check_for_existence(self, session) -> None:
- dag_to_wait = session.query(DagModel).filter(DagModel.dag_id == self.external_dag_id).first()
-
- if not dag_to_wait:
- raise AirflowException(f'The external DAG {self.external_dag_id} does not exist.')
-
- if not os.path.exists(dag_to_wait.fileloc):
- raise AirflowException(f'The external DAG {self.external_dag_id} was deleted.')
-
- if self.external_task_id:
- refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
- if not refreshed_dag_info.has_task(self.external_task_id):
- raise AirflowException(
- f'The external task {self.external_task_id} in '
- f'DAG {self.external_dag_id} does not exist.'
- )
- self._has_checked_existence = True
-
- def get_count(self, dttm_filter, session, states) -> int:
- """
- Get the count of records against dttm filter and states
-
- :param dttm_filter: date time filter for execution date
- :type dttm_filter: list
- :param session: airflow session object
- :type session: SASession
- :param states: task or dag states
- :type states: list
- :return: count of records against the filters
- """
- TI = TaskInstance
- DR = DagRun
- if self.external_task_id:
- count = (
- session.query(func.count()) # .count() is inefficient
- .filter(
- TI.dag_id == self.external_dag_id,
- TI.task_id == self.external_task_id,
- TI.state.in_(states), # pylint: disable=no-member
- TI.execution_date.in_(dttm_filter),
- )
- .scalar()
- )
- else:
- count = (
- session.query(func.count())
- .filter(
- DR.dag_id == self.external_dag_id,
- DR.state.in_(states), # pylint: disable=no-member
- DR.execution_date.in_(dttm_filter),
- )
- .scalar()
- )
- return count
-
- def _handle_execution_date_fn(self, context) -> Any:
- """
- This function handles backwards compatibility: the operator previously passed
- only the execution date to execution_date_fn, while the newer implementation
- also passes all context variables as keyword arguments, allowing more
- sophisticated computation of the dates to query.
- """
- from airflow.utils.operator_helpers import make_kwargs_callable
-
- # Remove "execution_date" because it is already a mandatory positional argument
- execution_date = context["execution_date"]
- kwargs = {k: v for k, v in context.items() if k != "execution_date"}
- # Add "context" in the kwargs for backward compatibility (because context used to be
- # an acceptable argument of execution_date_fn)
- kwargs["context"] = context
- kwargs_callable = make_kwargs_callable(self.execution_date_fn)
- return kwargs_callable(execution_date, **kwargs)
-
-
-class ExternalTaskMarker(DummyOperator):
- """
- Use this operator to indicate that a task on a different DAG depends on this task.
- When this task is cleared with "Recursive" selected, Airflow will clear the task on
- the other DAG and its downstream tasks recursively. Transitive dependencies are followed
- until the recursion_depth is reached.
-
- :param external_dag_id: The dag_id that contains the dependent task that needs to be cleared.
- :type external_dag_id: str
- :param external_task_id: The task_id of the dependent task that needs to be cleared.
- :type external_task_id: str
- :param execution_date: The execution_date of the dependent task that needs to be cleared.
- :type execution_date: str or datetime.datetime
- :param recursion_depth: The maximum level of transitive dependencies allowed. Default is 10.
- This is mostly used for preventing cyclic dependencies. It is fine to increase
- this number if necessary. However, too many levels of transitive dependencies will make
- it slower to clear tasks in the web UI.
- """
-
- template_fields = ['external_dag_id', 'external_task_id', 'execution_date']
- ui_color = '#19647e'
-
- # The _serialized_fields are lazily loaded when get_serialized_fields() method is called
- __serialized_fields: Optional[FrozenSet[str]] = None
-
- @apply_defaults
- def __init__(
- self,
- *,
- external_dag_id: str,
- external_task_id: str,
- execution_date: Optional[Union[str, datetime.datetime]] = "{{ execution_date.isoformat() }}",
- recursion_depth: int = 10,
- **kwargs,
- ):
- super().__init__(**kwargs)
- self.external_dag_id = external_dag_id
- self.external_task_id = external_task_id
- if isinstance(execution_date, datetime.datetime):
- self.execution_date = execution_date.isoformat()
- elif isinstance(execution_date, str):
- self.execution_date = execution_date
- else:
- raise TypeError(
- f'Expected str or datetime.datetime type for execution_date. Got {type(execution_date)}'
- )
-
- if recursion_depth <= 0:
- raise ValueError("recursion_depth should be a positive integer")
- self.recursion_depth = recursion_depth
-
- @classmethod
- def get_serialized_fields(cls):
- """Serialized ExternalTaskMarker contain exactly these fields + templated_fields ."""
- if not cls.__serialized_fields:
- cls.__serialized_fields = frozenset(super().get_serialized_fields() | {"recursion_depth"})
- return cls.__serialized_fields
+warnings.warn(
+ "This module is deprecated. Please use `airflow.sensors.external_task`.", DeprecationWarning, stacklevel=2
+)
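The parameters in the deleted docstring above still describe the sensor's behaviour at its new location, airflow.sensors.external_task. A hedged usage sketch; the DAG and task ids are hypothetical, and the arguments are taken from the signature shown above:

    import datetime

    from airflow import DAG
    from airflow.sensors.external_task import ExternalTaskSensor
    from airflow.utils.dates import days_ago

    with DAG(dag_id="child_dag", start_date=days_ago(1), schedule_interval="@daily") as dag:
        wait_for_parent = ExternalTaskSensor(
            task_id="wait_for_parent_task",
            external_dag_id="parent_dag",
            external_task_id="parent_task",  # None would wait for the whole DAG instead
            execution_delta=datetime.timedelta(hours=1),  # parent runs one hour earlier
            allowed_states=["success"],
            failed_states=["failed"],
            check_existence=True,  # cease waiting immediately if parent_dag/parent_task is missing
        )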
diff --git a/airflow/sensors/smart_sensor_operator.py b/airflow/sensors/smart_sensor.py
similarity index 100%
rename from airflow/sensors/smart_sensor_operator.py
rename to airflow/sensors/smart_sensor.py
diff --git a/airflow/sensors/sql_sensor.py b/airflow/sensors/sql.py
similarity index 100%
copy from airflow/sensors/sql_sensor.py
copy to airflow/sensors/sql.py
diff --git a/airflow/sensors/sql_sensor.py b/airflow/sensors/sql_sensor.py
index 573c7cd..da41ab2 100644
--- a/airflow/sensors/sql_sensor.py
+++ b/airflow/sensors/sql_sensor.py
@@ -15,104 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""This module is deprecated. Please use `airflow.sensors.sql`."""
-from typing import Iterable
+import warnings
-from airflow.exceptions import AirflowException
-from airflow.hooks.base import BaseHook
-from airflow.sensors.base import BaseSensorOperator
-from airflow.utils.decorators import apply_defaults
+# pylint: disable=unused-import
+from airflow.sensors.sql import SqlSensor # noqa
-
-class SqlSensor(BaseSensorOperator):
- """
- Runs a SQL statement repeatedly until a criterion is met. By default it keeps trying
- until the first cell returned is not in (0, '0', '', None).
- Optional success and failure callables are called with the first cell as their argument.
- If a success callable is defined, the sensor keeps retrying until its criterion is met.
- If a failure callable is defined and its criterion is met, the sensor raises AirflowException.
- The failure criterion is evaluated before the success criterion. A fail_on_empty boolean
- can also be passed, in which case the sensor fails if no rows have been returned.
-
- :param conn_id: The connection to run the sensor against
- :type conn_id: str
- :param sql: The SQL to run. To pass, it needs to return at least one cell
- that contains a non-zero / non-empty string value.
- :type sql: str
- :param parameters: The parameters to render the SQL query with (optional).
- :type parameters: dict or iterable
- :param success: Success criterion for the sensor: a callable that takes first_cell
- as its only argument and returns a boolean (optional).
- :type success: Optional[Callable[[Any], bool]]
- :param failure: Failure criterion for the sensor: a callable that takes first_cell
- as its only argument and returns a boolean (optional).
- :type failure: Optional[Callable[[Any], bool]]
- :param fail_on_empty: Explicitly fail on no rows returned.
- :type fail_on_empty: bool
- """
-
- template_fields: Iterable[str] = ('sql',)
- template_ext: Iterable[str] = (
- '.hql',
- '.sql',
- )
- ui_color = '#7c7287'
-
- @apply_defaults
- def __init__(
- self, *, conn_id, sql, parameters=None, success=None, failure=None, fail_on_empty=False, **kwargs
- ):
- self.conn_id = conn_id
- self.sql = sql
- self.parameters = parameters
- self.success = success
- self.failure = failure
- self.fail_on_empty = fail_on_empty
- super().__init__(**kwargs)
-
- def _get_hook(self):
- conn = BaseHook.get_connection(self.conn_id)
-
- allowed_conn_type = {
- 'google_cloud_platform',
- 'jdbc',
- 'mssql',
- 'mysql',
- 'odbc',
- 'oracle',
- 'postgres',
- 'presto',
- 'snowflake',
- 'sqlite',
- 'vertica',
- }
- if conn.conn_type not in allowed_conn_type:
- raise AirflowException(
- "The connection type is not supported by SqlSensor. "
- + "Supported connection types: {}".format(list(allowed_conn_type))
- )
- return conn.get_hook()
-
- def poke(self, context):
- hook = self._get_hook()
-
- self.log.info('Poking: %s (with parameters %s)', self.sql, self.parameters)
- records = hook.get_records(self.sql, self.parameters)
- if not records:
- if self.fail_on_empty:
- raise AirflowException("No rows returned, raising as per fail_on_empty flag")
- else:
- return False
- first_cell = records[0][0]
- if self.failure is not None:
- if callable(self.failure):
- if self.failure(first_cell):
- raise AirflowException(f"Failure criteria met. self.failure({first_cell}) returned True")
- else:
- raise AirflowException(f"self.failure is present, but not callable -> {self.failure}")
- if self.success is not None:
- if callable(self.success):
- return self.success(first_cell)
- else:
- raise AirflowException(f"self.success is present, but not callable -> {self.success}")
- return bool(first_cell)
+warnings.warn(
+ "This module is deprecated. Please use `airflow.sensors.sql`.", DeprecationWarning, stacklevel=2
+)
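As with the external task sensor, SqlSensor's behaviour is unchanged; only its module moved to airflow.sensors.sql. A hedged sketch using the parameters documented in the deleted docstring above; the connection id and table name are hypothetical:

    from airflow import DAG
    from airflow.sensors.sql import SqlSensor
    from airflow.utils.dates import days_ago

    with DAG(dag_id="sql_sensor_example", start_date=days_ago(1), schedule_interval="@daily") as dag:
        wait_for_rows = SqlSensor(
            task_id="wait_for_rows",
            conn_id="postgres_default",  # any connection type from the allowed list above
            sql="SELECT COUNT(*) FROM staging_table",  # hypothetical table
            success=lambda first_cell: first_cell > 100,  # optional criterion on the first cell
            fail_on_empty=True,  # raise instead of rescheduling when no rows are returned
        )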
diff --git a/airflow/sensors/weekday_sensor.py b/airflow/sensors/weekday.py
similarity index 100%
rename from airflow/sensors/weekday_sensor.py
rename to airflow/sensors/weekday.py
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 4039d41..4f39114 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -66,7 +66,7 @@ log = logging.getLogger(__name__)
_OPERATOR_EXTRA_LINKS: Set[str] = {
"airflow.operators.dagrun_operator.TriggerDagRunLink",
- "airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
+ "airflow.sensors.external_task.ExternalTaskSensorLink",
}
diff --git a/airflow/smart_sensor_dags/smart_sensor_group.py b/airflow/smart_sensor_dags/smart_sensor_group.py
index fcd166a..696cf57 100644
--- a/airflow/smart_sensor_dags/smart_sensor_group.py
+++ b/airflow/smart_sensor_dags/smart_sensor_group.py
@@ -21,7 +21,7 @@ from datetime import timedelta
from airflow.configuration import conf
from airflow.models import DAG
-from airflow.sensors.smart_sensor_operator import SmartSensorOperator
+from airflow.sensors.smart_sensor import SmartSensorOperator
from airflow.utils.dates import days_ago
args = {
diff --git a/dev/provider_packages/refactor_provider_packages.py b/dev/provider_packages/refactor_provider_packages.py
index f6cca97..ea7b3fc 100755
--- a/dev/provider_packages/refactor_provider_packages.py
+++ b/dev/provider_packages/refactor_provider_packages.py
@@ -147,7 +147,10 @@ class RefactorBackportPackages:
("airflow.operators.python", "airflow.operators.python_operator"),
("airflow.sensors.base", "airflow.sensors.base_sensor_operator"),
("airflow.sensors.date_time", "airflow.sensors.date_time_sensor"),
+ ("airflow.sensors.external_task", "airflow.sensors.external_task_sensor"),
+ ("airflow.sensors.sql", "airflow.sensors.sql_sensor"),
("airflow.sensors.time_delta", "airflow.sensors.time_delta_sensor"),
+ ("airflow.sensors.weekday", "airflow.contrib.sensors.weekday_sensor"),
("airflow.utils.session", "airflow.utils.db"),
]
for new, old in changes:
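Each (new, old) pair above is applied when generating backport packages, so code written against the new paths is rewritten to the old ones for Airflow 1.10. The actual rewriting machinery is not shown in this hunk; purely as an illustrative assumption, a naive string-level version of the same idea might look like the following (a real tool would rename at the syntax-tree level to avoid substring collisions):

    from pathlib import Path

    # Illustrative only: rewrite new-style module paths back to the old ones
    # in every Python file under a hypothetical backport package tree.
    changes = [
        ("airflow.sensors.external_task", "airflow.sensors.external_task_sensor"),
        ("airflow.sensors.sql", "airflow.sensors.sql_sensor"),
        ("airflow.sensors.weekday", "airflow.contrib.sensors.weekday_sensor"),
    ]

    def downgrade_imports(package_root: str) -> None:
        for path in Path(package_root).rglob("*.py"):
            source = path.read_text()
            for new, old in changes:
                source = source.replace(new, old)
            path.write_text(source)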
diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst
index 697e41a..eec8074 100644
--- a/docs/apache-airflow/howto/operator/external_task_sensor.rst
+++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst
@@ -38,7 +38,7 @@ DAGs.
ExternalTaskSensor
^^^^^^^^^^^^^^^^^^
-Use the :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor` to make tasks on a DAG
+Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another task on a different DAG for a specific ``execution_date``.
ExternalTaskSensor also provides options to specify whether the task on a remote DAG succeeded or failed
diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst
index ff45837..1d6d7f4 100644
--- a/docs/apache-airflow/operators-and-hooks-ref.rst
+++ b/docs/apache-airflow/operators-and-hooks-ref.rst
@@ -94,7 +94,7 @@ Airflow has many more integrations available for separate installation as a prov
* - :mod:`airflow.sensors.date_time`
-
- * - :mod:`airflow.sensors.external_task_sensor`
+ * - :mod:`airflow.sensors.external_task`
- :doc:`How to use <howto/operator/external_task_sensor>`
* - :mod:`airflow.sensors.filesystem`
@@ -103,7 +103,7 @@ Airflow has many more integrations available for separate installation as a prov
* - :mod:`airflow.sensors.python`
-
- * - :mod:`airflow.sensors.sql_sensor`
+ * - :mod:`airflow.sensors.sql`
-
* - :mod:`airflow.sensors.time_delta`
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 0463e0c..132986a 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1456,7 +1456,7 @@ SENSORS = [
'airflow.sensors.hdfs_sensor.HdfsSensor',
),
(
- 'airflow.sensors.weekday_sensor.DayOfWeekSensor',
+ 'airflow.sensors.weekday.DayOfWeekSensor',
'airflow.contrib.sensors.weekday_sensor.DayOfWeekSensor',
),
(
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 9542507..afa30e2 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -26,7 +26,7 @@ from airflow.models import DagBag, TaskInstance
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
-from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
+from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.sensors.time_sensor import TimeSensor
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.state import State
diff --git a/tests/sensors/test_smart_sensor_operator.py b/tests/sensors/test_smart_sensor_operator.py
index cb8295e..5b798cf 100644
--- a/tests/sensors/test_smart_sensor_operator.py
+++ b/tests/sensors/test_smart_sensor_operator.py
@@ -30,7 +30,7 @@ from airflow.configuration import conf
from airflow.models import DagRun, SensorInstance, TaskInstance
from airflow.operators.dummy import DummyOperator
from airflow.sensors.base import BaseSensorOperator
-from airflow.sensors.smart_sensor_operator import SmartSensorOperator
+from airflow.sensors.smart_sensor import SmartSensorOperator
from airflow.utils import timezone
from airflow.utils.state import State
diff --git a/tests/sensors/test_sql_sensor.py b/tests/sensors/test_sql_sensor.py
index 1dce2ed..88a1856 100644
--- a/tests/sensors/test_sql_sensor.py
+++ b/tests/sensors/test_sql_sensor.py
@@ -23,7 +23,7 @@ import pytest
from airflow.exceptions import AirflowException
from airflow.models.dag import DAG
-from airflow.sensors.sql_sensor import SqlSensor
+from airflow.sensors.sql import SqlSensor
from airflow.utils.timezone import datetime
from tests.providers.apache.hive import TestHiveEnvironment
@@ -86,7 +86,7 @@ class TestSqlSensor(TestHiveEnvironment):
)
op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
- @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+ @mock.patch('airflow.sensors.sql.BaseHook')
def test_sql_sensor_postgres_poke(self, mock_hook):
op = SqlSensor(
task_id='sql_sensor_check',
@@ -118,7 +118,7 @@ class TestSqlSensor(TestHiveEnvironment):
mock_get_records.return_value = [['1']]
self.assertTrue(op.poke(None))
- @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+ @mock.patch('airflow.sensors.sql.BaseHook')
def test_sql_sensor_postgres_poke_fail_on_empty(self, mock_hook):
op = SqlSensor(
task_id='sql_sensor_check', conn_id='postgres_default', sql="SELECT 1", fail_on_empty=True
@@ -130,7 +130,7 @@ class TestSqlSensor(TestHiveEnvironment):
mock_get_records.return_value = []
self.assertRaises(AirflowException, op.poke, None)
- @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+ @mock.patch('airflow.sensors.sql.BaseHook')
def test_sql_sensor_postgres_poke_success(self, mock_hook):
op = SqlSensor(
task_id='sql_sensor_check', conn_id='postgres_default', sql="SELECT 1", success=lambda x: x in [1]
@@ -148,7 +148,7 @@ class TestSqlSensor(TestHiveEnvironment):
mock_get_records.return_value = [['1']]
self.assertFalse(op.poke(None))
- @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+ @mock.patch('airflow.sensors.sql.BaseHook')
def test_sql_sensor_postgres_poke_failure(self, mock_hook):
op = SqlSensor(
task_id='sql_sensor_check', conn_id='postgres_default', sql="SELECT 1", failure=lambda x: x in [1]
@@ -163,7 +163,7 @@ class TestSqlSensor(TestHiveEnvironment):
mock_get_records.return_value = [[1]]
self.assertRaises(AirflowException, op.poke, None)
- @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+ @mock.patch('airflow.sensors.sql.BaseHook')
def test_sql_sensor_postgres_poke_failure_success(self, mock_hook):
op = SqlSensor(
task_id='sql_sensor_check',
@@ -185,7 +185,7 @@ class TestSqlSensor(TestHiveEnvironment):
mock_get_records.return_value = [[2]]
self.assertTrue(op.poke(None))
- @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+ @mock.patch('airflow.sensors.sql.BaseHook')
def test_sql_sensor_postgres_poke_failure_success_same(self, mock_hook):
op = SqlSensor(
task_id='sql_sensor_check',
@@ -204,7 +204,7 @@ class TestSqlSensor(TestHiveEnvironment):
mock_get_records.return_value = [[1]]
self.assertRaises(AirflowException, op.poke, None)
- @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+ @mock.patch('airflow.sensors.sql.BaseHook')
def test_sql_sensor_postgres_poke_invalid_failure(self, mock_hook):
op = SqlSensor(
task_id='sql_sensor_check',
@@ -221,7 +221,7 @@ class TestSqlSensor(TestHiveEnvironment):
op.poke(None)
self.assertEqual("self.failure is present, but not callable -> [1]", str(e.exception))
- @mock.patch('airflow.sensors.sql_sensor.BaseHook')
+ @mock.patch('airflow.sensors.sql.BaseHook')
def test_sql_sensor_postgres_poke_invalid_success(self, mock_hook):
op = SqlSensor(
task_id='sql_sensor_check',
diff --git a/tests/sensors/test_weekday_sensor.py b/tests/sensors/test_weekday_sensor.py
index 26b65a0..f632e82 100644
--- a/tests/sensors/test_weekday_sensor.py
+++ b/tests/sensors/test_weekday_sensor.py
@@ -24,7 +24,7 @@ from parameterized import parameterized
from airflow.exceptions import AirflowSensorTimeout
from airflow.models import DagBag
from airflow.models.dag import DAG
-from airflow.sensors.weekday_sensor import DayOfWeekSensor
+from airflow.sensors.weekday import DayOfWeekSensor
from airflow.utils.timezone import datetime
from airflow.utils.weekday import WeekDay
from tests.test_utils import db
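The test updates above follow from how mock.patch works: the target must name the module where the object is looked up, so when SqlSensor moved to airflow.sensors.sql, every patch of its BaseHook reference had to move with it. A minimal hedged sketch in the style of the updated tests; the test name is hypothetical:

    from unittest import mock

    from airflow.sensors.sql import SqlSensor

    # Patch BaseHook where SqlSensor now resolves it: airflow.sensors.sql,
    # not the deprecated airflow.sensors.sql_sensor shim.
    @mock.patch('airflow.sensors.sql.BaseHook')
    def test_poke_sees_mocked_hook(mock_base_hook):
        op = SqlSensor(task_id='sql_sensor_check', conn_id='postgres_default', sql='SELECT 1')
        mock_base_hook.get_connection.return_value.conn_type = 'postgres'
        mock_get_records = mock_base_hook.get_connection.return_value.get_hook.return_value.get_records
        mock_get_records.return_value = [[1]]
        assert op.poke(None) is True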