You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/04 13:50:17 UTC

[GitHub] kaxil closed pull request #4354: [AIRFLOW-3446] Add Google Cloud BigTable operators

kaxil closed pull request #4354: [AIRFLOW-3446] Add Google Cloud BigTable operators
URL: https://github.com/apache/incubator-airflow/pull/4354
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/example_dags/example_gcp_bigtable_operators.py b/airflow/contrib/example_dags/example_gcp_bigtable_operators.py
new file mode 100644
index 0000000000..48c4245cba
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_bigtable_operators.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# 'License'); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that performs the following operations on Cloud Bigtable:
+- creates an Instance
+- creates a Table
+- updates Cluster
+- waits for Table replication completeness
+- deletes the Table
+- deletes the Instance
+
+This DAG relies on the following environment variables
+* GCP_PROJECT_ID - Google Cloud Platform project
+* CBT_INSTANCE_ID - desired ID of a Cloud Bigtable instance
+* CBT_INSTANCE_DISPLAY_NAME - desired human-readable display name of the Instance
+* CBT_INSTANCE_TYPE - type of the Instance, e.g. 1 for DEVELOPMENT
+    See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance
+* CBT_INSTANCE_LABELS - labels to add for the Instance
+* CBT_CLUSTER_ID - desired ID of the main Cluster created for the Instance
+* CBT_CLUSTER_ZONE - zone in which main Cluster will be created. e.g. europe-west1-b
+    See available zones: https://cloud.google.com/bigtable/docs/locations
+* CBT_CLUSTER_NODES - initial number of nodes of the Cluster
+* CBT_CLUSTER_NODES_UPDATED - number of nodes for BigtableClusterUpdateOperator
+* CBT_CLUSTER_STORAGE_TYPE - storage for the Cluster, e.g. 1 for SSD
+    See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.cluster # noqa: E501
+* CBT_TABLE_ID - desired ID of the Table
+* CBT_POKE_INTERVAL - number of seconds between every attempt of Sensor check
+
+"""
+
+import datetime
+import json
+
+from os import getenv
+
+import airflow
+from airflow import models
+from airflow.contrib.operators.gcp_bigtable_operator import BigtableInstanceCreateOperator, \
+    BigtableInstanceDeleteOperator, BigtableClusterUpdateOperator, BigtableTableCreateOperator, \
+    BigtableTableWaitForReplicationSensor, BigtableTableDeleteOperator
+
+# [START howto_operator_gcp_bigtable_args]
+GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
+CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
+CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
+CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
+CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
+CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
+CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
+CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
+CBT_CLUSTER_NODES_UPDATED = getenv('CBT_CLUSTER_NODES_UPDATED', '5')
+CBT_CLUSTER_STORAGE_TYPE = getenv('CBT_CLUSTER_STORAGE_TYPE', '2')
+CBT_TABLE_ID = getenv('CBT_TABLE_ID', 'some-table-id')
+CBT_POKE_INTERVAL = getenv('CBT_POKE_INTERVAL', '60')
+# [END howto_operator_gcp_bigtable_args]
+
+default_args = {
+    'start_date': airflow.utils.dates.days_ago(1)
+}
+
+with models.DAG(
+    'example_gcp_bigtable_operators',
+    default_args=default_args,
+    schedule_interval=datetime.timedelta(days=1)
+) as dag:
+    # [START howto_operator_gcp_bigtable_instance_create]
+    create_instance_task = BigtableInstanceCreateOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=CBT_INSTANCE_ID,
+        main_cluster_id=CBT_CLUSTER_ID,
+        main_cluster_zone=CBT_CLUSTER_ZONE,
+        instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
+        instance_type=int(CBT_INSTANCE_TYPE),
+        instance_labels=json.loads(CBT_INSTANCE_LABELS),
+        cluster_nodes=int(CBT_CLUSTER_NODES),
+        cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
+        task_id='create_instance',
+    )
+    # [END howto_operator_gcp_bigtable_instance_create]
+
+    # [START howto_operator_gcp_bigtable_cluster_update]
+    cluster_update_task = BigtableClusterUpdateOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=CBT_INSTANCE_ID,
+        cluster_id=CBT_CLUSTER_ID,
+        nodes=int(CBT_CLUSTER_NODES_UPDATED),
+        task_id='update_cluster',
+    )
+    # [END howto_operator_gcp_bigtable_cluster_update]
+
+    # [START howto_operator_gcp_bigtable_instance_delete]
+    delete_instance_task = BigtableInstanceDeleteOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=CBT_INSTANCE_ID,
+        task_id='delete_instance',
+    )
+    # [END howto_operator_gcp_bigtable_instance_delete]
+
+    # [START howto_operator_gcp_bigtable_table_create]
+    create_table_task = BigtableTableCreateOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=CBT_INSTANCE_ID,
+        table_id=CBT_TABLE_ID,
+        task_id='create_table',
+    )
+    # [END howto_operator_gcp_bigtable_table_create]
+
+    # [START howto_operator_gcp_bigtable_table_wait_for_replication]
+    wait_for_table_replication_task = BigtableTableWaitForReplicationSensor(
+        project_id=GCP_PROJECT_ID,
+        instance_id=CBT_INSTANCE_ID,
+        table_id=CBT_TABLE_ID,
+        poke_interval=int(CBT_POKE_INTERVAL),
+        task_id='wait_for_table_replication',
+    )
+    # [END howto_operator_gcp_bigtable_table_wait_for_replication]
+
+    # [START howto_operator_gcp_bigtable_table_delete]
+    delete_table_task = BigtableTableDeleteOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=CBT_INSTANCE_ID,
+        table_id=CBT_TABLE_ID,
+        task_id='delete_table',
+    )
+    # [END howto_operator_gcp_bigtable_table_delete]
+
+    wait_for_table_replication_task >> delete_table_task
+    create_instance_task \
+        >> create_table_task \
+        >> cluster_update_task \
+        >> delete_table_task \
+        >> delete_instance_task
diff --git a/airflow/contrib/hooks/gcp_bigtable_hook.py b/airflow/contrib/hooks/gcp_bigtable_hook.py
new file mode 100644
index 0000000000..5d1b6f01c9
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_bigtable_hook.py
@@ -0,0 +1,231 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from google.cloud.bigtable import Client
+from google.cloud.bigtable.cluster import Cluster
+from google.cloud.bigtable.instance import Instance
+from google.cloud.bigtable.table import Table
+from google.cloud.bigtable_admin_v2 import enums
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+class BigtableHook(GoogleCloudBaseHook):
+    """
+    Hook for Google Cloud Bigtable APIs.
+    """
+
+    _client = None
+
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(BigtableHook, self).__init__(gcp_conn_id, delegate_to)
+
+    def get_client(self, project_id):
+        if not self._client or (project_id and self._client.project != project_id):  # project changed
+            self._client = Client(project=project_id, credentials=self._get_credentials(), admin=True)
+        return self._client
+
+    def get_instance(self, project_id, instance_id):
+        """
+        Retrieves and returns the specified Cloud Bigtable instance if it exists.
+        Otherwise, returns None.
+
+        :param project_id: The ID of the GCP project.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Bigtable instance.
+        :type instance_id: str
+        """
+
+        client = self.get_client(project_id)
+
+        instance = Instance(instance_id, client)
+        if not instance.exists():
+            return None
+        return instance
+
+    def delete_instance(self, project_id, instance_id):
+        """
+        Deletes the specified Cloud Bigtable instance.
+        Raises google.api_core.exceptions.NotFound if the Cloud Bigtable instance does not exist.
+
+        :param project_id: The ID of the GCP project.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Bigtable instance.
+        :type instance_id: str
+        """
+        instance = Instance(instance_id, self.get_client(project_id))
+        instance.delete()
+
+    def create_instance(self,
+                        project_id,
+                        instance_id,
+                        main_cluster_id,
+                        main_cluster_zone,
+                        replica_cluster_id=None,
+                        replica_cluster_zone=None,
+                        instance_display_name=None,
+                        instance_type=enums.Instance.Type.TYPE_UNSPECIFIED,
+                        instance_labels=None,
+                        cluster_nodes=None,
+                        cluster_storage_type=enums.StorageType.STORAGE_TYPE_UNSPECIFIED,
+                        timeout=None):
+        """
+        Creates new instance.
+
+        :type project_id: str
+        :param project_id: The ID of the GCP project.
+        :type instance_id: str
+        :param instance_id: The ID for the new instance.
+        :type main_cluster_id: str
+        :param main_cluster_id: The ID for main cluster for the new instance.
+        :type main_cluster_zone: str
+        :param main_cluster_zone: The zone for main cluster.
+            See https://cloud.google.com/bigtable/docs/locations for more details.
+        :type replica_cluster_id: str
+        :param replica_cluster_id: (optional) The ID for replica cluster for the new instance.
+        :type replica_cluster_zone: str
+        :param replica_cluster_zone: (optional)  The zone for replica cluster.
+        :type instance_type: enums.Instance.Type
+        :param instance_type: (optional) The type of the instance.
+        :type instance_display_name: str
+        :param instance_display_name: (optional) Human-readable name of the instance.
+                Defaults to ``instance_id``.
+        :type instance_labels: dict
+        :param instance_labels: (optional) Dictionary of labels to associate with the instance.
+        :type cluster_nodes: int
+        :param cluster_nodes: (optional) Number of nodes for cluster.
+        :type cluster_storage_type: enums.StorageType
+        :param cluster_storage_type: (optional) The type of storage.
+        :type timeout: int
+        :param timeout: (optional) timeout (in seconds) for instance creation.
+                        If not specified (None), the call will wait indefinitely.
+        """
+        cluster_storage_type = enums.StorageType(cluster_storage_type)
+        instance_type = enums.Instance.Type(instance_type)
+
+        instance = Instance(
+            instance_id,
+            self.get_client(project_id),
+            instance_display_name,
+            instance_type,
+            instance_labels,
+        )
+
+        clusters = [
+            instance.cluster(
+                main_cluster_id,
+                main_cluster_zone,
+                cluster_nodes,
+                cluster_storage_type
+            )
+        ]
+        if replica_cluster_id and replica_cluster_zone:
+            clusters.append(instance.cluster(
+                replica_cluster_id,
+                replica_cluster_zone,
+                cluster_nodes,
+                cluster_storage_type
+            ))
+        operation = instance.create(
+            clusters=clusters
+        )
+        operation.result(timeout)
+        return instance
+
+    # noinspection PyMethodMayBeStatic
+    def create_table(self, instance, table_id, initial_split_keys, column_families):
+        """
+        Creates the specified Cloud Bigtable table.
+        Raises google.api_core.exceptions.AlreadyExists if the table exists.
+
+        :type instance: Instance
+        :param instance: The Cloud Bigtable instance that owns the table.
+        :type table_id: str
+        :param table_id: The ID of the table to create in Cloud Bigtable.
+        :type initial_split_keys: list
+        :param initial_split_keys: (Optional) A list of row keys in bytes to use to initially split the table.
+        :type column_families: dict
+        :param column_families: (Optional) A map of columns to create. The key is the column_id str, and the
+        value is a GarbageCollectionRule.
+        """
+        table = Table(table_id, instance)
+        table.create(initial_split_keys, column_families)
+
+    def delete_table(self, project_id, instance_id, table_id):
+        """
+        Deletes the specified table in Cloud Bigtable.
+        Raises google.api_core.exceptions.NotFound if the table does not exist.
+
+        :type project_id: str
+        :param project_id: The ID of the GCP project.
+        :type instance_id: str
+        :param instance_id: The ID of the Cloud Bigtable instance.
+        :type table_id: str
+        :param table_id: The ID of the table in Cloud Bigtable.
+        """
+        instance = Instance(instance_id, self.get_client(project_id))
+        table = Table(table_id, instance)
+        table.delete()
+
+    @staticmethod
+    def update_cluster(instance, cluster_id, nodes):
+        """
+        Updates number of nodes in the specified Cloud Bigtable cluster.
+        Raises google.api_core.exceptions.NotFound if the cluster does not exist.
+
+        :type instance: Instance
+        :param instance: The Cloud Bigtable instance that owns the cluster.
+        :type cluster_id: str
+        :param cluster_id: The ID of the cluster.
+        :type nodes: int
+        :param nodes: The desired number of nodes.
+        """
+        cluster = Cluster(cluster_id, instance)
+        cluster.serve_nodes = nodes
+        cluster.update()
+
+    @staticmethod
+    def get_column_families_for_table(instance, table_id):
+        """
+        Fetches Column Families for the specified table in Cloud Bigtable.
+
+        :type instance: Instance
+        :param instance: The Cloud Bigtable instance that owns the table.
+        :type table_id: str
+        :param table_id: The ID of the table in Cloud Bigtable to fetch Column Families from.
+        """
+
+        table = Table(table_id, instance)
+        return table.list_column_families()
+
+    @staticmethod
+    def get_cluster_states_for_table(instance, table_id):
+        """
+        Fetches Cluster States for the specified table in Cloud Bigtable.
+        Raises google.api_core.exceptions.NotFound if the table does not exist.
+
+        :type instance: Instance
+        :param instance: The Cloud Bigtable instance that owns the table.
+        :type table_id: str
+        :param table_id: The ID of the table in Cloud Bigtable to fetch Cluster States from.
+        """
+
+        table = Table(table_id, instance)
+        return table.get_cluster_states()
diff --git a/airflow/contrib/operators/gcp_bigtable_operator.py b/airflow/contrib/operators/gcp_bigtable_operator.py
new file mode 100644
index 0000000000..640851e76e
--- /dev/null
+++ b/airflow/contrib/operators/gcp_bigtable_operator.py
@@ -0,0 +1,424 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import google.api_core.exceptions
+
+from airflow import AirflowException
+from airflow.models import BaseOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.contrib.hooks.gcp_bigtable_hook import BigtableHook
+from airflow.utils.decorators import apply_defaults
+from google.cloud.bigtable_admin_v2 import enums
+from google.cloud.bigtable.table import ClusterState
+
+
+class BigtableValidationMixin(object):
+    """
+    Common class for Cloud Bigtable operators for validating required fields.
+    """
+
+    REQUIRED_ATTRIBUTES = []
+
+    def _validate_inputs(self):
+        for attr_name in self.REQUIRED_ATTRIBUTES:
+            if not getattr(self, attr_name):
+                raise AirflowException('Empty parameter: {}'.format(attr_name))
+
+
+class BigtableInstanceCreateOperator(BaseOperator, BigtableValidationMixin):
+    """
+    Creates a new Cloud Bigtable instance.
+    If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration
+    and immediately succeeds. No changes are made to the existing instance.
+
+    For more details about instance creation have a look at the reference:
+    https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.create
+
+    :type project_id: str
+    :param project_id: The ID of the GCP project.
+    :type instance_id: str
+    :param instance_id: The ID of the Cloud Bigtable instance to create.
+    :type main_cluster_id: str
+    :param main_cluster_id: The ID for main cluster for the new instance.
+    :type main_cluster_zone: str
+    :param main_cluster_zone: The zone for main cluster
+        See https://cloud.google.com/bigtable/docs/locations for more details.
+    :type replica_cluster_id: str
+    :param replica_cluster_id: (optional) The ID for replica cluster for the new instance.
+    :type replica_cluster_zone: str
+    :param replica_cluster_zone: (optional)  The zone for replica cluster.
+    :type instance_type: IntEnum
+    :param instance_type: (optional) The type of the instance.
+    :type instance_display_name: str
+    :param instance_display_name: (optional) Human-readable name of the instance. Defaults to ``instance_id``.
+    :type instance_labels: dict
+    :param instance_labels: (optional) Dictionary of labels to associate with the instance.
+    :type cluster_nodes: int
+    :param cluster_nodes: (optional) Number of nodes for cluster.
+    :type cluster_storage_type: IntEnum
+    :param cluster_storage_type: (optional) The type of storage.
+    :type timeout: int
+    :param timeout: (optional) timeout (in seconds) for instance creation.
+                    If not specified (None), the operator will wait indefinitely.
+    """
+
+    REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'main_cluster_id', 'main_cluster_zone')
+    template_fields = ['project_id', 'instance_id', 'main_cluster_id', 'main_cluster_zone']
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 main_cluster_id,
+                 main_cluster_zone,
+                 replica_cluster_id=None,
+                 replica_cluster_zone=None,
+                 instance_display_name=None,
+                 instance_type=None,
+                 instance_labels=None,
+                 cluster_nodes=None,
+                 cluster_storage_type=None,
+                 timeout=None,
+                 *args, **kwargs):
+        self.project_id = project_id
+        self.instance_id = instance_id
+        self.main_cluster_id = main_cluster_id
+        self.main_cluster_zone = main_cluster_zone
+        self.replica_cluster_id = replica_cluster_id
+        self.replica_cluster_zone = replica_cluster_zone
+        self.instance_display_name = instance_display_name
+        self.instance_type = instance_type
+        self.instance_labels = instance_labels
+        self.cluster_nodes = cluster_nodes
+        self.cluster_storage_type = cluster_storage_type
+        self.timeout = timeout
+        self._validate_inputs()
+        self.hook = BigtableHook()
+        super(BigtableInstanceCreateOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        instance = self.hook.get_instance(self.project_id, self.instance_id)
+        if instance:
+            # Instance.__eq__ treats instances with the same ID and client as equal, so skip creation.
+            self.log.info(
+                "The instance '%s' already exists in this project. Consider it as created",
+                self.instance_id
+            )
+            return
+        try:
+            self.hook.create_instance(
+                self.project_id,
+                self.instance_id,
+                self.main_cluster_id,
+                self.main_cluster_zone,
+                self.replica_cluster_id,
+                self.replica_cluster_zone,
+                self.instance_display_name,
+                self.instance_type,
+                self.instance_labels,
+                self.cluster_nodes,
+                self.cluster_storage_type,
+                self.timeout,
+            )
+        except google.api_core.exceptions.GoogleAPICallError:
+            self.log.error('An error occurred. Exiting.')
+            raise  # bare raise re-raises with the original traceback intact
+
+
+class BigtableInstanceDeleteOperator(BaseOperator, BigtableValidationMixin):
+    """
+    Deletes the Cloud Bigtable instance, including its clusters and all related tables.
+
+    For more details about deleting instance have a look at the reference:
+    https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.delete
+
+    :type project_id: str
+    :param project_id: The ID of the GCP project.
+    :type instance_id: str
+    :param instance_id: The ID of the Cloud Bigtable instance to delete.
+    """
+    REQUIRED_ATTRIBUTES = ('project_id', 'instance_id')
+    template_fields = ['project_id', 'instance_id']
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 *args, **kwargs):
+        self.project_id = project_id
+        self.instance_id = instance_id
+        self._validate_inputs()
+        self.hook = BigtableHook()
+        super(BigtableInstanceDeleteOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        try:
+            self.hook.delete_instance(self.project_id, self.instance_id)
+        except google.api_core.exceptions.NotFound:  # already gone: deletion is treated as done
+            self.log.info(
+                "The instance '%s' does not exist in project '%s'. Consider it as deleted",
+                self.instance_id, self.project_id
+            )
+        except google.api_core.exceptions.GoogleAPICallError:
+            self.log.error('An error occurred. Exiting.')
+            raise  # bare raise re-raises with the original traceback intact
+
+
+class BigtableTableCreateOperator(BaseOperator, BigtableValidationMixin):
+    """
+    Creates the table in the Cloud Bigtable instance.
+
+    For more details about creating table have a look at the reference:
+    https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.create
+
+    :type project_id: str
+    :param project_id: The ID of the GCP project.
+    :type instance_id: str
+    :param instance_id: The ID of the Cloud Bigtable instance that will hold the new table.
+    :type table_id: str
+    :param table_id: The ID of the table to be created.
+    :type initial_split_keys: list
+    :param initial_split_keys: (Optional) list of row keys in bytes that will be used to initially split
+                                the table into several tablets.
+    :type column_families: dict
+    :param column_families: (Optional) A map of columns to create.
+                            The key is the column_id str and the value is a GarbageCollectionRule
+    """
+    REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id')
+    template_fields = ['project_id', 'instance_id', 'table_id']
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 table_id,
+                 initial_split_keys=None,
+                 column_families=None,
+                 *args, **kwargs):
+        self.project_id = project_id
+        self.instance_id = instance_id
+        self.table_id = table_id
+        self.initial_split_keys = initial_split_keys or list()
+        self.column_families = column_families or dict()
+        self._validate_inputs()
+        self.hook = BigtableHook()
+        self.instance = None
+        super(BigtableTableCreateOperator, self).__init__(*args, **kwargs)
+
+    def _compare_column_families(self):
+        table_column_families = self.hook.get_column_families_for_table(self.instance, self.table_id)
+        if set(table_column_families.keys()) != set(self.column_families.keys()):
+            self.log.error("Table '%s' has different set of Column Families", self.table_id)
+            self.log.error("Expected: %s", self.column_families.keys())
+            self.log.error("Actual: %s", table_column_families.keys())
+            return False
+
+        for key in table_column_families.keys():
+            # There is difference in structure between local Column Families and remote ones
+            # Local `self.column_families` is dict with column_id as key and GarbageCollectionRule as value.
+            # Remote `table_column_families` is dict with column_id as key and ColumnFamily as value.
+            # For more information about ColumnFamily please refer to the documentation:
+            # https://googleapis.github.io/google-cloud-python/latest/bigtable/column-family.html#google.cloud.bigtable.column_family.ColumnFamily
+            if table_column_families[key].gc_rule != self.column_families[key]:
+                self.log.error("Column Family '%s' differs for table '%s'.", key, self.table_id)
+                return False
+        return True
+
+    def execute(self, context):
+        self.instance = self.hook.get_instance(self.project_id, self.instance_id)
+        if not self.instance:
+            raise AirflowException("Dependency: instance '{}' does not exist in project '{}'.".format(
+                self.instance_id, self.project_id))
+        try:
+            self.hook.create_table(
+                self.instance,
+                self.table_id,
+                self.initial_split_keys,
+                self.column_families
+            )
+        except google.api_core.exceptions.AlreadyExists:
+            if not self._compare_column_families():
+                raise AirflowException(
+                    "Table '{}' already exists with different Column Families.".format(self.table_id))
+            self.log.info("The table '%s' already exists. Consider it as created", self.table_id)
+
+
+class BigtableTableDeleteOperator(BaseOperator, BigtableValidationMixin):
+    """
+    Deletes the Cloud Bigtable table.
+
+    For more details about deleting table have a look at the reference:
+    https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.delete
+
+    :type project_id: str
+    :param project_id: The ID of the GCP project.
+    :type instance_id: str
+    :param instance_id: The ID of the Cloud Bigtable instance.
+    :type table_id: str
+    :param table_id: The ID of the table to be deleted.
+    """
+    REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id')
+    template_fields = ['project_id', 'instance_id', 'table_id']
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 table_id,
+                 app_profile_id=None,
+                 *args, **kwargs):
+        self.project_id = project_id
+        self.instance_id = instance_id
+        self.table_id = table_id
+        self.app_profile_id = app_profile_id
+        self._validate_inputs()
+        self.hook = BigtableHook()
+        super(BigtableTableDeleteOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        instance = self.hook.get_instance(self.project_id, self.instance_id)
+        if not instance:
+            raise AirflowException("Dependency: instance '{}' does not exist.".format(self.instance_id))
+
+        try:
+            self.hook.delete_table(
+                self.project_id,
+                self.instance_id,
+                self.table_id,
+            )
+        except google.api_core.exceptions.NotFound:
+            # It's OK if the table doesn't exist.
+            self.log.info("The table '%s' no longer exists. Consider it as deleted", self.table_id)
+        except google.api_core.exceptions.GoogleAPICallError:
+            self.log.error('An error occurred. Exiting.')
+            raise  # bare raise re-raises with the original traceback intact
+
+
+class BigtableClusterUpdateOperator(BaseOperator, BigtableValidationMixin):
+    """
+    Updates a Cloud Bigtable cluster.
+
+    For more details about updating a Cloud Bigtable cluster, have a look at the reference:
+    https://googleapis.github.io/google-cloud-python/latest/bigtable/cluster.html#google.cloud.bigtable.cluster.Cluster.update
+
+    :type project_id: str
+    :param project_id: The ID of the GCP project.
+    :type instance_id: str
+    :param instance_id: The ID of the Cloud Bigtable instance.
+    :type cluster_id: str
+    :param cluster_id: The ID of the Cloud Bigtable cluster to update.
+    :type nodes: int
+    :param nodes: The desired number of nodes for the Cloud Bigtable cluster.
+    """
+    REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'cluster_id', 'nodes')
+    template_fields = ['project_id', 'instance_id', 'cluster_id', 'nodes']
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 cluster_id,
+                 nodes,
+                 *args, **kwargs):
+        self.project_id = project_id
+        self.instance_id = instance_id
+        self.cluster_id = cluster_id
+        self.nodes = nodes
+        self._validate_inputs()
+        self.hook = BigtableHook()
+        super(BigtableClusterUpdateOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        instance = self.hook.get_instance(self.project_id, self.instance_id)
+        if not instance:
+            raise AirflowException("Dependency: instance '{}' does not exist.".format(self.instance_id))
+
+        try:
+            self.hook.update_cluster(
+                instance,
+                self.cluster_id,
+                self.nodes
+            )
+        except google.api_core.exceptions.NotFound:  # must precede the GoogleAPICallError handler below
+            raise AirflowException("Dependency: cluster '{}' does not exist for instance '{}'.".format(
+                self.cluster_id,
+                self.instance_id
+            ))
+        except google.api_core.exceptions.GoogleAPICallError:
+            self.log.error('An error occurred. Exiting.')
+            raise  # bare raise re-raises with the original traceback intact
+
+
+class BigtableTableWaitForReplicationSensor(BaseSensorOperator, BigtableValidationMixin):
+    """
+    Sensor that waits for Cloud Bigtable table to be fully replicated to its clusters.
+    No exception will be raised if the instance or the table does not exist.
+
+    For more details about cluster states for a table, have a look at the reference:
+    https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.get_cluster_states
+
+    :type project_id: str
+    :param project_id: The ID of the GCP project.
+    :type instance_id: str
+    :param instance_id: The ID of the Cloud Bigtable instance.
+    :type table_id: str
+    :param table_id: The ID of the table to check replication status.
+    """
+    REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id')
+    template_fields = ['project_id', 'instance_id', 'table_id']
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 table_id,
+                 *args, **kwargs):
+        self.project_id = project_id
+        self.instance_id = instance_id
+        self.table_id = table_id
+        self._validate_inputs()
+        self.hook = BigtableHook()
+        super(BigtableTableWaitForReplicationSensor, self).__init__(*args, **kwargs)
+
+    def poke(self, context):
+        instance = self.hook.get_instance(self.project_id, self.instance_id)
+        if not instance:
+            self.log.info("Dependency: instance '%s' does not exist.", self.instance_id)
+            return False
+
+        try:
+            cluster_states = self.hook.get_cluster_states_for_table(instance, self.table_id)
+        except google.api_core.exceptions.NotFound:
+            self.log.info(
+                "Dependency: table '%s' does not exist in instance '%s'.", self.table_id, self.instance_id)
+            return False
+
+        ready_state = ClusterState(enums.Table.ClusterState.ReplicationState.READY)
+
+        is_table_replicated = True
+        for cluster_id in cluster_states.keys():
+            if cluster_states[cluster_id] != ready_state:
+                self.log.info("Table '%s' is not yet replicated on cluster '%s'.", self.table_id, cluster_id)
+                is_table_replicated = False
+
+        if not is_table_replicated:
+            return False
+
+        self.log.info("Table '%s' is replicated.", self.table_id)
+        return True
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index 221913dec0..3a88d66339 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -361,6 +361,135 @@ More information
 See `Google Compute Engine API documentation
 <https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers>`_.
 
+Google Cloud Bigtable Operators
+-------------------------------
+
+Arguments
+"""""""""
+
+All examples below rely on the following variables, which can be passed via environment variables.
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
+    :language: python
+    :start-after: [START howto_operator_gcp_bigtable_args]
+    :end-before: [END howto_operator_gcp_bigtable_args]
+
+
+BigtableInstanceCreateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator`
+to create a Google Cloud Bigtable instance.
+
+If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration
+and immediately succeeds. No changes are made to the existing instance.
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_bigtable_instance_create]
+    :end-before: [END howto_operator_gcp_bigtable_instance_create]
+
+
+BigtableInstanceDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator`
+to delete a Google Cloud Bigtable instance.
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_bigtable_instance_delete]
+    :end-before: [END howto_operator_gcp_bigtable_instance_delete]
+
+BigtableClusterUpdateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator`
+to modify the number of nodes in a Cloud Bigtable cluster.
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_bigtable_cluster_update]
+    :end-before: [END howto_operator_gcp_bigtable_cluster_update]
+
+
+BigtableTableCreateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Creates a table in a Cloud Bigtable instance.
+
+If a table with the given ID exists in the Cloud Bigtable instance, the operator compares the Column Families.
+If the Column Families are identical, the operator succeeds. Otherwise, the operator fails with the appropriate
+error message.
+
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_bigtable_table_create]
+    :end-before: [END howto_operator_gcp_bigtable_table_create]
+
+Advanced
+""""""""
+
+When creating a table, you can specify the optional ``initial_split_keys`` and ``column_families``.
+Please refer to the Python Client for Google Cloud Bigtable documentation
+`for Table <https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html>`_ and `for Column
+Families <https://googleapis.github.io/google-cloud-python/latest/bigtable/column-family.html>`_.
+
+
+BigtableTableDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator`
+to delete a table in Google Cloud Bigtable.
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_bigtable_table_delete]
+    :end-before: [END howto_operator_gcp_bigtable_table_delete]
+
+BigtableTableWaitForReplicationSensor
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor`
+to wait for the table to replicate fully.
+
+The same arguments apply to this sensor as the BigtableTableCreateOperator_.
+
+**Note:** If the table or the Cloud Bigtable instance does not exist, this sensor does not raise an exception;
+it keeps waiting for the table until the sensor timeout is reached.
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_bigtable_table_wait_for_replication]
+    :end-before: [END howto_operator_gcp_bigtable_table_wait_for_replication]
+
+
+
 Google Cloud Functions Operators
 --------------------------------
 
