Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/20 10:27:35 UTC

[airflow] branch v1-10-test updated (0ac4185 -> 18d0918)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    omit 0ac4185  Merging multiple sql operators (#9124)
     new 18d0918  Merging multiple sql operators (#9124)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (0ac4185)
            \
             N -- N -- N   refs/heads/v1-10-test (18d0918)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/operators/sql.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)


[airflow] 01/01: Merging multiple sql operators (#9124)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 18d09181b53c8c1b50e00bf627569ad81194888e
Author: samuelkhtu <46...@users.noreply.github.com>
AuthorDate: Wed Jun 17 14:32:46 2020 -0400

    Merging multiple sql operators (#9124)
    
    * Merge various SQL Operators into sql.py
    
    * Fix unit test code format
    
    * Merge multiple SQL operators into one
    
    1. Merge check_operator.py into airflow.operators.sql
    2. Merge sql_branch_operator.py into airflow.operators.sql
    3. Merge unit test for both into test_sql.py
    
    * Rename test_core_to_contrib Interval/ValueCheckOperator to SQLInterval/ValueCheckOperator
    
    * Fixed deprecated class and added check to test_core_to_contrib
    
    (cherry picked from commit 0b9bf4a285a074bbde270839a90fb53c257340be)
---
 ...eea_add_precision_to_execution_date_in_mysql.py |   2 +-
 airflow/operators/check_operator.py                | 425 +++------------------
 airflow/operators/{check_operator.py => sql.py}    | 419 +++++++++++++++-----
 airflow/operators/sql_branch_operator.py           | 162 +-------
 docs/operators-and-hooks-ref.rst                   |  30 +-
 tests/api/common/experimental/test_pool.py         |   4 +-
 tests/contrib/hooks/test_gcp_api_base_hook.py      |   2 +-
 tests/contrib/hooks/test_gcp_cloud_build_hook.py   |   4 +-
 tests/contrib/hooks/test_gcp_transfer_hook.py      |   4 +-
 .../operators/test_gcp_cloud_build_operator.py     |   8 +-
 tests/contrib/operators/test_gcs_to_gdrive.py      |   2 +-
 tests/contrib/operators/test_sftp_operator.py      |   6 +-
 tests/contrib/operators/test_ssh_operator.py       |   6 +-
 tests/contrib/secrets/test_hashicorp_vault.py      |   4 +-
 .../contrib/utils/test_gcp_credentials_provider.py |   2 +-
 tests/operators/test_check_operator.py             | 327 ----------------
 .../{test_sql_branch_operator.py => test_sql.py}   | 339 ++++++++++++++--
 tests/www_rbac/test_validators.py                  |   4 +-
 18 files changed, 706 insertions(+), 1044 deletions(-)
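In practice the consolidation boils down to a module move. A minimal
sketch of the import change (the old paths still resolve through
deprecation shims, as the diffs below show):

    # New consolidated module introduced by this backport:
    from airflow.operators.sql import (
        BranchSQLOperator,
        SQLCheckOperator,
        SQLIntervalCheckOperator,
        SQLThresholdCheckOperator,
        SQLValueCheckOperator,
    )

    # The old paths keep working, but each class is now a thin subclass
    # of its airflow.operators.sql counterpart that emits a
    # DeprecationWarning when instantiated:
    from airflow.operators.check_operator import CheckOperator
    from airflow.operators.sql_branch_operator import BranchSqlOperator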

diff --git a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
index ecb589d..59098a8 100644
--- a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
+++ b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
@@ -29,7 +29,7 @@ from sqlalchemy.dialects import mysql
 
 # revision identifiers, used by Alembic.
 revision = 'a66efa278eea'
-down_revision = '8f966b9c467a'
+down_revision = '952da73b5eff'
 branch_labels = None
 depends_on = None
 
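The one-line Alembic tweak above is routine cherry-pick housekeeping:
Alembic orders migrations through these module-level identifiers, so a
revision backported to v1-10-test has to point down_revision at the
head of that branch's migration chain rather than the source branch's.
A sketch of the header Alembic reads:

    # revision identifiers, used by Alembic.
    revision = 'a66efa278eea'        # this migration's own id
    down_revision = '952da73b5eff'   # parent revision on the v1-10 chain
                                     # (was '8f966b9c467a' upstream)
    branch_labels = None
    depends_on = None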
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index b6d3a18..12ac472 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -17,409 +17,70 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from builtins import str, zip
-from typing import Optional, Any, Iterable, Dict, SupportsAbs
+"""This module is deprecated. Please use `airflow.operators.sql`."""
 
-from airflow.exceptions import AirflowException
-from airflow.hooks.base_hook import BaseHook
-from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
+import warnings
 
+from airflow.operators.sql import (
+    SQLCheckOperator, SQLIntervalCheckOperator, SQLThresholdCheckOperator, SQLValueCheckOperator,
+)
 
-class CheckOperator(BaseOperator):
-    """
-    Performs checks against a db. The ``CheckOperator`` expects
-    a sql query that will return a single row. Each value on that
-    first row is evaluated using python ``bool`` casting. If any of the
-    values return ``False`` the check is failed and errors out.
-
-    Note that Python bool casting evals the following as ``False``:
-
-    * ``False``
-    * ``0``
-    * Empty string (``""``)
-    * Empty list (``[]``)
-    * Empty dictionary or set (``{}``)
-
-    Given a query like ``SELECT COUNT(*) FROM foo``, it will fail only if
-    the count ``== 0``. You can craft much more complex query that could,
-    for instance, check that the table has the same number of rows as
-    the source table upstream, or that the count of today's partition is
-    greater than yesterday's partition, or that a set of metrics are less
-    than 3 standard deviation for the 7 day average.
-
-    This operator can be used as a data quality check in your pipeline, and
-    depending on where you put it in your DAG, you have the choice to
-    stop the critical path, preventing from
-    publishing dubious data, or on the side and receive email alerts
-    without stopping the progress of the DAG.
 
-    Note that this is an abstract class and get_db_hook
-    needs to be defined. Whereas a get_db_hook is hook that gets a
-    single record from an external source.
-
-    :param sql: the sql to be executed. (templated)
-    :type sql: str
+class CheckOperator(SQLCheckOperator):
     """
-
-    template_fields = ('sql',)  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
-
-    @apply_defaults
-    def __init__(
-        self,
-        sql,  # type: str
-        conn_id=None,  # type: Optional[str]
-        *args,
-        **kwargs
-    ):
-        super(CheckOperator, self).__init__(*args, **kwargs)
-        self.conn_id = conn_id
-        self.sql = sql
-
-    def execute(self, context=None):
-        self.log.info('Executing SQL check: %s', self.sql)
-        records = self.get_db_hook().get_first(self.sql)
-
-        self.log.info('Record: %s', records)
-        if not records:
-            raise AirflowException("The query returned None")
-        elif not all([bool(r) for r in records]):
-            raise AirflowException("Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
-                query=self.sql, records=records))
-
-        self.log.info("Success.")
-
-    def get_db_hook(self):
-        return BaseHook.get_hook(conn_id=self.conn_id)
-
-
-def _convert_to_float_if_possible(s):
+    This class is deprecated.
+    Please use `airflow.operators.sql.SQLCheckOperator`.
     """
-    A small helper function to convert a string to a numeric value
-    if appropriate
 
-    :param s: the string to be converted
-    :type s: str
-    """
-    try:
-        ret = float(s)
-    except (ValueError, TypeError):
-        ret = s
-    return ret
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.SQLCheckOperator`.""",
+            DeprecationWarning, stacklevel=2
+        )
+        super(CheckOperator, self).__init__(*args, **kwargs)
 
 
-class ValueCheckOperator(BaseOperator):
+class IntervalCheckOperator(SQLIntervalCheckOperator):
     """
-    Performs a simple value check using sql code.
-
-    Note that this is an abstract class and get_db_hook
-    needs to be defined. Whereas a get_db_hook is hook that gets a
-    single record from an external source.
-
-    :param sql: the sql to be executed. (templated)
-    :type sql: str
+    This class is deprecated.
+    Please use `airflow.operators.sql.SQLIntervalCheckOperator`.
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'ValueCheckOperator'
-    }
-    template_fields = ('sql', 'pass_value',)  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
-
-    @apply_defaults
-    def __init__(
-        self,
-        sql,  # type: str
-        pass_value,  # type: Any
-        tolerance=None,  # type: Any
-        conn_id=None,  # type: Optional[str]
-        *args,
-        **kwargs
-    ):
-        super(ValueCheckOperator, self).__init__(*args, **kwargs)
-        self.sql = sql
-        self.conn_id = conn_id
-        self.pass_value = str(pass_value)
-        tol = _convert_to_float_if_possible(tolerance)
-        self.tol = tol if isinstance(tol, float) else None
-        self.has_tolerance = self.tol is not None
-
-    def execute(self, context=None):
-        self.log.info('Executing SQL check: %s', self.sql)
-        records = self.get_db_hook().get_first(self.sql)
-
-        if not records:
-            raise AirflowException("The query returned None")
-
-        pass_value_conv = _convert_to_float_if_possible(self.pass_value)
-        is_numeric_value_check = isinstance(pass_value_conv, float)
-
-        tolerance_pct_str = str(self.tol * 100) + '%' if self.has_tolerance else None
-        error_msg = ("Test failed.\nPass value:{pass_value_conv}\n"
-                     "Tolerance:{tolerance_pct_str}\n"
-                     "Query:\n{sql}\nResults:\n{records!s}").format(
-            pass_value_conv=pass_value_conv,
-            tolerance_pct_str=tolerance_pct_str,
-            sql=self.sql,
-            records=records
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.SQLIntervalCheckOperator`.""",
+            DeprecationWarning, stacklevel=2
         )
-
-        if not is_numeric_value_check:
-            tests = self._get_string_matches(records, pass_value_conv)
-        elif is_numeric_value_check:
-            try:
-                numeric_records = self._to_float(records)
-            except (ValueError, TypeError):
-                raise AirflowException("Converting a result to float failed.\n{}".format(error_msg))
-            tests = self._get_numeric_matches(numeric_records, pass_value_conv)
-        else:
-            tests = []
-
-        if not all(tests):
-            raise AirflowException(error_msg)
-
-    def _to_float(self, records):
-        return [float(record) for record in records]
-
-    def _get_string_matches(self, records, pass_value_conv):
-        return [str(record) == pass_value_conv for record in records]
-
-    def _get_numeric_matches(self, numeric_records, numeric_pass_value_conv):
-        if self.has_tolerance:
-            return [
-                numeric_pass_value_conv * (1 - self.tol) <= record <= numeric_pass_value_conv * (1 + self.tol)
-                for record in numeric_records
-            ]
-
-        return [record == numeric_pass_value_conv for record in numeric_records]
-
-    def get_db_hook(self):
-        return BaseHook.get_hook(conn_id=self.conn_id)
+        super(IntervalCheckOperator, self).__init__(*args, **kwargs)
 
 
-class IntervalCheckOperator(BaseOperator):
+class ThresholdCheckOperator(SQLThresholdCheckOperator):
     """
-    Checks that the values of metrics given as SQL expressions are within
-    a certain tolerance of the ones from days_back before.
-
-    Note that this is an abstract class and get_db_hook
-    needs to be defined. Whereas a get_db_hook is hook that gets a
-    single record from an external source.
-
-    :param table: the table name
-    :type table: str
-    :param days_back: number of days between ds and the ds we want to check
-        against. Defaults to 7 days
-    :type days_back: int
-    :param ratio_formula: which formula to use to compute the ratio between
-        the two metrics. Assuming cur is the metric of today and ref is
-        the metric to today - days_back.
-
-        max_over_min: computes max(cur, ref) / min(cur, ref)
-        relative_diff: computes abs(cur-ref) / ref
-
-        Default: 'max_over_min'
-    :type ratio_formula: str
-    :param ignore_zero: whether we should ignore zero metrics
-    :type ignore_zero: bool
-    :param metrics_threshold: a dictionary of ratios indexed by metrics
-    :type metrics_threshold: dict
+    This class is deprecated.
+    Please use `airflow.operators.sql.SQLThresholdCheckOperator`.
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'IntervalCheckOperator'
-    }
-    template_fields = ('sql1', 'sql2')  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
-
-    ratio_formulas = {
-        'max_over_min': lambda cur, ref: float(max(cur, ref)) / min(cur, ref),
-        'relative_diff': lambda cur, ref: float(abs(cur - ref)) / ref,
-    }
-
-    @apply_defaults
-    def __init__(
-        self,
-        table,  # type: str
-        metrics_thresholds,  # type: Dict[str, int]
-        date_filter_column='ds',  # type: Optional[str]
-        days_back=-7,  # type: SupportsAbs[int]
-        ratio_formula='max_over_min',  # type: Optional[str]
-        ignore_zero=True,  # type: Optional[bool]
-        conn_id=None,  # type: Optional[str]
-        *args, **kwargs
-    ):
-        super(IntervalCheckOperator, self).__init__(*args, **kwargs)
-        if ratio_formula not in self.ratio_formulas:
-            msg_template = "Invalid diff_method: {diff_method}. " \
-                           "Supported diff methods are: {diff_methods}"
-
-            raise AirflowException(
-                msg_template.format(diff_method=ratio_formula,
-                                    diff_methods=self.ratio_formulas)
-            )
-        self.ratio_formula = ratio_formula
-        self.ignore_zero = ignore_zero
-        self.table = table
-        self.metrics_thresholds = metrics_thresholds
-        self.metrics_sorted = sorted(metrics_thresholds.keys())
-        self.date_filter_column = date_filter_column
-        self.days_back = -abs(days_back)
-        self.conn_id = conn_id
-        sqlexp = ', '.join(self.metrics_sorted)
-        sqlt = "SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=".format(
-            sqlexp=sqlexp, table=table, date_filter_column=date_filter_column
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.SQLThresholdCheckOperator`.""",
+            DeprecationWarning, stacklevel=2
         )
-
-        self.sql1 = sqlt + "'{{ ds }}'"
-        self.sql2 = sqlt + "'{{ macros.ds_add(ds, " + str(self.days_back) + ") }}'"
-
-    def execute(self, context=None):
-        hook = self.get_db_hook()
-        self.log.info('Using ratio formula: %s', self.ratio_formula)
-        self.log.info('Executing SQL check: %s', self.sql2)
-        row2 = hook.get_first(self.sql2)
-        self.log.info('Executing SQL check: %s', self.sql1)
-        row1 = hook.get_first(self.sql1)
-
-        if not row2:
-            raise AirflowException("The query {} returned None".format(self.sql2))
-        if not row1:
-            raise AirflowException("The query {} returned None".format(self.sql1))
-
-        current = dict(zip(self.metrics_sorted, row1))
-        reference = dict(zip(self.metrics_sorted, row2))
-
-        ratios = {}
-        test_results = {}
-
-        for m in self.metrics_sorted:
-            cur = current[m]
-            ref = reference[m]
-            threshold = self.metrics_thresholds[m]
-            if cur == 0 or ref == 0:
-                ratios[m] = None
-                test_results[m] = self.ignore_zero
-            else:
-                ratios[m] = self.ratio_formulas[self.ratio_formula](current[m], reference[m])
-                test_results[m] = ratios[m] < threshold
-
-            self.log.info(
-                (
-                    "Current metric for %s: %s\n"
-                    "Past metric for %s: %s\n"
-                    "Ratio for %s: %s\n"
-                    "Threshold: %s\n"
-                ), m, cur, m, ref, m, ratios[m], threshold)
-
-        if not all(test_results.values()):
-            failed_tests = [it[0] for it in test_results.items() if not it[1]]
-            j = len(failed_tests)
-            n = len(self.metrics_sorted)
-            self.log.warning("The following %s tests out of %s failed:", j, n)
-            for k in failed_tests:
-                self.log.warning(
-                    "'%s' check failed. %s is above %s", k, ratios[k], self.metrics_thresholds[k]
-                )
-            raise AirflowException("The following tests have failed:\n {0}".format(", ".join(
-                sorted(failed_tests))))
-
-        self.log.info("All tests have passed")
-
-    def get_db_hook(self):
-        return BaseHook.get_hook(conn_id=self.conn_id)
+        super(ThresholdCheckOperator, self).__init__(*args, **kwargs)
 
 
-class ThresholdCheckOperator(BaseOperator):
+class ValueCheckOperator(SQLValueCheckOperator):
     """
-    Performs a value check using sql code against a mininmum threshold
-    and a maximum threshold. Thresholds can be in the form of a numeric
-    value OR a sql statement that results a numeric.
-
-    Note that this is an abstract class and get_db_hook
-    needs to be defined. Whereas a get_db_hook is hook that gets a
-    single record from an external source.
-
-    :param sql: the sql to be executed. (templated)
-    :type sql: str
-    :param min_threshold: numerical value or min threshold sql to be executed (templated)
-    :type min_threshold: numeric or str
-    :param max_threshold: numerical value or max threshold sql to be executed (templated)
-    :type max_threshold: numeric or str
+    This class is deprecated.
+    Please use `airflow.operators.sql.SQLValueCheckOperator`.
     """
 
-    template_fields = ('sql', 'min_threshold', 'max_threshold')  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-
-    @apply_defaults
-    def __init__(
-        self,
-        sql,   # type: str
-        min_threshold,   # type: Any
-        max_threshold,   # type: Any
-        conn_id=None,   # type: Optional[str]
-        *args, **kwargs
-    ):
-        super(ThresholdCheckOperator, self).__init__(*args, **kwargs)
-        self.sql = sql
-        self.conn_id = conn_id
-        self.min_threshold = _convert_to_float_if_possible(min_threshold)
-        self.max_threshold = _convert_to_float_if_possible(max_threshold)
-
-    def execute(self, context=None):
-        hook = self.get_db_hook()
-        result = hook.get_first(self.sql)[0][0]
-
-        if isinstance(self.min_threshold, float):
-            lower_bound = self.min_threshold
-        else:
-            lower_bound = hook.get_first(self.min_threshold)[0][0]
-
-        if isinstance(self.max_threshold, float):
-            upper_bound = self.max_threshold
-        else:
-            upper_bound = hook.get_first(self.max_threshold)[0][0]
-
-        meta_data = {
-            "result": result,
-            "task_id": self.task_id,
-            "min_threshold": lower_bound,
-            "max_threshold": upper_bound,
-            "within_threshold": lower_bound <= result <= upper_bound
-        }
-
-        self.push(meta_data)
-        if not meta_data["within_threshold"]:
-            error_msg = (
-                'Threshold Check: "{task_id}" failed.\n'
-                'DAG: {dag_id}\nTask_id: {task_id}\n'
-                'Check description: {description}\n'
-                'SQL: {sql}\n'
-                'Result: {result} is not within thresholds '
-                '{min_threshold} and {max_threshold}'
-            ).format(
-                task_id=self.task_id, dag_id=self.dag_id,
-                description=meta_data.get("description"), sql=self.sql,
-                result=round(meta_data.get("result"), 2),
-                min_threshold=meta_data.get("min_threshold"),
-                max_threshold=meta_data.get("max_threshold")
-            )
-            raise AirflowException(error_msg)
-
-        self.log.info("Test %s Successful.", self.task_id)
-
-    def push(self, meta_data):
-        """
-        Optional: Send data check info and metadata to an external database.
-        Default functionality will log metadata.
-        """
-
-        info = "\n".join(["""{}: {}""".format(key, item) for key, item in meta_data.items()])
-        self.log.info("Log from %s:\n%s", self.dag_id, info)
-
-    def get_db_hook(self):
-        return BaseHook.get_hook(conn_id=self.conn_id)
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.SQLValueCheckOperator`.""",
+            DeprecationWarning, stacklevel=2
+        )
+        super(ValueCheckOperator, self).__init__(*args, **kwargs)
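The shim pattern above deserves a note: each legacy class now
subclasses its airflow.operators.sql replacement, and the
DeprecationWarning is raised in __init__, so it fires on instantiation
rather than on import. A quick sketch of what existing DAG code
observes (the conn_id is hypothetical):

    import warnings

    from airflow.operators.check_operator import CheckOperator
    from airflow.operators.sql import SQLCheckOperator

    # The old name is still a subclass of the new one, so isinstance
    # checks in existing code keep passing:
    assert issubclass(CheckOperator, SQLCheckOperator)

    with warnings.catch_warnings(record=True) as captured:
        warnings.simplefilter("always")
        CheckOperator(task_id="legacy_check", sql="SELECT 1",
                      conn_id="my_db")

    assert any(issubclass(w.category, DeprecationWarning)
               for w in captured)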
diff --git a/airflow/operators/check_operator.py b/airflow/operators/sql.py
similarity index 50%
copy from airflow/operators/check_operator.py
copy to airflow/operators/sql.py
index b6d3a18..3e53fbf 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/sql.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -16,19 +15,31 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from builtins import str, zip
-from typing import Optional, Any, Iterable, Dict, SupportsAbs
+from distutils.util import strtobool
+from typing import Iterable
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator, SkipMixin
 from airflow.utils.decorators import apply_defaults
 
-
-class CheckOperator(BaseOperator):
+ALLOWED_CONN_TYPE = {
+    "google_cloud_platform",
+    "jdbc",
+    "mssql",
+    "mysql",
+    "odbc",
+    "oracle",
+    "postgres",
+    "presto",
+    "sqlite",
+    "vertica",
+}
+
+
+class SQLCheckOperator(BaseOperator):
     """
-    Performs checks against a db. The ``CheckOperator`` expects
+    Performs checks against a db. The ``SQLCheckOperator`` expects
     a sql query that will return a single row. Each value on that
     first row is evaluated using python ``bool`` casting. If any of the
     values return ``False`` the check is failed and errors out.
@@ -62,36 +73,44 @@ class CheckOperator(BaseOperator):
     :type sql: str
     """
 
-    template_fields = ('sql',)  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
+    template_fields = ("sql",)  # type: Iterable[str]
+    template_ext = (
+        ".hql",
+        ".sql",
+    )  # type: Iterable[str]
+    ui_color = "#fff7e6"
 
     @apply_defaults
     def __init__(
-        self,
-        sql,  # type: str
-        conn_id=None,  # type: Optional[str]
-        *args,
-        **kwargs
+        self, sql, conn_id=None, *args, **kwargs
     ):
-        super(CheckOperator, self).__init__(*args, **kwargs)
+        super(SQLCheckOperator, self).__init__(*args, **kwargs)
         self.conn_id = conn_id
         self.sql = sql
 
     def execute(self, context=None):
-        self.log.info('Executing SQL check: %s', self.sql)
+        self.log.info("Executing SQL check: %s", self.sql)
         records = self.get_db_hook().get_first(self.sql)
 
-        self.log.info('Record: %s', records)
+        self.log.info("Record: %s", records)
         if not records:
             raise AirflowException("The query returned None")
         elif not all([bool(r) for r in records]):
-            raise AirflowException("Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
-                query=self.sql, records=records))
+            raise AirflowException(
+                "Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
+                    query=self.sql, records=records
+                )
+            )
 
         self.log.info("Success.")
 
     def get_db_hook(self):
+        """
+        Get the database hook for the connection.
+
+        :return: the database hook object.
+        :rtype: DbApiHook
+        """
         return BaseHook.get_hook(conn_id=self.conn_id)
 
 
@@ -110,7 +129,7 @@ def _convert_to_float_if_possible(s):
     return ret
 
 
-class ValueCheckOperator(BaseOperator):
+class SQLValueCheckOperator(BaseOperator):
     """
     Performs a simple value check using sql code.
 
@@ -122,24 +141,27 @@ class ValueCheckOperator(BaseOperator):
     :type sql: str
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'ValueCheckOperator'
-    }
-    template_fields = ('sql', 'pass_value',)  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
+    __mapper_args__ = {"polymorphic_identity": "SQLValueCheckOperator"}
+    template_fields = (
+        "sql",
+        "pass_value",
+    )  # type: Iterable[str]
+    template_ext = (
+        ".hql",
+        ".sql",
+    )  # type: Iterable[str]
+    ui_color = "#fff7e6"
 
     @apply_defaults
     def __init__(
-        self,
-        sql,  # type: str
-        pass_value,  # type: Any
-        tolerance=None,  # type: Any
-        conn_id=None,  # type: Optional[str]
-        *args,
-        **kwargs
-    ):
-        super(ValueCheckOperator, self).__init__(*args, **kwargs)
+            self,
+            sql,
+            pass_value,
+            tolerance=None,
+            conn_id=None,
+            *args,
+            **kwargs):
+        super(SQLValueCheckOperator, self).__init__(*args, **kwargs)
         self.sql = sql
         self.conn_id = conn_id
         self.pass_value = str(pass_value)
@@ -148,7 +170,7 @@ class ValueCheckOperator(BaseOperator):
         self.has_tolerance = self.tol is not None
 
     def execute(self, context=None):
-        self.log.info('Executing SQL check: %s', self.sql)
+        self.log.info("Executing SQL check: %s", self.sql)
         records = self.get_db_hook().get_first(self.sql)
 
         if not records:
@@ -157,14 +179,16 @@ class ValueCheckOperator(BaseOperator):
         pass_value_conv = _convert_to_float_if_possible(self.pass_value)
         is_numeric_value_check = isinstance(pass_value_conv, float)
 
-        tolerance_pct_str = str(self.tol * 100) + '%' if self.has_tolerance else None
-        error_msg = ("Test failed.\nPass value:{pass_value_conv}\n"
-                     "Tolerance:{tolerance_pct_str}\n"
-                     "Query:\n{sql}\nResults:\n{records!s}").format(
+        tolerance_pct_str = str(self.tol * 100) + "%" if self.has_tolerance else None
+        error_msg = (
+            "Test failed.\nPass value:{pass_value_conv}\n"
+            "Tolerance:{tolerance_pct_str}\n"
+            "Query:\n{sql}\nResults:\n{records!s}"
+        ).format(
             pass_value_conv=pass_value_conv,
             tolerance_pct_str=tolerance_pct_str,
             sql=self.sql,
-            records=records
+            records=records,
         )
 
         if not is_numeric_value_check:
@@ -173,7 +197,9 @@ class ValueCheckOperator(BaseOperator):
             try:
                 numeric_records = self._to_float(records)
             except (ValueError, TypeError):
-                raise AirflowException("Converting a result to float failed.\n{}".format(error_msg))
+                raise AirflowException(
+                    "Converting a result to float failed.\n{}".format(error_msg)
+                )
             tests = self._get_numeric_matches(numeric_records, pass_value_conv)
         else:
             tests = []
@@ -197,10 +223,16 @@ class ValueCheckOperator(BaseOperator):
         return [record == numeric_pass_value_conv for record in numeric_records]
 
     def get_db_hook(self):
+        """
+        Get the database hook for the connection.
+
+        :return: the database hook object.
+        :rtype: DbApiHook
+        """
         return BaseHook.get_hook(conn_id=self.conn_id)
 
 
-class IntervalCheckOperator(BaseOperator):
+class SQLIntervalCheckOperator(BaseOperator):
     """
     Checks that the values of metrics given as SQL expressions are within
     a certain tolerance of the ones from days_back before.
@@ -229,38 +261,43 @@ class IntervalCheckOperator(BaseOperator):
     :type metrics_threshold: dict
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'IntervalCheckOperator'
-    }
-    template_fields = ('sql1', 'sql2')  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
+    __mapper_args__ = {"polymorphic_identity": "SQLIntervalCheckOperator"}
+    template_fields = ("sql1", "sql2")  # type: Iterable[str]
+    template_ext = (
+        ".hql",
+        ".sql",
+    )  # type: Iterable[str]
+    ui_color = "#fff7e6"
 
     ratio_formulas = {
-        'max_over_min': lambda cur, ref: float(max(cur, ref)) / min(cur, ref),
-        'relative_diff': lambda cur, ref: float(abs(cur - ref)) / ref,
+        "max_over_min": lambda cur, ref: float(max(cur, ref)) / min(cur, ref),
+        "relative_diff": lambda cur, ref: float(abs(cur - ref)) / ref,
     }
 
     @apply_defaults
     def __init__(
         self,
-        table,  # type: str
-        metrics_thresholds,  # type: Dict[str, int]
-        date_filter_column='ds',  # type: Optional[str]
-        days_back=-7,  # type: SupportsAbs[int]
-        ratio_formula='max_over_min',  # type: Optional[str]
-        ignore_zero=True,  # type: Optional[bool]
-        conn_id=None,  # type: Optional[str]
-        *args, **kwargs
+        table,
+        metrics_thresholds,
+        date_filter_column="ds",
+        days_back=-7,
+        ratio_formula="max_over_min",
+        ignore_zero=True,
+        conn_id=None,
+        *args,
+        **kwargs
     ):
-        super(IntervalCheckOperator, self).__init__(*args, **kwargs)
+        super(SQLIntervalCheckOperator, self).__init__(*args, **kwargs)
         if ratio_formula not in self.ratio_formulas:
-            msg_template = "Invalid diff_method: {diff_method}. " \
-                           "Supported diff methods are: {diff_methods}"
+            msg_template = (
+                "Invalid diff_method: {diff_method}. "
+                "Supported diff methods are: {diff_methods}"
+            )
 
             raise AirflowException(
-                msg_template.format(diff_method=ratio_formula,
-                                    diff_methods=self.ratio_formulas)
+                msg_template.format(
+                    diff_method=ratio_formula, diff_methods=self.ratio_formulas
+                )
             )
         self.ratio_formula = ratio_formula
         self.ignore_zero = ignore_zero
@@ -270,7 +307,7 @@ class IntervalCheckOperator(BaseOperator):
         self.date_filter_column = date_filter_column
         self.days_back = -abs(days_back)
         self.conn_id = conn_id
-        sqlexp = ', '.join(self.metrics_sorted)
+        sqlexp = ", ".join(self.metrics_sorted)
         sqlt = "SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=".format(
             sqlexp=sqlexp, table=table, date_filter_column=date_filter_column
         )
@@ -280,10 +317,10 @@ class IntervalCheckOperator(BaseOperator):
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-        self.log.info('Using ratio formula: %s', self.ratio_formula)
-        self.log.info('Executing SQL check: %s', self.sql2)
+        self.log.info("Using ratio formula: %s", self.ratio_formula)
+        self.log.info("Executing SQL check: %s", self.sql2)
         row2 = hook.get_first(self.sql2)
-        self.log.info('Executing SQL check: %s', self.sql1)
+        self.log.info("Executing SQL check: %s", self.sql1)
         row1 = hook.get_first(self.sql1)
 
         if not row2:
@@ -297,16 +334,18 @@ class IntervalCheckOperator(BaseOperator):
         ratios = {}
         test_results = {}
 
-        for m in self.metrics_sorted:
-            cur = current[m]
-            ref = reference[m]
-            threshold = self.metrics_thresholds[m]
+        for metric in self.metrics_sorted:
+            cur = current[metric]
+            ref = reference[metric]
+            threshold = self.metrics_thresholds[metric]
             if cur == 0 or ref == 0:
-                ratios[m] = None
-                test_results[m] = self.ignore_zero
+                ratios[metric] = None
+                test_results[metric] = self.ignore_zero
             else:
-                ratios[m] = self.ratio_formulas[self.ratio_formula](current[m], reference[m])
-                test_results[m] = ratios[m] < threshold
+                ratios[metric] = self.ratio_formulas[self.ratio_formula](
+                    current[metric], reference[metric]
+                )
+                test_results[metric] = ratios[metric] < threshold
 
             self.log.info(
                 (
@@ -314,27 +353,49 @@ class IntervalCheckOperator(BaseOperator):
                     "Past metric for %s: %s\n"
                     "Ratio for %s: %s\n"
                     "Threshold: %s\n"
-                ), m, cur, m, ref, m, ratios[m], threshold)
+                ),
+                metric,
+                cur,
+                metric,
+                ref,
+                metric,
+                ratios[metric],
+                threshold,
+            )
 
         if not all(test_results.values()):
             failed_tests = [it[0] for it in test_results.items() if not it[1]]
-            j = len(failed_tests)
-            n = len(self.metrics_sorted)
-            self.log.warning("The following %s tests out of %s failed:", j, n)
+            self.log.warning(
+                "The following %s tests out of %s failed:",
+                len(failed_tests),
+                len(self.metrics_sorted),
+            )
             for k in failed_tests:
                 self.log.warning(
-                    "'%s' check failed. %s is above %s", k, ratios[k], self.metrics_thresholds[k]
+                    "'%s' check failed. %s is above %s",
+                    k,
+                    ratios[k],
+                    self.metrics_thresholds[k],
+                )
+            raise AirflowException(
+                "The following tests have failed:\n {0}".format(
+                    ", ".join(sorted(failed_tests))
                 )
-            raise AirflowException("The following tests have failed:\n {0}".format(", ".join(
-                sorted(failed_tests))))
+            )
 
         self.log.info("All tests have passed")
 
     def get_db_hook(self):
+        """
+        Get the database hook for the connection.
+
+        :return: the database hook object.
+        :rtype: DbApiHook
+        """
         return BaseHook.get_hook(conn_id=self.conn_id)
 
 
-class ThresholdCheckOperator(BaseOperator):
+class SQLThresholdCheckOperator(BaseOperator):
     """
     Performs a value check using sql code against a minimum threshold
     and a maximum threshold. Thresholds can be in the form of a numeric
@@ -352,19 +413,23 @@ class ThresholdCheckOperator(BaseOperator):
     :type max_threshold: numeric or str
     """
 
-    template_fields = ('sql', 'min_threshold', 'max_threshold')  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
+    template_fields = ("sql", "min_threshold", "max_threshold")  # type: Iterable[str]
+    template_ext = (
+        ".hql",
+        ".sql",
+    )  # type: Iterable[str]
 
     @apply_defaults
     def __init__(
         self,
-        sql,   # type: str
-        min_threshold,   # type: Any
-        max_threshold,   # type: Any
-        conn_id=None,   # type: Optional[str]
-        *args, **kwargs
+        sql,
+        min_threshold,
+        max_threshold,
+        conn_id=None,
+        *args,
+        **kwargs
     ):
-        super(ThresholdCheckOperator, self).__init__(*args, **kwargs)
+        super(SQLThresholdCheckOperator, self).__init__(*args, **kwargs)
         self.sql = sql
         self.conn_id = conn_id
         self.min_threshold = _convert_to_float_if_possible(min_threshold)
@@ -389,7 +454,7 @@ class ThresholdCheckOperator(BaseOperator):
             "task_id": self.task_id,
             "min_threshold": lower_bound,
             "max_threshold": upper_bound,
-            "within_threshold": lower_bound <= result <= upper_bound
+            "within_threshold": lower_bound <= result <= upper_bound,
         }
 
         self.push(meta_data)
@@ -398,16 +463,17 @@ class ThresholdCheckOperator(BaseOperator):
                 'Threshold Check: "{task_id}" failed.\n'
                 'DAG: {dag_id}\nTask_id: {task_id}\n'
                 'Check description: {description}\n'
-                'SQL: {sql}\n'
-                'Result: {result} is not within thresholds '
-                '{min_threshold} and {max_threshold}'
-            ).format(
-                task_id=self.task_id, dag_id=self.dag_id,
-                description=meta_data.get("description"), sql=self.sql,
-                result=round(meta_data.get("result"), 2),
-                min_threshold=meta_data.get("min_threshold"),
-                max_threshold=meta_data.get("max_threshold")
-            )
+                "SQL: {sql}\n"
+                'Result: {round} is not within thresholds '
+                '{min} and {max}'
+                .format(task_id=meta_data.get("task_id"),
+                        dag_id=self.dag_id,
+                        description=meta_data.get("description"),
+                        sql=self.sql,
+                        round=round(meta_data.get("result"), 2),
+                        min=meta_data.get("min_threshold"),
+                        max=meta_data.get("max_threshold"),
+                        ))
             raise AirflowException(error_msg)
 
         self.log.info("Test %s Successful.", self.task_id)
@@ -418,8 +484,149 @@ class ThresholdCheckOperator(BaseOperator):
         Default functionality will log metadata.
         """
 
-        info = "\n".join(["""{}: {}""".format(key, item) for key, item in meta_data.items()])
+        info = "\n".join(["{key}: {item}".format(key=key, item=item) for key, item in meta_data.items()])
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
     def get_db_hook(self):
+        """
+        Returns DB hook
+        """
         return BaseHook.get_hook(conn_id=self.conn_id)
+
+
+class BranchSQLOperator(BaseOperator, SkipMixin):
+    """
+    Executes sql code in a specific database
+
+    :param sql: the sql code to be executed. (templated)
+    :type sql: Can receive a str representing a sql statement or a reference to a template file.
+               Template references are recognized by str ending in '.sql'.
+               The SQL query is expected to return a Boolean (True/False), an integer (0 = False,
+               non-zero = True) or a string (true/y/yes/1/on/false/n/no/0/off).
+    :param follow_task_ids_if_true: task id or task ids to follow if query returns true
+    :type follow_task_ids_if_true: str or list
+    :param follow_task_ids_if_false: task id or task ids to follow if query returns false
+    :type follow_task_ids_if_false: str or list
+    :param conn_id: reference to a specific database
+    :type conn_id: str
+    :param database: name of the database, which overrides the one defined in the connection
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: mapping or iterable
+    """
+
+    template_fields = ("sql",)
+    template_ext = (".sql",)
+    ui_color = "#a22034"
+    ui_fgcolor = "#F7F7F7"
+
+    @apply_defaults
+    def __init__(
+        self,
+        sql,
+        follow_task_ids_if_true,
+        follow_task_ids_if_false,
+        conn_id="default_conn_id",
+        database=None,
+        parameters=None,
+        *args,
+        **kwargs
+    ):
+        super(BranchSQLOperator, self).__init__(*args, **kwargs)
+        self.conn_id = conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.database = database
+        self._hook = None
+
+    def _get_hook(self):
+        self.log.debug("Get connection for %s", self.conn_id)
+        conn = BaseHook.get_connection(self.conn_id)
+
+        if conn.conn_type not in ALLOWED_CONN_TYPE:
+            raise AirflowException(
+                "The connection type is not supported by BranchSQLOperator.\
+                Supported connection types: {}".format(list(ALLOWED_CONN_TYPE))
+            )
+
+        if not self._hook:
+            self._hook = conn.get_hook()
+            if self.database:
+                self._hook.schema = self.database
+
+        return self._hook
+
+    def execute(self, context):
+        # get supported hook
+        self._hook = self._get_hook()
+
+        if self._hook is None:
+            raise AirflowException(
+                "Failed to establish connection to '%s'" % self.conn_id
+            )
+
+        if self.sql is None:
+            raise AirflowException("Expected 'sql' parameter is missing.")
+
+        if self.follow_task_ids_if_true is None:
+            raise AirflowException(
+                "Expected 'follow_task_ids_if_true' paramter is missing."
+            )
+
+        if self.follow_task_ids_if_false is None:
+            raise AirflowException(
+                "Expected 'follow_task_ids_if_false' parameter is missing."
+            )
+
+        self.log.info(
+            "Executing: %s (with parameters %s) with connection: %s",
+            self.sql,
+            self.parameters,
+            self._hook,
+        )
+        record = self._hook.get_first(self.sql, self.parameters)
+        if not record:
+            raise AirflowException(
+                "No rows returned from sql query. Operator expected True or False return value."
+            )
+
+        if isinstance(record, list):
+            if isinstance(record[0], list):
+                query_result = record[0][0]
+            else:
+                query_result = record[0]
+        elif isinstance(record, tuple):
+            query_result = record[0]
+        else:
+            query_result = record
+
+        self.log.info("Query returns %s, type '%s'", query_result, type(query_result))
+
+        follow_branch = None
+        try:
+            if isinstance(query_result, bool):
+                if query_result:
+                    follow_branch = self.follow_task_ids_if_true
+            elif isinstance(query_result, str):
+                # return result is not Boolean, try to convert from String to Boolean
+                if bool(strtobool(query_result)):
+                    follow_branch = self.follow_task_ids_if_true
+            elif isinstance(query_result, int):
+                if bool(query_result):
+                    follow_branch = self.follow_task_ids_if_true
+            else:
+                raise AirflowException(
+                    "Unexpected query return result '%s' type '%s'"
+                    % (query_result, type(query_result))
+                )
+
+            if follow_branch is None:
+                follow_branch = self.follow_task_ids_if_false
+        except ValueError:
+            raise AirflowException(
+                "Unexpected query return result '%s' type '%s'"
+                % (query_result, type(query_result))
+            )
+
+        self.skip_all_except(context["ti"], follow_branch)
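BranchSQLOperator is the only genuinely new public name in this
change, so a short usage sketch may help; the connection id, table and
downstream task ids below are hypothetical:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.sql import BranchSQLOperator

    with DAG(
        dag_id="branch_sql_example",
        start_date=datetime(2020, 6, 1),
        schedule_interval="@daily",
    ) as dag:
        # The query must return something bool-like: a Boolean, an
        # integer, or a string accepted by strtobool ('true', 'y', ...).
        check_new_rows = BranchSQLOperator(
            task_id="check_new_rows",
            conn_id="my_postgres",  # conn_type must be in ALLOWED_CONN_TYPE
            sql="SELECT COUNT(*) > 0 FROM staging_events "
                "WHERE ds = '{{ ds }}'",
            follow_task_ids_if_true="process_events",
            follow_task_ids_if_false="no_new_events",
        )
        process_events = DummyOperator(task_id="process_events")
        no_new_events = DummyOperator(task_id="no_new_events")

        check_new_rows >> [process_events, no_new_events]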
diff --git a/airflow/operators/sql_branch_operator.py b/airflow/operators/sql_branch_operator.py
index 072c40c..b911e34 100644
--- a/airflow/operators/sql_branch_operator.py
+++ b/airflow/operators/sql_branch_operator.py
@@ -14,160 +14,22 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""This module is deprecated. Please use `airflow.operators.sql`."""
+import warnings
 
-from distutils.util import strtobool
+from airflow.operators.sql import BranchSQLOperator
 
-from airflow.exceptions import AirflowException
-from airflow.hooks.base_hook import BaseHook
-from airflow.models import BaseOperator, SkipMixin
-from airflow.utils.decorators import apply_defaults
 
-ALLOWED_CONN_TYPE = {
-    "google_cloud_platform",
-    "jdbc",
-    "mssql",
-    "mysql",
-    "odbc",
-    "oracle",
-    "postgres",
-    "presto",
-    "sqlite",
-    "vertica",
-}
-
-
-class BranchSqlOperator(BaseOperator, SkipMixin):
+class BranchSqlOperator(BranchSQLOperator):
     """
-    Executes sql code in a specific database
-
-    :param sql: the sql code to be executed. (templated)
-    :type sql: Can receive a str representing a sql statement or reference to a template file.
-               Template reference are recognized by str ending in '.sql'.
-               Expected SQL query to return Boolean (True/False), integer (0 = False, Otherwise = 1)
-               or string (true/y/yes/1/on/false/n/no/0/off).
-    :param follow_task_ids_if_true: task id or task ids to follow if query return true
-    :type follow_task_ids_if_true: str or list
-    :param follow_task_ids_if_false: task id or task ids to follow if query return true
-    :type follow_task_ids_if_false: str or list
-    :param conn_id: reference to a specific database
-    :type conn_id: str
-    :param database: name of database which overwrite defined one in connection
-    :param parameters: (optional) the parameters to render the SQL query with.
-    :type parameters: mapping or iterable
+    This class is deprecated.
+    Please use `airflow.operators.sql.BranchSQLOperator`.
     """
 
-    template_fields = ("sql",)
-    template_ext = (".sql",)
-    ui_color = "#a22034"
-    ui_fgcolor = "#F7F7F7"
-
-    @apply_defaults
-    def __init__(
-            self,
-            sql,
-            follow_task_ids_if_true,
-            follow_task_ids_if_false,
-            conn_id="default_conn_id",
-            database=None,
-            parameters=None,
-            *args,
-            **kwargs):
-        super(BranchSqlOperator, self).__init__(*args, **kwargs)
-        self.conn_id = conn_id
-        self.sql = sql
-        self.parameters = parameters
-        self.follow_task_ids_if_true = follow_task_ids_if_true
-        self.follow_task_ids_if_false = follow_task_ids_if_false
-        self.database = database
-        self._hook = None
-
-    def _get_hook(self):
-        self.log.debug("Get connection for %s", self.conn_id)
-        conn = BaseHook.get_connection(self.conn_id)
-
-        if conn.conn_type not in ALLOWED_CONN_TYPE:
-            raise AirflowException(
-                "The connection type is not supported by BranchSqlOperator. "
-                + "Supported connection types: {}".format(list(ALLOWED_CONN_TYPE))
-            )
-
-        if not self._hook:
-            self._hook = conn.get_hook()
-            if self.database:
-                self._hook.schema = self.database
-
-        return self._hook
-
-    def execute(self, context):
-        # get supported hook
-        self._hook = self._get_hook()
-
-        if self._hook is None:
-            raise AirflowException(
-                "Failed to establish connection to '%s'" % self.conn_id
-            )
-
-        if self.sql is None:
-            raise AirflowException("Expected 'sql' parameter is missing.")
-
-        if self.follow_task_ids_if_true is None:
-            raise AirflowException(
-                "Expected 'follow_task_ids_if_true' paramter is missing."
-            )
-
-        if self.follow_task_ids_if_false is None:
-            raise AirflowException(
-                "Expected 'follow_task_ids_if_false' parameter is missing."
-            )
-
-        self.log.info(
-            "Executing: %s (with parameters %s) with connection: %s",
-            self.sql,
-            self.parameters,
-            self._hook,
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.BranchSQLOperator`.""",
+            DeprecationWarning, stacklevel=2
         )
-        record = self._hook.get_first(self.sql, self.parameters)
-        if not record:
-            raise AirflowException(
-                "No rows returned from sql query. Operator expected True or False return value."
-            )
-
-        if isinstance(record, list):
-            if isinstance(record[0], list):
-                query_result = record[0][0]
-            else:
-                query_result = record[0]
-        elif isinstance(record, tuple):
-            query_result = record[0]
-        else:
-            query_result = record
-
-        self.log.info("Query returns %s, type '%s'", query_result, type(query_result))
-
-        follow_branch = None
-        try:
-            if isinstance(query_result, bool):
-                if query_result:
-                    follow_branch = self.follow_task_ids_if_true
-            elif isinstance(query_result, str):
-                # return result is not Boolean, try to convert from String to Boolean
-                if bool(strtobool(query_result)):
-                    follow_branch = self.follow_task_ids_if_true
-            elif isinstance(query_result, int):
-                if bool(query_result):
-                    follow_branch = self.follow_task_ids_if_true
-            else:
-                raise AirflowException(
-                    "Unexpected query return result '%s' type '%s'"
-                    % (query_result, type(query_result))
-                )
-
-            if follow_branch is None:
-                follow_branch = self.follow_task_ids_if_false
-        except ValueError:
-            raise AirflowException(
-                "Unexpected query return result '%s' type '%s'"
-                % (query_result, type(query_result))
-            )
-
-        self.skip_all_except(context["ti"], follow_branch)
+        super(BranchSqlOperator, self).__init__(*args, **kwargs)
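A practical way to flush out remaining uses of the old module during
an upgrade (a sketch, not part of this change) is to promote the
shim's DeprecationWarning to an error while validating DAG
definitions:

    import warnings

    from airflow.operators.sql_branch_operator import BranchSqlOperator

    # The shim warns on instantiation, so turn that warning into a
    # hard failure during testing:
    warnings.simplefilter("error", DeprecationWarning)

    try:
        BranchSqlOperator(
            task_id="legacy_branch",
            sql="SELECT 1",
            follow_task_ids_if_true="done",
            follow_task_ids_if_false="skip",
        )
    except DeprecationWarning as err:
        print("still on the old import path:", err)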
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 55176f8..1fd11c3 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -57,10 +57,6 @@ Fundamentals
 
    * - :mod:`airflow.operators.branch_operator`
      -
-
-   * - :mod:`airflow.operators.check_operator`
-     -
-
    * - :mod:`airflow.operators.dagrun_operator`
      -
 
@@ -76,7 +72,7 @@ Fundamentals
    * - :mod:`airflow.operators.subdag_operator`
      -
 
-   * - :mod:`airflow.operators.sql_branch_operator`
+   * - :mod:`airflow.operators.sql`
      -
 
 **Sensors:**
@@ -90,9 +86,6 @@ Fundamentals
    * - :mod:`airflow.sensors.weekday_sensor`
      -
 
-   * - :mod:`airflow.sensors.external_task_sensor`
-     - :doc:`How to use <howto/operator/external_task_sensor>`
-
    * - :mod:`airflow.sensors.sql_sensor`
      -
 
@@ -470,7 +463,7 @@ These integrations allow you to copy data from/to Amazon Web Services.
 
    * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
      - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
-     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     -
      - :mod:`airflow.contrib.operators.s3_to_gcs_operator`,
        :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
 
@@ -551,7 +544,7 @@ These integrations allow you to perform various operations within the Google Clo
      - Sensors
 
    * - `AutoML <https://cloud.google.com/automl/>`__
-     - :doc:`How to use <howto/operator/gcp/automl>`
+     -
      - :mod:`airflow.gcp.hooks.automl`
      - :mod:`airflow.gcp.operators.automl`
      -
@@ -563,7 +556,7 @@ These integrations allow you to perform various operations within the Google Clo
      - :mod:`airflow.gcp.sensors.bigquery`
 
    * - `BigQuery Data Transfer Service <https://cloud.google.com/bigquery/transfer/>`__
-     - :doc:`How to use <howto/operator/gcp/bigquery_dts>`
+     -
      - :mod:`airflow.gcp.hooks.bigquery_dts`
      - :mod:`airflow.gcp.operators.bigquery_dts`
      - :mod:`airflow.gcp.sensors.bigquery_dts`
@@ -611,7 +604,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Cloud Functions <https://cloud.google.com/functions/>`__
-     - :doc:`How to use <howto/operator/gcp/functions>`
+     - :doc:`How to use <howto/operator/gcp/function>`
      - :mod:`airflow.gcp.hooks.functions`
      - :mod:`airflow.gcp.operators.functions`
      -
@@ -635,7 +628,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Cloud Memorystore <https://cloud.google.com/memorystore/>`__
-     - :doc:`How to use <howto/operator/gcp/cloud_memorystore>`
+     -
      - :mod:`airflow.gcp.hooks.cloud_memorystore`
      - :mod:`airflow.gcp.operators.cloud_memorystore`
      -
@@ -677,7 +670,7 @@ These integrations allow you to perform various operations within the Google Clo
      - :mod:`airflow.gcp.sensors.gcs`
 
    * - `Storage Transfer Service <https://cloud.google.com/storage/transfer/>`__
-     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     -
      - :mod:`airflow.gcp.hooks.cloud_storage_transfer_service`
      - :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
      - :mod:`airflow.gcp.sensors.cloud_storage_transfer_service`
@@ -701,7 +694,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Cloud Video Intelligence <https://cloud.google.com/video_intelligence/>`__
-     - :doc:`How to use <howto/operator/gcp/video_intelligence>`
+     - :doc:`How to use <howto/operator/gcp/video>`
      - :mod:`airflow.gcp.hooks.video_intelligence`
      - :mod:`airflow.gcp.operators.video_intelligence`
      -
@@ -741,7 +734,7 @@ These integrations allow you to copy data from/to Google Cloud Platform.
 
    * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
      - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
-     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     -
      - :mod:`airflow.contrib.operators.s3_to_gcs_operator`,
        :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
 
@@ -772,8 +765,7 @@ These integrations allow you to copy data from/to Google Cloud Platform.
 
    * - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
      - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
-     - :doc:`How to use <howto/operator/gcp/gcs_to_gcs>`,
-       :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     -
      - :mod:`airflow.operators.gcs_to_gcs`,
        :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
 
@@ -1037,7 +1029,7 @@ These integrations allow you to perform various operations using various softwar
      - :mod:`airflow.contrib.sensors.bash_sensor`
 
    * - `Kubernetes <https://kubernetes.io/>`__
-     - :doc:`How to use <howto/operator/kubernetes>`
+     -
      -
      - :mod:`airflow.contrib.operators.kubernetes_pod_operator`
      -
diff --git a/tests/api/common/experimental/test_pool.py b/tests/api/common/experimental/test_pool.py
index 29c7105..79944de 100644
--- a/tests/api/common/experimental/test_pool.py
+++ b/tests/api/common/experimental/test_pool.py
@@ -131,8 +131,8 @@ class TestPool(unittest.TestCase):
                                     name=name)
 
     def test_delete_default_pool_not_allowed(self):
-        with self.assertRaisesRegex(AirflowBadRequest,
-                                    "^default_pool cannot be deleted$"):
+        with self.assertRaisesRegexp(AirflowBadRequest,
+                                     "^default_pool cannot be deleted$"):
             pool_api.delete_pool(Pool.DEFAULT_POOL_NAME)
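
The assertion renames in this and the following test files keep the v1-10 branch
Python 2-compatible: unittest's assertRaisesRegex exists only on Python 3, while
assertRaisesRegexp is available on Python 2.7 and survives on Python 3 as a
deprecated alias. Note that six itself exposes only six.assertRaisesRegex, so the
six-based helper calls below are rewritten to the bound-method form. A minimal
sketch of the resulting pattern (the test case, exception and regex are
illustrative, not taken from Airflow):

    import unittest

    class ExampleCompatTest(unittest.TestCase):
        def test_error_message(self):
            # assertRaisesRegexp works on Python 2.7 and, as a deprecated
            # alias of assertRaisesRegex, on Python 3.
            with self.assertRaisesRegexp(ValueError, "^bad input$"):
                raise ValueError("bad input")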
 
 
diff --git a/tests/contrib/hooks/test_gcp_api_base_hook.py b/tests/contrib/hooks/test_gcp_api_base_hook.py
index e3f99b5..9fc9924 100644
--- a/tests/contrib/hooks/test_gcp_api_base_hook.py
+++ b/tests/contrib/hooks/test_gcp_api_base_hook.py
@@ -98,7 +98,7 @@ class QuotaRetryTestCase(unittest.TestCase):  # pylint: disable=invalid-name
         self.assertEqual(5, custom_fn.counter)
 
     def test_raise_exception_on_non_quota_exception(self):
-        with six.assertRaisesRegex(self, Forbidden, "Daily Limit Exceeded"):
+        with self.assertRaisesRegexp(Forbidden, "Daily Limit Exceeded"):
             message = "POST https://translation.googleapis.com/language/translate/v2: Daily Limit Exceeded"
             errors = [
                 {'message': 'Daily Limit Exceeded', 'domain': 'usageLimits', 'reason': 'dailyLimitExceeded'}
diff --git a/tests/contrib/hooks/test_gcp_cloud_build_hook.py b/tests/contrib/hooks/test_gcp_cloud_build_hook.py
index 81c7ae0..6d5aa16 100644
--- a/tests/contrib/hooks/test_gcp_cloud_build_hook.py
+++ b/tests/contrib/hooks/test_gcp_cloud_build_hook.py
@@ -117,7 +117,7 @@ class TestCloudBuildHookWithPassedProjectId(unittest.TestCase):
 
         execute_mock = mock.Mock(**{"side_effect": [TEST_WAITING_OPERATION, TEST_ERROR_OPERATION]})
         service_mock.operations.return_value.get.return_value.execute = execute_mock
-        with six.assertRaisesRegex(self, AirflowException, "error"):
+        with six.assertRaisesRegexp(self, AirflowException, "error"):
             self.hook.create_build(body={})
 
 
@@ -186,7 +186,7 @@ class TestGcpComputeHookWithDefaultProjectIdFromConnection(unittest.TestCase):
 
         execute_mock = mock.Mock(**{"side_effect": [TEST_WAITING_OPERATION, TEST_ERROR_OPERATION]})
         service_mock.operations.return_value.get.return_value.execute = execute_mock
-        with six.assertRaisesRegex(self, AirflowException, "error"):
+        with six.assertRaisesRegexp(self, AirflowException, "error"):
             self.hook.create_build(body={})
 
 
diff --git a/tests/contrib/hooks/test_gcp_transfer_hook.py b/tests/contrib/hooks/test_gcp_transfer_hook.py
index ab0cfbf..e78d67b 100644
--- a/tests/contrib/hooks/test_gcp_transfer_hook.py
+++ b/tests/contrib/hooks/test_gcp_transfer_hook.py
@@ -265,7 +265,7 @@ class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
         }
 
         get_conn.return_value.transferOperations.return_value.list_next.return_value = None
-        with six.assertRaisesRegex(
-            self, AirflowException, "An unexpected operation status was encountered. Expected: SUCCESS"
+        with self.assertRaisesRegexp(
+            AirflowException, "An unexpected operation status was encountered. Expected: SUCCESS"
         ):
             self.gct_hook.wait_for_transfer_job(
@@ -298,7 +298,7 @@ class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
     def test_operations_contain_expected_statuses_red_path(self, statuses, expected_statuses):
         operations = [{NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: status}} for status in statuses]
 
-        with six.assertRaisesRegex(
-            self,
+        with self.assertRaisesRegexp(
             AirflowException,
             "An unexpected operation status was encountered. Expected: {}".format(
diff --git a/tests/contrib/operators/test_gcp_cloud_build_operator.py b/tests/contrib/operators/test_gcp_cloud_build_operator.py
index 2136757..4a7ca73 100644
--- a/tests/contrib/operators/test_gcp_cloud_build_operator.py
+++ b/tests/contrib/operators/test_gcp_cloud_build_operator.py
@@ -42,7 +42,7 @@ TEST_PROJECT_ID = "example-id"
 
 class BuildProcessorTestCase(TestCase):
     def test_verify_source(self):
-        with six.assertRaisesRegex(self, AirflowException, "The source could not be determined."):
+        with six.assertRaisesRegexp(self, AirflowException, "The source could not be determined."):
             BuildProcessor(body={"source": {"storageSource": {}, "repoSource": {}}}).process_body()
 
     @parameterized.expand(
@@ -77,7 +77,7 @@ class BuildProcessorTestCase(TestCase):
     )
     def test_convert_repo_url_to_storage_dict_invalid(self, url):
         body = {"source": {"repoSource": url}}
-        with six.assertRaisesRegex(self, AirflowException, "Invalid URL."):
+        with six.assertRaisesRegexp(self, AirflowException, "Invalid URL."):
             BuildProcessor(body=body).process_body()
 
     @parameterized.expand(
@@ -102,7 +102,7 @@ class BuildProcessorTestCase(TestCase):
     )
     def test_convert_storage_url_to_dict_invalid(self, url):
         body = {"source": {"storageSource": url}}
-        with six.assertRaisesRegex(self, AirflowException, "Invalid URL."):
+        with six.assertRaisesRegexp(self, AirflowException, "Invalid URL."):
             BuildProcessor(body=body).process_body()
 
     @parameterized.expand([("storageSource",), ("repoSource",)])
@@ -128,7 +128,7 @@ class GcpCloudBuildCreateBuildOperatorTestCase(TestCase):
 
     @parameterized.expand([({},), (None,)])
     def test_missing_input(self, body):
-        with six.assertRaisesRegex(self, AirflowException, "The required parameter 'body' is missing"):
+        with six.assertRaisesRegexp(self, AirflowException, "The required parameter 'body' is missing"):
             CloudBuildCreateBuildOperator(body=body, project_id=TEST_PROJECT_ID, task_id="task-id")
 
     @mock.patch(  # type: ignore
diff --git a/tests/contrib/operators/test_gcs_to_gdrive.py b/tests/contrib/operators/test_gcs_to_gdrive.py
index 4f49055..03a4e66 100644
--- a/tests/contrib/operators/test_gcs_to_gdrive.py
+++ b/tests/contrib/operators/test_gcs_to_gdrive.py
@@ -147,5 +147,5 @@ class TestGcsToGDriveOperator(unittest.TestCase):
         task = GcsToGDriveOperator(
             task_id="move_files", source_bucket="data", source_object="sales/*/*.avro", move_object=True
         )
-        with six.assertRaisesRegex(self, AirflowException, "Only one wildcard"):
+        with six.assertRaisesRegexp(self, AirflowException, "Only one wildcard"):
             task.execute(mock.MagicMock())
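
The "Only one wildcard" assertion above corresponds to a simple guard on the source
object path. A hedged sketch of such a check (the helper name and message are
assumptions, not the actual GcsToGDriveOperator implementation):

    from airflow.exceptions import AirflowException

    WILDCARD = "*"

    def check_single_wildcard(source_object):
        # A second "*" would make the source-to-destination mapping ambiguous,
        # so anything beyond one wildcard is rejected up front.
        if source_object.count(WILDCARD) > 1:
            raise AirflowException(
                "Only one wildcard '*' is allowed in source_object parameter"
            )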
diff --git a/tests/contrib/operators/test_sftp_operator.py b/tests/contrib/operators/test_sftp_operator.py
index 24db36e..d597d8d 100644
--- a/tests/contrib/operators/test_sftp_operator.py
+++ b/tests/contrib/operators/test_sftp_operator.py
@@ -363,9 +363,9 @@ class SFTPOperatorTest(unittest.TestCase):
 
         # Exception should be raised if neither ssh_hook nor ssh_conn_id is provided
-        if six.PY2:
-            self.assertRaisesRegex = self.assertRaisesRegexp
-        with self.assertRaisesRegex(AirflowException,
-                                    "Cannot operate without ssh_hook or ssh_conn_id."):
+        with self.assertRaisesRegexp(AirflowException,
+                                     "Cannot operate without ssh_hook or ssh_conn_id."):
             task_0 = SFTPOperator(
                 task_id="test_sftp",
                 local_filepath=self.test_local_filepath,
diff --git a/tests/contrib/operators/test_ssh_operator.py b/tests/contrib/operators/test_ssh_operator.py
index f2294ba..aa113ec 100644
--- a/tests/contrib/operators/test_ssh_operator.py
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -153,9 +153,9 @@ class SSHOperatorTest(TestCase):
 
         # Exception should be raised if neither ssh_hook nor ssh_conn_id is provided
-        if six.PY2:
-            self.assertRaisesRegex = self.assertRaisesRegexp
-        with self.assertRaisesRegex(AirflowException,
-                                    "Cannot operate without ssh_hook or ssh_conn_id."):
+        with self.assertRaisesRegexp(AirflowException,
+                                     "Cannot operate without ssh_hook or ssh_conn_id."):
             task_0 = SSHOperator(task_id="test", command=COMMAND,
                                  timeout=TIMEOUT, dag=self.dag)
             task_0.execute(None)
diff --git a/tests/contrib/secrets/test_hashicorp_vault.py b/tests/contrib/secrets/test_hashicorp_vault.py
index 0d52c3a..1887db3 100644
--- a/tests/contrib/secrets/test_hashicorp_vault.py
+++ b/tests/contrib/secrets/test_hashicorp_vault.py
@@ -217,7 +217,7 @@ class TestVaultSecrets(TestCase):
             "token": "test_wrong_token"
         }
 
-        with six.assertRaisesRegex(self, VaultError, "Vault Authentication Error!"):
+        with six.assertRaisesRegexp(self, VaultError, "Vault Authentication Error!"):
             VaultBackend(**kwargs).get_connections(conn_id='test')
 
     @mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac")
@@ -232,5 +232,5 @@ class TestVaultSecrets(TestCase):
             "url": "http://127.0.0.1:8200",
         }
 
-        with six.assertRaisesRegex(self, VaultError, "token cannot be None for auth_type='token'"):
+        with six.assertRaisesRegexp(self, VaultError, "token cannot be None for auth_type='token'"):
             VaultBackend(**kwargs).get_connections(conn_id='test')
diff --git a/tests/contrib/utils/test_gcp_credentials_provider.py b/tests/contrib/utils/test_gcp_credentials_provider.py
index 3478a42..40e180b 100644
--- a/tests/contrib/utils/test_gcp_credentials_provider.py
+++ b/tests/contrib/utils/test_gcp_credentials_provider.py
@@ -97,7 +97,7 @@ class TestGetGcpCredentialsAndProjectId(unittest.TestCase):
     def test_get_credentials_and_project_id_with_mutually_exclusive_configuration(
         self,
     ):
-        with six.assertRaisesRegex(self, AirflowException, re.escape(
+        with self.assertRaisesRegexp(AirflowException, re.escape(
             'The `keyfile_dict` and `key_path` fields are mutually exclusive.'
         )):
             get_credentials_and_project_id(key_path='KEY.json', keyfile_dict={'private_key': 'PRIVATE_KEY'})
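
For context, the guard exercised by this test can be sketched as follows; the
function name and error message come from the test above, while the body is an
assumption rather than the actual implementation:

    from airflow.exceptions import AirflowException

    def get_credentials_and_project_id(key_path=None, keyfile_dict=None):
        # Reject ambiguous configuration: the two credential sources are
        # mutually exclusive.
        if key_path and keyfile_dict:
            raise AirflowException(
                'The `keyfile_dict` and `key_path` fields are mutually exclusive.'
            )
        # ... resolve credentials from whichever source was provided ...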
diff --git a/tests/operators/test_check_operator.py b/tests/operators/test_check_operator.py
deleted file mode 100644
index 22523a4..0000000
--- a/tests/operators/test_check_operator.py
+++ /dev/null
@@ -1,327 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import six
-import unittest
-from datetime import datetime
-
-from airflow.exceptions import AirflowException
-from airflow.models import DAG
-from airflow.operators.check_operator import (
-    CheckOperator, IntervalCheckOperator, ThresholdCheckOperator, ValueCheckOperator,
-)
-from tests.compat import mock
-
-
-class TestCheckOperator(unittest.TestCase):
-
-    @mock.patch.object(CheckOperator, 'get_db_hook')
-    def test_execute_no_records(self, mock_get_db_hook):
-        mock_get_db_hook.return_value.get_first.return_value = []
-
-        with self.assertRaises(AirflowException):
-            CheckOperator(sql='sql').execute()
-
-    @mock.patch.object(CheckOperator, 'get_db_hook')
-    def test_execute_not_all_records_are_true(self, mock_get_db_hook):
-        mock_get_db_hook.return_value.get_first.return_value = ["data", ""]
-
-        with self.assertRaises(AirflowException):
-            CheckOperator(sql='sql').execute()
-
-
-class TestValueCheckOperator(unittest.TestCase):
-
-    def setUp(self):
-        self.task_id = 'test_task'
-        self.conn_id = 'default_conn'
-
-    def _construct_operator(self, sql, pass_value, tolerance=None):
-        dag = DAG('test_dag', start_date=datetime(2017, 1, 1))
-
-        return ValueCheckOperator(
-            dag=dag,
-            task_id=self.task_id,
-            conn_id=self.conn_id,
-            sql=sql,
-            pass_value=pass_value,
-            tolerance=tolerance)
-
-    def test_pass_value_template_string(self):
-        pass_value_str = "2018-03-22"
-        operator = self._construct_operator('select date from tab1;', "{{ ds }}")
-
-        operator.render_template_fields({'ds': pass_value_str})
-
-        self.assertEqual(operator.task_id, self.task_id)
-        self.assertEqual(operator.pass_value, pass_value_str)
-
-    def test_pass_value_template_string_float(self):
-        pass_value_float = 4.0
-        operator = self._construct_operator('select date from tab1;', pass_value_float)
-
-        operator.render_template_fields({})
-
-        self.assertEqual(operator.task_id, self.task_id)
-        self.assertEqual(operator.pass_value, str(pass_value_float))
-
-    @mock.patch.object(ValueCheckOperator, 'get_db_hook')
-    def test_execute_pass(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [10]
-        mock_get_db_hook.return_value = mock_hook
-        sql = 'select value from tab1 limit 1;'
-        operator = self._construct_operator(sql, 5, 1)
-
-        operator.execute(None)
-
-        mock_hook.get_first.assert_called_with(sql)
-
-    @mock.patch.object(ValueCheckOperator, 'get_db_hook')
-    def test_execute_fail(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [11]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator('select value from tab1 limit 1;', 5, 1)
-
-        with self.assertRaisesRegexp(AirflowException, 'Tolerance:100.0%'):
-            operator.execute()
-
-
-class IntervalCheckOperatorTest(unittest.TestCase):
-
-    def _construct_operator(self, table, metric_thresholds,
-                            ratio_formula, ignore_zero):
-        return IntervalCheckOperator(
-            task_id='test_task',
-            table=table,
-            metrics_thresholds=metric_thresholds,
-            ratio_formula=ratio_formula,
-            ignore_zero=ignore_zero,
-        )
-
-    def test_invalid_ratio_formula(self):
-        with self.assertRaisesRegexp(AirflowException, 'Invalid diff_method'):
-            self._construct_operator(
-                table='test_table',
-                metric_thresholds={
-                    'f1': 1,
-                },
-                ratio_formula='abs',
-                ignore_zero=False,
-            )
-
-    @mock.patch.object(IntervalCheckOperator, 'get_db_hook')
-    def test_execute_not_ignore_zero(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [0]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            table='test_table',
-            metric_thresholds={
-                'f1': 1,
-            },
-            ratio_formula='max_over_min',
-            ignore_zero=False,
-        )
-
-        with self.assertRaises(AirflowException):
-            operator.execute()
-
-    @mock.patch.object(IntervalCheckOperator, 'get_db_hook')
-    def test_execute_ignore_zero(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [0]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            table='test_table',
-            metric_thresholds={
-                'f1': 1,
-            },
-            ratio_formula='max_over_min',
-            ignore_zero=True,
-        )
-
-        operator.execute()
-
-    @mock.patch.object(IntervalCheckOperator, 'get_db_hook')
-    def test_execute_min_max(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-
-        def returned_row():
-            rows = [
-                [2, 2, 2, 2],  # reference
-                [1, 1, 1, 1],  # current
-            ]
-
-            for r in rows:
-                yield r
-
-        mock_hook.get_first.side_effect = returned_row()
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            table='test_table',
-            metric_thresholds={
-                'f0': 1.0,
-                'f1': 1.5,
-                'f2': 2.0,
-                'f3': 2.5,
-            },
-            ratio_formula='max_over_min',
-            ignore_zero=True,
-        )
-
-        with self.assertRaisesRegexp(AirflowException, "f0, f1, f2"):
-            operator.execute()
-
-    @mock.patch.object(IntervalCheckOperator, 'get_db_hook')
-    def test_execute_diff(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-
-        def returned_row():
-            rows = [
-                [3, 3, 3, 3],  # reference
-                [1, 1, 1, 1],  # current
-            ]
-
-            for r in rows:
-                yield r
-
-        mock_hook.get_first.side_effect = returned_row()
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            table='test_table',
-            metric_thresholds={
-                'f0': 0.5,
-                'f1': 0.6,
-                'f2': 0.7,
-                'f3': 0.8,
-            },
-            ratio_formula='relative_diff',
-            ignore_zero=True,
-        )
-
-        with self.assertRaisesRegexp(AirflowException, "f0, f1"):
-            operator.execute()
-
-
-class TestThresholdCheckOperator(unittest.TestCase):
-
-    def _construct_operator(self, sql, min_threshold, max_threshold):
-        dag = DAG('test_dag', start_date=datetime(2017, 1, 1))
-
-        return ThresholdCheckOperator(
-            task_id='test_task',
-            sql=sql,
-            min_threshold=min_threshold,
-            max_threshold=max_threshold,
-            dag=dag
-        )
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_pass_min_value_max_value(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [(10,)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select avg(val) from table1 limit 1',
-            1,
-            100
-        )
-
-        operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_fail_min_value_max_value(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [(10,)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select avg(val) from table1 limit 1',
-            20,
-            100
-        )
-
-        with six.assertRaisesRegex(self, AirflowException, '10.*20.0.*100.0'):
-            operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_pass_min_sql_max_sql(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select 10',
-            'Select 1',
-            'Select 100'
-        )
-
-        operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_fail_min_sql_max_sql(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select 10',
-            'Select 20',
-            'Select 100'
-        )
-
-        with six.assertRaisesRegex(self, AirflowException, '10.*20.*100'):
-            operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_pass_min_value_max_sql(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select 75',
-            45,
-            'Select 100'
-        )
-
-        operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_fail_min_sql_max_value(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select 155',
-            'Select 45',
-            100
-        )
-
-        with six.assertRaisesRegex(self, AirflowException, '155.*45.*100.0'):
-            operator.execute()
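
With tests/operators/test_check_operator.py removed, the check operators are now
covered by the consolidated tests/operators/test_sql.py below, mirroring the merge
of check_operator.py into airflow.operators.sql. A minimal usage sketch, assuming
the deprecated import path that the merge keeps for backwards compatibility (the
connection id, table and SQL are illustrative):

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.check_operator import CheckOperator

    dag = DAG("data_quality", start_date=datetime(2020, 6, 1))

    # The task fails if the query returns no rows, or if any value in the
    # first row is falsy (0, empty string, NULL).
    check = CheckOperator(
        task_id="check_rows_loaded",
        conn_id="my_db",
        sql="SELECT COUNT(*) FROM sales WHERE ds = '{{ ds }}'",
        dag=dag,
    )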
diff --git a/tests/operators/test_sql_branch_operator.py b/tests/operators/test_sql.py
similarity index 57%
rename from tests/operators/test_sql_branch_operator.py
rename to tests/operators/test_sql.py
index 6510609..ca88883 100644
--- a/tests/operators/test_sql_branch_operator.py
+++ b/tests/operators/test_sql.py
@@ -24,8 +24,11 @@ import pytest
 
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.check_operator import (
+    CheckOperator, IntervalCheckOperator, ThresholdCheckOperator, ValueCheckOperator,
+)
 from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.sql_branch_operator import BranchSqlOperator
+from airflow.operators.sql import BranchSQLOperator
 from airflow.utils import timezone
 from airflow.utils.db import create_session
 from airflow.utils.state import State
@@ -60,6 +63,266 @@ SUPPORTED_FALSE_VALUES = [
 ]
 
 
+class TestCheckOperator(unittest.TestCase):
+    @mock.patch.object(CheckOperator, "get_db_hook")
+    def test_execute_no_records(self, mock_get_db_hook):
+        mock_get_db_hook.return_value.get_first.return_value = []
+
+        with self.assertRaises(AirflowException):
+            CheckOperator(sql="sql").execute()
+
+    @mock.patch.object(CheckOperator, "get_db_hook")
+    def test_execute_not_all_records_are_true(self, mock_get_db_hook):
+        mock_get_db_hook.return_value.get_first.return_value = ["data", ""]
+
+        with self.assertRaises(AirflowException):
+            CheckOperator(sql="sql").execute()
+
+
+class TestValueCheckOperator(unittest.TestCase):
+    def setUp(self):
+        self.task_id = "test_task"
+        self.conn_id = "default_conn"
+
+    def _construct_operator(self, sql, pass_value, tolerance=None):
+        dag = DAG("test_dag", start_date=datetime.datetime(2017, 1, 1))
+
+        return ValueCheckOperator(
+            dag=dag,
+            task_id=self.task_id,
+            conn_id=self.conn_id,
+            sql=sql,
+            pass_value=pass_value,
+            tolerance=tolerance,
+        )
+
+    def test_pass_value_template_string(self):
+        pass_value_str = "2018-03-22"
+        operator = self._construct_operator(
+            "select date from tab1;", "{{ ds }}")
+
+        operator.render_template_fields({"ds": pass_value_str})
+
+        self.assertEqual(operator.task_id, self.task_id)
+        self.assertEqual(operator.pass_value, pass_value_str)
+
+    def test_pass_value_template_string_float(self):
+        pass_value_float = 4.0
+        operator = self._construct_operator(
+            "select date from tab1;", pass_value_float)
+
+        operator.render_template_fields({})
+
+        self.assertEqual(operator.task_id, self.task_id)
+        self.assertEqual(operator.pass_value, str(pass_value_float))
+
+    @mock.patch.object(ValueCheckOperator, "get_db_hook")
+    def test_execute_pass(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [10]
+        mock_get_db_hook.return_value = mock_hook
+        sql = "select value from tab1 limit 1;"
+        operator = self._construct_operator(sql, 5, 1)
+
+        operator.execute(None)
+
+        mock_hook.get_first.assert_called_once_with(sql)
+
+    @mock.patch.object(ValueCheckOperator, "get_db_hook")
+    def test_execute_fail(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [11]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "select value from tab1 limit 1;", 5, 1)
+
+        with self.assertRaisesRegexp(AirflowException, "Tolerance:100.0%"):
+            operator.execute()
+
+
+class TestIntervalCheckOperator(unittest.TestCase):
+    def _construct_operator(self, table, metric_thresholds, ratio_formula, ignore_zero):
+        return IntervalCheckOperator(
+            task_id="test_task",
+            table=table,
+            metrics_thresholds=metric_thresholds,
+            ratio_formula=ratio_formula,
+            ignore_zero=ignore_zero,
+        )
+
+    def test_invalid_ratio_formula(self):
+        with self.assertRaisesRegexp(AirflowException, "Invalid diff_method"):
+            self._construct_operator(
+                table="test_table",
+                metric_thresholds={"f1": 1, },
+                ratio_formula="abs",
+                ignore_zero=False,
+            )
+
+    @mock.patch.object(IntervalCheckOperator, "get_db_hook")
+    def test_execute_not_ignore_zero(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [0]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            table="test_table",
+            metric_thresholds={"f1": 1, },
+            ratio_formula="max_over_min",
+            ignore_zero=False,
+        )
+
+        with self.assertRaises(AirflowException):
+            operator.execute()
+
+    @mock.patch.object(IntervalCheckOperator, "get_db_hook")
+    def test_execute_ignore_zero(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [0]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            table="test_table",
+            metric_thresholds={"f1": 1, },
+            ratio_formula="max_over_min",
+            ignore_zero=True,
+        )
+
+        operator.execute()
+
+    @mock.patch.object(IntervalCheckOperator, "get_db_hook")
+    def test_execute_min_max(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+
+        def returned_row():
+            rows = [
+                [2, 2, 2, 2],  # reference
+                [1, 1, 1, 1],  # current
+            ]
+            return rows
+
+        mock_hook.get_first.side_effect = returned_row()
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            table="test_table",
+            metric_thresholds={"f0": 1.0, "f1": 1.5, "f2": 2.0, "f3": 2.5, },
+            ratio_formula="max_over_min",
+            ignore_zero=True,
+        )
+
+        with self.assertRaisesRegexp(AirflowException, "f0, f1, f2"):
+            operator.execute()
+
+    @mock.patch.object(IntervalCheckOperator, "get_db_hook")
+    def test_execute_diff(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+
+        def returned_row():
+            rows = [
+                [3, 3, 3, 3],  # reference
+                [1, 1, 1, 1],  # current
+            ]
+
+            return rows
+
+        mock_hook.get_first.side_effect = returned_row()
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            table="test_table",
+            metric_thresholds={"f0": 0.5, "f1": 0.6, "f2": 0.7, "f3": 0.8, },
+            ratio_formula="relative_diff",
+            ignore_zero=True,
+        )
+
+        with self.assertRaisesRegexp(AirflowException, "f0, f1"):
+            operator.execute()
+
+
+class TestThresholdCheckOperator(unittest.TestCase):
+    def _construct_operator(self, sql, min_threshold, max_threshold):
+        dag = DAG("test_dag", start_date=datetime.datetime(2017, 1, 1))
+
+        return ThresholdCheckOperator(
+            task_id="test_task",
+            sql=sql,
+            min_threshold=min_threshold,
+            max_threshold=max_threshold,
+            dag=dag,
+        )
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_pass_min_value_max_value(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [(10,)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "Select avg(val) from table1 limit 1", 1, 100
+        )
+
+        operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_fail_min_value_max_value(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [(10,)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "Select avg(val) from table1 limit 1", 20, 100
+        )
+
+        with self.assertRaisesRegexp(AirflowException, "10.*20.0.*100.0"):
+            operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_pass_min_sql_max_sql(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "Select 10", "Select 1", "Select 100")
+
+        operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_fail_min_sql_max_sql(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "Select 10", "Select 20", "Select 100")
+
+        with self.assertRaisesRegexp(AirflowException, "10.*20.*100"):
+            operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_pass_min_value_max_sql(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator("Select 75", 45, "Select 100")
+
+        operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_fail_min_sql_max_value(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator("Select 155", "Select 45", 100)
+
+        with self.assertRaisesRegexp(AirflowException, "155.*45.*100.0"):
+            operator.execute()
+
+
 class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
     """
     Test for SQL Branch Operator
@@ -92,8 +355,8 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
             session.query(TI).delete()
 
     def test_unsupported_conn_type(self):
-        """ Check if BranchSqlOperator throws an exception for unsupported connection type """
-        op = BranchSqlOperator(
+        """ Check if BranchSQLOperator throws an exception for unsupported connection type """
+        op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="redis_default",
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
@@ -103,11 +366,12 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            op.run(start_date=DEFAULT_DATE,
+                   end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_invalid_conn(self):
-        """ Check if BranchSqlOperator throws an exception for invalid connection """
-        op = BranchSqlOperator(
+        """ Check if BranchSQLOperator throws an exception for invalid connection """
+        op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="invalid_connection",
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
@@ -117,11 +381,12 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            op.run(start_date=DEFAULT_DATE,
+                   end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_invalid_follow_task_true(self):
-        """ Check if BranchSqlOperator throws an exception for invalid connection """
-        op = BranchSqlOperator(
+        """ Check if BranchSQLOperator throws an exception for invalid connection """
+        op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="invalid_connection",
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
@@ -131,11 +396,12 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            op.run(start_date=DEFAULT_DATE,
+                   end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_invalid_follow_task_false(self):
-        """ Check if BranchSqlOperator throws an exception for invalid connection """
-        op = BranchSqlOperator(
+        """ Check if BranchSQLOperator throws an exception for invalid connection """
+        op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="invalid_connection",
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
@@ -145,12 +411,13 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            op.run(start_date=DEFAULT_DATE,
+                   end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     @pytest.mark.backend("mysql")
     def test_sql_branch_operator_mysql(self):
-        """ Check if BranchSqlOperator works with backend """
-        branch_op = BranchSqlOperator(
+        """ Check if BranchSQLOperator works with backend """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -164,8 +431,8 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
 
     @pytest.mark.backend("postgres")
     def test_sql_branch_operator_postgres(self):
-        """ Check if BranchSqlOperator works with backend """
-        branch_op = BranchSqlOperator(
+        """ Check if BranchSQLOperator works with backend """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="postgres_default",
             sql="SELECT 1",
@@ -177,10 +444,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
             start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True
         )
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_branch_single_value_with_dag_run(self, mock_hook):
-        """ Check BranchSqlOperator branch operation """
-        branch_op = BranchSqlOperator(
+        """ Check BranchSQLOperator branch operation """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -220,10 +487,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
             else:
                 raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_branch_true_with_dag_run(self, mock_hook):
-        """ Check BranchSqlOperator branch operation """
-        branch_op = BranchSqlOperator(
+        """ Check BranchSQLOperator branch operation """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -264,10 +531,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
                 else:
                     raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_branch_false_with_dag_run(self, mock_hook):
-        """ Check BranchSqlOperator branch operation """
-        branch_op = BranchSqlOperator(
+        """ Check BranchSQLOperator branch operation """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -308,10 +575,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
                 else:
                     raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_branch_list_with_dag_run(self, mock_hook):
-        """ Checks if the BranchSqlOperator supports branching off to a list of tasks."""
-        branch_op = BranchSqlOperator(
+        """ Checks if the BranchSQLOperator supports branching off to a list of tasks."""
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -354,10 +621,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
             else:
                 raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_invalid_query_result_with_dag_run(self, mock_hook):
-        """ Check BranchSqlOperator branch operation """
-        branch_op = BranchSqlOperator(
+        """ Check BranchSQLOperator branch operation """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -387,10 +654,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         with self.assertRaises(AirflowException):
             branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_with_skip_in_branch_downstream_dependencies(self, mock_hook):
         """ Test SQL Branch with skipping all downstream dependencies """
-        branch_op = BranchSqlOperator(
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -431,10 +698,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
                 else:
                     raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_with_skip_in_branch_downstream_dependencies2(self, mock_hook):
         """ Test skipping downstream dependency for false condition"""
-        branch_op = BranchSqlOperator(
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
diff --git a/tests/www_rbac/test_validators.py b/tests/www_rbac/test_validators.py
index 4a543ff..415c53f 100644
--- a/tests/www_rbac/test_validators.py
+++ b/tests/www_rbac/test_validators.py
@@ -119,7 +119,7 @@ class TestValidJson(unittest.TestCase):
     def test_validation_raises_default_message(self):
         self.form_field_mock.data = '2017-05-04'
 
-        six.assertRaisesRegex(
-            self,
+        self.assertRaisesRegexp(
             validators.ValidationError,
             "JSON Validation Error:.*",
@@ -129,7 +129,7 @@ class TestValidJson(unittest.TestCase):
     def test_validation_raises_custom_message(self):
         self.form_field_mock.data = '2017-05-04'
 
-        six.assertRaisesRegex(
-            self,
+        self.assertRaisesRegexp(
             validators.ValidationError,
             "Invalid JSON",