Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/12 00:08:13 UTC

[GitHub] elizabethhalper closed pull request #4491: [AIRFLOW-3284] Azure Batch AI Operator (#18)

elizabethhalper closed pull request #4491: [AIRFLOW-3284] Azure Batch AI Operator (#18)
URL: https://github.com/apache/airflow/pull/4491
 
 
   

This is a pull request from a forked repository. As GitHub hides the
original diff once such a pull request is closed, it is reproduced below
for the sake of provenance:

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index f7f69ac0b3..0a3e7016a7 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -28,6 +28,7 @@ Make sure you have checked _all_ steps below.
 
 - [ ] In case of new functionality, my PR adds documentation that describes how to use it.
   - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.
+  - All public functions and classes in the PR contain docstrings that explain what they do
 
 ### Code Quality
 
diff --git a/airflow/contrib/example_dags/example_azure_batchai_dag.py b/airflow/contrib/example_dags/example_azure_batchai_dag.py
new file mode 100644
index 0000000000..f228a4b3a7
--- /dev/null
+++ b/airflow/contrib/example_dags/example_azure_batchai_dag.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is an example DAG to highlight usage of the AzureBatchAIOperator to run a
+Batch AI Cluster on Azure
+"""
+
+import airflow
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.contrib.operators.azure_batchai_operator import AzureBatchAIOperator
+from airflow.models import DAG
+
+
+args = {
+    'owner': 'airflow',
+    'start_date': airflow.utils.dates.days_ago(2)
+}
+
+dag = DAG(
+    dag_id='example_azure_batchai_operator',
+    default_args=args,
+    schedule_interval="@daily")
+
+batch_ai_node = AzureBatchAIOperator(
+    bai_conn_id='azure_batchai_default',
+    resource_group='batch-ai-test-rg',
+    workspace_name='batch-ai-workspace-name',
+    cluster_name='batch-ai-cluster-name',
+    location='WestUS2',
+    scale_type='auto',
+    environment_variables={},
+    volumes=[],
+    task_id='run_this_first',
+    dag=dag)
+
+dummy = DummyOperator(
+    task_id='join',
+    dag=dag
+)
+
+batch_ai_node >> dummy
diff --git a/airflow/contrib/example_dags/example_cosmosdb_sensor.py b/airflow/contrib/example_dags/example_cosmosdb_sensor.py
new file mode 100644
index 0000000000..a801d9f41b
--- /dev/null
+++ b/airflow/contrib/example_dags/example_cosmosdb_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is only an example DAG to highlight usage of AzureCosmosDocumentSensor to detect
+if a document now exists.
+
+You can trigger this manually with `airflow trigger_dag example_cosmosdb_sensor`.
+
+*Note: Make sure that connection `azure_cosmos_default` is properly set before running
+this example.*
+"""
+
+from airflow import DAG
+from airflow.contrib.sensors.azure_cosmos_sensor import AzureCosmosDocumentSensor
+from airflow.contrib.operators.azure_cosmos_insertdocument_operator import AzureCosmosInsertDocumentOperator
+from airflow.utils import dates
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': dates.days_ago(2),
+    'email': ['airflow@example.com'],
+    'email_on_failure': False,
+    'email_on_retry': False
+}
+
+dag = DAG('example_cosmosdb_sensor', default_args=default_args)
+
+dag.doc_md = __doc__
+
+t1 = AzureCosmosDocumentSensor(
+    task_id='check_cosmos_file',
+    database_name='airflow_example_db',
+    collection_name='airflow_example_coll',
+    document_id='airflow_checkid',
+    azure_cosmos_conn_id='azure_cosmos_default',
+    dag=dag)
+
+t2 = AzureCosmosInsertDocumentOperator(
+    task_id='insert_cosmos_file',
+    dag=dag,
+    database_name='airflow_example_db',
+    collection_name='new-collection',
+    document={"id": "someuniqueid", "param1": "value1", "param2": "value2"},
+    azure_cosmos_conn_id='azure_cosmos_default')
+
+t2.set_upstream(t1)
diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
index 45c0895b0f..c6838a2baf 100644
--- a/airflow/contrib/example_dags/example_gcp_sql.py
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -29,22 +29,32 @@
 """
 
 import os
-import datetime
+
+import re
 
 import airflow
 from airflow import models
-
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
     CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
     CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
-    CloudSqlInstanceDatabaseDeleteOperator
+    CloudSqlInstanceDatabaseDeleteOperator, CloudSqlInstanceExportOperator, \
+    CloudSqlInstanceImportOperator
+from airflow.contrib.operators.gcs_acl_operator import \
+    GoogleCloudStorageBucketCreateAclEntryOperator, \
+    GoogleCloudStorageObjectCreateAclEntryOperator
 
 # [START howto_operator_cloudsql_arguments]
 PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
+INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-mysql')
+INSTANCE_NAME2 = os.environ.get('INSTANCE_NAME2', 'test-mysql2')
 DB_NAME = os.environ.get('DB_NAME', 'testdb')
 # [END howto_operator_cloudsql_arguments]
 
+# [START howto_operator_cloudsql_export_import_arguments]
+EXPORT_URI = os.environ.get('EXPORT_URI', 'gs://bucketName/fileName')
+IMPORT_URI = os.environ.get('IMPORT_URI', 'gs://bucketName/fileName')
+# [END howto_operator_cloudsql_export_import_arguments]
+
 # Bodies below represent Cloud SQL instance resources:
 # https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances
 
@@ -86,6 +96,16 @@
     "region": "europe-west4",
 }
 # [END howto_operator_cloudsql_create_body]
+
+body2 = {
+    "name": INSTANCE_NAME2,
+    "settings": {
+        "tier": "db-n1-standard-1",
+    },
+    "databaseVersion": "MYSQL_5_7",
+    "region": "europe-west4",
+}
+
 # [START howto_operator_cloudsql_patch_body]
 patch_body = {
     "name": INSTANCE_NAME,
@@ -102,6 +122,25 @@
     }
 }
 # [END howto_operator_cloudsql_patch_body]
+# [START howto_operator_cloudsql_export_body]
+export_body = {
+    "exportContext": {
+        "fileType": "sql",
+        "uri": EXPORT_URI,
+        "sqlExportOptions": {
+            "schemaOnly": False
+        }
+    }
+}
+# [END howto_operator_cloudsql_export_body]
+# [START howto_operator_cloudsql_import_body]
+import_body = {
+    "importContext": {
+        "fileType": "sql",
+        "uri": IMPORT_URI
+    }
+}
+# [END howto_operator_cloudsql_import_body]
 # [START howto_operator_cloudsql_db_create_body]
 db_create_body = {
     "instance": INSTANCE_NAME,
@@ -123,16 +162,40 @@
 with models.DAG(
     'example_gcp_sql',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=None
 ) as dag:
+    prev_task = None
+
+    def next_dep(task, prev):
+        prev >> task
+        return task
+
+    # ############################################## #
+    # ### INSTANCES SET UP ######################### #
+    # ############################################## #
+
     # [START howto_operator_cloudsql_create]
-    sql_instance_create_task = CloudSqlInstanceCreateOperator(
+    sql_instance_create = CloudSqlInstanceCreateOperator(
         project_id=PROJECT_ID,
         body=body,
         instance=INSTANCE_NAME,
-        task_id='sql_instance_create_task'
+        task_id='sql_instance_create'
     )
     # [END howto_operator_cloudsql_create]
+    prev_task = sql_instance_create
+
+    sql_instance_create_2 = CloudSqlInstanceCreateOperator(
+        project_id=PROJECT_ID,
+        body=body2,
+        instance=INSTANCE_NAME2,
+        task_id='sql_instance_create_2'
+    )
+    prev_task = next_dep(sql_instance_create_2, prev_task)
+
+    # ############################################## #
+    # ### MODIFYING INSTANCE AND ITS DATABASE ###### #
+    # ############################################## #
+
     # [START howto_operator_cloudsql_patch]
     sql_instance_patch_task = CloudSqlInstancePatchOperator(
         project_id=PROJECT_ID,
@@ -141,6 +204,8 @@
         task_id='sql_instance_patch_task'
     )
     # [END howto_operator_cloudsql_patch]
+    prev_task = next_dep(sql_instance_patch_task, prev_task)
+
     # [START howto_operator_cloudsql_db_create]
     sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
         project_id=PROJECT_ID,
@@ -149,6 +214,8 @@
         task_id='sql_db_create_task'
     )
     # [END howto_operator_cloudsql_db_create]
+    prev_task = next_dep(sql_db_create_task, prev_task)
+
     # [START howto_operator_cloudsql_db_patch]
     sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
         project_id=PROJECT_ID,
@@ -158,6 +225,65 @@
         task_id='sql_db_patch_task'
     )
     # [END howto_operator_cloudsql_db_patch]
+    prev_task = next_dep(sql_db_patch_task, prev_task)
+
+    # ############################################## #
+    # ### EXPORTING SQL FROM INSTANCE 1 ############ #
+    # ############################################## #
+
+    # For export to work we need to add the Cloud SQL instance's Service Account
+    # write access to the destination GCS bucket.
+    # [START howto_operator_cloudsql_export_gcs_permissions]
+    sql_gcp_add_bucket_permission = GoogleCloudStorageBucketCreateAclEntryOperator(
+        entity="user-{{ task_instance.xcom_pull('sql_instance_create', key='service_account_email') }}",
+        role="WRITER",
+        bucket=re.match(r'gs:\/\/(\S*)\/', EXPORT_URI).group(1),
+        task_id='sql_gcp_add_bucket_permission'
+    )
+    # [END howto_operator_cloudsql_export_gcs_permissions]
+    prev_task = next_dep(sql_gcp_add_bucket_permission, prev_task)
+
+    # [START howto_operator_cloudsql_export]
+    sql_export_task = CloudSqlInstanceExportOperator(
+        project_id=PROJECT_ID,
+        body=export_body,
+        instance=INSTANCE_NAME,
+        task_id='sql_export_task'
+    )
+    # [END howto_operator_cloudsql_export]
+    prev_task = next_dep(sql_export_task, prev_task)
+
+    # ############################################## #
+    # ### IMPORTING SQL TO INSTANCE 2 ############## #
+    # ############################################## #
+
+    # For import to work we need to add the Cloud SQL instance's Service Account
+    # read access to the target GCS object.
+    # [START howto_operator_cloudsql_import_gcs_permissions]
+    sql_gcp_add_object_permission = GoogleCloudStorageObjectCreateAclEntryOperator(
+        entity="user-{{ task_instance.xcom_pull('sql_instance_create_2', key='service_account_email') }}",
+        role="READER",
+        bucket=re.match(r'gs:\/\/(\S*)\/', IMPORT_URI).group(1),
+        object_name=re.match(r'gs:\/\/[^\/]*\/(\S*)', IMPORT_URI).group(1),
+        task_id='sql_gcp_add_object_permission',
+    )
+    # [END howto_operator_cloudsql_import_gcs_permissions]
+    prev_task = next_dep(sql_gcp_add_object_permission, prev_task)
+
+    # [START howto_operator_cloudsql_import]
+    sql_import_task = CloudSqlInstanceImportOperator(
+        project_id=PROJECT_ID,
+        body=import_body,
+        instance=INSTANCE_NAME2,
+        task_id='sql_import_task'
+    )
+    # [END howto_operator_cloudsql_import]
+    prev_task = next_dep(sql_import_task, prev_task)
+
+    # ############################################## #
+    # ### DELETING A DATABASE FROM AN INSTANCE ##### #
+    # ############################################## #
+
     # [START howto_operator_cloudsql_db_delete]
     sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
         project_id=PROJECT_ID,
@@ -166,6 +292,12 @@
         task_id='sql_db_delete_task'
     )
     # [END howto_operator_cloudsql_db_delete]
+    prev_task = next_dep(sql_db_delete_task, prev_task)
+
+    # ############################################## #
+    # ### INSTANCES TEAR DOWN ###################### #
+    # ############################################## #
+
     # [START howto_operator_cloudsql_delete]
     sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
         project_id=PROJECT_ID,
@@ -173,7 +305,11 @@
         task_id='sql_instance_delete_task'
     )
     # [END howto_operator_cloudsql_delete]
+    prev_task = next_dep(sql_instance_delete_task, prev_task)
 
-    sql_instance_create_task >> sql_instance_patch_task \
-        >> sql_db_create_task >> sql_db_patch_task \
-        >> sql_db_delete_task >> sql_instance_delete_task
+    sql_instance_delete_task_2 = CloudSqlInstanceDeleteOperator(
+        project_id=PROJECT_ID,
+        instance=INSTANCE_NAME2,
+        task_id='sql_instance_delete_task_2'
+    )
+    prev_task = next_dep(sql_instance_delete_task_2, prev_task)
diff --git a/airflow/contrib/hooks/azure_batchai_hook.py b/airflow/contrib/hooks/azure_batchai_hook.py
new file mode 100644
index 0000000000..011df5eb01
--- /dev/null
+++ b/airflow/contrib/hooks/azure_batchai_hook.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+
+from azure.common.client_factory import get_client_from_auth_file
+from azure.common.credentials import ServicePrincipalCredentials
+
+from azure.mgmt.batchai import BatchAIManagementClient
+
+
+class AzureBatchAIHook(BaseHook):
+    """
+    Interact with Azure Batch AI
+
+    :param azure_batchai_conn_id: Reference to the Azure Batch AI connection
+    :type azure_batchai_conn_id: str
+    :param config_data: JSON Object with credential and subscription information
+    :type config_data: str
+    """
+
+    def __init__(self, azure_batchai_conn_id='azure_batchai_default', config_data=None):
+        self.conn_id = azure_batchai_conn_id
+        self.configData = config_data
+        self.credentials = None
+        self.subscription_id = None
+        # get_conn() reads the attributes above, so resolve the client last
+        self.connection = self.get_conn()
+
+    def get_conn(self):
+        try:
+            conn = self.get_connection(self.conn_id)
+            key_path = conn.extra_dejson.get('key_path', False)
+            if key_path:
+                if key_path.endswith('.json'):
+                    self.log.info('Getting connection using a JSON key file.')
+                    return get_client_from_auth_file(BatchAIManagementClient,
+                                                     key_path)
+                else:
+                    raise AirflowException('Unrecognised extension for key file.')
+
+            elif os.environ.get('AZURE_AUTH_LOCATION'):
+                key_path = os.environ.get('AZURE_AUTH_LOCATION')
+                if key_path.endswith('.json'):
+                    self.log.info('Getting connection using a JSON key file.')
+                    return get_client_from_auth_file(BatchAIManagementClient,
+                                                     key_path)
+                else:
+                    raise AirflowException('Unrecognised extension for key file.')
+
+            self.credentials = ServicePrincipalCredentials(
+                client_id=self.configData['clientId'],
+                secret=self.configData['clientSecret'],
+                tenant=self.configData['tenantId']
+            )
+        except AirflowException:
+            # No connection found
+            pass
+
+        return BatchAIManagementClient(self.credentials, self.configData['subscriptionId'])
+
+    def create(self, resource_group, workspace_name, cluster_name, location, parameters):
+        self.log.info("creating workspace.....")
+        self.connection.workspaces._create_initial(resource_group,
+                                                   workspace_name,
+                                                   location)
+
+        self.log.info("creating cluster.....")
+        self.connection.clusters._create_initial(resource_group,
+                                                 workspace_name,
+                                                 cluster_name,
+                                                 parameters)
+
+    def update(self, resource_group, workspace_name, cluster_name):
+        self.log.info("updating cluster.....")
+        self.connection.clusters.update(resource_group,
+                                        workspace_name,
+                                        cluster_name)
+
+    def get_state_exitcode(self, resource_group, workspace_name, cluster_name):
+        response = self.connection.clusters.get(resource_group,
+                                                workspace_name,
+                                                cluster_name,
+                                                raw=True).response.json()
+        current_state = response['properties']['provisioningState']
+        return current_state
+
+    def get_messages(self, resource_group, workspace_name, cluster_name):
+        response = self.connection.clusters.get(resource_group,
+                                                workspace_name,
+                                                cluster_name,
+                                                raw=True).response.json()
+        cluster = response['properties']['cluster']
+        instance_view = cluster[0]['properties'].get('instanceView', {})
+        return [event['message'] for event in instance_view.get('events', [])]
+
+    def delete(self, resource_group, workspace_name, cluster_name):
+        self.connection.clusters.delete(resource_group, workspace_name, cluster_name)
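
As a quick illustration of the hook's surface, here is a minimal sketch of how it might be exercised, assuming the `azure_batchai_default` connection points at a JSON key file (or AZURE_AUTH_LOCATION is set) and that all resource names and credentials below are placeholders:

from airflow.contrib.hooks.azure_batchai_hook import AzureBatchAIHook
from azure.mgmt.batchai.models import ClusterCreateParameters, UserAccountSettings

hook = AzureBatchAIHook(azure_batchai_conn_id='azure_batchai_default')

# Mirror the parameters the operator later in this PR builds in execute()
parameters = ClusterCreateParameters(
    vm_size='STANDARD_NC6',
    user_account_settings=UserAccountSettings(
        admin_user_name='airflow-admin',             # placeholder
        admin_user_password='not-a-real-password'),  # placeholder
    location='westus2',
    vm_priority='dedicated',
    scale_settings=None,
    virtual_machine_configuration=None,
    node_setup=None,
    subnet=None)

# Create the workspace and the cluster inside it, poll its state, then clean up
hook.create('batch-ai-test-rg', 'batch-ai-workspace', 'batch-ai-cluster',
            'westus2', parameters)
state = hook.get_state_exitcode('batch-ai-test-rg', 'batch-ai-workspace',
                                'batch-ai-cluster')  # returns provisioningState
hook.delete('batch-ai-test-rg', 'batch-ai-workspace', 'batch-ai-cluster')
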
diff --git a/airflow/contrib/hooks/azure_cosmos_hook.py b/airflow/contrib/hooks/azure_cosmos_hook.py
new file mode 100644
index 0000000000..01b4007b03
--- /dev/null
+++ b/airflow/contrib/hooks/azure_cosmos_hook.py
@@ -0,0 +1,287 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import azure.cosmos.cosmos_client as cosmos_client
+from azure.cosmos.errors import HTTPFailure
+import uuid
+
+from airflow.exceptions import AirflowBadRequest
+from airflow.hooks.base_hook import BaseHook
+
+
+class AzureCosmosDBHook(BaseHook):
+    """
+    Interacts with Azure CosmosDB.
+
+    Login should be the endpoint URI and password should be the master key.
+    Optionally, you can use the following extras to default these values:
+    {"database_name": "<DATABASE_NAME>", "collection_name": "<COLLECTION_NAME>"}.
+
+    :param azure_cosmos_conn_id: Reference to the Azure CosmosDB connection.
+    :type azure_cosmos_conn_id: str
+    """
+
+    def __init__(self, azure_cosmos_conn_id='azure_cosmos_default'):
+        self.conn_id = azure_cosmos_conn_id
+        self.connection = self.get_connection(self.conn_id)
+        self.extras = self.connection.extra_dejson
+
+        self.endpoint_uri = self.connection.login
+        self.master_key = self.connection.password
+        self.default_database_name = self.extras.get('database_name')
+        self.default_collection_name = self.extras.get('collection_name')
+        self.cosmos_client = None
+
+    def get_conn(self):
+        """
+        Return a cosmos db client.
+        """
+        if self.cosmos_client is not None:
+            return self.cosmos_client
+
+        # Initialize the Python Azure Cosmos DB client
+        self.cosmos_client = cosmos_client.CosmosClient(self.endpoint_uri, {'masterKey': self.master_key})
+
+        return self.cosmos_client
+
+    def __get_database_name(self, database_name=None):
+        db_name = database_name
+        if db_name is None:
+            db_name = self.default_database_name
+
+        if db_name is None:
+            raise AirflowBadRequest("Database name must be specified")
+
+        return db_name
+
+    def __get_collection_name(self, collection_name=None):
+        coll_name = collection_name
+        if coll_name is None:
+            coll_name = self.default_collection_name
+
+        if coll_name is None:
+            raise AirflowBadRequest("Collection name must be specified")
+
+        return coll_name
+
+    def does_collection_exist(self, collection_name, database_name=None):
+        """
+        Checks if a collection exists in CosmosDB.
+        """
+        if collection_name is None:
+            raise AirflowBadRequest("Collection name cannot be None.")
+
+        existing_container = list(self.get_conn().QueryContainers(
+            get_database_link(self.__get_database_name(database_name)), {
+                "query": "SELECT * FROM r WHERE r.id=@id",
+                "parameters": [
+                    {"name": "@id", "value": collection_name}
+                ]
+            }))
+        if len(existing_container) == 0:
+            return False
+
+        return True
+
+    def create_collection(self, collection_name, database_name=None):
+        """
+        Creates a new collection in the CosmosDB database.
+        """
+        if collection_name is None:
+            raise AirflowBadRequest("Collection name cannot be None.")
+
+        # We need to check to see if this container already exists so we don't try
+        # to create it twice
+        existing_container = list(self.get_conn().QueryContainers(
+            get_database_link(self.__get_database_name(database_name)), {
+                "query": "SELECT * FROM r WHERE r.id=@id",
+                "parameters": [
+                    {"name": "@id", "value": collection_name}
+                ]
+            }))
+
+        # Only create if we did not find it already existing
+        if len(existing_container) == 0:
+            self.get_conn().CreateContainer(
+                get_database_link(self.__get_database_name(database_name)),
+                {"id": collection_name})
+
+    def does_database_exist(self, database_name):
+        """
+        Checks if a database exists in CosmosDB.
+        """
+        if database_name is None:
+            raise AirflowBadRequest("Database name cannot be None.")
+
+        existing_database = list(self.get_conn().QueryDatabases({
+            "query": "SELECT * FROM r WHERE r.id=@id",
+            "parameters": [
+                {"name": "@id", "value": database_name}
+            ]
+        }))
+        if len(existing_database) == 0:
+            return False
+
+        return True
+
+    def create_database(self, database_name):
+        """
+        Creates a new database in CosmosDB.
+        """
+        if database_name is None:
+            raise AirflowBadRequest("Database name cannot be None.")
+
+        # We need to check to see if this database already exists so we don't try
+        # to create it twice
+        existing_database = list(self.get_conn().QueryDatabases({
+            "query": "SELECT * FROM r WHERE r.id=@id",
+            "parameters": [
+                {"name": "@id", "value": database_name}
+            ]
+        }))
+
+        # Only create if we did not find it already existing
+        if len(existing_database) == 0:
+            self.get_conn().CreateDatabase({"id": database_name})
+
+    def delete_database(self, database_name):
+        """
+        Deletes an existing database in CosmosDB.
+        """
+        if database_name is None:
+            raise AirflowBadRequest("Database name cannot be None.")
+
+        self.get_conn().DeleteDatabase(get_database_link(database_name))
+
+    def delete_collection(self, collection_name, database_name=None):
+        """
+        Deletes an existing collection in the CosmosDB database.
+        """
+        if collection_name is None:
+            raise AirflowBadRequest("Collection name cannot be None.")
+
+        self.get_conn().DeleteContainer(
+            get_collection_link(self.__get_database_name(database_name), collection_name))
+
+    def upsert_document(self, document, database_name=None, collection_name=None, document_id=None):
+        """
+        Inserts a new document (or updates an existing one) into an existing
+        collection in the CosmosDB database.
+        """
+        # Assign unique ID if one isn't provided
+        if document_id is None:
+            document_id = str(uuid.uuid4())
+
+        if document is None:
+            raise AirflowBadRequest("You cannot insert a None document")
+
+        # Add document id if isn't found
+        if 'id' in document:
+            if document['id'] is None:
+                document['id'] = document_id
+        else:
+            document['id'] = document_id
+
+        created_document = self.get_conn().CreateItem(
+            get_collection_link(
+                self.__get_database_name(database_name),
+                self.__get_collection_name(collection_name)),
+            document)
+
+        return created_document
+
+    def insert_documents(self, documents, database_name=None, collection_name=None):
+        """
+        Insert a list of new documents into an existing collection in the CosmosDB database.
+        """
+        if documents is None:
+            raise AirflowBadRequest("You cannot insert empty documents")
+
+        created_documents = []
+        for single_document in documents:
+            created_documents.append(
+                self.get_conn().CreateItem(
+                    get_collection_link(
+                        self.__get_database_name(database_name),
+                        self.__get_collection_name(collection_name)),
+                    single_document))
+
+        return created_documents
+
+    def delete_document(self, document_id, database_name=None, collection_name=None):
+        """
+        Delete an existing document out of a collection in the CosmosDB database.
+        """
+        if document_id is None:
+            raise AirflowBadRequest("Cannot delete a document without an id")
+
+        self.get_conn().DeleteItem(
+            get_document_link(
+                self.__get_database_name(database_name),
+                self.__get_collection_name(collection_name),
+                document_id))
+
+    def get_document(self, document_id, database_name=None, collection_name=None):
+        """
+        Get a document from an existing collection in the CosmosDB database.
+        """
+        if document_id is None:
+            raise AirflowBadRequest("Cannot get a document without an id")
+
+        try:
+            return self.get_conn().ReadItem(
+                get_document_link(
+                    self.__get_database_name(database_name),
+                    self.__get_collection_name(collection_name),
+                    document_id))
+        except HTTPFailure:
+            return None
+
+    def get_documents(self, sql_string, database_name=None, collection_name=None, partition_key=None):
+        """
+        Get a list of documents from an existing collection in the CosmosDB database via SQL query.
+        """
+        if sql_string is None:
+            raise AirflowBadRequest("SQL query string cannot be None")
+
+        # Query them in SQL
+        query = {'query': sql_string}
+
+        try:
+            result_iterable = self.get_conn().QueryItems(
+                get_collection_link(
+                    self.__get_database_name(database_name),
+                    self.__get_collection_name(collection_name)),
+                query,
+                partition_key)
+
+            return list(result_iterable)
+        except HTTPFailure:
+            return None
+
+
+def get_database_link(database_id):
+    return "dbs/" + database_id
+
+
+def get_collection_link(database_id, collection_id):
+    return get_database_link(database_id) + "/colls/" + collection_id
+
+
+def get_document_link(database_id, collection_id, document_id):
+    return get_collection_link(database_id, collection_id) + "/docs/" + document_id
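
For orientation, a minimal sketch of how this hook might be used, assuming the `azure_cosmos_default` connection is configured with the endpoint URI as login and the master key as password (all database, collection and document names are placeholders):

from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook

hook = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_default')

# Create the database/collection only if they do not exist yet
hook.create_database('airflow_example_db')
hook.create_collection('airflow_example_coll', database_name='airflow_example_db')

# Insert or update a single document; an id is generated when none is supplied
hook.upsert_document({'param1': 'value1'},
                     database_name='airflow_example_db',
                     collection_name='airflow_example_coll',
                     document_id='airflow_checkid')

# Read it back, either directly by id or via a SQL query
doc = hook.get_document('airflow_checkid',
                        database_name='airflow_example_db',
                        collection_name='airflow_example_coll')
docs = hook.get_documents("SELECT * FROM c WHERE c.id = 'airflow_checkid'",
                          database_name='airflow_example_db',
                          collection_name='airflow_example_coll')
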
diff --git a/airflow/contrib/hooks/gcp_sql_hook.py b/airflow/contrib/hooks/gcp_sql_hook.py
index 43e664d15c..1581637e0d 100644
--- a/airflow/contrib/hooks/gcp_sql_hook.py
+++ b/airflow/contrib/hooks/gcp_sql_hook.py
@@ -27,6 +27,7 @@
 import time
 import uuid
 from os.path import isfile
+from googleapiclient import errors
 from subprocess import Popen, PIPE
 from six.moves.urllib.parse import quote_plus
 
@@ -110,7 +111,7 @@ def create_instance(self, project_id, body):
         :param body: Body required by the Cloud SQL insert API, as described in
             https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert#request-body.
         :type body: dict
-        :return: True if the operation succeeded, raises an error otherwise.
+        :return: True if the operation succeeded otherwise raises an error.
         :rtype: bool
         """
         response = self.get_conn().instances().insert(
@@ -134,7 +135,7 @@ def patch_instance(self, project_id, body, instance):
         :type body: dict
         :param instance: Cloud SQL instance ID. This does not include the project ID.
         :type instance: str
-        :return: True if the operation succeeded, raises an error otherwise.
+        :return: True if the operation succeeded otherwise raises an error.
         :rtype: bool
         """
         response = self.get_conn().instances().patch(
@@ -153,7 +154,7 @@ def delete_instance(self, project_id, instance):
         :type project_id: str
         :param instance: Cloud SQL instance ID. This does not include the project ID.
         :type instance: str
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded otherwise raises an error.
         :rtype: bool
         """
         response = self.get_conn().instances().delete(
@@ -194,7 +195,7 @@ def create_database(self, project, instance, body):
         :param body: The request body, as described in
             https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body.
         :type body: dict
-        :return: True if the operation succeeded, raises an error otherwise.
+        :return: True if the operation succeeded otherwise raises an error.
         :rtype: bool
         """
         response = self.get_conn().databases().insert(
@@ -210,7 +211,7 @@ def patch_database(self, project, instance, database, body):
         Updates a database resource inside a Cloud SQL instance.
 
         This method supports patch semantics.
-        See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
+        See https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch.
 
         :param project: Project ID of the project that contains the instance.
         :type project: str
@@ -221,7 +222,7 @@ def patch_database(self, project, instance, database, body):
         :param body: The request body, as described in
             https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body.
         :type body: dict
-        :return: True if the operation succeeded, raises an error otherwise.
+        :return: True if the operation succeeded otherwise raises an error.
         :rtype: bool
         """
         response = self.get_conn().databases().patch(
@@ -243,7 +244,7 @@ def delete_database(self, project, instance, database):
         :type instance: str
         :param database: Name of the database to be deleted in the instance.
         :type database: str
-        :return: True if the operation succeeded, raises an error otherwise.
+        :return: True if the operation succeeded otherwise raises an error.
         :rtype: bool
         """
         response = self.get_conn().databases().delete(
@@ -254,6 +255,64 @@ def delete_database(self, project, instance, database):
         operation_name = response["name"]
         return self._wait_for_operation_to_complete(project, operation_name)
 
+    def export_instance(self, project_id, instance_id, body):
+        """
+        Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump
+        or CSV file.
+
+        :param project_id: Project ID of the project where the instance exists.
+        :type project_id: str
+        :param instance_id: Name of the Cloud SQL instance. This does not include the
+            project ID.
+        :type instance_id: str
+        :param body: The request body, as described in
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
+        :type body: dict
+        :return: True if the operation succeeded, raises an error otherwise
+        :rtype: bool
+        """
+        try:
+            response = self.get_conn().instances().export(
+                project=project_id,
+                instance=instance_id,
+                body=body
+            ).execute(num_retries=NUM_RETRIES)
+            operation_name = response["name"]
+            return self._wait_for_operation_to_complete(project_id, operation_name)
+        except errors.HttpError as ex:
+            raise AirflowException(
+                'Exporting instance {} failed: {}'.format(instance_id, ex.content)
+            )
+
+    def import_instance(self, project_id, instance_id, body):
+        """
+        Imports data into a Cloud SQL instance from a SQL dump or CSV file in
+        Cloud Storage.
+
+        :param project_id: Project ID of the project where the instance exists.
+        :type project_id: str
+        :param instance_id: Name of the Cloud SQL instance. This does not include the
+            project ID.
+        :type instance_id: str
+        :param body: The request body, as described in
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
+        :type body: dict
+        :return: True if the operation succeeded, raises an error otherwise
+        :rtype: bool
+        """
+        try:
+            response = self.get_conn().instances().import_(
+                project=project_id,
+                instance=instance_id,
+                body=body
+            ).execute(num_retries=NUM_RETRIES)
+            operation_name = response["name"]
+            return self._wait_for_operation_to_complete(project_id, operation_name)
+        except errors.HttpError as ex:
+            raise AirflowException(
+                'Importing instance {} failed: {}'.format(instance_id, ex.content)
+            )
+
     def _wait_for_operation_to_complete(self, project_id, operation_name):
         """
         Waits for the named operation to complete - checks status of the
@@ -293,11 +352,11 @@ def _wait_for_operation_to_complete(self, project_id, operation_name):
 
 class CloudSqlProxyRunner(LoggingMixin):
     """
-    Downloads and runs cloud-sql-proxy as subprocess of the python process.
+    Downloads and runs cloud-sql-proxy as subprocess of the Python process.
 
     The cloud-sql-proxy needs to be downloaded and started before we can connect
     to the Google Cloud SQL instance via database connection. It establishes
-    secure tunnel connection to the database - it authorizes using the
+    secure tunnel connection to the database. It authorizes using the
     GCP credentials that are passed by the configuration.
 
     More details about the proxy can be found here:
@@ -438,7 +497,7 @@ def _get_credential_parameters(self, session):
 
     def start_proxy(self):
         """
-        Starts Cloud Sql Proxy.
+        Starts Cloud SQL Proxy.
 
         You have to remember to stop the proxy if you started it!
         """
@@ -516,7 +575,7 @@ def stop_proxy(self):
 
     def get_proxy_version(self):
         """
-        Returns version of the Cloud Sql Proxy.
+        Returns version of the Cloud SQL Proxy.
         """
         self._download_sql_proxy_if_needed()
         command_to_run = [self.sql_proxy_path]
@@ -532,7 +591,7 @@ def get_proxy_version(self):
 
     def get_socket_path(self):
         """
-        Retrieves UNIX socket path used by Cloud Sql Proxy.
+        Retrieves UNIX socket path used by Cloud SQL Proxy.
 
         :return: The dynamically generated path for the socket created by the proxy.
         :rtype: str
@@ -583,38 +642,38 @@ def get_socket_path(self):
 # noinspection PyAbstractClass
 class CloudSqlDatabaseHook(BaseHook):
     """
-    Serves DB connection configuration for CloudSQL (Connections
+    Serves DB connection configuration for Google Cloud SQL (Connections
     of *gcpcloudsql://* type).
 
-    The hook is a "meta" one - it does not perform an actual connection,
-    it is there to retrieve all the parameters configured in gcpcloudsql:// connection,
-    start/stop Cloud Sql Proxy if needed, dynamically generate Postgres or MySQL
+    The hook is a "meta" one. It does not perform an actual connection.
+    It is there to retrieve all the parameters configured in gcpcloudsql:// connection,
+    start/stop Cloud SQL Proxy if needed, dynamically generate Postgres or MySQL
     connection in the database and return an actual Postgres or MySQL hook.
-    The returned Postgres/MySQL hooks are using direct connection or Cloud Sql
-    Proxy socket/tcp as configured.
+    The returned Postgres/MySQL hooks are using direct connection or Cloud SQL
+    Proxy socket/TCP as configured.
 
     Main parameters of the hook are retrieved from the standard URI components:
 
     * **user** - User name to authenticate to the database (from login of the URI).
-    * **password** - Password to authenticate to the database (from password of the URI)
-    * **public_ip** - IP to connect to for public connection (from host of the URI)
-    * **public_port** - Port to connect to for public connection (from port of the URI)
-    * **database** - Database to connect to (from schema of the URI)
+    * **password** - Password to authenticate to the database (from password of the URI).
+    * **public_ip** - IP to connect to for public connection (from host of the URI).
+    * **public_port** - Port to connect to for public connection (from port of the URI).
+    * **database** - Database to connect to (from schema of the URI).
 
     Remaining parameters are retrieved from the extras (URI query parameters):
 
     * **project_id** - Google Cloud Platform project where the Cloud SQL instance exists.
     * **instance** -  Name of the instance of the Cloud SQL database instance.
-    * **location** - The location of the cloud sql instance (for example europe-west1).
-    * **database_type** - The type of the database instance (mysql or postgres).
+    * **location** - The location of the Cloud SQL instance (for example europe-west1).
+    * **database_type** - The type of the database instance (MySQL or Postgres).
     * **use_proxy** - (default False) Whether SQL proxy should be used to connect to Cloud
       SQL DB.
     * **use_ssl** - (default False) Whether SSL should be used to connect to Cloud SQL DB.
-      You cannot use proxy and ssl together.
+      You cannot use proxy and SSL together.
     * **sql_proxy_use_tcp** - (default False) If set to true, TCP is used to connect via
       proxy, otherwise UNIX sockets are used.
-    * **sql_proxy_binary_path** - Optional path to sql proxy binary. If the binary is not
-      specified or the binary is not present, it is automatically downloaded.
+    * **sql_proxy_binary_path** - Optional path to Cloud SQL Proxy binary. If the binary
+      is not specified or the binary is not present, it is automatically downloaded.
     * **sql_proxy_version** -  Specific version of the proxy to download (for example
       v1.13). If not specified, the latest version is downloaded.
     * **sslcert** - Path to client certificate to authenticate when SSL is used.
@@ -683,8 +742,8 @@ def _validate_inputs(self):
                 self.database_type, CLOUD_SQL_VALID_DATABASE_TYPES
             ))
         if self.use_proxy and self.use_ssl:
-            raise AirflowException("Cloud Sql Proxy does not support SSL connections."
-                                   " SSL is not needed as Cloud Sql Proxy "
+            raise AirflowException("Cloud SQL Proxy does not support SSL connections."
+                                   " SSL is not needed as Cloud SQL Proxy "
                                    "provides encryption on its own")
         if self.use_ssl:
             self._check_ssl_file(self.sslcert, "sslcert")
@@ -770,7 +829,7 @@ def _get_sqlproxy_instance_specification(self):
     @provide_session
     def create_connection(self, session=None):
         """
-        Create connection in the Connection table - according to whether it uses
+        Create connection in the Connection table, according to whether it uses
         proxy, TCP, UNIX sockets, SSL. Connection ID will be randomly generated.
 
         :param session: Session of the SQL Alchemy ORM (automatically generated with
@@ -799,10 +858,10 @@ def delete_connection(self, session=None):
 
     def get_sqlproxy_runner(self):
         """
-        Retrieve Cloud Sql Proxy runner. It is used to manage the proxy
+        Retrieve Cloud SQL Proxy runner. It is used to manage the proxy
         lifecycle per task.
 
-        :return: The Cloud Sql Proxy runner.
+        :return: The Cloud SQL Proxy runner.
         :rtype: CloudSqlProxyRunner
         """
         return CloudSqlProxyRunner(
@@ -815,8 +874,8 @@ def get_sqlproxy_runner(self):
 
     def get_database_hook(self):
         """
-        Retrieve database hook - this is the actual Postgres or MySQL database hook
-        that uses proxy or connects directly to the Google Cloud Sql database.
+        Retrieve database hook. This is the actual Postgres or MySQL database hook
+        that uses proxy or connects directly to the Google Cloud SQL database.
         """
         if self.database_type == 'postgres':
             self.db_hook = PostgresHook(postgres_conn_id=self.db_conn_id,
@@ -836,7 +895,7 @@ def cleanup_database_hook(self):
 
     def reserve_free_tcp_port(self):
         """
-        Reserve free TCP port to be used by Cloud Sql Proxy
+        Reserve free TCP port to be used by Cloud SQL Proxy
         """
         self.reserved_tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.reserved_tcp_socket.bind(('127.0.0.1', 0))
@@ -844,7 +903,7 @@ def reserve_free_tcp_port(self):
 
     def free_reserved_port(self):
         """
-        Free TCP port - makes it immediately ready to be used by Cloud Sql Proxy.
+        Free TCP port. Makes it immediately ready to be used by Cloud SQL Proxy.
         """
         if self.reserved_tcp_socket:
             self.reserved_tcp_socket.close()
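
To illustrate the URI scheme described in the CloudSqlDatabaseHook docstring above, a hypothetical gcpcloudsql:// connection (all values are placeholders) could be supplied through an environment variable like this:

import os

# user/password/host/port/database come from the URI components; the query
# parameters map onto the extras listed in the docstring.
os.environ['AIRFLOW_CONN_GCP_CLOUDSQL_EXAMPLE'] = (
    'gcpcloudsql://dbuser:dbpassword@203.0.113.10:3306/testdb'
    '?database_type=mysql'
    '&project_id=example-project'
    '&location=europe-west1'
    '&instance=test-mysql'
    '&use_proxy=True'
    '&sql_proxy_use_tcp=False'
)
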
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index f848d25dce..6499ea38b5 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -577,14 +577,9 @@ def insert_bucket_acl(self, bucket, entity, role, user_project):
         :param bucket: Name of a bucket.
         :type bucket: str
         :param entity: The entity holding the permission, in one of the following forms:
-        - user-userId
-        - user-email
-        - group-groupId
-        - group-email
-        - domain-domain
-        - project-team-projectId
-        - allUsers
-        - allAuthenticatedUsers
+            user-userId, user-email, group-groupId, group-email, domain-domain,
+            project-team-projectId, allUsers, allAuthenticatedUsers.
+            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
         :type entity: str
         :param role: The access permission for the entity.
             Acceptable values are: "OWNER", "READER", "WRITER".
@@ -625,14 +620,9 @@ def insert_object_acl(self, bucket, object_name, entity, role, generation,
             https://cloud.google.com/storage/docs/json_api/#encoding
         :type object_name: str
         :param entity: The entity holding the permission, in one of the following forms:
-            - user-userId
-            - user-email
-            - group-groupId
-            - group-email
-            - domain-domain
-            - project-team-projectId
-            - allUsers
-            - allAuthenticatedUsers
+            user-userId, user-email, group-groupId, group-email, domain-domain,
+            project-team-projectId, allUsers, allAuthenticatedUsers
+            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
         :type entity: str
         :param role: The access permission for the entity.
             Acceptable values are: "OWNER", "READER".
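
As a rough sketch of the entity forms listed above (the service-account e-mail and the bucket/object names are placeholders), the two ACL methods could be called like this:

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_default')

# Grant write access on a bucket using the 'user-email' entity form
hook.insert_bucket_acl(bucket='example-bucket',
                       entity='user-sql-sa@example-project.iam.gserviceaccount.com',
                       role='WRITER',
                       user_project=None)

# Grant read access on a single object (generation=None targets the live version)
hook.insert_object_acl(bucket='example-bucket',
                       object_name='exports/dump.sql',
                       entity='user-sql-sa@example-project.iam.gserviceaccount.com',
                       role='READER',
                       generation=None,
                       user_project=None)
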
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 3df77d3a1f..1c98f26afc 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -34,7 +34,6 @@
     PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \
     DbImportCommand
 
-
 COMMAND_CLASSES = {
     "hivecmd": HiveCommand,
     "prestocmd": PrestoCommand,
@@ -47,35 +46,52 @@
     "dbimportcmd": DbImportCommand
 }
 
-HYPHEN_ARGS = ['cluster_label', 'app_id', 'note_id']
-
-POSITIONAL_ARGS = ['sub_command', 'parameters']
-
-COMMAND_ARGS = {
-    "hivecmd": ['query', 'script_location', 'macros', 'tags', 'sample_size',
-                'cluster_label', 'name'],
-    'prestocmd': ['query', 'script_location', 'macros', 'tags', 'cluster_label', 'name'],
-    'hadoopcmd': ['sub_command', 'tags', 'cluster_label', 'name'],
-    'shellcmd': ['script', 'script_location', 'files', 'archives', 'parameters', 'tags',
-                 'cluster_label', 'name'],
-    'pigcmd': ['script', 'script_location', 'parameters', 'tags', 'cluster_label',
-               'name'],
-    'dbtapquerycmd': ['db_tap_id', 'query', 'macros', 'tags', 'name'],
-    'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 'tags',
-                 'cluster_label', 'language', 'app_id', 'name', 'arguments', 'note_id',
-                 'user_program_arguments'],
-    'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table',
-                    'db_update_mode', 'db_update_keys', 'export_dir',
-                    'fields_terminated_by', 'tags', 'name', 'customer_cluster_label',
-                    'use_customer_cluster', 'additional_options'],
-    'dbimportcmd': ['mode', 'hive_table', 'dbtap_id', 'db_table', 'where_clause',
-                    'parallelism', 'extract_query', 'boundary_query', 'split_column',
-                    'tags', 'name', 'hive_serde', 'customer_cluster_label',
-                    'use_customer_cluster', 'schema', 'additional_options']
+POSITIONAL_ARGS = {
+    'hadoopcmd': ['sub_command'],
+    'shellcmd': ['parameters'],
+    'pigcmd': ['parameters']
 }
 
 
-class QuboleHook(BaseHook, LoggingMixin):
+def flatten_list(list_of_lists):
+    return [element for array in list_of_lists for element in array]
+
+
+def filter_options(options):
+    options_to_remove = ["help", "print-logs-live", "print-logs"]
+    return [option for option in options if option not in options_to_remove]
+
+
+def get_options_list(command_class):
+    options_list = [option.get_opt_string().strip("--") for option in command_class.optparser.option_list]
+    return filter_options(options_list)
+
+
+def build_command_args():
+    command_args, hyphen_args = {}, set()
+    for cmd in COMMAND_CLASSES:
+
+        # get all available options from the class
+        opts_list = get_options_list(COMMAND_CLASSES[cmd])
+
+        # append positional args if any for the command
+        if cmd in POSITIONAL_ARGS:
+            opts_list += POSITIONAL_ARGS[cmd]
+
+        # get args with a hyphen and replace them with underscore
+        for index, opt in enumerate(opts_list):
+            if "-" in opt:
+                opts_list[index] = opt.replace("-", "_")
+                hyphen_args.add(opts_list[index])
+
+        command_args[cmd] = opts_list
+    return command_args, list(hyphen_args)
+
+
+COMMAND_ARGS, HYPHEN_ARGS = build_command_args()
+
+
+class QuboleHook(BaseHook):
     def __init__(self, *args, **kwargs):
         conn = self.get_connection(kwargs['qubole_conn_id'])
         Qubole.configure(api_token=conn.password, api_url=conn.host)
@@ -189,12 +205,13 @@ def create_cmd_args(self, context):
         cmd_type = self.kwargs['command_type']
         inplace_args = None
         tags = set([self.dag_id, self.task_id, context['run_id']])
+        positional_args_list = flatten_list(POSITIONAL_ARGS.values())
 
         for k, v in self.kwargs.items():
             if k in COMMAND_ARGS[cmd_type]:
                 if k in HYPHEN_ARGS:
                     args.append("--{0}={1}".format(k.replace('_', '-'), v))
-                elif k in POSITIONAL_ARGS:
+                elif k in positional_args_list:
                     inplace_args = v
                 elif k == 'tags':
                     if isinstance(v, six.string_types):
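
To make the new argument mapping concrete: option names that the qds-sdk parsers expose with hyphens are stored with underscores by build_command_args(), and create_cmd_args() later reverses the substitution. A standalone sketch of that round trip (independent of qds-sdk; option names are illustrative):

hyphen_args = set()

def to_kwarg_names(opts_list):
    # as in build_command_args(): '--script-location' becomes 'script_location'
    result = []
    for opt in opts_list:
        if '-' in opt:
            opt = opt.replace('-', '_')
            hyphen_args.add(opt)
        result.append(opt)
    return result

print(to_kwarg_names(['query', 'script-location', 'cluster-label']))
# ['query', 'script_location', 'cluster_label']

# and back again, as in create_cmd_args():
print(['--{0}={1}'.format(k.replace('_', '-'), v)
       for k, v in {'script_location': 's3://bucket/script.sql'}.items()
       if k in hyphen_args])
# ['--script-location=s3://bucket/script.sql']
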
diff --git a/airflow/contrib/operators/azure_batchai_operator.py b/airflow/contrib/operators/azure_batchai_operator.py
new file mode 100644
index 0000000000..04fd419883
--- /dev/null
+++ b/airflow/contrib/operators/azure_batchai_operator.py
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from time import sleep
+
+from airflow.contrib.hooks.azure_batchai_hook import (AzureBatchAIHook)
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator
+
+from azure.mgmt.batchai.models import (ClusterCreateParameters,
+                                       UserAccountSettings)
+
+from msrestazure.azure_exceptions import CloudError
+
+
+class AzureBatchAIOperator(BaseOperator):
+    """
+    Start a cluster on Azure Batch AI
+    :param bai_conn_id: connection id of a service principal which will be used
+        to start the batch ai cluster
+    :type bai_conn_id: str
+    :param resource_group: name of the resource group wherein this cluster
+        should be started
+    :type resource_group: str
+    :param workspace_name: name of the workspace wherein this cluster
+        should be started
+    :type workspace_name: str
+    :param cluster_name: name of the batch ai cluster
+    :type cluster_name: str
+    :param location: the location wherein this cluster should be started
+    :type location: str
+    :param scale_type: either "manual" or "auto" based on desired scale settings
+    :type scale_type: str
+    :param: environment_variables: key,value pairs containing environment variables
+        which will be passed to the running cluster, must include username and password
+    :type: environment_variables: dict
+    :param: volumes: list of volumes to be mounted to the cluster.
+        Currently only Azure Fileshares are supported.
+    :type: volumes: list[<conn_id, account_name, share_name, mount_path, read_only>]
+    :param: publisher: publisher of the image to be used in the cluster
+    :type: publisher: str
+    :param: offer: offer of the image to be used in the cluster
+    :type: offer: str
+    :param: sku: sku of the image to be used in the cluster
+    :type: sku: str
+    :param: version: publisher of the image to be used in the cluster
+    :type: version: str
+     :Example:
+     >>>  a = AzureBatchAIOperator(
+                bai_conn_id='azure_service_principal',
+                resource_group='my-resource-group',
+                workspace_name='my-workspace-name-{{ ds }}',
+                cluster_name='my-cluster-name',
+                location='westeurope',
+                scale_type'auto_scale',
+                environment_variables={'USERNAME': '{{ ds }}',
+                 'PASSWORD': '{{ ds }}},
+                volumes=[],
+            )
+    """
+
+    template_fields = ('name', 'environment_variables')
+    template_ext = tuple()
+
+    def __init__(self, bai_conn_id, resource_group, workspace_name, cluster_name, location, scale_type,
+                 environment_variables={}, volumes=[], publisher='Canonical', offer='UbuntuServer',
+                 sku='16.04-LTS', version='latest', *args, **kwargs):
+        self.bai_conn_id = bai_conn_id
+        self.resource_group = resource_group
+        self.workspace_name = workspace_name
+        self.cluster_name = cluster_name
+        self.location = location
+        self.scale_type = scale_type
+        self.environment_variables = environment_variables
+        self.volumes = volumes
+        self.publisher = publisher
+        self.offer = offer
+        self.sku = sku
+        self.version = version
+        super(AzureBatchAIOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        batch_ai_hook = AzureBatchAIHook(self.bai_conn_id)
+
+        try:
+            self.log.info("Starting Batch AI cluster with offer %s and sku %s",
+                          self.offer, self.sku)
+
+            username = self.environment_variables['USERNAME']
+            password = self.environment_variables['PASSWORD']
+
+            user_account_settings = UserAccountSettings(
+                admin_user_name=username,
+                admin_user_password=password)
+
+            parameters = ClusterCreateParameters(
+                vm_size='STANDARD_NC6',
+                user_account_settings=user_account_settings,
+                location=self.location,
+                vm_priority='dedicated',
+                scale_settings=None,
+                virtual_machine_configuration=None,
+                node_setup=None,
+                subnet=None)
+
+            batch_ai_hook.create(self.resource_group,
+                                 self.workspace_name,
+                                 self.cluster_name,
+                                 self.location, parameters)
+
+            self.log.info("Cluster started")
+
+            exit_code = self._monitor_logging(batch_ai_hook, self.resource_group, self.workspace_name)
+            self.log.info("Container had exit code: %s", exit_code)
+
+            if exit_code and exit_code != 0:
+                raise AirflowException("Container had a non-zero exit code, %s"
+                                       % exit_code)
+        except CloudError as e:
+            self.log.exception("Could not start batch ai cluster, %s", str(e))
+            raise AirflowException("Could not start batch ai cluster")
+
+        finally:
+            self.log.info("Deleting Batch AI cluster")
+            try:
+                batch_ai_hook.delete(self.resource_group, self.workspace_name, self.cluster_name)
+            except Exception:
+                self.log.exception("Could not delete batch ai cluster")
+
+    def _monitor_logging(self, batch_ai_hook, resource_group, name):
+        last_state = None
+        last_message_logged = None
+        last_line_logged = None
+        for _ in range(43200):
+            # roughly 12 hours
+            try:
+                state, exit_code = batch_ai_hook.get_state_exitcode(self.resource_group,
+                                                                    self.workspace_name,
+                                                                    self.cluster_name)
+
+                if state != last_state:
+                    self.log.info("Cluster state changed to %s", state)
+                    last_state = state
+
+                if state == "Terminated":
+                    return exit_code
+                messages = batch_ai_hook.get_messages(self.resource_group,
+                                                      self.workspace_name,
+                                                      self.cluster_name)
+                last_message_logged = self._log_last(messages, last_message_logged)
+
+                if state == "Running":
+                    try:
+                        logs = batch_ai_hook.get_messages(self.resource_group,
+                                                          self.workspace_name,
+                                                          self.cluster_name)
+                        last_line_logged = self._log_last(logs, last_line_logged)
+                    except CloudError as err:
+                        self.log.exception("Exception (%s) while getting logs from cluster, "
+                                           "retrying...", str(err))
+
+            except CloudError as err:
+                if 'ResourceNotFound' in str(err):
+                    self.log.warning("ResourceNotFound, cluster is probably removed by another process "
+                                     "(make sure that the name is unique). Error: %s", str(err))
+                    return 1
+                else:
+                    self.log.exception("Exception while getting cluster")
+
+            except Exception as e:
+                self.log.exception("Exception while getting cluster: %s", str(e))
+            sleep(1)
+        raise AirflowTaskTimeout("Did not complete on time")
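
Note that scale_type is stored on the operator but execute() above passes scale_settings=None, so the
value is currently unused. Below is a minimal sketch of how the argument could be mapped onto SDK
scale settings; it assumes azure.mgmt.batchai.models exposes ScaleSettings, ManualScaleSettings and
AutoScaleSettings with these field names (they are not imported in this file).

from azure.mgmt.batchai.models import (ScaleSettings,
                                       ManualScaleSettings,
                                       AutoScaleSettings)


def build_scale_settings(scale_type, min_nodes=0, max_nodes=2, target_nodes=1):
    # Map the operator's scale_type argument onto SDK scale settings;
    # the node counts here are illustrative defaults, not part of the PR.
    if scale_type == 'auto_scale':
        return ScaleSettings(
            auto_scale=AutoScaleSettings(minimum_node_count=min_nodes,
                                         maximum_node_count=max_nodes))
    # default to a fixed-size cluster
    return ScaleSettings(
        manual=ManualScaleSettings(target_node_count=target_nodes))

The returned object could then replace the scale_settings=None argument passed to
ClusterCreateParameters in execute().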
diff --git a/airflow/contrib/operators/azure_cosmos_insertdocument_operator.py b/airflow/contrib/operators/azure_cosmos_insertdocument_operator.py
new file mode 100644
index 0000000000..930ff402d0
--- /dev/null
+++ b/airflow/contrib/operators/azure_cosmos_insertdocument_operator.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AzureCosmosInsertDocumentOperator(BaseOperator):
+    """
+    Inserts a new document into the specified Cosmos database and collection.
+    It will create both the database and collection if they do not already exist.
+
+    :param database_name: The name of the database. (templated)
+    :type database_name: str
+    :param collection_name: The name of the collection. (templated)
+    :type collection_name: str
+    :param document: The document to insert
+    :type document: json
+    :param azure_cosmos_conn_id: reference to a CosmosDB connection.
+    :type azure_cosmos_conn_id: str
+    """
+    template_fields = ('database_name', 'collection_name')
+    ui_color = '#e4f0e8'
+
+    @apply_defaults
+    def __init__(self,
+                 database_name,
+                 collection_name,
+                 document,
+                 azure_cosmos_conn_id='azure_cosmos_default',
+                 *args,
+                 **kwargs):
+        super(AzureCosmosInsertDocumentOperator, self).__init__(*args, **kwargs)
+        self.database_name = database_name
+        self.collection_name = collection_name
+        self.document = document
+        self.azure_cosmos_conn_id = azure_cosmos_conn_id
+
+    def execute(self, context):
+        # Create the hook
+        hook = AzureCosmosDBHook(azure_cosmos_conn_id=self.azure_cosmos_conn_id)
+
+        # Create the DB if it doesn't already exist
+        if not hook.does_database_exist(self.database_name):
+            hook.create_database(self.database_name)
+
+        # Create the collection as well
+        if not hook.does_collection_exist(self.collection_name, self.database_name):
+            hook.create_collection(self.collection_name, self.database_name)
+
+        # finally insert the document
+        hook.upsert_document(self.document, self.database_name, self.collection_name)
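
A short usage sketch for the operator defined above; the DAG id, database, collection and document
values are illustrative and not part of this change.

import airflow
from airflow.models import DAG
from airflow.contrib.operators.azure_cosmos_insertdocument_operator import (
    AzureCosmosInsertDocumentOperator)

dag = DAG(
    dag_id='example_cosmos_insert',
    default_args={'owner': 'airflow',
                  'start_date': airflow.utils.dates.days_ago(2)},
    schedule_interval=None)

# Insert (or upsert) a single document, creating the database and
# collection on the first run if they are missing.
insert_doc = AzureCosmosInsertDocumentOperator(
    task_id='insert_cosmos_document',
    database_name='airflow_example_db',
    collection_name='airflow_example_coll',
    document={'id': 'example-document-1', 'status': 'ok'},
    azure_cosmos_conn_id='azure_cosmos_default',
    dag=dag)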
diff --git a/airflow/contrib/operators/gcp_sql_operator.py b/airflow/contrib/operators/gcp_sql_operator.py
index 711f2c8d15..abdefb5190 100644
--- a/airflow/contrib/operators/gcp_sql_operator.py
+++ b/airflow/contrib/operators/gcp_sql_operator.py
@@ -27,7 +27,7 @@
 SETTINGS = 'settings'
 SETTINGS_VERSION = 'settingsVersion'
 
-CLOUD_SQL_VALIDATION = [
+CLOUD_SQL_CREATE_VALIDATION = [
     dict(name="name", allow_empty=False),
     dict(name="settings", type="dict", fields=[
         dict(name="tier", allow_empty=False),
@@ -95,10 +95,10 @@
     dict(name="exportContext", type="dict", fields=[
         dict(name="fileType", allow_empty=False),
         dict(name="uri", allow_empty=False),
-        dict(name="databases", type="list"),
+        dict(name="databases", optional=True, type="list"),
         dict(name="sqlExportOptions", type="dict", optional=True, fields=[
-            dict(name="tables", type="list"),
-            dict(name="schemaOnly")
+            dict(name="tables", optional=True, type="list"),
+            dict(name="schemaOnly", optional=True)
         ]),
         dict(name="csvExportOptions", type="dict", optional=True, fields=[
             dict(name="selectQuery")
@@ -117,7 +117,7 @@
         ])
     ])
 ]
-CLOUD_SQL_DATABASE_INSERT_VALIDATION = [
+CLOUD_SQL_DATABASE_CREATE_VALIDATION = [
     dict(name="instance", allow_empty=False),
     dict(name="name", allow_empty=False),
     dict(name="project", allow_empty=False),
@@ -142,7 +142,7 @@ class CloudSqlBaseOperator(BaseOperator):
     :type instance: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
     :type gcp_conn_id: str
-    :param api_version: API version used (e.g. v1).
+    :param api_version: API version used (e.g. v1beta4).
     :type api_version: str
     """
     @apply_defaults
@@ -241,17 +241,21 @@ def _validate_inputs(self):
 
     def _validate_body_fields(self):
         if self.validate_body:
-            GcpBodyFieldValidator(CLOUD_SQL_VALIDATION,
+            GcpBodyFieldValidator(CLOUD_SQL_CREATE_VALIDATION,
                                   api_version=self.api_version).validate(self.body)
 
     def execute(self, context):
         self._validate_body_fields()
         if not self._check_if_instance_exists(self.instance):
-            return self._hook.create_instance(self.project_id, self.body)
+            self._hook.create_instance(self.project_id, self.body)
         else:
             self.log.info("Cloud SQL instance with ID {} already exists. "
                           "Aborting create.".format(self.instance))
-            return True
+
+        instance_resource = self._hook.get_instance(self.project_id, self.instance)
+        service_account_email = instance_resource["serviceAccountEmailAddress"]
+        task_instance = context['task_instance']
+        task_instance.xcom_push(key="service_account_email", value=service_account_email)
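
With this change the operator exposes the instance's service account email via XCom. A minimal sketch
of a downstream task pulling that value, for example to authorize the account on a GCS bucket as the
documentation changes later in this PR suggest; the DAG, task ids and callable are illustrative.

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='example_cloudsql_service_account',
    default_args={'owner': 'airflow',
                  'start_date': airflow.utils.dates.days_ago(2)},
    schedule_interval=None)


def grant_bucket_access(**context):
    # Pull the value pushed by CloudSqlInstanceCreateOperator in the task
    # 'sql_instance_create_task' (the task id is an assumption).
    email = context['task_instance'].xcom_pull(
        task_ids='sql_instance_create_task',
        key='service_account_email')
    # Hand the email to whatever ACL mechanism is used, e.g. the
    # "user-<email>" entity of the GCS ACL operators.
    print("Cloud SQL instance service account: %s" % email)


grant_access = PythonOperator(
    task_id='grant_bucket_access',
    python_callable=grant_bucket_access,
    provide_context=True,
    dag=dag)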
 
 
 class CloudSqlInstancePatchOperator(CloudSqlBaseOperator):
@@ -389,7 +393,7 @@ def _validate_inputs(self):
 
     def _validate_body_fields(self):
         if self.validate_body:
-            GcpBodyFieldValidator(CLOUD_SQL_DATABASE_INSERT_VALIDATION,
+            GcpBodyFieldValidator(CLOUD_SQL_DATABASE_CREATE_VALIDATION,
                                   api_version=self.api_version).validate(self.body)
 
     def execute(self, context):
@@ -526,6 +530,131 @@ def execute(self, context):
                                               self.database)
 
 
+class CloudSqlInstanceExportOperator(CloudSqlBaseOperator):
+    """
+    Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump
+    or CSV file.
+
+    Note: This operator is idempotent. If executed multiple times with the same
+    export file URI, the export file in GCS will simply be overwritten.
+
+    :param project_id: Project ID of the project that contains the instance to be
+        exported.
+    :type project_id: str
+    :param instance: Cloud SQL instance ID. This does not include the project ID.
+    :type instance: str
+    :param body: The request body, as described in
+        https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
+    :type body: dict
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (e.g. v1beta4).
+    :type api_version: str
+    :param validate_body: Whether the body should be validated. Defaults to True.
+    :type validate_body: bool
+    """
+    # [START gcp_sql_export_template_fields]
+    template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
+    # [END gcp_sql_export_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance,
+                 body,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1beta4',
+                 validate_body=True,
+                 *args, **kwargs):
+        self.body = body
+        self.validate_body = validate_body
+        super(CloudSqlInstanceExportOperator, self).__init__(
+            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
+            api_version=api_version, *args, **kwargs)
+
+    def _validate_inputs(self):
+        super(CloudSqlInstanceExportOperator, self)._validate_inputs()
+        if not self.body:
+            raise AirflowException("The required parameter 'body' is empty")
+
+    def _validate_body_fields(self):
+        if self.validate_body:
+            GcpBodyFieldValidator(CLOUD_SQL_EXPORT_VALIDATION,
+                                  api_version=self.api_version).validate(self.body)
+
+    def execute(self, context):
+        self._validate_body_fields()
+        return self._hook.export_instance(self.project_id, self.instance, self.body)
+
+
+class CloudSqlInstanceImportOperator(CloudSqlBaseOperator):
+    """
+    Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.
+
+    CSV IMPORT:
+
+    This operator is NOT idempotent for a CSV import. If the same file is imported
+    multiple times, the imported data will be duplicated in the database.
+    Moreover, if there are any unique constraints, the duplicate import may result in an
+    error.
+
+    SQL IMPORT:
+
+    This operator is idempotent for a SQL import if it was also exported by Cloud SQL.
+    The exported SQL contains 'DROP TABLE IF EXISTS' statements for all tables
+    to be imported.
+
+    If the import file was generated in a different way, idempotence is not guaranteed.
+    It has to be ensured on the SQL file level.
+
+    :param project_id: Project ID of the project that contains the instance.
+    :type project_id: str
+    :param instance: Cloud SQL instance ID. This does not include the project ID.
+    :type instance: str
+    :param body: The request body, as described in
+        https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
+    :type body: dict
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (e.g. v1beta4).
+    :type api_version: str
+    :param validate_body: Whether the body should be validated. Defaults to True.
+    :type validate_body: bool
+    """
+    # [START gcp_sql_import_template_fields]
+    template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
+    # [END gcp_sql_import_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance,
+                 body,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1beta4',
+                 validate_body=True,
+                 *args, **kwargs):
+        self.body = body
+        self.validate_body = validate_body
+        super(CloudSqlInstanceImportOperator, self).__init__(
+            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
+            api_version=api_version, *args, **kwargs)
+
+    def _validate_inputs(self):
+        super(CloudSqlInstanceImportOperator, self)._validate_inputs()
+        if not self.body:
+            raise AirflowException("The required parameter 'body' is empty")
+
+    def _validate_body_fields(self):
+        if self.validate_body:
+            GcpBodyFieldValidator(CLOUD_SQL_IMPORT_VALIDATION,
+                                  api_version=self.api_version).validate(self.body)
+
+    def execute(self, context):
+        self._validate_body_fields()
+        return self._hook.import_instance(self.project_id, self.instance, self.body)
+
+
 class CloudSqlQueryOperator(BaseOperator):
     """
     Performs DML or DDL query on an existing Cloud Sql instance. It optionally uses
diff --git a/airflow/contrib/operators/gcs_acl_operator.py b/airflow/contrib/operators/gcs_acl_operator.py
index d918444131..a39b8cf5d9 100644
--- a/airflow/contrib/operators/gcs_acl_operator.py
+++ b/airflow/contrib/operators/gcs_acl_operator.py
@@ -29,14 +29,8 @@ class GoogleCloudStorageBucketCreateAclEntryOperator(BaseOperator):
     :param bucket: Name of a bucket.
     :type bucket: str
     :param entity: The entity holding the permission, in one of the following forms:
-        - user-userId
-        - user-email
-        - group-groupId
-        - group-email
-        - domain-domain
-        - project-team-projectId
-        - allUsers
-        - allAuthenticatedUsers
+        user-userId, user-email, group-groupId, group-email, domain-domain,
+        project-team-projectId, allUsers, allAuthenticatedUsers
     :type entity: str
     :param role: The access permission for the entity.
         Acceptable values are: "OWNER", "READER", "WRITER".
@@ -82,14 +76,8 @@ class GoogleCloudStorageObjectCreateAclEntryOperator(BaseOperator):
         https://cloud.google.com/storage/docs/json_api/#encoding
     :type object_name: str
     :param entity: The entity holding the permission, in one of the following forms:
-        - user-userId
-        - user-email
-        - group-groupId
-        - group-email
-        - domain-domain
-        - project-team-projectId
-        - allUsers
-        - allAuthenticatedUsers
+        user-userId, user-email, group-groupId, group-email, domain-domain,
+        project-team-projectId, allUsers, allAuthenticatedUsers
     :type entity: str
     :param role: The access permission for the entity.
         Acceptable values are: "OWNER", "READER".
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 82ee293b93..7818d961c4 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -43,6 +43,8 @@ class QuboleOperator(BaseOperator):
             :script_location: s3 location containing query statement
             :sample_size: size of sample in bytes on which to run query
             :macros: macro values which were used in query
+            :hive-version: Specifies the hive version to be used. eg: 0.13,1.2,etc.
         prestocmd:
             :query: inline query statement
             :script_location: s3 location containing query statement
@@ -77,12 +79,14 @@ class QuboleOperator(BaseOperator):
             :arguments: spark-submit command line arguments
             :user_program_arguments: arguments that the user program takes in
             :macros: macro values which were used in query
+            :note_id: Id of the Notebook to run
         dbtapquerycmd:
             :db_tap_id: data store ID of the target database, in Qubole.
             :query: inline query statement
             :macros: macro values which were used in query
         dbexportcmd:
-            :mode: 1 (simple), 2 (advance)
+            :mode: Can be 1 for Hive export or 2 for HDFS/S3 export
+            :schema: database schema name; if not specified, the database's default schema is assumed
             :hive_table: Name of the hive table
             :partition_spec: partition specification for Hive table.
             :dbtap_id: data store ID of the target database, in Qubole.
@@ -91,9 +95,15 @@ class QuboleOperator(BaseOperator):
             :db_update_keys: columns used to determine the uniqueness of rows
             :export_dir: HDFS/S3 location from which data will be exported.
             :fields_terminated_by: hex of the char used as column separator in the dataset
+            :use_customer_cluster: whether to use a customer cluster to run the command
+            :customer_cluster_label: the label of the cluster to run the command on
+            :additional_options: additional Sqoop options, if needed; enclose options in
+                double or single quotes e.g. '--map-column-hive id=int,data=string'
         dbimportcmd:
             :mode: 1 (simple), 2 (advance)
             :hive_table: Name of the hive table
+            :schema: database schema name; if not specified, the database's default schema is assumed
+            :hive_serde: Output format of the Hive Table
             :dbtap_id: data store ID of the target database, in Qubole.
             :db_table: name of the db table
             :where_clause: where clause, if any
@@ -102,6 +112,10 @@ class QuboleOperator(BaseOperator):
                 of the where clause.
             :boundary_query: Query to be used get range of row IDs to be extracted
             :split_column: Column used as row ID to split data into ranges (mode 2)
+            :use_customer_cluster: whether to use a customer cluster to run the command
+            :customer_cluster_label: the label of the cluster to run the command on
+            :additional_options: additional Sqoop options, if needed; enclose options in
+                double or single quotes
 
     .. note:: Following fields are template-supported : ``query``, ``script_location``,
         ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,
diff --git a/airflow/contrib/sensors/azure_cosmos_sensor.py b/airflow/contrib/sensors/azure_cosmos_sensor.py
new file mode 100644
index 0000000000..78b340d4ef
--- /dev/null
+++ b/airflow/contrib/sensors/azure_cosmos_sensor.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AzureCosmosDocumentSensor(BaseSensorOperator):
+    """
+    Checks for the existence of a document which
+    matches the given query in CosmosDB. Example:
+
+    >>> azure_cosmos_sensor = AzureCosmosDocumentSensor(database_name="somedatabase_name",
+    ...                            collection_name="somecollection_name",
+    ...                            document_id="unique-doc-id",
+    ...                            azure_cosmos_conn_id="azure_cosmos_default",
+    ...                            task_id="azure_cosmos_sensor")
+    """
+    template_fields = ('database_name', 'collection_name', 'document_id')
+
+    @apply_defaults
+    def __init__(
+            self,
+            database_name,
+            collection_name,
+            document_id,
+            azure_cosmos_conn_id="azure_cosmos_default",
+            *args,
+            **kwargs):
+        """
+        Create a new AzureCosmosDocumentSensor
+
+        :param database_name: Target CosmosDB database_name.
+        :type database_name: str
+        :param collection_name: Target CosmosDB collection_name.
+        :type collection_name: str
+        :param document_id: The ID of the target document.
+        :type document_id: str
+        :param azure_cosmos_conn_id: Reference to the Azure CosmosDB connection.
+        :type azure_cosmos_conn_id: str
+        """
+        super(AzureCosmosDocumentSensor, self).__init__(*args, **kwargs)
+        self.azure_cosmos_conn_id = azure_cosmos_conn_id
+        self.database_name = database_name
+        self.collection_name = collection_name
+        self.document_id = document_id
+
+    def poke(self, context):
+        self.log.info("*** Intering poke")
+        hook = AzureCosmosDBHook(self.azure_cosmos_conn_id)
+        return hook.get_document(self.document_id, self.database_name, self.collection_name) is not None
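
The sensor inherits the standard polling arguments from BaseSensorOperator; a short, illustrative
sketch follows (the DAG id, names and intervals are placeholders, not part of this change).

import airflow
from airflow.models import DAG
from airflow.contrib.sensors.azure_cosmos_sensor import AzureCosmosDocumentSensor

dag = DAG(
    dag_id='example_cosmos_sensor',
    default_args={'owner': 'airflow',
                  'start_date': airflow.utils.dates.days_ago(2)},
    schedule_interval=None)

# Poll CosmosDB until the expected document shows up, checking once a minute
# and failing the task if it has not appeared after an hour.
wait_for_doc = AzureCosmosDocumentSensor(
    task_id='wait_for_cosmos_document',
    database_name='airflow_example_db',
    collection_name='airflow_example_coll',
    document_id='example-document-1',
    azure_cosmos_conn_id='azure_cosmos_default',
    poke_interval=60,
    timeout=60 * 60,
    dag=dag)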
diff --git a/airflow/models.py b/airflow/models.py
index 1bca27cbc8..1b46a811bf 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -667,6 +667,8 @@ class Connection(Base, LoggingMixin):
         ('snowflake', 'Snowflake',),
         ('segment', 'Segment',),
         ('azure_data_lake', 'Azure Data Lake'),
+        ('azure_batch_ai', 'Azure Batch AI'),
+        ('azure_cosmos', 'Azure CosmosDB'),
         ('cassandra', 'Cassandra',),
         ('qubole', 'Qubole'),
         ('mongo', 'MongoDB'),
@@ -808,6 +810,12 @@ def get_hook(self):
             elif self.conn_type == 'azure_data_lake':
                 from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
                 return AzureDataLakeHook(azure_data_lake_conn_id=self.conn_id)
+            elif self.conn_type == 'azure_batch_ai':
+                from airflow.contrib.hooks.azure_batchai_hook import AzureBatchAIHook
+                return AzureBatchAIHook(azure_batchai_conn_id=self.conn_id)
+            elif self.conn_type == 'azure_cosmos':
+                from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook
+                return AzureCosmosDBHook(azure_cosmos_conn_id=self.conn_id)
             elif self.conn_type == 'cassandra':
                 from airflow.contrib.hooks.cassandra_hook import CassandraHook
                 return CassandraHook(cassandra_conn_id=self.conn_id)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index b6a807c86c..cdc73899dd 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -277,6 +277,14 @@ def initdb(rbac=False):
         models.Connection(
             conn_id='azure_data_lake_default', conn_type='azure_data_lake',
             extra='{"tenant": "<TENANT>", "account_name": "<ACCOUNTNAME>" }'))
+    merge_conn(
+        models.Connection(
+            conn_id='azure_batchai_default', conn_type='azure_batch_ai',
+            extra='{"key_path": "<AZURE_AUTH_LOCATION>" }'))
+    merge_conn(
+        models.Connection(
+            conn_id='azure_cosmos_default', conn_type='azure_cosmos',
+            extra='{"database_name": "<DATABASE_NAME>", "collection_name": "<COLLECTION_NAME>" }'))
     merge_conn(
         models.Connection(
             conn_id='cassandra_default', conn_type='cassandra',
diff --git a/docs/howto/manage-connections.rst b/docs/howto/manage-connections.rst
index 35b5807c9e..f38ff2fb61 100644
--- a/docs/howto/manage-connections.rst
+++ b/docs/howto/manage-connections.rst
@@ -16,7 +16,7 @@
     under the License.
 
 Managing Connections
-=====================
+====================
 
 Airflow needs to know how to connect to your environment. Information
 such as hostname, port, login and passwords to other systems and services is
@@ -170,21 +170,21 @@ Password (required)
     Specify the password to connect.    
     
 Extra (optional)
-    Specify the extra parameters (as json dictionary) that can be used in mysql
+    Specify the extra parameters (as json dictionary) that can be used in MySQL
     connection. The following parameters are supported:
 
     * **charset**: specify charset of the connection
-    * **cursor**: one of "sscursor", "dictcursor, "ssdictcursor" - specifies cursor class to be
+    * **cursor**: one of "sscursor", "dictcursor", "ssdictcursor". Specifies the cursor class to be
       used
     * **local_infile**: controls MySQL's LOCAL capability (permitting local data loading by
       clients). See `MySQLdb docs <https://mysqlclient.readthedocs.io/user_guide.html>`_
       for details.
-    * **unix_socket**: UNIX socket used instead of the default socket
-    * **ssl**: Dictionary of SSL parameters that control connecting using SSL (those
+    * **unix_socket**: UNIX socket used instead of the default socket.
+    * **ssl**: Dictionary of SSL parameters that control connecting using SSL. Those
       parameters are server specific and should contain "ca", "cert", "key", "capath",
       "cipher" parameters. See
       `MySQLdb docs <https://mysqlclient.readthedocs.io/user_guide.html>`_ for details.
-      Note that in order to be useful in URL notation, this parameter might also be
+      Note that to be useful in URL notation, this parameter might also be
       a string where the SSL dictionary is a string-encoded JSON dictionary.
 
     Example "extras" field:
@@ -216,8 +216,8 @@ Extra (optional)
        }
 
     When specifying the connection as URI (in AIRFLOW_CONN_* variable) you should specify it
-    following the standard syntax of DB connections, where extras are passed as parameters
-    of the URI (note that all components of the URI should be URL-encoded).
+    following the standard syntax of DB connections - where extras are passed as parameters
+    of the URI. Note that all components of the URI should be URL-encoded.
 
     For example:
 
@@ -226,7 +226,7 @@ Extra (optional)
        mysql://mysql_user:XXXXXXXXXXXX@1.1.1.1:3306/mysqldb?ssl=%7B%22cert%22%3A+%22%2Ftmp%2Fclient-cert.pem%22%2C+%22ca%22%3A+%22%2Ftmp%2Fserver-ca.pem%22%2C+%22key%22%3A+%22%2Ftmp%2Fclient-key.pem%22%7D
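
    The ssl query parameter above is just the SSL dictionary serialized to JSON and URL-encoded; a
    small sketch of producing such a value (the paths are the placeholders from the example above):

    import json

    try:
        from urllib.parse import quote_plus  # Python 3
    except ImportError:
        from urllib import quote_plus        # Python 2

    ssl = {"cert": "/tmp/client-cert.pem",
           "ca": "/tmp/server-ca.pem",
           "key": "/tmp/client-key.pem"}

    # Build the value for an AIRFLOW_CONN_* environment variable.
    uri = ("mysql://mysql_user:XXXXXXXXXXXX@1.1.1.1:3306/mysqldb?ssl=" +
           quote_plus(json.dumps(ssl)))
    print(uri)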
 
     .. note::
-        If encounter UnicodeDecodeError while working with MySQL connection check
+        If you encounter a UnicodeDecodeError while working with a MySQL connection, check
         the charset defined is matched to the database charset.
 
 Postgres
@@ -269,7 +269,7 @@ Extra (optional)
       should send a keepalive message to the server.
 
     More details on all Postgres parameters supported can be found in
-    `Postgres documentation <https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING>`_
+    `Postgres documentation <https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING>`_.
 
     Example "extras" field:
 
@@ -297,14 +297,14 @@ Cloudsql
 The gcpcloudsql:// connection is used by
 :class:`airflow.contrib.operators.gcp_sql_operator.CloudSqlQueryOperator` to perform query
 on a Google Cloud SQL database. Google Cloud SQL database can be either
-Postgres or MySQL, so this is a "meta" connection type - it introduces common schema
+Postgres or MySQL, so this is a "meta" connection type. It introduces common schema
 for both MySQL and Postgres, including what kind of connectivity should be used.
-Google Cloud SQL supports connecting via public IP or via Cloud Sql Proxy
-and in the latter case the
+Google Cloud SQL supports connecting via public IP or via Cloud SQL Proxy.
+In the latter case the
 :class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook` uses
 :class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlProxyRunner` to automatically prepare
 and use temporary Postgres or MySQL connection that will use the proxy to connect
-(either via TCP or UNIX socket)
+(either via TCP or UNIX socket).
 
 Configuring the Connection
 ''''''''''''''''''''''''''
@@ -321,7 +321,7 @@ Password (required)
     Specify the password to connect.
 
 Extra (optional)
-    Specify the extra parameters (as json dictionary) that can be used in gcpcloudsql
+    Specify the extra parameters (as JSON dictionary) that can be used in Google Cloud SQL
     connection.
 
     Details of all the parameters supported in extra field can be found in
@@ -340,9 +340,9 @@ Extra (optional)
           "sql_proxy_use_tcp": false
        }
 
-    When specifying the connection as URI (in AIRFLOW_CONN_* variable) you should specify it
-    following the standard syntax of DB connections, where extras are passed as parameters
-    of the URI (note that all components of the URI should be URL-encoded).
+    When specifying the connection as URI (in AIRFLOW_CONN_* variable), you should specify
+    it following the standard syntax of DB connection, where extras are passed as
+    parameters of the URI. Note that all components of the URI should be URL-encoded.
 
     For example:
 
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index dc0f4c99d3..cd6e73b1f2 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -162,7 +162,8 @@ Templating
 More information
 """"""""""""""""
 
-See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instances/start>`_
+See `Google Compute Engine API documentation
+<https://cloud.google.com/compute/docs/reference/rest/v1/instances/start>`_.
 
 
 GceInstanceStopOperator
@@ -170,7 +171,7 @@ GceInstanceStopOperator
 
 Use the operator to stop Google Compute Engine instance.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStopOperator`
 
 Arguments
@@ -204,7 +205,8 @@ Templating
 More information
 """"""""""""""""
 
-See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop>`_
+See `Google Compute Engine API documentation
+<https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop>`_.
 
 
 GceSetMachineTypeOperator
@@ -212,8 +214,8 @@ GceSetMachineTypeOperator
 
 Use the operator to change machine type of a Google Compute Engine instance.
 
-For parameter definition take a look at
-:class:`~airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator`
+For parameter definition, take a look at
+:class:`~airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator`.
 
 Arguments
 """""""""
@@ -252,7 +254,8 @@ Templating
 More information
 """"""""""""""""
 
-See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType>`_
+See `Google Compute Engine API documentation
+<https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType>`_.
 
 
 GceInstanceTemplateCopyOperator
@@ -261,7 +264,7 @@ GceInstanceTemplateCopyOperator
 Use the operator to copy an existing Google Compute Engine instance template
 applying a patch to it.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator`.
 
 Arguments
@@ -300,14 +303,15 @@ Templating
 More information
 """"""""""""""""
 
-See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates>`_
+See `Google Compute Engine API documentation
+<https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates>`_.
 
 GceInstanceGroupManagerUpdateTemplateOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 Use the operator to update template in Google Compute Engine Instance Group Manager.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator`.
 
 Arguments
@@ -347,13 +351,15 @@ Troubleshooting
 """""""""""""""
 
 You might find that your GceInstanceGroupManagerUpdateTemplateOperator fails with
-missing permissions. The service account has to have Service Account User role assigned
-via IAM permissions in order to execute the operation.
+missing permissions. To execute the operation, the service account requires
+the permissions that the Service Account User role provides
+(assigned via Google Cloud IAM).
 
 More information
 """"""""""""""""
 
-See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers>`_
+See `Google Compute Engine API documentation
+<https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers>`_.
 
 Google Cloud Functions Operators
 --------------------------------
@@ -363,7 +369,7 @@ GcfFunctionDeleteOperator
 
 Use the operator to delete a function from Google Cloud Functions.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator`.
 
 Arguments
@@ -417,19 +423,21 @@ and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
     --role="roles/iam.serviceAccountUser"
 
 
-See `Adding the IAM service agent user role to the runtime service <https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_  for details
+See `Adding the IAM service agent user role to the runtime service
+<https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_.
 
 More information
 """"""""""""""""
 
-See `Google Cloud Functions API documentation <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/delete>`_
+See `Google Cloud Functions API documentation
+<https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/delete>`_.
 
 GcfFunctionDeployOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
 Use the operator to deploy a function to Google Cloud Functions.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`.
 
 
@@ -463,7 +471,8 @@ Note that the neither the body nor the default args are complete in the above ex
 Depending on the variables set, there might be different variants on how to pass source
 code related fields. Currently, you can pass either sourceArchiveUrl, sourceRepository
 or sourceUploadUrl as described in the
-`Cloud Functions API specification <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction>`_.
+`Cloud Functions API specification
+<https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction>`_.
 
 Additionally, default_args or direct operator args might contain zip_path parameter
 to run the extra step of uploading the source code before deploying it.
@@ -530,7 +539,8 @@ can be downloaded if necessary.
 More information
 """"""""""""""""
 
-See `Google Cloud Functions API documentation <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/create>`_
+See `Google Cloud Functions API documentation
+<https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/create>`_.
 
 Google Cloud Sql Operators
 --------------------------
@@ -540,7 +550,7 @@ CloudSqlInstanceDatabaseCreateOperator
 
 Creates a new database inside a Cloud SQL instance.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseCreateOperator`.
 
 Arguments
@@ -589,7 +599,7 @@ CloudSqlInstanceDatabaseDeleteOperator
 
 Deletes a database from a Cloud SQL instance.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseDeleteOperator`.
 
 Arguments
@@ -633,7 +643,7 @@ Updates a resource containing information about a database inside a Cloud SQL in
 using patch semantics.
 See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabasePatchOperator`.
 
 Arguments
@@ -682,7 +692,7 @@ CloudSqlInstanceDeleteOperator
 
 Deletes a Cloud SQL instance in Google Cloud Platform.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDeleteOperator`.
 
 Arguments
@@ -719,6 +729,178 @@ More information
 See `Google Cloud SQL API documentation for delete
 <https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/delete>`_.
 
+.. CloudSqlInstanceExportOperator:
+
+CloudSqlInstanceExportOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump
+or CSV file.
+
+Note: This operator is idempotent. If executed multiple times with the same
+export file URI, the export file in GCS will simply be overwritten.
+
+For parameter definition, take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceExportOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from Airflow variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_export_import_arguments]
+    :end-before: [END howto_operator_cloudsql_export_import_arguments]
+
+Example body defining the export operation:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_export_body]
+    :end-before: [END howto_operator_cloudsql_export_body]
+
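
A body of this shape satisfies the CLOUD_SQL_EXPORT_VALIDATION spec amended above (a sketch with
placeholder bucket, database and table names, not taken from the example DAG):

# Field names follow the validation spec in gcp_sql_operator.py; values are placeholders.
export_body = {
    "exportContext": {
        "fileType": "sql",
        "uri": "gs://example-bucket/exports/example-export.sql",
        "databases": ["example_db"],      # optional
        "sqlExportOptions": {             # optional
            "tables": ["example_table"],  # optional
            "schemaOnly": False           # optional
        }
    }
}

The dict would then be passed as the body argument of CloudSqlInstanceExportOperator.
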
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_export]
+    :end-before: [END howto_operator_cloudsql_export]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_sql_export_template_fields]
+    :end-before: [END gcp_sql_export_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for export
+<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export>`_.
+
+Troubleshooting
+"""""""""""""""
+
+If you receive an "Unauthorized" error in GCP, make sure that the service account
+of the Cloud SQL instance is authorized to write to the selected GCS bucket.
+
+It is not the service account configured in Airflow that communicates with GCS,
+but rather the service account of the particular Cloud SQL instance.
+
+To grant the service account the appropriate WRITE permissions for the GCS bucket,
+you can use the :class:`~airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator`,
+as shown in the example:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_export_gcs_permissions]
+    :end-before: [END howto_operator_cloudsql_export_gcs_permissions]
+
+
+.. CloudSqlInstanceImportOperator:
+
+CloudSqlInstanceImportOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.
+
+CSV import:
+"""""""""""
+
+This operator is NOT idempotent for a CSV import. If the same file is imported
+multiple times, the imported data will be duplicated in the database.
+Moreover, if there are any unique constraints, the duplicate import may result in an
+error.
+
+SQL import:
+"""""""""""
+
+This operator is idempotent for a SQL import if it was also exported by Cloud SQL.
+The exported SQL contains 'DROP TABLE IF EXISTS' statements for all tables
+to be imported.
+
+If the import file was generated in a different way, idempotence is not guaranteed.
+It has to be ensured on the SQL file level.
+
+For parameter definition, take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceImportOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from Airflow variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_export_import_arguments]
+    :end-before: [END howto_operator_cloudsql_export_import_arguments]
+
+Example body defining the import operation:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_import_body]
+    :end-before: [END howto_operator_cloudsql_import_body]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_import]
+    :end-before: [END howto_operator_cloudsql_import]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_sql_import_template_fields]
+    :end-before: [END gcp_sql_import_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for import <https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/import>`_.
+
+Troubleshooting
+"""""""""""""""
+
+If you receive an "Unauthorized" error in GCP, make sure that the service account
+of the Cloud SQL instance is authorized to read from the selected GCS object.
+
+It is not the service account configured in Airflow that communicates with GCS,
+but rather the service account of the particular Cloud SQL instance.
+
+To grant the service account the appropriate READ permissions for the GCS object,
+you can use the :class:`~airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageObjectCreateAclEntryOperator`,
+as shown in the example:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_import_gcs_permissions]
+    :end-before: [END howto_operator_cloudsql_import_gcs_permissions]
+
 .. _CloudSqlInstanceCreateOperator:
 
 CloudSqlInstanceCreateOperator
@@ -726,7 +908,7 @@ CloudSqlInstanceCreateOperator
 
 Creates a new Cloud SQL instance in Google Cloud Platform.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceCreateOperator`.
 
 If an instance with the same name exists, no action will be taken and the operator
@@ -770,9 +952,8 @@ Templating
 More information
 """"""""""""""""
 
-See `Google Cloud SQL API documentation for insert <https://cloud.google
-.com/sql/docs/mysql/admin-api/v1beta4/instances/insert>`_.
-
+See `Google Cloud SQL API documentation for insert
+<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert>`_.
 
 .. _CloudSqlInstancePatchOperator:
 
@@ -781,7 +962,7 @@ CloudSqlInstancePatchOperator
 
 Updates settings of a Cloud SQL instance in Google Cloud Platform (partial update).
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstancePatchOperator`.
 
 This is a partial update, so only values for the settings specified in the body
@@ -826,44 +1007,44 @@ Templating
 More information
 """"""""""""""""
 
-See `Google Cloud SQL API documentation for patch <https://cloud.google
-.com/sql/docs/mysql/admin-api/v1beta4/instances/patch>`_.
+See `Google Cloud SQL API documentation for patch
+<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch>`_.
 
 
 CloudSqlQueryOperator
 ^^^^^^^^^^^^^^^^^^^^^
 
 Performs DDL or DML SQL queries in Google Cloud SQL instance. The DQL
-(retrieving data from Google Cloud SQL) is not supported - you might run the SELECT
-queries but results of those queries are discarded.
+(retrieving data from Google Cloud SQL) is not supported. You might run the SELECT
+queries, but the results of those queries are discarded.
 
-You can specify various connectivity methods to connect to running instance -
+You can specify various connectivity methods to connect to running instance,
 starting from public IP plain connection through public IP with SSL or both TCP and
-socket connection via Cloud Sql Proxy. The proxy is downloaded and started/stopped
+socket connection via Cloud SQL Proxy. The proxy is downloaded and started/stopped
 dynamically as needed by the operator.
 
 There is a *gcpcloudsql://* connection type that you should use to define what
 kind of connectivity you want the operator to use. The connection is a "meta"
 type of connection. It is not used to make an actual connectivity on its own, but it
-determines whether Cloud Sql Proxy should be started by `CloudSqlDatabaseHook`
-and what kind of the database connection (Postgres or MySQL) should be created
-dynamically - to either connect to Cloud SQL via public IP address or via the proxy.
+determines whether Cloud SQL Proxy should be started by `CloudSqlDatabaseHook`
+and what kind of database connection (Postgres or MySQL) should be created
+dynamically to connect to Cloud SQL via public IP address or via the proxy.
 The 'CloudSqlDatabaseHook` uses
-:class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlProxyRunner` to manage Cloud Sql
-Proxy lifecycle (each task has its own Cloud Sql Proxy)
+:class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlProxyRunner` to manage Cloud SQL
+Proxy lifecycle (each task has its own Cloud SQL Proxy)
 
 When you build connection, you should use connection parameters as described in
 :class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook`. You can see
 examples of connections below for all the possible types of connectivity. Such connection
-can be reused between different tasks (instances of `CloudSqlQueryOperator`) - each
+can be reused between different tasks (instances of `CloudSqlQueryOperator`). Each
 task will get their own proxy started if needed with their own TCP or UNIX socket.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlQueryOperator`.
 
-Since query operator can run arbitrary query - it cannot be guaranteed to be
-idempotent. SQL query designer should design the queries to be idempotent. For example
-both Postgres and MySql support CREATE TABLE IF NOT EXISTS statements that can be
+Since query operator can run arbitrary query, it cannot be guaranteed to be
+idempotent. SQL query designer should design the queries to be idempotent. For example,
+both Postgres and MySQL support CREATE TABLE IF NOT EXISTS statements that can be
 used to create tables in an idempotent way.
 
 Arguments
@@ -896,8 +1077,8 @@ of the connection URI should be URL-encoded:
 Using the operator
 """"""""""""""""""
 
-Example operators below are using all connectivity options (note connection id
-from the operator matches the `AIRFLOW_CONN_*` postfix uppercase - this is
+Example operators below are using all connectivity options. Note connection id
+from the operator matches the `AIRFLOW_CONN_*` postfix uppercase. This is
 standard AIRFLOW notation for defining connection via environment variables):
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql_query.py
@@ -917,9 +1098,8 @@ Templating
 More information
 """"""""""""""""
 
-See `Google Cloud Sql Proxy documentation
-<https://cloud.google.com/sql/docs/postgres/sql-proxy>`_
-for details about Cloud Sql Proxy.
+See `Google Cloud SQL Proxy documentation
+<https://cloud.google.com/sql/docs/postgres/sql-proxy>`_.
 
 Google Cloud Storage Operators
 ------------------------------
@@ -929,7 +1109,7 @@ GoogleCloudStorageBucketCreateAclEntryOperator
 
 Creates a new ACL entry on the specified bucket.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator`
 
 Arguments
@@ -964,15 +1144,14 @@ More information
 """"""""""""""""
 
 See `Google Cloud Storage BucketAccessControls insert documentation
-<https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert>`_
-for details.
+<https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert>`_.
 
 GoogleCloudStorageObjectCreateAclEntryOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 Creates a new ACL entry on the specified object.
 
-For parameter definition take a look at
+For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageObjectCreateAclEntryOperator`
 
 Arguments
@@ -1007,6 +1186,5 @@ More information
 """"""""""""""""
 
 See `Google Cloud Storage ObjectAccessControls insert documentation
-<https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert>`_
-for details.
+<https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert>`_.
 
diff --git a/docs/howto/run-with-systemd.rst b/docs/howto/run-with-systemd.rst
index 131fc3ddc0..bbd2235ae4 100644
--- a/docs/howto/run-with-systemd.rst
+++ b/docs/howto/run-with-systemd.rst
@@ -27,6 +27,5 @@ have been tested on Redhat based systems. You can copy those to
 based system) you probably need to adjust the unit files.
 
 Environment configuration is picked up from ``/etc/sysconfig/airflow``.
-An example file is supplied. Make sure to specify the ``SCHEDULER_RUNS``
-variable in this file when you run the scheduler. You
+An example file is supplied. You
 can also define here, for example, ``AIRFLOW_HOME`` or ``AIRFLOW_CONFIG``.
diff --git a/docs/installation.rst b/docs/installation.rst
index baabea3eb0..0c8be1ff40 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -94,6 +94,8 @@ Here's the list of the subpackages and what they enable:
 +---------------------+---------------------------------------------------+-------------------------------------------------+
 | kerberos            | ``pip install apache-airflow[kerberos]``          | Kerberos integration for Kerberized Hadoop      |
 +---------------------+---------------------------------------------------+-------------------------------------------------+
+| kubernetes          | ``pip install apache-airflow[kubernetes]``        | Kubernetes Executor and operator                |
++---------------------+---------------------------------------------------+-------------------------------------------------+
 | ldap                | ``pip install apache-airflow[ldap]``              | LDAP authentication for users                   |
 +---------------------+---------------------------------------------------+-------------------------------------------------+
 | mssql               | ``pip install apache-airflow[mssql]``             | Microsoft SQL Server operators and hook,        |
diff --git a/docs/integration.rst b/docs/integration.rst
index 7387fc25f4..db5a23210f 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -161,6 +161,35 @@ Logging
 Airflow can be configured to read and write task logs in Azure Blob Storage.
 See :ref:`write-logs-azure`.
 
+Azure CosmosDB
+''''''''''''''
+
+AzureCosmosDBHook communicates via the Azure Cosmos library. Make sure that an
+Airflow connection of type `azure_cosmos` exists. Authorization can be done by supplying a
+login (the endpoint URI), a password (the secret key) and the extra fields database_name and
+collection_name to specify the default database and collection to use
+(see connection `azure_cosmos_default` for an example).
+
+- :ref:`AzureCosmosDBHook`: Interface with Azure CosmosDB.
+- :ref:`AzureCosmosInsertDocumentOperator`: Simple operator to insert a document into CosmosDB.
+- :ref:`AzureCosmosDocumentSensor`: Simple sensor to detect document existence in CosmosDB.
+
+.. _AzureCosmosDBHook:
+
+AzureCosmosDBHook
+"""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.azure_cosmos_hook.AzureCosmosDBHook
+
+.. _AzureCosmosInsertDocumentOperator:
+
+AzureCosmosInsertDocumentOperator
+"""""""""""""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.azure_cosmos_insertdocument_operator.AzureCosmosInsertDocumentOperator
+
+.. _AzureCosmosDocumentSensor:
+
+AzureCosmosDocumentSensor
+"""""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.sensors.azure_cosmos_sensor.AzureCosmosDocumentSensor
+
 Azure Data Lake
 '''''''''''''''
 
@@ -194,6 +223,25 @@ AdlsToGoogleCloudStorageOperator
 
 .. autoclass:: airflow.contrib.operators.adls_to_gcs.AdlsToGoogleCloudStorageOperator
 
+Azure Batch AI
+''''''''''''''
+
+Azure Batch AI provides a way to run Docker-based workloads without having to worry about
+managing the underlying infrastructure. The AzureBatchAIHook requires a service principal.
+The credentials for this principal can be supplied either in the extra field ``key_path``,
+in an environment variable named ``AZURE_AUTH_LOCATION``, or as a login/password plus a
+``tenantId`` in the connection extras.
+
+The AzureBatchAIHook requires a host/login/password to be defined in the connection;
+a sketch of registering such a connection follows the list below.
+
+- :ref:`AzureBatchAIOperator` : Start and monitor a new Azure Batch AI workspace and cluster.
+- :ref:`AzureBatchAIHook` : Wrapper around a single Batch AI workspace.
+
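+The sketch below registers such a connection programmatically, mirroring the pattern used
+in the tests added by this PR; the ``conn_type`` value and the credential placeholders are
+illustrative assumptions, not values shipped with Airflow:
+
+.. code-block:: python
+
+    import json
+
+    from airflow import models
+    from airflow.utils import db
+
+    db.merge_conn(
+        models.Connection(
+            conn_id='azure_batchai_default',
+            conn_type='azure_batch_ai',  # assumed type name; adjust to the hook's expectations
+            login='<service-principal-client-id>',
+            password='<service-principal-secret>',
+            extra=json.dumps({'tenantId': '<tenant-id>'})))
+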
+.. _AzureBatchAIOperator:
+
+AzureBatchAIOperator
+""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.azure_batchai_operator.AzureBatchAIOperator
+
+.. _AzureBatchAIHook:
+
+AzureBatchAIHook
+""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.azure_batchai_hook.AzureBatchAIHook
+
 .. _AWS:
 
 AWS: Amazon Web Services
@@ -398,6 +446,70 @@ AwsFirehoseHook
 
 .. autoclass:: airflow.contrib.hooks.aws_firehose_hook.AwsFirehoseHook
 
+Amazon SageMaker
+''''''''''''''''
+
+For more instructions on using Amazon SageMaker in Airflow, please see `the SageMaker Python SDK README`_; a minimal training-job sketch is shown after the list below.
+
+.. _the SageMaker Python SDK README: https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/workflow/README.rst
+
+- :ref:`SageMakerHook` : Interact with Amazon SageMaker.
+- :ref:`SageMakerTrainingOperator` : Create a SageMaker training job.
+- :ref:`SageMakerTuningOperator` : Create a SageMaker tuning job.
+- :ref:`SageMakerModelOperator` : Create a SageMaker model.
+- :ref:`SageMakerTransformOperator` : Create a SageMaker transform job.
+- :ref:`SageMakerEndpointConfigOperator` : Create a SageMaker endpoint config.
+- :ref:`SageMakerEndpointOperator` : Create a SageMaker endpoint.
+
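+The sketch below creates a training job with ``SageMakerTrainingOperator``; the
+configuration dictionary follows the SageMaker ``CreateTrainingJob`` API and every value
+in it is a placeholder, and it assumes an existing ``dag`` object:
+
+.. code-block:: python
+
+    from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
+
+    training_config = {
+        'TrainingJobName': 'example-training-job',
+        'AlgorithmSpecification': {
+            'TrainingImage': '<training-image-uri>',
+            'TrainingInputMode': 'File'
+        },
+        'RoleArn': '<sagemaker-execution-role-arn>',
+        'OutputDataConfig': {'S3OutputPath': 's3://example-bucket/output/'},
+        'ResourceConfig': {
+            'InstanceCount': 1,
+            'InstanceType': 'ml.m4.xlarge',
+            'VolumeSizeInGB': 10
+        },
+        'StoppingCondition': {'MaxRuntimeInSeconds': 3600}
+    }
+
+    train_task = SageMakerTrainingOperator(
+        task_id='sagemaker_train',
+        config=training_config,
+        aws_conn_id='aws_default',
+        wait_for_completion=True,
+        dag=dag)  # attach to an existing DAG object
+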
+.. _SageMakerHook:
+
+SageMakerHook
+"""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.sagemaker_hook.SageMakerHook
+
+.. _SageMakerTrainingOperator:
+
+SageMakerTrainingOperator
+"""""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.sagemaker_training_operator.SageMakerTrainingOperator
+
+.. _SageMakerTuningOperator:
+
+SageMakerTuningOperator
+"""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.sagemaker_tuning_operator.SageMakerTuningOperator
+
+.. _SageMakerModelOperator:
+
+SageMakerModelOperator
+""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.sagemaker_model_operator.SageMakerModelOperator
+
+.. _SageMakerTransformOperator:
+
+SageMakerTransformOperator
+""""""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.sagemaker_transform_operator.SageMakerTransformOperator
+
+.. _SageMakerEndpointConfigOperator:
+
+SageMakerEndpointConfigOperator
+"""""""""""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.sagemaker_endpoint_config_operator.SageMakerEndpointConfigOperator
+
+.. _SageMakerEndpointOperator:
+
+SageMakerEndpointOperator
+"""""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.sagemaker_endpoint_operator.SageMakerEndpointOperator
+
 .. _Databricks:
 
 Databricks
@@ -556,6 +668,8 @@ Cloud SQL Operators
 - :ref:`CloudSqlInstanceDatabasePatchOperator` : updates a database inside a Cloud
   SQL instance.
 - :ref:`CloudSqlInstanceDeleteOperator` : delete a Cloud SQL instance.
+- :ref:`CloudSqlInstanceExportOperator` : exports data from a Cloud SQL instance.
+- :ref:`CloudSqlInstanceImportOperator` : imports data into a Cloud SQL instance.
 - :ref:`CloudSqlInstanceCreateOperator` : create a new Cloud SQL instance.
 - :ref:`CloudSqlInstancePatchOperator` : patch a Cloud SQL instance.
 - :ref:`CloudSqlQueryOperator` : run query in a Cloud SQL instance.
@@ -588,6 +702,20 @@ CloudSqlInstanceDeleteOperator
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDeleteOperator
 
+.. _CloudSqlInstanceExportOperator:
+
+CloudSqlInstanceExportOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceExportOperator
+
+.. _CloudSqlInstanceImportOperator:
+
+CloudSqlInstanceImportOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceImportOperator
+
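+A minimal sketch of the new export operator is shown below; the project, instance and
+bucket values are placeholders, the ``body`` follows the Cloud SQL export API (see the
+test fixtures added in this PR for a fuller example), and an existing ``dag`` object is
+assumed:
+
+.. code-block:: python
+
+    from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator
+
+    export_body = {
+        "exportContext": {
+            "fileType": "CSV",
+            "uri": "gs://example-bucket/export.csv",
+            "databases": ["example-database"],
+            "csvExportOptions": {"selectQuery": "SELECT * FROM example_table"}
+        }
+    }
+
+    sql_export_task = CloudSqlInstanceExportOperator(
+        project_id='example-project',
+        instance='example-instance',
+        body=export_body,
+        task_id='sql_export_task',
+        dag=dag)  # attach to an existing DAG object
+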
 .. _CloudSqlInstanceCreateOperator:
 
 CloudSqlInstanceCreateOperator
@@ -641,11 +769,11 @@ Compute Engine Operators
 - :ref:`GceInstanceGroupManagerUpdateTemplateOperator` : patch the Instance Group Manager,
   replacing source Instance Template URL with the destination one.
 
-The operators have common base operator:
+The operators have a common base operator:
 
 .. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator
 
-They also use :ref:`GceHook` hook to communicate with Google Cloud Platform.
+They also use :ref:`GceHook` to communicate with Google Cloud Platform.
 
 .. _GceInstanceStartOperator:
 
@@ -702,7 +830,7 @@ Cloud Functions Operators
 
 .. autoclass:: airflow.contrib.operators.gcp_operator.GCP
 
-They also use :ref:`GcfHook` hook to communicate with Google Cloud Platform.
+They also use :ref:`GcfHook` to communicate with Google Cloud Platform.
 
 .. _GcfFunctionDeployOperator:
 
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index bb2d99e3ad..479cef87f6 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -34,6 +34,9 @@ Kubernetes Operator
     from airflow.contrib.operators import KubernetesOperator
     from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
     from airflow.contrib.kubernetes.secret import Secret
+    from airflow.contrib.kubernetes.volume import Volume
+    from airflow.contrib.kubernetes.volume_mount import VolumeMount
+
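+    # Illustrative sketch (not part of this PR): a volume backed by an existing
+    # PersistentVolumeClaim and its mount point, built with the classes imported above.
+    volume_mount = VolumeMount('example-volume',
+                               mount_path='/root/mount_file',
+                               sub_path=None,
+                               read_only=True)
+    volume_config = {
+        'persistentVolumeClaim': {
+            'claimName': 'example-volume-claim'
+        }
+    }
+    volume = Volume(name='example-volume', configs=volume_config)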
 
     secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
     secret_env  = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
diff --git a/scripts/systemd/README b/scripts/systemd/README
index c53144a04c..e32df5f52f 100644
--- a/scripts/systemd/README
+++ b/scripts/systemd/README
@@ -6,6 +6,6 @@ You can then start the different servers by using systemctl start <service>. Ena
  systemctl enable <service>.
 
 By default the environment configuration points to /etc/sysconfig/airflow . You can copy the "airflow" file in this
-directory and adjust it to your liking. Make sure to specify the SCHEDULER_RUNS variable.
+directory and adjust it to your liking.
 
-With some minor changes they probably work on other systemd systems.
\ No newline at end of file
+With some minor changes they probably work on other systemd systems.
diff --git a/setup.py b/setup.py
index f4d226de5b..4de6bfda54 100644
--- a/setup.py
+++ b/setup.py
@@ -145,12 +145,15 @@ def write_version(filename=os.path.join(*['airflow',
     'gevent>=0.13'
 ]
 atlas = ['atlasclient>=0.1.2']
+azure_batch_ai = ['azure-mgmt-batchai==0.2.0']
+azure_resources = ['azure-mgmt-resource==2.0.0']
 azure_blob_storage = ['azure-storage>=0.34.0']
 azure_data_lake = [
     'azure-mgmt-resource==1.2.2',
     'azure-mgmt-datalake-store==0.4.0',
     'azure-datalake-store==0.0.19'
 ]
+azure_cosmos = ['azure-cosmos>=3.0.1']
 cassandra = ['cassandra-driver>=3.13.0']
 celery = [
     'celery>=4.1.1, <4.2.0',
@@ -217,7 +220,7 @@ def write_version(filename=os.path.join(*['airflow',
 ]
 pinot = ['pinotdb>=0.1.1']
 postgres = ['psycopg2-binary>=2.7.4']
-qds = ['qds-sdk>=1.9.6']
+qds = ['qds-sdk>=1.10.4']
 rabbitmq = ['librabbitmq>=1.6.1']
 redis = ['redis>=2.10.5,<3.0.0']
 s3 = ['boto3>=1.7.0, <1.8.0']
@@ -265,11 +268,12 @@ def write_version(filename=os.path.join(*['airflow',
 
 devel_minreq = devel + kubernetes + mysql + doc + password + s3 + cgroups
 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
+devel_azure = devel_minreq + azure_data_lake + azure_cosmos
 devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle +
              docker + ssh + kubernetes + celery + azure_blob_storage + redis + gcp_api +
              datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
              druid + pinot + segment + snowflake + elasticsearch + azure_data_lake +
-             atlas)
+             azure_batch_ai + azure_cosmos + atlas)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
@@ -343,6 +347,7 @@ def do_setup():
             'async': async_packages,
             'azure_blob_storage': azure_blob_storage,
             'azure_data_lake': azure_data_lake,
+            'azure_cosmos': azure_cosmos,
             'cassandra': cassandra,
             'celery': celery,
             'cgroups': cgroups,
@@ -353,6 +358,7 @@ def do_setup():
             'datadog': datadog,
             'devel': devel_minreq,
             'devel_hadoop': devel_hadoop,
+            'devel_azure': devel_azure,
             'doc': doc,
             'docker': docker,
             'druid': druid,
diff --git a/tests/contrib/hooks/test_azure_cosmos_hook.py b/tests/contrib/hooks/test_azure_cosmos_hook.py
new file mode 100644
index 0000000000..653242a34b
--- /dev/null
+++ b/tests/contrib/hooks/test_azure_cosmos_hook.py
@@ -0,0 +1,202 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+import json
+import unittest
+import uuid
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook
+
+from airflow import configuration
+from airflow import models
+from airflow.utils import db
+
+import logging
+
+try:
+    from unittest import mock
+
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestAzureCosmosDbHook(unittest.TestCase):
+
+    # Set up an environment to test with
+    def setUp(self):
+        # set up some test variables
+        self.test_end_point = 'https://test_endpoint:443'
+        self.test_master_key = 'magic_test_key'
+        self.test_database_name = 'test_database_name'
+        self.test_collection_name = 'test_collection_name'
+        self.test_database_default = 'test_database_default'
+        self.test_collection_default = 'test_collection_default'
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='azure_cosmos_test_key_id',
+                conn_type='azure_cosmos',
+                login=self.test_end_point,
+                password=self.test_master_key,
+                extra=json.dumps({'database_name': self.test_database_default,
+                                  'collection_name': self.test_collection_default})
+            )
+        )
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_create_database(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.cosmos.create_database(self.test_database_name)
+        expected_calls = [mock.call().CreateDatabase({'id': self.test_database_name})]
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_create_database_exception(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.assertRaises(AirflowException, self.cosmos.create_database, None)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_create_container_exception(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.assertRaises(AirflowException, self.cosmos.create_collection, None)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_create_container(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.cosmos.create_collection(self.test_collection_name, self.test_database_name)
+        expected_calls = [mock.call().CreateContainer(
+            'dbs/test_database_name',
+            {'id': self.test_collection_name})]
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_create_container_default(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.cosmos.create_collection(self.test_collection_name)
+        expected_calls = [mock.call().CreateContainer(
+            'dbs/test_database_default',
+            {'id': self.test_collection_name})]
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_upsert_document_default(self, cosmos_mock):
+        test_id = str(uuid.uuid4())
+        cosmos_mock.return_value.CreateItem.return_value = {'id': test_id}
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        returned_item = self.cosmos.upsert_document({'id': test_id})
+        expected_calls = [mock.call().CreateItem(
+            'dbs/' + self.test_database_default + '/colls/' + self.test_collection_default,
+            {'id': test_id})]
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+        logging.getLogger().info(returned_item)
+        self.assertEqual(returned_item['id'], test_id)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_upsert_document(self, cosmos_mock):
+        test_id = str(uuid.uuid4())
+        cosmos_mock.return_value.CreateItem.return_value = {'id': test_id}
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        returned_item = self.cosmos.upsert_document(
+            {'data1': 'somedata'},
+            database_name=self.test_database_name,
+            collection_name=self.test_collection_name,
+            document_id=test_id)
+
+        expected_calls = [mock.call().CreateItem(
+            'dbs/' + self.test_database_name + '/colls/' + self.test_collection_name,
+            {'data1': 'somedata', 'id': test_id})]
+
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+        logging.getLogger().info(returned_item)
+        self.assertEqual(returned_item['id'], test_id)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_insert_documents(self, cosmos_mock):
+        test_id1 = str(uuid.uuid4())
+        test_id2 = str(uuid.uuid4())
+        test_id3 = str(uuid.uuid4())
+        documents = [
+            {'id': test_id1, 'data': 'data1'},
+            {'id': test_id2, 'data': 'data2'},
+            {'id': test_id3, 'data': 'data3'}]
+
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        returned_item = self.cosmos.insert_documents(documents)
+        expected_calls = [
+            mock.call().CreateItem(
+                'dbs/' + self.test_database_default + '/colls/' + self.test_collection_default,
+                {'data': 'data1', 'id': test_id1}),
+            mock.call().CreateItem(
+                'dbs/' + self.test_database_default + '/colls/' + self.test_collection_default,
+                {'data': 'data2', 'id': test_id2}),
+            mock.call().CreateItem(
+                'dbs/' + self.test_database_default + '/colls/' + self.test_collection_default,
+                {'data': 'data3', 'id': test_id3})]
+        logging.getLogger().info(returned_item)
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_delete_database(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.cosmos.delete_database(self.test_database_name)
+        expected_calls = [mock.call().DeleteDatabase('dbs/test_database_name')]
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_delete_database_exception(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.assertRaises(AirflowException, self.cosmos.delete_database, None)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_delete_container_exception(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.assertRaises(AirflowException, self.cosmos.delete_collection, None)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_delete_container(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.cosmos.delete_collection(self.test_collection_name, self.test_database_name)
+        expected_calls = [mock.call().DeleteContainer('dbs/test_database_name/colls/test_collection_name')]
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_delete_container_default(self, cosmos_mock):
+        self.cosmos = AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_test_key_id')
+        self.cosmos.delete_collection(self.test_collection_name)
+        expected_calls = [mock.call().DeleteContainer('dbs/test_database_default/colls/test_collection_name')]
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/contrib/hooks/test_gcp_sql_hook.py b/tests/contrib/hooks/test_gcp_sql_hook.py
new file mode 100644
index 0000000000..cb56237736
--- /dev/null
+++ b/tests/contrib/hooks/test_gcp_sql_hook.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from googleapiclient.errors import HttpError
+
+from airflow.contrib.hooks.gcp_sql_hook import CloudSqlHook
+from airflow.exceptions import AirflowException
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestGcpSqlHook(unittest.TestCase):
+    def test_instance_import_ex(self):
+        # Mocking __init__ with an empty anonymous function
+        with mock.patch.object(CloudSqlHook, "__init__", lambda x, y, z: None):
+            hook = CloudSqlHook(None, None)
+            # Simulating HttpError inside import_instance
+            hook.get_conn = mock.Mock(
+                side_effect=HttpError(resp={'status': '400'},
+                                      content='Error content'.encode('utf-8'))
+            )
+            with self.assertRaises(AirflowException) as cm:
+                hook.import_instance(None, None, None)
+            err = cm.exception
+            self.assertIn("Importing instance ", str(err))
+
+    def test_instance_export_ex(self):
+        # Mocking __init__ with an empty anonymous function
+        with mock.patch.object(CloudSqlHook, "__init__", lambda x, y, z: None):
+            hook = CloudSqlHook(None, None)
+            # Simulating HttpError inside export_instance
+            hook.get_conn = mock.Mock(
+                side_effect=HttpError(resp={'status': '400'},
+                                      content='Error content'.encode('utf-8'))
+            )
+            with self.assertRaises(AirflowException) as cm:
+                hook.export_instance(None, None, None)
+            err = cm.exception
+            self.assertIn("Exporting instance ", str(err))
diff --git a/tests/contrib/operators/mycredentials.json b/tests/contrib/operators/mycredentials.json
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/contrib/operators/test_azure_batchai_operator.py b/tests/contrib/operators/test_azure_batchai_operator.py
new file mode 100644
index 0000000000..a602b20f9b
--- /dev/null
+++ b/tests/contrib/operators/test_azure_batchai_operator.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+from airflow import configuration
+from airflow.exceptions import AirflowException
+from airflow.contrib.operators.azure_batchai_operator import AzureBatchAIOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+CONFIG_DATA = {
+    "clientId": "Id",
+    "clientSecret": "secret",
+    "tenantId": "tenant",
+    "subscription_id": "subscription",
+}
+
+
+class TestAzureBatchAIOperator(unittest.TestCase):
+
+    @mock.patch('airflow.contrib.operators.azure_batchai_operator.AzureBatchAIHook')
+    def setUp(self, azure_batchai_hook_mock):
+        configuration.load_test_config()
+
+        self.azure_batchai_hook_mock = azure_batchai_hook_mock(CONFIG_DATA)
+        self.batch = AzureBatchAIOperator('azure_batchai_default',
+                                          'batch-ai-test-rg',
+                                          'batch-ai-workspace',
+                                          'batch-ai-cluster',
+                                          'eastus',
+                                          'auto',
+                                          environment_variables=CONFIG_DATA,
+                                          volumes=[],
+                                          task_id='test_operator')
+
+    @mock.patch('airflow.contrib.operators.azure_batchai_operator.AzureBatchAIHook')
+    def test_execute(self, abai_mock):
+        abai_mock.return_value.get_state_exitcode.return_value = 'Terminated', 0
+        self.batch = AzureBatchAIOperator('azure_batchai_default',
+                                          'batch-ai-test-rg',
+                                          'batch-ai-workspace',
+                                          'batch-ai-cluster',
+                                          'eastus',
+                                          'auto',
+                                          environment_variables={
+                                              'USERNAME': 'azureuser',
+                                              'PASSWORD': 'azurepass'
+                                          },
+                                          volumes=[],
+                                          task_id='test_operator')
+        self.batch.execute()
+
+        self.assertEqual(self.batch.resource_group, 'batch-ai-test-rg')
+        self.assertEqual(self.batch.workspace_name, 'batch-ai-workspace')
+        self.assertEqual(self.batch.cluster_name, 'batch-ai-cluster')
+        self.assertEqual(self.batch.location, 'eastus')
+        self.assertEqual(self.batch.scale_type, 'auto')
+
+    @mock.patch('airflow.contrib.operators.azure_batchai_operator.AzureBatchAIHook')
+    def test_execute_with_failures(self, abai_mock):
+        abai_mock.return_value.get_state_exitcode.return_value = "Terminated", 1
+        self.batch = AzureBatchAIOperator('azure_default',
+                                          'batch-ai-test-rg',
+                                          'batch-ai-workspace',
+                                          'batch-ai-cluster',
+                                          'eastus',
+                                          'auto',
+                                          environment_variables={
+                                              'USERNAME': 'azureuser',
+                                              'PASSWORD': 'azurepass'
+                                          },
+                                          volumes=[],
+                                          task_id='test_operator')
+
+        with self.assertRaises(AirflowException):
+            self.batch.execute()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/contrib/operators/test_azure_cosmos_insertdocument_operator.py b/tests/contrib/operators/test_azure_cosmos_insertdocument_operator.py
new file mode 100644
index 0000000000..26099d0cb3
--- /dev/null
+++ b/tests/contrib/operators/test_azure_cosmos_insertdocument_operator.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+import json
+import unittest
+import uuid
+
+from airflow.contrib.operators.azure_cosmos_insertdocument_operator import AzureCosmosInsertDocumentOperator
+
+from airflow import configuration
+from airflow import models
+from airflow.utils import db
+
+try:
+    from unittest import mock
+
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestAzureCosmosDbHook(unittest.TestCase):
+
+    # Set up an environment to test with
+    def setUp(self):
+        # set up some test variables
+        self.test_end_point = 'https://test_endpoint:443'
+        self.test_master_key = 'magic_test_key'
+        self.test_database_name = 'test_database_name'
+        self.test_collection_name = 'test_collection_name'
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='azure_cosmos_test_key_id',
+                conn_type='azure_cosmos',
+                login=self.test_end_point,
+                password=self.test_master_key,
+                extra=json.dumps({'database_name': self.test_database_name,
+                                  'collection_name': self.test_collection_name})
+            )
+        )
+
+    @mock.patch('azure.cosmos.cosmos_client.CosmosClient')
+    def test_insert_document(self, cosmos_mock):
+        test_id = str(uuid.uuid4())
+        cosmos_mock.return_value.CreateItem.return_value = {'id': test_id}
+        self.cosmos = AzureCosmosInsertDocumentOperator(
+            database_name=self.test_database_name,
+            collection_name=self.test_collection_name,
+            document={'id': test_id, 'data': 'sometestdata'},
+            azure_cosmos_conn_id='azure_cosmos_test_key_id',
+            task_id='azure_cosmos_sensor')
+
+        expected_calls = [mock.call().CreateItem(
+            'dbs/' + self.test_database_name + '/colls/' + self.test_collection_name,
+            {'data': 'sometestdata', 'id': test_id})]
+
+        self.cosmos.execute(None)
+        cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
+        cosmos_mock.assert_has_calls(expected_calls)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/contrib/operators/test_gcp_sql_operator.py b/tests/contrib/operators/test_gcp_sql_operator.py
index 516fcef4aa..9f631493e0 100644
--- a/tests/contrib/operators/test_gcp_sql_operator.py
+++ b/tests/contrib/operators/test_gcp_sql_operator.py
@@ -18,17 +18,18 @@
 # under the License.
 import json
 import os
-import time
 import unittest
-from uuid import uuid1
 
+import time
 from parameterized import parameterized
+from uuid import uuid1
 
 from airflow import AirflowException
 from airflow.contrib.hooks.gcp_sql_hook import CloudSqlProxyRunner
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
     CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
     CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
+    CloudSqlInstanceExportOperator, CloudSqlInstanceImportOperator, \
     CloudSqlInstanceDatabaseDeleteOperator, CloudSqlQueryOperator
 from airflow.models import Connection
 from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
@@ -137,6 +138,36 @@
     "charset": "utf16",
     "collation": "utf16_general_ci"
 }
+EXPORT_BODY = {
+    "exportContext": {
+        "fileType": "CSV",
+        "uri": "gs://bucketName/fileName",
+        "databases": [],
+        "sqlExportOptions": {
+            "tables": [
+                "table1", "table2"
+            ],
+            "schemaOnly": False
+        },
+        "csvExportOptions": {
+            "selectQuery": "SELECT * FROM ..."
+        }
+    }
+}
+IMPORT_BODY = {
+    "importContext": {
+        "fileType": "CSV",
+        "uri": "gs://bucketName/fileName",
+        "database": "db1",
+        "importUser": "",
+        "csvImportOptions": {
+            "table": "my_table",
+            "columns": [
+                "col1", "col2"
+            ]
+        }
+    }
+}
 
 
 class CloudSqlTest(unittest.TestCase):
@@ -152,13 +183,15 @@ def test_instance_create(self, mock_hook, _check_if_instance_exists):
             body=CREATE_BODY,
             task_id="id"
         )
-        result = op.execute(None)
+        result = op.execute(context={
+            'task_instance': mock.Mock()
+        })
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.create_instance.assert_called_once_with(
             PROJECT_ID, CREATE_BODY
         )
-        self.assertTrue(result)
+        self.assertIsNone(result)
 
     @mock.patch("airflow.contrib.operators.gcp_sql_operator"
                 ".CloudSqlInstanceCreateOperator._check_if_instance_exists")
@@ -172,11 +205,13 @@ def test_instance_create_idempotent(self, mock_hook, _check_if_instance_exists):
             body=CREATE_BODY,
             task_id="id"
         )
-        result = op.execute(None)
+        result = op.execute(context={
+            'task_instance': mock.Mock()
+        })
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.create_instance.assert_not_called()
-        self.assertTrue(result)
+        self.assertIsNone(result)
 
     @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
     def test_create_should_throw_ex_when_empty_project_id(self, mock_hook):
@@ -475,6 +510,40 @@ def test_instance_db_delete_should_abort_and_succeed_if_not_exists(
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.delete_database.assert_not_called()
 
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_export(self, mock_hook):
+        mock_hook.return_value.export_instance.return_value = True
+        op = CloudSqlInstanceExportOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            body=EXPORT_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.export_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_NAME, EXPORT_BODY
+        )
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_import(self, mock_hook):
+        mock_hook.return_value.export_instance.return_value = True
+        op = CloudSqlInstanceImportOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            body=IMPORT_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.import_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_NAME, IMPORT_BODY
+        )
+        self.assertTrue(result)
+
 
 class CloudSqlQueryValidationTest(unittest.TestCase):
     @parameterized.expand([
@@ -492,8 +561,8 @@ class CloudSqlQueryValidationTest(unittest.TestCase):
          "Invalid database type 'wrong'. Must be one of ['postgres', 'mysql']"),
         ('project_id', 'location', 'instance_name', 'postgres', True, True,
          'SELECT * FROM TEST',
-         "Cloud Sql Proxy does not support SSL connections. SSL is not needed as"
-         " Cloud Sql Proxy provides encryption on its own"),
+         "Cloud SQL Proxy does not support SSL connections. SSL is not needed as"
+         " Cloud SQL Proxy provides encryption on its own"),
         ('project_id', 'location', 'instance_name', 'postgres', False, True,
          'SELECT * FROM TEST',
          "SSL connections requires sslcert to be set"),
@@ -789,6 +858,19 @@ def test_start_proxy_with_all_instances_specific_version(self):
         self.assertEqual(runner.get_proxy_version(), "1.13")
 
 
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
+class CloudSqlExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSqlExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_sql',
+            gcp_key=GCP_CLOUDSQL_KEY)
+
+    def test_run_example_dag_cloudsql_query(self):
+        self._run_dag()
+
+
 @unittest.skipIf(
     BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
 class CloudSqlQueryExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services