Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/16 20:39:25 UTC

[GitHub] potiuk closed pull request #4329: Cloud build

potiuk closed pull request #4329: Cloud build
URL: https://github.com/apache/incubator-airflow/pull/4329

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


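For reference, a configuration like this is normally submitted to Google Cloud Build
roughly as follows (the checkout path and project id below are placeholders, not part
of this PR):

    cd /path/to/incubator-airflow
    gcloud builds submit . \
        --config=airflow/contrib/cloudbuild/cloudbuild.yaml \
        --substitutions=_AIRFLOW_BREEZE_BRANCH=master \
        --project=<your-gcp-project-id>
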
diff --git a/airflow/contrib/cloudbuild/cloudbuild.yaml b/airflow/contrib/cloudbuild/cloudbuild.yaml
new file mode 100644
index 0000000000..c001b6ac62
--- /dev/null
+++ b/airflow/contrib/cloudbuild/cloudbuild.yaml
@@ -0,0 +1,201 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+steps:
+- name: 'gcr.io/cloud-builders/gcloud'
+  id: 'clone-variables'
+  args: ['source', 'repos', 'clone', 'airflow-breeze-config', '/root/airflow-breeze-config']
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
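+# Decrypt the service account keys and encrypted variables stored in airflow-breeze-config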
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:${_AIRFLOW_BREEZE_BRANCH}'
+  id: 'decrypt-files-and-variables'
+  env:
+  - "PYTHON_VERSION=2.7"
+  - "HOME=/root"
+  - "AIRFLOW_BREEZE_CONFIG_DIR=/root/airflow-breeze-config"
+  - "BUILD_ID=${BUILD_ID}"
+  - "GCP_PROJECT_ID=${PROJECT_ID}"
+  entrypoint: '/bin/bash'
+  args: ['-c', '/airflow/_init.sh ${_CLOUDBUILD_SCRIPTS_DIR}/decrypt_all_files_and_variables.sh']
+  dir: '/root/airflow-breeze-config/keys'
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
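+# Create the per-build "directory" in the GCS build bucket by uploading an empty marker file (errors are ignored)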
+- name: "gcr.io/cloud-builders/gsutil"
+  id: 'create-build-id-dir'
+  entrypoint: '/bin/bash'
+  waitFor: ['decrypt-files-and-variables']
+  args: ['-c', 'source /root/airflow-breeze-config/variables.env && touch /tmp/empty.txt && gsutil cp /tmp/empty.txt "gs://$${AIRFLOW_BREEZE_GCP_BUILD_BUCKET}/${BUILD_ID}/empty.txt" || true']
+  volumes:
+  - name: 'logs-python35'
+    path: '/logs/python35'
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:${_AIRFLOW_BREEZE_BRANCH}'
+  id: 'build-docs'
+  env:
+    - "PYTHON_VERSION=2.7"
+    - "HOME=/root"
+    - "BUILD_ID=${BUILD_ID}"
+    - "GCP_PROJECT_ID=${PROJECT_ID}"
+  args: [ "/bin/bash", "-c", "/airflow/_init.sh ${_CLOUDBUILD_SCRIPTS_DIR}/build_docs.sh"]
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
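+# The per-Python-version test suites below wait only for decryption and bucket setup, so they can run in parallel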
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:${_AIRFLOW_BREEZE_BRANCH}'
+  id: 'python27-test'
+  waitFor: ['decrypt-files-and-variables', 'create-build-id-dir']
+  env:
+    - "HOME=/root"
+    - "PYTHON_VERSION=2.7"
+    - "AIRFLOW_BREEZE_TEST_SUITE=python27"
+    - "AIRFLOW_BREEZE_SHORT_SHA=${SHORT_SHA}"
+    - "AIRFLOW_BREEZE_CONFIG_DIR=/root/airflow-breeze-config"
+    - "BUILD_ID=${BUILD_ID}"
+    - "SKIP_UNLINKING_EXAMPLES=True"
+    - "GCP_PROJECT_ID=${PROJECT_ID}"
+  args: [ "/bin/bash", "-c",
+          "/airflow/_init.sh ${_CLOUDBUILD_SCRIPTS_DIR}/run_ci_tests.sh"]
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
+  - name: 'logs-python27'
+    path: '/logs'
+- name: "gcr.io/cloud-builders/gsutil"
+  id: 'python27-send-logs'
+  entrypoint: '/bin/bash'
+  waitFor: ['python27-test']
+  args: ['-c', 'source /root/airflow-breeze-config/variables.env && gsutil -m cp -r /logs "gs://$${AIRFLOW_BREEZE_GCP_BUILD_BUCKET}/${BUILD_ID}/" || true']
+  volumes:
+  - name: 'logs-python27'
+    path: '/logs/python27'
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:${_AIRFLOW_BREEZE_BRANCH}'
+  id: 'python35-test'
+  waitFor: ['decrypt-files-and-variables', 'create-build-id-dir']
+  env:
+  - "HOME=/root"
+  - "PYTHON_VERSION=3.5"
+  - "AIRFLOW_BREEZE_TEST_SUITE=python35"
+  - "AIRFLOW_BREEZE_SHORT_SHA=${SHORT_SHA}"
+  - "AIRFLOW_BREEZE_CONFIG_DIR=/root/airflow-breeze-config"
+  - "BUILD_ID=${BUILD_ID}"
+  - "SKIP_UNLINKING_EXAMPLES=True"
+  - "GCP_PROJECT_ID=${PROJECT_ID}"
+  args: [ "/bin/bash", "-c",
+          "/airflow/_init.sh ${_CLOUDBUILD_SCRIPTS_DIR}/run_ci_tests.sh"]
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
+  - name: 'logs-python35'
+    path: '/logs'
+- name: "gcr.io/cloud-builders/gsutil"
+  id: 'python35-send-logs'
+  entrypoint: '/bin/bash'
+  waitFor: ['python35-test']
+  args: ['-c', 'source /root/airflow-breeze-config/variables.env && gsutil -m cp -r /logs "gs://$${AIRFLOW_BREEZE_GCP_BUILD_BUCKET}/${BUILD_ID}/" || true']
+  volumes:
+  - name: 'logs-python35'
+    path: '/logs/python35'
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:${_AIRFLOW_BREEZE_BRANCH}'
+  id: 'python36-test'
+  waitFor: ['decrypt-files-and-variables', 'create-build-id-dir']
+  env:
+    - "HOME=/root"
+    - "PYTHON_VERSION=3.6"
+    - "AIRFLOW_BREEZE_TEST_SUITE=python36"
+    - "AIRFLOW_BREEZE_SHORT_SHA=${SHORT_SHA}"
+    - "AIRFLOW_BREEZE_CONFIG_DIR=/root/airflow-breeze-config"
+    - "BUILD_ID=${BUILD_ID}"
+    - "SKIP_UNLINKING_EXAMPLES=True"
+    - "GCP_PROJECT_ID=${PROJECT_ID}"
+  args: [ "/bin/bash", "-c",
+          "/airflow/_init.sh ${_CLOUDBUILD_SCRIPTS_DIR}/run_ci_tests.sh"]
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
+  - name: 'logs-python36'
+    path: '/logs'
+- name: "gcr.io/cloud-builders/gsutil"
+  id: 'python36-send-logs'
+  entrypoint: '/bin/bash'
+  waitFor: ['python36-test']
+  args: ['-c', 'source /root/airflow-breeze-config/variables.env && gsutil -m cp -r /logs "gs://$${AIRFLOW_BREEZE_GCP_BUILD_BUCKET}/${BUILD_ID}/" || true']
+  volumes:
+  - name: 'logs-python36'
+    path: '/logs/python36'
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
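+# Merge the per-module xunit result files of each test suite into a single report per suite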
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:${_AIRFLOW_BREEZE_BRANCH}'
+  id: 'merge-tests'
+  waitFor: ['python27-send-logs', 'python35-send-logs', 'python36-send-logs']
+  env:
+    - "PYTHON_VERSION=2.7"
+    - "HOME=/root"
+    - "BUILD_ID=${BUILD_ID}"
+    - "GCP_PROJECT_ID=${PROJECT_ID}"
+  args: [ "/bin/bash", "-c", "/airflow/_init.sh ${_CLOUDBUILD_SCRIPTS_DIR}/merge_tests.sh ${_AIRFLOW_BREEZE_TEST_SUITES}"]
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:${_AIRFLOW_BREEZE_BRANCH}'
+  id: 'prepare-summary'
+  waitFor: ['merge-tests']
+  env:
+  - "PYTHON_VERSION=2.7"
+  - "HOME=/root"
+  - "BUILD_ID=${BUILD_ID}"
+  - "REPO_NAME=${REPO_NAME}"
+  - "BRANCH_NAME=${BRANCH_NAME}"
+  - "TAG_NAME=${TAG_NAME}"
+  - "COMMIT_SHA=${COMMIT_SHA}"
+  - "GCP_PROJECT_ID=${PROJECT_ID}"
+  - "AIRFLOW_BREEZE_TEST_SUITES=${_AIRFLOW_BREEZE_TEST_SUITES}"
+  args: [ "/bin/bash", "-c", "/airflow/_init.sh ${_CLOUDBUILD_SCRIPTS_DIR}/prepare_summary_page.sh"]
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
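+# Upload everything generated under output/ (docs, merged test results, summary page) to the GCS build bucket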
+- name: "gcr.io/cloud-builders/gsutil"
+  id: 'send-artifacts'
+  entrypoint: '/bin/bash'
+  waitFor: ['build-docs', 'prepare-summary']
+  args: ['-c', 'source /root/airflow-breeze-config/variables.env && gsutil -m cp -r output/* "gs://$${AIRFLOW_BREEZE_GCP_BUILD_BUCKET}/" || true']
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
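+# Fail the whole build if any test suite left a failure marker file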
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:${_AIRFLOW_BREEZE_BRANCH}'
+  id: 'verify-tests'
+  waitFor: ['send-artifacts']
+  env:
+    - "PYTHON_VERSION=2.7"
+    - "HOME=/root"
+    - "BUILD_ID=${BUILD_ID}"
+    - "GCP_PROJECT_ID=${PROJECT_ID}"
+  args: [ "/bin/bash", "-c", "/airflow/_init.sh ${_CLOUDBUILD_SCRIPTS_DIR}/verify_tests.sh"]
+  volumes:
+  - name: 'airflow-breeze-config'
+    path: '/root/airflow-breeze-config'
+substitutions:
+  _AIRFLOW_BREEZE_BRANCH: 'master'
+  _AIRFLOW_BREEZE_TEST_SUITES: 'python27 python35 python36'
+  _CLOUDBUILD_SCRIPTS_DIR: '/workspace/airflow/contrib/cloudbuild/scripts'
+options:
+  machineType: 'N1_HIGHCPU_8'
+timeout: 2800s
diff --git a/airflow/contrib/cloudbuild/scripts/build_docs.sh b/airflow/contrib/cloudbuild/scripts/build_docs.sh
new file mode 100755
index 0000000000..dad22bd7ab
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/build_docs.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+set -x
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+export AIRFLOW_HOME="${AIRFLOW_HOME:=/airflow}"
+export AIRFLOW_SOURCES="${AIRFLOW_SOURCES:=/workspace}"
+export AIRFLOW_OUTPUT="${AIRFLOW_SOURCES}/output"
+export AIRFLOW_BREEZE_TEST_SUITE="${AIRFLOW_BREEZE_TEST_SUITE:=docs}"
+export BUILD_ID="${BUILD_ID:=build}"
+
+export DOC_SOURCES_DIR=${DOC_SOURCES_DIR:=${AIRFLOW_SOURCES}/docs}
+export DOC_OUTPUT_DIR=${AIRFLOW_OUTPUT}/${BUILD_ID}/docs
+
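+# Build the documentation with the standard docs/build.sh and copy the generated HTML
+# to the per-build output directory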
+pushd ${DOC_SOURCES_DIR}
+
+./build.sh
+
+mkdir -pv ${DOC_OUTPUT_DIR}
+rm -rvf ${DOC_OUTPUT_DIR}
+cp -rv ${DOC_SOURCES_DIR}/_build/html/ ${DOC_OUTPUT_DIR}
+
+popd
diff --git a/airflow/contrib/cloudbuild/scripts/create_cloudsql_database.sh b/airflow/contrib/cloudbuild/scripts/create_cloudsql_database.sh
new file mode 100755
index 0000000000..398a024c47
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/create_cloudsql_database.sh
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+set -x
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+export AIRFLOW_HOME="${AIRFLOW_HOME:=/airflow}"
+export AIRFLOW_SOURCES="${AIRFLOW_SOURCES:=/workspace}"
+export AIRFLOW_OUTPUT="${AIRFLOW_SOURCES}/output"
+export BUILD_ID="${BUILD_ID:=build}"
+export AIRFLOW_BREEZE_CONFIG_DIR="${HOME}/airflow-breeze-config"
+export AIRFLOW_BREEZE_TEST_SUITES=${AIRFLOW_BREEZE_TEST_SUITES:=""}
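+# AIRFLOW_BREEZE_TEST_SUITES is a space-separated list of test suites,
+# e.g. "python27 python35 python36"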
+
+for AIRFLOW_BREEZE_TEST_SUITE in ${AIRFLOW_BREEZE_TEST_SUITES}; do
+    export AIRFLOW_BREEZE_TEST_SUITE
+    # Re-source variables for test suite
+    set -a
+    source ${AIRFLOW_BREEZE_CONFIG_DIR}/variables.env
+    set +a
+    echo "Creating CloudSQL database for test suite ${AIRFLOW_BREEZE_TEST_SUITE}"
+    python ${AIRFLOW_SOURCES}/tests/contrib/operators/test_gcp_sql_operator.py --action=create
+    echo "Created CloudSQL database for test suite ${AIRFLOW_BREEZE_TEST_SUITE}"
+done
diff --git a/airflow/contrib/cloudbuild/scripts/decrypt_all_files_and_variables.sh b/airflow/contrib/cloudbuild/scripts/decrypt_all_files_and_variables.sh
new file mode 100755
index 0000000000..b7af2f8226
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/decrypt_all_files_and_variables.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+set -euo pipefail
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+export AIRFLOW_HOME="${AIRFLOW_HOME:=/airflow}"
+export AIRFLOW_SOURCES="${AIRFLOW_SOURCES:=/workspace}"
+export AIRFLOW_OUTPUT="${AIRFLOW_SOURCES}/output"
+export AIRFLOW_BREEZE_TEST_SUITE="${AIRFLOW_BREEZE_TEST_SUITE:=docs}"
+export BUILD_ID="${BUILD_ID:=build}"
+export GCP_PROJECT_ID=${GCP_PROJECT_ID:="wrong-project"}
+
+AIRFLOW_BREEZE_CONFIG_DIR=${AIRFLOW_BREEZE_CONFIG_DIR:=${HOME}/airflow-breeze-config}
+export GCP_SERVICE_ACCOUNT_KEY_DIR=${AIRFLOW_BREEZE_CONFIG_DIR}/keys
+
+# Create the logs dir pre-emptively - if the same directory is created in parallel
+# by two images, one of the newly created dirs might disappear, most likely due to
+# the way volume data is synced back
+export LOG_OUTPUT_DIR=${AIRFLOW_OUTPUT}/${BUILD_ID}/logs/
+mkdir -pv ${LOG_OUTPUT_DIR}
+
+echo "Decrypting variables"
+python ${AIRFLOW_HOME}/_decrypt_encrypted_variables.py ${GCP_PROJECT_ID} \
+   > ${AIRFLOW_SOURCES}/decrypted_variables.env
+echo "Decrypted variables. Number of variables decrypted: "\
+     "$(wc -l ${AIRFLOW_SOURCES}/decrypted_variables.env)"
+
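+# Decrypt every encrypted service account key (*.json.enc) and certificate (*.pem.enc) with Cloud KMS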
+echo "Decrypting keys from ${GCP_SERVICE_ACCOUNT_KEY_DIR}"
+pushd ${GCP_SERVICE_ACCOUNT_KEY_DIR}
+for FILE in *.json.enc *.pem.enc;
+do
+  gcloud kms decrypt --plaintext-file $(basename ${FILE} .enc) --ciphertext-file ${FILE} \
+     --location=global --keyring=incubator-airflow --key=service_accounts_crypto_key \
+     --project=${GCP_PROJECT_ID}\
+     && echo Decrypted ${FILE}
+done
+chmod -v og-rw *
+popd
diff --git a/airflow/contrib/cloudbuild/scripts/delete_cloudsql_database.sh b/airflow/contrib/cloudbuild/scripts/delete_cloudsql_database.sh
new file mode 100755
index 0000000000..5b400e778d
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/delete_cloudsql_database.sh
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+set -x
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+export AIRFLOW_HOME="${AIRFLOW_HOME:=/airflow}"
+export AIRFLOW_SOURCES="${AIRFLOW_SOURCES:=/workspace}"
+export AIRFLOW_OUTPUT="${AIRFLOW_SOURCES}/output"
+export BUILD_ID="${BUILD_ID:=build}"
+export AIRFLOW_BREEZE_CONFIG_DIR="${HOME}/airflow-breeze-config"
+export AIRFLOW_BREEZE_TEST_SUITES=${AIRFLOW_BREEZE_TEST_SUITES:=""}
+
+for AIRFLOW_BREEZE_TEST_SUITE in ${AIRFLOW_BREEZE_TEST_SUITES}; do
+    export AIRFLOW_BREEZE_TEST_SUITE
+    # Re-source variables for test suite
+    set -a
+    source ${AIRFLOW_BREEZE_CONFIG_DIR}/variables.env
+    set +a
+    echo "Deleting CloudSQL database for test suite ${AIRFLOW_BREEZE_TEST_SUITE}"
+    python ${AIRFLOW_SOURCES}/tests/contrib/operators/test_gcp_sql_operator.py --action=delete
+    echo "Deleted CloudSQL database for test suite ${AIRFLOW_BREEZE_TEST_SUITE}"
+done
diff --git a/airflow/contrib/cloudbuild/scripts/encrypt_all_files.sh b/airflow/contrib/cloudbuild/scripts/encrypt_all_files.sh
new file mode 100755
index 0000000000..d9c9b478af
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/encrypt_all_files.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+set -euo pipefail
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+AIRFLOW_BREEZE_CONFIG_DIR=${AIRFLOW_BREEZE_CONFIG_DIR:=${HOME}/airflow-breeze-config}
+export GCP_SERVICE_ACCOUNT_KEY_DIR=${AIRFLOW_BREEZE_CONFIG_DIR}/keys
+
+pushd ${GCP_SERVICE_ACCOUNT_KEY_DIR}
+for FILE in *.json *.pem;
+do
+  gcloud kms encrypt --plaintext-file ${FILE} --ciphertext-file ${FILE}.enc \
+     --location=global --keyring=incubator-airflow --key=service_accounts_crypto_key \
+     --project=${GCP_PROJECT_ID} \
+     && echo Encrypted ${FILE}
+done
+popd
diff --git a/airflow/contrib/cloudbuild/scripts/merge_tests.sh b/airflow/contrib/cloudbuild/scripts/merge_tests.sh
new file mode 100755
index 0000000000..439edc7a33
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/merge_tests.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+set -x
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+export AIRFLOW_HOME="${AIRFLOW_HOME:=/airflow}"
+export AIRFLOW_SOURCES="${AIRFLOW_SOURCES:=/workspace}"
+export AIRFLOW_OUTPUT="${AIRFLOW_SOURCES}/output"
+export BUILD_ID="${BUILD_ID:=build}"
+
+export TEST_OUTPUT_DIR=${AIRFLOW_OUTPUT}/${BUILD_ID}/tests
+
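+# Each argument is a test suite prefix; merge its per-module xunit files and
+# render an HTML report with junit2html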
+for PREFIX in $@; do
+    export MERGED_XUNIT_FILE=${TEST_OUTPUT_DIR}/${PREFIX}.xml
+    xunitmerge ${TEST_OUTPUT_DIR}/${PREFIX}-*.xml ${MERGED_XUNIT_FILE}
+
+    junit2html ${MERGED_XUNIT_FILE} ${MERGED_XUNIT_FILE}.html
+done
diff --git a/airflow/contrib/cloudbuild/scripts/prepare_summary_page.sh b/airflow/contrib/cloudbuild/scripts/prepare_summary_page.sh
new file mode 100755
index 0000000000..1d573ea7ff
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/prepare_summary_page.sh
@@ -0,0 +1,103 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+set -x
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+export AIRFLOW_HOME="${AIRFLOW_HOME:=/airflow}"
+export AIRFLOW_SOURCES="${AIRFLOW_SOURCES:=/workspace}"
+export AIRFLOW_OUTPUT="${AIRFLOW_SOURCES}/output"
+export BUILD_ID="${BUILD_ID:=build}"
+export GCP_PROJECT_ID="${GCP_PROJECT_ID:=example-project}"
+export AIRFLOW_BREEZE_GCP_BUILD_BUCKET="${AIRFLOW_BREEZE_GCP_BUILD_BUCKET:=example-bucket}"
+export HTML_OUTPUT_DIR=${AIRFLOW_OUTPUT}/${BUILD_ID}
+export AIRFLOW_BREEZE_TEST_SUITES="${AIRFLOW_BREEZE_TEST_SUITES:=python2}"
+
+export TEST_OUTPUT_DIR=${AIRFLOW_OUTPUT}/${BUILD_ID}/tests
+
+export REPO_NAME=${REPO_NAME:-""}
+export TAG_NAME=${TAG_NAME:-""}
+export BRANCH_NAME=${BRANCH_NAME:-""}
+export COMMIT_SHA=${COMMIT_SHA:-""}
+
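+# Build one HTML list item per test suite, marking the suite as failed when a
+# failure marker file exists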
+TEST_ENV_SUMMARY=""
+
+FAILED="false"
+
+for AIRFLOW_BREEZE_TEST_SUITE in ${AIRFLOW_BREEZE_TEST_SUITES}
+do
+    ENVIRONMENT_STATUS="<font color=\"green\">Passed</font>"
+    set +e
+    ls ${TEST_OUTPUT_DIR}/${AIRFLOW_BREEZE_TEST_SUITE}-*-failure.txt
+    FAILURE_FILES_EXIST=$?
+    set -e
+    if [[ "${FAILURE_FILES_EXIST}" == "0" ]]; then
+        ENVIRONMENT_STATUS="<font color=\"red\">Failed!</font>"
+        FAILED="true"
+    fi
+    TEST_ENV_SUMMARY="""${TEST_ENV_SUMMARY}
+                       <li><a href=\"https://storage.googleapis.com/${AIRFLOW_BREEZE_GCP_BUILD_BUCKET}/${BUILD_ID}/tests/${AIRFLOW_BREEZE_TEST_SUITE}.xml.html\">
+ ${AIRFLOW_BREEZE_TEST_SUITE} test results</a>:  ${ENVIRONMENT_STATUS}</li>
+"""
+done
+
+STATUS="Overall status: <font color=\"green\">Passed!</font>"
+
+if [[ ${FAILED} == "true" ]]; then
+    STATUS="Overall status: <font color=\"red\">Failed!</font>"
+fi
+
+export HTML_CONTENT="""
+<!doctype html>
+<html lang=\"en\">
+<head>
+  <meta charset=\"utf-8\">
+
+  <title>Summary page for the ${REPO_NAME} ${BRANCH_NAME} build ${BUILD_ID} in project ${GCP_PROJECT_ID}</title>
+  <meta name=\"description\" content=\"Summary page for the ${REPO_NAME} ${BRANCH_NAME} build ${BUILD_ID}\">
+
+</head>
+
+    <body>
+
+          <h1>Summary page of the build for ${REPO_NAME} ${BRANCH_NAME}</h1>
+          <h2>Build info:</h2>
+          <ul>
+              <li>GCP project id: ${GCP_PROJECT_ID}</li>
+              <li>Build id: ${BUILD_ID}</li>
+              <li>Status: ${STATUS}</li>
+              <li>Branch name: ${BRANCH_NAME}</li>
+              <li>Tag name: ${TAG_NAME}</li>
+              <li>Commit SHA: ${COMMIT_SHA}</li>
+          </ul>
+          <h2>Links:</h2>
+          <ul>
+            <li><a href=\"https://console.cloud.google.com/cloud-build/builds/${BUILD_ID}?project=${GCP_PROJECT_ID}\">Google Cloud Build</a></li>
+            ${TEST_ENV_SUMMARY}
+            <li><a href=\"https://console.cloud.google.com/storage/browser/${AIRFLOW_BREEZE_GCP_BUILD_BUCKET}/${BUILD_ID}/logs/?project=${GCP_PROJECT_ID}\">Airflow logs in GCS bucket</a></li>
+            <li><a href=\"https://console.cloud.google.com/logs/viewer?authuser=0&project=${GCP_PROJECT_ID}&minLogLevel=0&expandAll=false&resource=build%2Fbuild_id%2F${BUILD_ID}\">Stackdriver logs</a></li>
+            <li><a href=\"https://storage.googleapis.com/${AIRFLOW_BREEZE_GCP_BUILD_BUCKET}/${BUILD_ID}/docs/index.html\">Generated documentation</a></li>
+          </ul>
+    </body>
+</html>
+"""
+
+echo "${HTML_CONTENT}" > ${HTML_OUTPUT_DIR}/index.html
diff --git a/airflow/contrib/cloudbuild/scripts/run_ci_tests.sh b/airflow/contrib/cloudbuild/scripts/run_ci_tests.sh
new file mode 100755
index 0000000000..bad11e0813
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/run_ci_tests.sh
@@ -0,0 +1,120 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+set -x
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+# environment
+export AIRFLOW_HOME="${AIRFLOW_HOME:=/airflow}"
+export AIRFLOW_SOURCES="${AIRFLOW_SOURCES:=/workspace}"
+export AIRFLOW_OUTPUT="${AIRFLOW_SOURCES}/output"
+export AIRFLOW_BREEZE_TEST_SUITE="${AIRFLOW_BREEZE_TEST_SUITE:=none}"
+export BUILD_ID="${BUILD_ID:=build}"
+
+# Force unit tests mode
+export AIRFLOW__CORE__UNIT_TEST_MODE=True
+
+export TEST_OUTPUT_DIR=${AIRFLOW_OUTPUT}/${BUILD_ID}/tests
+export LOG_OUTPUT_DIR=/logs/
+
+mkdir -pv ${AIRFLOW_HOME}/logs
+rm -rvf ${AIRFLOW_HOME}/logs/*
+mkdir -pv ${LOG_OUTPUT_DIR}
+rm -rvf ${LOG_OUTPUT_DIR}/*
+
+# add tests/test_utils to PYTHONPATH
+export PYTHONPATH=${PYTHONPATH:=""}:${AIRFLOW_SOURCES}/tests/test_utils
+
+# Generate the `airflow` executable if needed
+which airflow > /dev/null || python setup.py develop
+
+echo "Initializing the DB"
+yes | airflow initdb || true
+yes | airflow resetdb || true
+
+rm -rfv ${TEST_OUTPUT_DIR}/${AIRFLOW_BREEZE_TEST_SUITE}-*.xml
+
+pushd ${AIRFLOW_SOURCES}
+
+FAILED="false"
+
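+# A *-failure.txt marker file records a failed suite; verify_tests.sh fails the
+# build when any such file exists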
+export TEST_SUITE_FAILURE_FILE=${TEST_OUTPUT_DIR}/${AIRFLOW_BREEZE_TEST_SUITE}-failure.txt
+rm -f ${TEST_SUITE_FAILURE_FILE}
+
+for MODULE_TO_TEST in ${AIRFLOW_BREEZE_CI_TEST_MODULES:=""}; do
+    if [[ ${MODULE_TO_TEST} == "" ]]; then
+       continue
+    fi
+    echo "Running tests for '" ${MODULE_TO_TEST} "'"
+    export XUNIT_FILE=${TEST_OUTPUT_DIR}/${AIRFLOW_BREEZE_TEST_SUITE}-${MODULE_TO_TEST}.xml
+    export TEST_FAILURE_FILE=${TEST_OUTPUT_DIR}/${AIRFLOW_BREEZE_TEST_SUITE}-${MODULE_TO_TEST}-failure.txt
+
+    mkdir -pv $(dirname ${XUNIT_FILE})
+    rm -fv ${XUNIT_FILE} ${XUNIT_FILE}.html
+
+    mkdir -pv $(dirname ${TEST_FAILURE_FILE})
+    rm -fv ${TEST_FAILURE_FILE}
+
+    NOSE_ARGS="${MODULE_TO_TEST}"
+
+    # Add coverage if all tests are run
+    if [[ "${MODULE_TO_TEST}" == "." ]]; then
+        NOSE_ARGS="--with-coverage \
+        --cover-erase \
+        --cover-html \
+        --cover-package=airflow \
+        --cover-html-dir=${AIRFLOW_SOURCES}/airflow/www/static/coverage"
+    fi
+
+    # Add common parameters to nose
+    NOSE_ARGS="${NOSE_ARGS} \
+    --with-xunit \
+    --xunit-file=${XUNIT_FILE} \
+    --xunit-testsuite-name=${AIRFLOW_BREEZE_TEST_SUITE} \
+    --with-ignore-docstrings \
+    --rednose \
+    --with-timer \
+    -v \
+    --logging-level=DEBUG "
+
+    echo "Starting the unit tests with the following nose arguments: "${NOSE_ARGS}
+
+
+    # We do not fail if the tests fail as we want to do post-processing and
+    # send results anyway, but we mark the test as failed
+    set +e
+
+    nosetests ${NOSE_ARGS}
+
+    if [[ $? != 0 ]]; then
+        touch ${TEST_FAILURE_FILE}
+        FAILED="true"
+    fi
+    set -e
+done
+
+if [[ "${FAILED}" == "true" ]]; then
+    touch ${TEST_SUITE_FAILURE_FILE}
+fi
+
+cp -rv ${AIRFLOW_HOME}/logs/* ${LOG_OUTPUT_DIR}
+
+popd
diff --git a/airflow/contrib/cloudbuild/scripts/verify_tests.sh b/airflow/contrib/cloudbuild/scripts/verify_tests.sh
new file mode 100755
index 0000000000..0cfc31b267
--- /dev/null
+++ b/airflow/contrib/cloudbuild/scripts/verify_tests.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+set -x
+
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+export AIRFLOW_HOME="${AIRFLOW_HOME:=/airflow}"
+export AIRFLOW_SOURCES="${AIRFLOW_SOURCES:=/workspace}"
+export AIRFLOW_OUTPUT="${AIRFLOW_SOURCES}/output"
+export BUILD_ID="${BUILD_ID:=build}"
+
+export TEST_OUTPUT_DIR=${AIRFLOW_OUTPUT}/${BUILD_ID}/tests
+
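+# Look for failure marker files left by run_ci_tests.sh; any match means some test step failed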
+set +e
+FAILED_STEPS="$(ls ${TEST_OUTPUT_DIR}/*-failure.txt 2>/dev/null)"
+set -e
+
+if [[ ${FAILED_STEPS} == "" ]]; then
+   echo
+   echo "All test steps succeeded !!!!"
+   echo
+   exit 0
+else
+   echo
+   echo "Some test steps failed!: "
+   for STEP in ${FAILED_STEPS}
+   do
+        echo $(basename ${STEP} -failure.txt)
+   done
+   echo
+   exit 1
+fi
diff --git a/airflow/contrib/example_dags/example_gcp_compute.py b/airflow/contrib/example_dags/example_gcp_compute.py
index 928e9744b6..11b510a025 100644
--- a/airflow/contrib/example_dags/example_gcp_compute.py
+++ b/airflow/contrib/example_dags/example_gcp_compute.py
@@ -23,10 +23,10 @@
 
 This DAG relies on the following OS environment variables
 
-* PROJECT_ID - Google Cloud Platform project where the Compute Engine instance exists.
-* ZONE - Google Cloud Platform zone where the instance exists.
-* INSTANCE - Name of the Compute Engine instance.
-* SHORT_MACHINE_TYPE_NAME - Machine type resource name to set, e.g. 'n1-standard-1'.
+* GCP_PROJECT_ID - Google Cloud Platform project where the Compute Engine instance exists.
+* GCE_ZONE - Google Cloud Platform zone where the instance exists.
+* GCE_INSTANCE - Name of the Compute Engine instance.
+* GCE_SHORT_MACHINE_TYPE_NAME - Machine type resource name to set, e.g. 'n1-standard-1'.
     See https://cloud.google.com/compute/docs/machine-types
 """
 import os
@@ -38,19 +38,19 @@
     GceInstanceStopOperator, GceSetMachineTypeOperator
 
 # [START howto_operator_gce_args_common]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-ZONE = os.environ.get('ZONE', 'europe-west1-b')
-INSTANCE = os.environ.get('INSTANCE', 'testinstance')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
+GCE_INSTANCE = os.environ.get('GCE_INSTANCE', 'testinstance')
 # [END howto_operator_gce_args_common]
 
 default_args = {
-    'start_date': airflow.utils.dates.days_ago(1)
+    'start_date': airflow.utils.dates.days_ago(1),
 }
 
 # [START howto_operator_gce_args_set_machine_type]
-SHORT_MACHINE_TYPE_NAME = os.environ.get('SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
+GCE_SHORT_MACHINE_TYPE_NAME = os.environ.get('GCE_SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
 SET_MACHINE_TYPE_BODY = {
-    'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME)
+    'machineType': 'zones/{}/machineTypes/{}'.format(GCE_ZONE, GCE_SHORT_MACHINE_TYPE_NAME)
 }
 # [END howto_operator_gce_args_set_machine_type]
 
@@ -58,52 +58,53 @@
 with models.DAG(
     'example_gcp_compute',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=datetime.timedelta(days=1),
+    catchup=False
 ) as dag:
     # [START howto_operator_gce_start]
     gce_instance_start = GceInstanceStartOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         task_id='gcp_compute_start_task'
     )
     # [END howto_operator_gce_start]
     # Duplicate start for idempotence testing
     gce_instance_start2 = GceInstanceStartOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         task_id='gcp_compute_start_task2'
     )
     # [START howto_operator_gce_stop]
     gce_instance_stop = GceInstanceStopOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         task_id='gcp_compute_stop_task'
     )
     # [END howto_operator_gce_stop]
     # Duplicate stop for idempotence testing
     gce_instance_stop2 = GceInstanceStopOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         task_id='gcp_compute_stop_task2'
     )
     # [START howto_operator_gce_set_machine_type]
     gce_set_machine_type = GceSetMachineTypeOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         body=SET_MACHINE_TYPE_BODY,
         task_id='gcp_compute_set_machine_type'
     )
     # [END howto_operator_gce_set_machine_type]
     # Duplicate set machine type for idempotence testing
     gce_set_machine_type2 = GceSetMachineTypeOperator(
-        project_id=PROJECT_ID,
-        zone=ZONE,
-        resource_id=INSTANCE,
+        project_id=GCP_PROJECT_ID,
+        zone=GCE_ZONE,
+        resource_id=GCE_INSTANCE,
         body=SET_MACHINE_TYPE_BODY,
         task_id='gcp_compute_set_machine_type2'
     )
diff --git a/airflow/contrib/example_dags/example_gcp_compute_igm.py b/airflow/contrib/example_dags/example_gcp_compute_igm.py
index 3e4543c60d..082a4b0be8 100644
--- a/airflow/contrib/example_dags/example_gcp_compute_igm.py
+++ b/airflow/contrib/example_dags/example_gcp_compute_igm.py
@@ -24,17 +24,17 @@
 
 This DAG relies on the following OS environment variables
 
-* PROJECT_ID - the Google Cloud Platform project where the Compute Engine instance exists
-* ZONE - the zone where the Compute Engine instance exists
+* GCP_PROJECT_ID - the Google Cloud Platform project where the Compute Engine instance exists
+* GCE_ZONE - the zone where the Compute Engine instance exists
 
 Variables for copy template operator:
-* TEMPLATE_NAME - name of the template to copy
-* NEW_TEMPLATE_NAME - name of the new template
-* NEW_DESCRIPTION - description added to the template
+* GCE_TEMPLATE_NAME - name of the template to copy
+* GCE_NEW_TEMPLATE_NAME - name of the new template
+* GCE_NEW_DESCRIPTION - description added to the template
 
 Variables for update template in Group Manager:
 
-* INSTANCE_GROUP_MANAGER_NAME - name of the Instance Group Manager
+* GCE_INSTANCE_GROUP_MANAGER_NAME - name of the Instance Group Manager
 * SOURCE_TEMPLATE_URL - url of the template to replace in the Instance Group Manager
 * DESTINATION_TEMPLATE_URL - url of the new template to set in the Instance Group Manager
 """
@@ -48,8 +48,8 @@
     GceInstanceTemplateCopyOperator, GceInstanceGroupManagerUpdateTemplateOperator
 
 # [START howto_operator_compute_igm_common_args]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-ZONE = os.environ.get('ZONE', 'europe-west1-b')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
 # [END howto_operator_compute_igm_common_args]
 
 default_args = {
@@ -57,13 +57,13 @@
 }
 
 # [START howto_operator_compute_template_copy_args]
-TEMPLATE_NAME = os.environ.get('TEMPLATE_NAME', 'instance-template-test')
-NEW_TEMPLATE_NAME = os.environ.get('NEW_TEMPLATE_NAME',
+GCE_TEMPLATE_NAME = os.environ.get('GCE_TEMPLATE_NAME', 'instance-template-test')
+GCE_NEW_TEMPLATE_NAME = os.environ.get('GCE_NEW_TEMPLATE_NAME',
                                    'instance-template-test-new')
-NEW_DESCRIPTION = os.environ.get('NEW_DESCRIPTION', 'Test new description')
+GCE_NEW_DESCRIPTION = os.environ.get('GCE_NEW_DESCRIPTION', 'Test new description')
 GCE_INSTANCE_TEMPLATE_BODY_UPDATE = {
-    "name": NEW_TEMPLATE_NAME,
-    "description": NEW_DESCRIPTION,
+    "name": GCE_NEW_TEMPLATE_NAME,
+    "description": GCE_NEW_DESCRIPTION,
     "properties": {
         "machineType": "n1-standard-2"
     }
@@ -71,18 +71,18 @@
 # [END howto_operator_compute_template_copy_args]
 
 # [START howto_operator_compute_igm_update_template_args]
-INSTANCE_GROUP_MANAGER_NAME = os.environ.get('INSTANCE_GROUP_MANAGER_NAME',
+GCE_INSTANCE_GROUP_MANAGER_NAME = os.environ.get('GCE_INSTANCE_GROUP_MANAGER_NAME',
                                              'instance-group-test')
 
 SOURCE_TEMPLATE_URL = os.environ.get(
     'SOURCE_TEMPLATE_URL',
-    "https://www.googleapis.com/compute/beta/projects/"
-    "example-project/global/instanceTemplates/instance-template-test")
+    "https://www.googleapis.com/compute/beta/projects/" + GCP_PROJECT_ID +
+    "/global/instanceTemplates/instance-template-test")
 
 DESTINATION_TEMPLATE_URL = os.environ.get(
     'DESTINATION_TEMPLATE_URL',
-    "https://www.googleapis.com/compute/beta/projects/"
-    "example-airflow/global/instanceTemplates/" + NEW_TEMPLATE_NAME)
+    "https://www.googleapis.com/compute/beta/projects/" + GCP_PROJECT_ID +
+    "/global/instanceTemplates/" + GCE_NEW_TEMPLATE_NAME)
 
 UPDATE_POLICY = {
     "type": "OPPORTUNISTIC",
@@ -99,29 +99,30 @@
 with models.DAG(
     'example_gcp_compute_igm',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=datetime.timedelta(days=1),
+    catchup=False
 ) as dag:
     # [START howto_operator_gce_igm_copy_template]
     gce_instance_template_copy = GceInstanceTemplateCopyOperator(
-        project_id=PROJECT_ID,
-        resource_id=TEMPLATE_NAME,
+        project_id=GCP_PROJECT_ID,
+        resource_id=GCE_TEMPLATE_NAME,
         body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
         task_id='gcp_compute_igm_copy_template_task'
     )
     # [END howto_operator_gce_igm_copy_template]
     # Added to check for idempotence
     gce_instance_template_copy2 = GceInstanceTemplateCopyOperator(
-        project_id=PROJECT_ID,
-        resource_id=TEMPLATE_NAME,
+        project_id=GCP_PROJECT_ID,
+        resource_id=GCE_TEMPLATE_NAME,
         body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
         task_id='gcp_compute_igm_copy_template_task_2'
     )
     # [START howto_operator_gce_igm_update_template]
     gce_instance_group_manager_update_template = \
         GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            resource_id=INSTANCE_GROUP_MANAGER_NAME,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            zone=GCE_ZONE,
             source_template=SOURCE_TEMPLATE_URL,
             destination_template=DESTINATION_TEMPLATE_URL,
             update_policy=UPDATE_POLICY,
@@ -131,9 +132,9 @@
     # Added to check for idempotence (and without UPDATE_POLICY)
     gce_instance_group_manager_update_template2 = \
         GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            resource_id=INSTANCE_GROUP_MANAGER_NAME,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            zone=GCE_ZONE,
             source_template=SOURCE_TEMPLATE_URL,
             destination_template=DESTINATION_TEMPLATE_URL,
             task_id='gcp_compute_igm_group_manager_update_template_2'
diff --git a/airflow/contrib/example_dags/example_gcp_function_delete.py b/airflow/contrib/example_dags/example_gcp_function_delete.py
index 642e3a744c..e6d7f0b912 100644
--- a/airflow/contrib/example_dags/example_gcp_function_delete.py
+++ b/airflow/contrib/example_dags/example_gcp_function_delete.py
@@ -20,9 +20,9 @@
 """
 Example Airflow DAG that deletes a Google Cloud Function.
 This DAG relies on the following OS environment variables
-* PROJECT_ID - Google Cloud Project where the Cloud Function exists.
-* LOCATION - Google Cloud Functions region where the function exists.
-* ENTRYPOINT - Name of the executable function in the source code.
+* GCP_PROJECT_ID - Google Cloud Project where the Cloud Function exists.
+* GCP_LOCATION - Google Cloud Functions region where the function exists.
+* GCF_ENTRYPOINT - Name of the executable function in the source code.
 """
 
 import os
@@ -33,13 +33,16 @@
 from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator
 
 # [START howto_operator_gcf_delete_args]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-LOCATION = os.environ.get('LOCATION', 'europe-west1')
-ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
+GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
 # A fully-qualified name of the function to delete
 
-FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
-                                                               ENTRYPOINT)
+GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello')
+FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
+                                                               GCP_LOCATION,
+                                                               GCF_SHORT_FUNCTION_NAME)
+
 # [END howto_operator_gcf_delete_args]
 
 default_args = {
@@ -49,7 +52,8 @@
 with models.DAG(
     'example_gcp_function_delete',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=datetime.timedelta(days=1),
+    catchup=False
 ) as dag:
     # [START howto_operator_gcf_delete]
     t1 = GcfFunctionDeleteOperator(
diff --git a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
index 76563d7596..74f8ade0d1 100644
--- a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+++ b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
@@ -22,19 +22,19 @@
 
 This DAG relies on the following OS environment variables
 https://airflow.apache.org/concepts.html#variables
-* PROJECT_ID - Google Cloud Project to use for the Cloud Function.
-* LOCATION - Google Cloud Functions region where the function should be
+* GCP_PROJECT_ID - Google Cloud Project to use for the Cloud Function.
+* GCP_LOCATION - Google Cloud Functions region where the function should be
   created.
-* SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
+* GCF_SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
 or
-    * SOURCE_UPLOAD_URL - Generated upload URL for the zipped source
+    * GCF_SOURCE_UPLOAD_URL - Generated upload URL for the zipped source
     or
-    * ZIP_PATH - Local path to the zipped source archive
+    * GCF_ZIP_PATH - Local path to the zipped source archive
 or
 * SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function is
 defined in a supported Cloud Source Repository URL format
 https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
-* ENTRYPOINT - Name of the executable function in the source code.
+* GCF_ENTRYPOINT - Name of the executable function in the source code.
 """
 
 import os
@@ -46,28 +46,30 @@
 from airflow.utils import dates
 
 # [START howto_operator_gcf_deploy_variables]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-LOCATION = os.environ.get('LOCATION', 'europe-west1')
-SOURCE_ARCHIVE_URL = os.environ.get('SOURCE_ARCHIVE_URL', '')
-SOURCE_UPLOAD_URL = os.environ.get('SOURCE_UPLOAD_URL', '')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
+GCF_SOURCE_ARCHIVE_URL = os.environ.get('GCF_SOURCE_ARCHIVE_URL', '')
+GCF_SOURCE_UPLOAD_URL = os.environ.get('GCF_SOURCE_UPLOAD_URL', '')
 SOURCE_REPOSITORY = os.environ.get(
     'SOURCE_REPOSITORY',
     'https://source.developers.google.com/'
-    'projects/example-project/repos/hello-world/moveable-aliases/master')
-ZIP_PATH = os.environ.get('ZIP_PATH', '')
-ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
-FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
-                                                               ENTRYPOINT)
-RUNTIME = 'nodejs6'
-VALIDATE_BODY = os.environ.get('VALIDATE_BODY', True)
+    'projects/{}/repos/hello-world/moveable-aliases/master'.format(GCP_PROJECT_ID))
+GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
+GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
+GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello')
+FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
+                                                               GCP_LOCATION,
+                                                               GCF_SHORT_FUNCTION_NAME)
+GCF_RUNTIME = 'nodejs6'
+GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', True)
 
 # [END howto_operator_gcf_deploy_variables]
 
 # [START howto_operator_gcf_deploy_body]
 body = {
     "name": FUNCTION_NAME,
-    "entryPoint": ENTRYPOINT,
-    "runtime": RUNTIME,
+    "entryPoint": GCF_ENTRYPOINT,
+    "runtime": GCF_RUNTIME,
     "httpsTrigger": {}
 }
 # [END howto_operator_gcf_deploy_body]
@@ -79,17 +81,17 @@
 # [END howto_operator_gcf_default_args]
 
 # [START howto_operator_gcf_deploy_variants]
-if SOURCE_ARCHIVE_URL:
-    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
+if GCF_SOURCE_ARCHIVE_URL:
+    body['sourceArchiveUrl'] = GCF_SOURCE_ARCHIVE_URL
 elif SOURCE_REPOSITORY:
     body['sourceRepository'] = {
         'url': SOURCE_REPOSITORY
     }
-elif ZIP_PATH:
+elif GCF_ZIP_PATH:
     body['sourceUploadUrl'] = ''
-    default_args['zip_path'] = ZIP_PATH
-elif SOURCE_UPLOAD_URL:
-    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
+    default_args['zip_path'] = GCF_ZIP_PATH
+elif GCF_SOURCE_UPLOAD_URL:
+    body['sourceUploadUrl'] = GCF_SOURCE_UPLOAD_URL
 else:
     raise Exception("Please provide one of the source_code parameters")
 # [END howto_operator_gcf_deploy_variants]
@@ -98,16 +100,17 @@
 with models.DAG(
     'example_gcp_function_deploy_delete',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=datetime.timedelta(days=1),
+    catchup=False
 ) as dag:
     # [START howto_operator_gcf_deploy]
     deploy_task = GcfFunctionDeployOperator(
         task_id="gcf_deploy_task",
         name=FUNCTION_NAME,
-        project_id=PROJECT_ID,
-        location=LOCATION,
+        project_id=GCP_PROJECT_ID,
+        location=GCP_LOCATION,
         body=body,
-        validate_body=VALIDATE_BODY
+        validate_body=GCP_VALIDATE_BODY
     )
     # [END howto_operator_gcf_deploy]
     delete_task = GcfFunctionDeleteOperator(
diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
index c6838a2baf..5b6d81a1aa 100644
--- a/airflow/contrib/example_dags/example_gcp_sql.py
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -23,7 +23,7 @@
 
 This DAG relies on the following OS environment variables
 https://airflow.apache.org/concepts.html#variables
-* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
+* GCP_PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
 * INSTANCE_NAME - Name of the Cloud SQL instance.
 * DB_NAME - Name of the database inside a Cloud SQL instance.
 """
@@ -44,7 +44,7 @@
     GoogleCloudStorageObjectCreateAclEntryOperator
 
 # [START howto_operator_cloudsql_arguments]
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
 INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-mysql')
 INSTANCE_NAME2 = os.environ.get('INSTANCE_NAME2', 'test-mysql2')
 DB_NAME = os.environ.get('DB_NAME', 'testdb')
@@ -145,7 +145,7 @@
 db_create_body = {
     "instance": INSTANCE_NAME,
     "name": DB_NAME,
-    "project": PROJECT_ID
+    "project": GCP_PROJECT_ID
 }
 # [END howto_operator_cloudsql_db_create_body]
 # [START howto_operator_cloudsql_db_patch_body]
@@ -176,21 +176,21 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_create]
     sql_instance_create = CloudSqlInstanceCreateOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=body,
         instance=INSTANCE_NAME,
-        task_id='sql_instance_create'
+        task_id='sql_instance_create_task'
     )
     # [END howto_operator_cloudsql_create]
     prev_task = sql_instance_create
 
-    sql_instance_create_2 = CloudSqlInstanceCreateOperator(
-        project_id=PROJECT_ID,
+    sql_instance_create_2_task = CloudSqlInstanceCreateOperator(
+        project_id=GCP_PROJECT_ID,
         body=body2,
         instance=INSTANCE_NAME2,
         task_id='sql_instance_create_2'
     )
-    prev_task = next_dep(sql_instance_create_2, prev_task)
+    prev_task = next_dep(sql_instance_create_2_task, prev_task)
 
     # ############################################## #
     # ### MODIFYING INSTANCE AND ITS DATABASE ###### #
@@ -198,7 +198,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_patch]
     sql_instance_patch_task = CloudSqlInstancePatchOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=patch_body,
         instance=INSTANCE_NAME,
         task_id='sql_instance_patch_task'
@@ -208,7 +208,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_db_create]
     sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=db_create_body,
         instance=INSTANCE_NAME,
         task_id='sql_db_create_task'
@@ -218,7 +218,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_db_patch]
     sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=db_patch_body,
         instance=INSTANCE_NAME,
         database=DB_NAME,
@@ -245,7 +245,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_export]
     sql_export_task = CloudSqlInstanceExportOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=export_body,
         instance=INSTANCE_NAME,
         task_id='sql_export_task'
@@ -272,7 +272,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_import]
     sql_import_task = CloudSqlInstanceImportOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         body=import_body,
         instance=INSTANCE_NAME2,
         task_id='sql_import_task'
@@ -286,7 +286,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_db_delete]
     sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         instance=INSTANCE_NAME,
         database=DB_NAME,
         task_id='sql_db_delete_task'
@@ -300,7 +300,7 @@ def next_dep(task, prev):
 
     # [START howto_operator_cloudsql_delete]
     sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         instance=INSTANCE_NAME,
         task_id='sql_instance_delete_task'
     )
@@ -308,7 +308,7 @@ def next_dep(task, prev):
     prev_task = next_dep(sql_instance_delete_task, prev_task)
 
     sql_instance_delete_task_2 = CloudSqlInstanceDeleteOperator(
-        project_id=PROJECT_ID,
+        project_id=GCP_PROJECT_ID,
         instance=INSTANCE_NAME2,
         task_id='sql_instance_delete_task_2'
     )
diff --git a/airflow/contrib/example_dags/example_gcp_sql_query.py b/airflow/contrib/example_dags/example_gcp_sql_query.py
index 5439fb6afc..aa7f5626c3 100644
--- a/airflow/contrib/example_dags/example_gcp_sql_query.py
+++ b/airflow/contrib/example_dags/example_gcp_sql_query.py
@@ -22,26 +22,25 @@
 
 This DAG relies on the following OS environment variables
 
-* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance
-* LOCATION - Google Cloud location where the database is created
+* GCP_PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance
+* GCP_REGION - Google Cloud region where the database is created
 *
-* POSTGRES_INSTANCE_NAME - Name of the postgres Cloud SQL instance
-* POSTGRES_USER - Name of the postgres database user
-* POSTGRES_PASSWORD - Password of the postgres database user
-* POSTGRES_PROXY_PORT - Local port number for proxy connections for postgres
-* POSTGRES_PUBLIC_IP - Public IP of the Postgres database
-* POSTGRES_PUBLIC_PORT - Port of the postgres database
+* GCSQL_POSTGRES_INSTANCE_NAME - Name of the postgres Cloud SQL instance
+* GCSQL_POSTGRES_USER - Name of the postgres database user
+* GCSQL_POSTGRES_PASSWORD - Password of the postgres database user
+* GCSQL_POSTGRES_PUBLIC_IP - Public IP of the Postgres database
+* GCSQL_POSTGRES_PUBLIC_PORT - Port of the postgres database
 *
-* MYSQL_INSTANCE_NAME - Name of the postgres Cloud SQL instance
-* MYSQL_USER - Name of the mysql database user
-* MYSQL_PASSWORD - Password of the mysql database user
-* MYSQL_PROXY_PORT - Local port number for proxy connections for mysql
-* MYSQL_PUBLIC_IP - Public IP of the mysql database
-* MYSQL_PUBLIC_PORT - Port of the mysql database
+* GCSQL_MYSQL_INSTANCE_NAME - Name of the postgres Cloud SQL instance
+* GCSQL_MYSQL_USER - Name of the mysql database user
+* GCSQL_MYSQL_PASSWORD - Password of the mysql database user
+* GCSQL_MYSQL_PUBLIC_IP - Public IP of the mysql database
+* GCSQL_MYSQL_PUBLIC_PORT - Port of the mysql database
 """
 
 import os
 import subprocess
+from os.path import expanduser
 
 from six.moves.urllib.parse import quote_plus
 
@@ -51,34 +50,36 @@
 
 # [START howto_operator_cloudsql_query_arguments]
 
-PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-LOCATION = os.environ.get('REGION', 'europe-west-1')
-
-POSTGRES_INSTANCE_NAME = os.environ.get('POSTGRES_INSTANCE_NAME', 'testpostgres')
-POSTGRES_DATABASE_NAME = os.environ.get('POSTGRES_DATABASE_NAME', 'postgresdb')
-POSTGRES_USER = os.environ.get('POSTGRES_USER', 'postgres_user')
-POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD', 'password')
-POSTGRES_PUBLIC_IP = os.environ.get('POSTGRES_PUBLIC_IP', '0.0.0.0')
-POSTGRES_PUBLIC_PORT = os.environ.get('POSTGRES_PUBLIC_PORT', 5432)
-POSTGRES_CLIENT_CERT_FILE = os.environ.get('POSTGRES_CLIENT_CERT_FILE',
-                                           "/tmp/client-cert.pem")
-POSTGRES_CLIENT_KEY_FILE = os.environ.get('POSTGRES_CLIENT_KEY_FILE',
-                                          "/tmp/client-key.pem")
-POSTGRES_SERVER_CA_FILE = os.environ.get('POSTGRES_SERVER_CA_FILE',
-                                         "/tmp/server-ca.pem")
-
-MYSQL_INSTANCE_NAME = os.environ.get('MYSQL_INSTANCE_NAME', 'testmysql')
-MYSQL_DATABASE_NAME = os.environ.get('MYSQL_DATABASE_NAME', 'mysqldb')
-MYSQL_USER = os.environ.get('MYSQL_USER', 'mysql_user')
-MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'password')
-MYSQL_PUBLIC_IP = os.environ.get('MYSQL_PUBLIC_IP', '0.0.0.0')
-MYSQL_PUBLIC_PORT = os.environ.get('MYSQL_PUBLIC_PORT', 3306)
-MYSQL_CLIENT_CERT_FILE = os.environ.get('MYSQL_CLIENT_CERT_FILE',
-                                        "/tmp/client-cert.pem")
-MYSQL_CLIENT_KEY_FILE = os.environ.get('MYSQL_CLIENT_KEY_FILE',
-                                       "/tmp/client-key.pem")
-MYSQL_SERVER_CA_FILE = os.environ.get('MYSQL_SERVER_CA_FILE',
-                                      "/tmp/server-ca.pem")
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_REGION = os.environ.get('GCP_REGION', 'europe-west-1b')
+
+GCSQL_POSTGRES_INSTANCE_NAME = os.environ.get('GCSQL_POSTGRES_INSTANCE_NAME',
+                                              'testpostgres')
+GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME',
+                                              'postgresdb')
+GCSQL_POSTGRES_USER = os.environ.get('GCSQL_POSTGRES_USER', 'postgres_user')
+GCSQL_POSTGRES_PASSWORD = os.environ.get('GCSQL_POSTGRES_PASSWORD', 'password')
+GCSQL_POSTGRES_PUBLIC_IP = os.environ.get('GCSQL_POSTGRES_PUBLIC_IP', '0.0.0.0')
+GCSQL_POSTGRES_PUBLIC_PORT = os.environ.get('GCSQL_POSTGRES_PUBLIC_PORT', 5432)
+GCSQL_POSTGRES_CLIENT_CERT_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_CERT_FILE',
+                                                 ".key/postgres-client-cert.pem")
+GCSQL_POSTGRES_CLIENT_KEY_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_KEY_FILE',
+                                                ".key/postgres-client-key.pem")
+GCSQL_POSTGRES_SERVER_CA_FILE = os.environ.get('GCSQL_POSTGRES_SERVER_CA_FILE',
+                                               ".key/postgres-server-ca.pem")
+
+GCSQL_MYSQL_INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'testmysql')
+GCSQL_MYSQL_DATABASE_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'mysqldb')
+GCSQL_MYSQL_USER = os.environ.get('GCSQL_MYSQL_USER', 'mysql_user')
+GCSQL_MYSQL_PASSWORD = os.environ.get('GCSQL_MYSQL_PASSWORD', 'password')
+GCSQL_MYSQL_PUBLIC_IP = os.environ.get('GCSQL_MYSQL_PUBLIC_IP', '0.0.0.0')
+GCSQL_MYSQL_PUBLIC_PORT = os.environ.get('GCSQL_MYSQL_PUBLIC_PORT', 3306)
+GCSQL_MYSQL_CLIENT_CERT_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_CERT_FILE',
+                                              ".key/mysql-client-cert.pem")
+GCSQL_MYSQL_CLIENT_KEY_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_KEY_FILE',
+                                             ".key/mysql-client-key.pem")
+GCSQL_MYSQL_SERVER_CA_FILE = os.environ.get('GCSQL_MYSQL_SERVER_CA_FILE',
+                                            ".key/mysql-server-ca.pem")
 
 SQL = [
     'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
@@ -97,18 +98,28 @@
 
 # [START howto_operator_cloudsql_query_connections]
 
+HOME_DIR = expanduser("~")
+
+
+def get_absolute_path(path):
+    if path.startswith("/"):
+        return path
+    else:
+        return os.path.join(HOME_DIR, path)
+
+
 postgres_kwargs = dict(
-    user=quote_plus(POSTGRES_USER),
-    password=quote_plus(POSTGRES_PASSWORD),
-    public_port=POSTGRES_PUBLIC_PORT,
-    public_ip=quote_plus(POSTGRES_PUBLIC_IP),
-    project_id=quote_plus(PROJECT_ID),
-    location=quote_plus(LOCATION),
-    instance=quote_plus(POSTGRES_INSTANCE_NAME),
-    database=quote_plus(POSTGRES_DATABASE_NAME),
-    client_cert_file=quote_plus(POSTGRES_CLIENT_CERT_FILE),
-    client_key_file=quote_plus(POSTGRES_CLIENT_KEY_FILE),
-    server_ca_file=quote_plus(POSTGRES_SERVER_CA_FILE)
+    user=quote_plus(GCSQL_POSTGRES_USER),
+    password=quote_plus(GCSQL_POSTGRES_PASSWORD),
+    public_port=GCSQL_POSTGRES_PUBLIC_PORT,
+    public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
+    project_id=quote_plus(GCP_PROJECT_ID),
+    location=quote_plus(GCP_REGION),
+    instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME),
+    database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME),
+    client_cert_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_CLIENT_CERT_FILE)),
+    client_key_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_CLIENT_KEY_FILE)),
+    server_ca_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_SERVER_CA_FILE))
 )
 
 # The connections below are created using one of the standard approaches - via environment
@@ -161,17 +172,17 @@
     .format(**postgres_kwargs)
 
 mysql_kwargs = dict(
-    user=quote_plus(MYSQL_USER),
-    password=quote_plus(MYSQL_PASSWORD),
-    public_port=MYSQL_PUBLIC_PORT,
-    public_ip=quote_plus(MYSQL_PUBLIC_IP),
-    project_id=quote_plus(PROJECT_ID),
-    location=quote_plus(LOCATION),
-    instance=quote_plus(MYSQL_INSTANCE_NAME),
-    database=quote_plus(MYSQL_DATABASE_NAME),
-    client_cert_file=quote_plus(MYSQL_CLIENT_CERT_FILE),
-    client_key_file=quote_plus(MYSQL_CLIENT_KEY_FILE),
-    server_ca_file=quote_plus(MYSQL_SERVER_CA_FILE)
+    user=quote_plus(GCSQL_MYSQL_USER),
+    password=quote_plus(GCSQL_MYSQL_PASSWORD),
+    public_port=GCSQL_MYSQL_PUBLIC_PORT,
+    public_ip=quote_plus(GCSQL_MYSQL_PUBLIC_IP),
+    project_id=quote_plus(GCP_PROJECT_ID),
+    location=quote_plus(GCP_REGION),
+    instance=quote_plus(GCSQL_MYSQL_INSTANCE_NAME),
+    database=quote_plus(GCSQL_MYSQL_DATABASE_NAME),
+    client_cert_file=quote_plus(get_absolute_path(GCSQL_MYSQL_CLIENT_CERT_FILE)),
+    client_key_file=quote_plus(get_absolute_path(GCSQL_MYSQL_CLIENT_KEY_FILE)),
+    server_ca_file=quote_plus(get_absolute_path(GCSQL_MYSQL_SERVER_CA_FILE))
 )
 
 # MySQL: connect via proxy over TCP (specific proxy version)
@@ -246,7 +257,8 @@
 with models.DAG(
     dag_id='example_gcp_sql_query',
     default_args=default_args,
-    schedule_interval=None
+    schedule_interval=None,
+    catchup=False
 ) as dag:
     for connection_name in connection_names:
         tasks.append(
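
A minimal, standalone sketch of the key-path handling added above: configuration is
read from environment variables with safe defaults, and relative certificate paths
are resolved against the user's home directory (the values shown are illustrative,
not part of the change):

    import os
    from os.path import expanduser

    HOME_DIR = expanduser("~")

    def get_absolute_path(path):
        # Absolute paths are returned unchanged; relative ones are resolved
        # against the user's home directory.
        if path.startswith("/"):
            return path
        return os.path.join(HOME_DIR, path)

    # With no environment override, the default relative path resolves to
    # something like /home/<user>/.key/postgres-client-cert.pem
    cert_file = os.environ.get('GCSQL_POSTGRES_CLIENT_CERT_FILE',
                               ".key/postgres-client-cert.pem")
    print(get_absolute_path(cert_file))
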
diff --git a/airflow/contrib/hooks/gcp_function_hook.py b/airflow/contrib/hooks/gcp_function_hook.py
index 29cef1716c..e260e1d832 100644
--- a/airflow/contrib/hooks/gcp_function_hook.py
+++ b/airflow/contrib/hooks/gcp_function_hook.py
@@ -76,7 +76,7 @@ def list_functions(self, full_location):
         Lists all Cloud Functions created in the location.
 
         :param full_location: full location including the project in the form of
-            of /projects/<PROJECT>/location/<LOCATION>
+            /projects/<PROJECT>/location/<GCP_LOCATION>
         :type full_location: str
         :return: array of Cloud Functions objects - representing functions in the location
         :rtype: [dict]
@@ -90,7 +90,7 @@ def create_new_function(self, full_location, body):
         Creates a new function in Cloud Function in the location specified in the body.
 
         :param full_location: full location including the project in the form of
-            of /projects/<PROJECT>/location/<LOCATION>
+            /projects/<PROJECT>/location/<GCP_LOCATION>
         :type full_location: str
         :param body: body required by the Cloud Functions insert API
         :type body: dict
@@ -130,7 +130,7 @@ def upload_function_zip(self, parent, zip_path):
         Uploads zip file with sources.
 
         :param parent: Google Cloud Platform project id and region where zip file should
-         be uploaded in the form of /projects/<PROJECT>/location/<LOCATION>
+         be uploaded in the form of /projects/<PROJECT>/location/<GCP_LOCATION>
         :type parent: str
         :param zip_path: path of the valid .zip file to upload
         :type zip_path: str
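
For reference, a short sketch of assembling the full_location string in the form the
docstrings above describe (the helper name is hypothetical and not part of the hook):

    def build_full_location(project_id, location):
        # Produces the '/projects/<PROJECT>/location/<GCP_LOCATION>' form
        # referred to in the hook docstrings.
        return '/projects/{}/location/{}'.format(project_id, location)

    print(build_full_location('example-project', 'europe-west1'))
    # /projects/example-project/location/europe-west1
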
diff --git a/airflow/contrib/hooks/gcp_sql_hook.py b/airflow/contrib/hooks/gcp_sql_hook.py
index 1581637e0d..95b8ea95ec 100644
--- a/airflow/contrib/hooks/gcp_sql_hook.py
+++ b/airflow/contrib/hooks/gcp_sql_hook.py
@@ -19,8 +19,11 @@
 import errno
 import json
 import os
+import random
 import re
 import shutil
+import string
+
 import socket
 import platform
 import subprocess
@@ -45,6 +48,8 @@
 from airflow.models import Connection
 from airflow.utils.db import provide_session
 
+UNIX_PATH_MAX = 108
+
 NUM_RETRIES = 5
 
 # Time to sleep between active checks of the operation results
@@ -749,18 +754,41 @@ def _validate_inputs(self):
             self._check_ssl_file(self.sslcert, "sslcert")
             self._check_ssl_file(self.sslkey, "sslkey")
             self._check_ssl_file(self.sslrootcert, "sslrootcert")
+        if self.use_proxy and not self.sql_proxy_use_tcp:
+            base_socket_path_len = 14  # /tmp/ + 8 random chars + /
+            project_id_len = len(self.project_id) + 1  # + :
+            location_len = len(self.location) + 1  # + :
+            instance_len = len(self.instance)
+            if self.database_type == 'postgres':
+                suffix = "/.s.PGSQL.5432"
+                suffix_len = len(suffix)
+            else:
+                suffix = ""
+                suffix_len = 0
+            total_socket_path_len = base_socket_path_len + project_id_len + \
+                location_len + instance_len + suffix_len
+            if total_socket_path_len > UNIX_PATH_MAX:
+                raise AirflowException(
+                    "The UNIX socket path length cannot exceed {} characters "
+                    "on Linux system. Either use shorter instance/database "
+                    "name or switch to TCP connection. "
+                    "The socket path for Cloud SQL proxy is now:"
+                    " /tmp/[8 random chars]/{}:{}:{}{}".format(
+                        UNIX_PATH_MAX, self.project_id, self.instance,
+                        self.database, suffix))
 
     def _generate_unique_path(self):
         # We are not using mkdtemp here as the path generated with mkdtemp
         # can be close to 60 characters and there is a limitation in
         # length of socket path to around 100 characters in total.
         # We append project/location/instance to it later and postgres
-        # appends its own prefix, so we chose a shorter "/tmp/{uuid1}" - based
-        # on host name and clock + clock sequence. This should be fairly
-        # sufficient for our needs and should even work if the time is set back.
-        # We are using db_conn_id generated with uuid1 so that connection
-        # id matches the folder - for easier debugging.
-        return "/tmp/" + self.db_conn_id
+        # appends its own prefix, so we chose a shorter "/tmp/[8 random characters]" path.
+        random.seed()
+        while True:
+            candidate = "/tmp/" + ''.join(
+                random.choice(string.ascii_letters + string.digits) for _ in range(8))
+            if not os.path.exists(candidate):
+                return candidate
 
     @staticmethod
     def _quote(value):
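
The validation added above guards against the Linux limit on UNIX socket path length
(UNIX_PATH_MAX, 108 characters). A minimal sketch of the same path layout, useful for
checking a concrete project/location/instance combination (values are illustrative):

    import random
    import string

    UNIX_PATH_MAX = 108  # Linux limit on UNIX socket path length

    def candidate_socket_path(project_id, location, instance, database_type):
        # A short random directory under /tmp, to which the Cloud SQL proxy
        # appends '<project>:<location>:<instance>'; postgres additionally
        # appends its own socket name.
        unique_dir = "/tmp/" + ''.join(
            random.choice(string.ascii_letters + string.digits) for _ in range(8))
        path = "{}/{}:{}:{}".format(unique_dir, project_id, location, instance)
        if database_type == 'postgres':
            path += "/.s.PGSQL.5432"
        return path

    path = candidate_socket_path('example-project', 'europe-west1',
                                 'testpostgres', 'postgres')
    print(len(path), len(path) <= UNIX_PATH_MAX)
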
diff --git a/airflow/contrib/operators/gcp_function_operator.py b/airflow/contrib/operators/gcp_function_operator.py
index 7f7da1d3ec..2153af452f 100644
--- a/airflow/contrib/operators/gcp_function_operator.py
+++ b/airflow/contrib/operators/gcp_function_operator.py
@@ -181,7 +181,7 @@ def _set_airflow_version_label(self):
 
     def execute(self, context):
         if self.zip_path_preprocessor.should_upload_function():
-            self.body[SOURCE_UPLOAD_URL] = self._upload_source_code()
+            self.body[GCF_SOURCE_UPLOAD_URL] = self._upload_source_code()
         self._validate_all_body_fields()
         self._set_airflow_version_label()
         if not self._check_if_function_exists():
@@ -190,10 +190,10 @@ def execute(self, context):
             self._update_function()
 
 
-SOURCE_ARCHIVE_URL = 'sourceArchiveUrl'
-SOURCE_UPLOAD_URL = 'sourceUploadUrl'
+GCF_SOURCE_ARCHIVE_URL = 'sourceArchiveUrl'
+GCF_SOURCE_UPLOAD_URL = 'sourceUploadUrl'
 SOURCE_REPOSITORY = 'sourceRepository'
-ZIP_PATH = 'zip_path'
+GCF_ZIP_PATH = 'zip_path'
 
 
 class ZipPathPreprocessor:
@@ -226,28 +226,28 @@ def _is_present_and_empty(dictionary, field):
         return field in dictionary and not dictionary[field]
 
     def _verify_upload_url_and_no_zip_path(self):
-        if self._is_present_and_empty(self.body, SOURCE_UPLOAD_URL):
+        if self._is_present_and_empty(self.body, GCF_SOURCE_UPLOAD_URL):
             if not self.zip_path:
                 raise AirflowException(
                     "Parameter '{}' is empty in the body and argument '{}' "
                     "is missing or empty. You need to have non empty '{}' "
                     "when '{}' is present and empty.".
-                    format(SOURCE_UPLOAD_URL, ZIP_PATH, ZIP_PATH, SOURCE_UPLOAD_URL))
+                    format(GCF_SOURCE_UPLOAD_URL, GCF_ZIP_PATH, GCF_ZIP_PATH, GCF_SOURCE_UPLOAD_URL))
 
     def _verify_upload_url_and_zip_path(self):
-        if SOURCE_UPLOAD_URL in self.body and self.zip_path:
-            if not self.body[SOURCE_UPLOAD_URL]:
+        if GCF_SOURCE_UPLOAD_URL in self.body and self.zip_path:
+            if not self.body[GCF_SOURCE_UPLOAD_URL]:
                 self.upload_function = True
             else:
                 raise AirflowException("Only one of '{}' in body or '{}' argument "
                                        "allowed. Found both."
-                                       .format(SOURCE_UPLOAD_URL, ZIP_PATH))
+                                       .format(GCF_SOURCE_UPLOAD_URL, GCF_ZIP_PATH))
 
     def _verify_archive_url_and_zip_path(self):
-        if SOURCE_ARCHIVE_URL in self.body and self.zip_path:
+        if GCF_SOURCE_ARCHIVE_URL in self.body and self.zip_path:
             raise AirflowException("Only one of '{}' in body or '{}' argument "
                                    "allowed. Found both."
-                                   .format(SOURCE_ARCHIVE_URL, ZIP_PATH))
+                                   .format(GCF_SOURCE_ARCHIVE_URL, GCF_ZIP_PATH))
 
     def should_upload_function(self):
         if self.upload_function is None:
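
The renamed GCF_* constants above drive a simple mutual-exclusion rule between the
body's source fields and the zip_path argument. A condensed, standalone approximation
of that rule (AirflowException replaced by ValueError for illustration):

    GCF_SOURCE_ARCHIVE_URL = 'sourceArchiveUrl'
    GCF_SOURCE_UPLOAD_URL = 'sourceUploadUrl'

    def should_upload_function(body, zip_path):
        # zip_path may only be combined with an empty sourceUploadUrl,
        # and never with sourceArchiveUrl.
        if GCF_SOURCE_ARCHIVE_URL in body and zip_path:
            raise ValueError("Only one of sourceArchiveUrl or zip_path is allowed.")
        if GCF_SOURCE_UPLOAD_URL in body:
            if body[GCF_SOURCE_UPLOAD_URL] and zip_path:
                raise ValueError("Only one of sourceUploadUrl or zip_path is allowed.")
            if not body[GCF_SOURCE_UPLOAD_URL] and not zip_path:
                raise ValueError("Empty sourceUploadUrl requires a non-empty zip_path.")
        return bool(zip_path)

    print(should_upload_function({'sourceUploadUrl': ''}, '/tmp/function.zip'))  # True
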
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index 095553b3ac..7beee2a626 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -412,13 +412,13 @@ Cloud IAM permissions.
    service account.
 
 The typical way of assigning Cloud IAM permissions with `gcloud` is
-shown below. Just replace PROJECT_ID with ID of your Google Cloud Platform project
+shown below. Just replace GCP_PROJECT_ID with the ID of your Google Cloud Platform project
 and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
 
 .. code-block:: bash
 
   gcloud iam service-accounts add-iam-policy-binding \
-    PROJECT_ID@appspot.gserviceaccount.com \
+    GCP_PROJECT_ID@appspot.gserviceaccount.com \
     --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
     --role="roles/iam.serviceAccountUser"
 
@@ -520,13 +520,13 @@ Cloud IAM permissions.
    service account.
 
 The typical way of assigning Cloud IAM permissions with `gcloud` is
-shown below. Just replace PROJECT_ID with ID of your Google Cloud Platform project
+shown below. Just replace GCP_PROJECT_ID with the ID of your Google Cloud Platform project
 and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
 
 .. code-block:: bash
 
   gcloud iam service-accounts add-iam-policy-binding \
-    PROJECT_ID@appspot.gserviceaccount.com \
+    GCP_PROJECT_ID@appspot.gserviceaccount.com \
     --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
     --role="roles/iam.serviceAccountUser"
 
diff --git a/setup.py b/setup.py
index 56b9bb2873..c063796d56 100644
--- a/setup.py
+++ b/setup.py
@@ -162,6 +162,7 @@ def write_version(filename=os.path.join(*['airflow',
 ]
 # major update coming soon, clamp to 0.x
 cloudant = ['cloudant>=0.5.9,<2.0']
+coverage = ['coverage']
 crypto = ['cryptography>=0.9.3']
 dask = [
     'distributed>=1.17.1, <2'
@@ -204,6 +205,7 @@ def write_version(filename=os.path.join(*['airflow',
 jdbc = ['jaydebeapi>=1.1.1']
 jenkins = ['python-jenkins>=0.4.15']
 jira = ['JIRA>1.0.7']
+junit2html = ['junit2html==22']
 kerberos = ['pykerberos>=1.1.13',
             'requests_kerberos>=0.10.0',
             'thrift_sasl>=0.2.0',
@@ -237,6 +239,7 @@ def write_version(filename=os.path.join(*['airflow',
 vertica = ['vertica-python>=0.5.1']
 webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4']
 winrm = ['pywinrm==0.2.2']
+xunitmerge = ['xunitmerge==1.0.4']
 zendesk = ['zdesk']
 
 all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid + pinot \
@@ -273,7 +276,7 @@ def write_version(filename=os.path.join(*['airflow',
              docker + ssh + kubernetes + celery + azure_blob_storage + redis + gcp_api +
              datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
              druid + pinot + segment + snowflake + elasticsearch + azure_data_lake + azure_cosmos +
-             atlas)
+             atlas + coverage + junit2html + xunitmerge)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
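
The three new dependency lists (coverage, junit2html, xunitmerge) are folded into the
combined development extra in the last hunk simply by list concatenation; the same
pattern in isolation (package pins copied from the hunk, the grouping name is illustrative):

    coverage = ['coverage']
    junit2html = ['junit2html==22']
    xunitmerge = ['xunitmerge==1.0.4']

    # Appending them to an existing combined list mirrors what setup.py does.
    test_reporting_tools = coverage + junit2html + xunitmerge
    print(test_reporting_tools)
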
diff --git a/tests/contrib/hooks/test_gcp_container_hook.py b/tests/contrib/hooks/test_gcp_container_hook.py
index 6e13461395..8123a54041 100644
--- a/tests/contrib/hooks/test_gcp_container_hook.py
+++ b/tests/contrib/hooks/test_gcp_container_hook.py
@@ -32,16 +32,16 @@
 
 TASK_ID = 'test-gke-cluster-operator'
 CLUSTER_NAME = 'test-cluster'
-TEST_PROJECT_ID = 'test-project'
-ZONE = 'test-zone'
+TEST_GCP_PROJECT_ID = 'test-project'
+GCE_ZONE = 'test-zone'
 
 
 class GKEClusterHookDeleteTest(unittest.TestCase):
     def setUp(self):
         with mock.patch.object(GKEClusterHook, "__init__", return_value=None):
             self.gke_hook = GKEClusterHook(None, None, None)
-            self.gke_hook.project_id = TEST_PROJECT_ID
-            self.gke_hook.location = ZONE
+            self.gke_hook.project_id = TEST_GCP_PROJECT_ID
+            self.gke_hook.location = GCE_ZONE
             self.gke_hook.client = mock.Mock()
 
     @mock.patch("airflow.contrib.hooks.gcp_container_hook.GKEClusterHook._dict_to_proto")
@@ -55,7 +55,7 @@ def test_delete_cluster(self, wait_mock, convert_mock):
         self.gke_hook.delete_cluster(name=CLUSTER_NAME, retry=retry_mock,
                                      timeout=timeout_mock)
 
-        client_delete.assert_called_with(project_id=TEST_PROJECT_ID, zone=ZONE,
+        client_delete.assert_called_with(project_id=TEST_GCP_PROJECT_ID, zone=GCE_ZONE,
                                          cluster_id=CLUSTER_NAME,
                                          retry=retry_mock, timeout=timeout_mock)
         wait_mock.assert_called_with(client_delete.return_value)
@@ -94,8 +94,8 @@ class GKEClusterHookCreateTest(unittest.TestCase):
     def setUp(self):
         with mock.patch.object(GKEClusterHook, "__init__", return_value=None):
             self.gke_hook = GKEClusterHook(None, None, None)
-            self.gke_hook.project_id = TEST_PROJECT_ID
-            self.gke_hook.location = ZONE
+            self.gke_hook.project_id = TEST_GCP_PROJECT_ID
+            self.gke_hook.location = GCE_ZONE
             self.gke_hook.client = mock.Mock()
 
     @mock.patch("airflow.contrib.hooks.gcp_container_hook.GKEClusterHook._dict_to_proto")
@@ -114,7 +114,7 @@ def test_create_cluster_proto(self, wait_mock, convert_mock):
         self.gke_hook.create_cluster(mock_cluster_proto, retry=retry_mock,
                                      timeout=timeout_mock)
 
-        client_create.assert_called_with(project_id=TEST_PROJECT_ID, zone=ZONE,
+        client_create.assert_called_with(project_id=TEST_GCP_PROJECT_ID, zone=GCE_ZONE,
                                          cluster=mock_cluster_proto,
                                          retry=retry_mock, timeout=timeout_mock)
         wait_mock.assert_called_with(client_create.return_value)
@@ -133,7 +133,7 @@ def test_create_cluster_dict(self, wait_mock, convert_mock):
         self.gke_hook.create_cluster(mock_cluster_dict, retry=retry_mock,
                                      timeout=timeout_mock)
 
-        client_create.assert_called_with(project_id=TEST_PROJECT_ID, zone=ZONE,
+        client_create.assert_called_with(project_id=TEST_GCP_PROJECT_ID, zone=GCE_ZONE,
                                          cluster=proto_mock,
                                          retry=retry_mock, timeout=timeout_mock)
         wait_mock.assert_called_with(client_create.return_value)
@@ -172,8 +172,8 @@ class GKEClusterHookGetTest(unittest.TestCase):
     def setUp(self):
         with mock.patch.object(GKEClusterHook, "__init__", return_value=None):
             self.gke_hook = GKEClusterHook(None, None, None)
-            self.gke_hook.project_id = TEST_PROJECT_ID
-            self.gke_hook.location = ZONE
+            self.gke_hook.project_id = TEST_GCP_PROJECT_ID
+            self.gke_hook.location = GCE_ZONE
             self.gke_hook.client = mock.Mock()
 
     def test_get_cluster(self):
@@ -184,7 +184,7 @@ def test_get_cluster(self):
         self.gke_hook.get_cluster(name=CLUSTER_NAME, retry=retry_mock,
                                   timeout=timeout_mock)
 
-        client_get.assert_called_with(project_id=TEST_PROJECT_ID, zone=ZONE,
+        client_get.assert_called_with(project_id=TEST_GCP_PROJECT_ID, zone=GCE_ZONE,
                                       cluster_id=CLUSTER_NAME,
                                       retry=retry_mock, timeout=timeout_mock)
 
@@ -194,15 +194,15 @@ class GKEClusterHookTest(unittest.TestCase):
     def setUp(self):
         with mock.patch.object(GKEClusterHook, "__init__", return_value=None):
             self.gke_hook = GKEClusterHook(None, None, None)
-            self.gke_hook.project_id = TEST_PROJECT_ID
-            self.gke_hook.location = ZONE
+            self.gke_hook.project_id = TEST_GCP_PROJECT_ID
+            self.gke_hook.location = GCE_ZONE
             self.gke_hook.client = mock.Mock()
 
     def test_get_operation(self):
         self.gke_hook.client.get_operation = mock.Mock()
         self.gke_hook.get_operation('TEST_OP')
-        self.gke_hook.client.get_operation.assert_called_with(project_id=TEST_PROJECT_ID,
-                                                              zone=ZONE,
+        self.gke_hook.client.get_operation.assert_called_with(project_id=TEST_GCP_PROJECT_ID,
+                                                              zone=GCE_ZONE,
                                                               operation_id='TEST_OP')
 
     def test_append_label(self):
diff --git a/tests/contrib/hooks/test_gcp_dataproc_hook.py b/tests/contrib/hooks/test_gcp_dataproc_hook.py
index e22b27a8e7..acc4922f82 100644
--- a/tests/contrib/hooks/test_gcp_dataproc_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataproc_hook.py
@@ -31,8 +31,8 @@
         mock = None
 
 JOB = 'test-job'
-PROJECT_ID = 'test-project-id'
-REGION = 'global'
+GCP_PROJECT_ID = 'test-project-id'
+GCP_REGION = 'global'
 TASK_ID = 'test-task-id'
 
 BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
@@ -53,8 +53,8 @@ def setUp(self):
     def test_submit(self, job_mock):
         with mock.patch(DATAPROC_STRING.format('DataProcHook.get_conn',
                                                return_value=None)):
-            self.dataproc_hook.submit(PROJECT_ID, JOB)
-            job_mock.assert_called_once_with(mock.ANY, PROJECT_ID, JOB, REGION,
+            self.dataproc_hook.submit(GCP_PROJECT_ID, JOB)
+            job_mock.assert_called_once_with(mock.ANY, GCP_PROJECT_ID, JOB, GCP_REGION,
                                              job_error_states=mock.ANY)
 
 
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index b92116a031..01c01fead4 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -34,7 +34,7 @@
 
 TASK_ID = 'test-bq-create-table-operator'
 TEST_DATASET = 'test-dataset'
-TEST_PROJECT_ID = 'test-project'
+TEST_GCP_PROJECT_ID = 'test-project'
 TEST_TABLE_ID = 'test-table-id'
 TEST_GCS_BUCKET = 'test-bucket'
 TEST_GCS_DATA = ['dir1/*.csv']
@@ -47,7 +47,7 @@ class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
     def test_execute(self, mock_hook):
         operator = BigQueryCreateEmptyTableOperator(task_id=TASK_ID,
                                                     dataset_id=TEST_DATASET,
-                                                    project_id=TEST_PROJECT_ID,
+                                                    project_id=TEST_GCP_PROJECT_ID,
                                                     table_id=TEST_TABLE_ID)
 
         operator.execute(None)
@@ -57,7 +57,7 @@ def test_execute(self, mock_hook):
             .create_empty_table \
             .assert_called_once_with(
                 dataset_id=TEST_DATASET,
-                project_id=TEST_PROJECT_ID,
+                project_id=TEST_GCP_PROJECT_ID,
                 table_id=TEST_TABLE_ID,
                 schema_fields=None,
                 time_partitioning={},
@@ -111,7 +111,7 @@ def test_execute(self, mock_hook):
         operator = BigQueryDeleteDatasetOperator(
             task_id=TASK_ID,
             dataset_id=TEST_DATASET,
-            project_id=TEST_PROJECT_ID
+            project_id=TEST_GCP_PROJECT_ID
         )
 
         operator.execute(None)
@@ -121,7 +121,7 @@ def test_execute(self, mock_hook):
             .delete_dataset \
             .assert_called_once_with(
                 dataset_id=TEST_DATASET,
-                project_id=TEST_PROJECT_ID
+                project_id=TEST_GCP_PROJECT_ID
             )
 
 
@@ -131,7 +131,7 @@ def test_execute(self, mock_hook):
         operator = BigQueryCreateEmptyDatasetOperator(
             task_id=TASK_ID,
             dataset_id=TEST_DATASET,
-            project_id=TEST_PROJECT_ID
+            project_id=TEST_GCP_PROJECT_ID
         )
 
         operator.execute(None)
@@ -141,7 +141,7 @@ def test_execute(self, mock_hook):
             .create_empty_dataset \
             .assert_called_once_with(
                 dataset_id=TEST_DATASET,
-                project_id=TEST_PROJECT_ID,
+                project_id=TEST_GCP_PROJECT_ID,
                 dataset_reference={}
             )
 
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 219d7255ff..28e4e035e9 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -50,9 +50,9 @@
 
 TASK_ID = 'test-dataproc-operator'
 CLUSTER_NAME = 'test-cluster-name'
-PROJECT_ID = 'test-project-id'
+GCP_PROJECT_ID = 'test-project-id'
 NUM_WORKERS = 123
-ZONE = 'us-central1-a'
+GCE_ZONE = 'us-central1-a'
 NETWORK_URI = '/projects/project_id/regions/global/net'
 SUBNETWORK_URI = '/projects/project_id/regions/global/subnet'
 INTERNAL_IP_ONLY = True
@@ -78,7 +78,7 @@
 AUTO_DELETE_TIME = datetime.datetime(2017, 6, 7)
 AUTO_DELETE_TTL = 654
 DEFAULT_DATE = datetime.datetime(2017, 6, 6)
-REGION = 'test-region'
+GCP_REGION = 'test-region'
 MAIN_URI = 'test-uri'
 TEMPLATE_ID = 'template-id'
 
@@ -87,7 +87,7 @@
 DATAPROC_JOB_TO_SUBMIT = {
     'job': {
         'reference': {
-            'projectId': PROJECT_ID,
+            'projectId': GCP_PROJECT_ID,
             'jobId': DATAPROC_JOB_ID,
         },
         'placement': {
@@ -118,9 +118,9 @@ def setUp(self):
                 DataprocClusterCreateOperator(
                     task_id=TASK_ID,
                     cluster_name=CLUSTER_NAME,
-                    project_id=PROJECT_ID,
+                    project_id=GCP_PROJECT_ID,
                     num_workers=NUM_WORKERS,
-                    zone=ZONE,
+                    zone=GCE_ZONE,
                     network_uri=NETWORK_URI,
                     subnetwork_uri=SUBNETWORK_URI,
                     internal_ip_only=INTERNAL_IP_ONLY,
@@ -154,9 +154,9 @@ def test_init(self):
         """Test DataProcClusterOperator instance is properly initialized."""
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):
             self.assertEqual(dataproc_operator.cluster_name, CLUSTER_NAME)
-            self.assertEqual(dataproc_operator.project_id, PROJECT_ID)
+            self.assertEqual(dataproc_operator.project_id, GCP_PROJECT_ID)
             self.assertEqual(dataproc_operator.num_workers, NUM_WORKERS)
-            self.assertEqual(dataproc_operator.zone, ZONE)
+            self.assertEqual(dataproc_operator.zone, GCE_ZONE)
             self.assertEqual(dataproc_operator.network_uri, NETWORK_URI)
             self.assertEqual(dataproc_operator.subnetwork_uri, SUBNETWORK_URI)
             self.assertEqual(dataproc_operator.tags, TAGS)
@@ -186,7 +186,7 @@ def test_build_cluster_data(self):
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):
             cluster_data = dataproc_operator._build_cluster_data()
             self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME)
-            self.assertEqual(cluster_data['projectId'], PROJECT_ID)
+            self.assertEqual(cluster_data['projectId'], GCP_PROJECT_ID)
             self.assertEqual(cluster_data['config']['softwareConfig'],
                              {'imageVersion': IMAGE_VERSION})
             self.assertEqual(cluster_data['config']['configBucket'], STORAGE_BUCKET)
@@ -223,9 +223,9 @@ def test_build_cluster_data_with_autoDeleteTime(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=NUM_WORKERS,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag,
             auto_delete_time=AUTO_DELETE_TIME,
         )
@@ -237,9 +237,9 @@ def test_build_cluster_data_with_autoDeleteTtl(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=NUM_WORKERS,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag,
             auto_delete_ttl=AUTO_DELETE_TTL,
         )
@@ -251,9 +251,9 @@ def test_build_cluster_data_with_autoDeleteTime_and_autoDeleteTtl(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=NUM_WORKERS,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag,
             auto_delete_time=AUTO_DELETE_TIME,
             auto_delete_ttl=AUTO_DELETE_TTL,
@@ -270,9 +270,9 @@ def test_init_with_image_version_and_custom_image_both_set(self):
             DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag,
                 image_version=IMAGE_VERSION,
                 custom_image=CUSTOM_IMAGE
@@ -282,9 +282,9 @@ def test_init_with_custom_image(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=NUM_WORKERS,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag,
             custom_image=CUSTOM_IMAGE
         )
@@ -292,7 +292,7 @@ def test_init_with_custom_image(self):
         cluster_data = dataproc_operator._build_cluster_data()
         expected_custom_image_url = \
             'https://www.googleapis.com/compute/beta/projects/' \
-            '{}/global/images/{}'.format(PROJECT_ID, CUSTOM_IMAGE)
+            '{}/global/images/{}'.format(GCP_PROJECT_ID, CUSTOM_IMAGE)
         self.assertEqual(cluster_data['config']['masterConfig']['imageUri'],
                          expected_custom_image_url)
         self.assertEqual(cluster_data['config']['workerConfig']['imageUri'],
@@ -302,10 +302,10 @@ def test_build_single_node_cluster(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
             cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             num_workers=0,
             num_preemptible_workers=0,
-            zone=ZONE,
+            zone=GCE_ZONE,
             dag=self.dag
         )
         cluster_data = dataproc_operator._build_cluster_data()
@@ -318,10 +318,10 @@ def test_init_cluster_with_zero_workers_and_not_non_zero_preemtibles(self):
             DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=0,
                 num_preemptible_workers=2,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag,
                 image_version=IMAGE_VERSION,
             )
@@ -332,9 +332,9 @@ def test_cluster_name_log_no_sub(self):
             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag
             )
             with patch.object(dataproc_task.log, 'info') as mock_info:
@@ -348,9 +348,9 @@ def test_cluster_name_log_sub(self):
             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name='smoke-cluster-{{ ts_nodash }}',
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag
             )
             with patch.object(dataproc_task.log, 'info') as mock_info:
@@ -371,9 +371,9 @@ def create_cluster_with_invalid_internal_ip_only_setup():
             create_cluster = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
-                zone=ZONE,
+                zone=GCE_ZONE,
                 dag=self.dag,
                 internal_ip_only=True)
 
@@ -418,7 +418,7 @@ def test_cluster_name_log_no_sub(self):
             dataproc_task = DataprocClusterScaleOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
                 num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
                 dag=self.dag
@@ -434,7 +434,7 @@ def test_cluster_name_log_sub(self):
             dataproc_task = DataprocClusterScaleOperator(
                 task_id=TASK_ID,
                 cluster_name='smoke-cluster-{{ ts_nodash }}',
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 num_workers=NUM_WORKERS,
                 num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
                 dag=self.dag
@@ -482,7 +482,7 @@ def test_cluster_name_log_no_sub(self):
             dataproc_task = DataprocClusterDeleteOperator(
                 task_id=TASK_ID,
                 cluster_name=CLUSTER_NAME,
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 dag=self.dag
             )
             with patch.object(dataproc_task.log, 'info') as mock_info:
@@ -496,7 +496,7 @@ def test_cluster_name_log_sub(self):
             dataproc_task = DataprocClusterDeleteOperator(
                 task_id=TASK_ID,
                 cluster_name='smoke-cluster-{{ ts_nodash }}',
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 dag=self.dag
             )
 
@@ -520,12 +520,12 @@ def test_hook_correct_region():
         with patch(HOOK) as mock_hook:
             dataproc_task = DataProcHadoopOperator(
                 task_id=TASK_ID,
-                region=REGION
+                region=GCP_REGION
             )
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
-                                                                  REGION, mock.ANY)
+                                                                  GCP_REGION, mock.ANY)
 
     @staticmethod
     def test_dataproc_job_id_is_set():
@@ -544,12 +544,12 @@ def test_hook_correct_region():
         with patch(HOOK) as mock_hook:
             dataproc_task = DataProcHiveOperator(
                 task_id=TASK_ID,
-                region=REGION
+                region=GCP_REGION
             )
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
-                                                                  REGION, mock.ANY)
+                                                                  GCP_REGION, mock.ANY)
 
     @staticmethod
     def test_dataproc_job_id_is_set():
@@ -569,12 +569,12 @@ def test_hook_correct_region():
             dataproc_task = DataProcPySparkOperator(
                 task_id=TASK_ID,
                 main=MAIN_URI,
-                region=REGION
+                region=GCP_REGION
             )
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
-                                                                  REGION, mock.ANY)
+                                                                  GCP_REGION, mock.ANY)
 
     @staticmethod
     def test_dataproc_job_id_is_set():
@@ -594,12 +594,12 @@ def test_hook_correct_region():
         with patch(HOOK) as mock_hook:
             dataproc_task = DataProcSparkOperator(
                 task_id=TASK_ID,
-                region=REGION
+                region=GCP_REGION
             )
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
-                                                                  REGION, mock.ANY)
+                                                                  GCP_REGION, mock.ANY)
 
     @staticmethod
     def test_dataproc_job_id_is_set():
@@ -642,8 +642,8 @@ def test_workflow(self):
 
             dataproc_task = DataprocWorkflowTemplateInstantiateOperator(
                 task_id=TASK_ID,
-                project_id=PROJECT_ID,
-                region=REGION,
+                project_id=GCP_PROJECT_ID,
+                region=GCP_REGION,
                 template_id=TEMPLATE_ID,
                 dag=self.dag
             )
@@ -694,7 +694,7 @@ def test_iniline_workflow(self):
                         "cluster_name": CLUSTER_NAME,
                         "config": {
                             "gce_cluster_config": {
-                                "zone_uri": ZONE,
+                                "zone_uri": GCE_ZONE,
                             }
                         }
                     }
@@ -710,8 +710,8 @@ def test_iniline_workflow(self):
 
             dataproc_task = DataprocWorkflowTemplateInstantiateInlineOperator(
                 task_id=TASK_ID,
-                project_id=PROJECT_ID,
-                region=REGION,
+                project_id=GCP_PROJECT_ID,
+                region=GCP_REGION,
                 template=template,
                 dag=self.dag
             )
diff --git a/tests/contrib/operators/test_gcp_base.py b/tests/contrib/operators/test_gcp_base.py
index 7e786c5b4b..5805e49652 100644
--- a/tests/contrib/operators/test_gcp_base.py
+++ b/tests/contrib/operators/test_gcp_base.py
@@ -20,6 +20,7 @@
 import os
 import subprocess
 import unittest
+from os.path import expanduser
 
 from airflow import models, settings, configuration, AirflowException
 from airflow.utils.timezone import datetime
@@ -41,10 +42,12 @@
 OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
     AIRFLOW_MAIN_FOLDER, "airflow", "example_dags")
 
-TESTS_DAG_FOLDER = os.path.join(
+AIRFLOW_HOME = os.environ.get('AIRFLOW_HOME',
+                              os.path.join(os.path.expanduser('~'), 'airflow'))
+UNIT_TEST_DAG_FOLDER = os.path.join(
     AIRFLOW_MAIN_FOLDER, "tests", "dags")
 
-GCP_FOLDER_ENVIRONMENT_VARIABLE = "GCP_SERVICE_ACCOUNT_KEY_FOLDER"
+DAG_FOLDER = os.path.join(AIRFLOW_HOME, "dags")
 
 GCP_COMPUTE_KEY = 'gcp_compute.json'
 GCP_FUNCTION_KEY = 'gcp_function.json'
@@ -54,10 +57,16 @@
 GCP_GCS_KEY = 'gcp_gcs.json'
 
 SKIP_TEST_WARNING = """
-The test is only run when there is GCP connection available! "
-Set GCP_SERVICE_ACCOUNT_KEY_FOLDER environment variable if "
-you want to run them".
-"""
+
+This test is only run in an Airflow environment with GCP system tests
+enabled!
+
+Set the AIRFLOW_BREEZE_CONFIG_DIR environment variable if you want to run these tests.
+
+If you want to run it in an IDE you should run 'python {}'.
+This way you will retrieve all the variables that you should set - ready to copy & paste.
+
+""".format(__file__)
 
 
 class BaseGcpIntegrationTestCase(unittest.TestCase):
@@ -75,24 +84,126 @@ def __init__(self,
         self.example_dags_folder = example_dags_folder
         self.project_extra = project_extra
         self.full_key_path = None
+        self.original_account = None
+        self.project_id = self.get_project_id()
+
+    @staticmethod
+    def get_project_id():
+        return os.environ.get('GCP_PROJECT_ID')
+
+    @staticmethod
+    def execute_cmd(cmd, silent=False):
+        if silent:
+            print("Executing in silent mode: '{}'".format(" ".join(cmd)))
+            with open(os.devnull, 'w') as FNULL:
+                return subprocess.call(args=cmd, stdout=FNULL, stderr=subprocess.STDOUT)
+        else:
+            print("Executing: '{}'".format(" ".join(cmd)))
+            process = subprocess.Popen(args=cmd, stdout=subprocess.PIPE,
+                                       stderr=subprocess.PIPE)
+            output, err = process.communicate()
+            retcode = process.poll()
+            print("Stdout: {}".format(output))
+            print("Stderr: {}".format(err))
+            if retcode:
+                print("Error when executing '{}'".format(" ".join(cmd)))
+            return retcode
+
+    @staticmethod
+    def check_output(cmd):
+            print("Executing for output: '{}'".format(" ".join(cmd)))
+            process = subprocess.Popen(args=cmd, stdout=subprocess.PIPE,
+                                       stderr=subprocess.PIPE)
+            output, err = process.communicate()
+            retcode = process.poll()
+            if retcode:
+                print("Error when executing '{}'".format(" ".join(cmd)))
+                print("Stdout: {}".format(output))
+                print("Stderr: {}".format(err))
+                raise AirflowException("Retcode {} on {} with stdout: {}, stderr: {}".
+                                       format(retcode, " ".join(cmd), output, err))
+            return output
+
+    @staticmethod
+    def _get_key_path(key_name):
+        """
+        Returns the key path. If AIRFLOW_BREEZE_CONFIG_DIR points to an absolute
+            directory, it tries to find the key in this directory, otherwise it assumes
+            that the folder is a sub-directory of the HOME directory.
+        :param key_name: name of the key file to find.
+        :return: path of the key file or None if the key is not found
+        :rtype: str
+        """
+        if "AIRFLOW_BREEZE_CONFIG_DIR" not in os.environ:
+            return None
+        airflow_breeze_config_dir = os.environ["AIRFLOW_BREEZE_CONFIG_DIR"]
+        if not airflow_breeze_config_dir.startswith("/"):
+            home_dir = expanduser("~")
+            airflow_breeze_config_dir = os.path.join(home_dir, airflow_breeze_config_dir)
+        if not os.path.isdir(airflow_breeze_config_dir):
+            return None
+        key_dir = os.path.join(airflow_breeze_config_dir, "keys")
+        if not os.path.isdir(key_dir):
+            return None
+        key_path = os.path.join(key_dir, key_name)
+        if not os.path.isfile(key_path):
+            return None
+        return key_path
 
-    def _gcp_authenticate(self):
-        key_dir_path = os.environ['GCP_SERVICE_ACCOUNT_KEY_FOLDER']
-        self.full_key_path = os.path.join(key_dir_path, self.gcp_key)
+    def gcp_authenticate(self):
+        """
+        Authenticate with the specified service account.
+        """
+        self.full_key_path = self._get_key_path(self.gcp_key)
 
         if not os.path.isfile(self.full_key_path):
             raise Exception("The key {} could not be found. Please copy it to the "
-                            "{} folder.".format(self.gcp_key, key_dir_path))
+                            "{} path.".format(self.gcp_key, self.full_key_path))
         print("Setting the GCP key to {}".format(self.full_key_path))
         # Checking if we can authenticate using service account credentials provided
         retcode = subprocess.call(['gcloud', 'auth', 'activate-service-account',
-                                   '--key-file={}'.format(self.full_key_path)])
+                                   '--key-file={}'.format(self.full_key_path),
+                                   '--project={}'.format(self.project_id)])
         if retcode != 0:
             raise AirflowException("The gcloud auth method was not successful!")
         self.update_connection_with_key_path()
-        # Now we revoke all authentication here because we want to make sure
-        # that all works fine with the credentials retrieved from the gcp_connection
-        subprocess.call(['gcloud', 'auth', 'revoke'])
+
+    def gcp_revoke_authentication(self):
+        """
+        Revoke the default gcloud authentication.
+        Tests should be run without default authentication
+        because the authentication from the Connection table should be used.
+        """
+        current_account = subprocess.check_output(
+            ['gcloud', 'config', 'get-value', 'account',
+             '--project={}'.format(self.project_id)]).decode('utf-8')
+        print("Revoking authentication for account: {}".format(current_account))
+        subprocess.call(['gcloud', 'auth', 'revoke',
+                         '--project={}'.format(self.project_id)])
+
+    def gcp_store_authentication(self):
+        """
+        Store the current default account and switch to a non-existing one.
+        Tests should be run without default authentication
+        because the authentication from the Connection table should be used.
+        """
+        self.original_account = subprocess.check_output(
+            ['gcloud', 'config', 'get-value', 'account',
+             '--project={}'.format(self.project_id)]).decode('utf-8')
+        print("Storing account: {}".format(self.original_account))
+        subprocess.call(['gcloud', 'config', 'set', 'account', 'none',
+                         '--project={}'.format(self.project_id)])
+
+    def gcp_restore_authentication(self):
+        """
+        Restore authentication to the original account.
+        Tests should be run without default authentication
+        because the authentication from the Connection table should be used.
+        """
+        if self.original_account:
+            print("Restoring original account stored: {}".format(self.original_account))
+            subprocess.call(['gcloud', 'config', 'set', 'account', self.original_account,
+                             '--project={}'.format(self.project_id)])
 
     def update_connection_with_key_path(self):
         session = settings.Session()
@@ -107,8 +218,8 @@ def update_connection_with_key_path(self):
             extras[PROJECT_EXTRA] = self.project_extra
             conn.extra = json.dumps(extras)
             session.commit()
-        except BaseException as e:
-            print('Airflow DB Session error:' + str(e.message))
+        except BaseException as ex:
+            print('Airflow DB Session error:' + str(ex.message))
             session.rollback()
             raise
         finally:
@@ -120,8 +231,8 @@ def update_connection_with_dictionary(self):
             conn = session.query(models.Connection).filter(
                 models.Connection.conn_id == 'google_cloud_default')[0]
             extras = conn.extra_dejson
-            with open(self.full_key_path, "r") as f:
-                content = json.load(f)
+            with open(self.full_key_path, "r") as path_file:
+                content = json.load(path_file)
             extras[KEYFILE_DICT_EXTRA] = json.dumps(content)
             if extras.get(KEYPATH_EXTRA):
                 del extras[KEYPATH_EXTRA]
@@ -129,47 +240,65 @@ def update_connection_with_dictionary(self):
             extras[PROJECT_EXTRA] = self.project_extra
             conn.extra = json.dumps(extras)
             session.commit()
-        except BaseException as e:
-            print('Airflow DB Session error:' + str(e.message))
+        except BaseException as ex:
+            print('Airflow DB Session error:' + str(ex.message))
             session.rollback()
             raise
         finally:
             session.close()
 
+    @staticmethod
+    def _get_dag_folder():
+        return UNIT_TEST_DAG_FOLDER
+
     def _symlink_dag(self):
-        target_path = os.path.join(TESTS_DAG_FOLDER, self.dag_name)
-        if os.path.exists(target_path):
-            os.remove(target_path)
-        os.symlink(
-            os.path.join(self.example_dags_folder, self.dag_name),
-            os.path.join(target_path))
+        target_path = os.path.join(self._get_dag_folder(), self.dag_name)
+        if not os.path.exists(target_path):
+            os.symlink(
+                os.path.join(self.example_dags_folder, self.dag_name),
+                os.path.join(target_path))
 
     def _rm_symlink_dag(self):
-        os.remove(os.path.join(TESTS_DAG_FOLDER, self.dag_name))
+        try:
+            os.remove(os.path.join(self._get_dag_folder(), self.dag_name))
+        except OSError:
+            pass
 
     def _run_dag(self):
-        dag_bag = models.DagBag(dag_folder=TESTS_DAG_FOLDER, include_examples=False)
+        dag_bag = models.DagBag(dag_folder=self._get_dag_folder(),
+                                include_examples=False)
         self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
         dag = dag_bag.get_dag(self.dag_id)
         dag.clear(reset_dag_runs=True)
         dag.run(ignore_first_depends_on_past=True, verbose=True)
 
+    @staticmethod
+    def _get_variables_dir():
+        return None
+
     def setUp(self):
+        if not os.environ.get('AIRFLOW__CORE__UNIT_TEST_MODE'):
+            raise AirflowException("AIRFLOW__CORE__UNIT_TEST_MODE variable must be"
+                                   " set to  non-empty value "
+                                   " BEFORE you run the test so that Airflow "
+                                   " engine is setup properly and uses unittest.db. "
+                                   " Make sure it is set in the"
+                                   " scripts executing the test or in your test "
+                                   " configuration in IDE")
         configuration.conf.load_test_config()
-        self._gcp_authenticate()
+        self.gcp_store_authentication()
+        self.gcp_authenticate()
+        # We checked that authentication works - but then we revoke it to make
+        # sure we are not relying on the authentication
+        self.gcp_revoke_authentication()
         self._symlink_dag()
 
     def tearDown(self):
-        self._rm_symlink_dag()
+        self.gcp_restore_authentication()
+        if not os.environ.get('SKIP_UNLINKING_EXAMPLES'):
+            self._rm_symlink_dag()
 
     @staticmethod
-    def skip_check(key):
-        if GCP_FOLDER_ENVIRONMENT_VARIABLE not in os.environ:
-            return True
-        key_folder = os.environ[GCP_FOLDER_ENVIRONMENT_VARIABLE]
-        if not os.path.isdir(key_folder):
-            return True
-        key_path = os.path.join(key_folder, key)
-        if not os.path.isfile(key_path):
-            return True
-        return False
+    def skip_check(key_name):
+        key_path = BaseGcpIntegrationTestCase._get_key_path(key_name)
+        return key_path is None
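
The store/revoke/restore sequence introduced above keeps the default gcloud credentials
out of the way so the tests exercise only the credentials stored in Airflow's Connection
table. A minimal sketch of that flow using the same gcloud invocations as the hunk
(the project id is illustrative):

    import subprocess

    GCP_PROJECT_ID = 'example-project'  # illustrative

    def store_account():
        # Remember the currently active gcloud account, then switch to 'none'
        # so nothing can silently fall back to the default credentials.
        account = subprocess.check_output(
            ['gcloud', 'config', 'get-value', 'account',
             '--project={}'.format(GCP_PROJECT_ID)]).decode('utf-8').strip()
        subprocess.call(['gcloud', 'config', 'set', 'account', 'none',
                         '--project={}'.format(GCP_PROJECT_ID)])
        return account

    def restore_account(account):
        # Put the original account back once the test run is finished.
        if account:
            subprocess.call(['gcloud', 'config', 'set', 'account', account,
                             '--project={}'.format(GCP_PROJECT_ID)])
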
diff --git a/tests/contrib/operators/test_gcp_compute_operator.py b/tests/contrib/operators/test_gcp_compute_operator.py
index 4a4e336b7c..12d7d66c65 100644
--- a/tests/contrib/operators/test_gcp_compute_operator.py
+++ b/tests/contrib/operators/test_gcp_compute_operator.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import ast
+import os
 import unittest
 from copy import deepcopy
 
@@ -29,6 +30,8 @@
     GceInstanceGroupManagerUpdateTemplateOperator
 from airflow.models import TaskInstance, DAG
 from airflow.utils import timezone
+from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
+    GCP_COMPUTE_KEY, SKIP_TEST_WARNING
 
 try:
     # noinspection PyProtectedMember
@@ -41,12 +44,12 @@
 
 EMPTY_CONTENT = ''.encode('utf8')
 
-PROJECT_ID = 'project-id'
-ZONE = 'zone'
+GCP_PROJECT_ID = 'project-id'
+GCE_ZONE = 'zone'
 RESOURCE_ID = 'resource-id'
-SHORT_MACHINE_TYPE_NAME = 'n1-machine-type'
+GCE_SHORT_MACHINE_TYPE_NAME = 'n1-machine-type'
 SET_MACHINE_TYPE_BODY = {
-    'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME)
+    'machineType': 'zones/{}/machineTypes/{}'.format(GCE_ZONE, GCE_SHORT_MACHINE_TYPE_NAME)
 }
 
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
@@ -57,8 +60,8 @@ class GceInstanceStartTest(unittest.TestCase):
     def test_instance_start(self, mock_hook):
         mock_hook.return_value.start_instance.return_value = True
         op = GceInstanceStartOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=RESOURCE_ID,
             task_id='id'
         )
@@ -66,14 +69,14 @@ def test_instance_start(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.start_instance.assert_called_once_with(
-            PROJECT_ID, ZONE, RESOURCE_ID
+            GCP_PROJECT_ID, GCE_ZONE, RESOURCE_ID
         )
         self.assertTrue(result)
 
     # Setting all of the operator's input parameters as template dag_ids
     # (could be anything else) just to test if the templating works for all fields
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
-    def test_instance_start_with_templates(self, mock_hook):
+    def test_instance_start_with_templates(self, _):
         dag_id = 'test_dag_id'
         configuration.load_test_config()
         args = {
@@ -102,7 +105,7 @@ def test_start_should_throw_ex_when_missing_project_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStartOperator(
                 project_id="",
-                zone=ZONE,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 task_id='id'
             )
@@ -115,7 +118,7 @@ def test_start_should_throw_ex_when_missing_project_id(self, mock_hook):
     def test_start_should_throw_ex_when_missing_zone(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStartOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 zone="",
                 resource_id=RESOURCE_ID,
                 task_id='id'
@@ -129,8 +132,8 @@ def test_start_should_throw_ex_when_missing_zone(self, mock_hook):
     def test_start_should_throw_ex_when_missing_resource_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStartOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id="",
                 task_id='id'
             )
@@ -145,8 +148,8 @@ class GceInstanceStopTest(unittest.TestCase):
     def test_instance_stop(self, mock_hook):
         mock_hook.return_value.stop_instance.return_value = True
         op = GceInstanceStopOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=RESOURCE_ID,
             task_id='id'
         )
@@ -154,14 +157,14 @@ def test_instance_stop(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.stop_instance.assert_called_once_with(
-            PROJECT_ID, ZONE, RESOURCE_ID
+            GCP_PROJECT_ID, GCE_ZONE, RESOURCE_ID
         )
         self.assertTrue(result)
 
     # Setting all of the operator's input parameters as templated dag_ids
     # (could be anything else) just to test if the templating works for all fields
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
-    def test_instance_stop_with_templates(self, mock_hook):
+    def test_instance_stop_with_templates(self, _):
         dag_id = 'test_dag_id'
         configuration.load_test_config()
         args = {
@@ -190,7 +193,7 @@ def test_stop_should_throw_ex_when_missing_project_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStopOperator(
                 project_id="",
-                zone=ZONE,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 task_id='id'
             )
@@ -203,7 +206,7 @@ def test_stop_should_throw_ex_when_missing_project_id(self, mock_hook):
     def test_stop_should_throw_ex_when_missing_zone(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStopOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 zone="",
                 resource_id=RESOURCE_ID,
                 task_id='id'
@@ -217,8 +220,8 @@ def test_stop_should_throw_ex_when_missing_zone(self, mock_hook):
     def test_stop_should_throw_ex_when_missing_resource_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStopOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id="",
                 task_id='id'
             )
@@ -233,8 +236,8 @@ class GceInstanceSetMachineTypeTest(unittest.TestCase):
     def test_set_machine_type(self, mock_hook):
         mock_hook.return_value.set_machine_type.return_value = True
         op = GceSetMachineTypeOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=RESOURCE_ID,
             body=SET_MACHINE_TYPE_BODY,
             task_id='id'
@@ -243,14 +246,14 @@ def test_set_machine_type(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.set_machine_type.assert_called_once_with(
-            PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY
+            GCP_PROJECT_ID, GCE_ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY
         )
         self.assertTrue(result)
 
     # Setting all of the operator's input parameters as templated dag_ids
     # (could be anything else) just to test if the templating works for all fields
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
-    def test_set_machine_type_with_templates(self, mock_hook):
+    def test_set_machine_type_with_templates(self, _):
         dag_id = 'test_dag_id'
         configuration.load_test_config()
         args = {
@@ -280,7 +283,7 @@ def test_set_machine_type_should_throw_ex_when_missing_project_id(self, mock_hoo
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
                 project_id="",
-                zone=ZONE,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
@@ -294,7 +297,7 @@ def test_set_machine_type_should_throw_ex_when_missing_project_id(self, mock_hoo
     def test_set_machine_type_should_throw_ex_when_missing_zone(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 zone="",
                 resource_id=RESOURCE_ID,
                 body=SET_MACHINE_TYPE_BODY,
@@ -309,8 +312,8 @@ def test_set_machine_type_should_throw_ex_when_missing_zone(self, mock_hook):
     def test_set_machine_type_should_throw_ex_when_missing_resource_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id="",
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
@@ -324,8 +327,8 @@ def test_set_machine_type_should_throw_ex_when_missing_resource_id(self, mock_ho
     def test_set_machine_type_should_throw_ex_when_missing_machine_type(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 body={},
                 task_id='id'
@@ -367,11 +370,12 @@ def test_set_machine_type_should_handle_and_trim_gce_error(
             self, get_conn, _execute_set_machine_type, _check_zone_operation_status):
         get_conn.return_value = {}
         _execute_set_machine_type.return_value = {"name": "test-operation"}
-        _check_zone_operation_status.return_value = ast.literal_eval(self.MOCK_OP_RESPONSE)
+        _check_zone_operation_status.return_value = ast.literal_eval(
+            self.MOCK_OP_RESPONSE)
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id=RESOURCE_ID,
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
@@ -379,9 +383,9 @@ def test_set_machine_type_should_handle_and_trim_gce_error(
             op.execute(None)
         err = cm.exception
         _check_zone_operation_status.assert_called_once_with(
-            {}, "test-operation", PROJECT_ID, ZONE)
+            {}, "test-operation", GCP_PROJECT_ID, GCE_ZONE)
         _execute_set_machine_type.assert_called_once_with(
-            PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY)
+            GCP_PROJECT_ID, GCE_ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY)
         # Checking the full message was sometimes failing due to different order
         # of keys in the serialized JSON
         self.assertIn("400 BAD REQUEST: {", str(err))  # checking the square bracket trim
@@ -490,7 +494,7 @@ def test_successful_copy_template(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
@@ -499,7 +503,7 @@ def test_successful_copy_template(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=GCE_INSTANCE_TEMPLATE_BODY_INSERT,
             request_id=None
         )
@@ -511,7 +515,7 @@ def test_idempotent_copy_template_when_already_copied(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
@@ -530,7 +534,7 @@ def test_successful_copy_template_with_request_id(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
             task_id='id',
@@ -540,7 +544,7 @@ def test_successful_copy_template_with_request_id(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=GCE_INSTANCE_TEMPLATE_BODY_INSERT,
             request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
         )
@@ -554,7 +558,7 @@ def test_successful_copy_template_with_description_fields(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
             task_id='id',
@@ -568,7 +572,7 @@ def test_successful_copy_template_with_description_fields(self, mock_hook):
         body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
         body_insert["description"] = "New description"
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
         )
@@ -582,7 +586,7 @@ def test_copy_with_some_validation_warnings(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
@@ -598,7 +602,7 @@ def test_copy_with_some_validation_warnings(self, mock_hook):
         body_insert["some_wrong_field"] = "test"
         body_insert["properties"]["some_other_wrong_field"] = "test"
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=None,
         )
@@ -612,7 +616,7 @@ def test_successful_copy_template_with_updated_nested_fields(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={
@@ -628,7 +632,7 @@ def test_successful_copy_template_with_updated_nested_fields(self, mock_hook):
         body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
         body_insert["properties"]["machineType"] = "n1-standard-2"
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=None
         )
@@ -642,7 +646,7 @@ def test_successful_copy_template_with_smaller_array_fields(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={
@@ -681,7 +685,7 @@ def test_successful_copy_template_with_smaller_array_fields(self, mock_hook):
             }
         ]
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=None
         )
@@ -695,7 +699,7 @@ def test_successful_copy_template_with_bigger_array_fields(self, mock_hook):
             GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
         ]
         op = GceInstanceTemplateCopyOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             resource_id=GCE_INSTANCE_TEMPLATE_NAME,
             task_id='id',
             body_patch={
@@ -742,7 +746,7 @@ def test_successful_copy_template_with_bigger_array_fields(self, mock_hook):
             }
         ]
         mock_hook.return_value.insert_instance_template.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=body_insert,
             request_id=None,
         )
@@ -757,7 +761,7 @@ def test_missing_name(self, mock_hook):
         ]
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceTemplateCopyOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 resource_id=GCE_INSTANCE_TEMPLATE_NAME,
                 request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
                 task_id='id',
@@ -876,8 +880,8 @@ def test_successful_instance_group_update(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
@@ -887,8 +891,8 @@ def test_successful_instance_group_update(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='beta',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH,
             request_id=None
@@ -902,8 +906,8 @@ def test_successful_instance_group_update_no_instance_template_field(self, mock_
         mock_hook.return_value.get_instance_group_manager.return_value = \
             instance_group_manager_no_template
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
@@ -916,8 +920,8 @@ def test_successful_instance_group_update_no_instance_template_field(self, mock_
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
         del expected_patch_no_instance_template['instanceTemplate']
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=expected_patch_no_instance_template,
             request_id=None
@@ -931,8 +935,8 @@ def test_successful_instance_group_update_no_versions_field(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             instance_group_manager_no_versions
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
@@ -945,8 +949,8 @@ def test_successful_instance_group_update_no_versions_field(self, mock_hook):
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
         del expected_patch_no_versions['versions']
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=expected_patch_no_versions,
             request_id=None
@@ -958,8 +962,8 @@ def test_successful_instance_group_update_with_update_policy(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             update_policy=GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY,
@@ -974,8 +978,8 @@ def test_successful_instance_group_update_with_update_policy(self, mock_hook):
         expected_patch_with_update_policy['updatePolicy'] = \
             GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=expected_patch_with_update_policy,
             request_id=None
@@ -987,8 +991,8 @@ def test_successful_instance_group_update_with_request_id(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
@@ -999,8 +1003,8 @@ def test_successful_instance_group_update_with_request_id(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='beta',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH,
             request_id=GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID
@@ -1011,8 +1015,8 @@ def test_successful_instance_group_update_with_request_id(self, mock_hook):
     def test_try_to_use_api_v1(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             GceInstanceGroupManagerUpdateTemplateOperator(
-                project_id=PROJECT_ID,
-                zone=ZONE,
+                project_id=GCP_PROJECT_ID,
+                zone=GCE_ZONE,
                 resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
                 task_id='id',
                 api_version='v1',
@@ -1027,8 +1031,8 @@ def test_try_to_use_non_existing_template(self, mock_hook):
         mock_hook.return_value.get_instance_group_manager.return_value = \
             deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
         op = GceInstanceGroupManagerUpdateTemplateOperator(
-            project_id=PROJECT_ID,
-            zone=ZONE,
+            project_id=GCP_PROJECT_ID,
+            zone=GCE_ZONE,
             resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
             task_id='id',
             source_template=GCE_INSTANCE_TEMPLATE_NON_EXISTING_URL,
@@ -1039,3 +1043,123 @@ def test_try_to_use_non_existing_template(self, mock_hook):
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.patch_instance_group_manager.assert_not_called()
         self.assertTrue(result)
+
+
+ITEST_INSTANCE = os.environ.get('GCE_INSTANCE', 'testinstance')
+ITEST_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+ITEST_INSTANCE_GROUP_MANAGER_NAME = os.environ.get('GCE_INSTANCE_GROUP_MANAGER_NAME',
+                                                   'instance-group-test')
+ITEST_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
+ITEST_TEMPLATE_NAME = os.environ.get('GCE_TEMPLATE_NAME',
+                                     'instance-template-test')
+ITEST_NEW_TEMPLATE_NAME = os.environ.get('GCE_NEW_TEMPLATE_NAME',
+                                         'instance-template-test-new')
+
+
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_COMPUTE_KEY), SKIP_TEST_WARNING)
+class GcpComputeExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+
+    @staticmethod
+    def delete_instance():
+        BaseGcpIntegrationTestCase.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instances', 'delete', ITEST_INSTANCE, '--zone', ITEST_ZONE,
+        ])
+
+    @staticmethod
+    def create_instance():
+        BaseGcpIntegrationTestCase.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID, '--quiet',
+            'instances', 'create', ITEST_INSTANCE,
+            '--zone', ITEST_ZONE
+        ])
+
+    def setUp(self):
+        super(GcpComputeExampleDagsIntegrationTest, self).setUp()
+        self.gcp_authenticate()
+        self.delete_instance()
+        self.create_instance()
+        self.gcp_revoke_authentication()
+
+    def tearDown(self):
+        self.gcp_authenticate()
+        self.delete_instance()
+        self.gcp_revoke_authentication()
+        super(GcpComputeExampleDagsIntegrationTest, self).tearDown()
+
+    def __init__(self, method_name='runTest'):
+        super(GcpComputeExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_compute',
+            gcp_key=GCP_COMPUTE_KEY)
+
+    def test_run_example_dag_compute(self):
+        self._run_dag()
+
+
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_COMPUTE_KEY), SKIP_TEST_WARNING)
+class GcpComputeIgmExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+
+    @staticmethod
+    def delete_instance_group_and_template(silent=False):
+        BaseGcpIntegrationTestCase.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instance-groups', 'managed', 'delete', ITEST_INSTANCE_GROUP_MANAGER_NAME,
+            '--zone', ITEST_ZONE
+        ], silent=silent)
+        BaseGcpIntegrationTestCase.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instance-templates', 'delete', ITEST_NEW_TEMPLATE_NAME
+        ], silent=silent)
+        BaseGcpIntegrationTestCase.execute_cmd([
+            'gcloud', 'beta', 'compute',
+            '--project', ITEST_PROJECT_ID,
+            '--quiet', '--verbosity=none',
+            'instance-templates', 'delete', ITEST_TEMPLATE_NAME
+        ], silent=silent)
+
+    @staticmethod
+    def create_instance_group_and_template():
+        BaseGcpIntegrationTestCase.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID, '--quiet',
+            'instance-templates', 'create', ITEST_TEMPLATE_NAME
+        ])
+        BaseGcpIntegrationTestCase.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID, '--quiet',
+            'instance-groups', 'managed', 'create', ITEST_INSTANCE_GROUP_MANAGER_NAME,
+            '--template', ITEST_TEMPLATE_NAME,
+            '--zone', ITEST_ZONE, '--size=1'
+        ])
+        BaseGcpIntegrationTestCase.execute_cmd([
+            'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID, '--quiet',
+            'instance-groups', 'managed', 'wait-until-stable',
+            ITEST_INSTANCE_GROUP_MANAGER_NAME,
+            '--zone', ITEST_ZONE
+        ])
+
+    def setUp(self):
+        super(GcpComputeIgmExampleDagsIntegrationTest, self).setUp()
+        self.gcp_authenticate()
+        self.delete_instance_group_and_template(silent=True)
+        self.create_instance_group_and_template()
+        self.gcp_revoke_authentication()
+
+    def tearDown(self):
+        self.gcp_authenticate()
+        self.delete_instance_group_and_template()
+        self.gcp_revoke_authentication()
+        super(GcpComputeIgmExampleDagsIntegrationTest, self).tearDown()
+
+    def __init__(self, method_name='runTest'):
+        super(GcpComputeIgmExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_compute_igm',
+            gcp_key=GCP_COMPUTE_KEY)
+
+    def test_run_example_dag_compute_igm(self):
+        self._run_dag()
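
For orientation, a minimal usage sketch (not part of this diff) of the start/stop
operators exercised earlier in this file, using the renamed constants; the project,
zone and instance values below are placeholders:

    import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcp_compute_operator import (
        GceInstanceStartOperator, GceInstanceStopOperator)

    GCP_PROJECT_ID = 'example-project'   # placeholder project
    GCE_ZONE = 'europe-west1-b'          # placeholder zone
    GCE_INSTANCE = 'testinstance'        # placeholder instance name

    with DAG('example_gce_start_stop_sketch',
             start_date=datetime.datetime(2018, 1, 1),
             schedule_interval=None) as dag:
        # Start the instance, then stop it again, mirroring the operator
        # arguments asserted in the unit tests above.
        start = GceInstanceStartOperator(
            project_id=GCP_PROJECT_ID,
            zone=GCE_ZONE,
            resource_id=GCE_INSTANCE,
            task_id='gce_instance_start')
        stop = GceInstanceStopOperator(
            project_id=GCP_PROJECT_ID,
            zone=GCE_ZONE,
            resource_id=GCE_INSTANCE,
            task_id='gce_instance_stop')
        start >> stop
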
diff --git a/tests/contrib/operators/test_gcp_container_operator.py b/tests/contrib/operators/test_gcp_container_operator.py
index 1685e9c6ac..600c7cd8a8 100644
--- a/tests/contrib/operators/test_gcp_container_operator.py
+++ b/tests/contrib/operators/test_gcp_container_operator.py
@@ -33,7 +33,7 @@
     except ImportError:
         mock = None
 
-PROJECT_ID = 'test-id'
+TEST_GCP_PROJECT_ID = 'test-id'
 PROJECT_LOCATION = 'test-location'
 PROJECT_TASK_ID = 'test-task-id'
 CLUSTER_NAME = 'test-cluster-name'
@@ -55,7 +55,7 @@ class GoogleCloudPlatformContainerOperatorTest(unittest.TestCase):
 
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_create_execute(self, mock_hook):
-        operator = GKEClusterCreateOperator(project_id=PROJECT_ID,
+        operator = GKEClusterCreateOperator(project_id=TEST_GCP_PROJECT_ID,
                                             location=PROJECT_LOCATION,
                                             body=PROJECT_BODY_CREATE,
                                             task_id=PROJECT_TASK_ID)
@@ -67,7 +67,7 @@ def test_create_execute(self, mock_hook):
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_create_execute_error_body(self, mock_hook):
         with self.assertRaises(AirflowException):
-            operator = GKEClusterCreateOperator(project_id=PROJECT_ID,
+            operator = GKEClusterCreateOperator(project_id=TEST_GCP_PROJECT_ID,
                                                 location=PROJECT_LOCATION,
                                                 body=None,
                                                 task_id=PROJECT_TASK_ID)
@@ -88,7 +88,7 @@ def test_create_execute_error_project_id(self, mock_hook):
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_create_execute_error_location(self, mock_hook):
         with self.assertRaises(AirflowException):
-            operator = GKEClusterCreateOperator(project_id=PROJECT_ID,
+            operator = GKEClusterCreateOperator(project_id=TEST_GCP_PROJECT_ID,
                                                 body=PROJECT_BODY,
                                                 task_id=PROJECT_TASK_ID)
 
@@ -97,7 +97,7 @@ def test_create_execute_error_location(self, mock_hook):
 
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_delete_execute(self, mock_hook):
-        operator = GKEClusterDeleteOperator(project_id=PROJECT_ID,
+        operator = GKEClusterDeleteOperator(project_id=TEST_GCP_PROJECT_ID,
                                             name=CLUSTER_NAME,
                                             location=PROJECT_LOCATION,
                                             task_id=PROJECT_TASK_ID)
@@ -118,7 +118,7 @@ def test_delete_execute_error_project_id(self, mock_hook):
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_delete_execute_error_cluster_name(self, mock_hook):
         with self.assertRaises(AirflowException):
-            operator = GKEClusterDeleteOperator(project_id=PROJECT_ID,
+            operator = GKEClusterDeleteOperator(project_id=TEST_GCP_PROJECT_ID,
                                                 location=PROJECT_LOCATION,
                                                 task_id=PROJECT_TASK_ID)
 
@@ -128,7 +128,7 @@ def test_delete_execute_error_cluster_name(self, mock_hook):
     @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEClusterHook')
     def test_delete_execute_error_location(self, mock_hook):
         with self.assertRaises(AirflowException):
-            operator = GKEClusterDeleteOperator(project_id=PROJECT_ID,
+            operator = GKEClusterDeleteOperator(project_id=TEST_GCP_PROJECT_ID,
                                                 name=CLUSTER_NAME,
                                                 task_id=PROJECT_TASK_ID)
 
@@ -138,7 +138,7 @@ def test_delete_execute_error_location(self, mock_hook):
 
 class GKEPodOperatorTest(unittest.TestCase):
     def setUp(self):
-        self.gke_op = GKEPodOperator(project_id=PROJECT_ID,
+        self.gke_op = GKEPodOperator(project_id=TEST_GCP_PROJECT_ID,
                                      location=PROJECT_LOCATION,
                                      cluster_name=CLUSTER_NAME,
                                      task_id=PROJECT_TASK_ID,
@@ -167,7 +167,7 @@ def test_execute_conn_id_none(self, proc_mock, file_mock, exec_mock):
 
         # Assert the gcloud command being called correctly
         proc_mock.assert_called_with(
-            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, PROJECT_ID).split())
+            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, TEST_GCP_PROJECT_ID).split())
 
         self.assertEqual(self.gke_op.config_file, FILE_NAME)
 
@@ -197,7 +197,7 @@ def test_execute_conn_id_path(self, proc_mock, file_mock, exec_mock, get_con_moc
 
         # Assert the gcloud command being called correctly
         proc_mock.assert_called_with(
-            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, PROJECT_ID).split())
+            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, TEST_GCP_PROJECT_ID).split())
 
         self.assertEqual(self.gke_op.config_file, FILE_NAME)
 
@@ -232,7 +232,7 @@ def test_execute_conn_id_dict(self, proc_mock, file_mock, exec_mock, get_con_moc
 
         # Assert the gcloud command being called correctly
         proc_mock.assert_called_with(
-            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, PROJECT_ID).split())
+            GCLOUD_COMMAND.format(CLUSTER_NAME, PROJECT_LOCATION, TEST_GCP_PROJECT_ID).split())
 
         self.assertEqual(self.gke_op.config_file, FILE_NAME)
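
As a usage sketch of the GKE operators covered above (again not part of the diff):
the cluster body below is a minimal illustrative dict, not the PROJECT_BODY_CREATE
fixture the tests use, and all names are placeholders:

    import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcp_container_operator import (
        GKEClusterCreateOperator, GKEClusterDeleteOperator)

    TEST_GCP_PROJECT_ID = 'test-id'      # placeholder project
    PROJECT_LOCATION = 'europe-west1-b'  # placeholder location
    CLUSTER_NAME = 'test-cluster-name'   # placeholder cluster name

    with DAG('example_gke_create_delete_sketch',
             start_date=datetime.datetime(2018, 1, 1),
             schedule_interval=None) as dag:
        create = GKEClusterCreateOperator(
            project_id=TEST_GCP_PROJECT_ID,
            location=PROJECT_LOCATION,
            # Assumed minimal cluster definition; the real tests pass a
            # richer body fixture.
            body={'name': CLUSTER_NAME, 'initial_node_count': 1},
            task_id='gke_cluster_create')
        delete = GKEClusterDeleteOperator(
            project_id=TEST_GCP_PROJECT_ID,
            location=PROJECT_LOCATION,
            name=CLUSTER_NAME,
            task_id='gke_cluster_delete')
        create >> delete
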
 
diff --git a/tests/contrib/operators/test_gcp_function_operator.py b/tests/contrib/operators/test_gcp_function_operator.py
index 46d599bf7d..48a9c47f08 100644
--- a/tests/contrib/operators/test_gcp_function_operator.py
+++ b/tests/contrib/operators/test_gcp_function_operator.py
@@ -29,6 +29,9 @@
 
 from copy import deepcopy
 
+from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
+    GCP_FUNCTION_KEY, SKIP_TEST_WARNING
+
 try:
     # noinspection PyProtectedMember
     from unittest import mock
@@ -41,20 +44,21 @@
 EMPTY_CONTENT = ''.encode('utf8')
 MOCK_RESP_404 = type('', (object,), {"status": 404})()
 
-PROJECT_ID = 'test_project_id'
-LOCATION = 'test_region'
-SOURCE_ARCHIVE_URL = 'gs://folder/file.zip'
-ENTRYPOINT = 'helloWorld'
-FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
-                                                               ENTRYPOINT)
-RUNTIME = 'nodejs6'
+GCP_PROJECT_ID = 'test_project_id'
+GCP_LOCATION = 'test_region'
+GCF_SOURCE_ARCHIVE_URL = 'gs://folder/file.zip'
+GCF_ENTRYPOINT = 'helloWorld'
+FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
+                                                               GCP_LOCATION,
+                                                               GCF_ENTRYPOINT)
+GCF_RUNTIME = 'nodejs6'
 VALID_RUNTIMES = ['nodejs6', 'nodejs8', 'python37']
 VALID_BODY = {
     "name": FUNCTION_NAME,
-    "entryPoint": ENTRYPOINT,
-    "runtime": RUNTIME,
+    "entryPoint": GCF_ENTRYPOINT,
+    "runtime": GCF_RUNTIME,
     "httpsTrigger": {},
-    "sourceArchiveUrl": SOURCE_ARCHIVE_URL
+    "sourceArchiveUrl": GCF_SOURCE_ARCHIVE_URL
 }
 
 
@@ -100,8 +104,8 @@ def test_deploy_execute(self, mock_hook):
             side_effect=HttpError(resp=MOCK_RESP_404, content=b'not found'))
         mock_hook.return_value.create_new_function.return_value = True
         op = GcfFunctionDeployOperator(
-            project_id=PROJECT_ID,
-            location=LOCATION,
+            project_id=GCP_PROJECT_ID,
+            location=GCP_LOCATION,
             body=deepcopy(VALID_BODY),
             task_id="id"
         )
@@ -125,8 +129,8 @@ def test_update_function_if_exists(self, mock_hook):
         mock_hook.return_value.get_function.return_value = True
         mock_hook.return_value.update_function.return_value = True
         op = GcfFunctionDeployOperator(
-            project_id=PROJECT_ID,
-            location=LOCATION,
+            project_id=GCP_PROJECT_ID,
+            location=GCP_LOCATION,
             body=deepcopy(VALID_BODY),
             task_id="id"
         )
@@ -639,3 +643,29 @@ def test_non_404_gcf_error_bubbled_up(self, mock_hook):
         mock_hook.return_value.delete_function.assert_called_once_with(
             'projects/project_name/locations/project_location/functions/function_name'
         )
+
+
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_FUNCTION_KEY), SKIP_TEST_WARNING)
+class CloudFunctionsDeleteExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudFunctionsDeleteExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_function_delete',
+            gcp_key=GCP_FUNCTION_KEY)
+
+    def test_run_example_dag_delete_query(self):
+        self._run_dag()
+
+
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_FUNCTION_KEY), SKIP_TEST_WARNING)
+class CloudFunctionsDeployDeleteExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudFunctionsDeployDeleteExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_function_deploy_delete',
+            gcp_key=GCP_FUNCTION_KEY)
+
+    def test_run_example_dag_deploy_delete_query(self):
+        self._run_dag()
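
A corresponding sketch for the Cloud Functions deploy operator tested above; the
body mirrors the structure of the VALID_BODY fixture, with placeholder values:

    import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcp_function_operator import \
        GcfFunctionDeployOperator

    GCP_PROJECT_ID = 'test_project_id'   # placeholder project
    GCP_LOCATION = 'test_region'         # placeholder region
    FUNCTION_NAME = 'projects/{}/locations/{}/functions/helloWorld'.format(
        GCP_PROJECT_ID, GCP_LOCATION)

    # Same shape as VALID_BODY in the tests: name, entry point, runtime,
    # trigger and source archive.
    body = {
        'name': FUNCTION_NAME,
        'entryPoint': 'helloWorld',
        'runtime': 'nodejs6',
        'httpsTrigger': {},
        'sourceArchiveUrl': 'gs://folder/file.zip',
    }

    with DAG('example_gcf_deploy_sketch',
             start_date=datetime.datetime(2018, 1, 1),
             schedule_interval=None) as dag:
        deploy = GcfFunctionDeployOperator(
            project_id=GCP_PROJECT_ID,
            location=GCP_LOCATION,
            body=body,
            task_id='gcf_function_deploy')
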
diff --git a/tests/contrib/operators/test_gcp_sql_operator.py b/tests/contrib/operators/test_gcp_sql_operator.py
index 9f631493e0..a8395b8dc0 100644
--- a/tests/contrib/operators/test_gcp_sql_operator.py
+++ b/tests/contrib/operators/test_gcp_sql_operator.py
@@ -20,20 +20,18 @@
 import os
 import unittest
 
+
 import time
 from parameterized import parameterized
 from uuid import uuid1
 
 from airflow import AirflowException
-from airflow.contrib.hooks.gcp_sql_hook import CloudSqlProxyRunner
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
     CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
     CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
     CloudSqlInstanceExportOperator, CloudSqlInstanceImportOperator, \
     CloudSqlInstanceDatabaseDeleteOperator, CloudSqlQueryOperator
 from airflow.models import Connection
-from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
-    GCP_CLOUDSQL_KEY, SKIP_TEST_WARNING
 
 try:
     # noinspection PyProtectedMember
@@ -44,7 +42,7 @@
     except ImportError:
         mock = None
 
-PROJECT_ID = os.environ.get('PROJECT_ID', 'project-id')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'project-id')
 INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-name')
 DB_NAME = os.environ.get('DB_NAME', 'db1')
 
@@ -127,7 +125,7 @@
     "name": DB_NAME,            # The name of the database in the Cloud SQL instance.
                                 # This does not include the project ID or instance name.
 
-    "project": PROJECT_ID,      # The project ID of the project containing the Cloud SQL
+    "project": GCP_PROJECT_ID,  # The project ID of the project containing the Cloud SQL
                                 # database. The Google apps domain is prefixed if
                                 # applicable.
 
@@ -178,7 +176,7 @@ def test_instance_create(self, mock_hook, _check_if_instance_exists):
         _check_if_instance_exists.return_value = False
         mock_hook.return_value.create_instance.return_value = True
         op = CloudSqlInstanceCreateOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             body=CREATE_BODY,
             task_id="id"
@@ -189,7 +187,7 @@ def test_instance_create(self, mock_hook, _check_if_instance_exists):
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.create_instance.assert_called_once_with(
-            PROJECT_ID, CREATE_BODY
+            GCP_PROJECT_ID, CREATE_BODY
         )
         self.assertIsNone(result)
 
@@ -200,7 +198,7 @@ def test_instance_create_idempotent(self, mock_hook, _check_if_instance_exists):
         _check_if_instance_exists.return_value = True
         mock_hook.return_value.create_instance.return_value = True
         op = CloudSqlInstanceCreateOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             body=CREATE_BODY,
             task_id="id"
@@ -231,7 +229,7 @@ def test_create_should_throw_ex_when_empty_project_id(self, mock_hook):
     def test_create_should_throw_ex_when_empty_body(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = CloudSqlInstanceCreateOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 body={},
                 instance=INSTANCE_NAME,
                 task_id="id"
@@ -245,7 +243,7 @@ def test_create_should_throw_ex_when_empty_body(self, mock_hook):
     def test_create_should_throw_ex_when_empty_instance(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = CloudSqlInstanceCreateOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 body=CREATE_BODY,
                 instance="",
                 task_id="id"
@@ -269,7 +267,7 @@ def test_create_should_validate_list_type(self, mock_hook):
         }
         with self.assertRaises(AirflowException) as cm:
             op = CloudSqlInstanceCreateOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 body=wrong_list_type_body,
                 instance=INSTANCE_NAME,
                 task_id="id"
@@ -292,7 +290,7 @@ def test_create_should_validate_non_empty_fields(self, mock_hook):
         }
         with self.assertRaises(AirflowException) as cm:
             op = CloudSqlInstanceCreateOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 body=empty_tier_body,
                 instance=INSTANCE_NAME,
                 task_id="id"
@@ -308,7 +306,7 @@ def test_create_should_validate_non_empty_fields(self, mock_hook):
     def test_instance_patch(self, mock_hook):
         mock_hook.return_value.patch_instance.return_value = True
         op = CloudSqlInstancePatchOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             body=PATCH_BODY,
             instance=INSTANCE_NAME,
             task_id="id"
@@ -317,7 +315,7 @@ def test_instance_patch(self, mock_hook):
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.patch_instance.assert_called_once_with(
-            PROJECT_ID, PATCH_BODY, INSTANCE_NAME
+            GCP_PROJECT_ID, PATCH_BODY, INSTANCE_NAME
         )
         self.assertTrue(result)
 
@@ -329,7 +327,7 @@ def test_instance_patch_should_bubble_up_ex_if_not_exists(self, mock_hook,
         _check_if_instance_exists.return_value = False
         with self.assertRaises(AirflowException) as cm:
             op = CloudSqlInstancePatchOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 body=PATCH_BODY,
                 instance=INSTANCE_NAME,
                 task_id="id"
@@ -347,7 +345,7 @@ def test_instance_patch_should_bubble_up_ex_if_not_exists(self, mock_hook,
     def test_instance_delete(self, mock_hook, _check_if_instance_exists):
         _check_if_instance_exists.return_value = True
         op = CloudSqlInstanceDeleteOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             task_id="id"
         )
@@ -356,7 +354,7 @@ def test_instance_delete(self, mock_hook, _check_if_instance_exists):
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.delete_instance.assert_called_once_with(
-            PROJECT_ID, INSTANCE_NAME
+            GCP_PROJECT_ID, INSTANCE_NAME
         )
 
     @mock.patch("airflow.contrib.operators.gcp_sql_operator"
@@ -368,7 +366,7 @@ def test_instance_delete_should_abort_and_succeed_if_not_exists(
             _check_if_instance_exists):
         _check_if_instance_exists.return_value = False
         op = CloudSqlInstanceDeleteOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             task_id="id"
         )
@@ -384,7 +382,7 @@ def test_instance_delete_should_abort_and_succeed_if_not_exists(
     def test_instance_db_create(self, mock_hook, _check_if_db_exists):
         _check_if_db_exists.return_value = False
         op = CloudSqlInstanceDatabaseCreateOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             body=DATABASE_INSERT_BODY,
             task_id="id"
@@ -393,7 +391,7 @@ def test_instance_db_create(self, mock_hook, _check_if_db_exists):
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.create_database.assert_called_once_with(
-            PROJECT_ID, INSTANCE_NAME, DATABASE_INSERT_BODY
+            GCP_PROJECT_ID, INSTANCE_NAME, DATABASE_INSERT_BODY
         )
         self.assertTrue(result)
 
@@ -404,7 +402,7 @@ def test_instance_db_create_should_abort_and_succeed_if_exists(
             self, mock_hook, _check_if_db_exists):
         _check_if_db_exists.return_value = True
         op = CloudSqlInstanceDatabaseCreateOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             body=DATABASE_INSERT_BODY,
             task_id="id"
@@ -421,7 +419,7 @@ def test_instance_db_create_should_abort_and_succeed_if_exists(
     def test_instance_db_patch(self, mock_hook, _check_if_db_exists):
         _check_if_db_exists.return_value = True
         op = CloudSqlInstanceDatabasePatchOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             database=DB_NAME,
             body=DATABASE_PATCH_BODY,
@@ -431,7 +429,7 @@ def test_instance_db_patch(self, mock_hook, _check_if_db_exists):
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.patch_database.assert_called_once_with(
-            PROJECT_ID, INSTANCE_NAME, DB_NAME, DATABASE_PATCH_BODY
+            GCP_PROJECT_ID, INSTANCE_NAME, DB_NAME, DATABASE_PATCH_BODY
         )
         self.assertTrue(result)
 
@@ -443,7 +441,7 @@ def test_instance_db_patch_should_throw_ex_if_not_exists(
         _check_if_db_exists.return_value = False
         with self.assertRaises(AirflowException) as cm:
             op = CloudSqlInstanceDatabasePatchOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 instance=INSTANCE_NAME,
                 database=DB_NAME,
                 body=DATABASE_PATCH_BODY,
@@ -461,7 +459,7 @@ def test_instance_db_patch_should_throw_ex_if_not_exists(
     def test_instance_db_patch_should_throw_ex_when_empty_database(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = CloudSqlInstanceDatabasePatchOperator(
-                project_id=PROJECT_ID,
+                project_id=GCP_PROJECT_ID,
                 instance=INSTANCE_NAME,
                 database="",
                 body=DATABASE_INSERT_BODY,
@@ -479,7 +477,7 @@ def test_instance_db_patch_should_throw_ex_when_empty_database(self, mock_hook):
     def test_instance_db_delete(self, mock_hook, _check_if_db_exists):
         _check_if_db_exists.return_value = True
         op = CloudSqlInstanceDatabaseDeleteOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             database=DB_NAME,
             task_id="id"
@@ -489,7 +487,7 @@ def test_instance_db_delete(self, mock_hook, _check_if_db_exists):
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.delete_database.assert_called_once_with(
-            PROJECT_ID, INSTANCE_NAME, DB_NAME
+            GCP_PROJECT_ID, INSTANCE_NAME, DB_NAME
         )
 
     @mock.patch("airflow.contrib.operators.gcp_sql_operator"
@@ -499,7 +497,7 @@ def test_instance_db_delete_should_abort_and_succeed_if_not_exists(
             self, mock_hook, _check_if_db_exists):
         _check_if_db_exists.return_value = False
         op = CloudSqlInstanceDatabaseDeleteOperator(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             instance=INSTANCE_NAME,
             database=DB_NAME,
             task_id="id"
@@ -580,7 +578,8 @@ def test_create_operator_with_wrong_parameters(self,
                                                    get_connections):
         connection = Connection()
         connection.parse_from_uri(
-            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type={database_type}&"
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?"
+            "database_type={database_type}&"
             "project_id={project_id}&location={location}&instance={instance_name}&"
             "use_proxy={use_proxy}&use_ssl={use_ssl}".
             format(database_type=database_type,
@@ -796,89 +795,3 @@ def test_create_operator_with_correct_parameters_mysql_tcp(self, get_connections
         self.assertEqual('127.0.0.1', conn.host)
         self.assertNotEqual(3200, conn.port)
         self.assertEqual('testdb', conn.schema)
-
-
-@unittest.skipIf(
-    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
-class CloudSqlProxyIntegrationTest(BaseGcpIntegrationTestCase):
-    def __init__(self, method_name='runTest'):
-        super(CloudSqlProxyIntegrationTest, self).__init__(
-            method_name,
-            dag_id='example_gcp_sql_query',
-            gcp_key='gcp_cloudsql.json')
-
-    def test_start_proxy_fail_no_parameters(self):
-        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
-                                     project_id=PROJECT_ID,
-                                     instance_specification='a')
-        with self.assertRaises(AirflowException) as cm:
-            runner.start_proxy()
-        err = cm.exception
-        self.assertIn("invalid instance name", str(err))
-        with self.assertRaises(AirflowException) as cm:
-            runner.start_proxy()
-        err = cm.exception
-        self.assertIn("invalid instance name", str(err))
-        self.assertIsNone(runner.sql_proxy_process)
-
-    def test_start_proxy_with_all_instances(self):
-        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
-                                     project_id=PROJECT_ID,
-                                     instance_specification='')
-        try:
-            runner.start_proxy()
-            time.sleep(1)
-        finally:
-            runner.stop_proxy()
-        self.assertIsNone(runner.sql_proxy_process)
-
-    def test_start_proxy_with_all_instances_generated_credential_file(self):
-        self.update_connection_with_dictionary()
-        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
-                                     project_id=PROJECT_ID,
-                                     instance_specification='')
-        try:
-            runner.start_proxy()
-            time.sleep(1)
-        finally:
-            runner.stop_proxy()
-        self.assertIsNone(runner.sql_proxy_process)
-
-    def test_start_proxy_with_all_instances_specific_version(self):
-        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
-                                     project_id=PROJECT_ID,
-                                     instance_specification='',
-                                     sql_proxy_version='v1.13')
-        try:
-            runner.start_proxy()
-            time.sleep(1)
-        finally:
-            runner.stop_proxy()
-        self.assertIsNone(runner.sql_proxy_process)
-        self.assertEqual(runner.get_proxy_version(), "1.13")
-
-
-@unittest.skipIf(
-    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
-class CloudSqlExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
-    def __init__(self, method_name='runTest'):
-        super(CloudSqlExampleDagsIntegrationTest, self).__init__(
-            method_name,
-            dag_id='example_gcp_sql',
-            gcp_key=GCP_CLOUDSQL_KEY)
-
-    def test_run_example_dag_cloudsql_query(self):
-        self._run_dag()
-
-
-@unittest.skipIf(
-    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
-class CloudSqlQueryExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
-    def __init__(self, method_name='runTest'):
-        super(CloudSqlQueryExampleDagsIntegrationTest, self).__init__(
-            method_name,
-            dag_id='example_gcp_sql_query',
-            gcp_key=GCP_CLOUDSQL_KEY)
-
-    def test_run_example_dag_cloudsql_query(self):
-        self._run_dag()
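
For reference, a minimal sketch of the Cloud SQL instance operators exercised in
this file; the create body is a stripped-down illustration (name plus a non-empty
tier under settings, the field the validation tests check), not the CREATE_BODY
fixture:

    import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcp_sql_operator import (
        CloudSqlInstanceCreateOperator, CloudSqlInstanceDeleteOperator)

    GCP_PROJECT_ID = 'project-id'  # placeholder project
    INSTANCE_NAME = 'test-name'    # placeholder instance name

    create_body = {
        'name': INSTANCE_NAME,
        # Assumed minimal settings; tier must be non-empty, as the
        # validation tests above assert.
        'settings': {'tier': 'db-n1-standard-1'},
    }

    with DAG('example_cloudsql_instance_sketch',
             start_date=datetime.datetime(2018, 1, 1),
             schedule_interval=None) as dag:
        create = CloudSqlInstanceCreateOperator(
            project_id=GCP_PROJECT_ID,
            instance=INSTANCE_NAME,
            body=create_body,
            task_id='cloudsql_instance_create')
        delete = CloudSqlInstanceDeleteOperator(
            project_id=GCP_PROJECT_ID,
            instance=INSTANCE_NAME,
            task_id='cloudsql_instance_delete')
        create >> delete
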
diff --git a/tests/contrib/operators/test_gcp_sql_query_operator.py b/tests/contrib/operators/test_gcp_sql_query_operator.py
new file mode 100644
index 0000000000..833cb03bd4
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_sql_query_operator.py
@@ -0,0 +1,168 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import unittest
+
+import time
+
+from uuid import uuid1
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_sql_hook import CloudSqlProxyRunner
+from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
+    GCP_CLOUDSQL_KEY
+from tests.contrib.operators.test_gcp_sql_query_operator_helper import \
+    CloudSqlQueryTestHelper
+
+try:
+    # noinspection PyProtectedMember
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'project-id')
+INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-name')
+DB_NAME = os.environ.get('DB_NAME', 'db1')
+
+
+SKIP_CLOUDSQL_QUERY_WARNING = """
+    This test is intentionally skipped from automated runs,
+    as creating databases in Google Cloud SQL takes a very
+    long time. You can still set the GCP_ENABLE_CLOUDSQL_QUERY_TEST
+    environment variable to 'True' and run the test manually
+    after you create the database. The database can be created
+    by running this Python file as a program with the
+    --action=create flag (remember to delete the database
+    afterwards with the --action=delete flag).
+"""
+GCP_ENABLE_CLOUDSQL_QUERY_TEST = os.environ.get('GCP_ENABLE_CLOUDSQL_QUERY_TEST')
+
+if GCP_ENABLE_CLOUDSQL_QUERY_TEST == 'True':
+    skip_cloudsql_query_test = False
+else:
+    skip_cloudsql_query_test = True
+
+
+@unittest.skipIf(skip_cloudsql_query_test, SKIP_CLOUDSQL_QUERY_WARNING)
+class CloudSqlProxyIntegrationTest(BaseGcpIntegrationTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSqlProxyIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_sql_query',
+            gcp_key='gcp_cloudsql.json')
+
+    def test_start_proxy_fail_no_parameters(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
+                                     project_id=GCP_PROJECT_ID,
+                                     instance_specification='a')
+        with self.assertRaises(AirflowException) as cm:
+            runner.start_proxy()
+        err = cm.exception
+        self.assertIn("invalid instance name", str(err))
+        with self.assertRaises(AirflowException) as cm:
+            runner.start_proxy()
+        err = cm.exception
+        self.assertIn("invalid instance name", str(err))
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
+                                     project_id=GCP_PROJECT_ID,
+                                     instance_specification='')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances_generated_credential_file(self):
+        self.update_connection_with_dictionary()
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
+                                     project_id=GCP_PROJECT_ID,
+                                     instance_specification='')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances_specific_version(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
+                                     project_id=GCP_PROJECT_ID,
+                                     instance_specification='',
+                                     sql_proxy_version='v1.13')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+        self.assertEqual(runner.get_proxy_version(), "1.13")
+
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
+
+ITEST_POSTGRES_INSTANCE_NAME = os.environ.get('GCSQL_POSTGRES_INSTANCE_NAME',
+                                              'testpostgres')
+ITEST_MYSQL_INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME',
+                                           'testmysql')
+GCSQL_POSTGRES_SERVER_CA_FILE = os.environ.get('GCSQL_POSTGRES_SERVER_CA_FILE',
+                                               ".key/postgres-server-ca.pem")
+GCSQL_POSTGRES_CLIENT_CERT_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_CERT_FILE',
+                                                 ".key/postgres-client-cert.pem")
+GCSQL_POSTGRES_CLIENT_KEY_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_KEY_FILE',
+                                                ".key/postgres-client-key.pem")
+GCSQL_POSTGRES_PUBLIC_IP_FILE = os.environ.get('GCSQL_POSTGRES_PUBLIC_IP_FILE',
+                                               ".key/postgres-ip.env")
+GCSQL_POSTGRES_USER = os.environ.get('GCSQL_POSTGRES_USER', 'postgres_user')
+GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME',
+                                              'postgresdb')
+GCSQL_MYSQL_CLIENT_CERT_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_CERT_FILE',
+                                              ".key/mysql-client-cert.pem")
+GCSQL_MYSQL_CLIENT_KEY_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_KEY_FILE',
+                                             ".key/mysql-client-key.pem")
+GCSQL_MYSQL_SERVER_CA_FILE = os.environ.get('GCSQL_MYSQL_SERVER_CA_FILE',
+                                            ".key/mysql-server-ca.pem")
+GCSQL_MYSQL_PUBLIC_IP_FILE = os.environ.get('GCSQL_MYSQL_PUBLIC_IP_FILE',
+                                            ".key/mysql-ip.env")
+GCSQL_MYSQL_USER = os.environ.get('GCSQL_MYSQL_USER', 'mysql_user')
+GCSQL_MYSQL_DATABASE_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'mysqldb')
+DB_VERSION_MYSQL = 'MYSQL_5_7'
+DB_VERSION_POSTGRES = 'POSTGRES_9_6'
+
+
+@unittest.skipIf(skip_cloudsql_query_test, SKIP_CLOUDSQL_QUERY_WARNING)
+class CloudSqlQueryExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+
+    def __init__(self, method_name='runTest'):
+        self.helper = CloudSqlQueryTestHelper()
+        self.helper.set_ip_addresses_in_env()
+        super(CloudSqlQueryExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_sql_query',
+            gcp_key=GCP_CLOUDSQL_KEY)
+
+    def test_run_example_dag_cloudsql_query(self):
+        self._run_dag()
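
Note for running the suites above manually: the skip decision is taken when the
module is imported (skip_cloudsql_query_test is computed from the environment at
import time), so GCP_ENABLE_CLOUDSQL_QUERY_TEST must be set to the exact string
'True' before the test module is loaded. A minimal sketch of one way to do that,
assuming the tests/contrib/operators layout from this PR and a GCP project that
already has the databases provisioned (see the helper file below):

    import os
    import unittest

    # Must happen before the test modules are imported, because the
    # @unittest.skipIf(skip_cloudsql_query_test, ...) condition is evaluated
    # while the module is being loaded.
    os.environ['GCP_ENABLE_CLOUDSQL_QUERY_TEST'] = 'True'

    # Discover and run the contrib operator tests; only the Cloud SQL query
    # suites stop being skipped, everything else behaves as before.
    suite = unittest.defaultTestLoader.discover('tests/contrib/operators')
    unittest.TextTestRunner(verbosity=2).run(suite)
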
diff --git a/tests/contrib/operators/test_gcp_sql_query_operator_helper.py b/tests/contrib/operators/test_gcp_sql_query_operator_helper.py
new file mode 100644
index 0000000000..4eff471712
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_sql_query_operator_helper.py
@@ -0,0 +1,423 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import argparse
+import errno
+import logging
+import os
+import time
+from os.path import expanduser
+from threading import Thread
+
+from airflow import LoggingMixin
+from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase
+
+try:
+    # noinspection PyProtectedMember
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'project-id')
+INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-name')
+DB_NAME = os.environ.get('DB_NAME', 'db1')
+
+
+SKIP_CLOUDSQL_QUERY_WARNING = """
+    This test is skipped from automated runs intentionally,
+    as creating databases in Google Cloud SQL takes a very
+    long time. You can still set the GCP_ENABLE_CLOUDSQL_QUERY_TEST
+    environment variable to 'True'; then you should be able to
+    run it manually after you create the database.
+    The database can be created by running this Python file
+    as a program with the --action=create flag
+    (remember to delete the database afterwards with --action=delete).
+"""
+GCP_ENABLE_CLOUDSQL_QUERY_TEST = os.environ.get('GCP_ENABLE_CLOUDSQL_QUERY_TEST')
+
+if GCP_ENABLE_CLOUDSQL_QUERY_TEST == 'True':
+    skip_cloudsql_query_test = False
+else:
+    skip_cloudsql_query_test = True
+
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
+
+ITEST_POSTGRES_INSTANCE_NAME = os.environ.get('GCSQL_POSTGRES_INSTANCE_NAME',
+                                              'testpostgres')
+ITEST_MYSQL_INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME',
+                                           'testmysql')
+GCSQL_POSTGRES_SERVER_CA_FILE = os.environ.get('GCSQL_POSTGRES_SERVER_CA_FILE',
+                                               ".key/postgres-server-ca.pem")
+GCSQL_POSTGRES_CLIENT_CERT_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_CERT_FILE',
+                                                 ".key/postgres-client-cert.pem")
+GCSQL_POSTGRES_CLIENT_KEY_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_KEY_FILE',
+                                                ".key/postgres-client-key.pem")
+GCSQL_POSTGRES_PUBLIC_IP_FILE = os.environ.get('GCSQL_POSTGRES_PUBLIC_IP_FILE',
+                                               ".key/postgres-ip.env")
+GCSQL_POSTGRES_USER = os.environ.get('GCSQL_POSTGRES_USER', 'postgres_user')
+GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME',
+                                              'postgresdb')
+GCSQL_MYSQL_CLIENT_CERT_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_CERT_FILE',
+                                              ".key/mysql-client-cert.pem")
+GCSQL_MYSQL_CLIENT_KEY_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_KEY_FILE',
+                                             ".key/mysql-client-key.pem")
+GCSQL_MYSQL_SERVER_CA_FILE = os.environ.get('GCSQL_MYSQL_SERVER_CA_FILE',
+                                            ".key/mysql-server-ca.pem")
+GCSQL_MYSQL_PUBLIC_IP_FILE = os.environ.get('GCSQL_MYSQL_PUBLIC_IP_FILE',
+                                            ".key/mysql-ip.env")
+GCSQL_MYSQL_USER = os.environ.get('GCSQL_MYSQL_USER', 'mysql_user')
+GCSQL_MYSQL_DATABASE_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'mysqldb')
+DB_VERSION_MYSQL = 'MYSQL_5_7'
+DB_VERSION_POSTGRES = 'POSTGRES_9_6'
+
+HOME_DIR = expanduser("~")
+
+
+def get_absolute_path(path):
+    if path.startswith("/"):
+        return path
+    else:
+        return os.path.join(HOME_DIR, path)
+
+
+server_ca_file_postgres = get_absolute_path(GCSQL_POSTGRES_SERVER_CA_FILE)
+client_cert_file_postgres = get_absolute_path(GCSQL_POSTGRES_CLIENT_CERT_FILE)
+client_key_file_postgres = get_absolute_path(GCSQL_POSTGRES_CLIENT_KEY_FILE)
+
+server_ca_file_mysql = get_absolute_path(GCSQL_MYSQL_SERVER_CA_FILE)
+client_cert_file_mysql = get_absolute_path(GCSQL_MYSQL_CLIENT_CERT_FILE)
+client_key_file_mysql = get_absolute_path(GCSQL_MYSQL_CLIENT_KEY_FILE)
+
+
+class CloudSqlQueryTestHelper(LoggingMixin):
+    def create_instances(self):
+        thread_mysql = Thread(target=lambda: self.__create_instance(
+            ITEST_MYSQL_INSTANCE_NAME, DB_VERSION_MYSQL))
+        thread_postgres = Thread(target=lambda: self.__create_instance(
+            ITEST_POSTGRES_INSTANCE_NAME, DB_VERSION_POSTGRES))
+        thread_mysql.start()
+        thread_postgres.start()
+        thread_mysql.join()
+        thread_postgres.join()
+
+    def delete_instances(self):
+        thread_mysql = Thread(target=lambda: self.__delete_instance(
+            ITEST_MYSQL_INSTANCE_NAME))
+        thread_postgres = Thread(target=lambda: self.__delete_instance(
+            ITEST_POSTGRES_INSTANCE_NAME))
+        thread_mysql.start()
+        thread_postgres.start()
+        thread_mysql.join()
+        thread_postgres.join()
+
+    def get_ip_addresses(self):
+        with open(GCSQL_MYSQL_PUBLIC_IP_FILE, "w") as f:
+            f.write(self.__get_ip_address(
+                    ITEST_MYSQL_INSTANCE_NAME, 'GCSQL_MYSQL_PUBLIC_IP'))
+        with open(GCSQL_POSTGRES_PUBLIC_IP_FILE, "w") as f:
+            f.write(self.__get_ip_address(
+                    ITEST_POSTGRES_INSTANCE_NAME, 'GCSQL_POSTGRES_PUBLIC_IP'))
+
+    def authorize_address(self):
+        ip = self.__get_my_public_ip()
+        self.log.info('Authorizing access from IP: %s', ip)
+        postgres_thread = Thread(target=lambda: BaseGcpIntegrationTestCase.execute_cmd(
+            ['gcloud', 'sql', 'instances', 'patch',
+             ITEST_POSTGRES_INSTANCE_NAME, '--quiet',
+             "--authorized-networks={}".format(ip),
+             "--project={}".format(
+                 BaseGcpIntegrationTestCase.get_project_id())]))
+        mysql_thread = Thread(target=lambda: BaseGcpIntegrationTestCase.execute_cmd(
+            ['gcloud', 'sql', 'instances', 'patch',
+             ITEST_MYSQL_INSTANCE_NAME, '--quiet',
+             "--authorized-networks={}".format(ip),
+             "--project={}".format(
+                 BaseGcpIntegrationTestCase.get_project_id())]))
+        postgres_thread.start()
+        mysql_thread.start()
+        postgres_thread.join()
+        mysql_thread.join()
+
+    def setup_instances(self):
+        mysql_thread = Thread(target=lambda: self.__setup_instance_and_certs(
+            ITEST_MYSQL_INSTANCE_NAME, DB_VERSION_MYSQL, server_ca_file_mysql,
+            client_key_file_mysql, client_cert_file_mysql, GCSQL_MYSQL_DATABASE_NAME,
+            GCSQL_MYSQL_USER
+        ))
+        postgres_thread = Thread(target=lambda: self.__setup_instance_and_certs(
+            ITEST_POSTGRES_INSTANCE_NAME, DB_VERSION_POSTGRES, server_ca_file_postgres,
+            client_key_file_postgres, client_cert_file_postgres,
+            GCSQL_POSTGRES_DATABASE_NAME, GCSQL_POSTGRES_USER
+        ))
+        mysql_thread.start()
+        postgres_thread.start()
+        mysql_thread.join()
+        postgres_thread.join()
+        self.get_ip_addresses()
+        self.authorize_address()
+
+    def __create_instance(self, instance_name, db_version):
+        self.log.info('Creating a test %s instance "%s"...', db_version, instance_name)
+        try:
+            create_instance_opcode = self.__create_sql_instance(instance_name, db_version)
+            if create_instance_opcode:  # non-zero return code: the create command failed
+                operation_name = self.__get_operation_name(instance_name)
+                self.log.info('Waiting for operation: %s ...', operation_name)
+                self.__wait_for_create(operation_name)
+                self.log.info('... Done.')
+
+            self.log.info('... Done creating a test %s instance "%s"!\n',
+                          db_version, instance_name)
+        except Exception as ex:
+            self.log.error('Exception occurred. '
+                           'Aborting creating a test instance.\n\n%s', ex)
+            raise ex
+
+    @staticmethod
+    def set_ip_addresses_in_env():
+        CloudSqlQueryTestHelper.__set_ip_address_in_env(GCSQL_MYSQL_PUBLIC_IP_FILE)
+        CloudSqlQueryTestHelper.__set_ip_address_in_env(GCSQL_POSTGRES_PUBLIC_IP_FILE)
+
+    @staticmethod
+    def __set_ip_address_in_env(file_name):
+        with open(file_name, "r") as f:
+            env, ip = f.read().split("=")
+            os.environ[env] = ip
+
+    def __setup_instance_and_certs(self, instance_name, db_version, server_ca_file,
+                                   client_key_file, client_cert_file, db_name,
+                                   db_username):
+        self.log.info('Setting up a test %s instance "%s"...', db_version, instance_name)
+        try:
+            self.__remove_keys_and_certs([server_ca_file, client_key_file,
+                                          client_cert_file])
+
+            self.__wait_for_operations(instance_name)
+            self.__write_to_file(server_ca_file, self.__get_server_ca_cert(instance_name))
+            client_cert_name = 'client-cert'
+            self.__wait_for_operations(instance_name)
+            self.__delete_client_cert(instance_name, client_cert_name)
+            self.__wait_for_operations(instance_name)
+            self.__create_client_cert(instance_name, client_key_file, client_cert_name)
+            self.__wait_for_operations(instance_name)
+            self.__write_to_file(client_cert_file,
+                                 self.__get_client_cert(instance_name, client_cert_name))
+            self.__wait_for_operations(instance_name)
+            self.__create_user(instance_name, db_username)
+            self.__wait_for_operations(instance_name)
+            self.__delete_db(instance_name, db_name)
+            self.__create_db(instance_name, db_name)
+            self.log.info('... Done setting up a test %s instance "%s"!\n',
+                          db_version, instance_name)
+        except Exception as ex:
+            self.log.error('Exception occurred. '
+                           'Aborting setting up test instance and certs.\n\n%s', ex)
+            raise ex
+
+    def __delete_instance(self, instance_name):
+        # type: (str) -> None
+        self.log.info('Deleting Cloud SQL instance "%s"...', instance_name)
+        BaseGcpIntegrationTestCase.execute_cmd(['gcloud', 'sql', 'instances', 'delete',
+                                                instance_name, '--quiet'])
+        self.log.info('... Done.')
+
+    @staticmethod
+    def __get_my_public_ip():
+        return BaseGcpIntegrationTestCase.check_output(
+            ['curl', 'https://ipinfo.io/ip']).decode('utf-8').strip()
+
+    @staticmethod
+    def __create_sql_instance(instance_name, db_version):
+        # type: (str, str) -> int
+        return BaseGcpIntegrationTestCase \
+            .execute_cmd(['gcloud', 'sql', 'instances', 'create', instance_name,
+                          '--region',  GCP_LOCATION,
+                          '--project', GCP_PROJECT_ID,
+                          '--database-version', db_version,
+                          '--tier', 'db-f1-micro'])
+
+    def __get_server_ca_cert(self, instance_name):
+        # type: (str) -> bytes
+        self.log.info('Getting server CA cert for "%s"...', instance_name)
+        output = BaseGcpIntegrationTestCase.check_output(
+            ['gcloud', 'sql', 'instances', 'describe', instance_name,
+             '--format=value(serverCaCert.cert)'])
+        self.log.info('... Done.')
+        return output
+
+    def __get_client_cert(self, instance_name, client_cert_name):
+        # type: (str, str) -> bytes
+        self.log.info('Getting client cert for "%s"...', instance_name)
+        output = BaseGcpIntegrationTestCase.check_output(
+            ['gcloud', 'sql', 'ssl', 'client-certs', 'describe', client_cert_name, '-i',
+             instance_name, '--format=get(cert)'])
+        self.log.info('... Done.')
+        return output
+
+    def __create_user(self, instance_name, username):
+        # type: (str, str) -> None
+        self.log.info('Creating user "%s" in Cloud SQL instance "%s"...', username,
+                      instance_name)
+        BaseGcpIntegrationTestCase \
+            .execute_cmd(['gcloud', 'sql', 'users', 'create', username, '-i',
+                          instance_name, '--host', '%', '--password', 'JoxHlwrPzwch0gz9',
+                          '--quiet'])
+        self.log.info('... Done.')
+
+    def __delete_db(self, instance_name, db_name):
+        # type: (str, str) -> None
+        self.log.info('Deleting database "%s" in Cloud SQL instance "%s"...', db_name,
+                      instance_name)
+        BaseGcpIntegrationTestCase \
+            .execute_cmd(['gcloud', 'sql', 'databases', 'delete', db_name, '-i',
+                          instance_name, '--quiet'])
+        self.log.info('... Done.')
+
+    def __create_db(self, instance_name, db_name):
+        # type: (str, str) -> None
+        self.log.info('Creating database "%s" in Cloud SQL instance "%s"...', db_name,
+                      instance_name)
+        BaseGcpIntegrationTestCase \
+            .execute_cmd(['gcloud', 'sql', 'databases', 'create', db_name, '-i',
+                          instance_name, '--quiet'])
+        self.log.info('... Done.')
+
+    def __write_to_file(self, filepath, content):
+        # type: (str, bytes) -> None
+        # https://stackoverflow.com/a/12517490
+        self.log.info("Checking directory for: %s", filepath)
+        if not os.path.exists(os.path.dirname(filepath)):
+            self.log.info("Directory doesn't exist. Creating it...")
+            try:
+                os.makedirs(os.path.dirname(filepath))
+            except OSError as exc:  # Guard against race condition
+                self.log.info("Error while creating dir.")
+                if exc.errno != errno.EEXIST:
+                    raise
+            self.log.info("... Done. Dir created.")
+
+        with open(filepath, "w") as f:
+            f.write(content.decode('utf-8'))
+        self.log.info('Written file in: %s', filepath)
+
+    def __remove_keys_and_certs(self, filepaths):
+        if not filepaths:
+            return
+        self.log.info('Removing client keys and certs...')
+
+        for filepath in filepaths:
+            if os.path.exists(filepath):
+                os.remove(filepath)
+        self.log.info('... Done.')
+
+    def __delete_client_cert(self, instance_name, common_name):
+        self.log.info('Deleting client key and cert for "%s"...', instance_name)
+        BaseGcpIntegrationTestCase \
+            .execute_cmd(['gcloud', 'sql', 'ssl', 'client-certs', 'delete', common_name,
+                          '-i', instance_name, '--quiet'])
+        self.log.info('... Done.')
+
+    def __create_client_cert(self, instance_name, client_key_file, common_name):
+        self.log.info('Creating client key and cert for "%s"...', instance_name)
+        BaseGcpIntegrationTestCase \
+            .execute_cmd(['gcloud', 'sql', 'ssl', 'client-certs', 'create', common_name,
+                          client_key_file, '-i', instance_name])
+        self.log.info('... Done.')
+
+    @staticmethod
+    def __get_operation_name(instance_name):
+        # type: (str) -> str
+        op_name_bytes = BaseGcpIntegrationTestCase \
+            .check_output(['gcloud', 'sql', 'operations', 'list', '-i',
+                           instance_name, '--format=get(name)'])
+        return op_name_bytes.decode('utf-8').strip()
+
+    def __print_operations(self, operations):
+        self.log.info("\n==== OPERATIONS >>>>")
+        self.log.info(operations)
+        self.log.info("<<<< OPERATIONS ====\n")
+
+    def __wait_for_operations(self, instance_name):
+        # type: (str) -> None
+        while True:
+            operations = self.__get_operations(instance_name)
+            self.__print_operations(operations)
+            if "RUNNING" in operations:
+                self.log.info("Found a running operation. Sleeping 5s before retrying...")
+                time.sleep(5)
+            else:
+                break
+
+    @staticmethod
+    def __get_ip_address(instance_name, env_var):
+        # type: (str, str) -> str
+        ip = BaseGcpIntegrationTestCase \
+            .check_output(['gcloud', 'sql', 'instances', 'describe',
+                           instance_name,
+                           '--format=get(ipAddresses[0].ipAddress)']
+                          ).decode('utf-8').strip()
+        return "{}={}".format(env_var, ip)
+
+    @staticmethod
+    def __get_operations(instance_name):
+        # type: (str) -> str
+        op_name_bytes = BaseGcpIntegrationTestCase \
+            .check_output(['gcloud', 'sql', 'operations', 'list', '-i',
+                           instance_name, '--format=get(NAME,TYPE,STATUS)'])
+        return op_name_bytes.decode('utf-8').strip()
+
+    @staticmethod
+    def __wait_for_create(operation_name):
+        # type: (str) -> None
+        BaseGcpIntegrationTestCase \
+            .execute_cmd(['gcloud', 'beta', 'sql', 'operations', 'wait',
+                          '--project', GCP_PROJECT_ID, operation_name])
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Create, delete or set up Cloud SQL instances for system tests.')
+    parser.add_argument('--action', dest='action', required=True,
+                        choices=('create', 'delete', 'setup_instances'))
+    action = parser.parse_args().action
+
+    helper = CloudSqlQueryTestHelper()
+    logging.info('Starting action: {}'.format(action))
+
+    if action == 'create':
+        helper.create_instances()
+        helper.setup_instances()
+    elif action == 'delete':
+        helper.delete_instances()
+    elif action == 'setup_instances':
+        helper.setup_instances()
+    else:
+        raise Exception("Unknown action: {}".format(action))
+
+    logging.info('Finishing action: {}'.format(action))
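
The helper above is primarily meant to be run as a script via --action, but since
the instance and database management lives on CloudSqlQueryTestHelper it can also
be driven from Python. A minimal sketch, assuming gcloud is already authenticated
against GCP_PROJECT_ID, the default module variables, and write access to the
.key/ paths:

    from tests.contrib.operators.test_gcp_sql_query_operator_helper import (
        CloudSqlQueryTestHelper,
    )

    helper = CloudSqlQueryTestHelper()
    helper.create_instances()   # create the MySQL and Postgres test instances
    helper.setup_instances()    # certs, users, databases, IP files, authorized IPs
    # ... run the example_gcp_sql_query tests manually here ...
    helper.delete_instances()   # remove the instances again to avoid charges
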
diff --git a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
index 2bf51c0707..16db49fb5a 100644
--- a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
+++ b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
@@ -35,7 +35,7 @@
 TASK_ID = 'test-s3-gcs-transfer-operator'
 S3_BUCKET = 'test-s3-bucket'
 GCS_BUCKET = 'test-gcs-bucket'
-PROJECT_ID = 'test-project'
+GCP_PROJECT_ID = 'test-project'
 DESCRIPTION = 'test-description'
 ACCESS_KEY = 'test-access-key'
 SECRET_KEY = 'test-secret-key'
@@ -57,14 +57,14 @@ def test_constructor(self):
             task_id=TASK_ID,
             s3_bucket=S3_BUCKET,
             gcs_bucket=GCS_BUCKET,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
         )
 
         self.assertEqual(operator.task_id, TASK_ID)
         self.assertEqual(operator.s3_bucket, S3_BUCKET)
         self.assertEqual(operator.gcs_bucket, GCS_BUCKET)
-        self.assertEqual(operator.project_id, PROJECT_ID)
+        self.assertEqual(operator.project_id, GCP_PROJECT_ID)
         self.assertEqual(operator.description, DESCRIPTION)
 
     @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook')
@@ -76,7 +76,7 @@ def test_execute(self, mock_s3_hook, mock_transfer_hook):
             task_id=TASK_ID,
             s3_bucket=S3_BUCKET,
             gcs_bucket=GCS_BUCKET,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             schedule=SCHEDULE,
         )
@@ -89,7 +89,7 @@ def test_execute(self, mock_s3_hook, mock_transfer_hook):
         operator.execute(None)
 
         mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             schedule=SCHEDULE,
             transfer_spec={
@@ -121,7 +121,7 @@ def test_execute_skip_wait(self, mock_s3_hook, mock_transfer_hook):
             task_id=TASK_ID,
             s3_bucket=S3_BUCKET,
             gcs_bucket=GCS_BUCKET,
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             wait=False,
         )
@@ -134,7 +134,7 @@ def test_execute_skip_wait(self, mock_s3_hook, mock_transfer_hook):
         operator.execute(None)
 
         mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
-            project_id=PROJECT_ID,
+            project_id=GCP_PROJECT_ID,
             description=DESCRIPTION,
             schedule=None,
             transfer_spec={


 
