Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/20 10:15:02 UTC

[airflow] branch v1-10-test updated (f2ee8e8 -> 0ac4185)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    omit f2ee8e8  flake8 tests
    omit db795a4  remove core_to_contrib
    omit 2208485  alembic fix
    omit 212a4c8  flake8 pass Merging multiple sql operators (#9124)
    omit 88eebcb  Merging multiple sql operators (#9124)
    omit 52f3796  Fix retries causing constraint violation on MySQL with DAG Serialization (#9336)
    omit a9dc0b5  Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)
    omit 5b30d45  Parameterized bash/python in the prod image (#9157)
    omit 5bc5018  [AIRFLOW-3973] Commit after each alembic migration (#4797)
    omit ba645ec  [AIRFLOW-4472] Use json.dumps/loads for templating lineage data (#5253)
    omit 2bdf471  Add SQL Branch Operator
    omit 89b0615  [AIRFLOW-XXX] Extract operators and hooks to separate page (#6213)
    omit b4275b5  Make it possible to silence warnings from Airflow (#9208)
     new 45b91ce  [AIRFLOW-XXX] Extract operators and hooks to separate page (#6213)
     new 619720b  Add SQL Branch Operator
     new 82b7644  [AIRFLOW-4472] Use json.dumps/loads for templating lineage data (#5253)
     new 57225c6  Parameterized bash/python in the prod image (#9157)
     new ba39830  Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)
     new c7117cc  Fix retries causing constraint violation on MySQL with DAG Serialization (#9336)
     new 0ac4185  Merging multiple sql operators (#9124)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f2ee8e8)
            \
             N -- N -- N   refs/heads/v1-10-test (0ac4185)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
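
For anyone who has this repository cloned and tracks v1-10-test locally, the
rewrite above means the branch has diverged from your checkout. As a rough
sketch (assuming the short hashes from this mail, f2ee8e8 for the old tip and
0ac4185 for the new one, and that the old tip is still in your local object
store from an earlier fetch), the rewrite can be inspected with standard git
commands:

    git fetch origin                           # pick up the force-updated refs/heads/v1-10-test
    git merge-base f2ee8e8 0ac4185             # prints the common base B from the diagram above
    git log --oneline f2ee8e8 --not 0ac4185    # the "omit" revisions dropped from the branch
    git log --oneline 0ac4185 --not f2ee8e8    # the "new" revisions that replaced them

A local checkout of v1-10-test can then be realigned with
"git reset --hard origin/v1-10-test"; note that this discards any local
commits, so rebase them onto the new tip first if you need to keep them.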


Summary of changes:
 airflow/operators/sql.py                           | 16 +++++++--------
 docs/operators-and-hooks-ref.rst                   | 24 +++++++++-------------
 tests/api/common/experimental/test_pool.py         |  4 ++--
 tests/contrib/hooks/test_gcp_api_base_hook.py      |  2 +-
 tests/contrib/hooks/test_gcp_cloud_build_hook.py   |  4 ++--
 tests/contrib/hooks/test_gcp_transfer_hook.py      |  4 ++--
 .../operators/test_gcp_cloud_build_operator.py     |  8 ++++----
 tests/contrib/operators/test_gcs_to_gdrive.py      |  2 +-
 tests/contrib/operators/test_sftp_operator.py      |  6 +++---
 tests/contrib/operators/test_ssh_operator.py       |  6 +++---
 tests/contrib/secrets/test_hashicorp_vault.py      |  4 ++--
 .../contrib/utils/test_gcp_credentials_provider.py |  2 +-
 tests/operators/test_sql.py                        | 14 ++++++-------
 tests/www_rbac/test_validators.py                  |  4 ++--
 14 files changed, 48 insertions(+), 52 deletions(-)


[airflow] 01/07: [AIRFLOW-XXX] Extract operators and hooks to separate page (#6213)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 45b91ce4a44d37cf5858f6577120eee0cefb2f3b
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Thu Oct 3 08:23:57 2019 +0200

    [AIRFLOW-XXX] Extract operators and hooks to separate page (#6213)
    
    (cherry picked from commit bd822dd8c27f4f7c584da4a5d0524140e64d7613)
---
 docs/index.rst                   |    1 +
 docs/integration.rst             |  470 +--------------
 docs/operators-and-hooks-ref.rst | 1234 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 1247 insertions(+), 458 deletions(-)

diff --git a/docs/index.rst b/docs/index.rst
index 44717ac..65329d6 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -104,6 +104,7 @@ Content
     :maxdepth: 1
     :caption: References
 
+    Operators and hooks <operators-and-hooks-ref>
     CLI <cli-ref>
     Macros <macros-ref>
     Python API <_api/index>
diff --git a/docs/integration.rst b/docs/integration.rst
index 74a8207..ae252a6 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -18,461 +18,15 @@
 Integration
 ===========
 
-.. contents:: Content
-  :local:
-  :depth: 1
-
-.. _Azure:
-
-Azure: Microsoft Azure
-----------------------
-
-Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob
-Storage and Azure Data Lake. Hook, Sensor and Operator for Blob Storage and
-Azure Data Lake Hook are in contrib section.
-
-Logging
-'''''''
-
-Airflow can be configured to read and write task logs in Azure Blob Storage.
-See :ref:`write-logs-azure`.
-
-
-Azure Blob Storage
-''''''''''''''''''
-
-All classes communicate via the Window Azure Storage Blob protocol. Make sure that a
-Airflow connection of type ``wasb`` exists. Authorization can be done by supplying a
-login (=Storage account name) and password (=KEY), or login and SAS token in the extra
-field (see connection ``wasb_default`` for an example).
-
-The operators are defined in the following module:
-
-* :mod:`airflow.contrib.sensors.wasb_sensor`
-* :mod:`airflow.contrib.operators.wasb_delete_blob_operator`
-* :mod:`airflow.contrib.operators.file_to_wasb`
-
-They use :class:`airflow.contrib.hooks.wasb_hook.WasbHook` to communicate with Microsoft Azure.
-
-Azure File Share
-''''''''''''''''
-
-Cloud variant of a SMB file share. Make sure that a Airflow connection of
-type ``wasb`` exists. Authorization can be done by supplying a login (=Storage account name)
-and password (=Storage account key), or login and SAS token in the extra field
-(see connection ``wasb_default`` for an example).
-
-It uses :class:`airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook` to communicate with Microsoft Azure.
-
-Azure CosmosDB
-''''''''''''''
-
-AzureCosmosDBHook communicates via the Azure Cosmos library. Make sure that a
-Airflow connection of type ``azure_cosmos`` exists. Authorization can be done by supplying a
-login (=Endpoint uri), password (=secret key) and extra fields database_name and collection_name to specify the
-default database and collection to use (see connection ``azure_cosmos_default`` for an example).
-
-The operators are defined in the following modules:
-
-* :mod:`airflow.contrib.operators.azure_cosmos_operator`
-* :mod:`airflow.contrib.sensors.azure_cosmos_sensor`
-
-They also use :class:`airflow.contrib.hooks.azure_cosmos_hook.AzureCosmosDBHook` to communicate with Microsoft Azure.
-
-Azure Data Lake
-'''''''''''''''
-
-AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a
-Airflow connection of type ``azure_data_lake`` exists. Authorization can be done by supplying a
-login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name (Account Name)
-(see connection ``azure_data_lake_default`` for an example).
-
-The operators are defined in the following modules:
-
-* :mod:`airflow.contrib.operators.adls_list_operator`
-* :mod:`airflow.contrib.operators.adls_to_gcs`
-
-They also use :class:`airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook` to communicate with Microsoft Azure.
-
-
-Azure Container Instances
-'''''''''''''''''''''''''
-
-Azure Container Instances provides a method to run a docker container without having to worry
-about managing infrastructure. The AzureContainerInstanceHook requires a service principal. The
-credentials for this principal can either be defined in the extra field ``key_path``, as an
-environment variable named ``AZURE_AUTH_LOCATION``,
-or by providing a login/password and tenantId in extras.
-
-The operator is defined in the :mod:`airflow.contrib.operators.azure_container_instances_operator` module.
-
-They also use :class:`airflow.contrib.hooks.azure_container_volume_hook.AzureContainerVolumeHook`,
-:class:`airflow.contrib.hooks.azure_container_registry_hook.AzureContainerRegistryHook` and
-:class:`airflow.contrib.hooks.azure_container_instance_hook.AzureContainerInstanceHook` to communicate with Microsoft Azure.
-
-The AzureContainerRegistryHook requires a host/login/password to be defined in the connection.
-
-
-.. _AWS:
-
-AWS: Amazon Web Services
-------------------------
-
-Airflow has extensive support for Amazon Web Services. But note that the Hooks, Sensors and
-Operators are in the contrib section.
-
-Logging
-'''''''
-
-Airflow can be configured to read and write task logs in Amazon Simple Storage Service (Amazon S3).
-See :ref:`write-logs-amazon`.
-
-
-AWS EMR
-'''''''
-
-The operators are defined in the following modules:
-
-* :mod:`airflow.contrib.operators.emr_add_steps_operator`
-* :mod:`airflow.contrib.operators.emr_create_job_flow_operator`
-* :mod:`airflow.contrib.operators.emr_terminate_job_flow_operator`
-
-They also use :class:`airflow.contrib.hooks.emr_hook.EmrHook` to communicate with Amazon Web Service.
-
-AWS S3
-''''''
-
-The operators are defined in the following modules:
-
-* :mod:`airflow.operators.s3_file_transform_operator`
-* :mod:`airflow.contrib.operators.s3_list_operator`
-* :mod:`airflow.contrib.operators.s3_to_gcs_operator`
-* :mod:`airflow.contrib.operators.s3_to_gcs_transfer_operator`
-* :mod:`airflow.operators.s3_to_hive_operator`
-
-They also use :class:`airflow.hooks.S3_hook.S3Hook` to communicate with Amazon Web Service.
-
-AWS Batch Service
-'''''''''''''''''
-
-The operator is defined in the :class:`airflow.contrib.operators.awsbatch_operator.AWSBatchOperator` module.
-
-AWS RedShift
-''''''''''''
-
-The operators are defined in the following modules:
-
-* :mod:`airflow.contrib.sensors.aws_redshift_cluster_sensor`
-* :mod:`airflow.operators.redshift_to_s3_operator`
-* :mod:`airflow.operators.s3_to_redshift_operator`
-
-They also use :class:`airflow.contrib.hooks.redshift_hook.RedshiftHook` to communicate with Amazon Web Service.
-
-
-AWS DynamoDB
-''''''''''''
-
-The operator is defined in the :class:`airflow.contrib.operators.hive_to_dynamodb` module.
-
-It uses :class:`airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook` to communicate with Amazon Web Service.
-
-
-AWS Lambda
-''''''''''
-
-It uses :class:`airflow.contrib.hooks.aws_lambda_hook.AwsLambdaHook` to communicate with Amazon Web Service.
-
-AWS Kinesis
-'''''''''''
-
-It uses :class:`airflow.contrib.hooks.aws_firehose_hook.AwsFirehoseHook` to communicate with Amazon Web Service.
-
-
-Amazon SageMaker
-''''''''''''''''
-
-For more instructions on using Amazon SageMaker in Airflow, please see `the SageMaker Python SDK README`_.
-
-.. _the SageMaker Python SDK README: https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/workflow/README.rst
-
-The operators are defined in the following modules:
-
-:mod:`airflow.contrib.operators.sagemaker_training_operator`
-:mod:`airflow.contrib.operators.sagemaker_tuning_operator`
-:mod:`airflow.contrib.operators.sagemaker_model_operator`
-:mod:`airflow.contrib.operators.sagemaker_transform_operator`
-:mod:`airflow.contrib.operators.sagemaker_endpoint_config_operator`
-:mod:`airflow.contrib.operators.sagemaker_endpoint_operator`
-
-They uses :class:`airflow.contrib.hooks.sagemaker_hook.SageMakerHook` to communicate with Amazon Web Service.
-
-.. _Databricks:
-
-Databricks
-----------
-
-With contributions from `Databricks <https://databricks.com/>`__, Airflow has several operators
-which enable the submitting and running of jobs to the Databricks platform. Internally the
-operators talk to the ``api/2.0/jobs/runs/submit`` `endpoint <https://docs.databricks.com/api/latest/jobs.html#runs-submit>`_.
-
-The operators are defined in the :class:`airflow.contrib.operators.databricks_operator` module.
-
-.. _GCP:
-
-GCP: Google Cloud Platform
---------------------------
-
-Airflow has extensive support for the Google Cloud Platform. But note that most Hooks and
-Operators are in the contrib section. Meaning that they have a *beta* status, meaning that
-they can have breaking changes between minor releases.
-
-See the :doc:`GCP connection type <howto/connection/gcp>` documentation to
-configure connections to GCP.
-
-Logging
-'''''''
-
-Airflow can be configured to read and write task logs in Google Cloud Storage.
-See :ref:`write-logs-gcp`.
-
-
-GoogleCloudBaseHook
-'''''''''''''''''''
-
-All hooks is based on :class:`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`.
-
-
-BigQuery
-''''''''
-
-The operators are defined in the following module:
-
- * :mod:`airflow.contrib.operators.bigquery_check_operator`
- * :mod:`airflow.contrib.operators.bigquery_get_data`
- * :mod:`airflow.contrib.operators.bigquery_table_delete_operator`
- * :mod:`airflow.contrib.operators.bigquery_to_bigquery`
- * :mod:`airflow.contrib.operators.bigquery_to_gcs`
-
-They also use :class:`airflow.contrib.hooks.bigquery_hook.BigQueryHook` to communicate with Google Cloud Platform.
-
-
-Cloud Spanner
-'''''''''''''
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_spanner_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_spanner_hook.CloudSpannerHook` to communicate with Google Cloud Platform.
-
-
-Cloud SQL
-'''''''''
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_sql_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook` and :class:`airflow.contrib.hooks.gcp_sql_hook.CloudSqlHook` to communicate with Google Cloud Platform.
-
-
-Cloud Bigtable
-''''''''''''''
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_bigtable_operator` package.
-
-
-They also use :class:`airflow.contrib.hooks.gcp_bigtable_hook.BigtableHook` to communicate with Google Cloud Platform.
-
-Cloud Build
-'''''''''''
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_cloud_build_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_cloud_build_hook.CloudBuildHook` to communicate with Google Cloud Platform.
-
-
-Compute Engine
-''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.gcp_compute_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_compute_hook.GceHook` to communicate with Google Cloud Platform.
-
-
-Cloud Functions
-'''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.gcp_function_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_function_hook.GcfHook` to communicate with Google Cloud Platform.
-
-
-Cloud DataFlow
-''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.dataflow_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook` to communicate with Google Cloud Platform.
-
-
-Cloud DataProc
-''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.dataproc_operator` package.
-
-
-Cloud Datastore
-'''''''''''''''
-
-:class:`airflow.contrib.operators.datastore_export_operator.DatastoreExportOperator`
-    Export entities from Google Cloud Datastore to Cloud Storage.
-
-:class:`airflow.contrib.operators.datastore_import_operator.DatastoreImportOperator`
-    Import entities from Cloud Storage to Google Cloud Datastore.
-
-They also use :class:`airflow.contrib.hooks.datastore_hook.DatastoreHook` to communicate with Google Cloud Platform.
-
-
-Cloud ML Engine
-'''''''''''''''
-
-:class:`airflow.contrib.operators.mlengine_operator.MLEngineBatchPredictionOperator`
-    Start a Cloud ML Engine batch prediction job.
-
-:class:`airflow.contrib.operators.mlengine_operator.MLEngineModelOperator`
-    Manages a Cloud ML Engine model.
-
-:class:`airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator`
-    Start a Cloud ML Engine training job.
-
-:class:`airflow.contrib.operators.mlengine_operator.MLEngineVersionOperator`
-    Manages a Cloud ML Engine model version.
-
-The operators are defined in the :class:`airflow.contrib.operators.mlengine_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook` to communicate with Google Cloud Platform.
-
-Cloud Storage
-'''''''''''''
-
-The operators are defined in the following module:
-
- * :mod:`airflow.contrib.operators.file_to_gcs`
- * :mod:`airflow.contrib.operators.gcs_acl_operator`
- * :mod:`airflow.contrib.operators.gcs_download_operator`
- * :mod:`airflow.contrib.operators.gcs_list_operator`
- * :mod:`airflow.contrib.operators.gcs_operator`
- * :mod:`airflow.contrib.operators.gcs_to_bq`
- * :mod:`airflow.contrib.operators.gcs_to_gcs`
- * :mod:`airflow.contrib.operators.mysql_to_gcs`
- * :mod:`airflow.contrib.operators.mssql_to_gcs`
- * :mod:`airflow.contrib.sensors.gcs_sensor`
- * :mod:`airflow.contrib.operators.gcs_delete_operator`
-
-They also use :class:`airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook` to communicate with Google Cloud Platform.
-
-
-Transfer Service
-''''''''''''''''
-
-
-The operators are defined in the following module:
-
- * :mod:`airflow.contrib.operators.gcp_transfer_operator`
- * :mod:`airflow.contrib.sensors.gcp_transfer_operator`
-
-They also use :class:`airflow.contrib.hooks.gcp_transfer_hook.GCPTransferServiceHook` to communicate with Google Cloud Platform.
-
-
-Cloud Vision
-''''''''''''
-
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_vision_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook` to communicate with Google Cloud Platform.
-
-Cloud Text to Speech
-''''''''''''''''''''
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_text_to_speech_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_text_to_speech_hook.GCPTextToSpeechHook` to communicate with Google Cloud Platform.
-
-Cloud Speech to Text
-''''''''''''''''''''
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_speech_to_text_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_speech_to_text_hook.GCPSpeechToTextHook` to communicate with Google Cloud Platform.
-
-Cloud Speech Translate Operators
---------------------------------
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_translate_speech_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_speech_to_text_hook.GCPSpeechToTextHook` and
-    :class:`airflow.contrib.hooks.gcp_translate_hook.CloudTranslateHook` to communicate with Google Cloud Platform.
-
-Cloud Translate
-'''''''''''''''
-
-Cloud Translate Text Operators
-""""""""""""""""""""""""""""""
-
-:class:`airflow.contrib.operators.gcp_translate_operator.CloudTranslateTextOperator`
-    Translate a string or list of strings.
-
-The operator is defined in the :class:`airflow.contrib.operators.gcp_translate_operator` package.
-
-Cloud Video Intelligence
-''''''''''''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.gcp_video_intelligence_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook` to communicate with Google Cloud Platform.
-
-Google Kubernetes Engine
-''''''''''''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.gcp_container_operator` package.
-
-
-They also use :class:`airflow.contrib.hooks.gcp_container_hook.GKEClusterHook` to communicate with Google Cloud Platform.
-
-
-Google Natural Language
-'''''''''''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.gcp_natural_language_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_natural_language_operator.CloudNaturalLanguageHook` to communicate with Google Cloud Platform.
-
-
-Google Cloud Data Loss Prevention (DLP)
-'''''''''''''''''''''''''''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.gcp_dlp_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_dlp_hook.CloudDLPHook` to communicate with Google Cloud Platform.
-
-
-Google Cloud Tasks
-''''''''''''''''''
-
-The operators are defined in the :class:`airflow.contrib.operators.gcp_tasks_operator` package.
-
-They also use :class:`airflow.contrib.hooks.gcp_tasks_hook.CloudTasksHook` to communicate with Google Cloud Platform.
-
-
-.. _Qubole:
-
-Qubole
-------
-
-Apache Airflow has a native operator and hooks to talk to `Qubole <https://qubole.com/>`__,
-which lets you submit your big data jobs directly to Qubole from Apache Airflow.
-
-The operators are defined in the following module:
-
- * :mod:`airflow.contrib.operators.qubole_operator`
- * :mod:`airflow.contrib.sensors.qubole_sensor`
- * :mod:`airflow.contrib.sensors.qubole_sensor`
- * :mod:`airflow.contrib.operators.qubole_check_operator`
+Airflow has a mechanism that allows you to expand its functionality and integrate with other systems.
+
+* :doc:`Operators and hooks </operators-and-hooks-ref>`
+* :doc:`Executor </executor/index>`
+* :doc:`Plugins </plugins>`
+* :doc:`Metrics (statsd) </metrics>`
+* :doc:`Authentication backends </security>`
+* :doc:`Logging </howto/write-logs>`
+* :doc:`Tracking systems </howto/tracking-user-activity>`
+
+It also integrates with the :doc:`Sentry <errors>` service for error tracking. Other applications can also integrate using
+the :doc:`REST API <rest-api-ref>`.
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
new file mode 100644
index 0000000..6c80858
--- /dev/null
+++ b/docs/operators-and-hooks-ref.rst
@@ -0,0 +1,1234 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Operators and Hooks Reference
+=============================
+
+.. contents:: Content
+  :local:
+  :depth: 1
+
+.. _Apache:
+
+ASF: Apache Software Foundation
+-------------------------------
+
+Airflow supports various software created by `Apache Software Foundation <https://www.apache.org/foundation/>`__.
+
+Software operators and hooks
+''''''''''''''''''''''''''''
+
+These integrations allow you to perform various operations within software developed by Apache Software
+Foundation.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Service name
+     - Guides
+     - Hook
+     - Operators
+     - Sensors
+
+   * - `Apache Cassandra <http://cassandra.apache.org/>`__
+     -
+     - :mod:`airflow.contrib.hooks.cassandra_hook`
+     -
+     - :mod:`airflow.contrib.sensors.cassandra_record_sensor`,
+       :mod:`airflow.contrib.sensors.cassandra_table_sensor`
+
+   * - `Apache Druid <https://druid.apache.org/>`__
+     -
+     - :mod:`airflow.hooks.druid_hook`
+     - :mod:`airflow.contrib.operators.druid_operator`,
+       :mod:`airflow.operators.druid_check_operator`
+     -
+   * - `Hadoop Distributed File System (HDFS) <https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html>`__
+     -
+     - :mod:`airflow.hooks.hdfs_hook`
+     -
+     - :mod:`airflow.sensors.hdfs_sensor`,
+       :mod:`airflow.contrib.sensors.hdfs_sensor`
+
+   * - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.hooks.hive_hooks`
+     - :mod:`airflow.operators.hive_operator`,
+       :mod:`airflow.operators.hive_stats_operator`
+     - :mod:`airflow.sensors.named_hive_partition_sensor`,
+       :mod:`airflow.sensors.hive_partition_sensor`,
+       :mod:`airflow.sensors.metastore_partition_sensor`
+
+   * - `Apache Pig <https://pig.apache.org/>`__
+     -
+     - :mod:`airflow.hooks.pig_hook`
+     - :mod:`airflow.operators.pig_operator`
+     -
+
+   * - `Apache Pinot <https://pinot.apache.org/>`__
+     -
+     - :mod:`airflow.contrib.hooks.pinot_hook`
+     -
+     -
+
+   * - `Apache Spark <https://spark.apache.org/>`__
+     -
+     - :mod:`airflow.contrib.hooks.spark_jdbc_hook`,
+       :mod:`airflow.contrib.hooks.spark_jdbc_script`,
+       :mod:`airflow.contrib.hooks.spark_sql_hook`,
+       :mod:`airflow.contrib.hooks.spark_submit_hook`
+     - :mod:`airflow.contrib.operators.spark_jdbc_operator`,
+       :mod:`airflow.contrib.operators.spark_sql_operator`,
+       :mod:`airflow.contrib.operators.spark_submit_operator`
+     -
+
+   * - `Apache Sqoop <https://sqoop.apache.org/>`__
+     -
+     - :mod:`airflow.contrib.hooks.sqoop_hook`
+     - :mod:`airflow.contrib.operators.sqoop_operator`
+     -
+
+   * - `WebHDFS <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html>`__
+     -
+     - :mod:`airflow.hooks.webhdfs_hook`
+     -
+     - :mod:`airflow.sensors.web_hdfs_sensor`
+
+
+Transfer operators and hooks
+''''''''''''''''''''''''''''
+
+These integrations allow you to copy data from/to software developed by Apache Software
+Foundation.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Source
+     - Destination
+     - Guide
+     - Operators
+
+   * - `Apache Hive <https://hive.apache.org/>`__
+     - `Amazon DynamoDB <https://aws.amazon.com/dynamodb/>`__
+     -
+     - :mod:`airflow.contrib.operators.hive_to_dynamodb`
+
+   * - `Apache Hive <https://hive.apache.org/>`__
+     - `Apache Druid <https://druid.apache.org/>`__
+     -
+     - :mod:`airflow.operators.hive_to_druid`
+
+   * - `Apache Hive <https://hive.apache.org/>`__
+     - `MySQL <https://www.mysql.com/>`__
+     -
+     - :mod:`airflow.operators.hive_to_mysql`
+
+   * - `Apache Hive <https://hive.apache.org/>`__
+     - `Samba <https://www.samba.org/>`__
+     -
+     - :mod:`airflow.operators.hive_to_samba_operator`
+
+   * - `Microsoft SQL Server (MSSQL) <https://www.microsoft.com/pl-pl/sql-server/sql-server-downloads>`__
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.operators.mssql_to_hive`
+
+   * - `MySQL <https://www.mysql.com/>`__
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.operators.mysql_to_hive`
+
+   * - `Vertica <https://www.vertica.com/>`__
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.contrib.operators.vertica_to_hive`
+
+   * - `Apache Cassandra <http://cassandra.apache.org/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.cassandra_to_gcs`
+
+   * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.operators.s3_to_hive_operator`
+
+
+.. _Azure:
+
+Azure: Microsoft Azure
+----------------------
+
+Airflow has limited support for `Microsoft Azure <https://azure.microsoft.com/>`__.
+
+Service operators and hooks
+'''''''''''''''''''''''''''
+
+These integrations allow you to perform various operations within Microsoft Azure.
+
+
+.. list-table::
+   :header-rows: 1
+
+   * - Service name
+     - Hook
+     - Operators
+     - Sensors
+
+   * - `Azure Blob Storage <https://azure.microsoft.com/en-us/services/storage/blobs/>`__
+     - :mod:`airflow.contrib.hooks.wasb_hook`
+     - :mod:`airflow.contrib.operators.wasb_delete_blob_operator`
+     - :mod:`airflow.contrib.sensors.wasb_sensor`
+
+   * - `Azure Container Instances <https://azure.microsoft.com/en-us/services/container-instances/>`__
+     - :mod:`airflow.contrib.hooks.azure_container_instance_hook`,
+       :mod:`airflow.contrib.hooks.azure_container_registry_hook`,
+       :mod:`airflow.contrib.hooks.azure_container_volume_hook`
+     - :mod:`airflow.contrib.operators.azure_container_instances_operator`
+     -
+
+   * - `Azure Cosmos DB <https://azure.microsoft.com/en-us/services/cosmos-db/>`__
+     - :mod:`airflow.contrib.hooks.azure_cosmos_hook`
+     - :mod:`airflow.contrib.operators.azure_cosmos_operator`
+     - :mod:`airflow.contrib.sensors.azure_cosmos_sensor`
+
+   * - `Azure Data Lake Storage <https://azure.microsoft.com/en-us/services/storage/data-lake-storage/>`__
+     - :mod:`airflow.contrib.hooks.azure_data_lake_hook`
+     - :mod:`airflow.contrib.operators.adls_list_operator`
+     -
+
+   * - `Azure Files <https://azure.microsoft.com/en-us/services/storage/files/>`__
+     - :mod:`airflow.contrib.hooks.azure_fileshare_hook`
+     -
+     -
+
+
+Transfer operators and hooks
+""""""""""""""""""""""""""""
+
+These integrations allow you to copy data from/to Microsoft Azure.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Source
+     - Destination
+     - Guide
+     - Operators
+
+   * - `Azure Data Lake Storage <https://azure.microsoft.com/en-us/services/storage/data-lake-storage/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.adls_to_gcs`
+
+   * - Local
+     - `Azure Blob Storage <https://azure.microsoft.com/en-us/services/storage/blobs/>`__
+     -
+     - :mod:`airflow.contrib.operators.file_to_wasb`
+
+   * - `Oracle <https://www.oracle.com/pl/database/>`__
+     - `Azure Data Lake Storage <https://azure.microsoft.com/en-us/services/storage/data-lake-storage/>`__
+     -
+     - :mod:`airflow.contrib.operators.oracle_to_azure_data_lake_transfer`
+
+
+.. _AWS:
+
+AWS: Amazon Web Services
+------------------------
+
+Airflow has support for `Amazon Web Services <https://aws.amazon.com/>`__.
+
+All hooks are based on :mod:`airflow.contrib.hooks.aws_hook`.
+
+Service operators and hooks
+'''''''''''''''''''''''''''
+
+These integrations allow you to perform various operations within Amazon Web Services.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Service name
+     - Hook
+     - Operators
+     - Sensors
+
+   * - `Amazon Athena <https://aws.amazon.com/athena/>`__
+     - :mod:`airflow.contrib.hooks.aws_athena_hook`
+     - :mod:`airflow.contrib.operators.aws_athena_operator`
+     - :mod:`airflow.contrib.sensors.aws_athena_sensor`
+
+   * - `AWS Batch <https://aws.amazon.com/batch/>`__
+     -
+     - :mod:`airflow.contrib.operators.awsbatch_operator`
+     -
+
+   * - `Amazon CloudWatch Logs <https://aws.amazon.com/cloudwatch/>`__
+     - :mod:`airflow.contrib.hooks.aws_logs_hook`
+     -
+     -
+
+   * - `Amazon DynamoDB <https://aws.amazon.com/dynamodb/>`__
+     - :mod:`airflow.contrib.hooks.aws_dynamodb_hook`
+     -
+     -
+
+   * - `Amazon EC2 Container Service (ECS) <https://aws.amazon.com/ecs/>`__
+     -
+     - :mod:`airflow.contrib.operators.ecs_operator`
+     -
+
+   * - `Amazon EMR <https://aws.amazon.com/emr/>`__
+     - :mod:`airflow.contrib.hooks.emr_hook`
+     - :mod:`airflow.contrib.operators.emr_add_steps_operator`,
+       :mod:`airflow.contrib.operators.emr_create_job_flow_operator`,
+       :mod:`airflow.contrib.operators.emr_terminate_job_flow_operator`
+     - :mod:`airflow.contrib.sensors.emr_base_sensor`,
+       :mod:`airflow.contrib.sensors.emr_job_flow_sensor`,
+       :mod:`airflow.contrib.sensors.emr_step_sensor`
+
+   * - `AWS Glue Catalog <https://aws.amazon.com/glue/>`__
+     - :mod:`airflow.contrib.hooks.aws_glue_catalog_hook`
+     -
+     - :mod:`airflow.contrib.sensors.aws_glue_catalog_partition_sensor`
+
+   * - `Amazon Kinesis Data Firehose <https://aws.amazon.com/kinesis/data-firehose/>`__
+     - :mod:`airflow.contrib.hooks.aws_firehose_hook`
+     -
+     -
+
+   * - `AWS Lambda <https://aws.amazon.com/lambda/>`__
+     - :mod:`airflow.contrib.hooks.aws_lambda_hook`
+     -
+     -
+
+   * - `Amazon Redshift <https://aws.amazon.com/redshift/>`__
+     - :mod:`airflow.contrib.hooks.redshift_hook`
+     -
+     - :mod:`airflow.contrib.sensors.aws_redshift_cluster_sensor`
+
+   * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     - :mod:`airflow.hooks.S3_hook`
+     - :mod:`airflow.operators.s3_file_transform_operator`,
+       :mod:`airflow.contrib.operators.s3_copy_object_operator`,
+       :mod:`airflow.contrib.operators.s3_delete_objects_operator`,
+       :mod:`airflow.contrib.operators.s3_list_operator`
+     - :mod:`airflow.sensors.s3_key_sensor`,
+       :mod:`airflow.sensors.s3_prefix_sensor`
+
+   * - `Amazon SageMaker <https://aws.amazon.com/sagemaker/>`__
+     - :mod:`airflow.contrib.hooks.sagemaker_hook`
+     - :mod:`airflow.contrib.operators.sagemaker_base_operator`,
+       :mod:`airflow.contrib.operators.sagemaker_endpoint_config_operator`,
+       :mod:`airflow.contrib.operators.sagemaker_endpoint_operator`,
+       :mod:`airflow.contrib.operators.sagemaker_model_operator`,
+       :mod:`airflow.contrib.operators.sagemaker_training_operator`,
+       :mod:`airflow.contrib.operators.sagemaker_transform_operator`,
+       :mod:`airflow.contrib.operators.sagemaker_tuning_operator`
+     - :mod:`airflow.contrib.sensors.sagemaker_base_sensor`,
+       :mod:`airflow.contrib.sensors.sagemaker_endpoint_sensor`,
+       :mod:`airflow.contrib.sensors.sagemaker_training_sensor`,
+       :mod:`airflow.contrib.sensors.sagemaker_transform_sensor`,
+       :mod:`airflow.contrib.sensors.sagemaker_tuning_sensor`
+
+   * - `Amazon Simple Notification Service (SNS) <https://aws.amazon.com/sns/>`__
+     - :mod:`airflow.contrib.hooks.aws_sns_hook`
+     - :mod:`airflow.contrib.operators.sns_publish_operator`
+     -
+
+   * - `Amazon Simple Queue Service (SQS) <https://aws.amazon.com/sqs/>`__
+     - :mod:`airflow.contrib.hooks.aws_sqs_hook`
+     - :mod:`airflow.contrib.operators.aws_sqs_publish_operator`
+     - :mod:`airflow.contrib.sensors.aws_sqs_sensor`
+
+Transfer operators and hooks
+""""""""""""""""""""""""""""
+
+These integrations allow you to copy data from/to Amazon Web Services.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Source
+     - Destination
+     - Guide
+     - Operators
+
+   * -
+       .. _integration:AWS-Discovery-ref:
+
+       All GCP services :ref:`[1] <integration:GCP-Discovery>`
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     -
+     - :mod:`airflow.operators.google_api_to_s3_transfer`
+
+   * - `Apache Hive <https://hive.apache.org/>`__
+     - `Amazon DynamoDB <https://aws.amazon.com/dynamodb/>`__
+     -
+     - :mod:`airflow.contrib.operators.hive_to_dynamodb`
+
+   * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     - :mod:`airflow.contrib.operators.s3_to_gcs_operator`,
+       :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
+
+   * - `Amazon Redshift <https://aws.amazon.com/redshift/>`__
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
+     -
+     - :mod:`airflow.operators.redshift_to_s3_operator`
+
+   * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.operators.s3_to_hive_operator`
+
+   * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
+     - `Amazon Redshift <https://aws.amazon.com/redshift/>`__
+     -
+     - :mod:`airflow.operators.s3_to_redshift_operator`
+
+   * - `Amazon DynamoDB <https://aws.amazon.com/dynamodb/>`__
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
+     -
+     - :mod:`airflow.contrib.operators.dynamodb_to_s3`
+
+   * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
+     - `SSH File Transfer Protocol (SFTP) <https://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/>`__
+     -
+     - :mod:`airflow.contrib.operators.s3_to_sftp_operator`
+
+   * - `SSH File Transfer Protocol (SFTP) <https://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/>`__
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
+     -
+     - :mod:`airflow.contrib.operators.sftp_to_s3_operator`
+
+   * - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     -
+     - :mod:`airflow.operators.gcs_to_s3`
+
+   * - `Internet Message Access Protocol (IMAP) <https://tools.ietf.org/html/rfc3501>`__
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     -
+     - :mod:`airflow.contrib.operators.imap_attachment_to_s3_operator`
+
+:ref:`[1] <integration:AWS-Discovery-ref>` Those discovery-based operators use
+:class:`airflow.gcp.hooks.discovery_api.GoogleDiscoveryApiHook` to communicate with Google
+Services via the `Google API Python Client <https://github.com/googleapis/google-api-python-client>`__.
+Please note that this library is in maintenance mode hence it won't fully support GCP in the future.
+Therefore it is recommended that you use the custom GCP Service Operators for working with the Google
+Cloud Platform.
+
+.. _GCP:
+
+GCP: Google Cloud Platform
+--------------------------
+
+Airflow has extensive support for the `Google Cloud Platform <https://cloud.google.com/>`__.
+
+See the :doc:`GCP connection type <howto/connection/gcp>` documentation to
+configure connections to GCP.
+
+All hooks are based on :class:`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`.
+
+Service operators and hooks
+'''''''''''''''''''''''''''
+
+These integrations allow you to perform various operations within the Google Cloud Platform.
+
+..
+  PLEASE KEEP THE ALPHABETICAL ORDER OF THE LIST BELOW, BUT OMIT THE "Cloud" PREFIX
+
+.. list-table::
+   :header-rows: 1
+
+   * - Service name
+     - Guide
+     - Hook
+     - Operators
+     - Sensors
+
+   * - `AutoML <https://cloud.google.com/automl/>`__
+     - :doc:`How to use <howto/operator/gcp/automl>`
+     - :mod:`airflow.gcp.hooks.automl`
+     - :mod:`airflow.gcp.operators.automl`
+     -
+
+   * - `BigQuery <https://cloud.google.com/bigquery/>`__
+     -
+     - :mod:`airflow.gcp.hooks.bigquery`
+     - :mod:`airflow.gcp.operators.bigquery`
+     - :mod:`airflow.gcp.sensors.bigquery`
+
+   * - `BigQuery Data Transfer Service <https://cloud.google.com/bigquery/transfer/>`__
+     - :doc:`How to use <howto/operator/gcp/bigquery_dts>`
+     - :mod:`airflow.gcp.hooks.bigquery_dts`
+     - :mod:`airflow.gcp.operators.bigquery_dts`
+     - :mod:`airflow.gcp.sensors.bigquery_dts`
+
+   * - `Bigtable <https://cloud.google.com/bigtable/>`__
+     - :doc:`How to use <howto/operator/gcp/bigtable>`
+     - :mod:`airflow.gcp.hooks.bigtable`
+     - :mod:`airflow.gcp.operators.bigtable`
+     - :mod:`airflow.gcp.sensors.bigtable`
+
+   * - `Cloud Build <https://cloud.google.com/cloud-build/>`__
+     - :doc:`How to use <howto/operator/gcp/cloud_build>`
+     - :mod:`airflow.gcp.hooks.cloud_build`
+     - :mod:`airflow.gcp.operators.cloud_build`
+     -
+
+   * - `Compute Engine <https://cloud.google.com/compute/>`__
+     - :doc:`How to use <howto/operator/gcp/compute>`
+     - :mod:`airflow.gcp.hooks.compute`
+     - :mod:`airflow.gcp.operators.compute`
+     -
+
+   * - `Cloud Data Loss Prevention (DLP) <https://cloud.google.com/dlp/>`__
+     -
+     - :mod:`airflow.gcp.hooks.dlp`
+     - :mod:`airflow.gcp.operators.dlp`
+     -
+
+   * - `Dataflow <https://cloud.google.com/dataflow/>`__
+     -
+     - :mod:`airflow.gcp.hooks.dataflow`
+     - :mod:`airflow.gcp.operators.dataflow`
+     -
+
+   * - `Dataproc <https://cloud.google.com/dataproc/>`__
+     -
+     - :mod:`airflow.gcp.hooks.dataproc`
+     - :mod:`airflow.gcp.operators.dataproc`
+     -
+
+   * - `Datastore <https://cloud.google.com/datastore/>`__
+     -
+     - :mod:`airflow.gcp.hooks.datastore`
+     - :mod:`airflow.gcp.operators.datastore`
+     -
+
+   * - `Cloud Functions <https://cloud.google.com/functions/>`__
+     - :doc:`How to use <howto/operator/gcp/functions>`
+     - :mod:`airflow.gcp.hooks.functions`
+     - :mod:`airflow.gcp.operators.functions`
+     -
+
+   * - `Cloud Key Management Service (KMS) <https://cloud.google.com/kms/>`__
+     -
+     - :mod:`airflow.gcp.hooks.kms`
+     -
+     -
+
+   * - `Kubernetes Engine <https://cloud.google.com/kubernetes_engine/>`__
+     -
+     - :mod:`airflow.gcp.hooks.kubernetes_engine`
+     - :mod:`airflow.gcp.operators.kubernetes_engine`
+     -
+
+   * - `Machine Learning Engine <https://cloud.google.com/ml-engine/>`__
+     -
+     - :mod:`airflow.gcp.hooks.mlengine`
+     - :mod:`airflow.gcp.operators.mlengine`
+     -
+
+   * - `Cloud Memorystore <https://cloud.google.com/memorystore/>`__
+     - :doc:`How to use <howto/operator/gcp/cloud_memorystore>`
+     - :mod:`airflow.gcp.hooks.cloud_memorystore`
+     - :mod:`airflow.gcp.operators.cloud_memorystore`
+     -
+
+   * - `Natural Language <https://cloud.google.com/natural-language/>`__
+     - :doc:`How to use <howto/operator/gcp/natural_language>`
+     - :mod:`airflow.gcp.hooks.natural_language`
+     - :mod:`airflow.gcp.operators.natural_language`
+     -
+
+   * - `Cloud Pub/Sub <https://cloud.google.com/pubsub/>`__
+     -
+     - :mod:`airflow.gcp.hooks.pubsub`
+     - :mod:`airflow.gcp.operators.pubsub`
+     - :mod:`airflow.gcp.sensors.pubsub`
+
+   * - `Cloud Spanner <https://cloud.google.com/spanner/>`__
+     - :doc:`How to use <howto/operator/gcp/spanner>`
+     - :mod:`airflow.gcp.hooks.spanner`
+     - :mod:`airflow.gcp.operators.spanner`
+     -
+
+   * - `Cloud Speech-to-Text <https://cloud.google.com/speech-to-text/>`__
+     - :doc:`How to use <howto/operator/gcp/speech>`
+     - :mod:`airflow.gcp.hooks.speech_to_text`
+     - :mod:`airflow.gcp.operators.speech_to_text`
+     -
+
+   * - `Cloud SQL <https://cloud.google.com/sql/>`__
+     - :doc:`How to use <howto/operator/gcp/sql>`
+     - :mod:`airflow.gcp.hooks.cloud_sql`
+     - :mod:`airflow.gcp.operators.cloud_sql`
+     -
+
+   * - `Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - :doc:`How to use <howto/operator/gcp/gcs>`
+     - :mod:`airflow.gcp.hooks.gcs`
+     - :mod:`airflow.gcp.operators.gcs`
+     - :mod:`airflow.gcp.sensors.gcs`
+
+   * - `Storage Transfer Service <https://cloud.google.com/storage/transfer/>`__
+     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     - :mod:`airflow.gcp.hooks.cloud_storage_transfer_service`
+     - :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
+     - :mod:`airflow.gcp.sensors.cloud_storage_transfer_service`
+
+   * - `Cloud Tasks <https://cloud.google.com/tasks/>`__
+     -
+     - :mod:`airflow.gcp.hooks.tasks`
+     - :mod:`airflow.gcp.operators.tasks`
+     -
+
+   * - `Cloud Text-to-Speech <https://cloud.google.com/text-to-speech/>`__
+     - :doc:`How to use <howto/operator/gcp/speech>`
+     - :mod:`airflow.gcp.hooks.text_to_speech`
+     - :mod:`airflow.gcp.operators.text_to_speech`
+     -
+
+   * - `Cloud Translation <https://cloud.google.com/translate/>`__
+     - :doc:`How to use <howto/operator/gcp/translate>`
+     - :mod:`airflow.gcp.hooks.translate`
+     - :mod:`airflow.gcp.operators.translate`
+     -
+
+   * - `Cloud Video Intelligence <https://cloud.google.com/video_intelligence/>`__
+     - :doc:`How to use <howto/operator/gcp/video_intelligence>`
+     - :mod:`airflow.gcp.hooks.video_intelligence`
+     - :mod:`airflow.gcp.operators.video_intelligence`
+     -
+
+   * - `Cloud Vision <https://cloud.google.com/vision/>`__
+     - :doc:`How to use <howto/operator/gcp/vision>`
+     - :mod:`airflow.gcp.hooks.vision`
+     - :mod:`airflow.gcp.operators.vision`
+     -
+
+
+Transfer operators and hooks
+''''''''''''''''''''''''''''
+
+These integrations allow you to copy data from/to Google Cloud Platform.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Source
+     - Destination
+     - Guide
+     - Operators
+
+   * -
+       .. _integration:GCP-Discovery-ref:
+
+       All services :ref:`[1] <integration:GCP-Discovery>`
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     -
+     - :mod:`airflow.operators.google_api_to_s3_transfer`
+
+   * - `Azure Data Lake Storage <https://azure.microsoft.com/pl-pl/services/storage/data-lake-storage/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.adls_to_gcs`
+
+   * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     - :mod:`airflow.contrib.operators.s3_to_gcs_operator`,
+       :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
+
+   * - `Google BigQuery <https://cloud.google.com/bigquery/>`__
+     - `Google BigQuery <https://cloud.google.com/bigquery/>`__
+     -
+     - :mod:`airflow.operators.bigquery_to_bigquery`
+
+   * - `Google BigQuery <https://cloud.google.com/bigquery/>`__
+     - `Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.bigquery_to_gcs`
+
+   * - `BigQuery <https://cloud.google.com/bigquery/>`__
+     - `MySQL <https://www.mysql.com/>`__
+     -
+     - :mod:`airflow.operators.bigquery_to_mysql`
+
+   * - `Apache Cassandra <http://cassandra.apache.org/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.cassandra_to_gcs`
+
+   * - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - `Google BigQuery <https://cloud.google.com/bigquery/>`__
+     -
+     - :mod:`airflow.operators.gcs_to_bq`
+
+   * - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - :doc:`How to use <howto/operator/gcp/gcs_to_gcs>`,
+       :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     - :mod:`airflow.operators.gcs_to_gcs`,
+       :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
+
+   * - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     -
+     - :mod:`airflow.operators.gcs_to_s3`
+
+   * - Local
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.local_to_gcs`
+
+   * - `Microsoft SQL Server (MSSQL) <https://www.microsoft.com/pl-pl/sql-server/sql-server-downloads>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.mssql_to_gcs`
+
+   * - `MySQL <https://www.mysql.com/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.mysql_to_gcs`
+
+   * - `PostgreSQL <https://www.postgresql.org/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.postgres_to_gcs`
+
+   * - SQL
+     - `Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.sql_to_gcs`
+
+   * - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - `Google Drive <https://www.google.com/drive/>`__
+     -
+     - :mod:`airflow.contrib.operators.gcs_to_gdrive_operator`
+
+
+.. _integration:GCP-Discovery:
+
+:ref:`[1] <integration:GCP-Discovery-ref>` Those discovery-based operators use
+:class:`airflow.gcp.hooks.discovery_api.GoogleDiscoveryApiHook` to communicate with Google
+Services via the `Google API Python Client <https://github.com/googleapis/google-api-python-client>`__.
+Please note that this library is in maintenance mode hence it won't fully support GCP in the future.
+Therefore it is recommended that you use the custom GCP Service Operators for working with the Google
+Cloud Platform.
+
+.. note::
+    You can learn how to use GCP integrations by analyzing the
+    `source code <https://github.com/apache/airflow/tree/master/airflow/gcp/example_dags/>`_ of the particular example DAGs.
+
+Other operators and hooks
+'''''''''''''''''''''''''
+
+.. list-table::
+   :header-rows: 1
+
+   * - Guide
+     - Operators
+     - Hooks
+
+   * - :doc:`How to use <howto/operator/gcp/translate-speech>`
+     - :mod:`airflow.gcp.operators.translate_speech`
+     -
+
+   * -
+     -
+     - :mod:`airflow.gcp.hooks.discovery_api`
+
+.. _service:
+
+Service integrations
+--------------------
+
+Service operators and hooks
+'''''''''''''''''''''''''''
+
+These integrations allow you to perform various operations within various services.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Service name
+     - Guide
+     - Hook
+     - Operators
+     - Sensors
+
+   * - `Atlassian Jira <https://www.atlassian.com/pl/software/jira>`__
+     -
+     - :mod:`airflow.contrib.hooks.jira_hook`
+     - :mod:`airflow.contrib.operators.jira_operator`
+     - :mod:`airflow.contrib.sensors.jira_sensor`
+
+   * - `Databricks <https://databricks.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.databricks_hook`
+     - :mod:`airflow.contrib.operators.databricks_operator`
+     -
+
+   * - `Datadog <https://www.datadoghq.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.datadog_hook`
+     -
+     - :mod:`airflow.contrib.sensors.datadog_sensor`
+
+
+   * - `Dingding <https://oapi.dingtalk.com>`__
+     - :doc:`How to use <howto/operator/dingding>`
+     - :mod:`airflow.contrib.hooks.dingding_hook`
+     - :mod:`airflow.contrib.operators.dingding_operator`
+     -
+
+   * - `Discord <https://discordapp.com>`__
+     -
+     - :mod:`airflow.contrib.hooks.discord_webhook_hook`
+     - :mod:`airflow.contrib.operators.discord_webhook_operator`
+     -
+
+   * - `Google Drive <https://www.google.com/drive/>`__
+     -
+     - :mod:`airflow.contrib.hooks.gdrive_hook`
+     -
+     -
+
+   * - `Google Spreadsheet <https://www.google.com/intl/en/sheets/about/>`__
+     -
+     - :mod:`airflow.gcp.hooks.gsheets`
+     -
+     -
+
+   * - `IBM Cloudant <https://www.ibm.com/cloud/cloudant>`__
+     -
+     - :mod:`airflow.contrib.hooks.cloudant_hook`
+     -
+     -
+
+   * - `Jenkins <https://jenkins.io/>`__
+     -
+     - :mod:`airflow.contrib.hooks.jenkins_hook`
+     - :mod:`airflow.contrib.operators.jenkins_job_trigger_operator`
+     -
+
+   * - `Opsgenie <https://www.opsgenie.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.opsgenie_alert_hook`
+     - :mod:`airflow.contrib.operators.opsgenie_alert_operator`
+     -
+
+   * - `Qubole <https://www.qubole.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.qubole_hook`,
+       :mod:`airflow.contrib.hooks.qubole_check_hook`
+     - :mod:`airflow.contrib.operators.qubole_operator`,
+       :mod:`airflow.contrib.operators.qubole_check_operator`
+     - :mod:`airflow.contrib.sensors.qubole_sensor`
+
+   * - `Salesforce <https://www.salesforce.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.salesforce_hook`
+     -
+     -
+
+   * - `Segment <https://segment.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.segment_hook`
+     - :mod:`airflow.contrib.operators.segment_track_event_operator`
+     -
+
+   * - `Slack <https://slack.com/>`__
+     -
+     - :mod:`airflow.hooks.slack_hook`,
+       :mod:`airflow.contrib.hooks.slack_webhook_hook`
+     - :mod:`airflow.operators.slack_operator`,
+       :mod:`airflow.contrib.operators.slack_webhook_operator`
+     -
+
+   * - `Snowflake <https://www.snowflake.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.snowflake_hook`
+     - :mod:`airflow.contrib.operators.snowflake_operator`
+     -
+
+   * - `Vertica <https://www.vertica.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.vertica_hook`
+     - :mod:`airflow.contrib.operators.vertica_operator`
+     -
+
+   * - `Zendesk <https://www.zendesk.com/>`__
+     -
+     - :mod:`airflow.hooks.zendesk_hook`
+     -
+     -
+
+Transfer operators and hooks
+''''''''''''''''''''''''''''
+
+These integrations allow you to copy data between various services.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Source
+     - Destination
+     - Guide
+     - Operators
+
+   * - `Vertica <https://www.vertica.com/>`__
+     - `MySQL <https://www.mysql.com/>`__
+     -
+     - :mod:`airflow.contrib.operators.vertica_to_mysql`
+
+   * - `Vertica <https://www.vertica.com/>`__
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.contrib.operators.vertica_to_hive`
+
+   * - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - `Google Drive <https://www.google.com/drive/>`__
+     -
+     - :mod:`airflow.contrib.operators.gcs_to_gdrive_operator`
+
+.. _software:
+
+Software integrations
+---------------------
+
+Software operators and hooks
+''''''''''''''''''''''''''''
+
+These integrations allow you to perform various operations using various software.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Service name
+     - Guide
+     - Hook
+     - Operators
+     - Sensors
+
+   * - `Celery <http://www.celeryproject.org/>`__
+     -
+     -
+     -
+     - :mod:`airflow.contrib.sensors.celery_queue_sensor`
+
+   * - `Docker <https://docs.docker.com/install/>`__
+     -
+     - :mod:`airflow.hooks.docker_hook`
+     - :mod:`airflow.operators.docker_operator`,
+       :mod:`airflow.contrib.operators.docker_swarm_operator`
+     -
+
+   * - `GNU Bash <https://www.gnu.org/software/bash/>`__
+     - :doc:`How to use <howto/operator/bash>`
+     -
+     - :mod:`airflow.operators.bash_operator`
+     - :mod:`airflow.contrib.sensors.bash_sensor`
+
+   * - `Kubernetes <https://kubernetes.io/>`__
+     - :doc:`How to use <howto/operator/kubernetes>`
+     -
+     - :mod:`airflow.contrib.operators.kubernetes_pod_operator`
+     -
+
+   * - `Microsoft SQL Server (MSSQL) <https://www.microsoft.com/pl-pl/sql-server/sql-server-downloads>`__
+     -
+     - :mod:`airflow.hooks.mssql_hook`
+     - :mod:`airflow.operators.mssql_operator`
+     -
+
+   * - `MongoDB <https://www.mongodb.com/what-is-mongodb>`__
+     -
+     - :mod:`airflow.contrib.hooks.mongo_hook`
+     -
+     - :mod:`airflow.contrib.sensors.mongo_sensor`
+
+
+   * - `MySQL <https://www.mysql.com/products/>`__
+     -
+     - :mod:`airflow.hooks.mysql_hook`
+     - :mod:`airflow.operators.mysql_operator`
+     -
+
+   * - `OpenFaaS <https://www.openfaas.com/>`__
+     -
+     - :mod:`airflow.contrib.hooks.openfaas_hook`
+     -
+     -
+
+   * - `Oracle <https://www.oracle.com/pl/database/>`__
+     -
+     - :mod:`airflow.hooks.oracle_hook`
+     - :mod:`airflow.operators.oracle_operator`
+     -
+
+   * - `Papermill <https://github.com/nteract/papermill>`__
+     - :doc:`How to use <howto/operator/papermill>`
+     -
+     - :mod:`airflow.operators.papermill_operator`
+     -
+
+   * - `PostgreSQL <https://www.postgresql.org/>`__
+     -
+     - :mod:`airflow.hooks.postgres_hook`
+     - :mod:`airflow.operators.postgres_operator`
+     -
+
+   * - `Presto <http://prestodb.github.io/>`__
+     -
+     - :mod:`airflow.hooks.presto_hook`
+     - :mod:`airflow.operators.presto_check_operator`
+     -
+
+   * - `Python <https://www.python.org>`__
+     -
+     -
+     - :mod:`airflow.operators.python_operator`
+     - :mod:`airflow.contrib.sensors.python_sensor`
+
+   * - `Redis <https://redis.io/>`__
+     -
+     - :mod:`airflow.contrib.hooks.redis_hook`
+     - :mod:`airflow.contrib.operators.redis_publish_operator`
+     - :mod:`airflow.contrib.sensors.redis_pub_sub_sensor`,
+       :mod:`airflow.contrib.sensors.redis_key_sensor`
+
+   * - `Samba <https://www.samba.org/>`__
+     -
+     - :mod:`airflow.hooks.samba_hook`
+     -
+     -
+
+   * - `SQLite <https://www.sqlite.org/index.html>`__
+     -
+     - :mod:`airflow.hooks.sqlite_hook`
+     - :mod:`airflow.operators.sqlite_operator`
+     -
+
+
+Transfer operators and hooks
+''''''''''''''''''''''''''''
+
+These integrations allow you to copy data.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Source
+     - Destination
+     - Guide
+     - Operators
+
+   * - `Oracle <https://www.oracle.com/pl/database/>`__
+     - `Azure Data Lake Storage <https://azure.microsoft.com/en-us/services/storage/data-lake-storage/>`__
+     -
+     - :mod:`airflow.contrib.operators.oracle_to_azure_data_lake_transfer`
+
+   * - `Oracle <https://www.oracle.com/pl/database/>`__
+     - `Oracle <https://www.oracle.com/pl/database/>`__
+     -
+     - :mod:`airflow.contrib.operators.oracle_to_oracle_transfer`
+
+   * - `BigQuery <https://cloud.google.com/bigquery/>`__
+     - `MySQL <https://www.mysql.com/>`__
+     -
+     - :mod:`airflow.operators.bigquery_to_mysql`
+
+   * - `Microsoft SQL Server (MSSQL) <https://www.microsoft.com/pl-pl/sql-server/sql-server-downloads>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.mssql_to_gcs`
+
+   * - `Microsoft SQL Server (MSSQL) <https://www.microsoft.com/pl-pl/sql-server/sql-server-downloads>`__
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.operators.mssql_to_hive`
+
+   * - `MySQL <https://www.mysql.com/>`__
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.operators.mysql_to_hive`
+
+   * - `MySQL <https://www.mysql.com/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.mysql_to_gcs`
+
+   * - `PostgreSQL <https://www.postgresql.org/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.postgres_to_gcs`
+
+   * - SQL
+     - `Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.sql_to_gcs`
+
+   * - `Vertica <https://www.vertica.com/>`__
+     - `Apache Hive <https://hive.apache.org/>`__
+     -
+     - :mod:`airflow.contrib.operators.vertica_to_hive`
+
+   * - `Vertica <https://www.vertica.com/>`__
+     - `MySQL <https://www.mysql.com/>`__
+     -
+     - :mod:`airflow.contrib.operators.vertica_to_mysql`
+
+   * - `Presto <https://prestodb.github.io/>`__
+     - `MySQL <https://www.mysql.com/>`__
+     -
+     - :mod:`airflow.operators.presto_to_mysql`
+
+   * - `Apache Hive <https://hive.apache.org/>`__
+     - `Samba <https://www.samba.org/>`__
+     -
+     - :mod:`airflow.operators.hive_to_samba_operator`
+
+
+.. _protocol:
+
+Protocol integrations
+---------------------
+
+Protocol operators and hooks
+''''''''''''''''''''''''''''
+
+These integrations allow you to perform various operations within various services using standardized
+communication protocols or interfaces.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Service name
+     - Guide
+     - Hook
+     - Operators
+     - Sensors
+
+   * - `Internet Message Access Protocol (IMAP) <https://tools.ietf.org/html/rfc3501>`__
+     -
+     - :mod:`airflow.contrib.hooks.imap_hook`
+     -
+     - :mod:`airflow.contrib.sensors.imap_attachment_sensor`
+
+   * - `Secure Shell (SSH) <https://tools.ietf.org/html/rfc4251>`__
+     -
+     - :mod:`airflow.contrib.hooks.ssh_hook`
+     - :mod:`airflow.contrib.operators.ssh_operator`
+     -
+
+   * - Filesystem
+     -
+     - :mod:`airflow.contrib.hooks.fs_hook`
+     -
+     - :mod:`airflow.contrib.sensors.file_sensor`
+
+   * - `SSH File Transfer Protocol (SFTP) <https://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/>`__
+     -
+     - :mod:`airflow.contrib.hooks.sftp_hook`
+     - :mod:`airflow.contrib.operators.sftp_operator`
+     - :mod:`airflow.contrib.sensors.sftp_sensor`
+
+   * - `File Transfer Protocol (FTP) <https://tools.ietf.org/html/rfc114>`__
+     -
+     - :mod:`airflow.contrib.hooks.ftp_hook`
+     -
+     - :mod:`airflow.contrib.sensors.ftp_sensor`
+
+   * - `Hypertext Transfer Protocol (HTTP) <https://www.w3.org/Protocols/>`__
+     -
+     - :mod:`airflow.hooks.http_hook`
+     - :mod:`airflow.operators.http_operator`
+     - :mod:`airflow.sensors.http_sensor`
+
+   * - `gRPC <https://grpc.io/>`__
+     -
+     - :mod:`airflow.contrib.hooks.grpc_hook`
+     - :mod:`airflow.contrib.operators.grpc_operator`
+     -
+
+   * - `Simple Mail Transfer Protocol (SMTP) <https://tools.ietf.org/html/rfc821>`__
+     -
+     -
+     - :mod:`airflow.operators.email_operator`
+     -
+
+   * - `Java Database Connectivity (JDBC) <https://docs.oracle.com/javase/8/docs/technotes/guides/jdbc/>`__
+     -
+     - :mod:`airflow.hooks.jdbc_hook`
+     - :mod:`airflow.operators.jdbc_operator`
+     -
+
+   * - `Windows Remote Management (WinRM) <https://docs.microsoft.com/en-gb/windows/win32/winrm/portal>`__
+     -
+     - :mod:`airflow.contrib.hooks.winrm_hook`
+     - :mod:`airflow.contrib.operators.winrm_operator`
+     -
+
+Transfer operators and hooks
+""""""""""""""""""""""""""""
+
+These integrations allow you to copy data.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Source
+     - Destination
+     - Guide
+     - Operators
+
+   * - Filesystem
+     - `Azure Blob Storage <https://azure.microsoft.com/en-us/services/storage/blobs/>`__
+     -
+     - :mod:`airflow.contrib.operators.file_to_wasb`
+
+   * - Filesystem
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     -
+     - :mod:`airflow.operators.local_to_gcs`
+
+   * - `Internet Message Access Protocol (IMAP) <https://tools.ietf.org/html/rfc3501>`__
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     -
+     - :mod:`airflow.contrib.operators.imap_attachment_to_s3_operator`
+
+   * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     - `SSH File Transfer Protocol (SFTP) <https://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/>`__
+     -
+     - :mod:`airflow.contrib.operators.s3_to_sftp_operator`
+
+   * - `SSH File Transfer Protocol (SFTP) <https://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/>`__
+     - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
+     -
+     - :mod:`airflow.contrib.operators.sftp_to_s3_operator`
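
Most of the modules listed in these tables follow the same usage pattern: import the
operator, sensor or hook class from the listed module and instantiate it inside a DAG.
A minimal sketch using two operators from the software table above
(airflow.operators.bash_operator and airflow.operators.python_operator); the DAG id,
schedule and commands are illustrative:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator


    def _print_run_date(ds, **kwargs):
        # `ds` is the execution date string injected via provide_context=True
        print("Running for %s" % ds)


    with DAG(dag_id="example_listed_operators",   # illustrative DAG id
             start_date=datetime(2020, 1, 1),
             schedule_interval=None) as dag:
        say_hello = BashOperator(task_id="say_hello", bash_command="echo hello")
        show_date = PythonOperator(task_id="show_date",
                                   python_callable=_print_run_date,
                                   provide_context=True)
        say_hello >> show_date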


[airflow] 03/07: [AIRFLOW-4472] Use json.dumps/loads for templating lineage data (#5253)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 82b764440d18b6591c96ee331f542e5d5320913a
Author: bolkedebruin <bo...@users.noreply.github.com>
AuthorDate: Tue May 7 22:42:28 2019 +0200

    [AIRFLOW-4472] Use json.dumps/loads for templating lineage data (#5253)
    
    jinja2 cannot use dicts/lists as templates, hence converting
    them to JSON solves this while keeping complexity down.
    
    (cherry picked from commit a6daeb544e815fe350a96d24ae3bb14aee4079a7)
---
 airflow/lineage/datasets.py    | 11 +++++++++--
 airflow/models/baseoperator.py |  4 ++--
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/airflow/lineage/datasets.py b/airflow/lineage/datasets.py
index 2602770..3d61e5d 100644
--- a/airflow/lineage/datasets.py
+++ b/airflow/lineage/datasets.py
@@ -16,6 +16,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import json
 import six
 
 from typing import List
@@ -62,7 +63,11 @@ class DataSet(object):
         if attr in self.attributes:
             if self.context:
                 env = Environment()
-                return env.from_string(self._data.get(attr)).render(**self.context)
+                # dump to json here in order to be able to manage dicts and lists
+                rendered = env.from_string(
+                    json.dumps(self._data.get(attr))
+                ).render(**self.context)
+                return json.loads(rendered)
 
             return self._data.get(attr)
 
@@ -82,7 +87,9 @@ class DataSet(object):
         env = Environment()
         if self.context:
             for key, value in six.iteritems(attributes):
-                attributes[key] = env.from_string(value).render(**self.context)
+                attributes[key] = json.loads(
+                    env.from_string(json.dumps(value)).render(**self.context)
+                )
 
         d = {
             "typeName": self.type_name,
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index b61c980..52037c5 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -428,8 +428,8 @@ class BaseOperator(LoggingMixin):
         self._log = logging.getLogger("airflow.task.operators")
 
         # lineage
-        self.inlets = []  # type: Iterable[DataSet]
-        self.outlets = []  # type: Iterable[DataSet]
+        self.inlets = []   # type: List[DataSet]
+        self.outlets = []  # type: List[DataSet]
         self.lineage_data = None
 
         self._inlets = {
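
The technique can be reproduced outside Airflow to see why it works: the dict or list is
serialized to a JSON string, the string is rendered as a Jinja template, and the result is
deserialized back into the original structure. A standalone sketch (the context values and
paths are illustrative):

    import json

    from jinja2 import Environment

    context = {"ds": "2020-06-20"}
    data = {"path": "/data/{{ ds }}/events", "tags": ["{{ ds }}"]}  # illustrative lineage attributes

    env = Environment()
    # dump to JSON so nested dicts/lists survive the templating round-trip
    rendered = env.from_string(json.dumps(data)).render(**context)
    print(json.loads(rendered))
    # {'path': '/data/2020-06-20/events', 'tags': ['2020-06-20']}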


[airflow] 04/07: Parameterized bash/python in the prod image (#9157)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 57225c6fdd49407799663364daa6dad0e6eeb12a
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Sat Jun 6 09:48:02 2020 +0200

    Parameterized bash/python in the prod image (#9157)
    
    (cherry picked from commit b809afa3e33ec5cc76f846f525ab8d6bdcf9d1f0)
---
 entrypoint.sh | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/entrypoint.sh b/entrypoint.sh
index 3d436e2..220e9b7 100755
--- a/entrypoint.sh
+++ b/entrypoint.sh
@@ -102,8 +102,12 @@ if [[ -n ${AIRFLOW__CELERY__BROKER_URL} ]] && \
     verify_db_connection "${AIRFLOW__CELERY__BROKER_URL}"
 fi
 
-if [[ ${AIRFLOW_COMMAND} == "" ]]; then
-   exec "/bin/bash"
+if [[ ${AIRFLOW_COMMAND} == "bash" ]]; then
+   shift
+   exec "/bin/bash" "${@}"
+elif [[ ${AIRFLOW_COMMAND} == "python" ]]; then
+   shift
+   exec "python" "${@}"
 fi
 
 # Run the command


[airflow] 07/07: Merging multiple sql operators (#9124)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0ac4185ca564b0e1a741069bfd670c551efefe31
Author: samuelkhtu <46...@users.noreply.github.com>
AuthorDate: Wed Jun 17 14:32:46 2020 -0400

    Merging multiple sql operators (#9124)
    
    * Merge various SQL Operators into sql.py
    
    * Fix unit test code format
    
    * Merge multiple SQL operators into one
    
    1. Merge check_operator.py into airflow.operators.sql
    2. Merge sql_branch_operator.py into airflow.operators.sql
    3. Merge unit test for both into test_sql.py
    
    * Rename test_core_to_contrib Interval/ValueCheckOperator to SQLInterval/ValueCheckOperator
    
    * Fixed deprecated class and added check to test_core_to_contrib
    
    (cherry picked from commit 0b9bf4a285a074bbde270839a90fb53c257340be)
---
 ...eea_add_precision_to_execution_date_in_mysql.py |   2 +-
 airflow/operators/check_operator.py                | 425 +++------------------
 airflow/operators/{check_operator.py => sql.py}    | 420 +++++++++++++++-----
 airflow/operators/sql_branch_operator.py           | 162 +-------
 docs/operators-and-hooks-ref.rst                   |  30 +-
 tests/api/common/experimental/test_pool.py         |   4 +-
 tests/contrib/hooks/test_gcp_api_base_hook.py      |   2 +-
 tests/contrib/hooks/test_gcp_cloud_build_hook.py   |   4 +-
 tests/contrib/hooks/test_gcp_transfer_hook.py      |   4 +-
 .../operators/test_gcp_cloud_build_operator.py     |   8 +-
 tests/contrib/operators/test_gcs_to_gdrive.py      |   2 +-
 tests/contrib/operators/test_sftp_operator.py      |   6 +-
 tests/contrib/operators/test_ssh_operator.py       |   6 +-
 tests/contrib/secrets/test_hashicorp_vault.py      |   4 +-
 .../contrib/utils/test_gcp_credentials_provider.py |   2 +-
 tests/operators/test_check_operator.py             | 327 ----------------
 .../{test_sql_branch_operator.py => test_sql.py}   | 339 ++++++++++++++--
 tests/www_rbac/test_validators.py                  |   4 +-
 18 files changed, 707 insertions(+), 1044 deletions(-)

diff --git a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
index ecb589d..59098a8 100644
--- a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
+++ b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
@@ -29,7 +29,7 @@ from sqlalchemy.dialects import mysql
 
 # revision identifiers, used by Alembic.
 revision = 'a66efa278eea'
-down_revision = '8f966b9c467a'
+down_revision = '952da73b5eff'
 branch_labels = None
 depends_on = None
 
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index b6d3a18..12ac472 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -17,409 +17,70 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from builtins import str, zip
-from typing import Optional, Any, Iterable, Dict, SupportsAbs
+"""This module is deprecated. Please use `airflow.operators.sql`."""
 
-from airflow.exceptions import AirflowException
-from airflow.hooks.base_hook import BaseHook
-from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
+import warnings
 
+from airflow.operators.sql import (
+    SQLCheckOperator, SQLIntervalCheckOperator, SQLThresholdCheckOperator, SQLValueCheckOperator,
+)
 
-class CheckOperator(BaseOperator):
-    """
-    Performs checks against a db. The ``CheckOperator`` expects
-    a sql query that will return a single row. Each value on that
-    first row is evaluated using python ``bool`` casting. If any of the
-    values return ``False`` the check is failed and errors out.
-
-    Note that Python bool casting evals the following as ``False``:
-
-    * ``False``
-    * ``0``
-    * Empty string (``""``)
-    * Empty list (``[]``)
-    * Empty dictionary or set (``{}``)
-
-    Given a query like ``SELECT COUNT(*) FROM foo``, it will fail only if
-    the count ``== 0``. You can craft much more complex query that could,
-    for instance, check that the table has the same number of rows as
-    the source table upstream, or that the count of today's partition is
-    greater than yesterday's partition, or that a set of metrics are less
-    than 3 standard deviation for the 7 day average.
-
-    This operator can be used as a data quality check in your pipeline, and
-    depending on where you put it in your DAG, you have the choice to
-    stop the critical path, preventing from
-    publishing dubious data, or on the side and receive email alerts
-    without stopping the progress of the DAG.
 
-    Note that this is an abstract class and get_db_hook
-    needs to be defined. Whereas a get_db_hook is hook that gets a
-    single record from an external source.
-
-    :param sql: the sql to be executed. (templated)
-    :type sql: str
+class CheckOperator(SQLCheckOperator):
     """
-
-    template_fields = ('sql',)  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
-
-    @apply_defaults
-    def __init__(
-        self,
-        sql,  # type: str
-        conn_id=None,  # type: Optional[str]
-        *args,
-        **kwargs
-    ):
-        super(CheckOperator, self).__init__(*args, **kwargs)
-        self.conn_id = conn_id
-        self.sql = sql
-
-    def execute(self, context=None):
-        self.log.info('Executing SQL check: %s', self.sql)
-        records = self.get_db_hook().get_first(self.sql)
-
-        self.log.info('Record: %s', records)
-        if not records:
-            raise AirflowException("The query returned None")
-        elif not all([bool(r) for r in records]):
-            raise AirflowException("Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
-                query=self.sql, records=records))
-
-        self.log.info("Success.")
-
-    def get_db_hook(self):
-        return BaseHook.get_hook(conn_id=self.conn_id)
-
-
-def _convert_to_float_if_possible(s):
+    This class is deprecated.
+    Please use `airflow.operators.sql.SQLCheckOperator`.
     """
-    A small helper function to convert a string to a numeric value
-    if appropriate
 
-    :param s: the string to be converted
-    :type s: str
-    """
-    try:
-        ret = float(s)
-    except (ValueError, TypeError):
-        ret = s
-    return ret
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.SQLCheckOperator`.""",
+            DeprecationWarning, stacklevel=2
+        )
+        super(CheckOperator, self).__init__(*args, **kwargs)
 
 
-class ValueCheckOperator(BaseOperator):
+class IntervalCheckOperator(SQLIntervalCheckOperator):
     """
-    Performs a simple value check using sql code.
-
-    Note that this is an abstract class and get_db_hook
-    needs to be defined. Whereas a get_db_hook is hook that gets a
-    single record from an external source.
-
-    :param sql: the sql to be executed. (templated)
-    :type sql: str
+    This class is deprecated.
+    Please use `airflow.operators.sql.SQLIntervalCheckOperator`.
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'ValueCheckOperator'
-    }
-    template_fields = ('sql', 'pass_value',)  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
-
-    @apply_defaults
-    def __init__(
-        self,
-        sql,  # type: str
-        pass_value,  # type: Any
-        tolerance=None,  # type: Any
-        conn_id=None,  # type: Optional[str]
-        *args,
-        **kwargs
-    ):
-        super(ValueCheckOperator, self).__init__(*args, **kwargs)
-        self.sql = sql
-        self.conn_id = conn_id
-        self.pass_value = str(pass_value)
-        tol = _convert_to_float_if_possible(tolerance)
-        self.tol = tol if isinstance(tol, float) else None
-        self.has_tolerance = self.tol is not None
-
-    def execute(self, context=None):
-        self.log.info('Executing SQL check: %s', self.sql)
-        records = self.get_db_hook().get_first(self.sql)
-
-        if not records:
-            raise AirflowException("The query returned None")
-
-        pass_value_conv = _convert_to_float_if_possible(self.pass_value)
-        is_numeric_value_check = isinstance(pass_value_conv, float)
-
-        tolerance_pct_str = str(self.tol * 100) + '%' if self.has_tolerance else None
-        error_msg = ("Test failed.\nPass value:{pass_value_conv}\n"
-                     "Tolerance:{tolerance_pct_str}\n"
-                     "Query:\n{sql}\nResults:\n{records!s}").format(
-            pass_value_conv=pass_value_conv,
-            tolerance_pct_str=tolerance_pct_str,
-            sql=self.sql,
-            records=records
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.SQLIntervalCheckOperator`.""",
+            DeprecationWarning, stacklevel=2
         )
-
-        if not is_numeric_value_check:
-            tests = self._get_string_matches(records, pass_value_conv)
-        elif is_numeric_value_check:
-            try:
-                numeric_records = self._to_float(records)
-            except (ValueError, TypeError):
-                raise AirflowException("Converting a result to float failed.\n{}".format(error_msg))
-            tests = self._get_numeric_matches(numeric_records, pass_value_conv)
-        else:
-            tests = []
-
-        if not all(tests):
-            raise AirflowException(error_msg)
-
-    def _to_float(self, records):
-        return [float(record) for record in records]
-
-    def _get_string_matches(self, records, pass_value_conv):
-        return [str(record) == pass_value_conv for record in records]
-
-    def _get_numeric_matches(self, numeric_records, numeric_pass_value_conv):
-        if self.has_tolerance:
-            return [
-                numeric_pass_value_conv * (1 - self.tol) <= record <= numeric_pass_value_conv * (1 + self.tol)
-                for record in numeric_records
-            ]
-
-        return [record == numeric_pass_value_conv for record in numeric_records]
-
-    def get_db_hook(self):
-        return BaseHook.get_hook(conn_id=self.conn_id)
+        super(IntervalCheckOperator, self).__init__(*args, **kwargs)
 
 
-class IntervalCheckOperator(BaseOperator):
+class ThresholdCheckOperator(SQLThresholdCheckOperator):
     """
-    Checks that the values of metrics given as SQL expressions are within
-    a certain tolerance of the ones from days_back before.
-
-    Note that this is an abstract class and get_db_hook
-    needs to be defined. Whereas a get_db_hook is hook that gets a
-    single record from an external source.
-
-    :param table: the table name
-    :type table: str
-    :param days_back: number of days between ds and the ds we want to check
-        against. Defaults to 7 days
-    :type days_back: int
-    :param ratio_formula: which formula to use to compute the ratio between
-        the two metrics. Assuming cur is the metric of today and ref is
-        the metric to today - days_back.
-
-        max_over_min: computes max(cur, ref) / min(cur, ref)
-        relative_diff: computes abs(cur-ref) / ref
-
-        Default: 'max_over_min'
-    :type ratio_formula: str
-    :param ignore_zero: whether we should ignore zero metrics
-    :type ignore_zero: bool
-    :param metrics_threshold: a dictionary of ratios indexed by metrics
-    :type metrics_threshold: dict
+    This class is deprecated.
+    Please use `airflow.operators.sql.SQLThresholdCheckOperator`.
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'IntervalCheckOperator'
-    }
-    template_fields = ('sql1', 'sql2')  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
-
-    ratio_formulas = {
-        'max_over_min': lambda cur, ref: float(max(cur, ref)) / min(cur, ref),
-        'relative_diff': lambda cur, ref: float(abs(cur - ref)) / ref,
-    }
-
-    @apply_defaults
-    def __init__(
-        self,
-        table,  # type: str
-        metrics_thresholds,  # type: Dict[str, int]
-        date_filter_column='ds',  # type: Optional[str]
-        days_back=-7,  # type: SupportsAbs[int]
-        ratio_formula='max_over_min',  # type: Optional[str]
-        ignore_zero=True,  # type: Optional[bool]
-        conn_id=None,  # type: Optional[str]
-        *args, **kwargs
-    ):
-        super(IntervalCheckOperator, self).__init__(*args, **kwargs)
-        if ratio_formula not in self.ratio_formulas:
-            msg_template = "Invalid diff_method: {diff_method}. " \
-                           "Supported diff methods are: {diff_methods}"
-
-            raise AirflowException(
-                msg_template.format(diff_method=ratio_formula,
-                                    diff_methods=self.ratio_formulas)
-            )
-        self.ratio_formula = ratio_formula
-        self.ignore_zero = ignore_zero
-        self.table = table
-        self.metrics_thresholds = metrics_thresholds
-        self.metrics_sorted = sorted(metrics_thresholds.keys())
-        self.date_filter_column = date_filter_column
-        self.days_back = -abs(days_back)
-        self.conn_id = conn_id
-        sqlexp = ', '.join(self.metrics_sorted)
-        sqlt = "SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=".format(
-            sqlexp=sqlexp, table=table, date_filter_column=date_filter_column
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.SQLThresholdCheckOperator`.""",
+            DeprecationWarning, stacklevel=2
         )
-
-        self.sql1 = sqlt + "'{{ ds }}'"
-        self.sql2 = sqlt + "'{{ macros.ds_add(ds, " + str(self.days_back) + ") }}'"
-
-    def execute(self, context=None):
-        hook = self.get_db_hook()
-        self.log.info('Using ratio formula: %s', self.ratio_formula)
-        self.log.info('Executing SQL check: %s', self.sql2)
-        row2 = hook.get_first(self.sql2)
-        self.log.info('Executing SQL check: %s', self.sql1)
-        row1 = hook.get_first(self.sql1)
-
-        if not row2:
-            raise AirflowException("The query {} returned None".format(self.sql2))
-        if not row1:
-            raise AirflowException("The query {} returned None".format(self.sql1))
-
-        current = dict(zip(self.metrics_sorted, row1))
-        reference = dict(zip(self.metrics_sorted, row2))
-
-        ratios = {}
-        test_results = {}
-
-        for m in self.metrics_sorted:
-            cur = current[m]
-            ref = reference[m]
-            threshold = self.metrics_thresholds[m]
-            if cur == 0 or ref == 0:
-                ratios[m] = None
-                test_results[m] = self.ignore_zero
-            else:
-                ratios[m] = self.ratio_formulas[self.ratio_formula](current[m], reference[m])
-                test_results[m] = ratios[m] < threshold
-
-            self.log.info(
-                (
-                    "Current metric for %s: %s\n"
-                    "Past metric for %s: %s\n"
-                    "Ratio for %s: %s\n"
-                    "Threshold: %s\n"
-                ), m, cur, m, ref, m, ratios[m], threshold)
-
-        if not all(test_results.values()):
-            failed_tests = [it[0] for it in test_results.items() if not it[1]]
-            j = len(failed_tests)
-            n = len(self.metrics_sorted)
-            self.log.warning("The following %s tests out of %s failed:", j, n)
-            for k in failed_tests:
-                self.log.warning(
-                    "'%s' check failed. %s is above %s", k, ratios[k], self.metrics_thresholds[k]
-                )
-            raise AirflowException("The following tests have failed:\n {0}".format(", ".join(
-                sorted(failed_tests))))
-
-        self.log.info("All tests have passed")
-
-    def get_db_hook(self):
-        return BaseHook.get_hook(conn_id=self.conn_id)
+        super(ThresholdCheckOperator, self).__init__(*args, **kwargs)
 
 
-class ThresholdCheckOperator(BaseOperator):
+class ValueCheckOperator(SQLValueCheckOperator):
     """
-    Performs a value check using sql code against a mininmum threshold
-    and a maximum threshold. Thresholds can be in the form of a numeric
-    value OR a sql statement that results a numeric.
-
-    Note that this is an abstract class and get_db_hook
-    needs to be defined. Whereas a get_db_hook is hook that gets a
-    single record from an external source.
-
-    :param sql: the sql to be executed. (templated)
-    :type sql: str
-    :param min_threshold: numerical value or min threshold sql to be executed (templated)
-    :type min_threshold: numeric or str
-    :param max_threshold: numerical value or max threshold sql to be executed (templated)
-    :type max_threshold: numeric or str
+    This class is deprecated.
+    Please use `airflow.operators.sql.SQLValueCheckOperator`.
     """
 
-    template_fields = ('sql', 'min_threshold', 'max_threshold')  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-
-    @apply_defaults
-    def __init__(
-        self,
-        sql,   # type: str
-        min_threshold,   # type: Any
-        max_threshold,   # type: Any
-        conn_id=None,   # type: Optional[str]
-        *args, **kwargs
-    ):
-        super(ThresholdCheckOperator, self).__init__(*args, **kwargs)
-        self.sql = sql
-        self.conn_id = conn_id
-        self.min_threshold = _convert_to_float_if_possible(min_threshold)
-        self.max_threshold = _convert_to_float_if_possible(max_threshold)
-
-    def execute(self, context=None):
-        hook = self.get_db_hook()
-        result = hook.get_first(self.sql)[0][0]
-
-        if isinstance(self.min_threshold, float):
-            lower_bound = self.min_threshold
-        else:
-            lower_bound = hook.get_first(self.min_threshold)[0][0]
-
-        if isinstance(self.max_threshold, float):
-            upper_bound = self.max_threshold
-        else:
-            upper_bound = hook.get_first(self.max_threshold)[0][0]
-
-        meta_data = {
-            "result": result,
-            "task_id": self.task_id,
-            "min_threshold": lower_bound,
-            "max_threshold": upper_bound,
-            "within_threshold": lower_bound <= result <= upper_bound
-        }
-
-        self.push(meta_data)
-        if not meta_data["within_threshold"]:
-            error_msg = (
-                'Threshold Check: "{task_id}" failed.\n'
-                'DAG: {dag_id}\nTask_id: {task_id}\n'
-                'Check description: {description}\n'
-                'SQL: {sql}\n'
-                'Result: {result} is not within thresholds '
-                '{min_threshold} and {max_threshold}'
-            ).format(
-                task_id=self.task_id, dag_id=self.dag_id,
-                description=meta_data.get("description"), sql=self.sql,
-                result=round(meta_data.get("result"), 2),
-                min_threshold=meta_data.get("min_threshold"),
-                max_threshold=meta_data.get("max_threshold")
-            )
-            raise AirflowException(error_msg)
-
-        self.log.info("Test %s Successful.", self.task_id)
-
-    def push(self, meta_data):
-        """
-        Optional: Send data check info and metadata to an external database.
-        Default functionality will log metadata.
-        """
-
-        info = "\n".join(["""{}: {}""".format(key, item) for key, item in meta_data.items()])
-        self.log.info("Log from %s:\n%s", self.dag_id, info)
-
-    def get_db_hook(self):
-        return BaseHook.get_hook(conn_id=self.conn_id)
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.SQLValueCheckOperator`.""",
+            DeprecationWarning, stacklevel=2
+        )
+        super(ValueCheckOperator, self).__init__(*args, **kwargs)
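
The old import path keeps working, but the shim now emits a DeprecationWarning when the
operator is instantiated and points at the new module. A rough before/after sketch (the
connection id and query are illustrative):

    # Deprecated path: still importable, warns on instantiation.
    from airflow.operators.check_operator import CheckOperator
    legacy_check = CheckOperator(task_id="legacy_check",
                                 sql="SELECT COUNT(*) FROM foo",   # illustrative query
                                 conn_id="my_db")                  # illustrative connection id

    # Preferred path after this change.
    from airflow.operators.sql import SQLCheckOperator
    row_count_check = SQLCheckOperator(task_id="row_count_check",
                                       sql="SELECT COUNT(*) FROM foo",
                                       conn_id="my_db")
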
diff --git a/airflow/operators/check_operator.py b/airflow/operators/sql.py
similarity index 50%
copy from airflow/operators/check_operator.py
copy to airflow/operators/sql.py
index b6d3a18..91ddc1a 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/sql.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -16,19 +15,31 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from builtins import str, zip
-from typing import Optional, Any, Iterable, Dict, SupportsAbs
+from distutils.util import strtobool
+from typing import Iterable
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator, SkipMixin
 from airflow.utils.decorators import apply_defaults
 
-
-class CheckOperator(BaseOperator):
+ALLOWED_CONN_TYPE = {
+    "google_cloud_platform",
+    "jdbc",
+    "mssql",
+    "mysql",
+    "odbc",
+    "oracle",
+    "postgres",
+    "presto",
+    "sqlite",
+    "vertica",
+}
+
+
+class SQLCheckOperator(BaseOperator):
     """
-    Performs checks against a db. The ``CheckOperator`` expects
+    Performs checks against a db. The ``SQLCheckOperator`` expects
     a sql query that will return a single row. Each value on that
     first row is evaluated using python ``bool`` casting. If any of the
     values return ``False`` the check is failed and errors out.
@@ -62,36 +73,44 @@ class CheckOperator(BaseOperator):
     :type sql: str
     """
 
-    template_fields = ('sql',)  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
+    template_fields = ("sql",)  # type: Iterable[str]
+    template_ext = (
+        ".hql",
+        ".sql",
+    )  # type: Iterable[str]
+    ui_color = "#fff7e6"
 
     @apply_defaults
     def __init__(
-        self,
-        sql,  # type: str
-        conn_id=None,  # type: Optional[str]
-        *args,
-        **kwargs
+        self, sql, conn_id=None, *args, **kwargs
     ):
-        super(CheckOperator, self).__init__(*args, **kwargs)
+        super(SQLCheckOperator, self).__init__(*args, **kwargs)
         self.conn_id = conn_id
         self.sql = sql
 
     def execute(self, context=None):
-        self.log.info('Executing SQL check: %s', self.sql)
+        self.log.info("Executing SQL check: %s", self.sql)
         records = self.get_db_hook().get_first(self.sql)
 
-        self.log.info('Record: %s', records)
+        self.log.info("Record: %s", records)
         if not records:
             raise AirflowException("The query returned None")
         elif not all([bool(r) for r in records]):
-            raise AirflowException("Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
-                query=self.sql, records=records))
+            raise AirflowException(
+                "Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
+                    query=self.sql, records=records
+                )
+            )
 
         self.log.info("Success.")
 
     def get_db_hook(self):
+        """
+        Get the database hook for the connection.
+
+        :return: the database hook object.
+        :rtype: DbApiHook
+        """
         return BaseHook.get_hook(conn_id=self.conn_id)
 
 
@@ -110,7 +129,7 @@ def _convert_to_float_if_possible(s):
     return ret
 
 
-class ValueCheckOperator(BaseOperator):
+class SQLValueCheckOperator(BaseOperator):
     """
     Performs a simple value check using sql code.
 
@@ -122,24 +141,28 @@ class ValueCheckOperator(BaseOperator):
     :type sql: str
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'ValueCheckOperator'
-    }
-    template_fields = ('sql', 'pass_value',)  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
+    __mapper_args__ = {"polymorphic_identity": "SQLValueCheckOperator"}
+    template_fields = (
+        "sql",
+        "pass_value",
+    )  # type: Iterable[str]
+    template_ext = (
+        ".hql",
+        ".sql",
+    )  # type: Iterable[str]
+    ui_color = "#fff7e6"
 
     @apply_defaults
     def __init__(
-        self,
-        sql,  # type: str
-        pass_value,  # type: Any
-        tolerance=None,  # type: Any
-        conn_id=None,  # type: Optional[str]
-        *args,
-        **kwargs
-    ):
-        super(ValueCheckOperator, self).__init__(*args, **kwargs)
+            self,
+            sql,
+            pass_value,
+            tolerance=None,
+            conn_id=None,
+            *args,
+            **kwargs
+            ):
+        super(SQLValueCheckOperator, self).__init__(*args, **kwargs)
         self.sql = sql
         self.conn_id = conn_id
         self.pass_value = str(pass_value)
@@ -148,7 +171,7 @@ class ValueCheckOperator(BaseOperator):
         self.has_tolerance = self.tol is not None
 
     def execute(self, context=None):
-        self.log.info('Executing SQL check: %s', self.sql)
+        self.log.info("Executing SQL check: %s", self.sql)
         records = self.get_db_hook().get_first(self.sql)
 
         if not records:
@@ -157,14 +180,16 @@ class ValueCheckOperator(BaseOperator):
         pass_value_conv = _convert_to_float_if_possible(self.pass_value)
         is_numeric_value_check = isinstance(pass_value_conv, float)
 
-        tolerance_pct_str = str(self.tol * 100) + '%' if self.has_tolerance else None
-        error_msg = ("Test failed.\nPass value:{pass_value_conv}\n"
-                     "Tolerance:{tolerance_pct_str}\n"
-                     "Query:\n{sql}\nResults:\n{records!s}").format(
+        tolerance_pct_str = str(self.tol * 100) + "%" if self.has_tolerance else None
+        error_msg = (
+            "Test failed.\nPass value:{pass_value_conv}\n"
+            "Tolerance:{tolerance_pct_str}\n"
+            "Query:\n{sql}\nResults:\n{records!s}"
+        ).format(
             pass_value_conv=pass_value_conv,
             tolerance_pct_str=tolerance_pct_str,
             sql=self.sql,
-            records=records
+            records=records,
         )
 
         if not is_numeric_value_check:
@@ -173,7 +198,9 @@ class ValueCheckOperator(BaseOperator):
             try:
                 numeric_records = self._to_float(records)
             except (ValueError, TypeError):
-                raise AirflowException("Converting a result to float failed.\n{}".format(error_msg))
+                raise AirflowException(
+                    "Converting a result to float failed.\n{}".format(error_msg)
+                )
             tests = self._get_numeric_matches(numeric_records, pass_value_conv)
         else:
             tests = []
@@ -197,10 +224,16 @@ class ValueCheckOperator(BaseOperator):
         return [record == numeric_pass_value_conv for record in numeric_records]
 
     def get_db_hook(self):
+        """
+        Get the database hook for the connection.
+
+        :return: the database hook object.
+        :rtype: DbApiHook
+        """
         return BaseHook.get_hook(conn_id=self.conn_id)
 
 
-class IntervalCheckOperator(BaseOperator):
+class SQLIntervalCheckOperator(BaseOperator):
     """
     Checks that the values of metrics given as SQL expressions are within
     a certain tolerance of the ones from days_back before.
@@ -229,38 +262,43 @@ class IntervalCheckOperator(BaseOperator):
     :type metrics_threshold: dict
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'IntervalCheckOperator'
-    }
-    template_fields = ('sql1', 'sql2')  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
-    ui_color = '#fff7e6'
+    __mapper_args__ = {"polymorphic_identity": "SQLIntervalCheckOperator"}
+    template_fields = ("sql1", "sql2")  # type: Iterable[str]
+    template_ext = (
+        ".hql",
+        ".sql",
+    )  # type: Iterable[str]
+    ui_color = "#fff7e6"
 
     ratio_formulas = {
-        'max_over_min': lambda cur, ref: float(max(cur, ref)) / min(cur, ref),
-        'relative_diff': lambda cur, ref: float(abs(cur - ref)) / ref,
+        "max_over_min": lambda cur, ref: float(max(cur, ref)) / min(cur, ref),
+        "relative_diff": lambda cur, ref: float(abs(cur - ref)) / ref,
     }
 
     @apply_defaults
     def __init__(
         self,
-        table,  # type: str
-        metrics_thresholds,  # type: Dict[str, int]
-        date_filter_column='ds',  # type: Optional[str]
-        days_back=-7,  # type: SupportsAbs[int]
-        ratio_formula='max_over_min',  # type: Optional[str]
-        ignore_zero=True,  # type: Optional[bool]
-        conn_id=None,  # type: Optional[str]
-        *args, **kwargs
+        table,
+        metrics_thresholds,
+        date_filter_column="ds",
+        days_back=-7,
+        ratio_formula="max_over_min",
+        ignore_zero=True,
+        conn_id=None,
+        *args,
+        **kwargs
     ):
-        super(IntervalCheckOperator, self).__init__(*args, **kwargs)
+        super(SQLIntervalCheckOperator, self).__init__(*args, **kwargs)
         if ratio_formula not in self.ratio_formulas:
-            msg_template = "Invalid diff_method: {diff_method}. " \
-                           "Supported diff methods are: {diff_methods}"
+            msg_template = (
+                "Invalid diff_method: {diff_method}. "
+                "Supported diff methods are: {diff_methods}"
+            )
 
             raise AirflowException(
-                msg_template.format(diff_method=ratio_formula,
-                                    diff_methods=self.ratio_formulas)
+                msg_template.format(
+                    diff_method=ratio_formula, diff_methods=self.ratio_formulas
+                )
             )
         self.ratio_formula = ratio_formula
         self.ignore_zero = ignore_zero
@@ -270,7 +308,7 @@ class IntervalCheckOperator(BaseOperator):
         self.date_filter_column = date_filter_column
         self.days_back = -abs(days_back)
         self.conn_id = conn_id
-        sqlexp = ', '.join(self.metrics_sorted)
+        sqlexp = ", ".join(self.metrics_sorted)
         sqlt = "SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=".format(
             sqlexp=sqlexp, table=table, date_filter_column=date_filter_column
         )
@@ -280,10 +318,10 @@ class IntervalCheckOperator(BaseOperator):
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-        self.log.info('Using ratio formula: %s', self.ratio_formula)
-        self.log.info('Executing SQL check: %s', self.sql2)
+        self.log.info("Using ratio formula: %s", self.ratio_formula)
+        self.log.info("Executing SQL check: %s", self.sql2)
         row2 = hook.get_first(self.sql2)
-        self.log.info('Executing SQL check: %s', self.sql1)
+        self.log.info("Executing SQL check: %s", self.sql1)
         row1 = hook.get_first(self.sql1)
 
         if not row2:
@@ -297,16 +335,18 @@ class IntervalCheckOperator(BaseOperator):
         ratios = {}
         test_results = {}
 
-        for m in self.metrics_sorted:
-            cur = current[m]
-            ref = reference[m]
-            threshold = self.metrics_thresholds[m]
+        for metric in self.metrics_sorted:
+            cur = current[metric]
+            ref = reference[metric]
+            threshold = self.metrics_thresholds[metric]
             if cur == 0 or ref == 0:
-                ratios[m] = None
-                test_results[m] = self.ignore_zero
+                ratios[metric] = None
+                test_results[metric] = self.ignore_zero
             else:
-                ratios[m] = self.ratio_formulas[self.ratio_formula](current[m], reference[m])
-                test_results[m] = ratios[m] < threshold
+                ratios[metric] = self.ratio_formulas[self.ratio_formula](
+                    current[metric], reference[metric]
+                )
+                test_results[metric] = ratios[metric] < threshold
 
             self.log.info(
                 (
@@ -314,27 +354,49 @@ class IntervalCheckOperator(BaseOperator):
                     "Past metric for %s: %s\n"
                     "Ratio for %s: %s\n"
                     "Threshold: %s\n"
-                ), m, cur, m, ref, m, ratios[m], threshold)
+                ),
+                metric,
+                cur,
+                metric,
+                ref,
+                metric,
+                ratios[metric],
+                threshold,
+            )
 
         if not all(test_results.values()):
             failed_tests = [it[0] for it in test_results.items() if not it[1]]
-            j = len(failed_tests)
-            n = len(self.metrics_sorted)
-            self.log.warning("The following %s tests out of %s failed:", j, n)
+            self.log.warning(
+                "The following %s tests out of %s failed:",
+                len(failed_tests),
+                len(self.metrics_sorted),
+            )
             for k in failed_tests:
                 self.log.warning(
-                    "'%s' check failed. %s is above %s", k, ratios[k], self.metrics_thresholds[k]
+                    "'%s' check failed. %s is above %s",
+                    k,
+                    ratios[k],
+                    self.metrics_thresholds[k],
+                )
+            raise AirflowException(
+                "The following tests have failed:\n {0}".format(
+                    ", ".join(sorted(failed_tests))
                 )
-            raise AirflowException("The following tests have failed:\n {0}".format(", ".join(
-                sorted(failed_tests))))
+            )
 
         self.log.info("All tests have passed")
 
     def get_db_hook(self):
+        """
+        Get the database hook for the connection.
+
+        :return: the database hook object.
+        :rtype: DbApiHook
+        """
         return BaseHook.get_hook(conn_id=self.conn_id)
 
 
-class ThresholdCheckOperator(BaseOperator):
+class SQLThresholdCheckOperator(BaseOperator):
     """
     Performs a value check using sql code against a minimum threshold
     and a maximum threshold. Thresholds can be in the form of a numeric
@@ -352,19 +414,23 @@ class ThresholdCheckOperator(BaseOperator):
     :type max_threshold: numeric or str
     """
 
-    template_fields = ('sql', 'min_threshold', 'max_threshold')  # type: Iterable[str]
-    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
+    template_fields = ("sql", "min_threshold", "max_threshold")  # type: Iterable[str]
+    template_ext = (
+        ".hql",
+        ".sql",
+    )  # type: Iterable[str]
 
     @apply_defaults
     def __init__(
         self,
-        sql,   # type: str
-        min_threshold,   # type: Any
-        max_threshold,   # type: Any
-        conn_id=None,   # type: Optional[str]
-        *args, **kwargs
+        sql,
+        min_threshold,
+        max_threshold,
+        conn_id=None,
+        *args,
+        **kwargs
     ):
-        super(ThresholdCheckOperator, self).__init__(*args, **kwargs)
+        super(SQLThresholdCheckOperator, self).__init__(*args, **kwargs)
         self.sql = sql
         self.conn_id = conn_id
         self.min_threshold = _convert_to_float_if_possible(min_threshold)
@@ -389,7 +455,7 @@ class ThresholdCheckOperator(BaseOperator):
             "task_id": self.task_id,
             "min_threshold": lower_bound,
             "max_threshold": upper_bound,
-            "within_threshold": lower_bound <= result <= upper_bound
+            "within_threshold": lower_bound <= result <= upper_bound,
         }
 
         self.push(meta_data)
@@ -398,16 +464,17 @@ class ThresholdCheckOperator(BaseOperator):
                 'Threshold Check: "{task_id}" failed.\n'
                 'DAG: {dag_id}\nTask_id: {task_id}\n'
                 'Check description: {description}\n'
-                'SQL: {sql}\n'
-                'Result: {result} is not within thresholds '
-                '{min_threshold} and {max_threshold}'
-            ).format(
-                task_id=self.task_id, dag_id=self.dag_id,
-                description=meta_data.get("description"), sql=self.sql,
-                result=round(meta_data.get("result"), 2),
-                min_threshold=meta_data.get("min_threshold"),
-                max_threshold=meta_data.get("max_threshold")
-            )
+                "SQL: {sql}\n"
+                'Result: {round} is not within thresholds '
+                '{min} and {max}'
+                .format(task_id=meta_data.get("task_id"),
+                        dag_id=self.dag_id,
+                        description=meta_data.get("description"),
+                        sql=self.sql,
+                        round=round(meta_data.get("result"), 2),
+                        min=meta_data.get("min_threshold"),
+                        max=meta_data.get("max_threshold"),
+                        ))
             raise AirflowException(error_msg)
 
         self.log.info("Test %s Successful.", self.task_id)
@@ -418,8 +485,149 @@ class ThresholdCheckOperator(BaseOperator):
         Default functionality will log metadata.
         """
 
-        info = "\n".join(["""{}: {}""".format(key, item) for key, item in meta_data.items()])
+        info = "\n".join(["{key}: {item}".format(key=key, item=item) for key, item in meta_data.items()])
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
     def get_db_hook(self):
+        """
+        Returns DB hook
+        """
         return BaseHook.get_hook(conn_id=self.conn_id)
+
+
+class BranchSQLOperator(BaseOperator, SkipMixin):
+    """
+    Executes sql code in a specific database
+
+    :param sql: the sql code to be executed. (templated)
+    :type sql: Can receive a str representing a sql statement or reference to a template file.
+               Template references are recognized by str ending in '.sql'.
+               Expected SQL query to return Boolean (True/False), integer (0 = False, Otherwise = 1)
+               or string (true/y/yes/1/on/false/n/no/0/off).
+    :param follow_task_ids_if_true: task id or task ids to follow if query returns true
+    :type follow_task_ids_if_true: str or list
+    :param follow_task_ids_if_false: task id or task ids to follow if query returns false
+    :type follow_task_ids_if_false: str or list
+    :param conn_id: reference to a specific database
+    :type conn_id: str
+    :param database: name of database which overwrites the one defined in the connection
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: mapping or iterable
+    """
+
+    template_fields = ("sql",)
+    template_ext = (".sql",)
+    ui_color = "#a22034"
+    ui_fgcolor = "#F7F7F7"
+
+    @apply_defaults
+    def __init__(
+        self,
+        sql,
+        follow_task_ids_if_true,
+        follow_task_ids_if_false,
+        conn_id="default_conn_id",
+        database=None,
+        parameters=None,
+        *args,
+        **kwargs
+    ):
+        super(BranchSQLOperator, self).__init__(*args, **kwargs)
+        self.conn_id = conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.database = database
+        self._hook = None
+
+    def _get_hook(self):
+        self.log.debug("Get connection for %s", self.conn_id)
+        conn = BaseHook.get_connection(self.conn_id)
+
+        if conn.conn_type not in ALLOWED_CONN_TYPE:
+            raise AirflowException(
+                "The connection type is not supported by BranchSQLOperator.\
+                Supported connection types: {}".format(list(ALLOWED_CONN_TYPE))
+            )
+
+        if not self._hook:
+            self._hook = conn.get_hook()
+            if self.database:
+                self._hook.schema = self.database
+
+        return self._hook
+
+    def execute(self, context):
+        # get supported hook
+        self._hook = self._get_hook()
+
+        if self._hook is None:
+            raise AirflowException(
+                "Failed to establish connection to '%s'" % self.conn_id
+            )
+
+        if self.sql is None:
+            raise AirflowException("Expected 'sql' parameter is missing.")
+
+        if self.follow_task_ids_if_true is None:
+            raise AirflowException(
+                "Expected 'follow_task_ids_if_true' paramter is missing."
+            )
+
+        if self.follow_task_ids_if_false is None:
+            raise AirflowException(
+                "Expected 'follow_task_ids_if_false' parameter is missing."
+            )
+
+        self.log.info(
+            "Executing: %s (with parameters %s) with connection: %s",
+            self.sql,
+            self.parameters,
+            self._hook,
+        )
+        record = self._hook.get_first(self.sql, self.parameters)
+        if not record:
+            raise AirflowException(
+                "No rows returned from sql query. Operator expected True or False return value."
+            )
+
+        if isinstance(record, list):
+            if isinstance(record[0], list):
+                query_result = record[0][0]
+            else:
+                query_result = record[0]
+        elif isinstance(record, tuple):
+            query_result = record[0]
+        else:
+            query_result = record
+
+        self.log.info("Query returns %s, type '%s'", query_result, type(query_result))
+
+        follow_branch = None
+        try:
+            if isinstance(query_result, bool):
+                if query_result:
+                    follow_branch = self.follow_task_ids_if_true
+            elif isinstance(query_result, str):
+                # return result is not Boolean, try to convert from String to Boolean
+                if bool(strtobool(query_result)):
+                    follow_branch = self.follow_task_ids_if_true
+            elif isinstance(query_result, int):
+                if bool(query_result):
+                    follow_branch = self.follow_task_ids_if_true
+            else:
+                raise AirflowException(
+                    "Unexpected query return result '%s' type '%s'"
+                    % (query_result, type(query_result))
+                )
+
+            if follow_branch is None:
+                follow_branch = self.follow_task_ids_if_false
+        except ValueError:
+            raise AirflowException(
+                "Unexpected query return result '%s' type '%s'"
+                % (query_result, type(query_result))
+            )
+
+        self.skip_all_except(context["ti"], follow_branch)
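
For context, a minimal DAG sketch wiring up the new BranchSQLOperator; the connection id,
table name and task ids are illustrative, and the connection type has to be one of
ALLOWED_CONN_TYPE (e.g. postgres):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.sql import BranchSQLOperator

    with DAG(dag_id="example_sql_branch",
             start_date=datetime(2020, 1, 1),
             schedule_interval="@daily") as dag:
        check_new_rows = BranchSQLOperator(
            task_id="check_new_rows",
            conn_id="my_postgres",  # illustrative connection id (postgres conn type)
            sql="SELECT COUNT(*) > 0 FROM events WHERE ds = '{{ ds }}'",  # illustrative query
            follow_task_ids_if_true="process_events",
            follow_task_ids_if_false="skip_processing",
        )
        process_events = DummyOperator(task_id="process_events")
        skip_processing = DummyOperator(task_id="skip_processing")
        check_new_rows >> [process_events, skip_processing]
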
diff --git a/airflow/operators/sql_branch_operator.py b/airflow/operators/sql_branch_operator.py
index 072c40c..b911e34 100644
--- a/airflow/operators/sql_branch_operator.py
+++ b/airflow/operators/sql_branch_operator.py
@@ -14,160 +14,22 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""This module is deprecated. Please use `airflow.operators.sql`."""
+import warnings
 
-from distutils.util import strtobool
+from airflow.operators.sql import BranchSQLOperator
 
-from airflow.exceptions import AirflowException
-from airflow.hooks.base_hook import BaseHook
-from airflow.models import BaseOperator, SkipMixin
-from airflow.utils.decorators import apply_defaults
 
-ALLOWED_CONN_TYPE = {
-    "google_cloud_platform",
-    "jdbc",
-    "mssql",
-    "mysql",
-    "odbc",
-    "oracle",
-    "postgres",
-    "presto",
-    "sqlite",
-    "vertica",
-}
-
-
-class BranchSqlOperator(BaseOperator, SkipMixin):
+class BranchSqlOperator(BranchSQLOperator):
     """
-    Executes sql code in a specific database
-
-    :param sql: the sql code to be executed. (templated)
-    :type sql: Can receive a str representing a sql statement or reference to a template file.
-               Template reference are recognized by str ending in '.sql'.
-               Expected SQL query to return Boolean (True/False), integer (0 = False, Otherwise = 1)
-               or string (true/y/yes/1/on/false/n/no/0/off).
-    :param follow_task_ids_if_true: task id or task ids to follow if query return true
-    :type follow_task_ids_if_true: str or list
-    :param follow_task_ids_if_false: task id or task ids to follow if query return true
-    :type follow_task_ids_if_false: str or list
-    :param conn_id: reference to a specific database
-    :type conn_id: str
-    :param database: name of database which overwrite defined one in connection
-    :param parameters: (optional) the parameters to render the SQL query with.
-    :type parameters: mapping or iterable
+    This class is deprecated.
+    Please use `airflow.operators.sql.BranchSQLOperator`.
     """
 
-    template_fields = ("sql",)
-    template_ext = (".sql",)
-    ui_color = "#a22034"
-    ui_fgcolor = "#F7F7F7"
-
-    @apply_defaults
-    def __init__(
-            self,
-            sql,
-            follow_task_ids_if_true,
-            follow_task_ids_if_false,
-            conn_id="default_conn_id",
-            database=None,
-            parameters=None,
-            *args,
-            **kwargs):
-        super(BranchSqlOperator, self).__init__(*args, **kwargs)
-        self.conn_id = conn_id
-        self.sql = sql
-        self.parameters = parameters
-        self.follow_task_ids_if_true = follow_task_ids_if_true
-        self.follow_task_ids_if_false = follow_task_ids_if_false
-        self.database = database
-        self._hook = None
-
-    def _get_hook(self):
-        self.log.debug("Get connection for %s", self.conn_id)
-        conn = BaseHook.get_connection(self.conn_id)
-
-        if conn.conn_type not in ALLOWED_CONN_TYPE:
-            raise AirflowException(
-                "The connection type is not supported by BranchSqlOperator. "
-                + "Supported connection types: {}".format(list(ALLOWED_CONN_TYPE))
-            )
-
-        if not self._hook:
-            self._hook = conn.get_hook()
-            if self.database:
-                self._hook.schema = self.database
-
-        return self._hook
-
-    def execute(self, context):
-        # get supported hook
-        self._hook = self._get_hook()
-
-        if self._hook is None:
-            raise AirflowException(
-                "Failed to establish connection to '%s'" % self.conn_id
-            )
-
-        if self.sql is None:
-            raise AirflowException("Expected 'sql' parameter is missing.")
-
-        if self.follow_task_ids_if_true is None:
-            raise AirflowException(
-                "Expected 'follow_task_ids_if_true' paramter is missing."
-            )
-
-        if self.follow_task_ids_if_false is None:
-            raise AirflowException(
-                "Expected 'follow_task_ids_if_false' parameter is missing."
-            )
-
-        self.log.info(
-            "Executing: %s (with parameters %s) with connection: %s",
-            self.sql,
-            self.parameters,
-            self._hook,
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.operators.sql.BranchSQLOperator`.""",
+            DeprecationWarning, stacklevel=2
         )
-        record = self._hook.get_first(self.sql, self.parameters)
-        if not record:
-            raise AirflowException(
-                "No rows returned from sql query. Operator expected True or False return value."
-            )
-
-        if isinstance(record, list):
-            if isinstance(record[0], list):
-                query_result = record[0][0]
-            else:
-                query_result = record[0]
-        elif isinstance(record, tuple):
-            query_result = record[0]
-        else:
-            query_result = record
-
-        self.log.info("Query returns %s, type '%s'", query_result, type(query_result))
-
-        follow_branch = None
-        try:
-            if isinstance(query_result, bool):
-                if query_result:
-                    follow_branch = self.follow_task_ids_if_true
-            elif isinstance(query_result, str):
-                # return result is not Boolean, try to convert from String to Boolean
-                if bool(strtobool(query_result)):
-                    follow_branch = self.follow_task_ids_if_true
-            elif isinstance(query_result, int):
-                if bool(query_result):
-                    follow_branch = self.follow_task_ids_if_true
-            else:
-                raise AirflowException(
-                    "Unexpected query return result '%s' type '%s'"
-                    % (query_result, type(query_result))
-                )
-
-            if follow_branch is None:
-                follow_branch = self.follow_task_ids_if_false
-        except ValueError:
-            raise AirflowException(
-                "Unexpected query return result '%s' type '%s'"
-                % (query_result, type(query_result))
-            )
-
-        self.skip_all_except(context["ti"], follow_branch)
+        super(BranchSqlOperator, self).__init__(*args, **kwargs)
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 55176f8..1fd11c3 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -57,10 +57,6 @@ Fundamentals
 
    * - :mod:`airflow.operators.branch_operator`
      -
-
-   * - :mod:`airflow.operators.check_operator`
-     -
-
    * - :mod:`airflow.operators.dagrun_operator`
      -
 
@@ -76,7 +72,7 @@ Fundamentals
    * - :mod:`airflow.operators.subdag_operator`
      -
 
-   * - :mod:`airflow.operators.sql_branch_operator`
+   * - :mod:`airflow.operators.sql`
      -
 
 **Sensors:**
@@ -90,9 +86,6 @@ Fundamentals
    * - :mod:`airflow.sensors.weekday_sensor`
      -
 
-   * - :mod:`airflow.sensors.external_task_sensor`
-     - :doc:`How to use <howto/operator/external_task_sensor>`
-
    * - :mod:`airflow.sensors.sql_sensor`
      -
 
@@ -470,7 +463,7 @@ These integrations allow you to copy data from/to Amazon Web Services.
 
    * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
      - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
-     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     -
      - :mod:`airflow.contrib.operators.s3_to_gcs_operator`,
        :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
 
@@ -551,7 +544,7 @@ These integrations allow you to perform various operations within the Google Clo
      - Sensors
 
    * - `AutoML <https://cloud.google.com/automl/>`__
-     - :doc:`How to use <howto/operator/gcp/automl>`
+     -
      - :mod:`airflow.gcp.hooks.automl`
      - :mod:`airflow.gcp.operators.automl`
      -
@@ -563,7 +556,7 @@ These integrations allow you to perform various operations within the Google Clo
      - :mod:`airflow.gcp.sensors.bigquery`
 
    * - `BigQuery Data Transfer Service <https://cloud.google.com/bigquery/transfer/>`__
-     - :doc:`How to use <howto/operator/gcp/bigquery_dts>`
+     -
      - :mod:`airflow.gcp.hooks.bigquery_dts`
      - :mod:`airflow.gcp.operators.bigquery_dts`
      - :mod:`airflow.gcp.sensors.bigquery_dts`
@@ -611,7 +604,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Cloud Functions <https://cloud.google.com/functions/>`__
-     - :doc:`How to use <howto/operator/gcp/functions>`
+     - :doc:`How to use <howto/operator/gcp/function>`
      - :mod:`airflow.gcp.hooks.functions`
      - :mod:`airflow.gcp.operators.functions`
      -
@@ -635,7 +628,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Cloud Memorystore <https://cloud.google.com/memorystore/>`__
-     - :doc:`How to use <howto/operator/gcp/cloud_memorystore>`
+     -
      - :mod:`airflow.gcp.hooks.cloud_memorystore`
      - :mod:`airflow.gcp.operators.cloud_memorystore`
      -
@@ -677,7 +670,7 @@ These integrations allow you to perform various operations within the Google Clo
      - :mod:`airflow.gcp.sensors.gcs`
 
    * - `Storage Transfer Service <https://cloud.google.com/storage/transfer/>`__
-     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     -
      - :mod:`airflow.gcp.hooks.cloud_storage_transfer_service`
      - :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
      - :mod:`airflow.gcp.sensors.cloud_storage_transfer_service`
@@ -701,7 +694,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Cloud Video Intelligence <https://cloud.google.com/video_intelligence/>`__
-     - :doc:`How to use <howto/operator/gcp/video_intelligence>`
+     - :doc:`How to use <howto/operator/gcp/video>`
      - :mod:`airflow.gcp.hooks.video_intelligence`
      - :mod:`airflow.gcp.operators.video_intelligence`
      -
@@ -741,7 +734,7 @@ These integrations allow you to copy data from/to Google Cloud Platform.
 
    * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
      - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
-     - :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     -
      - :mod:`airflow.contrib.operators.s3_to_gcs_operator`,
        :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
 
@@ -772,8 +765,7 @@ These integrations allow you to copy data from/to Google Cloud Platform.
 
    * - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
      - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
-     - :doc:`How to use <howto/operator/gcp/gcs_to_gcs>`,
-       :doc:`How to use <howto/operator/gcp/cloud_storage_transfer_service>`
+     -
      - :mod:`airflow.operators.gcs_to_gcs`,
        :mod:`airflow.gcp.operators.cloud_storage_transfer_service`
 
@@ -1037,7 +1029,7 @@ These integrations allow you to perform various operations using various softwar
      - :mod:`airflow.contrib.sensors.bash_sensor`
 
    * - `Kubernetes <https://kubernetes.io/>`__
-     - :doc:`How to use <howto/operator/kubernetes>`
+     -
      -
      - :mod:`airflow.contrib.operators.kubernetes_pod_operator`
      -
diff --git a/tests/api/common/experimental/test_pool.py b/tests/api/common/experimental/test_pool.py
index 29c7105..79944de 100644
--- a/tests/api/common/experimental/test_pool.py
+++ b/tests/api/common/experimental/test_pool.py
@@ -131,8 +131,8 @@ class TestPool(unittest.TestCase):
                                     name=name)
 
     def test_delete_default_pool_not_allowed(self):
-        with self.assertRaisesRegex(AirflowBadRequest,
-                                    "^default_pool cannot be deleted$"):
+        with self.assertRaisesRegexp(AirflowBadRequest,
+                                     "^default_pool cannot be deleted$"):
             pool_api.delete_pool(Pool.DEFAULT_POOL_NAME)
 
 
diff --git a/tests/contrib/hooks/test_gcp_api_base_hook.py b/tests/contrib/hooks/test_gcp_api_base_hook.py
index e3f99b5..9fc9924 100644
--- a/tests/contrib/hooks/test_gcp_api_base_hook.py
+++ b/tests/contrib/hooks/test_gcp_api_base_hook.py
@@ -98,7 +98,7 @@ class QuotaRetryTestCase(unittest.TestCase):  # ptlint: disable=invalid-name
         self.assertEqual(5, custom_fn.counter)
 
     def test_raise_exception_on_non_quota_exception(self):
-        with six.assertRaisesRegex(self, Forbidden, "Daily Limit Exceeded"):
+        with six.assertRaisesRegexp(self, Forbidden, "Daily Limit Exceeded"):
             message = "POST https://translation.googleapis.com/language/translate/v2: Daily Limit Exceeded"
             errors = [
                 {'message': 'Daily Limit Exceeded', 'domain': 'usageLimits', 'reason': 'dailyLimitExceeded'}
diff --git a/tests/contrib/hooks/test_gcp_cloud_build_hook.py b/tests/contrib/hooks/test_gcp_cloud_build_hook.py
index 81c7ae0..6d5aa16 100644
--- a/tests/contrib/hooks/test_gcp_cloud_build_hook.py
+++ b/tests/contrib/hooks/test_gcp_cloud_build_hook.py
@@ -117,7 +117,7 @@ class TestCloudBuildHookWithPassedProjectId(unittest.TestCase):
 
         execute_mock = mock.Mock(**{"side_effect": [TEST_WAITING_OPERATION, TEST_ERROR_OPERATION]})
         service_mock.operations.return_value.get.return_value.execute = execute_mock
-        with six.assertRaisesRegex(self, AirflowException, "error"):
+        with six.assertRaisesRegexp(self, AirflowException, "error"):
             self.hook.create_build(body={})
 
 
@@ -186,7 +186,7 @@ class TestGcpComputeHookWithDefaultProjectIdFromConnection(unittest.TestCase):
 
         execute_mock = mock.Mock(**{"side_effect": [TEST_WAITING_OPERATION, TEST_ERROR_OPERATION]})
         service_mock.operations.return_value.get.return_value.execute = execute_mock
-        with six.assertRaisesRegex(self, AirflowException, "error"):
+        with six.assertRaisesRegexp(self, AirflowException, "error"):
             self.hook.create_build(body={})
 
 
diff --git a/tests/contrib/hooks/test_gcp_transfer_hook.py b/tests/contrib/hooks/test_gcp_transfer_hook.py
index ab0cfbf..e78d67b 100644
--- a/tests/contrib/hooks/test_gcp_transfer_hook.py
+++ b/tests/contrib/hooks/test_gcp_transfer_hook.py
@@ -265,7 +265,7 @@ class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
         }
 
         get_conn.return_value.transferOperations.return_value.list_next.return_value = None
-        with six.assertRaisesRegex(
+        with six.assertRaisesRegexp(
             self, AirflowException, "An unexpected operation status was encountered. Expected: SUCCESS"
         ):
             self.gct_hook.wait_for_transfer_job(
@@ -298,7 +298,7 @@ class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
     def test_operations_contain_expected_statuses_red_path(self, statuses, expected_statuses):
         operations = [{NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: status}} for status in statuses]
 
-        with six.assertRaisesRegex(
+        with six.assertRaisesRegexp(
             self,
             AirflowException,
             "An unexpected operation status was encountered. Expected: {}".format(
diff --git a/tests/contrib/operators/test_gcp_cloud_build_operator.py b/tests/contrib/operators/test_gcp_cloud_build_operator.py
index 2136757..4a7ca73 100644
--- a/tests/contrib/operators/test_gcp_cloud_build_operator.py
+++ b/tests/contrib/operators/test_gcp_cloud_build_operator.py
@@ -42,7 +42,7 @@ TEST_PROJECT_ID = "example-id"
 
 class BuildProcessorTestCase(TestCase):
     def test_verify_source(self):
-        with six.assertRaisesRegex(self, AirflowException, "The source could not be determined."):
+        with six.assertRaisesRegexp(self, AirflowException, "The source could not be determined."):
             BuildProcessor(body={"source": {"storageSource": {}, "repoSource": {}}}).process_body()
 
     @parameterized.expand(
@@ -77,7 +77,7 @@ class BuildProcessorTestCase(TestCase):
     )
     def test_convert_repo_url_to_storage_dict_invalid(self, url):
         body = {"source": {"repoSource": url}}
-        with six.assertRaisesRegex(self, AirflowException, "Invalid URL."):
+        with six.assertRaisesRegexp(self, AirflowException, "Invalid URL."):
             BuildProcessor(body=body).process_body()
 
     @parameterized.expand(
@@ -102,7 +102,7 @@ class BuildProcessorTestCase(TestCase):
     )
     def test_convert_storage_url_to_dict_invalid(self, url):
         body = {"source": {"storageSource": url}}
-        with six.assertRaisesRegex(self, AirflowException, "Invalid URL."):
+        with six.assertRaisesRegexp(self, AirflowException, "Invalid URL."):
             BuildProcessor(body=body).process_body()
 
     @parameterized.expand([("storageSource",), ("repoSource",)])
@@ -128,7 +128,7 @@ class GcpCloudBuildCreateBuildOperatorTestCase(TestCase):
 
     @parameterized.expand([({},), (None,)])
     def test_missing_input(self, body):
-        with six.assertRaisesRegex(self, AirflowException, "The required parameter 'body' is missing"):
+        with six.assertRaisesRegexp(self, AirflowException, "The required parameter 'body' is missing"):
             CloudBuildCreateBuildOperator(body=body, project_id=TEST_PROJECT_ID, task_id="task-id")
 
     @mock.patch(  # type: ignore
diff --git a/tests/contrib/operators/test_gcs_to_gdrive.py b/tests/contrib/operators/test_gcs_to_gdrive.py
index 4f49055..03a4e66 100644
--- a/tests/contrib/operators/test_gcs_to_gdrive.py
+++ b/tests/contrib/operators/test_gcs_to_gdrive.py
@@ -147,5 +147,5 @@ class TestGcsToGDriveOperator(unittest.TestCase):
         task = GcsToGDriveOperator(
             task_id="move_files", source_bucket="data", source_object="sales/*/*.avro", move_object=True
         )
-        with six.assertRaisesRegex(self, AirflowException, "Only one wildcard"):
+        with six.assertRaisesRegexp(self, AirflowException, "Only one wildcard"):
             task.execute(mock.MagicMock())
diff --git a/tests/contrib/operators/test_sftp_operator.py b/tests/contrib/operators/test_sftp_operator.py
index 24db36e..d597d8d 100644
--- a/tests/contrib/operators/test_sftp_operator.py
+++ b/tests/contrib/operators/test_sftp_operator.py
@@ -363,9 +363,9 @@ class SFTPOperatorTest(unittest.TestCase):
 
         # Exception should be raised if neither ssh_hook nor ssh_conn_id is provided
         if six.PY2:
-            self.assertRaisesRegex = self.assertRaisesRegexp
-        with self.assertRaisesRegex(AirflowException,
-                                    "Cannot operate without ssh_hook or ssh_conn_id."):
+            self.assertRaisesRegexp = self.assertRaisesRegexp
+        with self.assertRaisesRegexp(AirflowException,
+                                     "Cannot operate without ssh_hook or ssh_conn_id."):
             task_0 = SFTPOperator(
                 task_id="test_sftp",
                 local_filepath=self.test_local_filepath,
diff --git a/tests/contrib/operators/test_ssh_operator.py b/tests/contrib/operators/test_ssh_operator.py
index f2294ba..aa113ec 100644
--- a/tests/contrib/operators/test_ssh_operator.py
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -153,9 +153,9 @@ class SSHOperatorTest(TestCase):
 
         # Exception should be raised if neither ssh_hook nor ssh_conn_id is provided
         if six.PY2:
-            self.assertRaisesRegex = self.assertRaisesRegexp
-        with self.assertRaisesRegex(AirflowException,
-                                    "Cannot operate without ssh_hook or ssh_conn_id."):
+            self.assertRaisesRegexp = self.assertRaisesRegexp
+        with self.assertRaisesRegexp(AirflowException,
+                                     "Cannot operate without ssh_hook or ssh_conn_id."):
             task_0 = SSHOperator(task_id="test", command=COMMAND,
                                  timeout=TIMEOUT, dag=self.dag)
             task_0.execute(None)
diff --git a/tests/contrib/secrets/test_hashicorp_vault.py b/tests/contrib/secrets/test_hashicorp_vault.py
index 0d52c3a..1887db3 100644
--- a/tests/contrib/secrets/test_hashicorp_vault.py
+++ b/tests/contrib/secrets/test_hashicorp_vault.py
@@ -217,7 +217,7 @@ class TestVaultSecrets(TestCase):
             "token": "test_wrong_token"
         }
 
-        with six.assertRaisesRegex(self, VaultError, "Vault Authentication Error!"):
+        with six.assertRaisesRegexp(self, VaultError, "Vault Authentication Error!"):
             VaultBackend(**kwargs).get_connections(conn_id='test')
 
     @mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac")
@@ -232,5 +232,5 @@ class TestVaultSecrets(TestCase):
             "url": "http://127.0.0.1:8200",
         }
 
-        with six.assertRaisesRegex(self, VaultError, "token cannot be None for auth_type='token'"):
+        with six.assertRaisesRegexp(self, VaultError, "token cannot be None for auth_type='token'"):
             VaultBackend(**kwargs).get_connections(conn_id='test')
diff --git a/tests/contrib/utils/test_gcp_credentials_provider.py b/tests/contrib/utils/test_gcp_credentials_provider.py
index 3478a42..40e180b 100644
--- a/tests/contrib/utils/test_gcp_credentials_provider.py
+++ b/tests/contrib/utils/test_gcp_credentials_provider.py
@@ -97,7 +97,7 @@ class TestGetGcpCredentialsAndProjectId(unittest.TestCase):
     def test_get_credentials_and_project_id_with_mutually_exclusive_configuration(
         self,
     ):
-        with six.assertRaisesRegex(self, AirflowException, re.escape(
+        with six.assertRaisesRegexp(self, AirflowException, re.escape(
             'The `keyfile_dict` and `key_path` fields are mutually exclusive.'
         )):
             get_credentials_and_project_id(key_path='KEY.json', keyfile_dict={'private_key': 'PRIVATE_KEY'})
diff --git a/tests/operators/test_check_operator.py b/tests/operators/test_check_operator.py
deleted file mode 100644
index 22523a4..0000000
--- a/tests/operators/test_check_operator.py
+++ /dev/null
@@ -1,327 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import six
-import unittest
-from datetime import datetime
-
-from airflow.exceptions import AirflowException
-from airflow.models import DAG
-from airflow.operators.check_operator import (
-    CheckOperator, IntervalCheckOperator, ThresholdCheckOperator, ValueCheckOperator,
-)
-from tests.compat import mock
-
-
-class TestCheckOperator(unittest.TestCase):
-
-    @mock.patch.object(CheckOperator, 'get_db_hook')
-    def test_execute_no_records(self, mock_get_db_hook):
-        mock_get_db_hook.return_value.get_first.return_value = []
-
-        with self.assertRaises(AirflowException):
-            CheckOperator(sql='sql').execute()
-
-    @mock.patch.object(CheckOperator, 'get_db_hook')
-    def test_execute_not_all_records_are_true(self, mock_get_db_hook):
-        mock_get_db_hook.return_value.get_first.return_value = ["data", ""]
-
-        with self.assertRaises(AirflowException):
-            CheckOperator(sql='sql').execute()
-
-
-class TestValueCheckOperator(unittest.TestCase):
-
-    def setUp(self):
-        self.task_id = 'test_task'
-        self.conn_id = 'default_conn'
-
-    def _construct_operator(self, sql, pass_value, tolerance=None):
-        dag = DAG('test_dag', start_date=datetime(2017, 1, 1))
-
-        return ValueCheckOperator(
-            dag=dag,
-            task_id=self.task_id,
-            conn_id=self.conn_id,
-            sql=sql,
-            pass_value=pass_value,
-            tolerance=tolerance)
-
-    def test_pass_value_template_string(self):
-        pass_value_str = "2018-03-22"
-        operator = self._construct_operator('select date from tab1;', "{{ ds }}")
-
-        operator.render_template_fields({'ds': pass_value_str})
-
-        self.assertEqual(operator.task_id, self.task_id)
-        self.assertEqual(operator.pass_value, pass_value_str)
-
-    def test_pass_value_template_string_float(self):
-        pass_value_float = 4.0
-        operator = self._construct_operator('select date from tab1;', pass_value_float)
-
-        operator.render_template_fields({})
-
-        self.assertEqual(operator.task_id, self.task_id)
-        self.assertEqual(operator.pass_value, str(pass_value_float))
-
-    @mock.patch.object(ValueCheckOperator, 'get_db_hook')
-    def test_execute_pass(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [10]
-        mock_get_db_hook.return_value = mock_hook
-        sql = 'select value from tab1 limit 1;'
-        operator = self._construct_operator(sql, 5, 1)
-
-        operator.execute(None)
-
-        mock_hook.get_first.assert_called_with(sql)
-
-    @mock.patch.object(ValueCheckOperator, 'get_db_hook')
-    def test_execute_fail(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [11]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator('select value from tab1 limit 1;', 5, 1)
-
-        with self.assertRaisesRegexp(AirflowException, 'Tolerance:100.0%'):
-            operator.execute()
-
-
-class IntervalCheckOperatorTest(unittest.TestCase):
-
-    def _construct_operator(self, table, metric_thresholds,
-                            ratio_formula, ignore_zero):
-        return IntervalCheckOperator(
-            task_id='test_task',
-            table=table,
-            metrics_thresholds=metric_thresholds,
-            ratio_formula=ratio_formula,
-            ignore_zero=ignore_zero,
-        )
-
-    def test_invalid_ratio_formula(self):
-        with self.assertRaisesRegexp(AirflowException, 'Invalid diff_method'):
-            self._construct_operator(
-                table='test_table',
-                metric_thresholds={
-                    'f1': 1,
-                },
-                ratio_formula='abs',
-                ignore_zero=False,
-            )
-
-    @mock.patch.object(IntervalCheckOperator, 'get_db_hook')
-    def test_execute_not_ignore_zero(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [0]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            table='test_table',
-            metric_thresholds={
-                'f1': 1,
-            },
-            ratio_formula='max_over_min',
-            ignore_zero=False,
-        )
-
-        with self.assertRaises(AirflowException):
-            operator.execute()
-
-    @mock.patch.object(IntervalCheckOperator, 'get_db_hook')
-    def test_execute_ignore_zero(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [0]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            table='test_table',
-            metric_thresholds={
-                'f1': 1,
-            },
-            ratio_formula='max_over_min',
-            ignore_zero=True,
-        )
-
-        operator.execute()
-
-    @mock.patch.object(IntervalCheckOperator, 'get_db_hook')
-    def test_execute_min_max(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-
-        def returned_row():
-            rows = [
-                [2, 2, 2, 2],  # reference
-                [1, 1, 1, 1],  # current
-            ]
-
-            for r in rows:
-                yield r
-
-        mock_hook.get_first.side_effect = returned_row()
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            table='test_table',
-            metric_thresholds={
-                'f0': 1.0,
-                'f1': 1.5,
-                'f2': 2.0,
-                'f3': 2.5,
-            },
-            ratio_formula='max_over_min',
-            ignore_zero=True,
-        )
-
-        with self.assertRaisesRegexp(AirflowException, "f0, f1, f2"):
-            operator.execute()
-
-    @mock.patch.object(IntervalCheckOperator, 'get_db_hook')
-    def test_execute_diff(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-
-        def returned_row():
-            rows = [
-                [3, 3, 3, 3],  # reference
-                [1, 1, 1, 1],  # current
-            ]
-
-            for r in rows:
-                yield r
-
-        mock_hook.get_first.side_effect = returned_row()
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            table='test_table',
-            metric_thresholds={
-                'f0': 0.5,
-                'f1': 0.6,
-                'f2': 0.7,
-                'f3': 0.8,
-            },
-            ratio_formula='relative_diff',
-            ignore_zero=True,
-        )
-
-        with self.assertRaisesRegexp(AirflowException, "f0, f1"):
-            operator.execute()
-
-
-class TestThresholdCheckOperator(unittest.TestCase):
-
-    def _construct_operator(self, sql, min_threshold, max_threshold):
-        dag = DAG('test_dag', start_date=datetime(2017, 1, 1))
-
-        return ThresholdCheckOperator(
-            task_id='test_task',
-            sql=sql,
-            min_threshold=min_threshold,
-            max_threshold=max_threshold,
-            dag=dag
-        )
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_pass_min_value_max_value(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [(10,)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select avg(val) from table1 limit 1',
-            1,
-            100
-        )
-
-        operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_fail_min_value_max_value(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [(10,)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select avg(val) from table1 limit 1',
-            20,
-            100
-        )
-
-        with six.assertRaisesRegex(self, AirflowException, '10.*20.0.*100.0'):
-            operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_pass_min_sql_max_sql(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select 10',
-            'Select 1',
-            'Select 100'
-        )
-
-        operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_fail_min_sql_max_sql(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select 10',
-            'Select 20',
-            'Select 100'
-        )
-
-        with six.assertRaisesRegex(self, AirflowException, '10.*20.*100'):
-            operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_pass_min_value_max_sql(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select 75',
-            45,
-            'Select 100'
-        )
-
-        operator.execute()
-
-    @mock.patch.object(ThresholdCheckOperator, 'get_db_hook')
-    def test_fail_min_sql_max_value(self, mock_get_db_hook):
-        mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
-        mock_get_db_hook.return_value = mock_hook
-
-        operator = self._construct_operator(
-            'Select 155',
-            'Select 45',
-            100
-        )
-
-        with six.assertRaisesRegex(self, AirflowException, '155.*45.*100.0'):
-            operator.execute()
diff --git a/tests/operators/test_sql_branch_operator.py b/tests/operators/test_sql.py
similarity index 57%
rename from tests/operators/test_sql_branch_operator.py
rename to tests/operators/test_sql.py
index 6510609..ca88883 100644
--- a/tests/operators/test_sql_branch_operator.py
+++ b/tests/operators/test_sql.py
@@ -24,8 +24,11 @@ import pytest
 
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.check_operator import (
+    CheckOperator, IntervalCheckOperator, ThresholdCheckOperator, ValueCheckOperator,
+)
 from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.sql_branch_operator import BranchSqlOperator
+from airflow.operators.sql import BranchSQLOperator
 from airflow.utils import timezone
 from airflow.utils.db import create_session
 from airflow.utils.state import State
@@ -60,6 +63,266 @@ SUPPORTED_FALSE_VALUES = [
 ]
 
 
+class TestCheckOperator(unittest.TestCase):
+    @mock.patch.object(CheckOperator, "get_db_hook")
+    def test_execute_no_records(self, mock_get_db_hook):
+        mock_get_db_hook.return_value.get_first.return_value = []
+
+        with self.assertRaises(AirflowException):
+            CheckOperator(sql="sql").execute()
+
+    @mock.patch.object(CheckOperator, "get_db_hook")
+    def test_execute_not_all_records_are_true(self, mock_get_db_hook):
+        mock_get_db_hook.return_value.get_first.return_value = ["data", ""]
+
+        with self.assertRaises(AirflowException):
+            CheckOperator(sql="sql").execute()
+
+
+class TestValueCheckOperator(unittest.TestCase):
+    def setUp(self):
+        self.task_id = "test_task"
+        self.conn_id = "default_conn"
+
+    def _construct_operator(self, sql, pass_value, tolerance=None):
+        dag = DAG("test_dag", start_date=datetime.datetime(2017, 1, 1))
+
+        return ValueCheckOperator(
+            dag=dag,
+            task_id=self.task_id,
+            conn_id=self.conn_id,
+            sql=sql,
+            pass_value=pass_value,
+            tolerance=tolerance,
+        )
+
+    def test_pass_value_template_string(self):
+        pass_value_str = "2018-03-22"
+        operator = self._construct_operator(
+            "select date from tab1;", "{{ ds }}")
+
+        operator.render_template_fields({"ds": pass_value_str})
+
+        self.assertEqual(operator.task_id, self.task_id)
+        self.assertEqual(operator.pass_value, pass_value_str)
+
+    def test_pass_value_template_string_float(self):
+        pass_value_float = 4.0
+        operator = self._construct_operator(
+            "select date from tab1;", pass_value_float)
+
+        operator.render_template_fields({})
+
+        self.assertEqual(operator.task_id, self.task_id)
+        self.assertEqual(operator.pass_value, str(pass_value_float))
+
+    @mock.patch.object(ValueCheckOperator, "get_db_hook")
+    def test_execute_pass(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [10]
+        mock_get_db_hook.return_value = mock_hook
+        sql = "select value from tab1 limit 1;"
+        operator = self._construct_operator(sql, 5, 1)
+
+        operator.execute(None)
+
+        mock_hook.get_first.assert_called_once_with(sql)
+
+    @mock.patch.object(ValueCheckOperator, "get_db_hook")
+    def test_execute_fail(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [11]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "select value from tab1 limit 1;", 5, 1)
+
+        with self.assertRaisesRegexp(AirflowException, "Tolerance:100.0%"):
+            operator.execute()
+
+
+class TestIntervalCheckOperator(unittest.TestCase):
+    def _construct_operator(self, table, metric_thresholds, ratio_formula, ignore_zero):
+        return IntervalCheckOperator(
+            task_id="test_task",
+            table=table,
+            metrics_thresholds=metric_thresholds,
+            ratio_formula=ratio_formula,
+            ignore_zero=ignore_zero,
+        )
+
+    def test_invalid_ratio_formula(self):
+        with self.assertRaisesRegexp(AirflowException, "Invalid diff_method"):
+            self._construct_operator(
+                table="test_table",
+                metric_thresholds={"f1": 1, },
+                ratio_formula="abs",
+                ignore_zero=False,
+            )
+
+    @mock.patch.object(IntervalCheckOperator, "get_db_hook")
+    def test_execute_not_ignore_zero(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [0]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            table="test_table",
+            metric_thresholds={"f1": 1, },
+            ratio_formula="max_over_min",
+            ignore_zero=False,
+        )
+
+        with self.assertRaises(AirflowException):
+            operator.execute()
+
+    @mock.patch.object(IntervalCheckOperator, "get_db_hook")
+    def test_execute_ignore_zero(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [0]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            table="test_table",
+            metric_thresholds={"f1": 1, },
+            ratio_formula="max_over_min",
+            ignore_zero=True,
+        )
+
+        operator.execute()
+
+    @mock.patch.object(IntervalCheckOperator, "get_db_hook")
+    def test_execute_min_max(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+
+        def returned_row():
+            rows = [
+                [2, 2, 2, 2],  # reference
+                [1, 1, 1, 1],  # current
+            ]
+            return rows
+
+        mock_hook.get_first.side_effect = returned_row()
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            table="test_table",
+            metric_thresholds={"f0": 1.0, "f1": 1.5, "f2": 2.0, "f3": 2.5, },
+            ratio_formula="max_over_min",
+            ignore_zero=True,
+        )
+
+        with self.assertRaisesRegexp(AirflowException, "f0, f1, f2"):
+            operator.execute()
+
+    @mock.patch.object(IntervalCheckOperator, "get_db_hook")
+    def test_execute_diff(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+
+        def returned_row():
+            rows = [
+                [3, 3, 3, 3],  # reference
+                [1, 1, 1, 1],  # current
+            ]
+
+            return rows
+
+        mock_hook.get_first.side_effect = returned_row()
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            table="test_table",
+            metric_thresholds={"f0": 0.5, "f1": 0.6, "f2": 0.7, "f3": 0.8, },
+            ratio_formula="relative_diff",
+            ignore_zero=True,
+        )
+
+        with self.assertRaisesRegexp(AirflowException, "f0, f1"):
+            operator.execute()
+
+
+class TestThresholdCheckOperator(unittest.TestCase):
+    def _construct_operator(self, sql, min_threshold, max_threshold):
+        dag = DAG("test_dag", start_date=datetime.datetime(2017, 1, 1))
+
+        return ThresholdCheckOperator(
+            task_id="test_task",
+            sql=sql,
+            min_threshold=min_threshold,
+            max_threshold=max_threshold,
+            dag=dag,
+        )
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_pass_min_value_max_value(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [(10,)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "Select avg(val) from table1 limit 1", 1, 100
+        )
+
+        operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_fail_min_value_max_value(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = [(10,)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "Select avg(val) from table1 limit 1", 20, 100
+        )
+
+        with self.assertRaisesRegexp(AirflowException, "10.*20.0.*100.0"):
+            operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_pass_min_sql_max_sql(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "Select 10", "Select 1", "Select 100")
+
+        operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_fail_min_sql_max_sql(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator(
+            "Select 10", "Select 20", "Select 100")
+
+        with self.assertRaisesRegexp(AirflowException, "10.*20.*100"):
+            operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_pass_min_value_max_sql(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator("Select 75", 45, "Select 100")
+
+        operator.execute()
+
+    @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
+    def test_fail_min_sql_max_value(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_get_db_hook.return_value = mock_hook
+
+        operator = self._construct_operator("Select 155", "Select 45", 100)
+
+        with self.assertRaisesRegexp(AirflowException, "155.*45.*100.0"):
+            operator.execute()
+
+
 class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
     """
     Test for SQL Branch Operator
@@ -92,8 +355,8 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
             session.query(TI).delete()
 
     def test_unsupported_conn_type(self):
-        """ Check if BranchSqlOperator throws an exception for unsupported connection type """
-        op = BranchSqlOperator(
+        """ Check if BranchSQLOperator throws an exception for unsupported connection type """
+        op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="redis_default",
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
@@ -103,11 +366,12 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            op.run(start_date=DEFAULT_DATE,
+                   end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_invalid_conn(self):
-        """ Check if BranchSqlOperator throws an exception for invalid connection """
-        op = BranchSqlOperator(
+        """ Check if BranchSQLOperator throws an exception for invalid connection """
+        op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="invalid_connection",
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
@@ -117,11 +381,12 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            op.run(start_date=DEFAULT_DATE,
+                   end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_invalid_follow_task_true(self):
-        """ Check if BranchSqlOperator throws an exception for invalid connection """
-        op = BranchSqlOperator(
+        """ Check if BranchSQLOperator throws an exception for invalid connection """
+        op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="invalid_connection",
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
@@ -131,11 +396,12 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            op.run(start_date=DEFAULT_DATE,
+                   end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_invalid_follow_task_false(self):
-        """ Check if BranchSqlOperator throws an exception for invalid connection """
-        op = BranchSqlOperator(
+        """ Check if BranchSQLOperator throws an exception for invalid connection """
+        op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="invalid_connection",
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
@@ -145,12 +411,13 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            op.run(start_date=DEFAULT_DATE,
+                   end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     @pytest.mark.backend("mysql")
     def test_sql_branch_operator_mysql(self):
-        """ Check if BranchSqlOperator works with backend """
-        branch_op = BranchSqlOperator(
+        """ Check if BranchSQLOperator works with backend """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -164,8 +431,8 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
 
     @pytest.mark.backend("postgres")
     def test_sql_branch_operator_postgres(self):
-        """ Check if BranchSqlOperator works with backend """
-        branch_op = BranchSqlOperator(
+        """ Check if BranchSQLOperator works with backend """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="postgres_default",
             sql="SELECT 1",
@@ -177,10 +444,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
             start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True
         )
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_branch_single_value_with_dag_run(self, mock_hook):
-        """ Check BranchSqlOperator branch operation """
-        branch_op = BranchSqlOperator(
+        """ Check BranchSQLOperator branch operation """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -220,10 +487,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
             else:
                 raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_branch_true_with_dag_run(self, mock_hook):
-        """ Check BranchSqlOperator branch operation """
-        branch_op = BranchSqlOperator(
+        """ Check BranchSQLOperator branch operation """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -264,10 +531,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
                 else:
                     raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_branch_false_with_dag_run(self, mock_hook):
-        """ Check BranchSqlOperator branch operation """
-        branch_op = BranchSqlOperator(
+        """ Check BranchSQLOperator branch operation """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -308,10 +575,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
                 else:
                     raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_branch_list_with_dag_run(self, mock_hook):
-        """ Checks if the BranchSqlOperator supports branching off to a list of tasks."""
-        branch_op = BranchSqlOperator(
+        """ Checks if the BranchSQLOperator supports branching off to a list of tasks."""
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -354,10 +621,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
             else:
                 raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_invalid_query_result_with_dag_run(self, mock_hook):
-        """ Check BranchSqlOperator branch operation """
-        branch_op = BranchSqlOperator(
+        """ Check BranchSQLOperator branch operation """
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -387,10 +654,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
         with self.assertRaises(AirflowException):
             branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_with_skip_in_branch_downstream_dependencies(self, mock_hook):
         """ Test SQL Branch with skipping all downstream dependencies """
-        branch_op = BranchSqlOperator(
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
@@ -431,10 +698,10 @@ class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
                 else:
                     raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
 
-    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    @mock.patch("airflow.operators.sql.BaseHook")
     def test_with_skip_in_branch_downstream_dependencies2(self, mock_hook):
         """ Test skipping downstream dependency for false condition"""
-        branch_op = BranchSqlOperator(
+        branch_op = BranchSQLOperator(
             task_id="make_choice",
             conn_id="mysql_default",
             sql="SELECT 1",
diff --git a/tests/www_rbac/test_validators.py b/tests/www_rbac/test_validators.py
index 4a543ff..415c53f 100644
--- a/tests/www_rbac/test_validators.py
+++ b/tests/www_rbac/test_validators.py
@@ -119,7 +119,7 @@ class TestValidJson(unittest.TestCase):
     def test_validation_raises_default_message(self):
         self.form_field_mock.data = '2017-05-04'
 
-        six.assertRaisesRegex(
+        six.assertRaisesRegexp(
             self,
             validators.ValidationError,
             "JSON Validation Error:.*",
@@ -129,7 +129,7 @@ class TestValidJson(unittest.TestCase):
     def test_validation_raises_custom_message(self):
         self.form_field_mock.data = '2017-05-04'
 
-        six.assertRaisesRegex(
+        six.assertRaisesRegexp(
             self,
             validators.ValidationError,
             "Invalid JSON",


[airflow] 02/07: Add SQL Branch Operator

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 619720b16a6517c0ce88011ca050fd23311e89b4
Author: samuelkhtu <46...@users.noreply.github.com>
AuthorDate: Mon Jun 1 14:14:13 2020 -0400

    Add SQL Branch Operator
    
    The SQL Branch Operator allows users to execute a SQL query in any supported backend to decide which
    branch to follow. The SQL branch operator expects the query to return True/False (Boolean),
    0/1 (Integer), or true/y/yes/1/on/false/n/no/0/off (String).
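    
    For illustration, a minimal DAG sketch of how the operator added here might be wired up;
    the connection id, query, and downstream task ids below are assumed placeholders:
    
        from datetime import datetime
        
        from airflow import DAG
        from airflow.operators.dummy_operator import DummyOperator
        from airflow.operators.sql_branch_operator import BranchSqlOperator
        
        with DAG("example_sql_branch", start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
            # The query result (Boolean, 0/1 integer, or a true/false-like string)
            # decides which downstream task id(s) to follow.
            choose = BranchSqlOperator(
                task_id="choose_branch",
                conn_id="mysql_default",                   # assumed connection id
                sql="SELECT COUNT(*) > 0 FROM new_rows",   # assumed query and table
                follow_task_ids_if_true="process_new_rows",
                follow_task_ids_if_false="skip_processing",
            )
            process_new_rows = DummyOperator(task_id="process_new_rows")
            skip_processing = DummyOperator(task_id="skip_processing")
            choose >> [process_new_rows, skip_processing]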
    
    (cherry picked from commit 55b9b8f6456a7a46921b0bf7a893c7f08bf8237c)
---
 airflow/operators/sql_branch_operator.py    | 173 ++++++++++
 docs/operators-and-hooks-ref.rst            |  81 +++++
 tests/operators/test_sql_branch_operator.py | 476 ++++++++++++++++++++++++++++
 3 files changed, 730 insertions(+)

diff --git a/airflow/operators/sql_branch_operator.py b/airflow/operators/sql_branch_operator.py
new file mode 100644
index 0000000..072c40c
--- /dev/null
+++ b/airflow/operators/sql_branch_operator.py
@@ -0,0 +1,173 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from distutils.util import strtobool
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import BaseOperator, SkipMixin
+from airflow.utils.decorators import apply_defaults
+
+ALLOWED_CONN_TYPE = {
+    "google_cloud_platform",
+    "jdbc",
+    "mssql",
+    "mysql",
+    "odbc",
+    "oracle",
+    "postgres",
+    "presto",
+    "sqlite",
+    "vertica",
+}
+
+
+class BranchSqlOperator(BaseOperator, SkipMixin):
+    """
+    Executes sql code in a specific database
+
+    :param sql: the sql code to be executed. (templated)
+    :type sql: Can receive a str representing a SQL statement or a reference to a template file.
+               Template references are recognized by str ending in '.sql'.
+               The SQL query is expected to return a Boolean (True/False), an integer (0 = False, otherwise True)
+               or a string (true/y/yes/1/on/false/n/no/0/off).
+    :param follow_task_ids_if_true: task id or task ids to follow if the query returns true
+    :type follow_task_ids_if_true: str or list
+    :param follow_task_ids_if_false: task id or task ids to follow if the query returns false
+    :type follow_task_ids_if_false: str or list
+    :param conn_id: reference to a specific database
+    :type conn_id: str
+    :param database: name of the database which overwrites the one defined in the connection
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: mapping or iterable
+    """
+
+    template_fields = ("sql",)
+    template_ext = (".sql",)
+    ui_color = "#a22034"
+    ui_fgcolor = "#F7F7F7"
+
+    @apply_defaults
+    def __init__(
+            self,
+            sql,
+            follow_task_ids_if_true,
+            follow_task_ids_if_false,
+            conn_id="default_conn_id",
+            database=None,
+            parameters=None,
+            *args,
+            **kwargs):
+        super(BranchSqlOperator, self).__init__(*args, **kwargs)
+        self.conn_id = conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.database = database
+        self._hook = None
+
+    def _get_hook(self):
+        self.log.debug("Get connection for %s", self.conn_id)
+        conn = BaseHook.get_connection(self.conn_id)
+
+        if conn.conn_type not in ALLOWED_CONN_TYPE:
+            raise AirflowException(
+                "The connection type is not supported by BranchSqlOperator. "
+                + "Supported connection types: {}".format(list(ALLOWED_CONN_TYPE))
+            )
+
+        if not self._hook:
+            self._hook = conn.get_hook()
+            if self.database:
+                self._hook.schema = self.database
+
+        return self._hook
+
+    def execute(self, context):
+        # get supported hook
+        self._hook = self._get_hook()
+
+        if self._hook is None:
+            raise AirflowException(
+                "Failed to establish connection to '%s'" % self.conn_id
+            )
+
+        if self.sql is None:
+            raise AirflowException("Expected 'sql' parameter is missing.")
+
+        if self.follow_task_ids_if_true is None:
+            raise AirflowException(
+                "Expected 'follow_task_ids_if_true' paramter is missing."
+            )
+
+        if self.follow_task_ids_if_false is None:
+            raise AirflowException(
+                "Expected 'follow_task_ids_if_false' parameter is missing."
+            )
+
+        self.log.info(
+            "Executing: %s (with parameters %s) with connection: %s",
+            self.sql,
+            self.parameters,
+            self._hook,
+        )
+        record = self._hook.get_first(self.sql, self.parameters)
+        if not record:
+            raise AirflowException(
+                "No rows returned from sql query. Operator expected True or False return value."
+            )
+
+        if isinstance(record, list):
+            if isinstance(record[0], list):
+                query_result = record[0][0]
+            else:
+                query_result = record[0]
+        elif isinstance(record, tuple):
+            query_result = record[0]
+        else:
+            query_result = record
+
+        self.log.info("Query returns %s, type '%s'", query_result, type(query_result))
+
+        follow_branch = None
+        try:
+            if isinstance(query_result, bool):
+                if query_result:
+                    follow_branch = self.follow_task_ids_if_true
+            elif isinstance(query_result, str):
+                # return result is not Boolean, try to convert from String to Boolean
+                if bool(strtobool(query_result)):
+                    follow_branch = self.follow_task_ids_if_true
+            elif isinstance(query_result, int):
+                if bool(query_result):
+                    follow_branch = self.follow_task_ids_if_true
+            else:
+                raise AirflowException(
+                    "Unexpected query return result '%s' type '%s'"
+                    % (query_result, type(query_result))
+                )
+
+            if follow_branch is None:
+                follow_branch = self.follow_task_ids_if_false
+        except ValueError:
+            raise AirflowException(
+                "Unexpected query return result '%s' type '%s'"
+                % (query_result, type(query_result))
+            )
+
+        self.skip_all_except(context["ti"], follow_branch)
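
A minimal usage sketch (connection id, SQL, and task names are hypothetical) showing how the operator above can branch a DAG, assuming it is importable as airflow.operators.sql_branch_operator.BranchSqlOperator as in the tests further below:

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.sql_branch_operator import BranchSqlOperator
    from airflow.utils import timezone

    with DAG("sql_branch_example", start_date=timezone.datetime(2020, 6, 1),
             schedule_interval="@daily") as dag:
        # Follow 'process_new_rows' when the query evaluates to a truthy value,
        # otherwise follow 'no_new_rows'.
        check_new_rows = BranchSqlOperator(
            task_id="check_new_rows",
            conn_id="my_mysql_conn",                       # hypothetical connection id
            sql="SELECT COUNT(*) > 0 FROM staging_table",  # should yield True/False, 1/0 or 'true'/'false'
            follow_task_ids_if_true="process_new_rows",
            follow_task_ids_if_false="no_new_rows",
        )
        process_new_rows = DummyOperator(task_id="process_new_rows")
        no_new_rows = DummyOperator(task_id="no_new_rows")
        check_new_rows >> [process_new_rows, no_new_rows]
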
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 6c80858..55176f8 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -22,6 +22,87 @@ Operators and Hooks Reference
   :local:
   :depth: 1
 
+.. _fundamentals:
+
+Fundamentals
+------------
+
+**Base:**
+
+.. list-table::
+   :header-rows: 1
+
+   * - Module
+     - Guides
+
+   * - :mod:`airflow.hooks.base_hook`
+     -
+
+   * - :mod:`airflow.hooks.dbapi_hook`
+     -
+
+   * - :mod:`airflow.models.baseoperator`
+     -
+
+   * - :mod:`airflow.sensors.base_sensor_operator`
+     -
+
+**Operators:**
+
+.. list-table::
+   :header-rows: 1
+
+   * - Operators
+     - Guides
+
+   * - :mod:`airflow.operators.branch_operator`
+     -
+
+   * - :mod:`airflow.operators.check_operator`
+     -
+
+   * - :mod:`airflow.operators.dagrun_operator`
+     -
+
+   * - :mod:`airflow.operators.dummy_operator`
+     -
+
+   * - :mod:`airflow.operators.generic_transfer`
+     -
+
+   * - :mod:`airflow.operators.latest_only_operator`
+     -
+
+   * - :mod:`airflow.operators.subdag_operator`
+     -
+
+   * - :mod:`airflow.operators.sql_branch_operator`
+     -
+
+**Sensors:**
+
+.. list-table::
+   :header-rows: 1
+
+   * - Sensors
+     - Guides
+
+   * - :mod:`airflow.sensors.weekday_sensor`
+     -
+
+   * - :mod:`airflow.sensors.external_task_sensor`
+     - :doc:`How to use <howto/operator/external_task_sensor>`
+
+   * - :mod:`airflow.sensors.sql_sensor`
+     -
+
+   * - :mod:`airflow.sensors.time_delta_sensor`
+     -
+
+   * - :mod:`airflow.sensors.time_sensor`
+     -
+
+
 .. _Apache:
 
 ASF: Apache Software Foundation
diff --git a/tests/operators/test_sql_branch_operator.py b/tests/operators/test_sql_branch_operator.py
new file mode 100644
index 0000000..6510609
--- /dev/null
+++ b/tests/operators/test_sql_branch_operator.py
@@ -0,0 +1,476 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+from tests.compat import mock
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.sql_branch_operator import BranchSqlOperator
+from airflow.utils import timezone
+from airflow.utils.db import create_session
+from airflow.utils.state import State
+from tests.hooks.test_hive_hook import TestHiveEnvironment
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+INTERVAL = datetime.timedelta(hours=12)
+
+SUPPORTED_TRUE_VALUES = [
+    ["True"],
+    ["true"],
+    ["1"],
+    ["on"],
+    [1],
+    True,
+    "true",
+    "1",
+    "on",
+    1,
+]
+SUPPORTED_FALSE_VALUES = [
+    ["False"],
+    ["false"],
+    ["0"],
+    ["off"],
+    [0],
+    False,
+    "false",
+    "0",
+    "off",
+    0,
+]
+
+
+class TestSqlBranch(TestHiveEnvironment, unittest.TestCase):
+    """
+    Test for SQL Branch Operator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestSqlBranch, cls).setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super(TestSqlBranch, self).setUp()
+        self.dag = DAG(
+            "sql_branch_operator_test",
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
+            schedule_interval=INTERVAL,
+        )
+        self.branch_1 = DummyOperator(task_id="branch_1", dag=self.dag)
+        self.branch_2 = DummyOperator(task_id="branch_2", dag=self.dag)
+        self.branch_3 = None
+
+    def tearDown(self):
+        super(TestSqlBranch, self).tearDown()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def test_unsupported_conn_type(self):
+        """ Check if BranchSqlOperator throws an exception for unsupported connection type """
+        op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="redis_default",
+            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        with self.assertRaises(AirflowException):
+            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_invalid_conn(self):
+        """ Check if BranchSqlOperator throws an exception for invalid connection """
+        op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="invalid_connection",
+            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        with self.assertRaises(AirflowException):
+            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_invalid_follow_task_true(self):
+        """ Check if BranchSqlOperator throws an exception when follow_task_ids_if_true is missing """
+        op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="invalid_connection",
+            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
+            follow_task_ids_if_true=None,
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        with self.assertRaises(AirflowException):
+            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_invalid_follow_task_false(self):
+        """ Check if BranchSqlOperator throws an exception when follow_task_ids_if_false is missing """
+        op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="invalid_connection",
+            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false=None,
+            dag=self.dag,
+        )
+
+        with self.assertRaises(AirflowException):
+            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    @pytest.mark.backend("mysql")
+    def test_sql_branch_operator_mysql(self):
+        """ Check if BranchSqlOperator works with a MySQL backend """
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="mysql_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+        branch_op.run(
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True
+        )
+
+    @pytest.mark.backend("postgres")
+    def test_sql_branch_operator_postgres(self):
+        """ Check if BranchSqlOperator works with a Postgres backend """
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="postgres_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+        branch_op.run(
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True
+        )
+
+    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    def test_branch_single_value_with_dag_run(self, mock_hook):
+        """ Check BranchSqlOperator branching when the query returns a single truthy value """
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="mysql_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        self.branch_1.set_upstream(branch_op)
+        self.branch_2.set_upstream(branch_op)
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        mock_hook.get_connection("mysql_default").conn_type = "mysql"
+        mock_get_records = (
+            mock_hook.get_connection.return_value.get_hook.return_value.get_first
+        )
+
+        mock_get_records.return_value = 1
+
+        branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == "make_choice":
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == "branch_1":
+                self.assertEqual(ti.state, State.NONE)
+            elif ti.task_id == "branch_2":
+                self.assertEqual(ti.state, State.SKIPPED)
+            else:
+                raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
+
+    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    def test_branch_true_with_dag_run(self, mock_hook):
+        """ Check BranchSqlOperator branching for all supported true values """
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="mysql_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        self.branch_1.set_upstream(branch_op)
+        self.branch_2.set_upstream(branch_op)
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        mock_hook.get_connection("mysql_default").conn_type = "mysql"
+        mock_get_records = (
+            mock_hook.get_connection.return_value.get_hook.return_value.get_first
+        )
+
+        for true_value in SUPPORTED_TRUE_VALUES:
+            mock_get_records.return_value = true_value
+
+            branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+            tis = dr.get_task_instances()
+            for ti in tis:
+                if ti.task_id == "make_choice":
+                    self.assertEqual(ti.state, State.SUCCESS)
+                elif ti.task_id == "branch_1":
+                    self.assertEqual(ti.state, State.NONE)
+                elif ti.task_id == "branch_2":
+                    self.assertEqual(ti.state, State.SKIPPED)
+                else:
+                    raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
+
+    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    def test_branch_false_with_dag_run(self, mock_hook):
+        """ Check BranchSqlOperator branching for all supported false values """
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="mysql_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        self.branch_1.set_upstream(branch_op)
+        self.branch_2.set_upstream(branch_op)
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        mock_hook.get_connection("mysql_default").conn_type = "mysql"
+        mock_get_records = (
+            mock_hook.get_connection.return_value.get_hook.return_value.get_first
+        )
+
+        for false_value in SUPPORTED_FALSE_VALUES:
+            mock_get_records.return_value = false_value
+
+            branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+            tis = dr.get_task_instances()
+            for ti in tis:
+                if ti.task_id == "make_choice":
+                    self.assertEqual(ti.state, State.SUCCESS)
+                elif ti.task_id == "branch_1":
+                    self.assertEqual(ti.state, State.SKIPPED)
+                elif ti.task_id == "branch_2":
+                    self.assertEqual(ti.state, State.NONE)
+                else:
+                    raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
+
+    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    def test_branch_list_with_dag_run(self, mock_hook):
+        """ Checks if the BranchSqlOperator supports branching off to a list of tasks."""
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="mysql_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true=["branch_1", "branch_2"],
+            follow_task_ids_if_false="branch_3",
+            dag=self.dag,
+        )
+
+        self.branch_1.set_upstream(branch_op)
+        self.branch_2.set_upstream(branch_op)
+        self.branch_3 = DummyOperator(task_id="branch_3", dag=self.dag)
+        self.branch_3.set_upstream(branch_op)
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        mock_hook.get_connection("mysql_default").conn_type = "mysql"
+        mock_get_records = (
+            mock_hook.get_connection.return_value.get_hook.return_value.get_first
+        )
+        mock_get_records.return_value = [["1"]]
+
+        branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == "make_choice":
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == "branch_1":
+                self.assertEqual(ti.state, State.NONE)
+            elif ti.task_id == "branch_2":
+                self.assertEqual(ti.state, State.NONE)
+            elif ti.task_id == "branch_3":
+                self.assertEqual(ti.state, State.SKIPPED)
+            else:
+                raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
+
+    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    def test_invalid_query_result_with_dag_run(self, mock_hook):
+        """ Check that BranchSqlOperator raises an exception for an unexpected query result """
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="mysql_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        self.branch_1.set_upstream(branch_op)
+        self.branch_2.set_upstream(branch_op)
+        self.dag.clear()
+
+        self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        mock_hook.get_connection("mysql_default").conn_type = "mysql"
+        mock_get_records = (
+            mock_hook.get_connection.return_value.get_hook.return_value.get_first
+        )
+
+        mock_get_records.return_value = ["Invalid Value"]
+
+        with self.assertRaises(AirflowException):
+            branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    def test_with_skip_in_branch_downstream_dependencies(self, mock_hook):
+        """ Test that a task downstream of the followed branch is not skipped for a true condition """
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="mysql_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        branch_op >> self.branch_1 >> self.branch_2
+        branch_op >> self.branch_2
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        mock_hook.get_connection("mysql_default").conn_type = "mysql"
+        mock_get_records = (
+            mock_hook.get_connection.return_value.get_hook.return_value.get_first
+        )
+
+        for true_value in SUPPORTED_TRUE_VALUES:
+            mock_get_records.return_value = [true_value]
+
+            branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+            tis = dr.get_task_instances()
+            for ti in tis:
+                if ti.task_id == "make_choice":
+                    self.assertEqual(ti.state, State.SUCCESS)
+                elif ti.task_id == "branch_1":
+                    self.assertEqual(ti.state, State.NONE)
+                elif ti.task_id == "branch_2":
+                    self.assertEqual(ti.state, State.NONE)
+                else:
+                    raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))
+
+    @mock.patch("airflow.operators.sql_branch_operator.BaseHook")
+    def test_with_skip_in_branch_downstream_dependencies2(self, mock_hook):
+        """ Test that the unfollowed branch is skipped, but not its downstream dependency, for a false condition """
+        branch_op = BranchSqlOperator(
+            task_id="make_choice",
+            conn_id="mysql_default",
+            sql="SELECT 1",
+            follow_task_ids_if_true="branch_1",
+            follow_task_ids_if_false="branch_2",
+            dag=self.dag,
+        )
+
+        branch_op >> self.branch_1 >> self.branch_2
+        branch_op >> self.branch_2
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        mock_hook.get_connection("mysql_default").conn_type = "mysql"
+        mock_get_records = (
+            mock_hook.get_connection.return_value.get_hook.return_value.get_first
+        )
+
+        for false_value in SUPPORTED_FALSE_VALUES:
+            mock_get_records.return_value = [false_value]
+
+            branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+            tis = dr.get_task_instances()
+            for ti in tis:
+                if ti.task_id == "make_choice":
+                    self.assertEqual(ti.state, State.SUCCESS)
+                elif ti.task_id == "branch_1":
+                    self.assertEqual(ti.state, State.SKIPPED)
+                elif ti.task_id == "branch_2":
+                    self.assertEqual(ti.state, State.NONE)
+                else:
+                    raise ValueError("Invalid task id {task_id} found!".format(task_id=ti.task_id))


[airflow] 06/07: Fix retries causing constraint violation on MySQL with DAG Serialization (#9336)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c7117cc7b54de50f829d9f35cdd5e36b820d6eb4
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Jun 17 11:22:13 2020 +0100

    Fix retries causing constraint violation on MySQL with DAG Serialization (#9336)
    
    The issue was caused by the execution_date column of the `rendered_task_instance_fields` table lacking fractional-second precision on MySQL, which led to `_mysql_exceptions.IntegrityError` on retries.
    
    closes https://github.com/apache/airflow/issues/9148
    
    (cherry picked from commit 9e6b5abea08eb35c48839e33a4518f292ef7564e)
---
 ...eea_add_precision_to_execution_date_in_mysql.py |  61 ++++++++++++
 tests/models/test_taskinstance.py                  | 106 +++++++++++----------
 2 files changed, 118 insertions(+), 49 deletions(-)
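
A loose illustration of the precision mismatch (made-up values, not the actual Airflow code path): without fractional-second precision MySQL truncates microseconds, so the full-precision execution_date used by the task instance no longer matches the row already stored, and re-inserting the rendered fields on retry can collide with the table's primary key.

    from datetime import datetime

    execution_date = datetime(2020, 6, 16, 21, 44, 2, 883132)  # microsecond precision in Python
    stored = execution_date.replace(microsecond=0)             # what a TIMESTAMP without fsp keeps

    # A cleanup keyed on the full-precision value misses the stored row,
    # so the next insert for the same task instance hits the primary key.
    assert stored != execution_date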

diff --git a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
new file mode 100644
index 0000000..ecb589d
--- /dev/null
+++ b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add Precision to execution_date in RenderedTaskInstanceFields table
+
+Revision ID: a66efa278eea
+Revises: 8f966b9c467a
+Create Date: 2020-06-16 21:44:02.883132
+
+"""
+
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = 'a66efa278eea'
+down_revision = '8f966b9c467a'
+branch_labels = None
+depends_on = None
+
+TABLE_NAME = 'rendered_task_instance_fields'
+COLUMN_NAME = 'execution_date'
+
+
+def upgrade():
+    """Add Precision to execution_date in RenderedTaskInstanceFields table for MySQL"""
+    conn = op.get_bind()
+    if conn.dialect.name == "mysql":
+        op.alter_column(
+            table_name=TABLE_NAME,
+            column_name=COLUMN_NAME,
+            type_=mysql.TIMESTAMP(fsp=6),
+            nullable=False
+        )
+
+
+def downgrade():
+    """Unapply Add Precision to execution_date in RenderedTaskInstanceFields table"""
+    conn = op.get_bind()
+    if conn.dialect.name == "mysql":
+        op.alter_column(
+            table_name=TABLE_NAME,
+            column_name=COLUMN_NAME,
+            type_=mysql.TIMESTAMP(),
+            nullable=False
+        )
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 9bce7d5..4534a07 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -53,6 +53,7 @@ class TaskInstanceTest(unittest.TestCase):
 
     def setUp(self):
         db.clear_db_pools()
+        db.clear_rendered_ti_fields()
         with create_session() as session:
             test_pool = Pool(pool='test_pool', slots=1)
             session.add(test_pool)
@@ -60,6 +61,7 @@ class TaskInstanceTest(unittest.TestCase):
 
     def tearDown(self):
         db.clear_db_pools()
+        db.clear_rendered_ti_fields()
         with create_session() as session:
             session.query(TaskFail).delete()
             session.query(TaskReschedule).delete()
@@ -463,58 +465,64 @@ class TaskInstanceTest(unittest.TestCase):
         run_with_error(ti)
         self.assertEqual(ti.state, State.FAILED)
 
-    def test_retry_handling(self):
+    @parameterized.expand([
+        (False, None,),
+        (True, {'env': None, 'bash_command': 'echo test_retry_handling; exit 1'},),
+    ])
+    def test_retry_handling(self, dag_serialization, expected_rendered_ti_fields):
         """
         Test that task retries are handled properly
         """
-        dag = models.DAG(dag_id='test_retry_handling')
-        task = BashOperator(
-            task_id='test_retry_handling_op',
-            bash_command='exit 1',
-            retries=1,
-            retry_delay=datetime.timedelta(seconds=0),
-            dag=dag,
-            owner='test_pool',
-            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
-
-        def run_with_error(ti):
-            try:
-                ti.run()
-            except AirflowException:
-                pass
-
-        ti = TI(
-            task=task, execution_date=timezone.utcnow())
-        self.assertEqual(ti.try_number, 1)
-
-        # first run -- up for retry
-        run_with_error(ti)
-        self.assertEqual(ti.state, State.UP_FOR_RETRY)
-        self.assertEqual(ti._try_number, 1)
-        self.assertEqual(ti.try_number, 2)
-
-        # second run -- fail
-        run_with_error(ti)
-        self.assertEqual(ti.state, State.FAILED)
-        self.assertEqual(ti._try_number, 2)
-        self.assertEqual(ti.try_number, 3)
-
-        # Clear the TI state since you can't run a task with a FAILED state without
-        # clearing it first
-        dag.clear()
-
-        # third run -- up for retry
-        run_with_error(ti)
-        self.assertEqual(ti.state, State.UP_FOR_RETRY)
-        self.assertEqual(ti._try_number, 3)
-        self.assertEqual(ti.try_number, 4)
-
-        # fourth run -- fail
-        run_with_error(ti)
-        ti.refresh_from_db()
-        self.assertEqual(ti.state, State.FAILED)
-        self.assertEqual(ti._try_number, 4)
-        self.assertEqual(ti.try_number, 5)
+        with patch("airflow.models.taskinstance.STORE_SERIALIZED_DAGS", dag_serialization):
+            dag = models.DAG(dag_id='test_retry_handling')
+            task = BashOperator(
+                task_id='test_retry_handling_op',
+                bash_command='echo {{dag.dag_id}}; exit 1',
+                retries=1,
+                retry_delay=datetime.timedelta(seconds=0),
+                dag=dag,
+                owner='test_pool',
+                start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
+
+            def run_with_error(ti):
+                try:
+                    ti.run()
+                except AirflowException:
+                    pass
+
+            ti = TI(
+                task=task, execution_date=timezone.utcnow())
+            self.assertEqual(ti.try_number, 1)
+
+            # first run -- up for retry
+            run_with_error(ti)
+            self.assertEqual(ti.state, State.UP_FOR_RETRY)
+            self.assertEqual(ti._try_number, 1)
+            self.assertEqual(ti.try_number, 2)
+
+            # second run -- fail
+            run_with_error(ti)
+            self.assertEqual(ti.state, State.FAILED)
+            self.assertEqual(ti._try_number, 2)
+            self.assertEqual(ti.try_number, 3)
+
+            # Clear the TI state since you can't run a task with a FAILED state without
+            # clearing it first
+            dag.clear()
+
+            # third run -- up for retry
+            run_with_error(ti)
+            self.assertEqual(ti.state, State.UP_FOR_RETRY)
+            self.assertEqual(ti._try_number, 3)
+            self.assertEqual(ti.try_number, 4)
+
+            # fourth run -- fail
+            run_with_error(ti)
+            ti.refresh_from_db()
+            self.assertEqual(ti.state, State.FAILED)
+            self.assertEqual(ti._try_number, 4)
+            self.assertEqual(ti.try_number, 5)
+            self.assertEqual(RenderedTaskInstanceFields.get_templated_fields(ti), expected_rendered_ti_fields)
 
     def test_next_retry_datetime(self):
         delay = datetime.timedelta(seconds=30)


[airflow] 05/07: Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ba39830d83f9279c98a9a7051f6c2658d0deea40
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jun 5 15:39:42 2020 +0100

    Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)
    
    (cherry picked from commit 9bcdadaf7e6e73d3d2246fbbd32a9f30a1b43ca9)
---
 airflow/contrib/operators/dataproc_operator.py    | 4 ++--
 tests/contrib/operators/test_dataproc_operator.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
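
A hypothetical snippet showing what this change enables: the main driver path can now carry Jinja templates, for example an execution-date-based GCS location (bucket, cluster and region below are made up, and an existing `dag` object is assumed):

    from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

    submit_pyspark = DataProcPySparkOperator(
        task_id="submit_pyspark",
        main="gs://example-bucket/jobs/{{ ds_nodash }}/driver.py",  # rendered by Jinja at runtime
        cluster_name="example-cluster",
        region="europe-west1",
        dag=dag,
    )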

diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index a5e126b..ff07be5 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -1057,7 +1057,7 @@ class DataProcPySparkOperator(DataProcJobBaseOperator):
     Start a PySpark Job on a Cloud DataProc cluster.
 
     :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
-            Python file to use as the driver. Must be a .py file.
+            Python file to use as the driver. Must be a .py file. (templated)
     :type main: str
     :param arguments: Arguments for the job. (templated)
     :type arguments: list
@@ -1077,7 +1077,7 @@ class DataProcPySparkOperator(DataProcJobBaseOperator):
     :type dataproc_pyspark_jars: list
     """
 
-    template_fields = ['arguments', 'job_name', 'cluster_name',
+    template_fields = ['main', 'arguments', 'job_name', 'cluster_name',
                        'region', 'dataproc_jars', 'dataproc_properties']
     ui_color = '#0273d4'
     job_type = 'pysparkJob'
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index d2beeb3..7e21ffd 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -1112,7 +1112,7 @@ class DataProcPySparkOperatorTest(unittest.TestCase):
         )
 
         self.assertEqual(
-            task.template_fields, ['arguments', 'job_name', 'cluster_name',
+            task.template_fields, ['main', 'arguments', 'job_name', 'cluster_name',
                                    'region', 'dataproc_jars', 'dataproc_properties'])
 
         ti = TaskInstance(task, DEFAULT_DATE)