@@ -1318,3 +1447,4 @@ More information
 See `Google Cloud Storage ObjectAccessControls insert documentation
 <https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert>`_.
 
+
diff --git a/docs/integration.rst b/docs/integration.rst
index f35c8e87ea..dd104cf790 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -1,3 +1,4 @@
+
 ..  Licensed to the Apache Software Foundation (ASF) under one
     or more contributor license agreements.  See the NOTICE file
     distributed with this work for additional information
@@ -814,6 +815,69 @@ Cloud SQL Hooks
     :members:
 
 
+Cloud Bigtable
+''''''''''''''
+
+Cloud Bigtable Operators
+""""""""""""""""""""""""
+
+- :ref:`BigtableInstanceCreateOperator` : creates a Cloud Bigtable instance.
+- :ref:`BigtableInstanceDeleteOperator` : deletes a Google Cloud Bigtable instance.
+- :ref:`BigtableClusterUpdateOperator` : updates the number of nodes in a Google Cloud Bigtable cluster.
+- :ref:`BigtableTableCreateOperator` : creates a table in a Google Cloud Bigtable instance.
+- :ref:`BigtableTableDeleteOperator` : deletes a table in a Google Cloud Bigtable instance.
+- :ref:`BigtableTableWaitForReplicationSensor` : (sensor) waits for a table to be fully replicated.
+
+.. _BigtableInstanceCreateOperator:
+
+BigtableInstanceCreateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator
+
+.. _BigtableInstanceDeleteOperator:
+
+BigtableInstanceDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator
+
+.. _BigtableClusterUpdateOperator:
+
+BigtableClusterUpdateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator
+
+.. _BigtableTableCreateOperator:
+
+BigtableTableCreateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator
+
+.. _BigtableTableDeleteOperator:
+
+BigtableTableDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator
+
+.. _BigtableTableWaitForReplicationSensor:
+
+BigtableTableWaitForReplicationSensor
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor
+
+.. _BigtableHook:
+
+Cloud Bigtable Hook
+""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.gcp_bigtable_hook.BigtableHook
+    :members:
+
 Compute Engine
 ''''''''''''''
 
