Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/12 12:44:03 UTC

[GitHub] kaxil closed pull request #4493: [AIRFLOW-3680] Consistency update in tests for All GCP-related operators

kaxil closed pull request #4493: [AIRFLOW-3680] Consistency update in tests for All GCP-related operators
URL: https://github.com/apache/airflow/pull/4493

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

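For orientation before the diff: the patch mainly standardizes the OS environment
variables used by the GCP example DAGs on service-specific prefixes (GCP_, GCE_,
GCF_, GCSQL_) and switches their schedule_interval to None so scheduling is left
to the user. The minimal sketch below illustrates that convention with
illustrative names and defaults; it is not an excerpt from the patch itself.

import os

from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import dates

# Service-prefixed environment variables with example defaults, following the
# convention applied across the updated example DAGs (values are illustrative).
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')

default_args = {
    'start_date': dates.days_ago(1),
}

with models.DAG(
    'example_naming_convention',  # hypothetical DAG id, not taken from the patch
    default_args=default_args,
    schedule_interval=None  # Override to match your needs
) as dag:
    placeholder_task = DummyOperator(task_id='placeholder_task')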

diff --git a/airflow/contrib/example_dags/example_gcp_bigtable_operators.py b/airflow/contrib/example_dags/example_gcp_bigtable_operators.py
index 48c4245cba..4239f78004 100644
--- a/airflow/contrib/example_dags/example_gcp_bigtable_operators.py
+++ b/airflow/contrib/example_dags/example_gcp_bigtable_operators.py
@@ -44,8 +44,6 @@
 * CBT_POKE_INTERVAL - number of seconds between every attempt of Sensor check
 
 """
-
-import datetime
 import json
 
 from os import getenv
@@ -78,7 +76,7 @@
 with models.DAG(
     'example_gcp_bigtable_operators',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=None  # Override to match your needs
 ) as dag:
     # [START howto_operator_gcp_bigtable_instance_create]
     create_instance_task = BigtableInstanceCreateOperator(
@@ -128,6 +126,7 @@
         instance_id=CBT_INSTANCE_ID,
         table_id=CBT_TABLE_ID,
         poke_interval=int(CBT_POKE_INTERVAL),
+        timeout=180,
         task_id='wait_for_table_replication',
     )
     # [END howto_operator_gcp_bigtable_table_wait_for_replication]
diff --git a/airflow/contrib/example_dags/example_gcp_compute.py b/airflow/contrib/example_dags/example_gcp_compute.py
index 928e9744b6..f0608e466c 100644
--- a/airflow/contrib/example_dags/example_gcp_compute.py
+++ b/airflow/contrib/example_dags/example_gcp_compute.py
@@ -23,14 +23,13 @@
 
 This DAG relies on the following OS environment variables
 
-* PROJECT_ID - Google Cloud Platform project where the Compute Engine instance exists.
-* ZONE - Google Cloud Platform zone where the instance exists.
-* INSTANCE - Name of the Compute Engine instance.
-* SHORT_MACHINE_TYPE_NAME - Machine type resource name to set, e.g. 'n1-standard-1'.
+* GCP_PROJECT_ID - Google Cloud Platform project where the Compute Engine instance exists.
+* GCE_ZONE - Google Cloud Platform zone where the instance exists.
+* GCE_INSTANCE - Name of the Compute Engine instance.
+* GCE_SHORT_MACHINE_TYPE_NAME - Machine type resource name to set, e.g. 'n1-standard-1'.
     See https://cloud.google.com/compute/docs/machine-types
 """
 import os
-import datetime
 
 import airflow
 from airflow import models
@@ -38,19 +37,19 @@
     GceInstanceStopOperator, GceSetMachineTypeOperator
 
 # [START howto_operator_gce_args_common]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-ZONE = os.environ.get('ZONE', 'europe-west1-b')
-INSTANCE = os.environ.get('INSTANCE', 'testinstance')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
+GCE_INSTANCE = os.environ.get('GCE_INSTANCE', 'testinstance')
 # [END howto_operator_gce_args_common]
 
 default_args = {
-    'start_date': airflow.utils.dates.days_ago(1)
+    'start_date': airflow.utils.dates.days_ago(1),
 }
 
 # [START howto_operator_gce_args_set_machine_type]
-SHORT_MACHINE_TYPE_NAME = os.environ.get('SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
+GCE_SHORT_MACHINE_TYPE_NAME = os.environ.get('GCE_SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
 SET_MACHINE_TYPE_BODY = {
-    'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME)
+    'machineType': 'zones/{}/machineTypes/{}'.format(GCE_ZONE, GCE_SHORT_MACHINE_TYPE_NAME)
 }
 # [END howto_operator_gce_args_set_machine_type]
 
@@ -58,52 +57,52 @@
 with models.DAG(
     'example_gcp_compute',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=None  # Override to match your needs
 ) as dag:
     # [START howto_operator_gce_start]
     gce_instance_start = GceInstanceStartOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         task_id='gcp_compute_start_task'
     )
     # [END howto_operator_gce_start]
     # Duplicate start for idempotence testing
     gce_instance_start2 = GceInstanceStartOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         task_id='gcp_compute_start_task2'
     )
     # [START howto_operator_gce_stop]
     gce_instance_stop = GceInstanceStopOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         task_id='gcp_compute_stop_task'
     )
     # [END howto_operator_gce_stop]
     # Duplicate stop for idempotence testing
     gce_instance_stop2 = GceInstanceStopOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         task_id='gcp_compute_stop_task2'
     )
     # [START howto_operator_gce_set_machine_type]
     gce_set_machine_type = GceSetMachineTypeOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         body=SET_MACHINE_TYPE_BODY,
         task_id='gcp_compute_set_machine_type'
     )
     # [END howto_operator_gce_set_machine_type]
     # Duplicate set machine type for idempotence testing
     gce_set_machine_type2 = GceSetMachineTypeOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         body=SET_MACHINE_TYPE_BODY,
         task_id='gcp_compute_set_machine_type2'
     )
diff --git a/airflow/contrib/example_dags/example_gcp_compute_igm.py b/airflow/contrib/example_dags/example_gcp_compute_igm.py
index 3e4543c60d..400bc1afc1 100644
--- a/airflow/contrib/example_dags/example_gcp_compute_igm.py
+++ b/airflow/contrib/example_dags/example_gcp_compute_igm.py
@@ -24,23 +24,22 @@
 
 This DAG relies on the following OS environment variables
 
-* PROJECT_ID - the Google Cloud Platform project where the Compute Engine instance exists
-* ZONE - the zone where the Compute Engine instance exists
+* GCP_PROJECT_ID - the Google Cloud Platform project where the Compute Engine instance exists
+* GCE_ZONE - the zone where the Compute Engine instance exists
 
 Variables for copy template operator:
-* TEMPLATE_NAME - name of the template to copy
-* NEW_TEMPLATE_NAME - name of the new template
-* NEW_DESCRIPTION - description added to the template
+* GCE_TEMPLATE_NAME - name of the template to copy
+* GCE_NEW_TEMPLATE_NAME - name of the new template
+* GCE_NEW_DESCRIPTION - description added to the template
 
 Variables for update template in Group Manager:
 
-* INSTANCE_GROUP_MANAGER_NAME - name of the Instance Group Manager
+* GCE_INSTANCE_GROUP_MANAGER_NAME - name of the Instance Group Manager
 * SOURCE_TEMPLATE_URL - url of the template to replace in the Instance Group Manager
 * DESTINATION_TEMPLATE_URL - url of the new template to set in the Instance Group Manager
 """
 
 import os
-import datetime
 
 import airflow
 from airflow import models
@@ -48,8 +47,8 @@
     GceInstanceTemplateCopyOperator, GceInstanceGroupManagerUpdateTemplateOperator
 
 # [START howto_operator_compute_igm_common_args]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-ZONE = os.environ.get('ZONE', 'europe-west1-b')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
 # [END howto_operator_compute_igm_common_args]
 
 default_args = {
@@ -57,13 +56,13 @@
 }
 
 # [START howto_operator_compute_template_copy_args]
-TEMPLATE_NAME = os.environ.get('TEMPLATE_NAME', 'instance-template-test')
-NEW_TEMPLATE_NAME = os.environ.get('NEW_TEMPLATE_NAME',
-                                   'instance-template-test-new')
-NEW_DESCRIPTION = os.environ.get('NEW_DESCRIPTION', 'Test new description')
+GCE_TEMPLATE_NAME = os.environ.get('GCE_TEMPLATE_NAME', 'instance-template-test')
+GCE_NEW_TEMPLATE_NAME = os.environ.get('GCE_NEW_TEMPLATE_NAME',
+                                       'instance-template-test-new')
+GCE_NEW_DESCRIPTION = os.environ.get('GCE_NEW_DESCRIPTION', 'Test new description')
 GCE_INSTANCE_TEMPLATE_BODY_UPDATE = {
-    "name": NEW_TEMPLATE_NAME,
-    "description": NEW_DESCRIPTION,
+    "name": GCE_NEW_TEMPLATE_NAME,
+    "description": GCE_NEW_DESCRIPTION,
     "properties": {
         "machineType": "n1-standard-2"
     }
@@ -71,18 +70,18 @@
 # [END howto_operator_compute_template_copy_args]
 
 # [START howto_operator_compute_igm_update_template_args]
-INSTANCE_GROUP_MANAGER_NAME = os.environ.get('INSTANCE_GROUP_MANAGER_NAME',
-                                             'instance-group-test')
+GCE_INSTANCE_GROUP_MANAGER_NAME = os.environ.get('GCE_INSTANCE_GROUP_MANAGER_NAME',
+                                                 'instance-group-test')
 
 SOURCE_TEMPLATE_URL = os.environ.get(
     'SOURCE_TEMPLATE_URL',
-    "https://www.googleapis.com/compute/beta/projects/"
-    "example-project/global/instanceTemplates/instance-template-test")
+    "https://www.googleapis.com/compute/beta/projects/" + GCP_PROJECT_ID +
+    "/global/instanceTemplates/instance-template-test")
 
 DESTINATION_TEMPLATE_URL = os.environ.get(
     'DESTINATION_TEMPLATE_URL',
-    "https://www.googleapis.com/compute/beta/projects/"
-    "example-airflow/global/instanceTemplates/" + NEW_TEMPLATE_NAME)
+    "https://www.googleapis.com/compute/beta/projects/" + GCP_PROJECT_ID +
+    "/global/instanceTemplates/" + GCE_NEW_TEMPLATE_NAME)
 
 UPDATE_POLICY = {
     "type": "OPPORTUNISTIC",
@@ -99,29 +98,29 @@
 with models.DAG(
     'example_gcp_compute_igm',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=None  # Override to match your needs
 ) as dag:
     # [START howto_operator_gce_igm_copy_template]
     gce_instance_template_copy = GceInstanceTemplateCopyOperator(
-        project_id=PROJECT_ID,
-        resource_id=TEMPLATE_NAME,
+        project_id=GCP_PROJECT_ID,
+        resource_id=GCE_TEMPLATE_NAME,
         body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
         task_id='gcp_compute_igm_copy_template_task'
     )
     # [END howto_operator_gce_igm_copy_template]
     # Added to check for idempotence
     gce_instance_template_copy2 = GceInstanceTemplateCopyOperator(
-        project_id=PROJECT_ID,
-        resource_id=TEMPLATE_NAME,
+        project_id=GCP_PROJECT_ID,
+        resource_id=GCE_TEMPLATE_NAME,
         body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
         task_id='gcp_compute_igm_copy_template_task_2'
     )
     # [START howto_operator_gce_igm_update_template]
     gce_instance_group_manager_update_template = \
         GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            resource_id=INSTANCE_GROUP_MANAGER_NAME,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            zone=GCE_ZONE,
             source_template=SOURCE_TEMPLATE_URL,
             destination_template=DESTINATION_TEMPLATE_URL,
             update_policy=UPDATE_POLICY,
@@ -131,9 +130,9 @@
     # Added to check for idempotence (and without UPDATE_POLICY)
     gce_instance_group_manager_update_template2 = \
         GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            resource_id=INSTANCE_GROUP_MANAGER_NAME,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            zone=GCE_ZONE,
             source_template=SOURCE_TEMPLATE_URL,
             destination_template=DESTINATION_TEMPLATE_URL,
             task_id='gcp_compute_igm_group_manager_update_template_2'
diff --git a/airflow/contrib/example_dags/example_gcp_function.py b/airflow/contrib/example_dags/example_gcp_function.py
new file mode 100644
index 0000000000..afd6d36aed
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_function.py
@@ -0,0 +1,125 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that displays interactions with Google Cloud Functions.
+It creates a function and then deletes it.
+
+This DAG relies on the following OS environment variables
+https://airflow.apache.org/concepts.html#variables
+* GCP_PROJECT_ID - Google Cloud Project to use for the Cloud Function.
+* GCP_LOCATION - Google Cloud Functions region where the function should be
+  created.
+* GCF_ENTRYPOINT - Name of the executable function in the source code.
+* and one of the below:
+    - GCF_SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
+    or
+    (
+        - GCF_SOURCE_UPLOAD_URL - Generated upload URL for the zipped source
+        and
+        - GCF_ZIP_PATH - Local path to the zipped source archive
+    )
+    or
+    - GCF_SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function
+    is defined in a supported Cloud Source Repository URL format
+    https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
+"""
+
+import os
+
+from airflow import models
+from airflow.contrib.operators.gcp_function_operator \
+    import GcfFunctionDeployOperator, GcfFunctionDeleteOperator
+from airflow.utils import dates
+
+# [START howto_operator_gcf_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
+GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello').\
+    replace("-", "_")  # make sure there are no dashes in function name (!)
+FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
+                                                               GCP_LOCATION,
+                                                               GCF_SHORT_FUNCTION_NAME)
+# [END howto_operator_gcf_common_variables]
+# [START howto_operator_gcf_deploy_variables]
+GCF_SOURCE_ARCHIVE_URL = os.environ.get('GCF_SOURCE_ARCHIVE_URL', '')
+GCF_SOURCE_UPLOAD_URL = os.environ.get('GCF_SOURCE_UPLOAD_URL', '')
+GCF_SOURCE_REPOSITORY = os.environ.get(
+    'GCF_SOURCE_REPOSITORY',
+    'https://source.developers.google.com/'
+    'projects/{}/repos/hello-world/moveable-aliases/master'.format(GCP_PROJECT_ID))
+GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
+GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
+GCF_RUNTIME = 'nodejs6'
+GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', True)
+# [END howto_operator_gcf_deploy_variables]
+
+# [START howto_operator_gcf_deploy_body]
+body = {
+    "name": FUNCTION_NAME,
+    "entryPoint": GCF_ENTRYPOINT,
+    "runtime": GCF_RUNTIME,
+    "httpsTrigger": {}
+}
+# [END howto_operator_gcf_deploy_body]
+
+# [START howto_operator_gcf_default_args]
+default_args = {
+    'start_date': dates.days_ago(1)
+}
+# [END howto_operator_gcf_default_args]
+
+# [START howto_operator_gcf_deploy_variants]
+if GCF_SOURCE_ARCHIVE_URL:
+    body['sourceArchiveUrl'] = GCF_SOURCE_ARCHIVE_URL
+elif GCF_SOURCE_REPOSITORY:
+    body['sourceRepository'] = {
+        'url': GCF_SOURCE_REPOSITORY
+    }
+elif GCF_ZIP_PATH:
+    body['sourceUploadUrl'] = ''
+    default_args['zip_path'] = GCF_ZIP_PATH
+elif GCF_SOURCE_UPLOAD_URL:
+    body['sourceUploadUrl'] = GCF_SOURCE_UPLOAD_URL
+else:
+    raise Exception("Please provide one of the source_code parameters")
+# [END howto_operator_gcf_deploy_variants]
+
+
+with models.DAG(
+    'example_gcp_function',
+    default_args=default_args,
+    schedule_interval=None  # Override to match your needs
+) as dag:
+    # [START howto_operator_gcf_deploy]
+    deploy_task = GcfFunctionDeployOperator(
+        task_id="gcf_deploy_task",
+        project_id=GCP_PROJECT_ID,
+        location=GCP_LOCATION,
+        body=body,
+        validate_body=GCP_VALIDATE_BODY
+    )
+    # [END howto_operator_gcf_deploy]
+    # [START howto_operator_gcf_delete]
+    delete_task = GcfFunctionDeleteOperator(
+        task_id="gcf_delete_task",
+        name=FUNCTION_NAME
+    )
+    # [END howto_operator_gcf_delete]
+    deploy_task >> delete_task
diff --git a/airflow/contrib/example_dags/example_gcp_function_delete.py b/airflow/contrib/example_dags/example_gcp_function_delete.py
deleted file mode 100644
index 642e3a744c..0000000000
--- a/airflow/contrib/example_dags/example_gcp_function_delete.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG that deletes a Google Cloud Function.
-This DAG relies on the following OS environment variables
-* PROJECT_ID - Google Cloud Project where the Cloud Function exists.
-* LOCATION - Google Cloud Functions region where the function exists.
-* ENTRYPOINT - Name of the executable function in the source code.
-"""
-
-import os
-import datetime
-
-import airflow
-from airflow import models
-from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator
-
-# [START howto_operator_gcf_delete_args]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-LOCATION = os.environ.get('LOCATION', 'europe-west1')
-ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
-# A fully-qualified name of the function to delete
-
-FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
-                                                               ENTRYPOINT)
-# [END howto_operator_gcf_delete_args]
-
-default_args = {
-    'start_date': airflow.utils.dates.days_ago(1)
-}
-
-with models.DAG(
-    'example_gcp_function_delete',
-    default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
-) as dag:
-    # [START howto_operator_gcf_delete]
-    t1 = GcfFunctionDeleteOperator(
-        task_id="gcf_delete_task",
-        name=FUNCTION_NAME
-    )
-    # [END howto_operator_gcf_delete]
diff --git a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
deleted file mode 100644
index 76563d7596..0000000000
--- a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+++ /dev/null
@@ -1,117 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG that creates a Google Cloud Function and then deletes it.
-
-This DAG relies on the following OS environment variables
-https://airflow.apache.org/concepts.html#variables
-* PROJECT_ID - Google Cloud Project to use for the Cloud Function.
-* LOCATION - Google Cloud Functions region where the function should be
-  created.
-* SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
-or
-    * SOURCE_UPLOAD_URL - Generated upload URL for the zipped source
-    or
-    * ZIP_PATH - Local path to the zipped source archive
-or
-* SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function is
-defined in a supported Cloud Source Repository URL format
-https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
-* ENTRYPOINT - Name of the executable function in the source code.
-"""
-
-import os
-import datetime
-
-from airflow import models
-from airflow.contrib.operators.gcp_function_operator \
-    import GcfFunctionDeployOperator, GcfFunctionDeleteOperator
-from airflow.utils import dates
-
-# [START howto_operator_gcf_deploy_variables]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-LOCATION = os.environ.get('LOCATION', 'europe-west1')
-SOURCE_ARCHIVE_URL = os.environ.get('SOURCE_ARCHIVE_URL', '')
-SOURCE_UPLOAD_URL = os.environ.get('SOURCE_UPLOAD_URL', '')
-SOURCE_REPOSITORY = os.environ.get(
-    'SOURCE_REPOSITORY',
-    'https://source.developers.google.com/'
-    'projects/example-project/repos/hello-world/moveable-aliases/master')
-ZIP_PATH = os.environ.get('ZIP_PATH', '')
-ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
-FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
-                                                               ENTRYPOINT)
-RUNTIME = 'nodejs6'
-VALIDATE_BODY = os.environ.get('VALIDATE_BODY', True)
-
-# [END howto_operator_gcf_deploy_variables]
-
-# [START howto_operator_gcf_deploy_body]
-body = {
-    "name": FUNCTION_NAME,
-    "entryPoint": ENTRYPOINT,
-    "runtime": RUNTIME,
-    "httpsTrigger": {}
-}
-# [END howto_operator_gcf_deploy_body]
-
-# [START howto_operator_gcf_default_args]
-default_args = {
-    'start_date': dates.days_ago(1)
-}
-# [END howto_operator_gcf_default_args]
-
-# [START howto_operator_gcf_deploy_variants]
-if SOURCE_ARCHIVE_URL:
-    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
-elif SOURCE_REPOSITORY:
-    body['sourceRepository'] = {
-        'url': SOURCE_REPOSITORY
-    }
-elif ZIP_PATH:
-    body['sourceUploadUrl'] = ''
-    default_args['zip_path'] = ZIP_PATH
-elif SOURCE_UPLOAD_URL:
-    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
-else:
-    raise Exception("Please provide one of the source_code parameters")
-# [END howto_operator_gcf_deploy_variants]
-
-
-with models.DAG(
-    'example_gcp_function_deploy_delete',
-    default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
-) as dag:
-    # [START howto_operator_gcf_deploy]
-    deploy_task = GcfFunctionDeployOperator(
-        task_id="gcf_deploy_task",
-        name=FUNCTION_NAME,
-        project_id=PROJECT_ID,
-        location=LOCATION,
-        body=body,
-        validate_body=VALIDATE_BODY
-    )
-    # [END howto_operator_gcf_deploy]
-    delete_task = GcfFunctionDeleteOperator(
-        task_id="gcf_delete_task",
-        name=FUNCTION_NAME
-    )
-    deploy_task >> delete_task
diff --git a/airflow/contrib/example_dags/example_gcp_spanner.py b/airflow/contrib/example_dags/example_gcp_spanner.py
index 0aeb1e63a0..f421451c89 100644
--- a/airflow/contrib/example_dags/example_gcp_spanner.py
+++ b/airflow/contrib/example_dags/example_gcp_spanner.py
@@ -39,7 +39,8 @@
 import airflow
 from airflow import models
 from airflow.contrib.operators.gcp_spanner_operator import \
-    CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDatabaseQueryOperator, \
+    CloudSpannerInstanceDeployOperator, \
+    CloudSpannerInstanceDatabaseQueryOperator, \
     CloudSpannerInstanceDeleteOperator, \
     CloudSpannerInstanceDatabaseDeployOperator, \
     CloudSpannerInstanceDatabaseUpdateOperator, \
@@ -140,7 +141,7 @@
     spanner_instance_query_task = CloudSpannerInstanceDatabaseQueryOperator(
         project_id=GCP_PROJECT_ID,
         instance_id=GCP_SPANNER_INSTANCE_ID,
-        database_id='db1',
+        database_id=GCP_SPANNER_DATABASE_ID,
         query=["DELETE FROM my_table2 WHERE true"],
         task_id='spanner_instance_query'
     )
@@ -149,7 +150,7 @@
     spanner_instance_query2_task = CloudSpannerInstanceDatabaseQueryOperator(
         project_id=GCP_PROJECT_ID,
         instance_id=GCP_SPANNER_INSTANCE_ID,
-        database_id='db1',
+        database_id=GCP_SPANNER_DATABASE_ID,
         query="example_gcp_spanner.sql",
         task_id='spanner_instance_query2'
     )
diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
index eaf1f7d404..b663b4eceb 100644
--- a/airflow/contrib/example_dags/example_gcp_sql.py
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -23,7 +23,7 @@
 
 This DAG relies on the following OS environment variables
 https://airflow.apache.org/concepts.html#variables
-* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
+* GCP_PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
 * INSTANCE_NAME - Name of the Cloud SQL instance.
 * DB_NAME - Name of the database inside a Cloud SQL instance.
 """
@@ -44,15 +44,15 @@
 from six.moves.urllib.parse import urlsplit
 
 # [START howto_operator_cloudsql_arguments]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-mysql')
-INSTANCE_NAME2 = os.environ.get('INSTANCE_NAME2', 'test-mysql2')
-DB_NAME = os.environ.get('DB_NAME', 'testdb')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
+INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
+DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
 # [END howto_operator_cloudsql_arguments]
 
 # [START howto_operator_cloudsql_export_import_arguments]
-EXPORT_URI = os.environ.get('EXPORT_URI', 'gs://bucketName/fileName')
-IMPORT_URI = os.environ.get('IMPORT_URI', 'gs://bucketName/fileName')
+EXPORT_URI = os.environ.get('GCSQL_MYSQL_EXPORT_URI', 'gs://bucketName/fileName')
+IMPORT_URI = os.environ.get('GCSQL_MYSQL_IMPORT_URI', 'gs://bucketName/fileName')
 # [END howto_operator_cloudsql_export_import_arguments]
 
 # Bodies below represent Cloud SQL instance resources:
@@ -145,7 +145,7 @@
 db_create_body = {
     "instance": INSTANCE_NAME,
     "name": DB_NAME,
-    "project": PROJECT_ID
+    "project": GCP_PROJECT_ID
 }
 # [END howto_operator_cloudsql_db_create_body]
 # [START howto_operator_cloudsql_db_patch_body]
@@ -162,9 +162,8 @@
 with models.DAG(
     'example_gcp_sql',
     default_args=default_args,
-    schedule_interval=None
+    schedule_interval=None  # Override to match your needs
 ) as dag:
-    prev_task = None
 
     def next_dep(task, prev):
         prev >> task
@@ -176,7 +175,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_create]
     sql_instance_create_task = CloudSqlInstanceCreateOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=body,
         instance=INSTANCE_NAME,
         task_id='sql_instance_create_task'
@@ -185,7 +184,7 @@ def next_dep(task, prev):
     prev_task = sql_instance_create_task
 
     sql_instance_create_2_task = CloudSqlInstanceCreateOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=body2,
         instance=INSTANCE_NAME2,
         task_id='sql_instance_create_2_task'
@@ -198,7 +197,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_patch]
     sql_instance_patch_task = CloudSqlInstancePatchOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=patch_body,
         instance=INSTANCE_NAME,
         task_id='sql_instance_patch_task'
@@ -208,7 +207,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_db_create]
     sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=db_create_body,
         instance=INSTANCE_NAME,
         task_id='sql_db_create_task'
@@ -218,7 +217,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_db_patch]
     sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=db_patch_body,
         instance=INSTANCE_NAME,
         database=DB_NAME,
@@ -236,7 +235,9 @@ def next_dep(task, prev):
     # write access to the destination GCS bucket.
     # [START howto_operator_cloudsql_export_gcs_permissions]
     sql_gcp_add_bucket_permission_task = GoogleCloudStorageBucketCreateAclEntryOperator(
-        entity="user-{{ task_instance.xcom_pull('sql_instance_create_task', key='service_account_email') }}",
+        entity="user-{{ task_instance.xcom_pull("
+               "'sql_instance_create_task', key='service_account_email') "
+               "}}",
         role="WRITER",
         bucket=export_url_split[1],  # netloc (bucket)
         task_id='sql_gcp_add_bucket_permission_task'
@@ -246,7 +247,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_export]
     sql_export_task = CloudSqlInstanceExportOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=export_body,
         instance=INSTANCE_NAME,
         task_id='sql_export_task'
@@ -263,8 +264,9 @@ def next_dep(task, prev):
     # read access to the target GCS object.
     # [START howto_operator_cloudsql_import_gcs_permissions]
     sql_gcp_add_object_permission_task = GoogleCloudStorageObjectCreateAclEntryOperator(
-        entity="user-{{ task_instance.xcom_pull('sql_instance_create_2_task', "
-               "key='service_account_email') }}",
+        entity="user-{{ task_instance.xcom_pull("
+               "'sql_instance_create_2_task', key='service_account_email')"
+               " }}",
         role="READER",
         bucket=import_url_split[1],  # netloc (bucket)
         object_name=import_url_split[2][1:],  # path (strip first '/')
@@ -287,7 +289,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_import]
     sql_import_task = CloudSqlInstanceImportOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=import_body,
         instance=INSTANCE_NAME2,
         task_id='sql_import_task'
@@ -301,7 +303,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_db_delete]
     sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         instance=INSTANCE_NAME,
         database=DB_NAME,
         task_id='sql_db_delete_task'
@@ -315,7 +317,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_delete]
     sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         instance=INSTANCE_NAME,
         task_id='sql_instance_delete_task'
     )
@@ -323,7 +325,7 @@ def next_dep(task, prev):
     prev_task = next_dep(sql_instance_delete_task, prev_task)
 
     sql_instance_delete_2_task = CloudSqlInstanceDeleteOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         instance=INSTANCE_NAME2,
         task_id='sql_instance_delete_2_task'
     )
diff --git a/airflow/contrib/example_dags/example_gcp_sql_query.py b/airflow/contrib/example_dags/example_gcp_sql_query.py
index 5439fb6afc..b830417d9c 100644
--- a/airflow/contrib/example_dags/example_gcp_sql_query.py
+++ b/airflow/contrib/example_dags/example_gcp_sql_query.py
@@ -22,26 +22,25 @@
 
 This DAG relies on the following OS environment variables
 
-* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance
-* LOCATION - Google Cloud location where the database is created
+* GCP_PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance
+* GCP_REGION - Google Cloud region where the database is created
 *
-* POSTGRES_INSTANCE_NAME - Name of the postgres Cloud SQL instance
-* POSTGRES_USER - Name of the postgres database user
-* POSTGRES_PASSWORD - Password of the postgres database user
-* POSTGRES_PROXY_PORT - Local port number for proxy connections for postgres
-* POSTGRES_PUBLIC_IP - Public IP of the Postgres database
-* POSTGRES_PUBLIC_PORT - Port of the postgres database
+* GCSQL_POSTGRES_INSTANCE_NAME - Name of the postgres Cloud SQL instance
+* GCSQL_POSTGRES_USER - Name of the postgres database user
+* GCSQL_POSTGRES_PASSWORD - Password of the postgres database user
+* GCSQL_POSTGRES_PUBLIC_IP - Public IP of the Postgres database
+* GCSQL_POSTGRES_PUBLIC_PORT - Port of the postgres database
 *
-* MYSQL_INSTANCE_NAME - Name of the postgres Cloud SQL instance
-* MYSQL_USER - Name of the mysql database user
-* MYSQL_PASSWORD - Password of the mysql database user
-* MYSQL_PROXY_PORT - Local port number for proxy connections for mysql
-* MYSQL_PUBLIC_IP - Public IP of the mysql database
-* MYSQL_PUBLIC_PORT - Port of the mysql database
+* GCSQL_MYSQL_INSTANCE_NAME - Name of the postgres Cloud SQL instance
+* GCSQL_MYSQL_USER - Name of the mysql database user
+* GCSQL_MYSQL_PASSWORD - Password of the mysql database user
+* GCSQL_MYSQL_PUBLIC_IP - Public IP of the mysql database
+* GCSQL_MYSQL_PUBLIC_PORT - Port of the mysql database
 """
 
 import os
 import subprocess
+from os.path import expanduser
 
 from six.moves.urllib.parse import quote_plus
 
@@ -51,34 +50,38 @@
 
 # [START howto_operator_cloudsql_query_arguments]
 
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-LOCATION = os.environ.get('REGION', 'europe-west-1')
-
-POSTGRES_INSTANCE_NAME = os.environ.get('POSTGRES_INSTANCE_NAME', 'testpostgres')
-POSTGRES_DATABASE_NAME = os.environ.get('POSTGRES_DATABASE_NAME', 'postgresdb')
-POSTGRES_USER = os.environ.get('POSTGRES_USER', 'postgres_user')
-POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD', 'password')
-POSTGRES_PUBLIC_IP = os.environ.get('POSTGRES_PUBLIC_IP', '0.0.0.0')
-POSTGRES_PUBLIC_PORT = os.environ.get('POSTGRES_PUBLIC_PORT', 5432)
-POSTGRES_CLIENT_CERT_FILE = os.environ.get('POSTGRES_CLIENT_CERT_FILE',
-                                           "/tmp/client-cert.pem")
-POSTGRES_CLIENT_KEY_FILE = os.environ.get('POSTGRES_CLIENT_KEY_FILE',
-                                          "/tmp/client-key.pem")
-POSTGRES_SERVER_CA_FILE = os.environ.get('POSTGRES_SERVER_CA_FILE',
-                                         "/tmp/server-ca.pem")
-
-MYSQL_INSTANCE_NAME = os.environ.get('MYSQL_INSTANCE_NAME', 'testmysql')
-MYSQL_DATABASE_NAME = os.environ.get('MYSQL_DATABASE_NAME', 'mysqldb')
-MYSQL_USER = os.environ.get('MYSQL_USER', 'mysql_user')
-MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'password')
-MYSQL_PUBLIC_IP = os.environ.get('MYSQL_PUBLIC_IP', '0.0.0.0')
-MYSQL_PUBLIC_PORT = os.environ.get('MYSQL_PUBLIC_PORT', 3306)
-MYSQL_CLIENT_CERT_FILE = os.environ.get('MYSQL_CLIENT_CERT_FILE',
-                                        "/tmp/client-cert.pem")
-MYSQL_CLIENT_KEY_FILE = os.environ.get('MYSQL_CLIENT_KEY_FILE',
-                                       "/tmp/client-key.pem")
-MYSQL_SERVER_CA_FILE = os.environ.get('MYSQL_SERVER_CA_FILE',
-                                      "/tmp/server-ca.pem")
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_REGION = os.environ.get('GCP_REGION', 'europe-west-1b')
+
+GCSQL_POSTGRES_INSTANCE_NAME_QUERY = os.environ.get(
+    'GCSQL_POSTGRES_INSTANCE_NAME_QUERY',
+    'testpostgres')
+GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME',
+                                              'postgresdb')
+GCSQL_POSTGRES_USER = os.environ.get('GCSQL_POSTGRES_USER', 'postgres_user')
+GCSQL_POSTGRES_PASSWORD = os.environ.get('GCSQL_POSTGRES_PASSWORD', 'password')
+GCSQL_POSTGRES_PUBLIC_IP = os.environ.get('GCSQL_POSTGRES_PUBLIC_IP', '0.0.0.0')
+GCSQL_POSTGRES_PUBLIC_PORT = os.environ.get('GCSQL_POSTGRES_PUBLIC_PORT', 5432)
+GCSQL_POSTGRES_CLIENT_CERT_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_CERT_FILE',
+                                                 ".key/postgres-client-cert.pem")
+GCSQL_POSTGRES_CLIENT_KEY_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_KEY_FILE',
+                                                ".key/postgres-client-key.pem")
+GCSQL_POSTGRES_SERVER_CA_FILE = os.environ.get('GCSQL_POSTGRES_SERVER_CA_FILE',
+                                               ".key/postgres-server-ca.pem")
+
+GCSQL_MYSQL_INSTANCE_NAME_QUERY = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME_QUERY',
+                                                 'testmysql')
+GCSQL_MYSQL_DATABASE_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'mysqldb')
+GCSQL_MYSQL_USER = os.environ.get('GCSQL_MYSQL_USER', 'mysql_user')
+GCSQL_MYSQL_PASSWORD = os.environ.get('GCSQL_MYSQL_PASSWORD', 'password')
+GCSQL_MYSQL_PUBLIC_IP = os.environ.get('GCSQL_MYSQL_PUBLIC_IP', '0.0.0.0')
+GCSQL_MYSQL_PUBLIC_PORT = os.environ.get('GCSQL_MYSQL_PUBLIC_PORT', 3306)
+GCSQL_MYSQL_CLIENT_CERT_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_CERT_FILE',
+                                              ".key/mysql-client-cert.pem")
+GCSQL_MYSQL_CLIENT_KEY_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_KEY_FILE',
+                                             ".key/mysql-client-key.pem")
+GCSQL_MYSQL_SERVER_CA_FILE = os.environ.get('GCSQL_MYSQL_SERVER_CA_FILE',
+                                            ".key/mysql-server-ca.pem")
 
 SQL = [
     'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
@@ -97,18 +100,28 @@
 
 # [START howto_operator_cloudsql_query_connections]
 
+HOME_DIR = expanduser("~")
+
+
+def get_absolute_path(path):
+    if path.startswith("/"):
+        return path
+    else:
+        return os.path.join(HOME_DIR, path)
+
+
 postgres_kwargs = dict(
-    user=quote_plus(POSTGRES_USER),
-    password=quote_plus(POSTGRES_PASSWORD),
-    public_port=POSTGRES_PUBLIC_PORT,
-    public_ip=quote_plus(POSTGRES_PUBLIC_IP),
-    project_id=quote_plus(PROJECT_ID),
-    location=quote_plus(LOCATION),
-    instance=quote_plus(POSTGRES_INSTANCE_NAME),
-    database=quote_plus(POSTGRES_DATABASE_NAME),
-    client_cert_file=quote_plus(POSTGRES_CLIENT_CERT_FILE),
-    client_key_file=quote_plus(POSTGRES_CLIENT_KEY_FILE),
-    server_ca_file=quote_plus(POSTGRES_SERVER_CA_FILE)
+    user=quote_plus(GCSQL_POSTGRES_USER),
+    password=quote_plus(GCSQL_POSTGRES_PASSWORD),
+    public_port=GCSQL_POSTGRES_PUBLIC_PORT,
+    public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
+    project_id=quote_plus(GCP_PROJECT_ID),
+    location=quote_plus(GCP_REGION),
+    instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
+    database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME),
+    client_cert_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_CLIENT_CERT_FILE)),
+    client_key_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_CLIENT_KEY_FILE)),
+    server_ca_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_SERVER_CA_FILE))
 )
 
 # The connections below are created using one of the standard approaches - via environment
@@ -161,17 +174,17 @@
     .format(**postgres_kwargs)
 
 mysql_kwargs = dict(
-    user=quote_plus(MYSQL_USER),
-    password=quote_plus(MYSQL_PASSWORD),
-    public_port=MYSQL_PUBLIC_PORT,
-    public_ip=quote_plus(MYSQL_PUBLIC_IP),
-    project_id=quote_plus(PROJECT_ID),
-    location=quote_plus(LOCATION),
-    instance=quote_plus(MYSQL_INSTANCE_NAME),
-    database=quote_plus(MYSQL_DATABASE_NAME),
-    client_cert_file=quote_plus(MYSQL_CLIENT_CERT_FILE),
-    client_key_file=quote_plus(MYSQL_CLIENT_KEY_FILE),
-    server_ca_file=quote_plus(MYSQL_SERVER_CA_FILE)
+    user=quote_plus(GCSQL_MYSQL_USER),
+    password=quote_plus(GCSQL_MYSQL_PASSWORD),
+    public_port=GCSQL_MYSQL_PUBLIC_PORT,
+    public_ip=quote_plus(GCSQL_MYSQL_PUBLIC_IP),
+    project_id=quote_plus(GCP_PROJECT_ID),
+    location=quote_plus(GCP_REGION),
+    instance=quote_plus(GCSQL_MYSQL_INSTANCE_NAME_QUERY),
+    database=quote_plus(GCSQL_MYSQL_DATABASE_NAME),
+    client_cert_file=quote_plus(get_absolute_path(GCSQL_MYSQL_CLIENT_CERT_FILE)),
+    client_key_file=quote_plus(get_absolute_path(GCSQL_MYSQL_CLIENT_KEY_FILE)),
+    server_ca_file=quote_plus(get_absolute_path(GCSQL_MYSQL_SERVER_CA_FILE))
 )
 
 # MySQL: connect via proxy over TCP (specific proxy version)
@@ -243,17 +256,23 @@
 
 tasks = []
 
+
 with models.DAG(
     dag_id='example_gcp_sql_query',
     default_args=default_args,
     schedule_interval=None
 ) as dag:
+    prev_task = None
+
     for connection_name in connection_names:
-        tasks.append(
-            CloudSqlQueryOperator(
-                gcp_cloudsql_conn_id=connection_name,
-                task_id="example_gcp_sql_task_" + connection_name,
-                sql=SQL
-            )
+        task = CloudSqlQueryOperator(
+            gcp_cloudsql_conn_id=connection_name,
+            task_id="example_gcp_sql_task_" + connection_name,
+            sql=SQL
         )
+        tasks.append(task)
+        if prev_task:
+            prev_task >> task
+        prev_task = task
+
 # [END howto_operator_cloudsql_query_operators]
diff --git a/airflow/contrib/hooks/gcp_function_hook.py b/airflow/contrib/hooks/gcp_function_hook.py
index 29cef1716c..7f1b395d67 100644
--- a/airflow/contrib/hooks/gcp_function_hook.py
+++ b/airflow/contrib/hooks/gcp_function_hook.py
@@ -71,26 +71,12 @@ def get_function(self, name):
         return self.get_conn().projects().locations().functions().get(
             name=name).execute(num_retries=NUM_RETRIES)
 
-    def list_functions(self, full_location):
-        """
-        Lists all Cloud Functions created in the location.
-
-        :param full_location: full location including the project in the form of
-            of /projects/<PROJECT>/location/<LOCATION>
-        :type full_location: str
-        :return: array of Cloud Functions objects - representing functions in the location
-        :rtype: [dict]
-        """
-        list_response = self.get_conn().projects().locations().functions().list(
-            parent=full_location).execute(num_retries=NUM_RETRIES)
-        return list_response.get("functions", [])
-
     def create_new_function(self, full_location, body):
         """
         Creates a new function in Cloud Function in the location specified in the body.
 
         :param full_location: full location including the project in the form of
-            of /projects/<PROJECT>/location/<LOCATION>
+            of /projects/<PROJECT>/location/<GCP_LOCATION>
         :type full_location: str
         :param body: body required by the Cloud Functions insert API
         :type body: dict
@@ -130,7 +116,7 @@ def upload_function_zip(self, parent, zip_path):
         Uploads zip file with sources.
 
         :param parent: Google Cloud Platform project id and region where zip file should
-         be uploaded in the form of /projects/<PROJECT>/location/<LOCATION>
+            be uploaded in the form of /projects/<PROJECT>/location/<GCP_LOCATION>
         :type parent: str
         :param zip_path: path of the valid .zip file to upload
         :type zip_path: str
@@ -143,7 +129,7 @@ def upload_function_zip(self, parent, zip_path):
         with open(zip_path, 'rb') as fp:
             requests.put(
                 url=upload_url,
-                data=fp.read(),
+                data=fp,
                 # Those two headers needs to be specified according to:
                 # https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/generateUploadUrl
                 # nopep8
diff --git a/airflow/contrib/operators/gcp_function_operator.py b/airflow/contrib/operators/gcp_function_operator.py
index 7f7da1d3ec..811dab7a15 100644
--- a/airflow/contrib/operators/gcp_function_operator.py
+++ b/airflow/contrib/operators/gcp_function_operator.py
@@ -81,8 +81,9 @@ def _validate_max_instances(value):
 class GcfFunctionDeployOperator(BaseOperator):
     """
     Creates a function in Google Cloud Functions.
+    If a function with this name already exists, it will be updated.
 
-    :param project_id: Google Cloud Platform Project ID where the function should
+    :param project_id: Google Cloud Platform project ID where the function should
         be created.
     :type project_id: str
     :param location: Google Cloud Platform region where the function should be created.
@@ -105,9 +106,9 @@ class GcfFunctionDeployOperator(BaseOperator):
     :param validate_body: If set to False, body validation is not performed.
     :type validate_body: bool
     """
-    # [START gce_function_deploy_template_operator_template_fields]
+    # [START gcf_function_deploy_template_fields]
     template_fields = ('project_id', 'location', 'gcp_conn_id', 'api_version')
-    # [END gce_function_deploy_template_operator_template_fields]
+    # [END gcf_function_deploy_template_fields]
 
     @apply_defaults
     def __init__(self,
@@ -181,7 +182,7 @@ def _set_airflow_version_label(self):
 
     def execute(self, context):
         if self.zip_path_preprocessor.should_upload_function():
-            self.body[SOURCE_UPLOAD_URL] = self._upload_source_code()
+            self.body[GCF_SOURCE_UPLOAD_URL] = self._upload_source_code()
         self._validate_all_body_fields()
         self._set_airflow_version_label()
         if not self._check_if_function_exists():
@@ -190,10 +191,10 @@ def execute(self, context):
             self._update_function()
 
 
-SOURCE_ARCHIVE_URL = 'sourceArchiveUrl'
-SOURCE_UPLOAD_URL = 'sourceUploadUrl'
+GCF_SOURCE_ARCHIVE_URL = 'sourceArchiveUrl'
+GCF_SOURCE_UPLOAD_URL = 'sourceUploadUrl'
 SOURCE_REPOSITORY = 'sourceRepository'
-ZIP_PATH = 'zip_path'
+GCF_ZIP_PATH = 'zip_path'
 
 
 class ZipPathPreprocessor:
@@ -226,28 +227,28 @@ def _is_present_and_empty(dictionary, field):
         return field in dictionary and not dictionary[field]
 
     def _verify_upload_url_and_no_zip_path(self):
-        if self._is_present_and_empty(self.body, SOURCE_UPLOAD_URL):
+        if self._is_present_and_empty(self.body, GCF_SOURCE_UPLOAD_URL):
             if not self.zip_path:
                 raise AirflowException(
                     "Parameter '{}' is empty in the body and argument '{}' "
                     "is missing or empty. You need to have non empty '{}' "
                     "when '{}' is present and empty.".
-                    format(SOURCE_UPLOAD_URL, ZIP_PATH, ZIP_PATH, SOURCE_UPLOAD_URL))
+                    format(GCF_SOURCE_UPLOAD_URL, GCF_ZIP_PATH, GCF_ZIP_PATH, GCF_SOURCE_UPLOAD_URL))
 
     def _verify_upload_url_and_zip_path(self):
-        if SOURCE_UPLOAD_URL in self.body and self.zip_path:
-            if not self.body[SOURCE_UPLOAD_URL]:
+        if GCF_SOURCE_UPLOAD_URL in self.body and self.zip_path:
+            if not self.body[GCF_SOURCE_UPLOAD_URL]:
                 self.upload_function = True
             else:
                 raise AirflowException("Only one of '{}' in body or '{}' argument "
                                        "allowed. Found both."
-                                       .format(SOURCE_UPLOAD_URL, ZIP_PATH))
+                                       .format(GCF_SOURCE_UPLOAD_URL, GCF_ZIP_PATH))
 
     def _verify_archive_url_and_zip_path(self):
-        if SOURCE_ARCHIVE_URL in self.body and self.zip_path:
+        if GCF_SOURCE_ARCHIVE_URL in self.body and self.zip_path:
             raise AirflowException("Only one of '{}' in body or '{}' argument "
                                    "allowed. Found both."
-                                   .format(SOURCE_ARCHIVE_URL, ZIP_PATH))
+                                   .format(GCF_SOURCE_ARCHIVE_URL, GCF_ZIP_PATH))
 
     def should_upload_function(self):
         if self.upload_function is None:
@@ -279,9 +280,9 @@ class GcfFunctionDeleteOperator(BaseOperator):
     :param api_version: API version used (for example v1 or v1beta1).
     :type api_version: str
     """
-    # [START gce_function_delete_template_operator_template_fields]
+    # [START gcf_function_delete_template_fields]
     template_fields = ('name', 'gcp_conn_id', 'api_version')
-    # [END gce_function_delete_template_operator_template_fields]
+    # [END gcf_function_delete_template_fields]
 
     @apply_defaults
     def __init__(self,
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index 7d7e055d88..950f4b3c97 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -507,15 +507,15 @@ Arguments
 The following examples of OS environment variables show how you can build function name
 to use in the operator:
 
-.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function.py
     :language: python
-    :start-after: [START howto_operator_gcf_delete_args]
-    :end-before: [END howto_operator_gcf_delete_args]
+    :start-after: [START howto_operator_gcf_common_variables]
+    :end-before: [END howto_operator_gcf_common_variables]
 
 Using the operator
 """"""""""""""""""
 
-.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcf_delete]
@@ -527,33 +527,8 @@ Templating
 .. literalinclude:: ../../airflow/contrib/operators/gcp_function_operator.py
     :language: python
     :dedent: 4
-    :start-after: [START gce_function_delete_template_operator_template_fields]
-    :end-before: [END gce_function_delete_template_operator_template_fields]
-
-Troubleshooting
-"""""""""""""""
-If you want to run or deploy an operator using a service account and get “forbidden 403”
-errors, it means that your service account does not have the correct
-Cloud IAM permissions.
-
-1. Assign your Service Account the Cloud Functions Developer role.
-2. Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime
-   service account.
-
-The typical way of assigning Cloud IAM permissions with `gcloud` is
-shown below. Just replace PROJECT_ID with ID of your Google Cloud Platform project
-and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
-
-.. code-block:: bash
-
-  gcloud iam service-accounts add-iam-policy-binding \
-    PROJECT_ID@appspot.gserviceaccount.com \
-    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
-    --role="roles/iam.serviceAccountUser"
-
-
-See `Adding the IAM service agent user role to the runtime service
-<https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_.
+    :start-after: [START gcf_function_delete_template_fields]
+    :end-before: [END gcf_function_delete_template_fields]
 
 More information
 """"""""""""""""
@@ -565,6 +540,7 @@ GcfFunctionDeployOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
 Use the operator to deploy a function to Google Cloud Functions.
+If a function with this name already exists, it will be updated.
 
 For parameter definition, take a look at
 :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`.
@@ -573,55 +549,61 @@ For parameter definition, take a look at
 Arguments
 """""""""
 
-The following examples of OS environment variables show several variants of args you can
-use with the operator:
+In the example DAG the following environment variables are used to parameterize the
+operator's definition:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function.py
+    :language: python
+    :start-after: [START howto_operator_gcf_common_variables]
+    :end-before: [END howto_operator_gcf_common_variables]
 
-.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function.py
     :language: python
     :start-after: [START howto_operator_gcf_deploy_variables]
     :end-before: [END howto_operator_gcf_deploy_variables]
 
-With those variables you can define the body of the request:
+Some of those variables are used to create the request's body:
 
-.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function.py
     :language: python
     :start-after: [START howto_operator_gcf_deploy_body]
     :end-before: [END howto_operator_gcf_deploy_body]
 
-When you create a DAG, the default_args dictionary can be used to pass
+When a DAG is created, the default_args dictionary can be used to pass
 arguments common with other tasks:
 
-.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function.py
     :language: python
     :start-after: [START howto_operator_gcf_default_args]
     :end-before: [END howto_operator_gcf_default_args]
 
 Note that the neither the body nor the default args are complete in the above examples.
 Depending on the variables set, there might be different variants on how to pass source
-code related fields. Currently, you can pass either sourceArchiveUrl, sourceRepository
-or sourceUploadUrl as described in the
+code related fields. Currently, you can pass either ``sourceArchiveUrl``,
+``sourceRepository`` or ``sourceUploadUrl`` as described in the
 `Cloud Functions API specification
 <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction>`_.
 
-Additionally, default_args or direct operator args might contain zip_path parameter
+Additionally, ``default_args`` or direct operator args might contain ``zip_path``
+parameter
 to run the extra step of uploading the source code before deploying it.
-In this case, you also need to provide an empty `sourceUploadUrl`
+In this case, you also need to provide an empty ``sourceUploadUrl``
 parameter in the body.
 
 Using the operator
 """"""""""""""""""
 
-Based on the variables defined above, example logic of setting the source code
-related fields is shown here:
+Depending on the combination of parameters, the Function's source code can be obtained
+from different sources:
 
-.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function.py
     :language: python
     :start-after: [START howto_operator_gcf_deploy_variants]
     :end-before: [END howto_operator_gcf_deploy_variants]
 
 The code to create the operator:
 
-.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcf_deploy]
@@ -633,16 +615,20 @@ Templating
 .. literalinclude:: ../../airflow/contrib/operators/gcp_function_operator.py
     :language: python
     :dedent: 4
-    :start-after: [START gce_function_deploy_template_operator_template_fields]
-    :end-before: [END gce_function_deploy_template_operator_template_fields]
+    :start-after: [START gcf_function_deploy_template_fields]
+    :end-before: [END gcf_function_deploy_template_fields]
 
 
 Troubleshooting
 """""""""""""""
 
-If you want to run or deploy an operator using a service account and get “forbidden 403”
-errors, it means that your service account does not have the correct
-Cloud IAM permissions.
+If, during deployment, you see an error similar to the following:
+
+`"HttpError 403: Missing necessary permission iam.serviceAccounts.actAs for on resource
+project-name@appspot.gserviceaccount.com. Please grant the
+roles/iam.serviceAccountUser role."`
+
+it means that your service account does not have the correct Cloud IAM permissions.
 
 1. Assign your Service Account the Cloud Functions Developer role.
 2. Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime
@@ -659,7 +645,9 @@ and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
     --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
     --role="roles/iam.serviceAccountUser"
 
-See `Adding the IAM service agent user role to the runtime service <https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_  for details
+You can also do that via the GCP Web console.
+
+See `Adding the IAM service agent user role to the runtime service <https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_  for details.
 
 If the source code for your function is in Google Source Repository, make sure that
 your service account has the Source Repository Viewer role so that the source code
diff --git a/tests/contrib/hooks/test_gcp_container_hook.py b/tests/contrib/hooks/test_gcp_container_hook.py
index f4a21da7fc..dfcc98cb5c 100644
--- a/tests/contrib/hooks/test_gcp_container_hook.py
+++ b/tests/contrib/hooks/test_gcp_container_hook.py
@@ -26,13 +26,13 @@
 
 TASK_ID = 'test-gke-cluster-operator'
 CLUSTER_NAME = 'test-cluster'
-TEST_PROJECT_ID = 'test-project'
-ZONE = 'test-zone'
+TEST_GCP_PROJECT_ID = 'test-project'
+GKE_ZONE = 'test-zone'
 
 
 class GKEClusterHookDeleteTest(unittest.TestCase):
     def setUp(self):
-        self.gke_hook = GKEClusterHook(location=ZONE)
+        self.gke_hook = GKEClusterHook(location=GKE_ZONE)
         self.gke_hook._client = mock.Mock()
 
     @mock.patch("airflow.contrib.hooks.gcp_container_hook.GKEClusterHook._dict_to_proto")
@@ -43,12 +43,15 @@ def test_delete_cluster(self, wait_mock, convert_mock):
 
         client_delete = self.gke_hook._client.delete_cluster = mock.Mock()
 
-        self.gke_hook.delete_cluster(name=CLUSTER_NAME, project_id=TEST_PROJECT_ID, retry=retry_mock,
+        self.gke_hook.delete_cluster(name=CLUSTER_NAME, project_id=TEST_GCP_PROJECT_ID,
+                                     retry=retry_mock,
                                      timeout=timeout_mock)
 
-        client_delete.assert_called_with(project_id=TEST_PROJECT_ID, zone=ZONE,
+        client_delete.assert_called_with(project_id=TEST_GCP_PROJECT_ID,
+                                         zone=GKE_ZONE,
                                          cluster_id=CLUSTER_NAME,
-                                         retry=retry_mock, timeout=timeout_mock)
+                                         retry=retry_mock,
+                                         timeout=timeout_mock)
         wait_mock.assert_called_with(client_delete.return_value)
         convert_mock.assert_not_called()
 
@@ -63,7 +66,7 @@ def test_delete_cluster_not_found(self, wait_mock, convert_mock, log_mock):
         message = 'Not Found'
         self.gke_hook._client.delete_cluster.side_effect = NotFound(message=message)
 
-        self.gke_hook.delete_cluster(None)
+        self.gke_hook.delete_cluster('not-existing')
         wait_mock.assert_not_called()
         convert_mock.assert_not_called()
         log_mock.info.assert_any_call("Assuming Success: " + message)
@@ -76,14 +79,14 @@ def test_delete_cluster_error(self, wait_mock, convert_mock):
         self.gke_hook._client.delete_cluster.side_effect = AirflowException('400')
 
         with self.assertRaises(AirflowException):
-            self.gke_hook.delete_cluster(None)
+            self.gke_hook.delete_cluster('a-cluster')
             wait_mock.assert_not_called()
             convert_mock.assert_not_called()
 
 
 class GKEClusterHookCreateTest(unittest.TestCase):
     def setUp(self):
-        self.gke_hook = GKEClusterHook(location=ZONE)
+        self.gke_hook = GKEClusterHook(location=GKE_ZONE)
         self.gke_hook._client = mock.Mock()
 
     @mock.patch("airflow.contrib.hooks.gcp_container_hook.GKEClusterHook._dict_to_proto")
@@ -99,10 +102,13 @@ def test_create_cluster_proto(self, wait_mock, convert_mock):
 
         client_create = self.gke_hook._client.create_cluster = mock.Mock()
 
-        self.gke_hook.create_cluster(mock_cluster_proto, project_id=TEST_PROJECT_ID, retry=retry_mock,
+        self.gke_hook.create_cluster(mock_cluster_proto,
+                                     project_id=TEST_GCP_PROJECT_ID,
+                                     retry=retry_mock,
                                      timeout=timeout_mock)
 
-        client_create.assert_called_with(project_id=TEST_PROJECT_ID, zone=ZONE,
+        client_create.assert_called_with(project_id=TEST_GCP_PROJECT_ID,
+                                         zone=GKE_ZONE,
                                          cluster=mock_cluster_proto,
                                          retry=retry_mock, timeout=timeout_mock)
         wait_mock.assert_called_with(client_create.return_value)
@@ -118,10 +124,13 @@ def test_create_cluster_dict(self, wait_mock, convert_mock):
         client_create = self.gke_hook._client.create_cluster = mock.Mock()
         proto_mock = convert_mock.return_value = mock.Mock()
 
-        self.gke_hook.create_cluster(mock_cluster_dict, project_id=TEST_PROJECT_ID, retry=retry_mock,
+        self.gke_hook.create_cluster(mock_cluster_dict,
+                                     project_id=TEST_GCP_PROJECT_ID,
+                                     retry=retry_mock,
                                      timeout=timeout_mock)
 
-        client_create.assert_called_with(project_id=TEST_PROJECT_ID, zone=ZONE,
+        client_create.assert_called_with(project_id=TEST_GCP_PROJECT_ID,
+                                         zone=GKE_ZONE,
                                          cluster=proto_mock,
                                          retry=retry_mock, timeout=timeout_mock)
         wait_mock.assert_called_with(client_create.return_value)
@@ -158,7 +167,7 @@ def test_create_cluster_already_exists(self, wait_mock, convert_mock, log_mock):
 
 class GKEClusterHookGetTest(unittest.TestCase):
     def setUp(self):
-        self.gke_hook = GKEClusterHook(location=ZONE)
+        self.gke_hook = GKEClusterHook(location=GKE_ZONE)
         self.gke_hook._client = mock.Mock()
 
     def test_get_cluster(self):
@@ -166,10 +175,13 @@ def test_get_cluster(self):
 
         client_get = self.gke_hook._client.get_cluster = mock.Mock()
 
-        self.gke_hook.get_cluster(name=CLUSTER_NAME, project_id=TEST_PROJECT_ID, retry=retry_mock,
+        self.gke_hook.get_cluster(name=CLUSTER_NAME,
+                                  project_id=TEST_GCP_PROJECT_ID,
+                                  retry=retry_mock,
                                   timeout=timeout_mock)
 
-        client_get.assert_called_with(project_id=TEST_PROJECT_ID, zone=ZONE,
+        client_get.assert_called_with(project_id=TEST_GCP_PROJECT_ID,
+                                      zone=GKE_ZONE,
                                       cluster_id=CLUSTER_NAME,
                                       retry=retry_mock, timeout=timeout_mock)
 
@@ -177,10 +189,11 @@ def test_get_cluster(self):
 class GKEClusterHookTest(unittest.TestCase):
 
     def setUp(self):
-        self.gke_hook = GKEClusterHook(location=ZONE)
+        self.gke_hook = GKEClusterHook(location=GKE_ZONE)
         self.gke_hook._client = mock.Mock()
 
-    @mock.patch('airflow.contrib.hooks.gcp_container_hook.container_v1.ClusterManagerClient')
+    @mock.patch('airflow.contrib.hooks.gcp_container_hook.container_v1.'
+                'ClusterManagerClient')
     @mock.patch('airflow.contrib.hooks.gcp_container_hook.ClientInfo')
     @mock.patch('airflow.contrib.hooks.gcp_container_hook.GKEClusterHook._get_credentials')
     def test_get_client(self, mock_get_credentials, mock_client_info, mock_client):
@@ -193,10 +206,9 @@ def test_get_client(self, mock_get_credentials, mock_client_info, mock_client):
 
     def test_get_operation(self):
         self.gke_hook._client.get_operation = mock.Mock()
-        self.gke_hook.get_operation('TEST_OP', project_id=TEST_PROJECT_ID)
-        self.gke_hook._client.get_operation.assert_called_with(project_id=TEST_PROJECT_ID,
-                                                               zone=ZONE,
-                                                               operation_id='TEST_OP')
+        self.gke_hook.get_operation('TEST_OP', project_id=TEST_GCP_PROJECT_ID)
+        self.gke_hook._client.get_operation.assert_called_with(
+            project_id=TEST_GCP_PROJECT_ID, zone=GKE_ZONE, operation_id='TEST_OP')
 
     def test_append_label(self):
         key = 'test-key'
@@ -244,11 +256,11 @@ def test_wait_for_response_running(self, time_mock, operation_mock):
 
         # Status goes from Running -> Pending -> Done
         operation_mock.side_effect = [pending_op, done_op]
-        self.gke_hook.wait_for_operation(running_op, project_id=TEST_PROJECT_ID)
+        self.gke_hook.wait_for_operation(running_op, project_id=TEST_GCP_PROJECT_ID)
 
         self.assertEqual(time_mock.call_count, 3)
-        operation_mock.assert_any_call(running_op.name, project_id=TEST_PROJECT_ID)
-        operation_mock.assert_any_call(pending_op.name, project_id=TEST_PROJECT_ID)
+        operation_mock.assert_any_call(running_op.name, project_id=TEST_GCP_PROJECT_ID)
+        operation_mock.assert_any_call(pending_op.name, project_id=TEST_GCP_PROJECT_ID)
         self.assertEqual(operation_mock.call_count, 2)
 
     @mock.patch("google.protobuf.json_format.Parse")
diff --git a/tests/contrib/hooks/test_gcp_dataproc_hook.py b/tests/contrib/hooks/test_gcp_dataproc_hook.py
index e22b27a8e7..acc4922f82 100644
--- a/tests/contrib/hooks/test_gcp_dataproc_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataproc_hook.py
@@ -31,8 +31,8 @@
         mock = None
 
 JOB = 'test-job'
-PROJECT_ID = 'test-project-id'
-REGION = 'global'
+GCP_PROJECT_ID = 'test-project-id'
+GCP_REGION = 'global'
 TASK_ID = 'test-task-id'
 
 BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
@@ -53,8 +53,8 @@ def setUp(self):
     def test_submit(self, job_mock):
         with mock.patch(DATAPROC_STRING.format('DataProcHook.get_conn',
                                                return_value=None)):
-            self.dataproc_hook.submit(PROJECT_ID, JOB)
-            job_mock.assert_called_once_with(mock.ANY, PROJECT_ID, JOB, REGION,
+            self.dataproc_hook.submit(GCP_PROJECT_ID, JOB)
+            job_mock.assert_called_once_with(mock.ANY, GCP_PROJECT_ID, JOB, GCP_REGION,
                                              job_error_states=mock.ANY)
 
 
diff --git a/tests/contrib/operators/postgres_local_executor.cfg b/tests/contrib/operators/postgres_local_executor.cfg
new file mode 100644
index 0000000000..1f8eb3d3c3
--- /dev/null
+++ b/tests/contrib/operators/postgres_local_executor.cfg
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# This configuration file is used to override the executor to be LocalExecutor
+# and to switch to Postgres as the metadata database.
+# Some system tests require LocalExecutor because they need task parallelism.
+# One example is test_gcp_bigtable_operator_system.py, in which a sensor
+# has to run in parallel with the task that creates the Bigtable table.
+[core]
+executor = LocalExecutor
+sql_alchemy_conn = postgresql:///airflow/airflow.db
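
One way to make Airflow pick up this configuration locally (an assumed setup, not
something this file enforces) is to point the AIRFLOW_CONFIG environment variable at
it before anything imports Airflow, for example:

    import os

    # Assumed local setup: the path below must point at this file in your checkout.
    os.environ['AIRFLOW_CONFIG'] = 'tests/contrib/operators/postgres_local_executor.cfg'

    import airflow  # noqa: E402  -- imported only after AIRFLOW_CONFIG is set
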
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index 84f1750bed..3046994109 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -41,7 +41,7 @@
 
 TASK_ID = 'test-bq-create-table-operator'
 TEST_DATASET = 'test-dataset'
-TEST_PROJECT_ID = 'test-project'
+TEST_GCP_PROJECT_ID = 'test-project'
 TEST_TABLE_ID = 'test-table-id'
 TEST_GCS_BUCKET = 'test-bucket'
 TEST_GCS_DATA = ['dir1/*.csv']
@@ -56,7 +56,7 @@ class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
     def test_execute(self, mock_hook):
         operator = BigQueryCreateEmptyTableOperator(task_id=TASK_ID,
                                                     dataset_id=TEST_DATASET,
-                                                    project_id=TEST_PROJECT_ID,
+                                                    project_id=TEST_GCP_PROJECT_ID,
                                                     table_id=TEST_TABLE_ID)
 
         operator.execute(None)
@@ -66,7 +66,7 @@ def test_execute(self, mock_hook):
             .create_empty_table \
             .assert_called_once_with(
                 dataset_id=TEST_DATASET,
-                project_id=TEST_PROJECT_ID,
+                project_id=TEST_GCP_PROJECT_ID,
                 table_id=TEST_TABLE_ID,
                 schema_fields=None,
                 time_partitioning={},
@@ -120,7 +120,7 @@ def test_execute(self, mock_hook):
         operator = BigQueryDeleteDatasetOperator(
             task_id=TASK_ID,
             dataset_id=TEST_DATASET,
-            project_id=TEST_PROJECT_ID
+            project_id=TEST_GCP_PROJECT_ID
         )
 
         operator.execute(None)
@@ -130,7 +130,7 @@ def test_execute(self, mock_hook):
             .delete_dataset \
             .assert_called_once_with(
                 dataset_id=TEST_DATASET,
-                project_id=TEST_PROJECT_ID
+                project_id=TEST_GCP_PROJECT_ID
             )
 
 
@@ -140,7 +140,7 @@ def test_execute(self, mock_hook):
         operator = BigQueryCreateEmptyDatasetOperator(
             task_id=TASK_ID,
             dataset_id=TEST_DATASET,
-            project_id=TEST_PROJECT_ID
+            project_id=TEST_GCP_PROJECT_ID
         )
 
         operator.execute(None)
@@ -150,7 +150,7 @@ def test_execute(self, mock_hook):
             .create_empty_dataset \
             .assert_called_once_with(
                 dataset_id=TEST_DATASET,
-                project_id=TEST_PROJECT_ID,
+                project_id=TEST_GCP_PROJECT_ID,
                 dataset_reference={}
             )
 
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 219d7255ff..28e4e035e9 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -50,9 +50,9 @@
 
 TASK_ID = 'test-dataproc-operator'
 CLUSTER_NAME = 'test-cluster-name'
-PROJECT_ID = 'test-project-id'
+GCP_PROJECT_ID = 'test-project-id'
 NUM_WORKERS = 123
-ZONE = 'us-central1-a'
+GCE_ZONE = 'us-central1-a'
 NETWORK_URI = '/projects/project_id/regions/global/net'
 SUBNETWORK_URI = '/projects/project_id/regions/global/subnet'
 INTERNAL_IP_ONLY = True
@@ -78,7 +78,7 @@
 AUTO_DELETE_TIME = datetime.datetime(2017, 6, 7)
 AUTO_DELETE_TTL = 654
 DEFAULT_DATE = datetime.datetime(2017, 6, 6)
-REGION = 'test-region'
+GCP_REGION = 'test-region'
 MAIN_URI = 'test-uri'
 TEMPLATE_ID = 'template-id'
 
@@ -87,7 +87,7 @@
 DATAPROC_JOB_TO_SUBMIT = {
     'job': {
         'reference': {
-            'projectId': PROJECT_ID,
+            'projectId': GCP_PROJECT_ID,
             'jobId': DATAPROC_JOB_ID,
         },
         'placement': {
@@ -118,9 +118,9 @@ def setUp(self):
                 DataprocClusterCreateOperator(
                     task_id=TASK_ID,
                     cluster_name=CLUSTER_NAME,
-                    project_id=PROJECT_ID,
+                    project_id=GCP_PROJECT_ID,
                     num_workers=NUM_WORKERS,
-                    zone=ZONE,
+                    zone=GCE_ZONE,
                     network_uri=NETWORK_URI,
                     subnetwork_uri=SUBNETWORK_URI,
                     internal_ip_only=INTERNAL_IP_ONLY,
@@ -154,9 +154,9 @@ def test_init(self):
         """Test DataProcClusterOperator instance is properly initialized."""
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):
             self.assertEqual(dataproc_operator.cluster_name, CLUSTER_NAME)
-            self.assertEqual(dataproc_operator.project_id, PROJECT_ID)
+            self.assertEqual(dataproc_operator.project_id, GCP_PROJECT_ID)
             self.assertEqual(dataproc_operator.num_workers, NUM_WORKERS)
-            self.assertEqual(dataproc_operator.zone, ZONE)
+            self.assertEqual(dataproc_operator.zone, GCE_ZONE)
             self.assertEqual(dataproc_operator.network_uri, NETWORK_URI)
             self.assertEqual(dataproc_operator.subnetwork_uri, SUBNETWORK_URI)
             self.assertEqual(dataproc_operator.tags, TAGS)
@@ -186,7 +186,7 @@ def test_build_cluster_data(self):
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):
             cluster_data = dataproc_operator._build_cluster_data()
             self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME)
-            self.assertEqual(cluster_data['projectId'], PROJECT_ID)
+            self.assertEqual(cluster_data['projectId'], GCP_PROJECT_ID)
             self.assertEqual(cluster_data['config']['softwareConfig'],
                              {'imageVersion': IMAGE_VERSION})
             self.assertEqual(cluster_data['config']['configBucket'], STORAGE_BUCKET)
@@ -223,9 +223,9 @@ def test_build_cluster_data_with_autoDeleteTime(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=NUM_WORKERS,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag,
             auto_delete_time=AUTO_DELETE_TIME,
         )
@@ -237,9 +237,9 @@ def test_build_cluster_data_with_autoDeleteTtl(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=NUM_WORKERS,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag,
             auto_delete_ttl=AUTO_DELETE_TTL,
         )
@@ -251,9 +251,9 @@ def test_build_cluster_data_with_autoDeleteTime_and_autoDeleteTtl(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=NUM_WORKERS,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag,
             auto_delete_time=AUTO_DELETE_TIME,
             auto_delete_ttl=AUTO_DELETE_TTL,
@@ -270,9 +270,9 @@ def test_init_with_image_version_and_custom_image_both_set(self):
             DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag,
                 image_version=IMAGE_VERSION,
                 custom_image=CUSTOM_IMAGE
@@ -282,9 +282,9 @@ def test_init_with_custom_image(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=NUM_WORKERS,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag,
             custom_image=CUSTOM_IMAGE
         )
@@ -292,7 +292,7 @@ def test_init_with_custom_image(self):
         cluster_data = dataproc_operator._build_cluster_data()
         expected_custom_image_url = \
             'https://www.googleapis.com/compute/beta/projects/' \
-            '{}/global/images/{}'.format(PROJECT_ID, CUSTOM_IMAGE)
+            '{}/global/images/{}'.format(GCP_PROJECT_ID, CUSTOM_IMAGE)
         self.assertEqual(cluster_data['config']['masterConfig']['imageUri'],
                          expected_custom_image_url)
         self.assertEqual(cluster_data['config']['workerConfig']['imageUri'],
@@ -302,10 +302,10 @@ def test_build_single_node_cluster(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=0,
             num_preemptible_workers=0,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag
         )
         cluster_data = dataproc_operator._build_cluster_data()
@@ -318,10 +318,10 @@ def test_init_cluster_with_zero_workers_and_not_non_zero_preemtibles(self):
             DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=0,
                 num_preemptible_workers=2,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag,
                 image_version=IMAGE_VERSION,
             )
@@ -332,9 +332,9 @@ def test_cluster_name_log_no_sub(self):
             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag
             )
             with patch.object(dataproc_task.log, 'info') as mock_info:
@@ -348,9 +348,9 @@ def test_cluster_name_log_sub(self):
             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name='smoke-cluster-{{ ts_nodash }}',
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag
             )
             with patch.object(dataproc_task.log, 'info') as mock_info:
@@ -371,9 +371,9 @@ def create_cluster_with_invalid_internal_ip_only_setup():
             create_cluster = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag,
                 internal_ip_only=True)
 
@@ -418,7 +418,7 @@ def test_cluster_name_log_no_sub(self):
             dataproc_task = DataprocClusterScaleOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
                 num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
                 dag=self.dag
@@ -434,7 +434,7 @@ def test_cluster_name_log_sub(self):
             dataproc_task = DataprocClusterScaleOperator(
                 task_id=TASK_ID,
                 cluster_name='smoke-cluster-{{ ts_nodash }}',
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
                 num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
                 dag=self.dag
@@ -482,7 +482,7 @@ def test_cluster_name_log_no_sub(self):
             dataproc_task = DataprocClusterDeleteOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 dag=self.dag
             )
             with patch.object(dataproc_task.log, 'info') as mock_info:
@@ -496,7 +496,7 @@ def test_cluster_name_log_sub(self):
             dataproc_task = DataprocClusterDeleteOperator(
                 task_id=TASK_ID,
                 cluster_name='smoke-cluster-{{ ts_nodash }}',
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 dag=self.dag
             )
 
@@ -520,12 +520,12 @@ def test_hook_correct_region():
         with patch(HOOK) as mock_hook:
             dataproc_task = DataProcHadoopOperator(
                 task_id=TASK_ID,
-                region=REGION
+                region=GCP_REGION
             )
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
-                                                                  REGION, mock.ANY)
+                                                                  GCP_REGION, mock.ANY)
 
     @staticmethod
     def test_dataproc_job_id_is_set():
@@ -544,12 +544,12 @@ def test_hook_correct_region():
         with patch(HOOK) as mock_hook:
             dataproc_task = DataProcHiveOperator(
                 task_id=TASK_ID,
-                region=REGION
+                region=GCP_REGION
             )
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
-                                                                  REGION, mock.ANY)
+                                                                  GCP_REGION, mock.ANY)
 
     @staticmethod
     def test_dataproc_job_id_is_set():
@@ -569,12 +569,12 @@ def test_hook_correct_region():
             dataproc_task = DataProcPySparkOperator(
                 task_id=TASK_ID,
                 main=MAIN_URI,
-                region=REGION
+                region=GCP_REGION
             )
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
-                                                                  REGION, mock.ANY)
+                                                                  GCP_REGION, mock.ANY)
 
     @staticmethod
     def test_dataproc_job_id_is_set():
@@ -594,12 +594,12 @@ def test_hook_correct_region():
         with patch(HOOK) as mock_hook:
             dataproc_task = DataProcSparkOperator(
                 task_id=TASK_ID,
-                region=REGION
+                region=GCP_REGION
             )
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
-                                                                  REGION, mock.ANY)
+                                                                  GCP_REGION, mock.ANY)
 
     @staticmethod
     def test_dataproc_job_id_is_set():
@@ -642,8 +642,8 @@ def test_workflow(self):
 
             dataproc_task = DataprocWorkflowTemplateInstantiateOperator(
                 task_id=TASK_ID,
-                project_id=PROJECT_ID,
-                region=REGION,
+                project_id=GCP_PROJECT_ID,
+                region=GCP_REGION,
                 template_id=TEMPLATE_ID,
                 dag=self.dag
             )
@@ -694,7 +694,7 @@ def test_iniline_workflow(self):
                         "cluster_name": CLUSTER_NAME,
                         "config": {
                             "gce_cluster_config": {
-                                "zone_uri": ZONE,
+                                "zone_uri": GCE_ZONE,
                             }
                         }
                     }
@@ -710,8 +710,8 @@ def test_iniline_workflow(self):
 
             dataproc_task = DataprocWorkflowTemplateInstantiateInlineOperator(
                 task_id=TASK_ID,
-                project_id=PROJECT_ID,
-                region=REGION,
+                project_id=GCP_PROJECT_ID,
+                region=GCP_REGION,
                 template=template,
                 dag=self.dag
             )
diff --git a/tests/contrib/operators/test_gcp_base.py b/tests/contrib/operators/test_gcp_base.py
deleted file mode 100644
index 2df3e39ebd..0000000000
--- a/tests/contrib/operators/test_gcp_base.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import json
-import os
-import subprocess
-import unittest
-
-from airflow import models, settings, configuration, AirflowException
-from airflow.models.connection import Connection
-from airflow.utils.timezone import datetime
-
-DEFAULT_DATE = datetime(2015, 1, 1)
-
-KEYPATH_EXTRA = 'extra__google_cloud_platform__key_path'
-KEYFILE_DICT_EXTRA = 'extra__google_cloud_platform__keyfile_dict'
-SCOPE_EXTRA = 'extra__google_cloud_platform__scope'
-PROJECT_EXTRA = 'extra__google_cloud_platform__project'
-
-AIRFLOW_MAIN_FOLDER = os.path.realpath(os.path.join(
-    os.path.dirname(os.path.realpath(__file__)),
-    os.pardir, os.pardir, os.pardir))
-
-CONTRIB_OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
-    AIRFLOW_MAIN_FOLDER, "airflow", "contrib", "example_dags")
-
-OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
-    AIRFLOW_MAIN_FOLDER, "airflow", "example_dags")
-
-TESTS_DAG_FOLDER = os.path.join(
-    AIRFLOW_MAIN_FOLDER, "tests", "dags")
-
-GCP_FOLDER_ENVIRONMENT_VARIABLE = "GCP_SERVICE_ACCOUNT_KEY_FOLDER"
-
-GCP_COMPUTE_KEY = 'gcp_compute.json'
-GCP_FUNCTION_KEY = 'gcp_function.json'
-GCP_CLOUDSQL_KEY = 'gcp_cloudsql.json'
-GCP_BIGTABLE_KEY = 'gcp_bigtable.json'
-GCP_SPANNER_KEY = 'gcp_spanner.json'
-GCP_GCS_KEY = 'gcp_gcs.json'
-
-SKIP_TEST_WARNING = """
-The test is only run when there is GCP connection available! "
-Set GCP_SERVICE_ACCOUNT_KEY_FOLDER environment variable if "
-you want to run them".
-"""
-
-
-class BaseGcpIntegrationTestCase(unittest.TestCase):
-    def __init__(self,
-                 method_name,
-                 dag_id,
-                 gcp_key,
-                 dag_name=None,
-                 example_dags_folder=CONTRIB_OPERATORS_EXAMPLES_DAG_FOLDER,
-                 project_extra=None):
-        super(BaseGcpIntegrationTestCase, self).__init__(method_name)
-        self.dag_id = dag_id
-        self.dag_name = self.dag_id + '.py' if not dag_name else dag_name
-        self.gcp_key = gcp_key
-        self.example_dags_folder = example_dags_folder
-        self.project_extra = project_extra
-        self.full_key_path = None
-
-    def _gcp_authenticate(self):
-        key_dir_path = os.environ['GCP_SERVICE_ACCOUNT_KEY_FOLDER']
-        self.full_key_path = os.path.join(key_dir_path, self.gcp_key)
-
-        if not os.path.isfile(self.full_key_path):
-            raise Exception("The key {} could not be found. Please copy it to the "
-                            "{} folder.".format(self.gcp_key, key_dir_path))
-        print("Setting the GCP key to {}".format(self.full_key_path))
-        # Checking if we can authenticate using service account credentials provided
-        retcode = subprocess.call(['gcloud', 'auth', 'activate-service-account',
-                                   '--key-file={}'.format(self.full_key_path)])
-        if retcode != 0:
-            raise AirflowException("The gcloud auth method was not successful!")
-        self.update_connection_with_key_path()
-        # Now we revoke all authentication here because we want to make sure
-        # that all works fine with the credentials retrieved from the gcp_connection
-        subprocess.call(['gcloud', 'auth', 'revoke'])
-
-    def update_connection_with_key_path(self):
-        session = settings.Session()
-        try:
-            conn = session.query(Connection).filter(
-                Connection.conn_id == 'google_cloud_default')[0]
-            extras = conn.extra_dejson
-            extras[KEYPATH_EXTRA] = self.full_key_path
-            if extras.get(KEYFILE_DICT_EXTRA):
-                del extras[KEYFILE_DICT_EXTRA]
-            extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
-            extras[PROJECT_EXTRA] = self.project_extra
-            conn.extra = json.dumps(extras)
-            session.commit()
-        except BaseException as e:
-            print('Airflow DB Session error:' + str(e.message))
-            session.rollback()
-            raise
-        finally:
-            session.close()
-
-    def update_connection_with_dictionary(self):
-        session = settings.Session()
-        try:
-            conn = session.query(Connection).filter(
-                Connection.conn_id == 'google_cloud_default')[0]
-            extras = conn.extra_dejson
-            with open(self.full_key_path, "r") as f:
-                content = json.load(f)
-            extras[KEYFILE_DICT_EXTRA] = json.dumps(content)
-            if extras.get(KEYPATH_EXTRA):
-                del extras[KEYPATH_EXTRA]
-            extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
-            extras[PROJECT_EXTRA] = self.project_extra
-            conn.extra = json.dumps(extras)
-            session.commit()
-        except BaseException as e:
-            print('Airflow DB Session error:' + str(e.message))
-            session.rollback()
-            raise
-        finally:
-            session.close()
-
-    def _symlink_dag(self):
-        target_path = os.path.join(TESTS_DAG_FOLDER, self.dag_name)
-        if os.path.exists(target_path):
-            os.remove(target_path)
-        os.symlink(
-            os.path.join(self.example_dags_folder, self.dag_name),
-            os.path.join(target_path))
-
-    def _rm_symlink_dag(self):
-        os.remove(os.path.join(TESTS_DAG_FOLDER, self.dag_name))
-
-    def _run_dag(self):
-        dag_bag = models.DagBag(dag_folder=TESTS_DAG_FOLDER, include_examples=False)
-        self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
-        dag = dag_bag.get_dag(self.dag_id)
-        dag.clear(reset_dag_runs=True)
-        dag.run(ignore_first_depends_on_past=True, verbose=True)
-
-    def setUp(self):
-        configuration.conf.load_test_config()
-        self._gcp_authenticate()
-        self._symlink_dag()
-
-    def tearDown(self):
-        self._rm_symlink_dag()
-
-    @staticmethod
-    def skip_check(key):
-        if GCP_FOLDER_ENVIRONMENT_VARIABLE not in os.environ:
-            return True
-        key_folder = os.environ[GCP_FOLDER_ENVIRONMENT_VARIABLE]
-        if not os.path.isdir(key_folder):
-            return True
-        key_path = os.path.join(key_folder, key)
-        if not os.path.isfile(key_path):
-            return True
-        return False
diff --git a/tests/contrib/operators/test_gcp_bigtable_operator_system.py b/tests/contrib/operators/test_gcp_bigtable_operator_system.py
new file mode 100644
index 0000000000..525da7776c
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_bigtable_operator_system.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from tests.contrib.operators.test_gcp_bigtable_operator_system_helper import \
+    GCPBigtableTestHelper
+from tests.contrib.utils.base_gcp_system_test_case import \
+    SKIP_TEST_WARNING, DagGcpSystemTestCase
+from tests.contrib.utils.gcp_authenticator import GCP_BIGTABLE_KEY
+
+
+@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_BIGTABLE_KEY), SKIP_TEST_WARNING)
+class BigTableExampleDagsSystemTest(DagGcpSystemTestCase):
+    def __init__(self, method_name='runTest'):
+        super(BigTableExampleDagsSystemTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_bigtable_operators',
+            require_local_executor=True,
+            gcp_key=GCP_BIGTABLE_KEY)
+        self.helper = GCPBigtableTestHelper()
+
+    def test_run_example_dag_gcs_bigtable(self):
+        self._run_dag()
+
+    def tearDown(self):
+        self.gcp_authenticator.gcp_authenticate()
+        try:
+            self.helper.delete_instance()
+        finally:
+            self.gcp_authenticator.gcp_revoke_authentication()
+        super(BigTableExampleDagsSystemTest, self).tearDown()
diff --git a/tests/contrib/operators/test_gcp_bigtable_operator_system_helper.py b/tests/contrib/operators/test_gcp_bigtable_operator_system_helper.py
new file mode 100755
index 0000000000..68d579d3d8
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_bigtable_operator_system_helper.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+import argparse
+
+from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_SPANNER_KEY
+from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+CBT_INSTANCE = os.environ.get('CBT_INSTANCE_ID', 'testinstance')
+
+
+class GCPBigtableTestHelper(LoggingCommandExecutor):
+
+    def delete_instance(self):
+        self.execute_cmd([
+            'gcloud', 'bigtable', '--project', GCP_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instances', 'delete', CBT_INSTANCE
+        ])
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Create or delete Cloud Bigtable instances for system tests.')
+    parser.add_argument('--action', dest='action', required=True,
+                        choices=('delete-instance',
+                                 'before-tests', 'after-tests'))
+    action = parser.parse_args().action
+
+    helper = GCPBigtableTestHelper()
+    gcp_authenticator = GcpAuthenticator(GCP_SPANNER_KEY)
+    helper.log.info('Starting action: {}'.format(action))
+
+    gcp_authenticator.gcp_store_authentication()
+    try:
+        gcp_authenticator.gcp_authenticate()
+        if action == 'before-tests':
+            pass
+        elif action == 'after-tests':
+            pass
+        elif action == 'delete-instance':
+            helper.delete_instance()
+        else:
+            raise Exception("Unknown action: {}".format(action))
+    finally:
+        gcp_authenticator.gcp_restore_authentication()
+
+    helper.log.info('Finishing action: {}'.format(action))
diff --git a/tests/contrib/operators/test_gcp_compute_operator.py b/tests/contrib/operators/test_gcp_compute_operator.py
index 4a4e336b7c..a0907b4a57 100644
--- a/tests/contrib/operators/test_gcp_compute_operator.py
+++ b/tests/contrib/operators/test_gcp_compute_operator.py
@@ -41,12 +41,12 @@
 
 EMPTY_CONTENT = ''.encode('utf8')
 
-PROJECT_ID = 'project-id'
-ZONE = 'zone'
+GCP_PROJECT_ID = 'project-id'
+GCE_ZONE = 'zone'
 RESOURCE_ID = 'resource-id'
-SHORT_MACHINE_TYPE_NAME = 'n1-machine-type'
+GCE_SHORT_MACHINE_TYPE_NAME = 'n1-machine-type'
 SET_MACHINE_TYPE_BODY = {
-    'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME)
+    'machineType': 'zones/{}/machineTypes/{}'.format(GCE_ZONE, GCE_SHORT_MACHINE_TYPE_NAME)
 }
 
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
@@ -57,8 +57,8 @@ class GceInstanceStartTest(unittest.TestCase):
     def test_instance_start(self, mock_hook):
         mock_hook.return_value.start_instance.return_value = True
         op = GceInstanceStartOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=RESOURCE_ID,
             task_id='id'
         )
@@ -66,14 +66,14 @@ def test_instance_start(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.start_instance.assert_called_once_with(
-            PROJECT_ID, ZONE, RESOURCE_ID
+            GCP_PROJECT_ID, GCE_ZONE, RESOURCE_ID
         )
         self.assertTrue(result)
 
     # Setting all of the operator's input parameters as template dag_ids
     # (could be anything else) just to test if the templating works for all fields
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
-    def test_instance_start_with_templates(self, mock_hook):
+    def test_instance_start_with_templates(self, _):
         dag_id = 'test_dag_id'
         configuration.load_test_config()
         args = {
@@ -102,7 +102,7 @@ def test_start_should_throw_ex_when_missing_project_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStartOperator(
                 project_id="",
-                zone=ZONE,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 task_id='id'
             )
@@ -115,7 +115,7 @@ def test_start_should_throw_ex_when_missing_project_id(self, mock_hook):
     def test_start_should_throw_ex_when_missing_zone(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStartOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 zone="",
                 resource_id=RESOURCE_ID,
                 task_id='id'
@@ -129,8 +129,8 @@ def test_start_should_throw_ex_when_missing_zone(self, mock_hook):
     def test_start_should_throw_ex_when_missing_resource_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStartOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id="",
                 task_id='id'
             )
@@ -145,8 +145,8 @@ class GceInstanceStopTest(unittest.TestCase):
     def test_instance_stop(self, mock_hook):
         mock_hook.return_value.stop_instance.return_value = True
         op = GceInstanceStopOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=RESOURCE_ID,
             task_id='id'
         )
@@ -154,14 +154,14 @@ def test_instance_stop(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.stop_instance.assert_called_once_with(
-            PROJECT_ID, ZONE, RESOURCE_ID
+            GCP_PROJECT_ID, GCE_ZONE, RESOURCE_ID
         )
         self.assertTrue(result)
 
     # Setting all of the operator's input parameters as templated dag_ids
     # (could be anything else) just to test if the templating works for all fields
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
-    def test_instance_stop_with_templates(self, mock_hook):
+    def test_instance_stop_with_templates(self, _):
         dag_id = 'test_dag_id'
         configuration.load_test_config()
         args = {
@@ -190,7 +190,7 @@ def test_stop_should_throw_ex_when_missing_project_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStopOperator(
                 project_id="",
-                zone=ZONE,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 task_id='id'
             )
@@ -203,7 +203,7 @@ def test_stop_should_throw_ex_when_missing_project_id(self, mock_hook):
     def test_stop_should_throw_ex_when_missing_zone(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStopOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 zone="",
                 resource_id=RESOURCE_ID,
                 task_id='id'
@@ -217,8 +217,8 @@ def test_stop_should_throw_ex_when_missing_zone(self, mock_hook):
     def test_stop_should_throw_ex_when_missing_resource_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStopOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id="",
                 task_id='id'
             )
@@ -233,8 +233,8 @@ class GceInstanceSetMachineTypeTest(unittest.TestCase):
     def test_set_machine_type(self, mock_hook):
         mock_hook.return_value.set_machine_type.return_value = True
         op = GceSetMachineTypeOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=RESOURCE_ID,
             body=SET_MACHINE_TYPE_BODY,
             task_id='id'
@@ -243,14 +243,14 @@ def test_set_machine_type(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.set_machine_type.assert_called_once_with(
-            PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY
+            GCP_PROJECT_ID, GCE_ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY
         )
         self.assertTrue(result)
 
     # Setting all of the operator's input parameters as templated dag_ids
     # (could be anything else) just to test if the templating works for all fields
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
-    def test_set_machine_type_with_templates(self, mock_hook):
+    def test_set_machine_type_with_templates(self, _):
         dag_id = 'test_dag_id'
         configuration.load_test_config()
         args = {
@@ -280,7 +280,7 @@ def test_set_machine_type_should_throw_ex_when_missing_project_id(self, mock_hoo
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
                 project_id="",
-                zone=ZONE,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
@@ -294,7 +294,7 @@ def test_set_machine_type_should_throw_ex_when_missing_project_id(self, mock_hoo
     def test_set_machine_type_should_throw_ex_when_missing_zone(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 zone="",
                 resource_id=RESOURCE_ID,
                 body=SET_MACHINE_TYPE_BODY,
@@ -309,8 +309,8 @@ def test_set_machine_type_should_throw_ex_when_missing_zone(self, mock_hook):
     def test_set_machine_type_should_throw_ex_when_missing_resource_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id="",
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
@@ -324,8 +324,8 @@ def test_set_machine_type_should_throw_ex_when_missing_resource_id(self, mock_ho
     def test_set_machine_type_should_throw_ex_when_missing_machine_type(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 body={},
                 task_id='id'
@@ -367,11 +367,12 @@ def test_set_machine_type_should_handle_and_trim_gce_error(
             self, get_conn, _execute_set_machine_type, _check_zone_operation_status):
         get_conn.return_value = {}
         _execute_set_machine_type.return_value = {"name": "test-operation"}
-        _check_zone_operation_status.return_value = ast.literal_eval(self.MOCK_OP_RESPONSE)
+        _check_zone_operation_status.return_value = ast.literal_eval(
+            self.MOCK_OP_RESPONSE)
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
@@ -379,9 +380,9 @@ def test_set_machine_type_should_handle_and_trim_gce_error(
             op.execute(None)
         err = cm.exception
         _check_zone_operation_status.assert_called_once_with(
-            {}, "test-operation", PROJECT_ID, ZONE)
+            {}, "test-operation", GCP_PROJECT_ID, GCE_ZONE)
         _execute_set_machine_type.assert_called_once_with(
-            PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY)
+            GCP_PROJECT_ID, GCE_ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY)
         # Checking the full message was sometimes failing due to different order
         # of keys in the serialized JSON
         self.assertIn("400 BAD REQUEST: {", str(err))  # checking the square bracket trim
@@ -490,7 +491,7 @@ def test_successful_copy_template(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
@@ -499,7 +500,7 @@ def test_successful_copy_template(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=GCE_INSTANCE_TEMPLATE_BODY_INSERT,
             request_id=None
         )
@@ -511,7 +512,7 @@ def test_idempotent_copy_template_when_already_copied(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
@@ -530,7 +531,7 @@ def test_successful_copy_template_with_request_id(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
             task_id='id',
@@ -540,7 +541,7 @@ def test_successful_copy_template_with_request_id(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=GCE_INSTANCE_TEMPLATE_BODY_INSERT,
             request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
         )
@@ -554,7 +555,7 @@ def test_successful_copy_template_with_description_fields(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
             task_id='id',
@@ -568,7 +569,7 @@ def test_successful_copy_template_with_description_fields(self, mock_hook):
         body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
         body_insert["description"] = "New description"
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
         )
@@ -582,7 +583,7 @@ def test_copy_with_some_validation_warnings(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
@@ -598,7 +599,7 @@ def test_copy_with_some_validation_warnings(self, mock_hook):
         body_insert["some_wrong_field"] = "test"
         body_insert["properties"]["some_other_wrong_field"] = "test"
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=None,
         )
@@ -612,7 +613,7 @@ def test_successful_copy_template_with_updated_nested_fields(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={
@@ -628,7 +629,7 @@ def test_successful_copy_template_with_updated_nested_fields(self, mock_hook):
         body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
         body_insert["properties"]["machineType"] = "n1-standard-2"
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=None
         )
@@ -642,7 +643,7 @@ def test_successful_copy_template_with_smaller_array_fields(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={
@@ -681,7 +682,7 @@ def test_successful_copy_template_with_smaller_array_fields(self, mock_hook):
             }
         ]
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=None
         )
@@ -695,7 +696,7 @@ def test_successful_copy_template_with_bigger_array_fields(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={
@@ -742,7 +743,7 @@ def test_successful_copy_template_with_bigger_array_fields(self, mock_hook):
             }
         ]
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=None,
         )
@@ -757,7 +758,7 @@ def test_missing_name(self, mock_hook):
         ]
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceTemplateCopyOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 resource_id=GCE_INSTANCE_TEMPLATE_NAME,
                 request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
                 task_id='id',
@@ -876,8 +877,8 @@ def test_successful_instance_group_update(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
@@ -887,8 +888,8 @@ def test_successful_instance_group_update(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='beta',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH,
             request_id=None
@@ -902,8 +903,8 @@ def test_successful_instance_group_update_no_instance_template_field(self, mock_
         mock_hook.return_value.get_instance_group_manager.return_value = \
             instance_group_manager_no_template
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
@@ -916,8 +917,8 @@ def test_successful_instance_group_update_no_instance_template_field(self, mock_
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
         del expected_patch_no_instance_template['instanceTemplate']
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=expected_patch_no_instance_template,
             request_id=None
@@ -931,8 +932,8 @@ def test_successful_instance_group_update_no_versions_field(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             instance_group_manager_no_versions
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
@@ -945,8 +946,8 @@ def test_successful_instance_group_update_no_versions_field(self, mock_hook):
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
         del expected_patch_no_versions['versions']
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=expected_patch_no_versions,
             request_id=None
@@ -958,8 +959,8 @@ def test_successful_instance_group_update_with_update_policy(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             update_policy=GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY,
@@ -974,8 +975,8 @@ def test_successful_instance_group_update_with_update_policy(self, mock_hook):
         expected_patch_with_update_policy['updatePolicy'] = \
             GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=expected_patch_with_update_policy,
             request_id=None
@@ -987,8 +988,8 @@ def test_successful_instance_group_update_with_request_id(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
@@ -999,8 +1000,8 @@ def test_successful_instance_group_update_with_request_id(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='beta',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH,
             request_id=GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID
@@ -1011,8 +1012,8 @@ def test_successful_instance_group_update_with_request_id(self, mock_hook):
     def test_try_to_use_api_v1(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             GceInstanceGroupManagerUpdateTemplateOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
                 task_id='id',
                 api_version='v1',
@@ -1027,8 +1028,8 @@ def test_try_to_use_non_existing_template(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_NON_EXISTING_URL,
diff --git a/tests/contrib/operators/test_gcp_compute_operator_system.py b/tests/contrib/operators/test_gcp_compute_operator_system.py
new file mode 100644
index 0000000000..0054df99d7
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_compute_operator_system.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from tests.contrib.utils.base_gcp_system_test_case import \
+    SKIP_TEST_WARNING, DagGcpSystemTestCase
+
+from tests.contrib.operators.test_gcp_compute_operator_system_helper import \
+    GCPComputeTestHelper
+from tests.contrib.utils.gcp_authenticator import GCP_COMPUTE_KEY
+
+
+@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_COMPUTE_KEY), SKIP_TEST_WARNING)
+class GcpComputeExampleDagsSystemTest(DagGcpSystemTestCase):
+
+    def setUp(self):
+        super(GcpComputeExampleDagsSystemTest, self).setUp()
+        self.gcp_authenticator.gcp_authenticate()
+        self.helper.delete_instance()
+        self.helper.create_instance()
+        self.gcp_authenticator.gcp_revoke_authentication()
+
+    def tearDown(self):
+        self.gcp_authenticator.gcp_authenticate()
+        self.helper.delete_instance()
+        self.gcp_authenticator.gcp_revoke_authentication()
+        super(GcpComputeExampleDagsSystemTest, self).tearDown()
+
+    def __init__(self, method_name='runTest'):
+        super(GcpComputeExampleDagsSystemTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_compute',
+            gcp_key=GCP_COMPUTE_KEY)
+        self.helper = GCPComputeTestHelper()
+
+    def test_run_example_dag_compute(self):
+        self._run_dag()
+
+
+@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_COMPUTE_KEY), SKIP_TEST_WARNING)
+class GcpComputeIgmExampleDagsSystemTest(DagGcpSystemTestCase):
+
+    def setUp(self):
+        super(GcpComputeIgmExampleDagsSystemTest, self).setUp()
+        self.gcp_authenticator.gcp_authenticate()
+        try:
+            self.helper.delete_instance_group_and_template(silent=True)
+            self.helper.create_instance_group_and_template()
+        finally:
+            self.gcp_authenticator.gcp_revoke_authentication()
+
+    def tearDown(self):
+        self.gcp_authenticator.gcp_authenticate()
+        try:
+            self.helper.delete_instance_group_and_template()
+        finally:
+            self.gcp_authenticator.gcp_revoke_authentication()
+        super(GcpComputeIgmExampleDagsSystemTest, self).tearDown()
+
+    def __init__(self, method_name='runTest'):
+        super(GcpComputeIgmExampleDagsSystemTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_compute_igm',
+            gcp_key=GCP_COMPUTE_KEY)
+        self.helper = GCPComputeTestHelper()
+
+    def test_run_example_dag_compute_igm(self):
+        self._run_dag()
diff --git a/tests/contrib/operators/test_gcp_compute_operator_system_helper.py b/tests/contrib/operators/test_gcp_compute_operator_system_helper.py
new file mode 100755
index 0000000000..60de2381d1
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_compute_operator_system_helper.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import argparse
+import os
+
+from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_COMPUTE_KEY
+from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
+
+GCE_INSTANCE = os.environ.get('GCE_INSTANCE', 'testinstance')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCE_INSTANCE_GROUP_MANAGER_NAME = os.environ.get('GCE_INSTANCE_GROUP_MANAGER_NAME',
+                                                 'instance-group-test')
+GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
+GCE_TEMPLATE_NAME = os.environ.get('GCE_TEMPLATE_NAME',
+                                   'instance-template-test')
+GCE_NEW_TEMPLATE_NAME = os.environ.get('GCE_NEW_TEMPLATE_NAME',
+                                       'instance-template-test-new')
+
+
+class GCPComputeTestHelper(LoggingCommandExecutor):
+
+    def delete_instance(self):
+        self.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', GCP_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instances', 'delete', GCE_INSTANCE, '--zone', GCE_ZONE,
+        ])
+
+    def create_instance(self):
+        self.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', GCP_PROJECT_ID, '--quiet',
+            'instances', 'create', GCE_INSTANCE,
+            '--zone', GCE_ZONE
+        ])
+
+    def delete_instance_group_and_template(self, silent=False):
+        self.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', GCP_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instance-groups', 'managed', 'delete', GCE_INSTANCE_GROUP_MANAGER_NAME,
+            '--zone', GCE_ZONE
+        ], silent=silent)
+        self.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', GCP_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instance-templates', 'delete', GCE_NEW_TEMPLATE_NAME
+        ], silent=silent)
+        self.execute_cmd([
+            'gcloud', 'beta', 'compute',
+            '--project', GCP_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instance-templates', 'delete', GCE_TEMPLATE_NAME
+        ], silent=silent)
+
+    def create_instance_group_and_template(self):
+        self.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', GCP_PROJECT_ID, '--quiet',
+            'instance-templates', 'create', GCE_TEMPLATE_NAME
+        ])
+        self.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', GCP_PROJECT_ID, '--quiet',
+            'instance-groups', 'managed', 'create', GCE_INSTANCE_GROUP_MANAGER_NAME,
+            '--template', GCE_TEMPLATE_NAME,
+            '--zone', GCE_ZONE, '--size=1'
+        ])
+        self.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', GCP_PROJECT_ID, '--quiet',
+            'instance-groups', 'managed', 'wait-until-stable',
+            GCE_INSTANCE_GROUP_MANAGER_NAME,
+            '--zone', GCE_ZONE
+        ])
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Create or delete GCE instances/instance groups for system tests.')
+    parser.add_argument('--action', dest='action', required=True,
+                        choices=('create-instance', 'delete-instance',
+                                 'create-instance-group', 'delete-instance-group',
+                                 'before-tests', 'after-tests'))
+    action = parser.parse_args().action
+
+    helper = GCPComputeTestHelper()
+    gcp_authenticator = GcpAuthenticator(GCP_COMPUTE_KEY)
+    helper.log.info('Starting action: {}'.format(action))
+
+    gcp_authenticator.gcp_store_authentication()
+    try:
+        gcp_authenticator.gcp_authenticate()
+        if action == 'before-tests':
+            pass
+        elif action == 'after-tests':
+            pass
+        elif action == 'create-instance':
+            helper.create_instance()
+        elif action == 'delete-instance':
+            helper.delete_instance()
+        elif action == 'create-instance-group':
+            helper.create_instance_group_and_template()
+        elif action == 'delete-instance-group':
+            helper.delete_instance_group_and_template()
+        else:
+            raise Exception("Unknown action: {}".format(action))
+    finally:
+        gcp_authenticator.gcp_restore_authentication()
+
+    helper.log.info('Finishing action: {}'.format(action))
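
For reference, a minimal sketch of driving the new GCPComputeTestHelper outside the unittest runner, mirroring the helper's own __main__ flow shown above; it assumes the GCE_*/GCP_* environment variables are exported and that a service-account key file is registered for GCP_COMPUTE_KEY:

# Provision the instance group and template by hand before a system-test run (sketch only).
from tests.contrib.operators.test_gcp_compute_operator_system_helper import \
    GCPComputeTestHelper
from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_COMPUTE_KEY

helper = GCPComputeTestHelper()
authenticator = GcpAuthenticator(GCP_COMPUTE_KEY)
authenticator.gcp_store_authentication()   # keep the current gcloud credentials safe
try:
    authenticator.gcp_authenticate()
    helper.create_instance_group_and_template()
finally:
    authenticator.gcp_restore_authentication()

The same steps are available from the command line, as the helper file is executable and exposes an argparse --action interface.
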
diff --git a/tests/contrib/operators/test_gcp_container_operator.py b/tests/contrib/operators/test_gcp_container_operator.py
index 1685e9c6ac..600c7cd8a8 100644
--- a/tests/contrib/operators/test_gcp_container_operator.py
+++ b/tests/contrib/operators/test_gcp_container_operator.py
@@ -33,7 +33,7 @@
     except ImportError:
         mock = None
 
-PROJECT_ID = 'test-id'
+TEST_GCP_PROJECT_ID = 'test-id'
 PROJECT_LOCATION = 'test-location'
 PROJECT_TASK_ID = 'test-task-id'
 CLUSTER_NAME = 'test-cluster-name'
@@ -55,7 +55,7 @@ class GoogleCloudPlatformContainerOperatorTest(unittest.TestCase):
 
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_create_execute(self, mock_hook):
-        operator = GKEClusterCreateOperator(project_id=PROJECT_ID,
+        operator = GKEClusterCreateOperator(project_id=TEST_GCP_PROJECT_ID,
                                             location=PROJECT_LOCATION,
                                             body=PROJECT_BODY_CREATE,
                                             task_id=PROJECT_TASK_ID)
@@ -67,7 +67,7 @@ def test_create_execute(self, mock_hook):
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_create_execute_error_body(self, mock_hook):
         with self.assertRaises(AirflowException):
-            operator = GKEClusterCreateOperator(project_id=PROJECT_ID,
+            operator = GKEClusterCreateOperator(project_id=TEST_GCP_PROJECT_ID,
                                                 location=PROJECT_LOCATION,
                                                 body=None,
                                                 task_id=PROJECT_TASK_ID)
@@ -88,7 +88,7 @@ def test_create_execute_error_project_id(self, mock_hook):
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_create_execute_error_location(self, mock_hook):
         with self.assertRaises(AirflowException):
-            operator = GKEClusterCreateOperator(project_id=PROJECT_ID,
+            operator = GKEClusterCreateOperator(project_id=TEST_GCP_PROJECT_ID,
                                                 body=PROJECT_BODY,
                                                 task_id=PROJECT_TASK_ID)
 
@@ -97,7 +97,7 @@ def test_create_execute_error_location(self, mock_hook):
 
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_delete_execute(self, mock_hook):
-        operator = GKEClusterDeleteOperator(project_id=PROJECT_ID,
+        operator = GKEClusterDeleteOperator(project_id=TEST_GCP_PROJECT_ID,
                                             name=CLUSTER_NAME,
                                             location=PROJECT_LOCATION,
                                             task_id=PROJECT_TASK_ID)
@@ -118,7 +118,7 @@ def test_delete_execute_error_project_id(self, mock_hook):
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_delete_execute_error_cluster_name(self, mock_hook):
         with self.assertRaises(AirflowException):
-            operator = GKEClusterDeleteOperator(project_id=PROJECT_ID,
+            operator = GKEClusterDeleteOperator(project_id=TEST_GCP_PROJECT_ID,
                                                 location=PROJECT_LOCATION,
                                                 task_id=PROJECT_TASK_ID)
 
@@ -128,7 +128,7 @@ def test_delete_execute_error_cluster_name(self, mock_hook):
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_delete_execute_error_location(self, mock_hook):
         with self.assertRaises(AirflowException):
-            operator = GKEClusterDeleteOperator(project_id=PROJECT_ID,
+            operator = GKEClusterDeleteOperator(project_id=TEST_GCP_PROJECT_ID,
                                                 name=CLUSTER_NAME,
                                                 task_id=PROJECT_TASK_ID)
 
@@ -138,7 +138,7 @@ def test_delete_execute_error_location(self, mock_hook):
 
 class GKEPodOperatorTest(unittest.TestCase):
     def setUp(self):
-        self.gke_op = GKEPodOperator(project_id=PROJECT_ID,
+        self.gke_op = GKEPodOperator(project_id=TEST_GCP_PROJECT_ID,
                                      location=PROJECT_LOCATION,
                                      cluster_name=CLUSTER_NAME,
                                      task_id=PROJECT_TASK_ID,
@@ -167,7 +167,7 @@ def test_execute_conn_id_none(self, proc_mock, file_mock, exec_mock):
 
         # Assert the gcloud command being called correctly
         proc_mock.assert_called_with(
-            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, PROJECT_ID).split())
+            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, TEST_GCP_PROJECT_ID).split())
 
         self.assertEqual(self.gke_op.config_file, FILE_NAME)
 
@@ -197,7 +197,7 @@ def test_execute_conn_id_path(self, proc_mock, file_mock, exec_mock, get_con_moc
 
         # Assert the gcloud command being called correctly
         proc_mock.assert_called_with(
-            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, PROJECT_ID).split())
+            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, TEST_GCP_PROJECT_ID).split())
 
         self.assertEqual(self.gke_op.config_file, FILE_NAME)
 
@@ -232,7 +232,7 @@ def test_execute_conn_id_dict(self, proc_mock, file_mock, exec_mock, get_con_moc
 
         # Assert the gcloud command being called correctly
         proc_mock.assert_called_with(
-            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, PROJECT_ID).split())
+            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, TEST_GCP_PROJECT_ID).split())
 
         self.assertEqual(self.gke_op.config_file, FILE_NAME)
 
diff --git a/tests/contrib/operators/test_gcp_function_operator.py b/tests/contrib/operators/test_gcp_function_operator.py
index 46d599bf7d..3c568fce0c 100644
--- a/tests/contrib/operators/test_gcp_function_operator.py
+++ b/tests/contrib/operators/test_gcp_function_operator.py
@@ -41,20 +41,21 @@
 EMPTY_CONTENT = ''.encode('utf8')
 MOCK_RESP_404 = type('', (object,), {"status": 404})()
 
-PROJECT_ID = 'test_project_id'
-LOCATION = 'test_region'
-SOURCE_ARCHIVE_URL = 'gs://folder/file.zip'
-ENTRYPOINT = 'helloWorld'
-FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
-                                                               ENTRYPOINT)
-RUNTIME = 'nodejs6'
+GCP_PROJECT_ID = 'test_project_id'
+GCP_LOCATION = 'test_region'
+GCF_SOURCE_ARCHIVE_URL = 'gs://folder/file.zip'
+GCF_ENTRYPOINT = 'helloWorld'
+FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
+                                                               GCP_LOCATION,
+                                                               GCF_ENTRYPOINT)
+GCF_RUNTIME = 'nodejs6'
 VALID_RUNTIMES = ['nodejs6', 'nodejs8', 'python37']
 VALID_BODY = {
     "name": FUNCTION_NAME,
-    "entryPoint": ENTRYPOINT,
-    "runtime": RUNTIME,
+    "entryPoint": GCF_ENTRYPOINT,
+    "runtime": GCF_RUNTIME,
     "httpsTrigger": {},
-    "sourceArchiveUrl": SOURCE_ARCHIVE_URL
+    "sourceArchiveUrl": GCF_SOURCE_ARCHIVE_URL
 }
 
 
@@ -100,8 +101,8 @@ def test_deploy_execute(self, mock_hook):
             side_effect=HttpError(resp=MOCK_RESP_404, content=b'not found'))
         mock_hook.return_value.create_new_function.return_value = True
         op = GcfFunctionDeployOperator(
-            project_id=PROJECT_ID,
-            location=LOCATION,
+            project_id=GCP_PROJECT_ID,
+            location=GCP_LOCATION,
             body=deepcopy(VALID_BODY),
             task_id="id"
         )
@@ -125,8 +126,8 @@ def test_update_function_if_exists(self, mock_hook):
         mock_hook.return_value.get_function.return_value = True
         mock_hook.return_value.update_function.return_value = True
         op = GcfFunctionDeployOperator(
-            project_id=PROJECT_ID,
-            location=LOCATION,
+            project_id=GCP_PROJECT_ID,
+            location=GCP_LOCATION,
             body=deepcopy(VALID_BODY),
             task_id="id"
         )
@@ -192,7 +193,6 @@ def test_empty_body(self, mock_hook):
     ])
     @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
     def test_correct_runtime_field(self, runtime, mock_hook):
-        mock_hook.return_value.list_functions.return_value = []
         mock_hook.return_value.create_new_function.return_value = True
         body = deepcopy(VALID_BODY)
         body['runtime'] = runtime
@@ -217,7 +217,6 @@ def test_correct_runtime_field(self, runtime, mock_hook):
     ])
     @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
     def test_valid_network_field(self, network, mock_hook):
-        mock_hook.return_value.list_functions.return_value = []
         mock_hook.return_value.create_new_function.return_value = True
         body = deepcopy(VALID_BODY)
         body['network'] = network
@@ -241,7 +240,6 @@ def test_valid_network_field(self, network, mock_hook):
     ])
     @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
     def test_valid_labels_field(self, labels, mock_hook):
-        mock_hook.return_value.list_functions.return_value = []
         mock_hook.return_value.create_new_function.return_value = True
         body = deepcopy(VALID_BODY)
         body['labels'] = labels
@@ -258,7 +256,6 @@ def test_valid_labels_field(self, labels, mock_hook):
 
     @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
     def test_validation_disabled(self, mock_hook):
-        mock_hook.return_value.list_functions.return_value = []
         mock_hook.return_value.create_new_function.return_value = True
         body = {
             "name": "function_name",
@@ -278,7 +275,6 @@ def test_validation_disabled(self, mock_hook):
 
     @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
     def test_body_validation_simple(self, mock_hook):
-        mock_hook.return_value.list_functions.return_value = []
         mock_hook.return_value.create_new_function.return_value = True
         body = deepcopy(VALID_BODY)
         body['name'] = ''
@@ -315,7 +311,6 @@ def test_body_validation_simple(self, mock_hook):
     ])
     @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
     def test_invalid_field_values(self, key, value, message, mock_hook):
-        mock_hook.return_value.list_functions.return_value = []
         mock_hook.return_value.create_new_function.return_value = True
         body = deepcopy(VALID_BODY)
         body[key] = value
@@ -521,7 +516,6 @@ def test_valid_trigger_union_field(self, trigger, mock_hook):
 
     @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
     def test_extra_parameter(self, mock_hook):
-        mock_hook.return_value.list_functions.return_value = []
         mock_hook.return_value.create_new_function.return_value = True
         body = deepcopy(VALID_BODY)
         body['extra_parameter'] = 'extra'
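
The hunks above also show the naming convention applied throughout this PR: generic module-level test constants gain a prefix naming the GCP product they describe. Summarised as a sketch, using the values already defined in this file:

# Renamed constants in test_gcp_function_operator.py (old names in the comments).
GCP_PROJECT_ID = 'test_project_id'                # was PROJECT_ID
GCP_LOCATION = 'test_region'                      # was LOCATION
GCF_SOURCE_ARCHIVE_URL = 'gs://folder/file.zip'   # was SOURCE_ARCHIVE_URL
GCF_ENTRYPOINT = 'helloWorld'                     # was ENTRYPOINT
GCF_RUNTIME = 'nodejs6'                           # was RUNTIME
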
diff --git a/tests/contrib/operators/test_gcp_function_operator_system.py b/tests/contrib/operators/test_gcp_function_operator_system.py
new file mode 100644
index 0000000000..2dc24de00a
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_function_operator_system.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from tests.contrib.utils.base_gcp_system_test_case import \
+    SKIP_TEST_WARNING, DagGcpSystemTestCase
+from tests.contrib.utils.gcp_authenticator import GCP_FUNCTION_KEY
+
+
+@unittest.skipIf(
+    DagGcpSystemTestCase.skip_check(GCP_FUNCTION_KEY), SKIP_TEST_WARNING)
+class GcpFunctionExampleDagsSystemTest(DagGcpSystemTestCase):
+    def __init__(self, method_name='runTest'):
+        super(GcpFunctionExampleDagsSystemTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_function',
+            gcp_key=GCP_FUNCTION_KEY)
+
+    def test_run_example_dag_function(self):
+        self._run_dag()
diff --git a/tests/contrib/operators/test_gcp_spanner_operator.py b/tests/contrib/operators/test_gcp_spanner_operator.py
index 525f796471..455a6543c8 100644
--- a/tests/contrib/operators/test_gcp_spanner_operator.py
+++ b/tests/contrib/operators/test_gcp_spanner_operator.py
@@ -22,11 +22,12 @@
 
 from airflow import AirflowException
 from airflow.contrib.operators.gcp_spanner_operator import \
-    CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDeleteOperator, \
-    CloudSpannerInstanceDatabaseQueryOperator, CloudSpannerInstanceDatabaseDeployOperator, \
-    CloudSpannerInstanceDatabaseDeleteOperator, CloudSpannerInstanceDatabaseUpdateOperator
-from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
-    SKIP_TEST_WARNING, GCP_SPANNER_KEY
+    CloudSpannerInstanceDeployOperator, \
+    CloudSpannerInstanceDeleteOperator, \
+    CloudSpannerInstanceDatabaseQueryOperator, \
+    CloudSpannerInstanceDatabaseDeployOperator, \
+    CloudSpannerInstanceDatabaseDeleteOperator, \
+    CloudSpannerInstanceDatabaseUpdateOperator
 
 try:
     # noinspection PyProtectedMember
@@ -45,8 +46,8 @@
 DISPLAY_NAME = 'Test Instance'
 INSERT_QUERY = "INSERT my_table1 (id, name) VALUES (1, 'One')"
 INSERT_QUERY_2 = "INSERT my_table2 (id, name) VALUES (1, 'One')"
-CREATE_QUERY = "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)"
-CREATE_QUERY_2 = "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)"
+CREATE_QUERY = "CREATE TABLE my_table1 (id INT64, name STRING(100)) PRIMARY KEY (id)"
+CREATE_QUERY_2 = "CREATE TABLE my_table2 (id INT64, name STRING(100)) PRIMARY KEY (id)"
 DDL_STATEMENTS = [CREATE_QUERY, CREATE_QUERY_2]
 
 
@@ -405,16 +406,3 @@ def test_database_delete_ex_if_param_missing(self, project_id, instance_id,
         err = cm.exception
         self.assertIn("The required parameter '{}' is empty".format(exp_msg), str(err))
         mock_hook.assert_not_called()
-
-
-@unittest.skipIf(
-    BaseGcpIntegrationTestCase.skip_check(GCP_SPANNER_KEY), SKIP_TEST_WARNING)
-class CloudSpannerExampleDagsTest(BaseGcpIntegrationTestCase):
-    def __init__(self, method_name='runTest'):
-        super(CloudSpannerExampleDagsTest, self).__init__(
-            method_name,
-            dag_id='example_gcp_spanner',
-            gcp_key=GCP_SPANNER_KEY)
-
-    def test_run_example_dag_cloudsql_query(self):
-        self._run_dag()
diff --git a/tests/contrib/operators/test_gcp_spanner_operator_system.py b/tests/contrib/operators/test_gcp_spanner_operator_system.py
new file mode 100644
index 0000000000..fd4c50bd8a
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_spanner_operator_system.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from tests.contrib.operators.test_gcp_spanner_operator_system_helper import \
+    GCPSpannerTestHelper
+from tests.contrib.utils.base_gcp_system_test_case import \
+    SKIP_TEST_WARNING, DagGcpSystemTestCase
+from tests.contrib.utils.gcp_authenticator import GCP_SPANNER_KEY
+
+
+@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_SPANNER_KEY), SKIP_TEST_WARNING)
+class CloudSpannerExampleDagsTest(DagGcpSystemTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSpannerExampleDagsTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_spanner',
+            gcp_key=GCP_SPANNER_KEY)
+        self.helper = GCPSpannerTestHelper()
+
+    def tearDown(self):
+        self.gcp_authenticator.gcp_authenticate()
+        try:
+            self.helper.delete_instance()
+        finally:
+            self.gcp_authenticator.gcp_revoke_authentication()
+        super(CloudSpannerExampleDagsTest, self).tearDown()
+
+    def test_run_example_dag_spanner(self):
+        self._run_dag()
diff --git a/tests/contrib/operators/test_gcp_spanner_operator_system_helper.py b/tests/contrib/operators/test_gcp_spanner_operator_system_helper.py
new file mode 100755
index 0000000000..b715bcdbcb
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_spanner_operator_system_helper.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+import argparse
+
+from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_SPANNER_KEY
+from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
+
+
+class GCPSpannerTestHelper(LoggingCommandExecutor):
+
+    def delete_instance(self):
+        self.execute_cmd([
+            'gcloud', 'spanner', '--project', GCP_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instances', 'delete', GCP_SPANNER_INSTANCE_ID
+        ])
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Create or delete spanner instances for system tests.')
+    parser.add_argument('--action', dest='action', required=True,
+                        choices=('delete-instance',
+                                 'before-tests', 'after-tests'))
+    action = parser.parse_args().action
+
+    helper = GCPSpannerTestHelper()
+    gcp_authenticator = GcpAuthenticator(GCP_SPANNER_KEY)
+    helper.log.info('Starting action: {}'.format(action))
+
+    gcp_authenticator.gcp_store_authentication()
+    try:
+        gcp_authenticator.gcp_authenticate()
+        if action == 'before-tests':
+            pass
+        elif action == 'after-tests':
+            pass
+        elif action == 'delete-instance':
+            helper.delete_instance()
+        else:
+            raise Exception("Unknown action: {}".format(action))
+    finally:
+        gcp_authenticator.gcp_restore_authentication()
+
+    helper.log.info('Finishing action: {}'.format(action))
diff --git a/tests/contrib/operators/test_gcp_sql_operator.py b/tests/contrib/operators/test_gcp_sql_operator.py
index 646b2e771f..39eecca619 100644
--- a/tests/contrib/operators/test_gcp_sql_operator.py
+++ b/tests/contrib/operators/test_gcp_sql_operator.py
@@ -20,20 +20,15 @@
 import os
 import unittest
 
-import time
 from parameterized import parameterized
-from uuid import uuid1
 
 from airflow import AirflowException
-from airflow.contrib.hooks.gcp_sql_hook import CloudSqlProxyRunner
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
     CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
     CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
     CloudSqlInstanceExportOperator, CloudSqlInstanceImportOperator, \
     CloudSqlInstanceDatabaseDeleteOperator, CloudSqlQueryOperator
 from airflow.models.connection import Connection
-from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
-    GCP_CLOUDSQL_KEY, SKIP_TEST_WARNING
 
 try:
     # noinspection PyProtectedMember
@@ -150,7 +145,7 @@
             "schemaOnly": False
         },
         "csvExportOptions": {
-            "selectQuery": "SELECT * FROM ..."
+            "selectQuery": "SELECT * FROM TABLE"
         }
     }
 }
@@ -546,6 +541,13 @@ def test_instance_import(self, mock_hook):
 
 
 class CloudSqlQueryValidationTest(unittest.TestCase):
+
+    @staticmethod
+    def _setup_connections(get_connections, uri):
+        cloudsql_connection = Connection()
+        cloudsql_connection.parse_from_uri(uri)
+        get_connections.side_effect = [[cloudsql_connection]]
+
     @parameterized.expand([
         ('', 'location', 'instance_name', 'postgres', False, False,
          'SELECT * FROM TEST',
@@ -578,19 +580,18 @@ def test_create_operator_with_wrong_parameters(self,
                                                    sql,
                                                    message,
                                                    get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?"
-            "database_type={database_type}&"
-            "project_id={project_id}&location={location}&instance={instance_name}&"
-            "use_proxy={use_proxy}&use_ssl={use_ssl}".
-            format(database_type=database_type,
-                   project_id=project_id,
-                   location=location,
-                   instance_name=instance_name,
-                   use_proxy=use_proxy,
-                   use_ssl=use_ssl))
-        get_connections.return_value = [connection]
+        uri = \
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?" \
+            "database_type={database_type}&" \
+            "project_id={project_id}&location={location}&instance={instance_name}&" \
+            "use_proxy={use_proxy}&use_ssl={use_ssl}".format(
+                database_type=database_type,
+                project_id=project_id,
+                location=location,
+                instance_name=instance_name,
+                use_proxy=use_proxy,
+                use_ssl=use_ssl)
+        self._setup_connections(get_connections, uri)
         with self.assertRaises(AirflowException) as cm:
             op = CloudSqlQueryOperator(
                 sql=sql,
@@ -602,12 +603,10 @@ def test_create_operator_with_wrong_parameters(self,
 
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_correct_parameters_postgres(self, get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
-            "project_id=example-project&location=europe-west1&instance=testdb&"
-            "use_proxy=False&use_ssl=False")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&" \
+              "project_id=example-project&location=europe-west1&instance=testdb&" \
+              "use_proxy=False&use_ssl=False"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -625,13 +624,11 @@ def test_create_operator_with_correct_parameters_postgres(self, get_connections)
 
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_correct_parameters_postgres_ssl(self, get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
-            "project_id=example-project&location=europe-west1&instance=testdb&"
-            "use_proxy=False&use_ssl=True&sslcert=/bin/bash&"
-            "sslkey=/bin/bash&sslrootcert=/bin/bash")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&" \
+              "project_id=example-project&location=europe-west1&instance=testdb&" \
+              "use_proxy=False&use_ssl=True&sslcert=/bin/bash&" \
+              "sslkey=/bin/bash&sslrootcert=/bin/bash"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -653,12 +650,10 @@ def test_create_operator_with_correct_parameters_postgres_ssl(self, get_connecti
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_correct_parameters_postgres_proxy_socket(
             self, get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
-            "project_id=example-project&location=europe-west1&instance=testdb&"
-            "use_proxy=True&sql_proxy_use_tcp=False")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&" \
+              "project_id=example-project&location=europe-west1&instance=testdb&" \
+              "use_proxy=True&sql_proxy_use_tcp=False"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -678,12 +673,10 @@ def test_create_operator_with_correct_parameters_postgres_proxy_socket(
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_correct_parameters_postgres_proxy_tcp(self,
                                                                         get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
-            "project_id=example-project&location=europe-west1&instance=testdb&"
-            "use_proxy=True&sql_proxy_use_tcp=True")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&" \
+              "project_id=example-project&location=europe-west1&instance=testdb&" \
+              "use_proxy=True&sql_proxy_use_tcp=True"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -701,12 +694,10 @@ def test_create_operator_with_correct_parameters_postgres_proxy_tcp(self,
 
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_correct_parameters_mysql(self, get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&"
-            "project_id=example-project&location=europe-west1&instance=testdb&"
-            "use_proxy=False&use_ssl=False")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&" \
+            "project_id=example-project&location=europe-west1&instance=testdb&" \
+            "use_proxy=False&use_ssl=False"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -724,13 +715,11 @@ def test_create_operator_with_correct_parameters_mysql(self, get_connections):
 
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_correct_parameters_mysql_ssl(self, get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&"
-            "project_id=example-project&location=europe-west1&instance=testdb&"
-            "use_proxy=False&use_ssl=True&sslcert=/bin/bash&"
-            "sslkey=/bin/bash&sslrootcert=/bin/bash")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&" \
+              "project_id=example-project&location=europe-west1&instance=testdb&" \
+              "use_proxy=False&use_ssl=True&sslcert=/bin/bash&" \
+              "sslkey=/bin/bash&sslrootcert=/bin/bash"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -752,12 +741,10 @@ def test_create_operator_with_correct_parameters_mysql_ssl(self, get_connections
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_correct_parameters_mysql_proxy_socket(self,
                                                                         get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&"
-            "project_id=example-project&location=europe-west1&instance=testdb&"
-            "use_proxy=True&sql_proxy_use_tcp=False")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&" \
+              "project_id=example-project&location=europe-west1&instance=testdb&" \
+              "use_proxy=True&sql_proxy_use_tcp=False"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -778,12 +765,10 @@ def test_create_operator_with_correct_parameters_mysql_proxy_socket(self,
 
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_correct_parameters_mysql_tcp(self, get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&"
-            "project_id=example-project&location=europe-west1&instance=testdb&"
-            "use_proxy=True&sql_proxy_use_tcp=True")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&" \
+              "project_id=example-project&location=europe-west1&instance=testdb&" \
+              "use_proxy=True&sql_proxy_use_tcp=True"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -801,33 +786,30 @@ def test_create_operator_with_correct_parameters_mysql_tcp(self, get_connections
 
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_too_long_unix_socket_path(self, get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
-            "project_id=example-project&location=europe-west1&"
-            "instance="
-            "test_db_with_long_name_a_bit_above_the_limit_of_UNIX_socket&"
-            "use_proxy=True&sql_proxy_use_tcp=False")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&" \
+              "project_id=example-project&location=europe-west1&" \
+              "instance=" \
+              "test_db_with_long_name_a_bit_above" \
+              "_the_limit_of_UNIX_socket_asdadadasadasd&" \
+              "use_proxy=True&sql_proxy_use_tcp=False"
+        self._setup_connections(get_connections, uri)
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
         with self.assertRaises(AirflowException) as cm:
-            op = CloudSqlQueryOperator(
-                sql=['SELECT * FROM TABLE'],
-                task_id='task_id'
-            )
-            op.execute(None)
+            operator.execute(None)
         err = cm.exception
         self.assertIn("The UNIX socket path length cannot exceed", str(err))
 
     @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
     def test_create_operator_with_not_too_long_unix_socket_path(self, get_connections):
-        connection = Connection()
-        connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
-            "project_id=example-project&location=europe-west1&"
-            "instance="
-            "test_db_with_longname_but_with_limit_of_UNIX_socket_aaaa&"
-            "use_proxy=True&sql_proxy_use_tcp=False")
-        get_connections.return_value = [connection]
+        uri = "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&" \
+              "project_id=example-project&location=europe-west1&" \
+              "instance=" \
+              "test_db_with_longname_but_with_limit_of_UNIX_socket&" \
+              "use_proxy=True&sql_proxy_use_tcp=False"
+        self._setup_connections(get_connections, uri)
         operator = CloudSqlQueryOperator(
             sql=['SELECT * FROM TABLE'],
             task_id='task_id'
@@ -873,89 +855,3 @@ def test_cloudsql_hook_delete_connection_on_exception(
         err = cm.exception
         self.assertEqual("Exception when running a query", str(err))
         delete_connection.assert_called_once_with()
-
-
-@unittest.skipIf(
-    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
-class CloudSqlProxyIntegrationTest(BaseGcpIntegrationTestCase):
-    def __init__(self, method_name='runTest'):
-        super(CloudSqlProxyIntegrationTest, self).__init__(
-            method_name,
-            dag_id='example_gcp_sql_query',
-            gcp_key='gcp_cloudsql.json')
-
-    def test_start_proxy_fail_no_parameters(self):
-        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
-                                     project_id=PROJECT_ID,
-                                     instance_specification='a')
-        with self.assertRaises(AirflowException) as cm:
-            runner.start_proxy()
-        err = cm.exception
-        self.assertIn("invalid instance name", str(err))
-        with self.assertRaises(AirflowException) as cm:
-            runner.start_proxy()
-        err = cm.exception
-        self.assertIn("invalid instance name", str(err))
-        self.assertIsNone(runner.sql_proxy_process)
-
-    def test_start_proxy_with_all_instances(self):
-        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
-                                     project_id=PROJECT_ID,
-                                     instance_specification='')
-        try:
-            runner.start_proxy()
-            time.sleep(1)
-        finally:
-            runner.stop_proxy()
-        self.assertIsNone(runner.sql_proxy_process)
-
-    def test_start_proxy_with_all_instances_generated_credential_file(self):
-        self.update_connection_with_dictionary()
-        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
-                                     project_id=PROJECT_ID,
-                                     instance_specification='')
-        try:
-            runner.start_proxy()
-            time.sleep(1)
-        finally:
-            runner.stop_proxy()
-        self.assertIsNone(runner.sql_proxy_process)
-
-    def test_start_proxy_with_all_instances_specific_version(self):
-        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
-                                     project_id=PROJECT_ID,
-                                     instance_specification='',
-                                     sql_proxy_version='v1.13')
-        try:
-            runner.start_proxy()
-            time.sleep(1)
-        finally:
-            runner.stop_proxy()
-        self.assertIsNone(runner.sql_proxy_process)
-        self.assertEqual(runner.get_proxy_version(), "1.13")
-
-
-@unittest.skipIf(
-    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
-class CloudSqlExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
-    def __init__(self, method_name='runTest'):
-        super(CloudSqlExampleDagsIntegrationTest, self).__init__(
-            method_name,
-            dag_id='example_gcp_sql',
-            gcp_key=GCP_CLOUDSQL_KEY)
-
-    def test_run_example_dag_cloudsql_query(self):
-        self._run_dag()
-
-
-@unittest.skipIf(
-    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
-class CloudSqlQueryExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
-    def __init__(self, method_name='runTest'):
-        super(CloudSqlQueryExampleDagsIntegrationTest, self).__init__(
-            method_name,
-            dag_id='example_gcp_sql_query',
-            gcp_key=GCP_CLOUDSQL_KEY)
-
-    def test_run_example_dag_cloudsql_query(self):
-        self._run_dag()
diff --git a/tests/contrib/operators/test_gcp_sql_operator_system.py b/tests/contrib/operators/test_gcp_sql_operator_system.py
new file mode 100644
index 0000000000..4fc7b56136
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_sql_operator_system.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import unittest
+
+from airflow import AirflowException
+from tests.contrib.utils.base_gcp_system_test_case import \
+    SKIP_TEST_WARNING, DagGcpSystemTestCase
+from tests.contrib.operators.test_gcp_sql_operator_system_helper import \
+    CloudSqlQueryTestHelper
+from tests.contrib.utils.gcp_authenticator import GCP_CLOUDSQL_KEY
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'project-id')
+
+SQL_QUERY_TEST_HELPER = CloudSqlQueryTestHelper()
+
+
+@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
+class CloudSqlExampleDagsIntegrationTest(DagGcpSystemTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSqlExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_sql',
+            gcp_key=GCP_CLOUDSQL_KEY)
+
+    def tearDown(self):
+        # Delete instances just in case the test failed and did not cleanup after itself
+        self.gcp_authenticator.gcp_authenticate()
+        try:
+            SQL_QUERY_TEST_HELPER.delete_instances()
+            SQL_QUERY_TEST_HELPER.delete_instances(instance_suffix="2")
+            SQL_QUERY_TEST_HELPER.delete_service_account_acls()
+        finally:
+            self.gcp_authenticator.gcp_revoke_authentication()
+        super(CloudSqlExampleDagsIntegrationTest, self).tearDown()
+
+    def test_run_example_dag_cloudsql(self):
+        try:
+            self._run_dag()
+        except AirflowException as e:
+            self.log.warning(
+                "In case you see 'The instance or operation is not in an appropriate "
+                "state to handle the request' error - you "
+                "can remove the '.random' file from the airflow folder and re-run "
+                "the test. This will generate a random database name for the next run "
+                "(the problem is that Cloud SQL keeps the names of deleted instances in "
+                "a short-term cache).")
+            raise e
diff --git a/tests/contrib/operators/test_gcp_sql_operator_system_helper.py b/tests/contrib/operators/test_gcp_sql_operator_system_helper.py
new file mode 100755
index 0000000000..85974a9987
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_sql_operator_system_helper.py
@@ -0,0 +1,477 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import errno
+import json
+import os
+from os.path import expanduser
+
+import argparse
+from threading import Thread
+
+import time
+
+from six.moves.urllib.parse import urlsplit
+
+from tests.contrib.utils.base_gcp_system_test_case import RetrieveVariables
+from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_CLOUDSQL_KEY
+from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
+
+try:
+    # noinspection PyProtectedMember
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+retrieve_variables = RetrieveVariables()
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
+
+GCSQL_POSTGRES_SERVER_CA_FILE = os.environ.get('GCSQL_POSTGRES_SERVER_CA_FILE',
+                                               ".key/postgres-server-ca.pem")
+GCSQL_POSTGRES_CLIENT_CERT_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_CERT_FILE',
+                                                 ".key/postgres-client-cert.pem")
+GCSQL_POSTGRES_CLIENT_KEY_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_KEY_FILE',
+                                                ".key/postgres-client-key.pem")
+GCSQL_POSTGRES_PUBLIC_IP_FILE = os.environ.get('GCSQL_POSTGRES_PUBLIC_IP_FILE',
+                                               ".key/postgres-ip.env")
+GCSQL_POSTGRES_USER = os.environ.get('GCSQL_POSTGRES_USER', 'postgres_user')
+GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME',
+                                              'postgresdb')
+GCSQL_MYSQL_CLIENT_CERT_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_CERT_FILE',
+                                              ".key/mysql-client-cert.pem")
+GCSQL_MYSQL_CLIENT_KEY_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_KEY_FILE',
+                                             ".key/mysql-client-key.pem")
+GCSQL_MYSQL_SERVER_CA_FILE = os.environ.get('GCSQL_MYSQL_SERVER_CA_FILE',
+                                            ".key/mysql-server-ca.pem")
+GCSQL_MYSQL_PUBLIC_IP_FILE = os.environ.get('GCSQL_MYSQL_PUBLIC_IP_FILE',
+                                            ".key/mysql-ip.env")
+GCSQL_MYSQL_USER = os.environ.get('GCSQL_MYSQL_USER', 'mysql_user')
+GCSQL_MYSQL_DATABASE_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'mysqldb')
+
+GCSQL_MYSQL_EXPORT_URI = os.environ.get('GCSQL_MYSQL_EXPORT_URI',
+                                        'gs://bucketName/fileName')
+DB_VERSION_MYSQL = 'MYSQL_5_7'
+DB_VERSION_POSTGRES = 'POSTGRES_9_6'
+
+HOME_DIR = expanduser("~")
+
+
+def get_absolute_path(path):
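+    """Returns the path as-is if absolute, else joins it with the home directory."""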
+    if path.startswith("/"):
+        return path
+    else:
+        return os.path.join(HOME_DIR, path)
+
+
+server_ca_file_postgres = get_absolute_path(GCSQL_POSTGRES_SERVER_CA_FILE)
+client_cert_file_postgres = get_absolute_path(GCSQL_POSTGRES_CLIENT_CERT_FILE)
+client_key_file_postgres = get_absolute_path(GCSQL_POSTGRES_CLIENT_KEY_FILE)
+
+server_ca_file_mysql = get_absolute_path(GCSQL_MYSQL_SERVER_CA_FILE)
+client_cert_file_mysql = get_absolute_path(GCSQL_MYSQL_CLIENT_CERT_FILE)
+client_key_file_mysql = get_absolute_path(GCSQL_MYSQL_CLIENT_KEY_FILE)
+
+
+def get_postgres_instance_name(instance_suffix=''):
+    return os.environ.get('GCSQL_POSTGRES_INSTANCE_NAME' + instance_suffix,
+                          'testpostgres')
+
+
+def get_mysql_instance_name(instance_suffix=''):
+    return os.environ.get('GCSQL_MYSQL_INSTANCE_NAME' + instance_suffix,
+                          'testmysql')
+
+
+class CloudSqlQueryTestHelper(LoggingCommandExecutor):
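+    """
+    Helper that creates, configures and deletes the MySQL and Postgres Cloud SQL
+    test instances by shelling out to the gcloud/gsutil command line tools.
+    """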
+
+    def create_instances(self, instance_suffix=''):
+        thread_mysql = Thread(target=lambda: self.__create_instance(
+            get_mysql_instance_name(instance_suffix), DB_VERSION_MYSQL))
+        thread_postgres = Thread(target=lambda: self.__create_instance(
+            get_postgres_instance_name(instance_suffix), DB_VERSION_POSTGRES))
+        thread_mysql.start()
+        thread_postgres.start()
+        thread_mysql.join()
+        thread_postgres.join()
+
+    def delete_instances(self, instance_suffix=''):
+        thread_mysql = Thread(target=lambda: self.__delete_instance(
+            get_mysql_instance_name(instance_suffix)))
+        thread_postgres = Thread(target=lambda: self.__delete_instance(
+            get_postgres_instance_name(instance_suffix)))
+        thread_mysql.start()
+        thread_postgres.start()
+        thread_mysql.join()
+        thread_postgres.join()
+
+    def get_ip_addresses(self, instance_suffix):
+        with open(GCSQL_MYSQL_PUBLIC_IP_FILE, "w") as f:
+            ip = self.__get_ip_address(get_mysql_instance_name(instance_suffix),
+                                       'GCSQL_MYSQL_PUBLIC_IP')
+            f.write(ip)
+        with open(GCSQL_POSTGRES_PUBLIC_IP_FILE, "w") as f:
+            ip = self.__get_ip_address(get_postgres_instance_name(instance_suffix),
+                                       'GCSQL_POSTGRES_PUBLIC_IP')
+            f.write(ip)
+
+    def raise_database_exception(self, database):
+        raise Exception("The {database} instance does not exist. Make sure to run "
+                        "`python {f} --action=before-tests` before running the test "
+                        "(and remember to run `python {f} --action=after-tests` "
+                        "after you are done)."
+                        .format(f=__file__, database=database))
+
+    def check_if_instances_are_up(self, instance_suffix=''):
+        res_postgres = self.execute_cmd(
+            ['gcloud', 'sql', 'instances', 'describe',
+             get_postgres_instance_name(instance_suffix),
+             "--project={}".format(GCP_PROJECT_ID)])
+        if res_postgres != 0:
+            self.raise_database_exception('postgres')
+        res_mysql = self.execute_cmd(
+            ['gcloud', 'sql', 'instances', 'describe',
+             get_mysql_instance_name(instance_suffix),
+             "--project={}".format(GCP_PROJECT_ID)])
+        if res_mysql != 0:
+            self.raise_database_exception('mysql')
+
+    def authorize_address(self, instance_suffix=''):
+        ip = self.__get_my_public_ip()
+        self.log.info('Authorizing access from IP: %s', ip)
+        postgres_thread = Thread(target=lambda: self.execute_cmd(
+            ['gcloud', 'sql', 'instances', 'patch',
+             get_postgres_instance_name(instance_suffix), '--quiet',
+             "--authorized-networks={}".format(ip),
+             "--project={}".format(GCP_PROJECT_ID)]))
+        mysql_thread = Thread(target=lambda: self.execute_cmd(
+            ['gcloud', 'sql', 'instances', 'patch',
+             get_mysql_instance_name(instance_suffix), '--quiet',
+             "--authorized-networks={}".format(ip),
+             "--project={}".format(GCP_PROJECT_ID)]))
+        postgres_thread.start()
+        mysql_thread.start()
+        postgres_thread.join()
+        mysql_thread.join()
+
+    def setup_instances(self, instance_suffix=''):
+        mysql_thread = Thread(target=lambda: self.__setup_instance_and_certs(
+            get_mysql_instance_name(instance_suffix), DB_VERSION_MYSQL,
+            server_ca_file_mysql,
+            client_key_file_mysql, client_cert_file_mysql, GCSQL_MYSQL_DATABASE_NAME,
+            GCSQL_MYSQL_USER
+        ))
+        postgres_thread = Thread(target=lambda: self.__setup_instance_and_certs(
+            get_postgres_instance_name(instance_suffix), DB_VERSION_POSTGRES,
+            server_ca_file_postgres,
+            client_key_file_postgres, client_cert_file_postgres,
+            GCSQL_POSTGRES_DATABASE_NAME, GCSQL_POSTGRES_USER
+        ))
+        mysql_thread.start()
+        postgres_thread.start()
+        mysql_thread.join()
+        postgres_thread.join()
+        self.get_ip_addresses(instance_suffix)
+        self.authorize_address(instance_suffix)
+
+    def delete_service_account_acls(self):
+        self.__delete_service_accounts_acls()
+
+    def __create_instance(self, instance_name, db_version):
+        self.log.info('Creating a test %s instance "%s"...', db_version, instance_name)
+        try:
+            create_instance_opcode = self.__create_sql_instance(instance_name, db_version)
+            if create_instance_opcode:  # return code 1, some error occurred
+                operation_name = self.__get_operation_name(instance_name)
+                self.log.info('Waiting for operation: %s ...', operation_name)
+                self.__wait_for_create(operation_name)
+                self.log.info('... Done.')
+
+            self.log.info('... Done creating a test %s instance "%s"!\n',
+                          db_version, instance_name)
+        except Exception as ex:
+            self.log.error('Exception occurred. '
+                           'Aborting creating a test instance.\n\n%s', ex)
+            raise ex
+
+    def __delete_service_accounts_acls(self):
+        export_bucket_split = urlsplit(GCSQL_MYSQL_EXPORT_URI)
+        export_bucket_name = export_bucket_split[1]  # netloc (bucket)
+        self.log.info('Deleting temporary service accounts from bucket "%s"...',
+                      export_bucket_name)
+        all_permissions = self.check_output(['gsutil', 'iam', 'get',
+                                             "gs://{}".format(export_bucket_name),
+                                             "--project={}".format(GCP_PROJECT_ID)])
+        all_permissions_dejson = json.loads(all_permissions.decode("utf-8"))
+        for binding in all_permissions_dejson['bindings']:
+            if binding['role'] == 'roles/storage.legacyBucketWriter':
+                for member in binding['members']:
+                    if not member.startswith('serviceAccount:gcp-storage-account'):
+
+                        self.log.info("Remove member: {}".format(member))
+                        member_type, member_email = member.split(':')
+                        if member_type != 'serviceAccount':
+                            self.log.warning("Skip removing member {} as the type {} is "
+                                             "not a service account"
+                                             .format(member, member_type))
+                            continue
+                        self.execute_cmd(['gsutil', 'acl', 'ch', '-d', member_email,
+                                         "gs://{}".format(export_bucket_name)])
+                    else:
+                        self.log.info("Skip removing member {}".format(member))
+
+    @staticmethod
+    def set_ip_addresses_in_env():
+        CloudSqlQueryTestHelper.__set_ip_address_in_env(GCSQL_MYSQL_PUBLIC_IP_FILE)
+        CloudSqlQueryTestHelper.__set_ip_address_in_env(GCSQL_POSTGRES_PUBLIC_IP_FILE)
+
+    @staticmethod
+    def __set_ip_address_in_env(file_name):
+        if os.path.exists(file_name):
+            with open(file_name, "r") as f:
+                env, ip = f.read().split("=")
+                os.environ[env] = ip
+
+    def __setup_instance_and_certs(self, instance_name, db_version, server_ca_file,
+                                   client_key_file, client_cert_file, db_name,
+                                   db_username):
+        self.log.info('Setting up a test %s instance "%s"...', db_version, instance_name)
+        try:
+            self.__remove_keys_and_certs([server_ca_file, client_key_file,
+                                          client_cert_file])
+
+            self.__wait_for_operations(instance_name)
+            self.__write_to_file(server_ca_file, self.__get_server_ca_cert(instance_name))
+            client_cert_name = 'client-cert-name'
+            self.__wait_for_operations(instance_name)
+            self.__delete_client_cert(instance_name, client_cert_name)
+            self.__wait_for_operations(instance_name)
+            self.__create_client_cert(instance_name, client_key_file, client_cert_name)
+            self.__wait_for_operations(instance_name)
+            self.__write_to_file(client_cert_file,
+                                 self.__get_client_cert(instance_name, client_cert_name))
+            self.__wait_for_operations(instance_name)
+            self.__wait_for_operations(instance_name)
+            self.__create_user(instance_name, db_username)
+            self.__wait_for_operations(instance_name)
+            self.__delete_db(instance_name, db_name)
+            self.__create_db(instance_name, db_name)
+            self.log.info('... Done setting up a test %s instance "%s"!\n',
+                          db_version, instance_name)
+        except Exception as ex:
+            self.log.error('Exception occurred. '
+                           'Aborting setting up test instance and certs.\n\n%s', ex)
+            raise ex
+
+    def __delete_instance(self, instance_name):
+        # type: (str) -> None
+        self.log.info('Deleting Cloud SQL instance "%s"...', instance_name)
+        self.execute_cmd(['gcloud', 'sql', 'instances', 'delete',
+                          instance_name, '--quiet'])
+        self.log.info('... Done.')
+
+    def __get_my_public_ip(self):
+        return self.check_output(
+            ['curl', 'https://ipinfo.io/ip']).decode('utf-8').strip()
+
+    def __create_sql_instance(self, instance_name, db_version):
+        # type: (str, str) -> int
+        return self.execute_cmd(
+            ['gcloud', 'sql', 'instances', 'create', instance_name,
+             '--region', GCP_LOCATION,
+             '--project', GCP_PROJECT_ID,
+             '--database-version', db_version,
+             '--tier', 'db-f1-micro'])
+
+    def __get_server_ca_cert(self, instance_name):
+        # type: (str) -> bytes
+        self.log.info('Getting server CA cert for "%s"...', instance_name)
+        output = self.check_output(
+            ['gcloud', 'sql', 'instances', 'describe', instance_name,
+             '--format=value(serverCaCert.cert)'])
+        self.log.info('... Done.')
+        return output
+
+    def __get_client_cert(self, instance_name, client_cert_name):
+        # type: (str, str) -> bytes
+        self.log.info('Getting client cert for "%s"...', instance_name)
+        output = self.check_output(
+            ['gcloud', 'sql', 'ssl', 'client-certs', 'describe', client_cert_name, '-i',
+             instance_name, '--format=get(cert)'])
+        self.log.info('... Done.')
+        return output
+
+    def __create_user(self, instance_name, username):
+        # type: (str, str) -> None
+        self.log.info('Creating user "%s" in Cloud SQL instance "%s"...', username,
+                      instance_name)
+        self.execute_cmd(['gcloud', 'sql', 'users', 'create', username, '-i',
+                          instance_name, '--host', '%', '--password', 'JoxHlwrPzwch0gz9',
+                          '--quiet'])
+        self.log.info('... Done.')
+
+    def __delete_db(self, instance_name, db_name):
+        # type: (str, str) -> None
+        self.log.info('Deleting database "%s" in Cloud SQL instance "%s"...', db_name,
+                      instance_name)
+        self.execute_cmd(['gcloud', 'sql', 'databases', 'delete', db_name, '-i',
+                          instance_name, '--quiet'])
+        self.log.info('... Done.')
+
+    def __create_db(self, instance_name, db_name):
+        # type: (str, str) -> None
+        self.log.info('Creating database "%s" in Cloud SQL instance "%s"...', db_name,
+                      instance_name)
+        self.execute_cmd(['gcloud', 'sql', 'databases', 'create', db_name, '-i',
+                          instance_name, '--quiet'])
+        self.log.info('... Done.')
+
+    def __write_to_file(self, filepath, content):
+        # type: (str, bytes) -> None
+        # https://stackoverflow.com/a/12517490
+        self.log.info("Checking file under: %s", filepath)
+        if not os.path.exists(os.path.dirname(filepath)):
+            self.log.info("Directory doesn't exist. Creating it...")
+            try:
+                os.makedirs(os.path.dirname(filepath))
+            except OSError as exc:  # Guard against race condition
+                self.log.info("Error while creating dir.")
+                if exc.errno != errno.EEXIST:
+                    raise
+        self.log.info("... Done. The directory exists.")
+
+        with open(filepath, "w") as f:
+            f.write(str(content.decode('utf-8')))
+        self.log.info('Written file in: %s', filepath)
+
+    def __remove_keys_and_certs(self, filepaths):
+        if not len(filepaths):
+            return
+        self.log.info('Removing client keys and certs...')
+
+        for filepath in filepaths:
+            if os.path.exists(filepath):
+                os.remove(filepath)
+        self.log.info('Done ...')
+
+    def __delete_client_cert(self, instance_name, common_name):
+        self.log.info('Deleting client key and cert for "%s"...', instance_name)
+        self.execute_cmd(['gcloud', 'sql', 'ssl', 'client-certs', 'delete', common_name,
+                          '-i', instance_name, '--quiet'])
+        self.log.info('... Done.')
+
+    def __create_client_cert(self, instance_name, client_key_file, common_name):
+        self.log.info('Creating client key and cert for "%s"...', instance_name)
+        try:
+            os.remove(client_key_file)
+        except OSError:
+            pass
+        self.execute_cmd(['gcloud', 'sql', 'ssl', 'client-certs', 'create', common_name,
+                          client_key_file, '-i', instance_name])
+        self.log.info('... Done.')
+
+    def __get_operation_name(self, instance_name):
+        # type: (str) -> str
+        op_name_bytes = self.check_output(
+            ['gcloud', 'sql', 'operations', 'list', '-i',
+             instance_name, '--format=get(name)'])
+        return op_name_bytes.decode('utf-8').strip()
+
+    def __print_operations(self, operations):
+        self.log.info("\n==== OPERATIONS >>>>")
+        self.log.info(operations)
+        self.log.info("<<<< OPERATIONS ====\n")
+
+    def __wait_for_operations(self, instance_name):
+        # type: (str) -> None
+        while True:
+            operations = self.__get_operations(instance_name)
+            self.__print_operations(operations)
+            if "RUNNING" in operations:
+                self.log.info("Found a running operation. Sleeping 5s before retrying...")
+                time.sleep(5)
+            else:
+                break
+
+    def __get_ip_address(self, instance_name, env_var):
+        # type: (str, str) -> str
+        ip = self.check_output(
+            ['gcloud', 'sql', 'instances', 'describe',
+             instance_name,
+             '--format=get(ipAddresses[0].ipAddress)']
+        ).decode('utf-8').strip()
+        os.environ[env_var] = ip
+        return "{}={}".format(env_var, ip)
+
+    def __get_operations(self, instance_name):
+        # type: (str) -> str
+        op_name_bytes = self.check_output(
+            ['gcloud', 'sql', 'operations', 'list', '-i',
+             instance_name, '--format=get(NAME,TYPE,STATUS)'])
+        return op_name_bytes.decode('utf-8').strip()
+
+    def __wait_for_create(self, operation_name):
+        # type: (str) -> None
+        self.execute_cmd(['gcloud', 'beta', 'sql', 'operations', 'wait',
+                          '--project', GCP_PROJECT_ID, operation_name])
+
+
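+# The module can also be run standalone to manage the test instances, for example:
+#   python test_gcp_sql_operator_system_helper.py --action=create
+#   python test_gcp_sql_operator_system_helper.py --action=setup-instances
+#   python test_gcp_sql_operator_system_helper.py --action=delete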
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Create or delete Cloud SQL instances for system tests.')
+    parser.add_argument('--action', required=True,
+                        choices=('create', 'delete', 'setup-instances',
+                                 'create2', 'delete2', 'setup-instances2',
+                                 'before-tests', 'after-tests',
+                                 'delete-service-accounts-acls'))
+    action = parser.parse_args().action
+
+    helper = CloudSqlQueryTestHelper()
+    gcp_authenticator = GcpAuthenticator(GCP_CLOUDSQL_KEY)
+    helper.log.info('Starting action: {}'.format(action))
+
+    gcp_authenticator.gcp_store_authentication()
+    try:
+        gcp_authenticator.gcp_authenticate()
+        if action == 'before-tests':
+            pass
+        elif action == 'after-tests':
+            pass
+        elif action == 'create':
+            helper.create_instances()
+        elif action == 'delete':
+            helper.delete_instances()
+        elif action == 'create2':
+            helper.create_instances(instance_suffix="2")
+        elif action == 'delete2':
+            helper.delete_instances(instance_suffix="2")
+        elif action == 'setup-instances':
+            helper.setup_instances()
+        elif action == 'setup-instances2':
+            helper.setup_instances(instance_suffix="2")
+        elif action == 'delete-service-accounts-acls':
+            helper.delete_service_account_acls()
+        else:
+            raise Exception("Unknown action: {}".format(action))
+    finally:
+        gcp_authenticator.gcp_restore_authentication()
+    helper.log.info('Finishing action: {}'.format(action))
diff --git a/tests/contrib/operators/test_gcp_sql_operatorquery_system.py b/tests/contrib/operators/test_gcp_sql_operatorquery_system.py
new file mode 100644
index 0000000000..4e216662f6
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_sql_operatorquery_system.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import random
+import string
+import unittest
+from os.path import dirname
+
+import time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_sql_hook import CloudSqlProxyRunner
+from tests.contrib.utils.base_gcp_system_test_case import BaseGcpSystemTestCase, \
+    DagGcpSystemTestCase
+from tests.contrib.operators.test_gcp_sql_operator_system_helper import \
+    CloudSqlQueryTestHelper
+from tests.contrib.utils.gcp_authenticator import GCP_CLOUDSQL_KEY
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'project-id')
+
+SKIP_CLOUDSQL_QUERY_WARNING = """
+    This test is intentionally skipped in automated runs,
+    as creating databases in Google Cloud SQL takes a very
+    long time. You can still set the GCP_ENABLE_CLOUDSQL_QUERY_TEST
+    environment variable to 'True' and then you should be able to
+    run it manually after you create the database.
+    The database can be created by running
+    `{}/test_gcp_sql_operatorquery_system_helper.py \
+--action=before-tests`
+    (remember to delete the database afterwards with the --action=after-tests flag)
+""".format(dirname(__file__))
+
+GCP_ENABLE_CLOUDSQL_QUERY_TEST = os.environ.get('GCP_ENABLE_CLOUDSQL_QUERY_TEST')
+
+enable_cloudsql_query_test = GCP_ENABLE_CLOUDSQL_QUERY_TEST == 'True'
+
+
+SQL_QUERY_TEST_HELPER = CloudSqlQueryTestHelper()
+
+
+@unittest.skipIf(not enable_cloudsql_query_test, SKIP_CLOUDSQL_QUERY_WARNING)
+class CloudSqlProxySystemTest(BaseGcpSystemTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSqlProxySystemTest, self).__init__(
+            method_name,
+            gcp_key='gcp_cloudsql.json')
+
+    def setUp(self):
+        super(CloudSqlProxySystemTest, self).setUp()
+        self.gcp_authenticator.gcp_authenticate()
+        SQL_QUERY_TEST_HELPER.check_if_instances_are_up(instance_suffix="_QUERY")
+        self.gcp_authenticator.gcp_revoke_authentication()
+
+    @staticmethod
+    def generate_unique_path():
+        return ''.join(
+            random.choice(string.ascii_letters + string.digits) for _ in range(8))
+
+    def test_start_proxy_fail_no_parameters(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + self.generate_unique_path(),
+                                     project_id=GCP_PROJECT_ID,
+                                     instance_specification='a')
+        with self.assertRaises(AirflowException) as cm:
+            runner.start_proxy()
+        err = cm.exception
+        self.assertIn("invalid instance name", str(err))
+        with self.assertRaises(AirflowException) as cm:
+            runner.start_proxy()
+        err = cm.exception
+        self.assertIn("invalid instance name", str(err))
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + self.generate_unique_path(),
+                                     project_id=GCP_PROJECT_ID,
+                                     instance_specification='')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances_generated_credential_file(self):
+        self.gcp_authenticator.set_dictionary_in_airflow_connection()
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + self.generate_unique_path(),
+                                     project_id=GCP_PROJECT_ID,
+                                     instance_specification='')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances_specific_version(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + self.generate_unique_path(),
+                                     project_id=GCP_PROJECT_ID,
+                                     instance_specification='',
+                                     sql_proxy_version='v1.13')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+        self.assertEqual(runner.get_proxy_version(), "1.13")
+
+
+@unittest.skipIf(not enable_cloudsql_query_test, SKIP_CLOUDSQL_QUERY_WARNING)
+class CloudSqlQueryExampleDagsSystemTest(DagGcpSystemTestCase):
+
+    def __init__(self, method_name='runTest'):
+        super(CloudSqlQueryExampleDagsSystemTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_sql_query',
+            gcp_key=GCP_CLOUDSQL_KEY)
+
+    def setUp(self):
+        super(CloudSqlQueryExampleDagsSystemTest, self).setUp()
+        self.gcp_authenticator.gcp_authenticate()
+        SQL_QUERY_TEST_HELPER.check_if_instances_are_up(instance_suffix="_QUERY")
+        SQL_QUERY_TEST_HELPER.setup_instances(instance_suffix="_QUERY")
+        self.gcp_authenticator.gcp_revoke_authentication()
+
+    def test_run_example_dag_cloudsql_query(self):
+        self._run_dag()
diff --git a/tests/contrib/operators/test_gcp_sql_operatorquery_system_helper.py b/tests/contrib/operators/test_gcp_sql_operatorquery_system_helper.py
new file mode 100755
index 0000000000..d8bc5164c9
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_sql_operatorquery_system_helper.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+
+from tests.contrib.operators.test_gcp_sql_operator_system_helper \
+    import CloudSqlQueryTestHelper
+from tests.contrib.utils.gcp_authenticator import GCP_CLOUDSQL_KEY, GcpAuthenticator
+
+QUERY_SUFFIX = "_QUERY"
+
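+# Example standalone usage (manages the "_QUERY"-suffixed instances used by the
+# Cloud SQL query system tests):
+#   python test_gcp_sql_operatorquery_system_helper.py --action=before-tests
+#   python test_gcp_sql_operatorquery_system_helper.py --action=after-tests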
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Create or delete Cloud SQL instances for system tests.')
+    parser.add_argument('--action', required=True,
+                        choices=('create', 'delete', 'setup-instances',
+                                 'before-tests', 'after-tests'))
+    action = parser.parse_args().action
+
+    helper = CloudSqlQueryTestHelper()
+    gcp_authenticator = GcpAuthenticator(gcp_key=GCP_CLOUDSQL_KEY)
+    helper.log.info('Starting action: {}'.format(action))
+    gcp_authenticator.gcp_store_authentication()
+    try:
+        gcp_authenticator.gcp_authenticate()
+        if action == 'before-tests':
+            helper.create_instances(instance_suffix=QUERY_SUFFIX)
+            helper.setup_instances(instance_suffix=QUERY_SUFFIX)
+        elif action == 'after-tests':
+            helper.delete_instances(instance_suffix=QUERY_SUFFIX)
+        elif action == 'create':
+            helper.create_instances(instance_suffix=QUERY_SUFFIX)
+        elif action == 'delete':
+            helper.delete_instances(instance_suffix=QUERY_SUFFIX)
+        elif action == 'setup-instances':
+            helper.setup_instances(instance_suffix=QUERY_SUFFIX)
+        else:
+            raise Exception("Unknown action: {}".format(action))
+    finally:
+        gcp_authenticator.gcp_restore_authentication()
+
+    helper.log.info('Finishing action: {}'.format(action))
diff --git a/tests/contrib/operators/test_gcs_acl_operator.py b/tests/contrib/operators/test_gcs_acl_operator.py
index 562c653574..f428397e8b 100644
--- a/tests/contrib/operators/test_gcs_acl_operator.py
+++ b/tests/contrib/operators/test_gcs_acl_operator.py
@@ -22,8 +22,6 @@
 from airflow.contrib.operators.gcs_acl_operator import \
     GoogleCloudStorageBucketCreateAclEntryOperator, \
     GoogleCloudStorageObjectCreateAclEntryOperator
-from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
-    SKIP_TEST_WARNING, GCP_GCS_KEY
 
 try:
     from unittest import mock
@@ -72,16 +70,3 @@ def test_object_create_acl(self, mock_hook):
             generation="test-generation",
             user_project="test-user-project"
         )
-
-
-@unittest.skipIf(
-    BaseGcpIntegrationTestCase.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING)
-class CloudStorageExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
-    def __init__(self, method_name='runTest'):
-        super(CloudStorageExampleDagsIntegrationTest, self).__init__(
-            method_name,
-            dag_id='example_gcs_acl',
-            gcp_key=GCP_GCS_KEY)
-
-    def test_run_example_dag_gcs_acl(self):
-        self._run_dag()
diff --git a/tests/contrib/operators/test_gcs_acl_operator_system.py b/tests/contrib/operators/test_gcs_acl_operator_system.py
new file mode 100644
index 0000000000..0afddcbc9e
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_acl_operator_system.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from tests.contrib.utils.base_gcp_system_test_case import \
+    SKIP_TEST_WARNING, DagGcpSystemTestCase
+from tests.contrib.utils.gcp_authenticator import GCP_GCS_KEY
+
+
+@unittest.skipIf(
+    DagGcpSystemTestCase.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING)
+class CloudStorageExampleDagsSystemTest(DagGcpSystemTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudStorageExampleDagsSystemTest, self).__init__(
+            method_name,
+            dag_id='example_gcs_acl',
+            gcp_key=GCP_GCS_KEY)
+
+    def test_run_example_dag_gcs_acl(self):
+        self._run_dag()
diff --git a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
index 0825364884..2eeae3a8a9 100644
--- a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
+++ b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
@@ -35,7 +35,7 @@
 TASK_ID = 'test-s3-gcs-transfer-operator'
 S3_BUCKET = 'test-s3-bucket'
 GCS_BUCKET = 'test-gcs-bucket'
-PROJECT_ID = 'test-project'
+GCP_PROJECT_ID = 'test-project'
 DESCRIPTION = 'test-description'
 ACCESS_KEY = 'test-access-key'
 SECRET_KEY = 'test-secret-key'
@@ -57,7 +57,7 @@ def test_constructor(self):
             task_id=TASK_ID,
             s3_bucket=S3_BUCKET,
             gcs_bucket=GCS_BUCKET,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             schedule=SCHEDULE,
         )
@@ -65,7 +65,7 @@ def test_constructor(self):
         self.assertEqual(operator.task_id, TASK_ID)
         self.assertEqual(operator.s3_bucket, S3_BUCKET)
         self.assertEqual(operator.gcs_bucket, GCS_BUCKET)
-        self.assertEqual(operator.project_id, PROJECT_ID)
+        self.assertEqual(operator.project_id, GCP_PROJECT_ID)
         self.assertEqual(operator.description, DESCRIPTION)
         self.assertEqual(operator.schedule, SCHEDULE)
 
@@ -78,7 +78,7 @@ def test_execute(self, mock_s3_hook, mock_transfer_hook):
             task_id=TASK_ID,
             s3_bucket=S3_BUCKET,
             gcs_bucket=GCS_BUCKET,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             schedule=SCHEDULE,
         )
@@ -91,7 +91,7 @@ def test_execute(self, mock_s3_hook, mock_transfer_hook):
         operator.execute(None)
 
         mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             schedule=SCHEDULE,
             transfer_spec={
@@ -123,7 +123,7 @@ def test_execute_skip_wait(self, mock_s3_hook, mock_transfer_hook):
             task_id=TASK_ID,
             s3_bucket=S3_BUCKET,
             gcs_bucket=GCS_BUCKET,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             wait=False,
         )
@@ -136,7 +136,7 @@ def test_execute_skip_wait(self, mock_s3_hook, mock_transfer_hook):
         operator.execute(None)
 
         mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             schedule=None,
             transfer_spec={
diff --git a/tests/contrib/utils/base_gcp_system_test_case.py b/tests/contrib/utils/base_gcp_system_test_case.py
new file mode 100644
index 0000000000..03031a457f
--- /dev/null
+++ b/tests/contrib/utils/base_gcp_system_test_case.py
@@ -0,0 +1,259 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import subprocess
+import unittest
+from glob import glob
+from shutil import move
+from tempfile import mkdtemp
+
+from airflow.utils import db as db_utils
+from airflow import models, settings, AirflowException, LoggingMixin
+from airflow.utils.timezone import datetime
+from tests.contrib.utils.gcp_authenticator import GcpAuthenticator
+from tests.contrib.utils.run_once_decorator import run_once
+
+AIRFLOW_MAIN_FOLDER = os.path.realpath(os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    os.pardir, os.pardir, os.pardir))
+
+AIRFLOW_PARENT_FOLDER = os.path.realpath(os.path.join(AIRFLOW_MAIN_FOLDER,
+                                                      os.pardir, os.pardir, os.pardir))
+ENV_FILE_RETRIEVER = os.path.join(AIRFLOW_PARENT_FOLDER,
+                                  "get_system_test_environment_variables.py")
+
+
+# Retrieve environment variables from the parent directory retriever - it should be
+# in the path ${AIRFLOW_SOURCE_DIR}/../../get_system_test_environment_variables.py
+# and it should print all the variables in the form of key=value to stdout
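+# For example, it could print lines such as:
+#   GCP_PROJECT_ID=example-project
+#   GCSQL_POSTGRES_USER=postgres_user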
+class RetrieveVariables:
+    @staticmethod
+    @run_once
+    def retrieve_variables():
+        if os.path.isfile(ENV_FILE_RETRIEVER):
+            if os.environ.get('AIRFLOW__CORE__UNIT_TEST_MODE'):
+                raise Exception("Please unset the AIRFLOW__CORE__UNIT_TEST_MODE")
+            variables = subprocess.check_output([ENV_FILE_RETRIEVER]).decode("utf-8")
+            print("Applying variables retrieved")
+            for line in variables.split("\n"):
+                try:
+                    variable, key = line.split("=")
+                except ValueError:
+                    continue
+                print("{}={}".format(variable, key))
+                os.environ[variable] = key
+
+
+RetrieveVariables.retrieve_variables()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+
+CONTRIB_OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
+    AIRFLOW_MAIN_FOLDER, "airflow", "contrib", "example_dags")
+
+OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
+    AIRFLOW_MAIN_FOLDER, "airflow", "example_dags")
+
+AIRFLOW_HOME = os.environ.get('AIRFLOW_HOME',
+                              os.path.join(os.path.expanduser('~'), 'airflow'))
+
+DAG_FOLDER = os.path.join(AIRFLOW_HOME, "dags")
+
+
+SKIP_TEST_WARNING = """
+This test is only run in an environment with GCP system tests enabled.
+You can enable it in one of two ways:
+
+* Set the GCP_CONFIG_DIR environment variable to point to the GCP configuration
+  directory which keeps the variables.env file with environment variables to set
+  and a keys directory which keeps service account keys in .json format
+* Run this test within an automated workspace where the config
+  directory is checked out next to the airflow one.
+
+""".format(__file__)
+
+
+class BaseGcpSystemTestCase(unittest.TestCase, LoggingMixin):
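+    """
+    Base class for GCP system tests. Around each test it stores the original
+    gcloud authentication, verifies that the configured service account key
+    works, revokes the default authentication for the duration of the test and
+    restores the original authentication in tearDown (via GcpAuthenticator).
+    """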
+    def __init__(self,
+                 method_name,
+                 gcp_key,
+                 project_extra=None):
+        super(BaseGcpSystemTestCase, self).__init__(methodName=method_name)
+        self.gcp_authenticator = GcpAuthenticator(gcp_key=gcp_key,
+                                                  project_extra=project_extra)
+        self.setup_called = False
+
+    @staticmethod
+    def skip_check(key_name):
+        return GcpAuthenticator(key_name).full_key_path is None
+
+    def setUp(self):
+        self.gcp_authenticator.gcp_store_authentication()
+        self.gcp_authenticator.gcp_authenticate()
+        # We checked that authentication works. Now we revoke it to make
+        # sure we are not relying on the default authentication.
+        self.gcp_authenticator.gcp_revoke_authentication()
+        self.setup_called = True
+
+    # noinspection PyPep8Naming
+    def tearDown(self):
+        self.gcp_authenticator.gcp_restore_authentication()
+
+
+class DagGcpSystemTestCase(BaseGcpSystemTestCase):
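+    """
+    Base class for system tests that execute one of the example DAGs. It
+    symlinks the example DAG (and files sharing its base name) into the DAGs
+    folder, resets the Airflow database and runs the DAG via dag.run() in
+    _run_dag().
+    """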
+    def __init__(self,
+                 method_name,
+                 dag_id,
+                 gcp_key,
+                 dag_name=None,
+                 require_local_executor=False,
+                 example_dags_folder=CONTRIB_OPERATORS_EXAMPLES_DAG_FOLDER,
+                 project_extra=None):
+        super(DagGcpSystemTestCase, self).__init__(method_name=method_name,
+                                                   gcp_key=gcp_key,
+                                                   project_extra=project_extra)
+        self.dag_id = dag_id
+        self.dag_name = self.dag_id + '.py' if not dag_name else dag_name
+        self.example_dags_folder = example_dags_folder
+        self.require_local_executor = require_local_executor
+        self.temp_dir = None
+
+    @staticmethod
+    def _get_dag_folder():
+        return DAG_FOLDER
+
+    @staticmethod
+    def _get_files_to_link(path):
+        """
+        Returns all file names (note - file names not paths)
+        that have the same base name as the .py dag file (for example dag_name.sql etc.)
+        :param path: path to the dag file.
+        :return: list of files matching the base name
+        """
+        prefix, ext = os.path.splitext(path)
+        assert ext == '.py', "Dag name should be a .py file but is a {} file".format(ext)
+        files_to_link = []
+        for file in glob(prefix + ".*"):
+            files_to_link.append(os.path.basename(file))
+        return files_to_link
+
+    def _symlink_dag_and_associated_files(self, remove=False):
+        target_folder = self._get_dag_folder()
+        source_path = os.path.join(self.example_dags_folder, self.dag_name)
+        for file_name in self._get_files_to_link(source_path):
+            source_path = os.path.join(self.example_dags_folder, file_name)
+            target_path = os.path.join(target_folder, file_name)
+            if remove:
+                try:
+                    self.log.info("Remove symlink: {} -> {} ".format(
+                        target_path, source_path))
+                    os.remove(target_path)
+                except OSError:
+                    pass
+            else:
+                if not os.path.exists(target_path):
+                    self.log.info("Symlink: {} -> {} ".format(target_path, source_path))
+                    os.symlink(source_path, target_path)
+                else:
+                    self.log.info("Symlink {} already exists. Not symlinking it.".
+                                  format(target_path))
+
+    def _store_dags_to_temporary_directory(self):
+        dag_folder = self._get_dag_folder()
+        self.temp_dir = mkdtemp()
+        self.log.info("Storing DAGS from {} to temporary directory {}".
+                      format(dag_folder, self.temp_dir))
+        try:
+            os.mkdir(dag_folder)
+        except OSError:
+            pass
+        for file in os.listdir(dag_folder):
+            move(os.path.join(dag_folder, file), os.path.join(self.temp_dir, file))
+
+    def _restore_dags_from_temporary_directory(self):
+        dag_folder = self._get_dag_folder()
+        self.log.info("Restoring DAGS to {} from temporary directory {}"
+                      .format(dag_folder, self.temp_dir))
+        for file in os.listdir(self.temp_dir):
+            move(os.path.join(self.temp_dir, file), os.path.join(dag_folder, file))
+
+    def _run_dag(self):
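+        """
+        Runs the example DAG from the symlinked DAGs folder. setUp() must have
+        been called first so that the DAG files are symlinked and the GCP
+        authentication is prepared.
+        """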
+        self.log.info("Attempting to run DAG: {}".format(self.dag_id))
+        if not self.setup_called:
+            raise AirflowException("Please make sure to call super.setUp() in your "
+                                   "test class!")
+        dag_folder = self._get_dag_folder()
+        dag_bag = models.DagBag(dag_folder=dag_folder, include_examples=False)
+        self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        dag = dag_bag.get_dag(self.dag_id)
+        if dag is None:
+            raise AirflowException(
+                "The Dag {} could not be found. It's either an import problem or "
+                "the dag {} was not symlinked to the DAGs folder. "
+                "The content of the {} folder is {}".
+                format(self.dag_id,
+                       self.dag_id + ".py",
+                       dag_folder,
+                       os.listdir(dag_folder)))
+        dag.clear(reset_dag_runs=True)
+        dag.run(ignore_first_depends_on_past=True, verbose=True)
+
+    @staticmethod
+    def _check_local_executor_setup():
+        postgres_path = os.path.realpath(os.path.join(
+            AIRFLOW_MAIN_FOLDER,
+            "tests", "contrib", "operators", "postgres_local_executor.cfg"))
+        if postgres_path != os.environ.get('AIRFLOW_CONFIG'):
+            raise AirflowException(
+                """
+Please set AIRFLOW_CONFIG variable to '{}'
+and make sure you have a Postgres server running locally and
+airflow/airflow.db database created.
+
+You can create the database via these commands:
+'createuser root'
+'createdb airflow/airflow.db'
+
+""".format(postgres_path))
+
+    # noinspection PyPep8Naming
+    def setUp(self):
+        if self.require_local_executor:
+            self._check_local_executor_setup()
+        try:
+            # We want to avoid random errors while the database gets reset - those
+            # are apparently triggered by the parser trying to parse DAGs while
+            # the tables are dropped. We move the dags temporarily out of the dags folder
+            # and move them back after the reset.
+            self._store_dags_to_temporary_directory()
+            try:
+                db_utils.resetdb(settings.RBAC)
+            finally:
+                self._restore_dags_from_temporary_directory()
+            self._symlink_dag_and_associated_files()
+            super(DagGcpSystemTestCase, self).setUp()
+
+        except Exception as e:
+            # In case of any error during setup - restore the authentication
+            self.gcp_authenticator.gcp_restore_authentication()
+            raise e
+
+    def tearDown(self):
+        self._symlink_dag_and_associated_files(remove=True)
+        super(DagGcpSystemTestCase, self).tearDown()
diff --git a/tests/contrib/utils/gcp_authenticator.py b/tests/contrib/utils/gcp_authenticator.py
new file mode 100644
index 0000000000..b26f23f9c7
--- /dev/null
+++ b/tests/contrib/utils/gcp_authenticator.py
@@ -0,0 +1,209 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import os
+import subprocess
+
+from airflow import settings, AirflowException
+from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
+
+from airflow.models.connection import Connection
+
+GCP_COMPUTE_KEY = 'gcp_compute.json'
+GCP_FUNCTION_KEY = 'gcp_function.json'
+GCP_CLOUDSQL_KEY = 'gcp_cloudsql.json'
+GCP_BIGTABLE_KEY = 'gcp_bigtable.json'
+GCP_SPANNER_KEY = 'gcp_spanner.json'
+GCP_GCS_KEY = 'gcp_gcs.json'
+
+KEYPATH_EXTRA = 'extra__google_cloud_platform__key_path'
+KEYFILE_DICT_EXTRA = 'extra__google_cloud_platform__keyfile_dict'
+SCOPE_EXTRA = 'extra__google_cloud_platform__scope'
+PROJECT_EXTRA = 'extra__google_cloud_platform__project'
+
+AIRFLOW_MAIN_FOLDER = os.path.realpath(os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    os.pardir, os.pardir, os.pardir))
+
+
+class GcpAuthenticator(LoggingCommandExecutor):
+    """
+    Manages authentication to Google Cloud Platform. It helps to manage the
+    connection - it can authenticate with the specified gcp key name.
+    """
+    original_account = None
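+    # Typical usage (see BaseGcpSystemTestCase): call gcp_store_authentication()
+    # first, then gcp_authenticate() / gcp_revoke_authentication() around the
+    # test code, and gcp_restore_authentication() during teardown.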
+
+    def __init__(self, gcp_key, project_extra=None):
+        """
+        Initialises the authenticator.
+
+        :param gcp_key: name of the key to use for authentication (see GCP_*_KEY values)
+        :param project_extra: optional extra project parameter passed to google cloud
+               connection
+        """
+        super(GcpAuthenticator, self).__init__()
+        self.gcp_key = gcp_key
+        self.project_extra = project_extra
+        self.project_id = self.get_project_id()
+        self.full_key_path = None
+        self._set_key_path()
+
+    @staticmethod
+    def get_project_id():
+        return os.environ.get('GCP_PROJECT_ID')
+
+    def set_key_path_in_airflow_connection(self):
+        """
+        Set key path in 'google_cloud_default' connection to point to the full
+        key path
+        :return: None
+        """
+        session = settings.Session()
+        try:
+            conn = session.query(Connection).filter(
+                Connection.conn_id == 'google_cloud_default')[0]
+            extras = conn.extra_dejson
+            extras[KEYPATH_EXTRA] = self.full_key_path
+            if extras.get(KEYFILE_DICT_EXTRA):
+                del extras[KEYFILE_DICT_EXTRA]
+            extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
+            extras[PROJECT_EXTRA] = self.project_extra if self.project_extra else \
+                self.project_id
+            conn.extra = json.dumps(extras)
+            session.commit()
+        except BaseException as ex:
+            self.log.info('Airflow DB Session error:' + str(ex))
+            session.rollback()
+            raise
+        finally:
+            session.close()
+
+    def set_dictionary_in_airflow_connection(self):
+        """
+        Set dictionary in 'google_cloud_default' connection to contain content
+        of the json service account file.
+        :return: None
+        """
+        session = settings.Session()
+        try:
+            conn = session.query(Connection).filter(
+                Connection.conn_id == 'google_cloud_default')[0]
+            extras = conn.extra_dejson
+            with open(self.full_key_path, "r") as path_file:
+                content = json.load(path_file)
+            extras[KEYFILE_DICT_EXTRA] = json.dumps(content)
+            if extras.get(KEYPATH_EXTRA):
+                del extras[KEYPATH_EXTRA]
+            extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
+            extras[PROJECT_EXTRA] = self.project_extra
+            conn.extra = json.dumps(extras)
+            session.commit()
+        except BaseException as ex:
+            self.log.info('Airflow DB Session error:' + str(ex))
+            session.rollback()
+            raise
+        finally:
+            session.close()
+
+    def _set_key_path(self):
+        """
+        Sets the full key path. If GCP_CONFIG_DIR points to an absolute
+            directory, it tries to find the key in that directory. Otherwise it
+            assumes that Airflow is run from a directory where the configuration
+            is checked out next to the airflow directory, and it tries to find
+            the key folder in that workspace's config directory.
+        """
+        if "GCP_CONFIG_DIR" in os.environ:
+            gcp_config_dir = os.environ["GCP_CONFIG_DIR"]
+        else:
+            gcp_config_dir = os.path.join(AIRFLOW_MAIN_FOLDER,
+                                          os.pardir,
+                                          "config")
+        if not os.path.isdir(gcp_config_dir):
+            self.log.info("The {} is not a directory".format(gcp_config_dir))
+        key_dir = os.path.join(gcp_config_dir, "keys")
+        if not os.path.isdir(key_dir):
+            self.log.info("The {} is not a directory".format(key_dir))
+            return
+        key_path = os.path.join(key_dir, self.gcp_key)
+        if not os.path.isfile(key_path):
+            self.log.info("The {} is missing".format(key_path))
+        self.full_key_path = key_path
+
+    def _validate_key_set(self):
+        if self.full_key_path is None:
+            raise AirflowException("The gcp_key is not set!")
+        if not os.path.isfile(self.full_key_path):
+            raise AirflowException(
+                "The key {} could not be found. Please copy it to the {} path.".
+                format(self.gcp_key, self.full_key_path))
+
+    def gcp_authenticate(self):
+        """
+        Authenticate with service account specified via key name.
+        """
+        self._validate_key_set()
+        self.log.info("Setting the GCP key to {}".format(self.full_key_path))
+        # Checking if we can authenticate using service account credentials provided
+        self.execute_cmd(
+            ['gcloud', 'auth', 'activate-service-account',
+             '--key-file={}'.format(self.full_key_path),
+             '--project={}'.format(self.project_id)])
+        self.set_key_path_in_airflow_connection()
+
+    def gcp_revoke_authentication(self):
+        """
+        Change the default gcloud account to 'none' - a non-existent account -
+        effectively revoking authentication.
+        """
+        self._validate_key_set()
+        self.log.info("Revoking authentication - setting it to none")
+        self.execute_cmd(
+            ['gcloud', 'config', 'get-value', 'account',
+             '--project={}'.format(self.project_id)])
+        self.execute_cmd(['gcloud', 'config', 'set', 'account', 'none',
+                          '--project={}'.format(self.project_id)])
+
+    def gcp_store_authentication(self):
+        """
+        Store the originally active gcloud account so that authentication can
+        be revoked and later restored.
+        """
+        self._validate_key_set()
+        if not GcpAuthenticator.original_account:
+            GcpAuthenticator.original_account = self.check_output(
+                ['gcloud', 'config', 'get-value', 'account',
+                 '--project={}'.format(self.project_id)]).decode('utf-8')
+            self.log.info("Storing account: to restore it later {}".format(
+                GcpAuthenticator.original_account))
+
+    def gcp_restore_authentication(self):
+        """
+        Restore authentication to the original account.
+        """
+        self._validate_key_set()
+        if GcpAuthenticator.original_account:
+            self.log.info("Restoring original account stored: {}".
+                          format(GcpAuthenticator.original_account))
+            subprocess.call(['gcloud', 'config', 'set', 'account',
+                             GcpAuthenticator.original_account,
+                             '--project={}'.format(self.project_id)])
+        else:
+            self.log.info("Not restoring the original GCP account: it is not set")
diff --git a/tests/contrib/utils/logging_command_executor.py b/tests/contrib/utils/logging_command_executor.py
new file mode 100644
index 0000000000..e0833f4a77
--- /dev/null
+++ b/tests/contrib/utils/logging_command_executor.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import subprocess
+
+from airflow import LoggingMixin, AirflowException
+
+
+class LoggingCommandExecutor(LoggingMixin):
+
+    def execute_cmd(self, cmd, silent=False):
+        if silent:
+            self.log.info("Executing in silent mode: '{}'".format(" ".join(cmd)))
+            with open(os.devnull, 'w') as FNULL:
+                return subprocess.call(args=cmd, stdout=FNULL, stderr=subprocess.STDOUT)
+        else:
+            self.log.info("Executing: '{}'".format(" ".join(cmd)))
+            process = subprocess.Popen(args=cmd, stdout=subprocess.PIPE,
+                                       stderr=subprocess.PIPE, universal_newlines=True)
+            output, err = process.communicate()
+            retcode = process.poll()
+            self.log.info("Stdout: {}".format(output))
+            self.log.info("Stderr: {}".format(err))
+            if retcode:
+                print("Error when executing '{}'".format(" ".join(cmd)))
+            return retcode
+
+    def check_output(self, cmd):
+        self.log.info("Executing for output: '{}'".format(" ".join(cmd)))
+        process = subprocess.Popen(args=cmd, stdout=subprocess.PIPE,
+                                   stderr=subprocess.PIPE)
+        output, err = process.communicate()
+        retcode = process.poll()
+        if retcode:
+            self.log.info("Error when executing '{}'".format(" ".join(cmd)))
+            self.log.info("Stdout: {}".format(output))
+            self.log.info("Stderr: {}".format(err))
+            raise AirflowException("Retcode {} on {} with stdout: {}, stderr: {}".
+                                   format(retcode, " ".join(cmd), output, err))
+        return output
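
For illustration, the executor could be used as follows; the gcloud invocations are only example commands and the import path is assumed from the new file location:

    from airflow import AirflowException

    from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor

    executor = LoggingCommandExecutor()

    # execute_cmd logs stdout/stderr and returns the process return code.
    retcode = executor.execute_cmd(['gcloud', 'config', 'list'])
    if retcode != 0:
        print("Command failed with return code {}".format(retcode))

    # check_output raises AirflowException on a non-zero return code and
    # returns the raw stdout bytes otherwise.
    try:
        account = executor.check_output(['gcloud', 'config', 'get-value', 'account'])
        print(account.decode('utf-8').strip())
    except AirflowException as ex:
        print("Could not read the active account: {}".format(ex))
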
diff --git a/tests/contrib/utils/run_once_decorator.py b/tests/contrib/utils/run_once_decorator.py
new file mode 100644
index 0000000000..934250ec2f
--- /dev/null
+++ b/tests/contrib/utils/run_once_decorator.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from functools import wraps
+
+
+def run_once(f):
+    """Runs a function (successfully) only once.
+    The running can be reset by setting the `has_run` attribute to False
+    """
+    @wraps(f)
+    def wrapper(*args, **kwargs):
+        if not wrapper.has_run:
+            result = f(*args, **kwargs)
+            wrapper.has_run = True
+            return result
+    wrapper.has_run = False
+    return wrapper
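
A short usage sketch of the decorator; the decorated function below is made up purely for illustration:

    from tests.contrib.utils.run_once_decorator import run_once


    @run_once
    def create_shared_resource():
        print("creating the shared resource")
        return "resource"


    create_shared_resource()   # runs and returns "resource"
    create_shared_resource()   # skipped - the wrapper returns None this time

    # Resetting has_run allows the function to run (successfully) once more.
    create_shared_resource.has_run = False
    create_shared_resource()   # runs again
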
diff --git a/tests/dags/.gitignore b/tests/dags/.gitignore
index d42b544171..37ab6d48e9 100644
--- a/tests/dags/.gitignore
+++ b/tests/dags/.gitignore
@@ -1,5 +1,4 @@
 # This line is to avoid accidental commits of example dags for integration testing
 # In order to test example dags easily we often create symbolic links in this directory
-# and run the Airflow with AIRFLOW__CORE__UNIT_TEST_MODE=True
 # this line prevents accidental committing of such symbolic links.
 example_*


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services