diff --git a/setup.py b/setup.py
index 6dc452302f..410502c302 100644
--- a/setup.py
+++ b/setup.py
@@ -190,6 +190,7 @@ def write_version(filename=os.path.join(*['airflow',
     'google-auth>=1.0.0, <2.0.0dev',
     'google-auth-httplib2>=0.0.1',
     'google-cloud-container>=0.1.1',
+    'google-cloud-bigtable==0.31.0',
     'google-cloud-spanner>=1.6.0',
     'grpcio-gcp>=0.2.2',
     'PyOpenSSL',
diff --git a/tests/contrib/operators/test_gcp_bigtable_operator.py b/tests/contrib/operators/test_gcp_bigtable_operator.py
new file mode 100644
index 0000000000..8417efc997
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_bigtable_operator.py
@@ -0,0 +1,540 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import google.api_core.exceptions
+from google.cloud.bigtable.column_family import MaxVersionsGCRule
+from google.cloud.bigtable.instance import Instance
+from google.cloud.bigtable.table import ClusterState
+from parameterized import parameterized
+
+from airflow import AirflowException
+from airflow.contrib.operators.gcp_bigtable_operator import BigtableInstanceDeleteOperator, \
+    BigtableTableDeleteOperator, BigtableTableCreateOperator, BigtableTableWaitForReplicationSensor, \
+    BigtableClusterUpdateOperator, BigtableInstanceCreateOperator
+
+try:
+    # noinspection PyProtectedMember
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+PROJECT_ID = 'test_project_id'
+INSTANCE_ID = 'test-instance-id'
+CLUSTER_ID = 'test-cluster-id'
+CLUSTER_ZONE = 'us-central1-f'
+NODES = 5
+TABLE_ID = 'test-table-id'
+INITIAL_SPLIT_KEYS = []
+EMPTY_COLUMN_FAMILIES = {}
+
+
+class BigtableInstanceCreateTest(unittest.TestCase):
+    @parameterized.expand([
+        ('project_id', '', INSTANCE_ID, CLUSTER_ID, CLUSTER_ZONE),
+        ('instance_id', PROJECT_ID, '', CLUSTER_ID, CLUSTER_ZONE),
+        ('main_cluster_id', PROJECT_ID, INSTANCE_ID, '', CLUSTER_ZONE),
+        ('main_cluster_zone', PROJECT_ID, INSTANCE_ID, CLUSTER_ID, ''),
+    ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_empty_attribute(self, missing_attribute, project_id, instance_id, main_cluster_id,
+                             main_cluster_zone, mock_hook):
+        with self.assertRaises(AirflowException) as e:
+            BigtableInstanceCreateOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                main_cluster_id=main_cluster_id,
+                main_cluster_zone=main_cluster_zone,
+                task_id="id"
+            )
+        err = e.exception
+        self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_create_instance_that_exists(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
+
+        op = BigtableInstanceCreateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            main_cluster_id=CLUSTER_ID,
+            main_cluster_zone=CLUSTER_ZONE,
+            task_id="id"
+        )
+        op.execute(None)
+
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.create_instance.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_different_error_reraised(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = None
+        op = BigtableInstanceCreateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            main_cluster_id=CLUSTER_ID,
+            main_cluster_zone=CLUSTER_ZONE,
+            task_id="id"
+        )
+
+        mock_hook.return_value.create_instance.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
+
+        with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
+            op.execute(None)
+
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.create_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_ID, CLUSTER_ID, CLUSTER_ZONE, None, None, None, None, None, None, None, None
+        )
+
+
+class BigtableClusterUpdateTest(unittest.TestCase):
+    @parameterized.expand([
+        ('project_id', '', INSTANCE_ID, CLUSTER_ID, NODES),
+        ('instance_id', PROJECT_ID, '', CLUSTER_ID, NODES),
+        ('cluster_id', PROJECT_ID, INSTANCE_ID, '', NODES),
+        ('nodes', PROJECT_ID, INSTANCE_ID, CLUSTER_ID, ''),
+    ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_empty_attribute(self, missing_attribute, project_id, instance_id, cluster_id, nodes, mock_hook):
+        with self.assertRaises(AirflowException) as e:
+            BigtableClusterUpdateOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                cluster_id=cluster_id,
+                nodes=nodes,
+                task_id="id"
+            )
+        err = e.exception
+        self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_updating_cluster_but_instance_does_not_exists(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = None
+
+        with self.assertRaises(AirflowException) as e:
+            op = BigtableClusterUpdateOperator(
+                project_id=PROJECT_ID,
+                instance_id=INSTANCE_ID,
+                cluster_id=CLUSTER_ID,
+                nodes=NODES,
+                task_id="id"
+            )
+            op.execute(None)
+
+        err = e.exception
+        self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(INSTANCE_ID))
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.update_cluster.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_updating_cluster_that_does_not_exists(self, mock_hook):
+        instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
+        mock_hook.return_value.update_cluster.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.NotFound("Cluster not found."))
+
+        with self.assertRaises(AirflowException) as e:
+            op = BigtableClusterUpdateOperator(
+                project_id=PROJECT_ID,
+                instance_id=INSTANCE_ID,
+                cluster_id=CLUSTER_ID,
+                nodes=NODES,
+                task_id="id"
+            )
+            op.execute(None)
+
+        err = e.exception
+        self.assertEqual(
+            str(err),
+            "Dependency: cluster '{}' does not exist for instance '{}'.".format(CLUSTER_ID, INSTANCE_ID)
+        )
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.update_cluster.assert_called_once_with(instance, CLUSTER_ID, NODES)
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_different_error_reraised(self, mock_hook):
+        op = BigtableClusterUpdateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            cluster_id=CLUSTER_ID,
+            nodes=NODES,
+            task_id="id"
+        )
+        instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
+        mock_hook.return_value.update_cluster.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
+
+        with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
+            op.execute(None)
+
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.update_cluster.assert_called_once_with(instance, CLUSTER_ID, NODES)
+
+
+class BigtableInstanceDeleteTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_delete_execute(self, mock_hook):
+        op = BigtableInstanceDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            task_id="id"
+        )
+        op.execute(None)
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID)
+
+    @parameterized.expand([
+        ('project_id', '', INSTANCE_ID),
+        ('instance_id', PROJECT_ID, ''),
+    ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_empty_attribute(self, missing_attribute, project_id, instance_id, mock_hook):
+        with self.assertRaises(AirflowException) as e:
+            BigtableInstanceDeleteOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                task_id="id"
+            )
+        err = e.exception
+        self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_deleting_instance_that_doesnt_exists(self, mock_hook):
+        op = BigtableInstanceDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            task_id="id"
+        )
+        mock_hook.return_value.delete_instance.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.NotFound("Instance not found."))
+        op.execute(None)
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID)
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_different_error_reraised(self, mock_hook):
+        op = BigtableInstanceDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            task_id="id"
+        )
+        mock_hook.return_value.delete_instance.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
+
+        with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
+            op.execute(None)
+
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID)
+
+
+class BigtableTableDeleteTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_delete_execute(self, mock_hook):
+        op = BigtableTableDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            task_id="id"
+        )
+        op.execute(None)
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID)
+
+    @parameterized.expand([
+        ('project_id', '', INSTANCE_ID, TABLE_ID),
+        ('instance_id', PROJECT_ID, '', TABLE_ID),
+        ('table_id', PROJECT_ID, INSTANCE_ID, ''),
+    ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook):
+        with self.assertRaises(AirflowException) as e:
+            BigtableTableDeleteOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                table_id=table_id,
+                task_id="id"
+            )
+        err = e.exception
+        self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_deleting_table_that_doesnt_exists(self, mock_hook):
+        op = BigtableTableDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            task_id="id"
+        )
+
+        mock_hook.return_value.delete_table.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.NotFound("Table not found."))
+        op.execute(None)
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID)
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_deleting_table_when_instance_doesnt_exists(self, mock_hook):
+        op = BigtableTableDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            task_id="id"
+        )
+
+        mock_hook.return_value.get_instance.return_value = None
+        with self.assertRaises(AirflowException) as e:
+            op.execute(None)
+        err = e.exception
+        self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(INSTANCE_ID))
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.delete_table.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_different_error_reraised(self, mock_hook):
+        op = BigtableTableDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            task_id="id"
+        )
+        mock_hook.return_value.delete_table.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
+
+        with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
+            op.execute(None)
+
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID)
+
+
+class BigtableTableCreateTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_create_execute(self, mock_hook):
+        op = BigtableTableCreateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            initial_split_keys=INITIAL_SPLIT_KEYS,
+            column_families=EMPTY_COLUMN_FAMILIES,
+            task_id="id"
+        )
+        instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
+        op.execute(None)
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.create_table.assert_called_once_with(
+            instance, TABLE_ID, INITIAL_SPLIT_KEYS, EMPTY_COLUMN_FAMILIES)
+
+    @parameterized.expand([
+        ('project_id', '', INSTANCE_ID, TABLE_ID),
+        ('instance_id', PROJECT_ID, '', TABLE_ID),
+        ('table_id', PROJECT_ID, INSTANCE_ID, ''),
+    ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook):
+        with self.assertRaises(AirflowException) as e:
+            BigtableTableCreateOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                table_id=table_id,
+                task_id="id"
+            )
+        err = e.exception
+        self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_instance_not_exists(self, mock_hook):
+        op = BigtableTableCreateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            initial_split_keys=INITIAL_SPLIT_KEYS,
+            column_families=EMPTY_COLUMN_FAMILIES,
+            task_id="id"
+        )
+        mock_hook.return_value.get_instance.return_value = None
+        with self.assertRaises(AirflowException) as e:
+            op.execute(None)
+        err = e.exception
+        self.assertEqual(
+            str(err),
+            "Dependency: instance '{}' does not exist in project '{}'.".format(INSTANCE_ID, PROJECT_ID)
+        )
+        mock_hook.assert_called_once_with()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_creating_table_that_exists(self, mock_hook):
+        op = BigtableTableCreateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            initial_split_keys=INITIAL_SPLIT_KEYS,
+            column_families=EMPTY_COLUMN_FAMILIES,
+            task_id="id"
+        )
+
+        mock_hook.return_value.get_column_families_for_table.return_value = EMPTY_COLUMN_FAMILIES
+        instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
+        mock_hook.return_value.create_table.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.AlreadyExists("Table already exists."))
+        op.execute(None)
+
+        mock_hook.assert_called_once_with()
+        mock_hook.return_value.create_table.assert_called_once_with(
+            instance, TABLE_ID, INITIAL_SPLIT_KEYS, EMPTY_COLUMN_FAMILIES)
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_creating_table_that_exists_with_different_column_families_ids_in_the_table(self, mock_hook):
+        op = BigtableTableCreateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            initial_split_keys=INITIAL_SPLIT_KEYS,
+            column_families=EMPTY_COLUMN_FAMILIES,
+            task_id="id"
+        )
+
+        mock_hook.return_value.get_column_families_for_table.return_value = {"existing_family": None}
+        mock_hook.return_value.create_table.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.AlreadyExists("Table already exists."))
+
+        with self.assertRaises(AirflowException) as e:
+            op.execute(None)
+        err = e.exception
+        self.assertEqual(
+            str(err),
+            "Table '{}' already exists with different Column Families.".format(TABLE_ID)
+        )
+        mock_hook.assert_called_once_with()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_creating_table_that_exists_with_different_column_families_gc_rule_in_the_table(self, mock_hook):
+        op = BigtableTableCreateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            initial_split_keys=INITIAL_SPLIT_KEYS,
+            column_families={"cf-id": MaxVersionsGCRule(1)},
+            task_id="id"
+        )
+
+        # gc_rule is read as a plain attribute, so store the rule itself rather than a callable Mock.
+        cf_mock = mock.Mock(gc_rule=MaxVersionsGCRule(2))
+
+        mock_hook.return_value.get_column_families_for_table.return_value = {
+            "cf-id": cf_mock
+        }
+        mock_hook.return_value.create_table.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.AlreadyExists("Table already exists."))
+
+        with self.assertRaises(AirflowException) as e:
+            op.execute(None)
+        err = e.exception
+        self.assertEqual(
+            str(err),
+            "Table '{}' already exists with different Column Families.".format(TABLE_ID)
+        )
+        mock_hook.assert_called_once_with()
+
+
+class BigtableWaitForTableReplicationTest(unittest.TestCase):
+    @parameterized.expand([
+        ('project_id', '', INSTANCE_ID, TABLE_ID),
+        ('instance_id', PROJECT_ID, '', TABLE_ID),
+        ('table_id', PROJECT_ID, INSTANCE_ID, ''),
+    ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook):
+        with self.assertRaises(AirflowException) as e:
+            BigtableTableWaitForReplicationSensor(
+                project_id=project_id,
+                instance_id=instance_id,
+                table_id=table_id,
+                task_id="id"
+            )
+        err = e.exception
+        self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_wait_no_instance(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = None
+
+        op = BigtableTableWaitForReplicationSensor(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            task_id="id"
+        )
+        self.assertFalse(op.poke(None))
+        mock_hook.assert_called_once_with()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_wait_no_table(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
+        mock_hook.return_value.get_cluster_states_for_table.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.NotFound("Table not found."))
+
+        op = BigtableTableWaitForReplicationSensor(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            task_id="id"
+        )
+        self.assertFalse(op.poke(None))
+        mock_hook.assert_called_once_with()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_wait_not_ready(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
+        mock_hook.return_value.get_cluster_states_for_table.return_value = {
+            "cl-id": ClusterState(0)
+        }
+        op = BigtableTableWaitForReplicationSensor(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            task_id="id"
+        )
+        self.assertFalse(op.poke(None))
+        mock_hook.assert_called_once_with()
+
+    @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
+    def test_wait_ready(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
+        mock_hook.return_value.get_cluster_states_for_table.return_value = {
+            "cl-id": ClusterState(4)
+        }
+        op = BigtableTableWaitForReplicationSensor(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            table_id=TABLE_ID,
+            task_id="id"
+        )
+        self.assertTrue(op.poke(None))
+        mock_hook.assert_called_once_with()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services