Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/09 09:04:37 UTC

[airflow] 24/36: Kubernetes Cluster is started on host not in the container (#8265)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8dacc27fb2d75493063e0a0f1e3ee4951c366d8d
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Wed Jun 3 20:58:38 2020 +0200

    Kubernetes Cluster is started on host not in the container (#8265)
    
    Tests requiring Kubernetes Cluster are now moved out of
    the regular CI tests and moved to "kubernetes_tests" folder
    so that they can be run entirely on host without having
    the CI image built at all. They use production image
    to run the tests on KinD cluster and we add tooling
    to start/stop/deploy the application to the KinD cluster
    automatically - for both CI testing and local development.
    
    This is a pre-requisite to convert the tests to use the
    official Helm Chart and Docker images of Apache Airflow.
    
    It closes #8782
    
    (cherry picked from commit ff5dcccbbd49e7a4632f93fa915565ac31730110)
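
    As an editorial illustration (not part of this commit): because the tests in
    ``kubernetes_tests`` now run directly on the host, a minimal smoke test only needs
    plain Python and network access to the webserver that the KinD deployment exposes.
    The port (30809) and the airflow/airflow credentials come from TESTING.rst below;
    the module name, the /health endpoint and the use of ``requests`` are assumptions.

        # kubernetes_tests/test_smoke.py -- hypothetical sketch, not part of this commit.
        # Runs on the host and talks to the Airflow webserver exposed by the KinD cluster.
        import unittest

        import requests


        class TestDeployedWebserver(unittest.TestCase):
            BASE_URL = "http://localhost:30809"  # worker webserver port, see TESTING.rst

            def test_webserver_is_reachable(self):
                # The /health endpoint is assumed; any page returning HTTP 200 would do.
                response = requests.get(self.BASE_URL + "/health", timeout=30)
                self.assertEqual(response.status_code, 200)


        if __name__ == "__main__":
            unittest.main()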
---
 .github/workflows/ci.yml                           |  37 +-
 .gitignore                                         |   4 +-
 BREEZE.rst                                         | 223 +++++++--
 Dockerfile.ci                                      |  17 +-
 IMAGES.rst                                         |   4 -
 README.md                                          |   2 +
 TESTING.rst                                        | 230 +++++++---
 airflow/kubernetes/refresh_config.py               |   1 -
 airflow/kubernetes/volume_mount.py                 |  36 +-
 breeze                                             | 211 +++++----
 breeze-complete                                    |  12 +-
 .../kubernetes => kubernetes_tests}/__init__.py    |   0
 .../test_kubernetes_executor.py                    | 101 +++--
 .../test_kubernetes_pod_operator.py                | 432 +++++++++++++-----
 scripts/ci/_utils.sh                               | 503 ++++++++++++++++++++-
 ...bootstrap.sh => ci_deploy_app_to_kubernetes.sh} |  25 +-
 .../run_ci_tests.sh => ci_load_image_to_kind.sh}   |  40 +-
 ...rap.sh => ci_perform_kind_cluster_operation.sh} |  15 +-
 scripts/ci/ci_run_airflow_testing.sh               |  33 +-
 scripts/ci/ci_run_kubernetes_tests.sh              | 105 +++++
 scripts/ci/docker-compose/base.yml                 |   1 -
 scripts/ci/docker-compose/local.yml                |   1 +
 scripts/ci/in_container/_in_container_utils.sh     |  28 --
 scripts/ci/in_container/entrypoint_ci.sh           |  73 +--
 .../ci/in_container/kubernetes/app/deploy_app.sh   | 210 ---------
 .../kubernetes/docker/rebuild_airflow_image.sh     |  74 ---
 .../in_container/kubernetes/setup_kind_cluster.sh  | 187 --------
 scripts/ci/in_container/run_ci_tests.sh            |   4 -
 .../kubernetes/app/postgres.yaml                   |   0
 .../{in_container => }/kubernetes/app/secrets.yaml |   0
 .../kubernetes/app/templates/airflow.template.yaml |  66 ++-
 .../app/templates/configmaps.template.yaml         |   0
 .../app/templates/init_git_sync.template.yaml      |  38 +-
 .../{in_container => }/kubernetes/app/volumes.yaml |   0
 .../docker/airflow-test-env-init-dags.sh}          |  21 +-
 .../docker/airflow-test-env-init-db.sh}            |  39 +-
 scripts/ci/kubernetes/docker/bootstrap.sh          |  74 +++
 .../kubernetes/kind-cluster-conf.yaml              |   0
 tests/conftest.py                                  |  37 --
 .../kubernetes/operators/test_kubernetes_pod.py    | 134 ++++++
 tests/runtime/__init__.py                          |  18 -
 41 files changed, 1909 insertions(+), 1127 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index afa911c..c48a7ac 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 ---
 name: CI Build
 on:
@@ -23,7 +22,9 @@ on:
     branches: ['master', 'v1-10-test', 'v1-10-stable']
   pull_request:
     branches: ['master', 'v1-10-test', 'v1-10-stable']
+
 env:
+
   MOUNT_LOCAL_SOURCES: "false"
   FORCE_ANSWER_TO_QUESTIONS: "yes"
   SKIP_CHECK_REMOTE_IMAGE: "true"
@@ -36,6 +37,7 @@ env:
   CACHE_IMAGE_PREFIX: ${{ github.repository }}
   CACHE_REGISTRY_USERNAME: ${{ github.actor }}
   CACHE_REGISTRY_PASSWORD: ${{ secrets.GITHUB_TOKEN }}
+
 jobs:
 
   static-checks:
@@ -51,9 +53,9 @@ jobs:
         with:
           python-version: '3.x'
       - name: Cache pre-commit env
-        uses: actions/cache@v1
+        uses: actions/cache@v2
         env:
-          cache-name: cache-pre-commit-epoch2
+          cache-name: cache-pre-commit-v2
         with:
           path: ~/.cache/pre-commit
           key: ${{ env.cache-name }}-${{ github.job }}-${{ hashFiles('.pre-commit-config.yaml') }}
@@ -98,15 +100,15 @@ jobs:
 
   tests-kubernetes:
     timeout-minutes: 80
-    name: "kubernetes-test-${{matrix.kube-mode}}-kube"
+    name: "K8s: ${{matrix.kube-mode}} ${{matrix.python-version}} ${{matrix.kubernetes-version}}"
     runs-on: ubuntu-latest
     needs: [static-checks]
     strategy:
       matrix:
+        python-version: [3.6, 3.7]
         kube-mode:
-          - persistent_mode
-#          #TODO These tests will be unblocked when k8s tests run on host
-#          - git_mode
+          - image
+          - git
         kubernetes-version:
           - "v1.15.3"
       fail-fast: false
@@ -115,12 +117,14 @@ jobs:
       TEST_TYPE: ${{ matrix.test-type }}
       RUN_TESTS: "true"
       CI_JOB_TYPE: "Tests"
-      PYTHON_MAJOR_MINOR_VERSION: "3.6"
       SKIP_CI_IMAGE_CHECK: "true"
       RUNTIME: "kubernetes"
       ENABLE_KIND_CLUSTER: "true"
+      PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}"
       KUBERNETES_MODE: "${{ matrix.kube-mode }}"
       KUBERNETES_VERSION: "${{ matrix.kubernetes-version }}"
+    # For pull requests only run tests when python files changed
+    if: needs.pyfiles.outputs.count != '0' || github.event_name != 'pull_request'
     steps:
       - uses: actions/checkout@master
       - uses: actions/setup-python@v1
@@ -128,12 +132,22 @@ jobs:
           python-version: '3.x'
       - name: "Free space"
         run: ./scripts/ci/ci_free_space_on_ci.sh
-      - name: "Build CI image ${{ matrix.python-version }}"
-        run: ./scripts/ci/ci_prepare_image_on_ci.sh
       - name: "Build PROD image ${{ matrix.python-version }}"
         run: ./scripts/ci/ci_build_production_images.sh
+      - name: "Setup KinD cluster"
+        run: ./scripts/ci/ci_perform_kind_cluster_operation.sh start
+      - name: "Deploy app to cluster"
+        run: ./scripts/ci/ci_deploy_app_to_kubernetes.sh
+      - name: Cache virtualenv for kubernetes testing
+        uses: actions/cache@v2
+        env:
+          cache-name: cache-kubernetes-tests-virtualenv-v2
+        with:
+          path: .build/.kubernetes_venv
+          key: "${{ env.cache-name }}-${{ github.job }}-\
+${{ hashFiles('requirements/requirements-python${{matrix.python-version}}.txt') }}"
       - name: "Tests"
-        run: ./scripts/ci/ci_run_airflow_testing.sh
+        run: ./scripts/ci/ci_run_kubernetes_tests.sh
 
   pyfiles:
     timeout-minutes: 10
@@ -149,7 +163,6 @@ jobs:
           ./scripts/ci/ci_count_changed_files.sh ${GITHUB_SHA} '\.py$|.github/workflows/|^Dockerfile'
           echo "::set-output name=count::$?"
         id: pyfiles
-
   tests-postgres:
     timeout-minutes: 80
     name: "${{matrix.test-type}}:Pg${{matrix.postgres-version}},Py${{matrix.python-version}}"
diff --git a/.gitignore b/.gitignore
index 42c4dbe..4eca815 100644
--- a/.gitignore
+++ b/.gitignore
@@ -154,8 +154,8 @@ rat-results.txt
 # Kubernetes generated templated files
 *.generated
 *.tar.gz
-scripts/ci/in_container/kubernetes/kube/.generated/airflow.yaml
-scripts/ci/in_container/kubernetes/docker/requirements.txt
+scripts/ci/kubernetes/kube/.generated/airflow.yaml
+scripts/ci/kubernetes/docker/requirements.txt
 
 # Node & Webpack Stuff
 *.entry.js
diff --git a/BREEZE.rst b/BREEZE.rst
index 0d90763..7648b9d 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -245,7 +245,7 @@ Manage environments - CI (default) or Production - if ``--production-image`` fla
     * Build docker images with ``breeze build-image`` command
     * Enter interactive shell when no command are specified (default behaviour)
     * Join running interactive shell with ``breeze exec`` command
-    * Start Kind Kubernetes cluster for Kubernetes tests if ``--start-kind-cluster`` flag is specified
+    * Start/stop/restart Kind Kubernetes cluster with ``kind-cluster`` command
     * Stop running interactive environment with ``breeze stop`` command
     * Restart running interactive environment with ``breeze restart`` command
     * Optionally reset database if specified as extra ``--db-reset`` flag
@@ -669,6 +669,7 @@ This is the current syntax for  `./breeze <./breeze>`_:
     generate-requirements                    Generates pinned requirements for pip dependencies
     push-image                               Pushes images to registry
     initialize-local-virtualenv              Initializes local virtualenv
+    kind-cluster                             Manages KinD cluster on the host
     setup-autocomplete                       Sets up autocomplete for breeze
     stop                                     Stops the docker-compose environment
     restart                                  Stops the docker-compose environment including DB cleanup
@@ -679,6 +680,7 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
     docker-compose                <ARG>      Executes specified docker-compose command
     execute-command               <ARG>      Executes specified command in the container
+    kind-cluster                  <ARG>      Manages KinD cluster on the host
     static-check                  <ARG>      Performs selected static check for changed files
     static-check-all-files        <ARG>      Performs selected static check for all files
     test-target                   <ARG>      Runs selected test target in the container
@@ -698,7 +700,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: shell
 
-  breeze [FLAGS] shell -- <EXTRA_ARGS>
+
+  breeze shell [FLAGS] -- <EXTRA_ARGS>
 
         This is default subcommand if no subcommand is used.
 
@@ -725,7 +728,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: build-docs
 
-  breeze [FLAGS] build-docs -- <EXTRA_ARGS>
+
+  breeze build-docs
 
         Builds Airflow documentation. The documentation is build inside docker container - to
         maintain the same build environment for everyone. Appropriate sources are mapped from
@@ -739,7 +743,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: build-image
 
-  breeze [FLAGS] build-image -- <EXTRA_ARGS>
+
+  breeze build-image [FLAGS]
 
         Builds docker image (CI or production) without entering the container. You can pass
         additional options to this command, such as '--force-build-image',
@@ -829,7 +834,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: cleanup-image
 
-  breeze [FLAGS] cleanup-image -- <EXTRA_ARGS>
+
+  breeze cleanup-image [FLAGS]
 
         Removes the breeze-related images created in your local docker image cache. This will
         not reclaim space in docker cache. You need to 'docker system prune' (optionally
@@ -857,7 +863,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: exec
 
-  breeze [FLAGS] exec -- <EXTRA_ARGS>
+
+  breeze exec
 
         Execs into interactive shell to an already running container. The container mus be started
         already by breeze shell command. If you are not familiar with tmux, this is the best
@@ -870,7 +877,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: generate-requirements
 
-  breeze [FLAGS] generate-requirements -- <EXTRA_ARGS>
+
+  breeze generate-requirements [FLAGS]
 
         Generates pinned requirements from setup.py. Those requirements are generated in requirements
         directory - separately for different python version. Those requirements are used to run
@@ -897,7 +905,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: push-image
 
-  breeze [FLAGS] push-image -- <EXTRA_ARGS>
+
+  breeze push-image [FLAGS]
 
         Pushes images to docker registry. You can push the images to DockerHub registry (default)
         or to the GitHub cache registry (if --registry-cache flag is used).
@@ -946,7 +955,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: initialize-local-virtualenv
 
-  breeze [FLAGS] initialize-local-virtualenv -- <EXTRA_ARGS>
+
+  breeze initialize-local-virtualenv [FLAGS]
 
         Initializes locally created virtualenv installing all dependencies of Airflow
         taking into account the frozen requirements from requirements folder.
@@ -966,9 +976,70 @@ This is the current syntax for  `./breeze <./breeze>`_:
   ####################################################################################################
 
 
+  Detailed usage for command: kind-cluster
+
+
+  breeze kind-cluster [FLAGS] OPERATION
+
+        Manages the host-side Kind Kubernetes cluster that is used to run Kubernetes integration tests.
+        It allows you to start/stop/restart the Kind Kubernetes cluster, check its status, and deploy Airflow to it.
+        This enables you to run tests inside the breeze environment with the latest airflow images loaded.
+        Note that when deploying airflow, the first step is to rebuild the image and load it
+        into the cluster, so you can also pass appropriate build image flags that will influence
+        rebuilding the production image. Operation is one of:
+
+                 start stop restart status deploy test
+
+  Flags:
+
+  -p, --python <PYTHON_MAJOR_MINOR_VERSION>
+          Python version used for the image. This is always major/minor version.
+          One of:
+
+                 2.7 3.5 3.6 3.7
+
+  -F, --force-build-images
+          Forces building of the local docker images. The images are rebuilt
+          automatically for the first time or when changes are detected in
+          package-related files, but you can force it using this flag.
+
+  -P, --force-pull-images
+          Forces pulling of images from DockerHub before building to populate cache. The
+          images are pulled by default only for the first time you run the
+          environment; later, the locally built images are used as cache.
+
+  -E, --extras
+          Extras to pass to the build images. The defaults are different for CI and production images:
+
+          CI image:
+                 devel_ci
+
+          Production image:
+                 async,aws,azure,celery,dask,elasticsearch,gcp,kubernetes,mysql,postgres,redis,slack,
+                 ssh,statsd,virtualenv
+
+  --additional-extras
+          Additional extras to pass to the build images. The default is no additional extras.
+
+  --additional-python-deps
+          Additional python dependencies to use when building the images.
+
+  -C, --force-clean-images
+          Forces building images with cache disabled. This will remove the pulled or built images
+          and start building images from scratch. This might take a long time.
+
+  -L, --use-local-cache
+          Uses local cache to build images. No pulled images will be used, but results of local
+          builds in the Docker cache are used instead.
+
+
+  ####################################################################################################
+
+
   Detailed usage for command: setup-autocomplete
 
-  breeze [FLAGS] setup-autocomplete -- <EXTRA_ARGS>
+
+  breeze setup-autocomplete
 
         Sets up autocomplete for breeze commands. Once you do it you need to re-enter the bash
         shell and when typing breeze command <TAB> will provide autocomplete for
@@ -980,7 +1051,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: stop
 
-  breeze [FLAGS] stop -- <EXTRA_ARGS>
+
+  breeze stop
 
         Brings down running docker compose environment. When you start the environment, the docker
         containers will continue running so that startup time is shorter. But they take quite a lot of
@@ -992,7 +1064,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: restart
 
-  breeze [FLAGS] restart -- <EXTRA_ARGS>
+
+  breeze restart [FLAGS]
 
         Restarts running docker compose environment. When you restart the environment, the docker
         containers will be restarted. That includes cleaning up the databases. This is
@@ -1008,7 +1081,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: toggle-suppress-cheatsheet
 
-  breeze [FLAGS] toggle-suppress-cheatsheet -- <EXTRA_ARGS>
+
+  breeze toggle-suppress-cheatsheet
 
         Toggles on/off cheatsheet displayed before starting bash shell.
 
@@ -1018,7 +1092,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: toggle-suppress-asciiart
 
-  breeze [FLAGS] toggle-suppress-asciiart -- <EXTRA_ARGS>
+
+  breeze toggle-suppress-asciiart
 
         Toggles on/off asciiart displayed before starting bash shell.
 
@@ -1028,7 +1103,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: docker-compose
 
-  breeze [FLAGS] docker-compose <DOCKER_COMPOSE_COMMAND> -- <EXTRA_ARGS>
+
+  breeze docker-compose [FLAGS] COMMAND -- <EXTRA_ARGS>
 
         Run docker-compose command instead of entering the environment. Use 'help' as command
         to see available commands. The <EXTRA_ARGS> passed after -- are treated
@@ -1073,7 +1149,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: execute-command
 
-  breeze [FLAGS] execute-command -- <EXTRA_ARGS>
+
+  breeze execute-command [FLAGS] COMMAND -- <EXTRA_ARGS>
 
         Run chosen command instead of entering the environment. The command is run using
         'bash -c "<command with args>" if you need to pass arguments to your command, you need
@@ -1118,9 +1195,70 @@ This is the current syntax for  `./breeze <./breeze>`_:
   ####################################################################################################
 
 
+  Detailed usage for command: kind-cluster
+
+
+  breeze kind-cluster [FLAGS] OPERATION
+
+        Manages the host-side Kind Kubernetes cluster that is used to run Kubernetes integration tests.
+        It allows you to start/stop/restart the Kind Kubernetes cluster, check its status, and deploy Airflow to it.
+        This enables you to run tests inside the breeze environment with the latest airflow images loaded.
+        Note that when deploying airflow, the first step is to rebuild the image and load it
+        into the cluster, so you can also pass appropriate build image flags that will influence
+        rebuilding the production image. Operation is one of:
+
+                 start stop restart status deploy test
+
+  Flags:
+
+  -p, --python <PYTHON_MAJOR_MINOR_VERSION>
+          Python version used for the image. This is always major/minor version.
+          One of:
+
+                 2.7 3.5 3.6 3.7
+
+  -F, --force-build-images
+          Forces building of the local docker images. The images are rebuilt
+          automatically for the first time or when changes are detected in
+          package-related files, but you can force it using this flag.
+
+  -P, --force-pull-images
+          Forces pulling of images from DockerHub before building to populate cache. The
+          images are pulled by default only for the first time you run the
+          environment; later, the locally built images are used as cache.
+
+  -E, --extras
+          Extras to pass to the build images. The defaults are different for CI and production images:
+
+          CI image:
+                 devel_ci
+
+          Production image:
+                 async,aws,azure,celery,dask,elasticsearch,gcp,kubernetes,mysql,postgres,redis,slack,
+                 ssh,statsd,virtualenv
+
+  --additional-extras
+          Additional extras to pass to the build images. The default is no additional extras.
+
+  --additional-python-deps
+          Additional python dependencies to use when building the images.
+
+  -C, --force-clean-images
+          Forces building images with cache disabled. This will remove the pulled or built images
+          and start building images from scratch. This might take a long time.
+
+  -L, --use-local-cache
+          Uses local cache to build images. No pulled images will be used, but results of local
+          builds in the Docker cache are used instead.
+
+
+  ####################################################################################################
+
+
   Detailed usage for command: static-check
 
-  breeze [FLAGS] static-check <STATIC_CHECK> -- <EXTRA_ARGS>
+
+  breeze static-check [FLAGS] STATIC_CHECK
 
         Run selected static checks for currently changed files. You should specify static check that
         you would like to run or 'all' to run all checks. One of:
@@ -1149,7 +1287,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: static-check-all-files
 
-  breeze [FLAGS] static-check-all-files <STATIC_CHECK> -- <EXTRA_ARGS>
+
+  breeze static-check-all-files [FLAGS] STATIC_CHECK
 
         Run selected static checks for all applicable files. You should specify static check that
         you would like to run or 'all' to run all checks. One of:
@@ -1178,7 +1317,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: test-target
 
-  breeze [FLAGS] test-target <TEST_TARGET> -- <EXTRA_ARGS>
+
+  breeze test-target [FLAGS] TEST_TARGET -- <EXTRA_ARGS>
 
         Run the specified unit test target. There might be multiple
         targets specified separated with comas. The <EXTRA_ARGS> passed after -- are treated
@@ -1196,7 +1336,6 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: flags
 
-  breeze [FLAGS] flags -- <EXTRA_ARGS>
 
         Explains in detail all the flags that can be used with breeze.
 
@@ -1206,9 +1345,10 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: help
 
-  breeze [FLAGS] help -- <EXTRA_ARGS>
 
-        Shows this help message.
+  breeze help
+
+        Shows general help message for all commands.
 
 
   ####################################################################################################
@@ -1216,7 +1356,8 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
   Detailed usage for command: help-all
 
-  breeze [FLAGS] help-all -- <EXTRA_ARGS>
+
+  breeze help-all
 
         Shows detailed help for all commands and flags.
 
@@ -1284,43 +1425,23 @@ This is the current syntax for  `./breeze <./breeze>`_:
                  cassandra kerberos mongo openldap rabbitmq redis
 
   ****************************************************************************************************
-   Manage Kind kubernetes cluster (optional)
+   Kind Kubernetes and Kubernetes tests configuration (optional)
 
-  Action for the cluster : only one of the --kind-cluster-* flags can be used at a time:
-
-  -s, --kind-cluster-start
-          Starts KinD Kubernetes cluster after entering the environment. The cluster is started using
-          Kubernetes Mode selected and Kubernetes version specified via --kubernetes-mode and
-          --kubernetes-version flags.
-
-  -x, --kind-cluster-stop
-          Stops KinD Kubernetes cluster if one has already been created. By default, if you do not
-          stop environment, the Kubernetes cluster created for testing is continuously running and
-          when you start Kubernetes testing again it will be reused. You can force deletion and
-          recreation of such cluster with this flag.
-
-  -r, --kind-cluster-recreate
-
-          Recreates KinD Kubernetes cluster if one has already been created. By default, if you do
-          not stop environment, the Kubernetes cluster created for testing is continuously running
-          and when you start Kubernetes testing again it will be reused. You can force deletion and
-          recreation of such cluster with this flag.
-
-  Kubernetes mode/version flags:
+  Configuration for the KinD Kubernetes cluster and tests:
 
   -K, --kubernetes-mode <KUBERNETES_MODE>
           Kubernetes mode - only used in case one of --kind-cluster-* commands is used.
           One of:
 
-                 persistent_mode git_mode
+                 image git
 
-          Default: git_mode
+          Default: image
 
   -V, --kubernetes-version <KUBERNETES_VERSION>
           Kubernetes version - only used in case one of --kind-cluster-* commands is used.
           One of:
 
-                 v1.15.3 v1.16.2
+                 v1.15.3
 
           Default: v1.15.3
 
@@ -1428,6 +1549,12 @@ This is the current syntax for  `./breeze <./breeze>`_:
           Note that you can further increase verbosity and see all the commands executed by breeze
           by running 'export VERBOSE_COMMANDS="true"' before running breeze.
 
+  ****************************************************************************************************
+   Print detailed help message
+
+  -h, --help
+          Shows detailed help message for the command specified.
+
  .. END BREEZE HELP MARKER
 
 Convenience Scripts
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 15038a5..1c93f27 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -154,20 +154,6 @@ RUN curl --fail --location https://download.docker.com/linux/debian/gpg | apt-ke
     && apt-get autoremove -yqq --purge \
     && apt-get clean && rm -rf /var/lib/apt/lists/*
 
-# Install kubectl
-ARG KUBECTL_VERSION="v1.15.3"
-
-RUN KUBECTL_URL="https://storage.googleapis.com/kubernetes-release/release/${KUBECTL_VERSION}/bin/linux/amd64/kubectl" \
-  && curl --fail --location  "${KUBECTL_URL}" --output "/usr/local/bin/kubectl" \
-  && chmod +x /usr/local/bin/kubectl
-
-# Install Kind
-ARG KIND_VERSION="v0.6.1"
-
-RUN KIND_URL="https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-linux-amd64" \
-   && curl --fail --location "${KIND_URL}" --output "/usr/local/bin/kind" \
-   && chmod +x /usr/local/bin/kind
-
 # Setup PIP
 # By default PIP install run without cache to make image smaller
 ARG PIP_NO_CACHE_DIR="true"
@@ -342,8 +328,7 @@ COPY Dockerfile.ci ${AIRFLOW_SOURCES}/Dockerfile.ci
 RUN register-python-argcomplete airflow >> ~/.bashrc
 
 # Install autocomplete for Kubeclt
-RUN echo "source /etc/bash_completion" >> ~/.bashrc \
-    && kubectl completion bash >> ~/.bashrc
+RUN echo "source /etc/bash_completion" >> ~/.bashrc
 
 WORKDIR ${AIRFLOW_SOURCES}
 
diff --git a/IMAGES.rst b/IMAGES.rst
index 3038bfe..37b8d03 100644
--- a/IMAGES.rst
+++ b/IMAGES.rst
@@ -160,10 +160,6 @@ The following build arguments (``--build-arg`` in docker build command) can be u
 | ``DEPENDENCIES_EPOCH_NUMBER``            | ``2``                                    | increasing this number will reinstall    |
 |                                          |                                          | all apt dependencies                     |
 +------------------------------------------+------------------------------------------+------------------------------------------+
-| ``KUBECTL_VERSION``                      | ``v1.15.3``                              | version of kubectl installed             |
-+------------------------------------------+------------------------------------------+------------------------------------------+
-| ``KIND_VERSION``                         | ``v0.6.1``                               | version of kind installed                |
-+------------------------------------------+------------------------------------------+------------------------------------------+
 | ``PIP_NO_CACHE_DIR``                     | ``true``                                 | if true, then no pip cache will be       |
 |                                          |                                          | stored                                   |
 +------------------------------------------+------------------------------------------+------------------------------------------+
diff --git a/README.md b/README.md
index a0dc473..ad8915b 100644
--- a/README.md
+++ b/README.md
@@ -65,6 +65,7 @@ Apache Airflow is tested with:
 * Postgres DB: 9.6, 10
 * MySQL DB: 5.7
 * Sqlite - latest stable (it is used mainly for development purpose)
+* Kubernetes - 1.16.2, 1.17.0
 
 ### Stable version (1.10.9)
 
@@ -72,6 +73,7 @@ Apache Airflow is tested with:
 * Postgres DB: 9.6, 10
 * MySQL DB: 5.6, 5.7
 * Sqlite - latest stable (it is used mainly for development purpose)
+* Kubernetes - 1.16.2, 1.17.0
 
 ## Getting started
 Please visit the Airflow Platform documentation (latest **stable** release) for help with [installing Airflow](https://airflow.apache.org/installation.html), getting a [quick start](https://airflow.apache.org/start.html), or a more complete [tutorial](https://airflow.apache.org/tutorial.html).
diff --git a/TESTING.rst b/TESTING.rst
index 3c53fdd..fc8a046 100644
--- a/TESTING.rst
+++ b/TESTING.rst
@@ -325,98 +325,224 @@ Those tests are skipped by default. You can enable them with ``--include-quarant
 can also decide to only run tests with ``-m quarantined`` flag to run only those tests.
 
 Running Tests with Kubernetes
------------------------------
+=============================
+
+Starting Kubernetes Cluster
+---------------------------
+
+For your testing, you manage the Kind cluster with the ``kind-cluster`` breeze command:
+
+.. code-block:: bash
+
+    ./breeze kind-cluster [ start | stop | recreate | status ]
+
 
-Starting Kubernetes Cluster when Starting Breeze
-................................................
+The command allows you to start/stop/recreate the Kind Kubernetes cluster in your docker engine,
+check its status, and deploy airflow to it for testing (with the ``deploy`` command).
 
-To run Kubernetes in Breeze, you can start Breeze with the ``--kind-cluster-start`` switch. This
-automatically creates a Kind Kubernetes cluster in the same ``docker`` engine that is used to run Breeze.
-Setting up the Kubernetes cluster takes some time so the cluster continues running
-until it is stopped with the ``--kind-cluster-stop`` switch or until the ``--kind-cluster-recreate``
-switch is used rather than ``--kind-cluster-start``. Starting Breeze with the Kind Cluster automatically
-sets ``runtime`` to ``kubernetes`` (see below).
+Setting up the Kind Kubernetes cluster takes some time, so once you start it, the cluster continues running
+until it is stopped with the ``kind-cluster stop`` command or until the ``kind-cluster recreate``
+command is used (which stops and recreates the cluster image).
 
-The cluster name follows the pattern ``airflow-python-X.Y.Z-vA.B.C`` where X.Y.Z is a Python version
+The cluster name follows the pattern ``airflow-python-X.Y-vA.B.C`` where X.Y is a Python version
 and A.B.C is a Kubernetes version. This way you can have multiple clusters set up and running at the same
 time for different Python versions and different Kubernetes versions.
 
-The Control Plane is available from inside the Docker image via ``<CLUSTER_NAME>-control-plane:6443``
-host:port, the worker of the Kind Cluster is available at  <CLUSTER_NAME>-worker
+The Control Plane is available from inside the Docker image via ``<KIND_CLUSTER_NAME>-control-plane:6443``
+host:port, the worker of the Kind Cluster is available at ``<KIND_CLUSTER_NAME>-worker``
 and the webserver port for the worker is 30809.
 
-After the Kubernetes Cluster is started, you need to deploy Airflow to the cluster:
+Deploying Airflow to Kubernetes Cluster
+---------------------------------------
 
-1. Build the image.
-2. Load it to the Kubernetes cluster.
-3. Deploy the Airflow application.
+Deploying Airflow to the created Kubernetes cluster is also done via the ``kind-cluster`` breeze command:
 
-It can be done with a single script: ``./scripts/ci/in_container/kubernetes/deploy_airflow_to_kubernetes.sh``.
+.. code-block:: bash
 
-You can, however, work separately on the image in Kubernetes and deploying the Airflow app in the cluster.
+    ./breeze kind-cluster deploy
 
-Building and Loading Airflow Images to Kubernetes Cluster
-..............................................................
 
-Use the script ``./scripts/ci/in_container/kubernetes/docker/rebuild_airflow_image.sh`` that does the following:
+The deploy command performs the following steps:
 
-1. Rebuilds the latest ``apache/airflow:master-pythonX.Y-ci`` images using the latest sources.
-2. Builds a new Kubernetes image based on the  ``apache/airflow:master-pythonX.Y-ci`` using
+1. If needed, it rebuilds the latest ``apache/airflow:master-pythonX.Y`` production images using the
+   latest sources. You can also force the build with the ``--force-build-images`` flag.
+2. Builds a new Kubernetes image based on the  ``apache/airflow:master-pythonX.Y`` using
    necessary scripts added to run in Kubernetes. The image is tagged as
-   ``apache/airflow:master-pythonX.Y-ci-kubernetes``.
+   ``apache/airflow:master-pythonX.Y-kubernetes``.
 3. Loads the image to the Kind Cluster using the ``kind load`` command.
-
-Deploying the Airflow Application in the Kubernetes Cluster
-...........................................................
-
-Use the script ``./scripts/ci/in_container/kubernetes/app/deploy_app.sh`` that does the following:
-
-1. Prepares Kubernetes resources by processing a template from the ``template`` directory and replacing
+4. Prepares Kubernetes resources by processing a template from the ``template`` directory and replacing
    variables with the right images and locations:
    - configmaps.yaml
    - airflow.yaml
-2. Uses the existing resources without replacing any variables inside:
+5. Uses the existing resources without replacing any variables inside:
    - secrets.yaml
    - postgres.yaml
    - volumes.yaml
-3. Applies all the resources to the Kind Cluster.
-4. Waits for all the applications to be ready and reachable.
+6. Applies all the resources to the Kind Cluster.
+7. Waits for all the applications to be ready and reachable.
 
-After the deployment is finished, you can run Kubernetes tests immediately in the same way as other tests.
-The Kubernetes tests are available in the ``tests/runtime/kubernetes`` folder.
 
-You can run all the integration tests for Kubernetes with ``pytest tests/runtime/kubernetes``.
+Running tests with Kubernetes Cluster
+-------------------------------------
 
+After the deployment is finished, you can run Kubernetes tests via ``scripts/ci/ci_run_kubernetes_tests.sh``.
 
-Running Runtime-Specific Tests
-------------------------------
+You can either run all tests or select which tests to run. You can also enter an interactive virtualenv
+to run the tests manually one by one.
 
-Tests using a specific runtime are marked with a custom pytest marker ``pytest.mark.runtime``.
-The marker has a single parameter - the name of a runtime. At the moment the only supported runtime is
-``kubernetes``. This runtime is set when you run Breeze with one of the ``--kind-cluster-*`` flags.
-Runtime-specific tests run only when the selectd runtime is started.
 
+.. code-block:: bash
 
-.. code-block:: python
+    Running kubernetes tests
+
+      ./scripts/ci/ci_run_kubernetes_tests.sh                      - runs all kubernetes tests
+      ./scripts/ci/ci_run_kubernetes_tests.sh TEST [TEST ...]      - runs selected kubernetes tests (from kubernetes_tests folder)
+      ./scripts/ci/ci_run_kubernetes_tests.sh [-i|--interactive]   - Activates virtual environment ready to run tests and drops you in
+      ./scripts/ci/ci_run_kubernetes_tests.sh [--help]             - Prints this help message
+
+
+Typical testing pattern for Kubernetes tests
+--------------------------------------------
+
+A typical session for tests with Kubernetes looks as follows:
+
+1. Start the Kind cluster:
+
+.. code-block:: bash
+
+    ./breeze kind-cluster start
+
+    Starts Kind Kubernetes cluster
+
+       Use CI image.
+
+       Branch name:             master
+       Docker image:            apache/airflow:master-python3.7-ci
+
+       Airflow source version:  2.0.0.dev0
+       Python version:          3.7
+       DockerHub user:          apache
+       DockerHub repo:          airflow
+       Backend:                 postgres 9.6
+
+    No kind clusters found.
+
+    Creating cluster
+
+    Creating cluster "airflow-python-3.7-v1.17.0" ...
+     ✓ Ensuring node image (kindest/node:v1.17.0) đŸ–ŧ
+     ✓ Preparing nodes đŸ“Ļ đŸ“Ļ
+     ✓ Writing configuration 📜
+     ✓ Starting control-plane 🕹ī¸
+     ✓ Installing CNI 🔌
+    Could not read storage manifest, falling back on old k8s.io/host-path default ...
+     ✓ Installing StorageClass 💾
+     ✓ Joining worker nodes 🚜
+    Set kubectl context to "kind-airflow-python-3.7-v1.17.0"
+    You can now use your cluster with:
+
+    kubectl cluster-info --context kind-airflow-python-3.7-v1.17.0
+
+    Have a question, bug, or feature request? Let us know! https://kind.sigs.k8s.io/#community 🙂
+
+    Created cluster airflow-python-3.7-v1.17.0
+
+
+2. Check the status of the cluster
+
+.. code-block:: bash
+
+    ./breeze kind-cluster status
+
+    Checks status of Kind Kubernetes cluster
 
-    @pytest.mark.runtime("kubernetes")
-    class TestKubernetesExecutor(unittest.TestCase):
+       Use CI image.
 
+       Branch name:             master
+       Docker image:            apache/airflow:master-python3.7-ci
 
-You can use the custom ``--runtime`` switch in pytest to only run tests specific for that backend.
+       Airflow source version:  2.0.0.dev0
+       Python version:          3.7
+       DockerHub user:          apache
+       DockerHub repo:          airflow
+       Backend:                 postgres 9.6
+
+    airflow-python-3.7-v1.17.0-control-plane
+    airflow-python-3.7-v1.17.0-worker
+
+3. Deploy Airflow to the cluster
+
+.. code-block:: bash
+
+    ./breeze kind-cluster deploy
+
+4. Run Kubernetes tests
+
+Note that the tests are executed in the production container, not in the CI container.
+There is no need for the tests to run inside the Airflow CI container image as they only
+communicate with the Kubernetes-run Airflow deployed via the production image.
+Those Kubernetes tests require a virtualenv to be created locally with airflow installed.
+The required virtualenv will be created automatically when the scripts are run.
+
+
+Either run all the tests:
+
+
+.. code-block:: bash
+
+    ./breeze kind-cluster test
+
+
+Or enter the interactive virtualenv (the environment is in the ``.build/.kubernetes_venv`` folder):
 
-To run only kubernetes-runtime backend tests, enter:
 
 .. code-block:: bash
 
-    pytest --runtime kubernetes
+     ./scripts/ci/ci_run_kubernetes_tests.sh -i
+
+
+Once you enter the environment, you get this information:
 
-**NOTE:** For convenience and faster search, all runtime tests are stored in the ``tests.runtime`` package. In this case, you
-can speed up the collection of tests by running:
 
 .. code-block:: bash
 
-    pytest --runtime kubernetes tests/runtime
+    Activating the virtual environment for kubernetes testing
+
+    You can run kubernetes testing via 'pytest kubernetes_tests/....'
+    You can add -s to see the output of your tests on screen
+
+    The webserver is available at http://localhost:30809/
+
+    User/password: airflow/airflow
+
+    You are entering the virtualenv now. Type exit to exit back to the original shell
+
+
+You can iterate on the tests while you are in the virtualenv:
+
+
+.. code-block:: bash
+
+    pytest kubernetes_tests/test_kubernetes_executor.py::TestKubernetesExecutor::test_integration_run_dag_with_scheduler_failure -s
+
+
+You can modify the tests or the KubernetesPodOperator and re-run them without re-deploying
+airflow to the KinD cluster.
+
+However, when you change the Airflow Kubernetes executor implementation, you need to redeploy
+Airflow to the cluster.
+
+.. code-block:: bash
+
+    ./breeze kind-cluster deploy
+
+
+5. Stop KinD cluster when you are done
+
+.. code-block:: bash
+
+    ./breeze kind-cluster stop
+
 
 Airflow System Tests
 ====================
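
Editorial note (not part of the commit): the TESTING.rst workflow added above boils down to
pointing plain Python test code, run on the host, at the webserver that the KinD deployment
exposes. A rough sketch of such an interaction follows; it assumes the experimental REST API
of Airflow 1.10 is enabled in the deployment, and the DAG id and timeouts are made up.

    # Hypothetical helpers for a test in kubernetes_tests/ -- a sketch, not the commit's code.
    import time

    import requests

    BASE_URL = "http://localhost:30809"  # worker webserver port from TESTING.rst


    def trigger_dag(dag_id):
        # Ask the webserver to create a new DAG run via the (assumed) experimental API.
        response = requests.post(
            "{}/api/experimental/dags/{}/dag_runs".format(BASE_URL, dag_id),
            json={"conf": {}},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()


    def wait_for_dag_run(dag_id, execution_date, timeout=300):
        # Poll the DAG run state until it reaches a terminal state or the timeout expires.
        deadline = time.time() + timeout
        while time.time() < deadline:
            response = requests.get(
                "{}/api/experimental/dags/{}/dag_runs/{}".format(BASE_URL, dag_id, execution_date),
                timeout=30,
            )
            if response.ok and response.json().get("state") in ("success", "failed"):
                return response.json()["state"]
            time.sleep(10)
        raise RuntimeError("DAG run for {} did not finish in {}s".format(dag_id, timeout))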
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
index b060d25..e7c5c13 100644
--- a/airflow/kubernetes/refresh_config.py
+++ b/airflow/kubernetes/refresh_config.py
@@ -115,5 +115,4 @@ def load_kube_config(client_configuration, config_file=None, context=None):
 
     loader = _get_kube_config_loader_for_yaml_file(
         config_file, active_context=context, config_persister=None)
-
     loader.load_and_set(client_configuration)
diff --git a/airflow/kubernetes/volume_mount.py b/airflow/kubernetes/volume_mount.py
index 74563b4..0dbca5f 100644
--- a/airflow/kubernetes/volume_mount.py
+++ b/airflow/kubernetes/volume_mount.py
@@ -24,26 +24,32 @@ from airflow.kubernetes.k8s_model import K8SModel
 
 
 class VolumeMount(K8SModel):
-    """Defines Kubernetes Volume Mount"""
+    """
+    Initialize a Kubernetes Volume Mount. Used to mount pod level volumes to
+    running container.
 
+    :param name: the name of the volume mount
+    :type name: str
+    :param mount_path: the path within the container at which the volume should be mounted
+    :type mount_path: str
+    :param sub_path: subpath within the volume mount
+    :type sub_path: Optional[str]
+    :param read_only: whether to access pod with read-only mode
+    :type read_only: bool
+    """
     def __init__(self, name, mount_path, sub_path, read_only):
-        """Initialize a Kubernetes Volume Mount. Used to mount pod level volumes to
-        running container.
-        :param name: the name of the volume mount
-        :type name: str
-        :param mount_path:
-        :type mount_path: str
-        :param sub_path: subpath within the volume mount
-        :type sub_path: str
-        :param read_only: whether to access pod with read-only mode
-        :type read_only: bool
-        """
         self.name = name
         self.mount_path = mount_path
         self.sub_path = sub_path
         self.read_only = read_only
 
     def to_k8s_client_obj(self):
+        """
+        Converts to k8s object.
+
+        :return Volume Mount k8s object
+
+        """
         return k8s.V1VolumeMount(
             name=self.name,
             mount_path=self.mount_path,
@@ -52,6 +58,12 @@ class VolumeMount(K8SModel):
         )
 
     def attach_to_pod(self, pod):
+        """
+        Attaches to pod
+
+        :return Copy of the Pod object
+
+        """
         cp_pod = copy.deepcopy(pod)
         volume_mount = self.to_k8s_client_obj()
         cp_pod.spec.containers[0].volume_mounts = pod.spec.containers[0].volume_mounts or []
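
Editorial note (not part of the commit): the constructor and helper methods of ``VolumeMount``
are unchanged by the docstring refactoring above, so typical usage stays as in the sketch below;
the volume name and paths are made-up examples.

    # Hypothetical usage of airflow.kubernetes.volume_mount.VolumeMount shown above.
    from airflow.kubernetes.volume_mount import VolumeMount

    mount = VolumeMount(
        name="dags",                     # must match the name of a Volume defined on the pod
        mount_path="/opt/airflow/dags",  # path where the volume appears inside the container
        sub_path=None,                   # optionally mount only a sub-directory of the volume
        read_only=True,                  # mount the volume read-only
    )

    # Convert to the kubernetes client object, or attach it to an existing pod spec.
    k8s_volume_mount = mount.to_k8s_client_obj()
    # patched_pod = mount.attach_to_pod(pod)  # returns a copy of the pod with the mount added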
diff --git a/breeze b/breeze
index fcea655..7fa6e28 100755
--- a/breeze
+++ b/breeze
@@ -120,8 +120,11 @@ function setup_default_breeze_variables() {
     # This can be overridden by '--force-pull-images' flag
     export FORCE_PULL_IMAGES="false"
 
-    # Runtime is empty by default (might be set to kubernetes in case kubernetes is chosen)
-    RUNTIME=""
+    # Do not enable Kind Kubernetes cluster by default
+    export ENABLE_KIND_CLUSTER="false"
+
+    # By default we do not push images. This can be overridden by -u flag.
+    export PUSH_IMAGES=${PUSH_IMAGES:="false"}
 
     # Forward common host credentials to docker (gcloud, aws etc.).
     export FORWARD_CREDENTIALS="false"
@@ -148,11 +151,11 @@ function setup_default_breeze_variables() {
     # Default values for the flags used
 
     _BREEZE_DEFAULT_BACKEND="sqlite"
-    _BREEZE_DEFAULT_KUBERNETES_MODE="git_mode"
+    _BREEZE_DEFAULT_KUBERNETES_MODE="image"
     _BREEZE_DEFAULT_KUBERNETES_VERSION="v1.15.3"
     _BREEZE_DEFAULT_POSTGRES_VERSION="9.6"
+    _BREEZE_DEFAULT_POSTGRES_VERSION="9.6"
     _BREEZE_DEFAULT_MYSQL_VERSION="5.7"
-
     STATIC_CHECK_PYTHON_MAJOR_MINOR_VERSION=3.6
 }
 
@@ -376,16 +379,14 @@ EOF
 EOF
         fi
 
-        if [[ ${RUNTIME} == "kubernetes" ]]; then
+        if [[ ${INTEGRATIONS[*]} == *"kubernetes"* ]]; then
             cat <<EOF
 
-                               Kubernetes RUNTIME
-
-                               Kubernetes mode:    ${KUBERNETES_MODE}
-                               Kubernetes version: ${KUBERNETES_VERSION}
+                               Kubernetes mode:     ${KUBERNETES_MODE}
+                               Kubernetes version:  ${KUBERNETES_VERSION}
 
-                               Enable KinD:        ${ENABLE_KIND_CLUSTER}
-                               Cluster operation:  ${KIND_CLUSTER_OPERATION}
+                               Enable KinD cluster: ${ENABLE_KIND_CLUSTER}
+                               Cluster operation:   ${KIND_CLUSTER_OPERATION}
 EOF
         fi
     else
@@ -422,16 +423,14 @@ EOF
 EOF
         fi
 
-        if [[ ${RUNTIME} == "kubernetes" ]]; then
+        if [[ ${INTEGRATIONS[*]} == *"kubernetes"* ]]; then
             cat <<EOF
 
-   Kubernetes RUNTIME
+   Kubernetes mode:     ${KUBERNETES_MODE}
+   Kubernetes version:  ${KUBERNETES_VERSION}
 
-   Kubernetes mode:    ${KUBERNETES_MODE}
-   Kubernetes version: ${KUBERNETES_VERSION}
-
-   Enable KinD:        ${ENABLE_KIND_CLUSTER}
-   Cluster operation:  ${KIND_CLUSTER_OPERATION}
+   Enable KinD cluster: ${ENABLE_KIND_CLUSTER}
+   Cluster operation:   ${KIND_CLUSTER_OPERATION}
 EOF
         fi
     fi
@@ -458,7 +457,6 @@ export DOCKERHUB_REPO=${DOCKERHUB_REPO}
 export COMPOSE_FILE="${COMPOSE_FILE}"
 export PYTHON_MAJOR_MINOR_VERSION="${PYTHON_MAJOR_MINOR_VERSION}"
 export BACKEND="${BACKEND}"
-export RUNTIME="${RUNTIME}"
 export ENABLE_KIND_CLUSTER="${ENABLE_KIND_CLUSTER}"
 export KUBERNETES_MODE="${KUBERNETES_MODE}"
 export KUBERNETES_VERSION="${KUBERNETES_VERSION}"
@@ -495,7 +493,6 @@ function prepare_command_files() {
     BACKEND_DOCKER_COMPOSE_FILE=${SCRIPTS_CI_DIR}/docker-compose/backend-${BACKEND}.yml
     LOCAL_DOCKER_COMPOSE_FILE=${SCRIPTS_CI_DIR}/docker-compose/local.yml
     LOCAL_PROD_DOCKER_COMPOSE_FILE=${SCRIPTS_CI_DIR}/docker-compose/local-prod.yml
-    KUBERNETES_DOCKER_COMPOSE_FILE=${SCRIPTS_CI_DIR}/docker-compose/runtime-kubernetes.yml
     REMOVE_SOURCES_DOCKER_COMPOSE_FILE=${SCRIPTS_CI_DIR}/docker-compose/remove-sources.yml
     FORWARD_CREDENTIALS_DOCKER_COMPOSE_FILE=${SCRIPTS_CI_DIR}/docker-compose/forward-credentials.yml
 
@@ -516,11 +513,6 @@ function prepare_command_files() {
         COMPOSE_CI_FILE=${COMPOSE_CI_FILE}:${REMOVE_SOURCES_DOCKER_COMPOSE_FILE}
     fi
 
-    if [[ ${RUNTIME} == "kubernetes" ]]; then
-        COMPOSE_CI_FILE=${COMPOSE_CI_FILE}:${KUBERNETES_DOCKER_COMPOSE_FILE}
-        COMPOSE_PROD_FILE=${COMPOSE_PROD_FILE}:${KUBERNETES_DOCKER_COMPOSE_FILE}
-    fi
-
     set +u
     # shellcheck disable=SC2207
     UNIQUE_INTEGRATIONS=($(echo "${INTEGRATIONS[@]}" | tr ' ' '\n' | sort -u | tr '\n' ' '))
@@ -724,27 +716,6 @@ function parse_arguments() {
           export DOCKER_CACHE="no-cache"
           export FORCE_BUILD_IMAGES="true"
           shift ;;
-        -s|--kind-cluster-start)
-          export RUNTIME=kubernetes
-          export ENABLE_KIND_CLUSTER="true"
-          export KIND_CLUSTER_OPERATION="start"
-          echo "Starting kubernetes KinD cluster"
-          echo
-          shift ;;
-        -r|--kind-cluster-recreate)
-          export RUNTIME=kubernetes
-          export ENABLE_KIND_CLUSTER="true"
-          export KIND_CLUSTER_OPERATION="recreate"
-          echo "Recreating KinD cluster"
-          echo
-          shift ;;
-        -x|--kind-cluster-stop)
-          export RUNTIME=kubernetes
-          export ENABLE_KIND_CLUSTER="true"
-          export KIND_CLUSTER_OPERATION="stop"
-          echo "Stop KinD cluster"
-          echo
-          shift ;;
         -L|--use-local-cache)
           echo "Use local cache to build images"
           echo
@@ -887,6 +858,16 @@ function parse_arguments() {
           echo
           COMMAND_TO_RUN="perform_initialize_local_virtualenv"
           shift ;;
+        kind-cluster)
+          LAST_SUBCOMMAND="${1}"
+          COMMAND_TO_RUN="manage_kind_cluster"
+          export KIND_CLUSTER_OPERATION="${2:-}"
+          if [[ ${KIND_CLUSTER_OPERATION} != "" ]]; then
+              shift 2
+          else
+              shift
+          fi
+          ;;
         setup-autocomplete)
           LAST_SUBCOMMAND="${1}"
           echo "Setting up autocomplete"
@@ -1044,6 +1025,8 @@ function prepare_formatted_versions() {
       fold -w "${WIDTH}" -s | sed "s/^/${LIST_PREFIX}/")
     FORMATTED_KUBERNETES_VERSIONS=$(echo "${_BREEZE_ALLOWED_KUBERNETES_VERSIONS=""}" | tr '\n' ' ' | \
       fold -w "${WIDTH}" -s | sed "s/^/${LIST_PREFIX}/")
+    FORMATTED_KIND_OPERATIONS=$(echo "${_BREEZE_ALLOWED_KIND_OPERATIONS=""}" | tr '\n' ' ' | \
+      fold -w "${WIDTH}" -s | sed "s/^/${LIST_PREFIX}/")
     FORMATTED_INSTALL_AIRFLOW_VERSIONS=$(echo "${_BREEZE_ALLOWED_INSTALL_AIRFLOW_VERSIONS=""}" | \
       tr '\n' ' ' | fold -w "${WIDTH}" -s | sed "s/^/${LIST_PREFIX}/")
     FORMATTED_POSTGRES_VERSIONS=$(echo "${_BREEZE_ALLOWED_POSTGRES_VERSIONS=""}" | \
@@ -1071,6 +1054,7 @@ function prepare_usage() {
     export USAGE_GENERATE_REQUIREMENTS="Generates pinned requirements for pip dependencies"
     export USAGE_INITIALIZE_LOCAL_VIRTUALENV="Initializes local virtualenv"
     export USAGE_PUSH_IMAGE="Pushes images to registry"
+    export USAGE_KIND_CLUSTER="Manages KinD cluster on the host"
     export USAGE_SETUP_AUTOCOMPLETE="Sets up autocomplete for breeze"
     export USAGE_STOP="Stops the docker-compose environment"
     export USAGE_RESTART="Stops the docker-compose environment including DB cleanup"
@@ -1085,6 +1069,8 @@ function prepare_usage() {
 
     # shellcheck disable=SC2089
     DETAILED_USAGE_SHELL="
+${CMDNAME} shell [FLAGS] -- <EXTRA_ARGS>
+
       This is default subcommand if no subcommand is used.
 
       Enters interactive shell where you can run all tests, start Airflow webserver, scheduler,
@@ -1106,12 +1092,16 @@ $(flag_footer)
     # shellcheck disable=SC2090
     export DETAILED_USAGE_SHELL
     export DETAILED_USAGE_EXEC="
+${CMDNAME} exec
+
       Execs into interactive shell to an already running container. The container mus be started
       already by breeze shell command. If you are not familiar with tmux, this is the best
       way to run multiple processes in the same container at the same time for example scheduler,
       webserver, workers, database console and interactive terminal.
 "
     export DETAILED_USAGE_BUILD_DOCS="
+${CMDNAME} build-docs
+
       Builds Airflow documentation. The documentation is build inside docker container - to
       maintain the same build environment for everyone. Appropriate sources are mapped from
       the host to the container so that latest sources are used. The folders where documentation
@@ -1120,6 +1110,8 @@ $(flag_footer)
 "
     # shellcheck disable=SC2089
     DETAILED_USAGE_BUILD_IMAGE="
+${CMDNAME} build-image [FLAGS]
+
       Builds docker image (CI or production) without entering the container. You can pass
       additional options to this command, such as '--force-build-image',
       '--force-pull-image' '--python' '--use-local-cache'' in order to modify build behaviour.
@@ -1137,6 +1129,8 @@ $(flag_verbosity)
     export DETAILED_USAGE_BUILD_IMAGE
     # shellcheck disable=SC2089
     DETAILED_USAGE_CLEANUP_IMAGE="
+${CMDNAME} cleanup-image [FLAGS]
+
       Removes the breeze-related images created in your local docker image cache. This will
       not reclaim space in docker cache. You need to 'docker system prune' (optionally
       with --all) to reclaim that space.
@@ -1150,6 +1144,8 @@ $(flag_verbosity)
     export DETAILED_USAGE_CLEANUP_IMAGE
     # shellcheck disable=SC2089
     DETAILED_USAGE_DOCKER_COMPOSE="
+${CMDNAME} docker-compose [FLAGS] COMMAND -- <EXTRA_ARGS>
+
       Run docker-compose command instead of entering the environment. Use 'help' as command
       to see available commands. The <EXTRA_ARGS> passed after -- are treated
       as additional options passed to docker-compose. For example
@@ -1165,6 +1161,8 @@ $(flag_verbosity)
     export DETAILED_USAGE_DOCKER_COMPOSE
     # shellcheck disable=SC2089
     DETAILED_USAGE_EXECUTE_COMMAND="
+${CMDNAME} execute-command [FLAGS] COMMAND -- <EXTRA_ARGS>
+
       Run chosen command instead of entering the environment. The command is run using
       'bash -c \"<command with args>\" if you need to pass arguments to your command, you need
       to pass them together with command surrounded with \" or '. Alternatively you can
@@ -1184,6 +1182,8 @@ $(flag_verbosity)
       Explains in detail all the flags that can be used with breeze.
 "
     DETAILED_USAGE_GENERATE_REQUIREMENTS="
+${CMDNAME} generate-requirements [FLAGS]
+
       Generates pinned requirements from setup.py. Those requirements are generated in requirements
       directory - separately for different python version. Those requirements are used to run
       CI builds as well as run repeatable production image builds. You can use those requirements
@@ -1197,6 +1197,8 @@ $(flag_verbosity)
     # shellcheck disable=SC2090
     export DETAILED_USAGE_GENERATE_REQUIREMENTS
     DETAILED_USAGE_INITIALIZE_LOCAL_VIRTUALENV="
+${CMDNAME} initialize-local-virtualenv [FLAGS]
+
       Initializes locally created virtualenv installing all dependencies of Airflow
       taking into account the frozen requirements from requirements folder.
       This local virtualenv can be used to aid autocompletion and IDE support as
@@ -1210,6 +1212,8 @@ $(flag_airflow_variants)
     export DETAILED_USAGE_INITIALIZE_LOCAL_VIRTUALENV
     # shellcheck disable=SC2089
     DETAILED_USAGE_PUSH_IMAGE="
+${CMDNAME} push-image [FLAGS]
+
       Pushes images to docker registry. You can push the images to DockerHub registry (default)
       or to the GitHub cache registry (if --registry-cache flag is used).
 
@@ -1233,17 +1237,40 @@ $(flag_verbosity)
 "
     # shellcheck disable=SC2090
     export DETAILED_USAGE_PUSH_IMAGE
+    DETAILED_USAGE_KIND_CLUSTER="
+${CMDNAME} kind-cluster [FLAGS] OPERATION
+
+      Manages the host-side Kind Kubernetes cluster that is used to run Kubernetes integration tests.
+      It allows you to start/stop/restart the Kind Kubernetes cluster, check its status, and deploy Airflow to it.
+      This enables you to run tests inside the breeze environment with the latest airflow images loaded.
+      Note that when deploying airflow, the first step is to rebuild the image and load it
+      into the cluster, so you can also pass appropriate build image flags that will influence
+      rebuilding the production image. Operation is one of:
+
+${FORMATTED_KIND_OPERATIONS}
+
+Flags:
+$(flag_airflow_variants)
+$(flag_build_docker_images)
+"
+    export DETAILED_USAGE_KIND_CLUSTER
     export DETAILED_USAGE_SETUP_AUTOCOMPLETE="
+${CMDNAME} setup-autocomplete
+
       Sets up autocomplete for breeze commands. Once you do it you need to re-enter the bash
       shell and when typing breeze command <TAB> will provide autocomplete for
       parameters and values.
 "
     export DETAILED_USAGE_STOP="
+${CMDNAME} stop
+
       Brings down running docker compose environment. When you start the environment, the docker
       containers will continue running so that startup time is shorter. But they take quite a lot of
       memory and CPU. This command stops all running containers from the environment.
 "
     DETAILED_USAGE_RESTART="
+${CMDNAME} restart [FLAGS]
+
       Restarts running docker compose environment. When you restart the environment, the docker
       containers will be restarted. That includes cleaning up the databases. This is
       especially useful if you switch between different versions of Airflow.
@@ -1253,6 +1280,8 @@ $(flag_footer)
 "
     export DETAILED_USAGE_RESTART
     export DETAILED_USAGE_STATIC_CHECK="
+${CMDNAME} static-check [FLAGS] STATIC_CHECK
+
       Run selected static checks for currently changed files. You should specify static check that
       you would like to run or 'all' to run all checks. One of:
 
@@ -1269,6 +1298,8 @@ ${FORMATTED_STATIC_CHECKS}
       '${CMDNAME} static-check mypy -- --help'
 "
     export DETAILED_USAGE_STATIC_CHECK_ALL_FILES="
+${CMDNAME} static-check-all [FLAGS] STATIC_CHECK
+
       Run selected static checks for all applicable files. You should specify the static check that
       you would like to run or 'all' to run all checks. One of:
 
@@ -1286,6 +1317,8 @@ ${FORMATTED_STATIC_CHECKS}
 "
     # shellcheck disable=SC2089
     DETAILED_USAGE_TEST_TARGET="
+${CMDNAME} test-target [FLAGS] TEST_TARGET -- <EXTRA_ARGS>
+
       Run the specified unit test target. There might be multiple
       targets specified, separated with commas. The <EXTRA_ARGS> passed after -- are treated
       as additional options passed to pytest. For example:
@@ -1298,15 +1331,23 @@ $(flag_footer)
     # shellcheck disable=SC2090
     export DETAILED_USAGE_TEST_TARGET
     export DETAILED_USAGE_TOGGLE_SUPPRESS_CHEATSHEET="
+${CMDNAME} toggle-suppress-cheatsheet
+
       Toggles on/off cheatsheet displayed before starting bash shell.
 "
     export DETAILED_USAGE_TOGGLE_SUPPRESS_ASCIIART="
+${CMDNAME} toggle-suppress-asciiart
+
       Toggles on/off asciiart displayed before starting bash shell.
 "
     export DETAILED_USAGE_HELP="
-      Shows this help message.
+${CMDNAME} help
+
+      Shows the general help message for all commands.
 "
     export DETAILED_USAGE_HELP_ALL="
+${CMDNAME} help-all
+
       Shows detailed help for all commands and flags.
 "
 }
@@ -1366,21 +1407,10 @@ Help commands:
 # Prints detailed usage for command specified
 function detailed_usage() {
       SUBCOMMAND=${1}
-      COMMAND_PARAMETER=""
-      if [[ ${SUBCOMMAND} == "static-check" ]]; then
-        COMMAND_PARAMETER="<STATIC_CHECK> "
-      elif [[ ${SUBCOMMAND} == "static-check-all-files" ]]; then
-        COMMAND_PARAMETER="<STATIC_CHECK> "
-      elif [[ ${SUBCOMMAND} == "test-target" ]]; then
-        COMMAND_PARAMETER="<TEST_TARGET> "
-      elif [[ ${SUBCOMMAND} == "docker-compose" ]]; then
-        COMMAND_PARAMETER="<DOCKER_COMPOSE_COMMAND> "
-      fi
       echo "
 
 Detailed usage for command: ${SUBCOMMAND}
 
-${CMDNAME} [FLAGS] ${SUBCOMMAND} ${COMMAND_PARAMETER}-- <EXTRA_ARGS>
 $(get_detailed_usage "${SUBCOMMAND}")
 
 "
@@ -1457,29 +1487,9 @@ ${FORMATTED_INTEGRATIONS}
 }
 
 # Prints Kubernetes action flags
-function flag_kubernetes_actions() {
+function flag_kubernetes_configuration() {
       echo "
-Action for the cluster : only one of the --kind-cluster-* flags can be used at a time:
-
--s, --kind-cluster-start
-        Starts KinD Kubernetes cluster after entering the environment. The cluster is started using
-        Kubernetes Mode selected and Kubernetes version specified via --kubernetes-mode and
-        --kubernetes-version flags.
-
--x, --kind-cluster-stop
-        Stops KinD Kubernetes cluster if one has already been created. By default, if you do not
-        stop environment, the Kubernetes cluster created for testing is continuously running and
-        when you start Kubernetes testing again it will be reused. You can force deletion and
-        recreation of such cluster with this flag.
-
--r, --kind-cluster-recreate
-
-        Recreates KinD Kubernetes cluster if one has already been created. By default, if you do
-        not stop environment, the Kubernetes cluster created for testing is continuously running
-        and when you start Kubernetes testing again it will be reused. You can force deletion and
-        recreation of such cluster with this flag.
-
-Kubernetes mode/version flags:
+Configuration for the KinD Kubernetes cluster and tests:
 
 -K, --kubernetes-mode <KUBERNETES_MODE>
         Kubernetes mode - only used in case one of --kind-cluster-* commands is used.
@@ -1556,6 +1566,13 @@ function flag_verbosity() {
 "
 }
 
+function flag_help() {
+      echo "
+-h, --help
+        Shows the detailed help message for the specified command.
+"
+}
+
 # Prints flags controlling docker build process
 function flag_build_docker_images() {
       echo "
@@ -1642,8 +1659,8 @@ $(print_star_line)
 $(flag_breeze_actions)
 
 $(print_star_line)
- Manage Kind kubernetes cluster (optional)
-$(flag_kubernetes_actions)
+ KinD Kubernetes cluster and Kubernetes tests configuration (optional)
+$(flag_kubernetes_configuration)
 
 $(print_star_line)
  Manage mounting local files
@@ -1672,6 +1689,10 @@ $(flag_pull_push_docker_images)
 $(print_star_line)
  Increase verbosity of the scripts
 $(flag_verbosity)
+
+$(print_star_line)
+ Print detailed help message
+$(flag_help)
 "
 }
 
@@ -1915,6 +1936,28 @@ function run_build_command {
             ;;
         perform_initialize_local_virtualenv|perform_setup_autocomplete)
             ;;
+        manage_kind_cluster)
+            if [[ ${KIND_CLUSTER_OPERATION} == "start" ]] ; then
+              echo "Starts KinD cluster"
+            elif [[ ${KIND_CLUSTER_OPERATION} == "stop" ]] ; then
+              echo "Stops KinD cluster"
+            elif [[ ${KIND_CLUSTER_OPERATION} == "restart" ]] ; then
+              echo "Restarts KinD cluster"
+            elif [[ ${KIND_CLUSTER_OPERATION} == "status" ]] ; then
+              echo "Checks status of KinD cluster"
+            elif [[ ${KIND_CLUSTER_OPERATION} == "deploy" ]] ; then
+              echo "Deploys Airflow to KinD cluster"
+            elif [[ ${KIND_CLUSTER_OPERATION} == "test" ]] ; then
+              echo "Runs Kubernetes tests with the KinD cluster"
+            else
+              echo "ERROR: Unknown KinD cluster operation: '${KIND_CLUSTER_OPERATION}'"
+              echo
+              echo "Should be one of:"
+              echo "${FORMATTED_KIND_OPERATIONS}"
+              echo
+              exit 1
+            fi
+            ;;
         *)
           echo >&2
           echo >&2 "ERROR: Unknown command to run ${COMMAND_TO_RUN}"
@@ -1997,6 +2040,10 @@ function run_breeze_command {
         perform_setup_autocomplete)
             setup_autocomplete
             ;;
+        manage_kind_cluster)
+            check_kind_and_kubectl_are_installed
+            perform_kind_cluster_operation "${KIND_CLUSTER_OPERATION}"
+            ;;
         build_docs)
             run_docs
             ;;
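
For illustration, a typical host-side session with the new kind-cluster command might look like the
sketch below (hypothetical invocations; the available operations are exactly the ones listed in
FORMATTED_KIND_OPERATIONS above):

    ./breeze kind-cluster start      # create (or reuse) the KinD cluster for the selected versions
    ./breeze kind-cluster deploy     # rebuild the production image, load it and deploy Airflow
    ./breeze kind-cluster status     # show the nodes of the running cluster
    ./breeze kind-cluster test       # run the tests from kubernetes_tests against the deployment
    ./breeze kind-cluster stop       # delete the cluster
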
diff --git a/breeze-complete b/breeze-complete
index de39a83..d1df5b9 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -20,10 +20,11 @@
 _BREEZE_ALLOWED_PYTHON_MAJOR_MINOR_VERSIONS="2.7 3.5 3.6 3.7"
 _BREEZE_ALLOWED_BACKENDS="sqlite mysql postgres"
 _BREEZE_ALLOWED_INTEGRATIONS="cassandra kerberos mongo openldap rabbitmq redis"
-_BREEZE_ALLOWED_KUBERNETES_MODES="persistent_mode git_mode"
-_BREEZE_ALLOWED_KUBERNETES_VERSIONS="v1.15.3 v1.16.2"
+_BREEZE_ALLOWED_KUBERNETES_MODES="image git"
+_BREEZE_ALLOWED_KUBERNETES_VERSIONS="v1.15.3"
 _BREEZE_ALLOWED_MYSQL_VERSIONS="5.6 5.7"
 _BREEZE_ALLOWED_POSTGRES_VERSIONS="9.6 10"
+_BREEZE_ALLOWED_KIND_OPERATIONS="start stop restart status deploy test"
 
 _BREEZE_ALLOWED_INSTALL_AIRFLOW_VERSIONS=$(cat <<-EOF
 1.10.10
@@ -85,7 +86,6 @@ _BREEZE_DEFAULT_GITHUB_REPO="airflow"
 _BREEZE_SHORT_OPTIONS="
 h p: b: i:
 K: V:
-s x r
 l a: t: d:
 v y n q f
 F P I E: C L
@@ -95,7 +95,6 @@ D: R: c g: G:
 _BREEZE_LONG_OPTIONS="
 help python: backend: integration:
 kubernetes-mode: kubernetes-version:
-kind-cluster-start kind-cluster-stop kind-cluster-recreate
 skip-mounting-local-sources install-airflow-version: install-airflow-reference: db-reset
 verbose assume-yes assume-no assume-quit forward-credentials
 force-build-images force-pull-images production-image extras: force-clean-images use-local-cache
@@ -113,6 +112,7 @@ exec
 generate-requirements
 push-image
 initialize-local-virtualenv
+kind-cluster
 setup-autocomplete
 stop
 restart
@@ -122,6 +122,7 @@ toggle-suppress-asciiart"
 export BREEZE_EXTRA_ARG_COMMANDS="
 docker-compose
 execute-command
+kind-cluster
 static-check
 static-check-all-files
 test-target"
@@ -188,6 +189,9 @@ function _get_known_values_breeze() {
     -g | --github-repo)
         _BREEZE_KNOWN_VALUES="${_BREEZE_DEFAULT_GITHUB_REPO}"
         ;;
+    kind-cluster)
+        _BREEZE_KNOWN_VALUES="${_BREEZE_ALLOWED_KIND_OPERATIONS}"
+        ;;
     *)
         _BREEZE_KNOWN_VALUES=""
         ;;
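
With the completion entries above, a shell session (a sketch, assuming autocomplete was enabled with
'./breeze setup-autocomplete' and the shell re-entered) offers the allowed operations for the new command:

    $ ./breeze kind-cluster <TAB>
    start    stop    restart    status    deploy    test
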
diff --git a/tests/runtime/kubernetes/__init__.py b/kubernetes_tests/__init__.py
similarity index 100%
rename from tests/runtime/kubernetes/__init__.py
rename to kubernetes_tests/__init__.py
diff --git a/tests/runtime/kubernetes/test_kubernetes_executor.py b/kubernetes_tests/test_kubernetes_executor.py
similarity index 71%
rename from tests/runtime/kubernetes/test_kubernetes_executor.py
rename to kubernetes_tests/test_kubernetes_executor.py
index ac44f81..3b86c7e 100644
--- a/tests/runtime/kubernetes/test_kubernetes_executor.py
+++ b/kubernetes_tests/test_kubernetes_executor.py
@@ -20,16 +20,18 @@ import time
 import unittest
 from subprocess import check_call, check_output
 
-import pytest
 import requests
 import requests.exceptions
 from requests.adapters import HTTPAdapter
 from urllib3.util.retry import Retry
 
-KUBERNETES_HOST = (os.environ.get('CLUSTER_NAME') or "docker") + "-worker:30809"
+KUBERNETES_HOST_PORT = (os.environ.get('CLUSTER_HOST') or "localhost") + ":30809"
+
+print()
+print("Cluster host/port used: ${KUBERNETES_HOST_PORT}".format(KUBERNETES_HOST_PORT=KUBERNETES_HOST_PORT))
+print()
 
 
-@pytest.mark.runtime("kubernetes")
 class TestKubernetesExecutor(unittest.TestCase):
 
     @staticmethod
@@ -56,7 +58,7 @@ class TestKubernetesExecutor(unittest.TestCase):
 
     def _ensure_airflow_webserver_is_healthy(self):
         response = self.session.get(
-            "http://{host}/health".format(host=KUBERNETES_HOST),
+            "http://{host}/health".format(host=KUBERNETES_HOST_PORT),
             timeout=1,
         )
 
@@ -80,16 +82,15 @@ class TestKubernetesExecutor(unittest.TestCase):
 
             # Trigger a new dagrun
             try:
-                result = self.session.get(
-                    'http://{host}/api/experimental/dags/{dag_id}/'
-                    'dag_runs/{execution_date}/tasks/{task_id}'
-                    .format(host=host,
-                            dag_id=dag_id,
-                            execution_date=execution_date,
-                            task_id=task_id)
-                )
+                get_string = \
+                    'http://{host}/api/experimental/dags/{dag_id}/' \
+                    'dag_runs/{execution_date}/tasks/{task_id}'.format(
+                        host=host, dag_id=dag_id, execution_date=execution_date, task_id=task_id)
+                print("Calling [monitor_task]#1 {get_string}".format(get_string=get_string))
+                result = self.session.get(get_string)
                 self.assertEqual(result.status_code, 200, "Could not get the status")
                 result_json = result.json()
+                print("Received [monitor_task]#2: {result_json}".format(result_json=result_json))
                 state = result_json['state']
                 print("Attempt {}: Current state of operator is {}".format(tries, state))
 
@@ -112,18 +113,16 @@ class TestKubernetesExecutor(unittest.TestCase):
         while tries < max_tries:
             time.sleep(5)
 
+            get_string = \
+                'http://{host}/api/experimental/dags/{dag_id}/' \
+                'dag_runs/{execution_date}'.format(host=host,
+                                                   dag_id=dag_id, execution_date=execution_date)
+            print("Calling {get_string}".format(get_string=get_string))
             # Trigger a new dagrun
-            result = self.session.get(
-                'http://{host}/api/experimental/dags/{dag_id}/'
-                'dag_runs/{execution_date}'
-                .format(host=host,
-                        dag_id=dag_id,
-                        execution_date=execution_date)
-            )
-            print(result)
+            result = self.session.get(get_string)
             self.assertEqual(result.status_code, 200, "Could not get the status")
             result_json = result.json()
-            print(result_json)
+            print("Received: {result_json}".format(result_json=result_json))
             state = result_json['state']
             check_call(
                 ["echo", "Attempt {}: Current state of dag is {}".format(tries, state)])
@@ -138,53 +137,60 @@ class TestKubernetesExecutor(unittest.TestCase):
         # Maybe check if we can retrieve the logs, but then we need to extend the API
 
     def start_dag(self, dag_id, host):
-        result = self.session.get(
-            'http://{host}/api/experimental/'
-            'dags/{dag_id}/paused/false'.format(host=host, dag_id=dag_id)
-        )
+        get_string = 'http://{host}/api/experimental/' \
+                     'dags/{dag_id}/paused/false'.format(host=host, dag_id=dag_id)
+        print("Calling [start_dag]#1 {get_string}".format(get_string=get_string))
+        result = self.session.get(get_string)
         try:
             result_json = result.json()
         except ValueError:
             result_json = str(result)
-
+        print("Received [start_dag]#1 {result_json}".format(result_json=result_json))
         self.assertEqual(result.status_code, 200, "Could not enable DAG: {result}"
                          .format(result=result_json))
-
+        post_string = 'http://{host}/api/experimental/' \
+                      'dags/{dag_id}/dag_runs'.format(host=host, dag_id=dag_id)
+        print("Calling [start_dag]#2 {post_string}".format(post_string=post_string))
         # Trigger a new dagrun
-        result = self.session.post(
-            'http://{host}/api/experimental/'
-            'dags/{dag_id}/dag_runs'.format(host=host, dag_id=dag_id),
-            json={}
-        )
+        result = self.session.post(post_string, json={})
         try:
             result_json = result.json()
         except ValueError:
             result_json = str(result)
-
+        print("Received [start_dag]#2 {result_json}".format(result_json=result_json))
         self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run: {result}"
                          .format(result=result_json))
 
         time.sleep(1)
 
-        result = self.session.get(
-            'http://{}/api/experimental/latest_runs'.format(host)
-        )
+        get_string = 'http://{host}/api/experimental/latest_runs'.format(host=host)
+        print("Calling [start_dag]#3 {get_string}".format(get_string=get_string))
+        result = self.session.get(get_string)
         self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run:"
                                                   " {result}"
                          .format(result=result.json()))
         result_json = result.json()
+        print("Received: [start_dag]#3 {result_json}".format(result_json=result_json))
         return result_json
 
-    def test_integration_run_dag(self):
-        host = KUBERNETES_HOST
-        dag_id = 'example_kubernetes_executor_config'
-
+    def start_job_in_kubernetes(self, dag_id, host):
         result_json = self.start_dag(dag_id=dag_id, host=host)
-
         self.assertGreater(len(result_json['items']), 0)
+        execution_date = None
+        for dag_run in result_json['items']:
+            if dag_run['dag_id'] == dag_id:
+                execution_date = dag_run['execution_date']
+                break
+        self.assertIsNotNone(execution_date,
+                             "No execution_date can be found for the dag with {dag_id}".format(dag_id=dag_id))
+        return execution_date
+
+    def test_integration_run_dag(self):
+        host = KUBERNETES_HOST_PORT
+        dag_id = 'example_kubernetes_executor_config'
 
-        execution_date = result_json['items'][0]['execution_date']
-        print("Found the job with execution date {}".format(execution_date))
+        execution_date = self.start_job_in_kubernetes(dag_id, host)
+        print("Found the job with execution date {execution_date}".format(execution_date=execution_date))
 
         # Wait some time for the operator to complete
         self.monitor_task(host=host,
@@ -199,15 +205,10 @@ class TestKubernetesExecutor(unittest.TestCase):
                                        expected_final_state='success', timeout=200)
 
     def test_integration_run_dag_with_scheduler_failure(self):
-        host = KUBERNETES_HOST
+        host = KUBERNETES_HOST_PORT
         dag_id = 'example_kubernetes_executor_config'
 
-        result_json = self.start_dag(dag_id=dag_id, host=host)
-
-        self.assertGreater(len(result_json['items']), 0)
-
-        execution_date = result_json['items'][0]['execution_date']
-        print("Found the job with execution date {}".format(execution_date))
+        execution_date = self.start_job_in_kubernetes(dag_id, host)
 
         self._delete_airflow_pod()
 
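
The executor test above talks to the Airflow webserver that the KinD cluster exposes to the host on
port 30809; a minimal standalone sketch of that polling setup (assuming the cluster was deployed, e.g.
with './breeze kind-cluster deploy', and that CLUSTER_HOST falls back to localhost):

    import os

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    host_port = (os.environ.get('CLUSTER_HOST') or "localhost") + ":30809"
    session = requests.Session()
    # retry a few times in case the webserver is still starting up
    session.mount("http://", HTTPAdapter(max_retries=Retry(total=5, backoff_factor=1)))
    response = session.get("http://{host}/health".format(host=host_port), timeout=1)
    print(response.status_code, response.json())
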
diff --git a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
similarity index 64%
rename from tests/runtime/kubernetes/test_kubernetes_pod_operator.py
rename to kubernetes_tests/test_kubernetes_pod_operator.py
index f533659..7eb4beb 100644
--- a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -1,3 +1,4 @@
+# pylint: disable=unused-argument
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -20,25 +21,45 @@ import os
 import shutil
 import unittest
 
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from tests.compat import mock, patch
+
 import kubernetes.client.models as k8s
-import pytest
+import pendulum
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
 
-from airflow import AirflowException
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow.exceptions import AirflowException
+from airflow.kubernetes import kube_client
 from airflow.kubernetes.pod import Port
 from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.secret import Secret
 from airflow.kubernetes.volume import Volume
 from airflow.kubernetes.volume_mount import VolumeMount
+from airflow.models import DAG, TaskInstance
+
+from airflow.utils import timezone
 from airflow.version import version as airflow_version
-from tests.compat import mock
 
 
-@pytest.mark.runtime("kubernetes")
-class TestKubernetesPodOperator(unittest.TestCase):
+# noinspection DuplicatedCode
+def create_context(task):
+    dag = DAG(dag_id="dag")
+    tzinfo = pendulum.timezone("Europe/Amsterdam")
+    execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
+    task_instance = TaskInstance(task=task,
+                                 execution_date=execution_date)
+    return {
+        "dag": dag,
+        "ts": execution_date.isoformat(),
+        "task": task,
+        "ti": task_instance,
+    }
+
+
+# noinspection DuplicatedCode,PyUnusedLocal
+class TestKubernetesPodOperatorSystem(unittest.TestCase):
 
     def setUp(self):
         self.maxDiff = None  # pylint: disable=invalid-name
@@ -51,10 +72,12 @@ class TestKubernetesPodOperator(unittest.TestCase):
                 'name': mock.ANY,
                 'annotations': {},
                 'labels': {
-                    'foo': 'bar',
-                    'kubernetes_pod_operator': 'True',
-                    'airflow_version': airflow_version.replace('+', '-')
-                }
+                    'foo': 'bar', 'kubernetes_pod_operator': 'True',
+                    'airflow_version': airflow_version.replace('+', '-'),
+                    'execution_date': '2016-01-01T0100000100-a2f50a31f',
+                    'dag_id': 'dag',
+                    'task_id': 'task',
+                    'try_number': '1'},
             },
             'spec': {
                 'affinity': {},
@@ -67,15 +90,11 @@ class TestKubernetesPodOperator(unittest.TestCase):
                     'envFrom': [],
                     'name': 'base',
                     'ports': [],
-                    'resources': {'limits': {'cpu': None,
-                                             'memory': None,
-                                             'nvidia.com/gpu': None},
-                                  'requests': {'cpu': None,
-                                               'memory': None}},
                     'volumeMounts': [],
                 }],
                 'hostNetwork': False,
                 'imagePullSecrets': [],
+                'initContainers': [],
                 'nodeSelector': {},
                 'restartPolicy': 'Never',
                 'securityContext': {},
@@ -85,10 +104,15 @@ class TestKubernetesPodOperator(unittest.TestCase):
             }
         }
 
-    def test_config_path_move(self):
+    def tearDown(self):
+        client = kube_client.get_kube_client(in_cluster=False)
+        client.delete_collection_namespaced_pod(namespace="default")
+
+    def test_do_xcom_push_defaults_false(self):
         new_config_path = '/tmp/kube_config'
         old_config_path = os.path.expanduser('~/.kube/config')
         shutil.copy(old_config_path, new_config_path)
+
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
@@ -101,43 +125,31 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             config_file=new_config_path,
         )
-        k.execute(None)
-        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.assertEqual(self.expected_pod, actual_pod)
+        self.assertFalse(k.do_xcom_push)
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_config_path(self, client_mock, launcher_mock):
-        from airflow.utils.state import State
+    def test_config_path_move(self):
+        new_config_path = '/tmp/kube_config'
+        old_config_path = os.path.expanduser('~/.kube/config')
+        shutil.copy(old_config_path, new_config_path)
 
-        file_path = "/tmp/fake_file"
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
+            name="test1",
             task_id="task",
             in_cluster=False,
             do_xcom_push=False,
-            config_file=file_path,
-            cluster_context='default',
-        )
-        launcher_mock.return_value = (State.SUCCESS, None)
-        k.execute(None)
-        client_mock.assert_called_once_with(
-            in_cluster=False,
-            cluster_context='default',
-            config_file=file_path,
+            config_file=new_config_path,
         )
+        context = create_context(k)
+        k.execute(context)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual(self.expected_pod, actual_pod)
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_image_pull_secrets_correctly_set(self, mock_client, mock_launcher):
-        from airflow.utils.state import State
-
-        fake_pull_secrets = "fakeSecret"
+    def test_working_pod(self):
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
@@ -146,22 +158,16 @@ class TestKubernetesPodOperator(unittest.TestCase):
             labels={"foo": "bar"},
             name="test",
             task_id="task",
-            image_pull_secrets=fake_pull_secrets,
             in_cluster=False,
             do_xcom_push=False,
-            cluster_context='default',
-        )
-        mock_launcher.return_value = (State.SUCCESS, None)
-        k.execute(None)
-        self.assertEqual(
-            mock_launcher.call_args[0][0].spec.image_pull_secrets,
-            [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
         )
+        context = create_context(k)
+        k.execute(context)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
+        self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_pod_delete_even_on_launcher_error(self, mock_client, delete_pod_mock, run_pod_mock):
+    def test_delete_operator_pod(self):
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
@@ -172,31 +178,15 @@ class TestKubernetesPodOperator(unittest.TestCase):
             task_id="task",
             in_cluster=False,
             do_xcom_push=False,
-            cluster_context='default',
             is_delete_operator_pod=True,
         )
-        run_pod_mock.side_effect = AirflowException('fake failure')
-        with self.assertRaises(AirflowException):
-            k.execute(None)
-        assert delete_pod_mock.called
-
-    def test_working_pod(self):
-        k = KubernetesPodOperator(
-            namespace='default',
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-        )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.assertEqual(self.expected_pod, actual_pod)
+        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
+        self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
 
-    def test_delete_operator_pod(self):
+    def test_pod_hostnetwork(self):
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
@@ -205,15 +195,19 @@ class TestKubernetesPodOperator(unittest.TestCase):
             labels={"foo": "bar"},
             name="test",
             task_id="task",
-            is_delete_operator_pod=True,
             in_cluster=False,
             do_xcom_push=False,
+            hostnetwork=True,
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.assertEqual(self.expected_pod, actual_pod)
+        self.expected_pod['spec']['hostNetwork'] = True
+        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
+        self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
 
-    def test_pod_hostnetwork(self):
+    def test_pod_dnspolicy(self):
+        dns_policy = "ClusterFirstWithHostNet"
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
@@ -225,16 +219,20 @@ class TestKubernetesPodOperator(unittest.TestCase):
             in_cluster=False,
             do_xcom_push=False,
             hostnetwork=True,
+            dnspolicy=dns_policy
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['hostNetwork'] = True
-        self.assertEqual(self.expected_pod, actual_pod)
+        self.expected_pod['spec']['dnsPolicy'] = dns_policy
+        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
+        self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
 
-    def test_pod_dnspolicy(self):
-        dns_policy = "ClusterFirstWithHostNet"
+    def test_pod_schedulername(self):
+        scheduler_name = "default-scheduler"
         k = KubernetesPodOperator(
-            namespace='default',
+            namespace="default",
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
@@ -243,13 +241,12 @@ class TestKubernetesPodOperator(unittest.TestCase):
             task_id="task",
             in_cluster=False,
             do_xcom_push=False,
-            hostnetwork=True,
-            dnspolicy=dns_policy,
+            schedulername=scheduler_name
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.expected_pod['spec']['hostNetwork'] = True
-        self.expected_pod['spec']['dnsPolicy'] = dns_policy
+        self.expected_pod['spec']['schedulerName'] = scheduler_name
         self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_node_selectors(self):
@@ -268,7 +265,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             node_selectors=node_selectors,
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['nodeSelector'] = node_selectors
         self.assertEqual(self.expected_pod, actual_pod)
@@ -277,8 +275,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
         resources = {
             'limit_cpu': 0.25,
             'limit_memory': '64Mi',
+            'limit_ephemeral_storage': '2Gi',
             'request_cpu': '250m',
             'request_memory': '64Mi',
+            'request_ephemeral_storage': '1Gi',
         }
         k = KubernetesPodOperator(
             namespace='default',
@@ -292,17 +292,20 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             resources=resources,
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['containers'][0]['resources'] = {
             'requests': {
                 'memory': '64Mi',
-                'cpu': '250m'
+                'cpu': '250m',
+                'ephemeral-storage': '1Gi'
             },
             'limits': {
                 'memory': '64Mi',
                 'cpu': 0.25,
-                'nvidia.com/gpu': None
+                'nvidia.com/gpu': None,
+                'ephemeral-storage': '2Gi'
             }
         }
         self.assertEqual(self.expected_pod, actual_pod)
@@ -337,7 +340,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             affinity=affinity,
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context=context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['affinity'] = affinity
         self.assertEqual(self.expected_pod, actual_pod)
@@ -357,7 +361,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             ports=[port],
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context=context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['containers'][0]['ports'] = [{
             'name': 'http',
@@ -366,7 +371,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
         self.assertEqual(self.expected_pod, actual_pod)
 
     def test_volume_mount(self):
-        with mock.patch.object(PodLauncher, 'log') as mock_logger:
+        with patch.object(PodLauncher, 'log') as mock_logger:
             volume_mount = VolumeMount('test-volume',
                                        mount_path='/tmp/test_volume',
                                        sub_path=None,
@@ -394,7 +399,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
                 in_cluster=False,
                 do_xcom_push=False,
             )
-            k.execute(None)
+            context = create_context(k)
+            k.execute(context=context)
             mock_logger.info.assert_any_call(b"retrieved from mount\n")
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod['spec']['containers'][0]['args'] = args
@@ -429,7 +435,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             security_context=security_context,
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['securityContext'] = security_context
         self.assertEqual(self.expected_pod, actual_pod)
@@ -453,7 +460,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             security_context=security_context,
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['securityContext'] = security_context
         self.assertEqual(self.expected_pod, actual_pod)
@@ -477,7 +485,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             security_context=security_context,
         )
-        k.execute(None)
+        context = create_context(k)
+        k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['securityContext'] = security_context
         self.assertEqual(self.expected_pod, actual_pod)
@@ -494,10 +503,11 @@ class TestKubernetesPodOperator(unittest.TestCase):
             task_id="task",
             in_cluster=False,
             do_xcom_push=False,
-            startup_timeout_seconds=5
+            startup_timeout_seconds=5,
         )
         with self.assertRaises(AirflowException):
-            k.execute(None)
+            context = create_context(k)
+            k.execute(context)
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod['spec']['containers'][0]['image'] = bad_image_name
             self.assertEqual(self.expected_pod, actual_pod)
@@ -518,7 +528,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             service_account_name=bad_service_account_name,
         )
         with self.assertRaises(ApiException):
-            k.execute(None)
+            context = create_context(k)
+            k.execute(context)
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod['spec']['serviceAccountName'] = bad_service_account_name
             self.assertEqual(self.expected_pod, actual_pod)
@@ -540,7 +551,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
         )
         with self.assertRaises(AirflowException):
-            k.execute(None)
+            context = create_context(k)
+            k.execute(context)
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod['spec']['containers'][0]['args'] = bad_internal_command
             self.assertEqual(self.expected_pod, actual_pod)
@@ -559,20 +571,22 @@ class TestKubernetesPodOperator(unittest.TestCase):
             in_cluster=False,
             do_xcom_push=True,
         )
-        self.assertEqual(k.execute(None), json.loads(return_value))
+        context = create_context(k)
+        self.assertEqual(k.execute(context), json.loads(return_value))
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         volume = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME)
         volume_mount = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME_MOUNT)
         container = self.api_client.sanitize_for_serialization(PodDefaults.SIDECAR_CONTAINER)
         self.expected_pod['spec']['containers'][0]['args'] = args
-        self.expected_pod['spec']['containers'][0]['volumeMounts'].insert(0, volume_mount)
+        self.expected_pod['spec']['containers'][0]['volumeMounts'].insert(0, volume_mount)  # noqa
         self.expected_pod['spec']['volumes'].insert(0, volume)
         self.expected_pod['spec']['containers'].append(container)
         self.assertEqual(self.expected_pod, actual_pod)
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_envs_from_configmaps(self, mock_client, mock_launcher):
+    @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start):
         # GIVEN
         from airflow.utils.state import State
 
@@ -588,21 +602,23 @@ class TestKubernetesPodOperator(unittest.TestCase):
             task_id="task",
             in_cluster=False,
             do_xcom_push=False,
-            configmaps=[configmap]
+            configmaps=[configmap],
         )
         # THEN
-        mock_launcher.return_value = (State.SUCCESS, None)
-        k.execute(None)
+        mock_monitor.return_value = (State.SUCCESS, None)
+        context = create_context(k)
+        k.execute(context)
         self.assertEqual(
-            mock_launcher.call_args[0][0].spec.containers[0].env_from,
+            mock_start.call_args[0][0].spec.containers[0].env_from,
             [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(
                 name=configmap
             ))]
         )
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_envs_from_secrets(self, mock_client, mock_launcher):
+    @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock):
         # GIVEN
         from airflow.utils.state import State
         secret_ref = 'secret_name'
@@ -621,16 +637,194 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
         )
         # THEN
-        mock_launcher.return_value = (State.SUCCESS, None)
-        k.execute(None)
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = create_context(k)
+        k.execute(context)
         self.assertEqual(
-            mock_launcher.call_args[0][0].spec.containers[0].env_from,
+            start_mock.call_args[0][0].spec.containers[0].env_from,
             [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(
                 name=secret_ref
             ))]
         )
 
+    def test_init_container(self):
+        # GIVEN
+        volume_mounts = [k8s.V1VolumeMount(
+            mount_path='/etc/foo',
+            name='test-volume',
+            sub_path=None,
+            read_only=True
+        )]
+
+        init_environments = [k8s.V1EnvVar(
+            name='key1',
+            value='value1'
+        ), k8s.V1EnvVar(
+            name='key2',
+            value='value2'
+        )]
+
+        init_container = k8s.V1Container(
+            name="init-container",
+            image="ubuntu:16.04",
+            env=init_environments,
+            volume_mounts=volume_mounts,
+            command=["bash", "-cx"],
+            args=["echo 10"]
+        )
+
+        volume_config = {
+            'persistentVolumeClaim':
+                {
+                    'claimName': 'test-volume'
+                }
+        }
+        volume = Volume(name='test-volume', configs=volume_config)
+
+        expected_init_container = {
+            'name': 'init-container',
+            'image': 'ubuntu:16.04',
+            'command': ['bash', '-cx'],
+            'args': ['echo 10'],
+            'env': [{
+                'name': 'key1',
+                'value': 'value1'
+            }, {
+                'name': 'key2',
+                'value': 'value2'
+            }],
+            'volumeMounts': [{
+                'mountPath': '/etc/foo',
+                'name': 'test-volume',
+                'readOnly': True
+            }],
+        }
+
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            volumes=[volume],
+            init_containers=[init_container],
+            in_cluster=False,
+            do_xcom_push=False,
+        )
+        context = create_context(k)
+        k.execute(context)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['initContainers'] = [expected_init_container]
+        self.expected_pod['spec']['volumes'] = [{
+            'name': 'test-volume',
+            'persistentVolumeClaim': {
+                'claimName': 'test-volume'
+            }
+        }]
+        self.assertEqual(self.expected_pod, actual_pod)
+
+    @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_pod_template_file(
+            self,
+            mock_client,
+            monitor_mock,
+            start_mock):  # pylint: disable=unused-argument
+        from airflow.utils.state import State
+        k = KubernetesPodOperator(
+            task_id='task',
+            pod_template_file='tests/kubernetes/pod.yaml',
+            do_xcom_push=True
+        )
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = create_context(k)
+        k.execute(context)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {'name': mock.ANY, 'namespace': 'mem-example'},
+            'spec': {
+                'volumes': [{'name': 'xcom', 'emptyDir': {}}],
+                'containers': [{
+                    'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', '1'],
+                    'command': ['stress'],
+                    'image': 'polinux/stress',
+                    'name': 'memory-demo-ctr',
+                    'resources': {
+                        'limits': {'memory': '200Mi'},
+                        'requests': {'memory': '100Mi'}
+                    },
+                    'volumeMounts': [{
+                        'name': 'xcom',
+                        'mountPath': '/airflow/xcom'
+                    }]
+                }, {
+                    'name': 'airflow-xcom-sidecar',
+                    'image': "alpine",
+                    'command': ['sh', '-c', PodDefaults.XCOM_CMD],
+                    'volumeMounts': [
+                        {
+                            'name': 'xcom',
+                            'mountPath': '/airflow/xcom'
+                        }
+                    ],
+                    'resources': {'requests': {'cpu': '1m'}},
+                }],
+            }
+        }, actual_pod)
+
+    @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_pod_priority_class_name(
+            self,
+            mock_client,
+            monitor_mock,
+            start_mock):  # pylint: disable=unused-argument
+        """Test ability to assign priorityClassName to pod."""
+        from airflow.utils.state import State
+
+        priority_class_name = "medium-test"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            priority_class_name=priority_class_name,
+        )
+
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = create_context(k)
+        k.execute(context)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['priorityClassName'] = priority_class_name
+        self.assertEqual(self.expected_pod, actual_pod)
+
+    def test_pod_name(self):
+        pod_name_too_long = "a" * 221
+        with self.assertRaises(AirflowException):
+            KubernetesPodOperator(
+                namespace='default',
+                image="ubuntu:16.04",
+                cmds=["bash", "-cx"],
+                arguments=["echo 10"],
+                labels={"foo": "bar"},
+                name=pod_name_too_long,
+                task_id="task",
+                in_cluster=False,
+                do_xcom_push=False,
+            )
+
 
 # pylint: enable=unused-argument
-if __name__ == '__main__':
-    unittest.main()
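
All of the pod-operator tests above share the same skeleton: build an operator, synthesize a task
context with create_context, execute it against the local cluster and compare the sanitized pod. A
condensed sketch of that skeleton (illustrative only; it assumes a reachable cluster and the usual
~/.kube/config, and reuses the create_context helper defined at the top of this module):

    from kubernetes.client.api_client import ApiClient

    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
    from kubernetes_tests.test_kubernetes_pod_operator import create_context

    k = KubernetesPodOperator(
        namespace='default',
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo 10"],
        labels={"foo": "bar"},
        name="example",
        task_id="task",
        in_cluster=False,
        do_xcom_push=False,
    )
    context = create_context(k)
    k.execute(context)
    actual_pod = ApiClient().sanitize_for_serialization(k.pod)
    print(actual_pod['metadata']['labels'])
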
diff --git a/scripts/ci/_utils.sh b/scripts/ci/_utils.sh
index 68778d8..2167229 100644
--- a/scripts/ci/_utils.sh
+++ b/scripts/ci/_utils.sh
@@ -134,7 +134,7 @@ function initialize_common_environment {
     # Add the right volume mount for sources, depending which mount strategy is used
     if [[ ${MOUNT_SOURCE_DIR_FOR_STATIC_CHECKS} == "true" ]]; then
         print_info
-        print_info "Mount whole airflow source directory for static checks (make sure all files are in container)"
+        print_info "Mount whole airflow source directory for static checks"
         print_info
         EXTRA_DOCKER_FLAGS=( \
           "-v" "${AIRFLOW_SOURCES}:/opt/airflow" \
@@ -208,12 +208,11 @@ function initialize_common_environment {
     # Default extras used for building CI image
     export DEFAULT_CI_EXTRAS="devel_ci"
 
-
     # Default extras used for building Production image. The master of this information is in the Dockerfile
     DEFAULT_PROD_EXTRAS=$(grep "ARG AIRFLOW_EXTRAS=" "${AIRFLOW_SOURCES}/Dockerfile"|
             awk 'BEGIN { FS="=" } { print $2 }' | tr -d '"')
 
-    # By default we build CI images  but when we specify `--producton-image` we switch to production image
+    # By default we build CI images, but when we specify `--production-image` we switch to production image
     export PRODUCTION_IMAGE="false"
 
     # The SQLlite URL used for sqlite runs
@@ -221,6 +220,16 @@ function initialize_common_environment {
 
     # Determines if airflow should be installed from a specified reference in GitHub
     export INSTALL_AIRFLOW_REFERENCE=""
+
+    # Version of Kubernetes to run
+    export KUBERNETES_VERSION="${KUBERNETES_VERSION:="v1.15.3"}"
+
+    # Name of the KinD cluster to connect to
+    export KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"}
+
+    # Name of the KinD cluster to connect to when referred to via kubectl
+    export KUBECTL_CLUSTER_NAME=kind-${KIND_CLUSTER_NAME}
+
 }
 
 # Prints verbose information in case VERBOSE variable is set
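
The cluster name and kubectl context derived above follow a fixed pattern, so host-side tooling can
address the test cluster directly; a hypothetical session (actual names depend on the python and
kubernetes versions you selected):

    # e.g. with PYTHON_MAJOR_MINOR_VERSION=3.6 and the default KUBERNETES_VERSION=v1.15.3:
    #   KIND_CLUSTER_NAME=airflow-python-3.6-v1.15.3
    #   KUBECTL_CLUSTER_NAME=kind-airflow-python-3.6-v1.15.3
    kind get nodes --name airflow-python-3.6-v1.15.3
    kubectl get pods --context kind-airflow-python-3.6-v1.15.3 --all-namespaces
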
@@ -267,6 +276,7 @@ function generate_local_mounts_list {
         "$prefix"setup.cfg:/opt/airflow/setup.cfg:cached
         "$prefix"setup.py:/opt/airflow/setup.py:cached
         "$prefix"tests:/opt/airflow/tests:cached
+        "$prefix"kubernetes_tests:/opt/airflow/kubernetes_tests:cached
         "$prefix"tmp:/opt/airflow/tmp:cached
     )
 }
@@ -526,7 +536,7 @@ function assert_not_in_container() {
     fi
 }
 
-# Removes the "Forced answer" (yes/no/quit) given previously, unles you specifically want to remember it.
+# Removes the "Forced answer" (yes/no/quit) given previously, unless you specifically want to remember it.
 #
 # This is the default behaviour of all rebuild scripts to ask independently whether you want to
 # rebuild the image or not. Sometimes however we want to remember answer previously given. For
@@ -711,7 +721,7 @@ function get_remote_image_info() {
     # delete container just in case
     verbose_docker rm --force "remote-airflow-manifest" >/dev/null 2>&1
     set -e
-    # Create container out of the manifest image without runnning it
+    # Create container out of the manifest image without running it
     verbose_docker create --name "remote-airflow-manifest" "${AIRFLOW_CI_REMOTE_MANIFEST_IMAGE}" >/dev/null 2>&1
     # Extract manifest and store it in local file
     verbose_docker cp "remote-airflow-manifest:${AIRFLOW_CI_BASE_TAG}.json" "${TMP_MANIFEST_REMOTE_JSON}" >/dev/null 2>&1
@@ -751,7 +761,7 @@ function compare_layers() {
     if (( NUM_DIFF >= MAGIC_CUT_OFF_NUMBER_OF_LAYERS )); then
         echo
         echo
-        echo "WARNING! Your image and the dockerhub image differ signifcantly"
+        echo "WARNING! Your image and the dockerhub image differ significantly"
         echo
         echo "Forcing pulling the images. It will be faster than rebuilding usually."
         echo "You can avoid it by setting SKIP_CHECK_REMOTE_IMAGE to true"
@@ -986,6 +996,8 @@ function get_ci_environment() {
     export CI_EVENT_TYPE="manual"
     export CI_TARGET_REPO="apache/airflow"
     export CI_TARGET_BRANCH="master"
+    export CI_SOURCE_REPO="apache/airflow"
+    export CI_SOURCE_BRANCH="master"
     export CI_BUILD_ID="default-build-id"
     export CI_JOB_ID="default-job-id"
     if [[ ${CI:=} != "true" ]]; then
@@ -994,29 +1006,50 @@ function get_ci_environment() {
         echo
     else
         if [[ ${TRAVIS:=} == "true" ]]; then
+            export CI_TARGET_REPO="${TRAVIS_REPO_SLUG}"
+            export CI_TARGET_BRANCH="${TRAVIS_BRANCH}"
+            export CI_BUILD_ID="${TRAVIS_BUILD_ID}"
+            export CI_JOB_ID="${TRAVIS_JOB_ID}"
             if [[ "${TRAVIS_PULL_REQUEST:=}" == "true" ]]; then
                 export CI_EVENT_TYPE="pull_request"
+                export CI_SOURCE_REPO="${TRAVIS_PULL_REQUEST_SLUG}"
+                export CI_SOURCE_BRANCH="${TRAVIS_PULL_REQUEST_BRANCH}"
             elif [[ "${TRAVIS_EVENT_TYPE:=}" == "cron" ]]; then
                 export CI_EVENT_TYPE="schedule"
             else
                 export CI_EVENT_TYPE="push"
             fi
-            export CI_TARGET_BRANCH="${TRAVIS_BRANCH}"
-            export CI_TARGET_REPO="${TRAVIS_REPO_SLUG}"
-            export CI_BUILD_ID="${TRAVIS_BUILD_ID}"
-            export CI_JOB_ID="${TRAVIS_JOB_ID}"
         elif [[ ${GITHUB_ACTIONS:=} == "true" ]]; then
+            export CI_TARGET_REPO="${GITHUB_REPOSITORY}"
+            export CI_TARGET_BRANCH="${GITHUB_BASE_REF}"
+            export CI_BUILD_ID="${GITHUB_RUN_ID}"
+            export CI_JOB_ID="${GITHUB_JOB}"
             if [[ ${GITHUB_EVENT_NAME:=} == "pull_request" ]]; then
                 export CI_EVENT_TYPE="pull_request"
+                # default name of the source repo (assuming it's forked without rename)
+                export SOURCE_AIRFLOW_REPO=${SOURCE_AIRFLOW_REPO:="airflow"}
+                # For pull requests we cannot reliably determine the source repository name, so we
+                # assume the forked repo is called airflow; set SOURCE_AIRFLOW_REPO if it is not
+                export CI_SOURCE_REPO="${GITHUB_ACTOR}/${SOURCE_AIRFLOW_REPO}"
+                export CI_SOURCE_BRANCH="${GITHUB_HEAD_REF}"
+                BRANCH_EXISTS=$(git ls-remote --heads \
+                    "https://github.com/${CI_SOURCE_REPO}.git" "${CI_SOURCE_BRANCH}")
+                if [[ ${BRANCH_EXISTS} == "" ]]; then
+                    echo
+                    echo "Branch ${CI_SOURCE_BRANCH} does not exist in https://github.com/${CI_SOURCE_REPO}.git"
+                    echo
+                    echo
+                    echo "Falling back to branch ${CI_TARGET_BRANCH} of https://github.com/${CI_TARGET_REPO}.git"
+                    echo
+                    # Fallback to the target repository if the repo does not exist
+                    export CI_SOURCE_REPO="${CI_TARGET_REPO}"
+                    export CI_SOURCE_BRANCH="${CI_TARGET_BRANCH}"
+                fi
             elif [[ ${GITHUB_EVENT_TYPE:=} == "schedule" ]]; then
                 export CI_EVENT_TYPE="schedule"
             else
                 export CI_EVENT_TYPE="push"
             fi
-            export CI_TARGET_REPO="${GITHUB_REPOSITORY}"
-            export CI_TARGET_BRANCH="${GITHUB_BASE_REF}"
-            export CI_BUILD_ID="${GITHUB_RUN_ID}"
-            export CI_JOB_ID="${GITHUB_JOB}"
         else
             echo
             echo "ERROR! Unknown CI environment. Exiting"
@@ -1029,6 +1062,8 @@ function get_ci_environment() {
     echo "CI_EVENT_TYPE=${CI_EVENT_TYPE}"
     echo "CI_TARGET_REPO=${CI_TARGET_REPO}"
     echo "CI_TARGET_BRANCH=${CI_TARGET_BRANCH}"
+    echo "CI_SOURCE_REPO=${CI_SOURCE_REPO}"
+    echo "CI_SOURCE_BRANCH=${CI_SOURCE_BRANCH}"
     echo "CI_BUILD_ID=${CI_BUILD_ID}"
     echo "CI_JOB_ID=${CI_JOB_ID}"
     echo
@@ -1386,6 +1421,7 @@ function build_prod_image() {
     fi
 }
 
+
 # Removes airflow CI and base images
 function remove_all_images() {
     echo
@@ -1558,6 +1594,11 @@ function prepare_prod_build() {
         export CACHED_AIRFLOW_PROD_BUILD_IMAGE=""
         export CACHED_PYTHON_BASE_IMAGE=""
     fi
+    export AIRFLOW_KUBERNETES_IMAGE=${AIRFLOW_PROD_IMAGE}-kubernetes
+    AIRFLOW_KUBERNETES_IMAGE_NAME=$(echo "${AIRFLOW_KUBERNETES_IMAGE}" | cut -f 1 -d ":")
+    export AIRFLOW_KUBERNETES_IMAGE_NAME
+    AIRFLOW_KUBERNETES_IMAGE_TAG=$(echo "${AIRFLOW_KUBERNETES_IMAGE}" | cut -f 2 -d ":")
+    export AIRFLOW_KUBERNETES_IMAGE_TAG
 
     if [[ "${INSTALL_AIRFLOW_REFERENCE:=}" != "" ]]; then
         # When --install-airflow-reference is used then the image is build from github tag
@@ -1666,3 +1707,437 @@ function set_mysql_encoding() {
 function get_airflow_version_from_production_image() {
      docker run --entrypoint /bin/bash "${AIRFLOW_PROD_IMAGE}" -c 'echo "${AIRFLOW_VERSION}"'
 }
+
+function dump_kind_logs() {
+    echo "###########################################################################################"
+    echo "                   Dumping logs from KIND"
+    echo "###########################################################################################"
+
+    FILE_NAME="${1}"
+    kind --name "${KIND_CLUSTER_NAME}" export logs "${FILE_NAME}"
+}
+
+
+function send_kubernetes_logs_to_file_io() {
+    echo "##############################################################################"
+    echo
+    echo "   DUMPING LOG FILES FROM KIND AND SENDING THEM TO file.io"
+    echo
+    echo "##############################################################################"
+    DUMP_DIR_NAME=$(date "+%Y-%m-%d")_kind_${CI_BUILD_ID:="default"}_${CI_JOB_ID:="default"}
+    DUMP_DIR=/tmp/${DUMP_DIR_NAME}
+    dump_kind_logs "${DUMP_DIR}"
+    tar -cvzf "${DUMP_DIR}.tar.gz" -C /tmp "${DUMP_DIR_NAME}"
+    echo
+    echo "   Logs saved to ${DUMP_DIR}.tar.gz"
+    echo
+    echo "##############################################################################"
+    curl -F "file=@${DUMP_DIR}.tar.gz" https://file.io
+}
+
+function check_kind_and_kubectl_are_installed() {
+    SYSTEM=$(uname -s| tr '[:upper:]' '[:lower:]')
+    KIND_VERSION="v0.7.0"
+    KIND_URL="https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-${SYSTEM}-amd64"
+    KIND_PATH="${BUILD_CACHE_DIR}/bin/kind"
+    KUBECTL_VERSION="v1.15.3"
+    KUBECTL_URL="https://storage.googleapis.com/kubernetes-release/release/${KUBECTL_VERSION}/bin/${SYSTEM}/amd64/kubectl"
+    KUBECTL_PATH="${BUILD_CACHE_DIR}/bin/kubectl"
+    mkdir -pv "${BUILD_CACHE_DIR}/bin"
+    if [[ ! -f "${KIND_PATH}" ]]; then
+        echo
+        echo "Downloading Kind version ${KIND_VERSION}"
+        echo
+        curl --fail --location "${KIND_URL}" --output "${KIND_PATH}"
+        chmod +x "${KIND_PATH}"
+    fi
+    if [[ ! -f "${KUBECTL_PATH}" ]]; then
+        echo
+        echo "Downloading Kubectl version ${KUBECTL_VERSION}"
+        echo
+        curl --fail --location "${KUBECTL_URL}" --output "${KUBECTL_PATH}"
+        chmod +x "${KUBECTL_PATH}"
+    fi
+    PATH=${PATH}:${BUILD_CACHE_DIR}/bin
+}
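+
+# Note (illustrative): both binaries are cached under ${BUILD_CACHE_DIR}/bin and only the PATH
+# of the current shell is extended, so any kind/kubectl installed on the host is never touched.
+# A quick way to verify which binaries and versions will actually be used after this function
+# runs (sketch only, not executed here):
+#
+#     command -v kind kubectl && kind version && kubectl version --client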
+
+function create_cluster() {
+    if [[ "${TRAVIS:="false"}" == "true" ]]; then
+        # Travis CI does not handle the rich output of KinD well, so we capture it
+        # and display it only if "kind create cluster" fails
+        start_output_heartbeat "Creating kubernetes cluster" 10
+        set +e
+        if ! OUTPUT=$(kind create cluster \
+                        --name "${KIND_CLUSTER_NAME}" \
+                        --config "${AIRFLOW_SOURCES}/scripts/ci/kubernetes/kind-cluster-conf.yaml" \
+                        --image "kindest/node:${KUBERNETES_VERSION}" 2>&1); then
+            echo "${OUTPUT}"
+        fi
+        stop_output_heartbeat
+    else
+        kind create cluster \
+            --name "${KIND_CLUSTER_NAME}" \
+            --config "${AIRFLOW_SOURCES}/scripts/ci/kubernetes/kind-cluster-conf.yaml" \
+            --image "kindest/node:${KUBERNETES_VERSION}"
+    fi
+    echo
+    echo "Created cluster ${KIND_CLUSTER_NAME}"
+    echo
+
+}
+
+function delete_cluster() {
+    kind delete cluster --name "${KIND_CLUSTER_NAME}"
+    echo
+    echo "Deleted cluster ${KIND_CLUSTER_NAME}"
+    echo
+    rm -rf "${HOME}/.kube/*"
+}
+
+function perform_kind_cluster_operation() {
+    OPERATION="${1}"
+    ALL_CLUSTERS=$(kind get clusters || true)
+
+    echo
+    echo "Kubernetes mode: ${KUBERNETES_MODE}"
+    echo
+
+    if [[ ${OPERATION} == "status" ]]; then
+        if [[ ${ALL_CLUSTERS} == *"${KIND_CLUSTER_NAME}"* ]]; then
+            echo
+            echo "Cluster name: ${KIND_CLUSTER_NAME}"
+            echo
+            kind get nodes --name "${KIND_CLUSTER_NAME}"
+            echo
+            exit
+        else
+            echo
+            echo "Cluster ${KIND_CLUSTER_NAME} is not running"
+            echo
+            exit
+        fi
+    fi
+    if [[ ${ALL_CLUSTERS} == *"${KIND_CLUSTER_NAME}"* ]]; then
+        if [[ ${OPERATION} == "start" ]]; then
+            echo
+            echo "Cluster ${KIND_CLUSTER_NAME} is already created"
+            echo "Reusing previously created cluster"
+            echo
+        elif [[ ${OPERATION} == "restart" ]]; then
+            echo
+            echo "Recreating cluster"
+            echo
+            delete_cluster
+            create_cluster
+        elif [[ ${OPERATION} == "stop" ]]; then
+            echo
+            echo "Deleting cluster"
+            echo
+            delete_cluster
+            exit
+        elif [[ ${OPERATION} == "deploy" ]]; then
+            echo
+            echo "Deploying Airflow to KinD"
+            echo
+            get_ci_environment
+            check_kind_and_kubectl_are_installed
+            build_kubernetes_image
+            load_image_to_kind_cluster
+            prepare_kubernetes_app_variables
+            prepare_kubernetes_resources
+            apply_kubernetes_resources
+            wait_for_airflow_pods_up_and_running
+            wait_for_airflow_webserver_up_and_running
+        elif [[ ${OPERATION} == "test" ]]; then
+            echo
+            echo "Testing with kind to KinD"
+            echo
+            "${AIRFLOW_SOURCES}/scripts/ci/ci_run_kubernetes_tests.sh"
+        else
+            echo
+            echo "Wrong cluster operation: ${OPERATION}. Should be one of:"
+            echo "${FORMATTED_KIND_OPERATIONS}"
+            echo
+            exit 1
+        fi
+    else
+        if [[ ${OPERATION} == "start" ]]; then
+            echo
+            echo "Creating cluster"
+            echo
+            create_cluster
+        elif [[ ${OPERATION} == "recreate" ]]; then
+            echo
+            echo "Cluster ${KIND_CLUSTER_NAME} does not exist. Creating rather than recreating"
+            echo "Creating cluster"
+            echo
+            create_cluster
+        elif [[ ${OPERATION} == "stop" || ${OEPRATON} == "deploy" || ${OPERATION} == "test" ]]; then
+            echo
+            echo "Cluster ${KIND_CLUSTER_NAME} does not exist. It should exist for ${OPERATION} operation"
+            echo
+            exit 1
+        else
+            echo
+            echo "Wrong cluster operation: ${OPERATION}. Should be one of:"
+            echo "${FORMATTED_KIND_OPERATIONS}"
+            echo
+            exit 1
+        fi
+    fi
+}
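+
+# Illustration (not executed here): this function is normally driven through the
+# ci_perform_kind_cluster_operation.sh wrapper added in this change, which forwards its
+# arguments directly to perform_kind_cluster_operation, e.g.:
+#
+#     ./scripts/ci/ci_perform_kind_cluster_operation.sh start    # create (or reuse) the KinD cluster
+#     ./scripts/ci/ci_perform_kind_cluster_operation.sh status   # show the cluster nodes if running
+#     ./scripts/ci/ci_perform_kind_cluster_operation.sh deploy   # build the image and deploy Airflow
+#     ./scripts/ci/ci_perform_kind_cluster_operation.sh test     # run the kubernetes_tests suite
+#     ./scripts/ci/ci_perform_kind_cluster_operation.sh stop     # delete the cluster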
+
+function check_cluster_ready_for_airflow() {
+    kubectl cluster-info --cluster "${KUBECTL_CLUSTER_NAME}"
+    kubectl get nodes --cluster "${KUBECTL_CLUSTER_NAME}"
+    echo
+    echo "Showing storageClass"
+    echo
+    kubectl get storageclass --cluster "${KUBECTL_CLUSTER_NAME}"
+    echo
+    echo "Showing kube-system pods"
+    echo
+    kubectl get -n kube-system pods --cluster "${KUBECTL_CLUSTER_NAME}"
+    echo
+    echo "Airflow environment on kubernetes is good to go!"
+    echo
+    kubectl create namespace test-namespace --cluster "${KUBECTL_CLUSTER_NAME}"
+}
+
+
+function build_kubernetes_image() {
+    cd "${AIRFLOW_SOURCES}" || exit 1
+    prepare_prod_build
+    if [[ $(docker images -q "${AIRFLOW_PROD_IMAGE}") == "" ||
+            ${FORCE_BUILD_IMAGES:="false"} == "true" ]]; then
+        build_prod_image
+    else
+        echo
+        echo "Skip rebuilding prod image. Use --force-build-images to rebuild prod image."
+        echo
+    fi
+    echo
+    echo "Adding kubernetes-specific scripts to prod image."
+    echo "Building ${AIRFLOW_KUBERNETES_IMAGE} from ${AIRFLOW_PROD_IMAGE} with latest sources."
+    echo
+    docker build \
+        --build-arg AIRFLOW_PROD_IMAGE="${AIRFLOW_PROD_IMAGE}" \
+        --cache-from "${AIRFLOW_PROD_IMAGE}" \
+        --tag="${AIRFLOW_KUBERNETES_IMAGE}" \
+        -f- . << 'EOF'
+    ARG AIRFLOW_PROD_IMAGE
+    FROM ${AIRFLOW_PROD_IMAGE}
+
+    ARG AIRFLOW_SOURCES=/home/airflow/airflow_sources/
+    ENV AIRFLOW_SOURCES=${AIRFLOW_SOURCES}
+
+    USER root
+
+    COPY --chown=airflow:airflow . ${AIRFLOW_SOURCES}
+
+    COPY scripts/ci/kubernetes/docker/airflow-test-env-init-db.sh /tmp/airflow-test-env-init-db.sh
+    COPY scripts/ci/kubernetes/docker/airflow-test-env-init-dags.sh /tmp/airflow-test-env-init-dags.sh
+    COPY scripts/ci/kubernetes/docker/bootstrap.sh /bootstrap.sh
+
+    RUN chmod +x /bootstrap.sh
+
+
+    USER airflow
+
+
+    ENTRYPOINT ["/bootstrap.sh"]
+EOF
+
+    echo "The ${AIRFLOW_KUBERNETES_IMAGE} is prepared for deployment."
+}
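+
+# Note on the technique above (illustrative): "docker build -f- ." reads the Dockerfile from
+# stdin while still using the current directory as the build context, and the quoted 'EOF'
+# delimiter prevents the shell from expanding ${AIRFLOW_PROD_IMAGE}, so it is resolved by the
+# Dockerfile ARG instead. A minimal stand-alone sketch of the same pattern (image and tag
+# names below are made up for the example):
+#
+#     docker build --build-arg BASE_IMAGE=debian:buster-slim --tag=inline-example -f- . << 'EOF'
+#     ARG BASE_IMAGE
+#     FROM ${BASE_IMAGE}
+#     COPY . /src
+#     EOF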
+
+function load_image_to_kind_cluster() {
+    echo
+    echo "Loading ${AIRFLOW_KUBERNETES_IMAGE} to ${KIND_CLUSTER_NAME}"
+    echo
+    kind load docker-image --name "${KIND_CLUSTER_NAME}" "${AIRFLOW_KUBERNETES_IMAGE}"
+}
+
+function prepare_kubernetes_app_variables() {
+    echo
+    echo "Preparing kubernetes variables"
+    echo
+    KUBERNETES_APP_DIR="${AIRFLOW_SOURCES}/scripts/ci/kubernetes/app"
+    TEMPLATE_DIRNAME="${KUBERNETES_APP_DIR}/templates"
+    BUILD_DIRNAME="${KUBERNETES_APP_DIR}/build"
+
+    # shellcheck source=common/_image_variables.sh
+    . "${AIRFLOW_SOURCES}/common/_image_variables.sh"
+
+    # Source branch will be set in DockerHub
+    SOURCE_BRANCH=${SOURCE_BRANCH:=${DEFAULT_BRANCH}}
+    BRANCH_NAME=${BRANCH_NAME:=${SOURCE_BRANCH}}
+
+    if [[ ! -d "${BUILD_DIRNAME}" ]]; then
+        mkdir -p "${BUILD_DIRNAME}"
+    fi
+
+    rm -f "${BUILD_DIRNAME}"/*
+    rm -f "${BUILD_DIRNAME}"/*
+
+    if [[ "${KUBERNETES_MODE}" == "image" ]]; then
+        INIT_DAGS_VOLUME_NAME=airflow-dags
+        POD_AIRFLOW_DAGS_VOLUME_NAME=airflow-dags
+        CONFIGMAP_DAGS_FOLDER=/opt/airflow/dags
+        CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT=
+        CONFIGMAP_DAGS_VOLUME_CLAIM=airflow-dags
+    else
+        INIT_DAGS_VOLUME_NAME=airflow-dags-fake
+        POD_AIRFLOW_DAGS_VOLUME_NAME=airflow-dags-git
+        CONFIGMAP_DAGS_FOLDER=/opt/airflow/dags/repo/airflow/example_dags
+        CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT=/opt/airflow/dags
+        CONFIGMAP_DAGS_VOLUME_CLAIM=
+    fi
+
+
+    CONFIGMAP_GIT_REPO=${CI_SOURCE_REPO}
+    CONFIGMAP_BRANCH=${CI_SOURCE_BRANCH}
+}
+
+function prepare_kubernetes_resources() {
+    echo
+    echo "Preparing kubernetes resources"
+    echo
+    if [[ "${KUBERNETES_MODE}" == "image" ]]; then
+        sed -e "s/{{INIT_GIT_SYNC}}//g" \
+            "${TEMPLATE_DIRNAME}/airflow.template.yaml" >"${BUILD_DIRNAME}/airflow.yaml"
+    else
+        sed -e "/{{INIT_GIT_SYNC}}/{r ${TEMPLATE_DIRNAME}/init_git_sync.template.yaml" -e 'd}' \
+            "${TEMPLATE_DIRNAME}/airflow.template.yaml" >"${BUILD_DIRNAME}/airflow.yaml"
+    fi
+    sed -i "s|{{AIRFLOW_KUBERNETES_IMAGE}}|${AIRFLOW_KUBERNETES_IMAGE}|g" "${BUILD_DIRNAME}/airflow.yaml"
+
+    sed -i "s|{{CONFIGMAP_GIT_REPO}}|${CONFIGMAP_GIT_REPO}|g" "${BUILD_DIRNAME}/airflow.yaml"
+    sed -i "s|{{CONFIGMAP_BRANCH}}|${CONFIGMAP_BRANCH}|g" "${BUILD_DIRNAME}/airflow.yaml"
+    sed -i "s|{{INIT_DAGS_VOLUME_NAME}}|${INIT_DAGS_VOLUME_NAME}|g" "${BUILD_DIRNAME}/airflow.yaml"
+    sed -i "s|{{POD_AIRFLOW_DAGS_VOLUME_NAME}}|${POD_AIRFLOW_DAGS_VOLUME_NAME}|g" \
+        "${BUILD_DIRNAME}/airflow.yaml"
+
+    sed "s|{{CONFIGMAP_DAGS_FOLDER}}|${CONFIGMAP_DAGS_FOLDER}|g" \
+        "${TEMPLATE_DIRNAME}/configmaps.template.yaml" >"${BUILD_DIRNAME}/configmaps.yaml"
+    sed -i "s|{{CONFIGMAP_GIT_REPO}}|${CONFIGMAP_GIT_REPO}|g" "${BUILD_DIRNAME}/configmaps.yaml"
+    sed -i "s|{{CONFIGMAP_BRANCH}}|${CONFIGMAP_BRANCH}|g" "${BUILD_DIRNAME}/configmaps.yaml"
+    sed -i "s|{{CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT}}|${CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT}|g" \
+        "${BUILD_DIRNAME}/configmaps.yaml"
+    sed -i "s|{{CONFIGMAP_DAGS_VOLUME_CLAIM}}|${CONFIGMAP_DAGS_VOLUME_CLAIM}|g" \
+        "${BUILD_DIRNAME}/configmaps.yaml"
+    sed -i "s|{{AIRFLOW_KUBERNETES_IMAGE_NAME}}|${AIRFLOW_KUBERNETES_IMAGE_NAME}|g" \
+        "${BUILD_DIRNAME}/configmaps.yaml"
+    sed -i "s|{{AIRFLOW_KUBERNETES_IMAGE_TAG}}|${AIRFLOW_KUBERNETES_IMAGE_TAG}|g" \
+        "${BUILD_DIRNAME}/configmaps.yaml"
+}
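+
+# Note on the first sed invocation above (illustrative): "/{{INIT_GIT_SYNC}}/{r FILE" queues the
+# contents of FILE to be printed after each line matching the marker, and the following
+# -e 'd}' deletes the marker line itself, so the init_git_sync template is spliced in place of
+# the placeholder. A tiny stand-alone sketch of the same pattern (file names are made up):
+#
+#     sed -e "/{{MARKER}}/{r snippet.txt" -e 'd}' template.txt > rendered.txt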
+
+function apply_kubernetes_resources() {
+    echo
+    echo "Apply kubernetes resources."
+    echo
+
+
+    kubectl delete -f "${KUBERNETES_APP_DIR}/postgres.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" \
+        2>&1 | grep -v "NotFound" || true
+    kubectl delete -f "${BUILD_DIRNAME}/airflow.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" \
+        2>&1 | grep -v "NotFound" || true
+    kubectl delete -f "${KUBERNETES_APP_DIR}/secrets.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" \
+        2>&1 | grep -v "NotFound" || true
+
+    set -e
+
+    kubectl apply -f "${KUBERNETES_APP_DIR}/secrets.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
+    kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
+    kubectl apply -f "${KUBERNETES_APP_DIR}/postgres.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
+    kubectl apply -f "${KUBERNETES_APP_DIR}/volumes.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
+    kubectl apply -f "${BUILD_DIRNAME}/airflow.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
+}
+
+
+function dump_kubernetes_logs() {
+    POD=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' \
+        --cluster "${KUBECTL_CLUSTER_NAME}" | grep airflow | head -1)
+    echo "------- pod description -------"
+    kubectl describe pod "${POD}" --cluster "${KUBECTL_CLUSTER_NAME}"
+    echo "------- webserver init container logs - init -------"
+    kubectl logs "${POD}" -c init --cluster "${KUBECTL_CLUSTER_NAME}" || true
+    if [[ "${KUBERNETES_MODE}" == "git" ]]; then
+        echo "------- webserver init container logs - git-sync-clone -------"
+        kubectl logs "${POD}" -c git-sync-clone --cluster "${KUBECTL_CLUSTER_NAME}" || true
+    fi
+    echo "------- webserver logs -------"
+    kubectl logs "${POD}" -c webserver --cluster "${KUBECTL_CLUSTER_NAME}" || true
+    echo "------- scheduler logs -------"
+    kubectl logs "${POD}" -c scheduler --cluster "${KUBECTL_CLUSTER_NAME}" || true
+    echo "--------------"
+}
+
+function wait_for_airflow_pods_up_and_running() {
+    set +o pipefail
+    # wait for up to 10 minutes for everything to be deployed
+    PODS_ARE_READY="0"
+    for i in {1..150}; do
+        echo "------- Running kubectl get pods: $i -------"
+        PODS=$(kubectl get pods --cluster "${KUBECTL_CLUSTER_NAME}" | awk 'NR>1 {print $0}')
+        echo "$PODS"
+        NUM_AIRFLOW_READY=$(echo "${PODS}" | grep airflow | awk '{print $2}' | grep -cE '([0-9])\/(\1)' \
+            | xargs)
+        NUM_POSTGRES_READY=$(echo "${PODS}" | grep postgres | awk '{print $2}' | grep -cE '([0-9])\/(\1)' \
+            | xargs)
+        if [[ "${NUM_AIRFLOW_READY}" == "1" && "${NUM_POSTGRES_READY}" == "1" ]]; then
+            PODS_ARE_READY="1"
+            break
+        fi
+        sleep 4
+    done
+    POD=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' \
+        --cluster "${KUBECTL_CLUSTER_NAME}" | grep airflow | head -1)
+
+    if [[ "${PODS_ARE_READY}" == "1" ]]; then
+        echo "PODS are ready."
+        set -o pipefail
+    else
+        echo >&2 "PODS are not ready after waiting for a long time. Exiting..."
+        dump_kubernetes_logs
+        exit 1
+    fi
+}
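+
+# Note on the readiness check above (illustrative): column 2 of "kubectl get pods" is the READY
+# count in the form "ready/total"; the extended regex '([0-9])\/(\1)' uses a backreference so
+# only pods where both numbers are equal (e.g. "1/1" or "2/2") are counted, while "1/2" is not.
+# The same filter can be seen in isolation with:
+#
+#     printf '1/1\n1/2\n2/2\n' | grep -cE '([0-9])\/(\1)'   # prints 2 (lines that match)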
+
+
+function wait_for_airflow_webserver_up_and_running() {
+    set +o pipefail
+    # Wait until Airflow webserver is up
+    KUBERNETES_HOST=localhost
+    AIRFLOW_WEBSERVER_IS_READY="0"
+    CONSECUTIVE_SUCCESS_CALLS=0
+    for i in {1..30}; do
+        echo "------- Wait until webserver is up: $i -------"
+        PODS=$(kubectl get pods --cluster "${KUBECTL_CLUSTER_NAME}" | awk 'NR>1 {print $0}')
+        echo "$PODS"
+        HTTP_CODE=$(curl -LI "http://${KUBERNETES_HOST}:30809/health" -o /dev/null -w '%{http_code}\n' -sS) \
+            || true
+        if [[ "${HTTP_CODE}" == 200 ]]; then
+            ((CONSECUTIVE_SUCCESS_CALLS += 1))
+        else
+            CONSECUTIVE_SUCCESS_CALLS="0"
+        fi
+        if [[ "${CONSECUTIVE_SUCCESS_CALLS}" == 3 ]]; then
+            AIRFLOW_WEBSERVER_IS_READY="1"
+            break
+        fi
+        sleep 10
+    done
+    set -o pipefail
+    if [[ "${AIRFLOW_WEBSERVER_IS_READY}" == "1" ]]; then
+        echo
+        echo "Airflow webserver is ready."
+        echo
+    else
+        echo >&2
+        echo >&2 "Airflow webserver is not ready after waiting for a long time. Exiting..."
+        echo >&2
+        dump_kubernetes_logs
+        exit 1
+    fi
+}
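+
+# Note (illustrative): the loop above requires three consecutive HTTP 200 responses from the
+# health endpoint before declaring the webserver ready, which avoids flapping right after
+# startup. Once the deployment is up, the same probe can be reproduced manually on the port
+# used by this setup:
+#
+#     curl -LI "http://localhost:30809/health" -o /dev/null -w '%{http_code}\n' -sS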
diff --git a/scripts/ci/in_container/kubernetes/docker/bootstrap.sh b/scripts/ci/ci_deploy_app_to_kubernetes.sh
similarity index 53%
copy from scripts/ci/in_container/kubernetes/docker/bootstrap.sh
copy to scripts/ci/ci_deploy_app_to_kubernetes.sh
index 0a9f34d..6061c1c 100755
--- a/scripts/ci/in_container/kubernetes/docker/bootstrap.sh
+++ b/scripts/ci/ci_deploy_app_to_kubernetes.sh
@@ -15,13 +15,22 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# shellcheck source=scripts/ci/_script_init.sh
+. "$( dirname "${BASH_SOURCE[0]}" )/_script_init.sh"
 
-if [[ "$1" = "webserver" ]]
-then
-    exec airflow webserver
-fi
+set -euo pipefail
 
-if [[ "$1" = "scheduler" ]]
-then
-    exec airflow scheduler
-fi
+export KUBERNETES_VERSION=${KUBERNETES_VERSION:="v1.15.3"}
+export PYTHON_MAJOR_MINOR_VERSION=${PYTHON_MAJOR_MINOR_VERSION:="3.6"}
+export KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"}
+export KUBERNETES_MODE=${KUBERNETES_MODE:="image"}
+
+get_ci_environment
+check_kind_and_kubectl_are_installed
+build_kubernetes_image
+load_image_to_kind_cluster
+prepare_kubernetes_app_variables
+prepare_kubernetes_resources
+apply_kubernetes_resources
+wait_for_airflow_pods_up_and_running
+wait_for_airflow_webserver_up_and_running
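+
+# Illustration (not executed): the defaults above can be overridden from the environment when
+# invoking this script, for example to deploy a different Python version or DAG distribution
+# mode (the values below are just examples):
+#
+#     PYTHON_MAJOR_MINOR_VERSION=3.7 KUBERNETES_MODE=git \
+#         ./scripts/ci/ci_deploy_app_to_kubernetes.sh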
diff --git a/scripts/ci/in_container/run_ci_tests.sh b/scripts/ci/ci_load_image_to_kind.sh
similarity index 53%
copy from scripts/ci/in_container/run_ci_tests.sh
copy to scripts/ci/ci_load_image_to_kind.sh
index b046c08..2fc2570 100755
--- a/scripts/ci/in_container/run_ci_tests.sh
+++ b/scripts/ci/ci_load_image_to_kind.sh
@@ -15,34 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# shellcheck source=scripts/ci/in_container/_in_container_script_init.sh
-. "$( dirname "${BASH_SOURCE[0]}" )/_in_container_script_init.sh"
+# shellcheck source=scripts/ci/_script_init.sh
+. "$( dirname "${BASH_SOURCE[0]}" )/_script_init.sh"
 
-# any argument received is overriding the default nose execution arguments:
-PYTEST_ARGS=( "$@" )
+cd "${AIRFLOW_SOURCES}" || exit 1
 
+export KUBERNETES_VERSION=${KUBERNETES_VERSION:="v1.15.3"}
+export PYTHON_MAJOR_MINOR_VERSION=${PYTHON_MAJOR_MINOR_VERSION:="3.6"}
+export KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"}
+
+prepare_prod_build
 echo
-echo "Starting the tests with those pytest arguments: ${PYTEST_ARGS[*]}"
+echo "Loading the ${AIRFLOW_KUBERNETES_IMAGE} to cluster ${KIND_CLUSTER_NAME} from docker"
+echo
+"${AIRFLOW_SOURCES}/.build/bin/kind" load docker-image --name "${KIND_CLUSTER_NAME}" "${AIRFLOW_KUBERNETES_IMAGE}"
+echo
+echo "Loaded the ${AIRFLOW_KUBERNETES_IMAGE} to cluster ${KIND_CLUSTER_NAME}"
 echo
-set +e
-
-pytest "${PYTEST_ARGS[@]}"
-
-RES=$?
-
-set +x
-if [[ "${RES}" == "0" && ${CI:="false"} == "true" ]]; then
-    echo "All tests successful"
-    bash <(curl -s https://codecov.io/bash)
-fi
-
-if [[ ${CI} == "true" ]]; then
-    send_docker_logs_to_file_io
-    send_airflow_logs_to_file_io
-fi
-
-if [[ ${CI} == "true" && ${ENABLE_KIND_CLUSTER} == "true" ]]; then
-    send_kubernetes_logs_to_file_io
-fi
-
-exit "${RES}"
diff --git a/scripts/ci/in_container/kubernetes/docker/bootstrap.sh b/scripts/ci/ci_perform_kind_cluster_operation.sh
similarity index 79%
rename from scripts/ci/in_container/kubernetes/docker/bootstrap.sh
rename to scripts/ci/ci_perform_kind_cluster_operation.sh
index 0a9f34d..7963344 100755
--- a/scripts/ci/in_container/kubernetes/docker/bootstrap.sh
+++ b/scripts/ci/ci_perform_kind_cluster_operation.sh
@@ -16,12 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
-if [[ "$1" = "webserver" ]]
-then
-    exec airflow webserver
-fi
+# shellcheck source=scripts/ci/_script_init.sh
+. "$( dirname "${BASH_SOURCE[0]}" )/_script_init.sh"
 
-if [[ "$1" = "scheduler" ]]
-then
-    exec airflow scheduler
-fi
+check_kind_and_kubectl_are_installed
+
+perform_kind_cluster_operation "${@}"
+
+check_cluster_ready_for_airflow
diff --git a/scripts/ci/ci_run_airflow_testing.sh b/scripts/ci/ci_run_airflow_testing.sh
index e3b23ab..f8d5e7e 100755
--- a/scripts/ci/ci_run_airflow_testing.sh
+++ b/scripts/ci/ci_run_airflow_testing.sh
@@ -18,7 +18,7 @@
 export VERBOSE=${VERBOSE:="false"}
 
 function run_airflow_testing_in_docker_with_kubernetes() {
-    export KUBERNETES_MODE=${KUBERNETES_MODE:="git_mode"}
+    export KUBERNETES_MODE=${KUBERNETES_MODE:="image"}
     export KUBERNETES_VERSION=${KUBERNETES_VERSION:="v1.15.3"}
 
     # shellcheck disable=SC2016
@@ -151,3 +151,34 @@ do
     exit ${EXIT_CODE}
 done
 set -u
+
+#TODO FIXME
+if [[ "${INTEGRATIONS[*]}" == *"kubernetes"* ]]; then
+    export KUBERNETES_MODE=${KUBERNETES_MODE:="image"}
+    export KUBERNETES_VERSION=${KUBERNETES_VERSION:="v1.15.3"}
+    export KIND_CLUSTER_NAME="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"
+
+    if [[ ${ENABLE_KIND_CLUSTER} == "true" ]]; then
+        install_kind_in_host
+        "${MY_DIR}/setup_kind_cluster_in_host.sh"
+    fi
+
+    # adding trap to exiting trap
+    HANDLERS="$( trap -p EXIT | cut -f2 -d \' )"
+    # shellcheck disable=SC2064
+    trap "${HANDLERS}${HANDLERS:+;}send_kubernetes_logs_to_file_io" EXIT
+fi
+
+set +u
+# shellcheck disable=SC2016
+docker-compose --log-level INFO \
+  -f "${MY_DIR}/docker-compose/base.yml" \
+  -f "${MY_DIR}/docker-compose/backend-${BACKEND}.yml" \
+  "${INTEGRATIONS[@]}" \
+  "${DOCKER_COMPOSE_LOCAL[@]}" \
+     run airflow \
+       '/opt/airflow/scripts/ci/in_container/entrypoint_ci.sh "${@}"' \
+       /opt/airflow/scripts/ci/in_container/entrypoint_ci.sh "${@}"
+     # Note the command is there twice (!) because it is passed via bash -c
+     # and bash -c starts passing parameters from $0. TODO: fixme
+set -u
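+
+# Two bash details worth calling out in the block above (illustrative sketches only):
+#
+# 1. Appending to an existing EXIT trap: "trap -p EXIT" prints the currently registered handler,
+#    which is captured and re-registered together with the new command, e.g.:
+#
+#        trap 'echo first' EXIT
+#        HANDLERS="$( trap -p EXIT | cut -f2 -d \' )"
+#        trap "${HANDLERS}${HANDLERS:+;}echo second" EXIT   # both handlers now run on exit
+#
+# 2. "bash -c" assigns the arguments that follow the command string starting at $0, not $1,
+#    which is why the entrypoint path appears twice above:
+#
+#        bash -c 'echo "$0 / $1"' zero one                  # prints: zero / one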
diff --git a/scripts/ci/ci_run_kubernetes_tests.sh b/scripts/ci/ci_run_kubernetes_tests.sh
new file mode 100755
index 0000000..c530112
--- /dev/null
+++ b/scripts/ci/ci_run_kubernetes_tests.sh
@@ -0,0 +1,105 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# shellcheck source=scripts/ci/_script_init.sh
+. "$( dirname "${BASH_SOURCE[0]}" )/_script_init.sh"
+
+INTERACTIVE="false"
+
+declare -a TESTS
+declare -a PYTEST_ARGS
+
+TESTS=()
+
+if [[ $# != 0 ]]; then
+    if [[ $1 == "--help" || $1 == "-h" ]]; then
+        echo
+        echo "Running kubernetes tests"
+        echo
+        echo "    $0                      - runs all kubernetes tests"
+        echo "    $0 TEST [TEST ...]      - runs selected kubernetes tests (from kubernetes_tests folder)"
+        echo "    $0 [-i|--interactive]   - Activates virtual environment ready to run tests and drops you in"
+        echo "    $0 [--help]             - Prints this help message"
+        echo
+        exit
+    elif [[ $1 == "--interactive" || $1 == "-i" ]]; then
+        echo
+        echo "Entering interactive environment for kubernetes testing"
+        echo
+        INTERACTIVE="true"
+    else
+        TESTS=("${@}")
+    fi
+    PYTEST_ARGS=(
+        "--pythonwarnings=ignore::DeprecationWarning"
+        "--pythonwarnings=ignore::PendingDeprecationWarning"
+    )
+else
+    TESTS=("kubernetes_tests")
+    PYTEST_ARGS=(
+        "--verbosity=1"
+        "--strict-markers"
+        "--durations=100"
+        "--cov=airflow/"
+        "--cov-config=.coveragerc"
+        "--cov-report=html:airflow/www/static/coverage/"
+        "--color=yes"
+        "--maxfail=50"
+        "--pythonwarnings=ignore::DeprecationWarning"
+        "--pythonwarnings=ignore::PendingDeprecationWarning"
+        )
+
+fi
+
+get_ci_environment
+
+cd "${AIRFLOW_SOURCES}" || exit 1
+
+VIRTUALENV_PATH="${BUILD_CACHE_DIR}/.kubernetes_venv"
+
+if [[ ! -d ${VIRTUALENV_PATH} ]]; then
+    echo
+    echo "Creating virtualenv at ${VIRTUALENV_PATH}"
+    echo
+    python -m venv "${VIRTUALENV_PATH}"
+fi
+
+. "${VIRTUALENV_PATH}/bin/activate"
+
+pip install pytest freezegun pytest-cov \
+    --constraint "requirements/requirements-python${PYTHON_MAJOR_MINOR_VERSION}.txt"
+
+pip install -e ".[kubernetes]" \
+    --constraint "requirements/requirements-python${PYTHON_MAJOR_MINOR_VERSION}.txt"
+
+if [[ ${INTERACTIVE} == "true" ]]; then
+    echo
+    echo "Activating the virtual environment for kubernetes testing"
+    echo
+    echo "You can run kubernetes testing via 'pytest kubernetes_tests/....'"
+    echo "You can add -s to see the output of your tests on screen"
+    echo
+    echo "The webserver is available at http://localhost:30809/"
+    echo
+    echo "User/password: airflow/airflow"
+    echo
+    echo "You are entering the virtualenv now. Type exit to exit back to the original shell"
+    echo
+    exec "${SHELL}"
+else
+    pytest "${PYTEST_ARGS[@]}" "${TESTS[@]}"
+fi
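+
+# Illustration (not executed): typical invocations of this script; the test file name below is
+# only an example of a module that may live under kubernetes_tests/:
+#
+#     ./scripts/ci/ci_run_kubernetes_tests.sh                    # run all kubernetes tests
+#     ./scripts/ci/ci_run_kubernetes_tests.sh kubernetes_tests/test_kubernetes_pod_operator.py
+#     ./scripts/ci/ci_run_kubernetes_tests.sh --interactive      # drop into the test virtualenv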
diff --git a/scripts/ci/docker-compose/base.yml b/scripts/ci/docker-compose/base.yml
index 2505784..34e31ff 100644
--- a/scripts/ci/docker-compose/base.yml
+++ b/scripts/ci/docker-compose/base.yml
@@ -41,7 +41,6 @@ services:
       - VERBOSE
       - VEROSE_COMMANDS
       - AIRFLOW_CI_IMAGE
-      - RUNTIME
       - ENABLE_KIND_CLUSTER
       - ENABLED_INTEGRATIONS
       - RUN_INTEGRATION_TESTS
diff --git a/scripts/ci/docker-compose/local.yml b/scripts/ci/docker-compose/local.yml
index d79b1aa..2991de1 100644
--- a/scripts/ci/docker-compose/local.yml
+++ b/scripts/ci/docker-compose/local.yml
@@ -55,6 +55,7 @@ services:
       - ../../../setup.cfg:/opt/airflow/setup.cfg:cached
       - ../../../setup.py:/opt/airflow/setup.py:cached
       - ../../../tests:/opt/airflow/tests:cached
+      - ../../../kubernetes_tests:/opt/airflow/kubernetes_tests:cached
       - ../../../tmp:/opt/airflow/tmp:cached
       # END automatically generated volumes from LOCAL_MOUNTS in _utils.sh
     environment:
diff --git a/scripts/ci/in_container/_in_container_utils.sh b/scripts/ci/in_container/_in_container_utils.sh
index 9b9814a..67cecfd 100644
--- a/scripts/ci/in_container/_in_container_utils.sh
+++ b/scripts/ci/in_container/_in_container_utils.sh
@@ -232,34 +232,6 @@ function send_airflow_logs_to_file_io() {
     curl -F "file=@${DUMP_FILE}" https://file.io
 }
 
-
-function dump_kind_logs() {
-    echo "###########################################################################################"
-    echo "                   Dumping logs from KIND"
-    echo "###########################################################################################"
-
-    FILE_NAME="${1}"
-    kind --name "${CLUSTER_NAME}" export logs "${FILE_NAME}"
-}
-
-
-function send_kubernetes_logs_to_file_io() {
-    echo "##############################################################################"
-    echo
-    echo "   DUMPING LOG FILES FROM KIND AND SENDING THEM TO file.io"
-    echo
-    echo "##############################################################################"
-    DUMP_DIR_NAME=$(date "+%Y-%m-%d")_kind_${CI_BUILD_ID:="default"}_${CI_JOB_ID:="default"}
-    DUMP_DIR=/tmp/${DUMP_DIR_NAME}
-    dump_kind_logs "${DUMP_DIR}"
-    tar -cvzf "${DUMP_DIR}.tar.gz" -C /tmp "${DUMP_DIR_NAME}"
-    echo
-    echo "   Logs saved to ${DUMP_DIR}.tar.gz"
-    echo
-    echo "##############################################################################"
-    curl -F "file=@${DUMP_DIR}.tar.gz" https://file.io
-}
-
 function install_released_airflow_version() {
     pip uninstall -y apache-airflow || true
     find /root/airflow/ -type f -print0 | xargs -0 rm -f --
diff --git a/scripts/ci/in_container/entrypoint_ci.sh b/scripts/ci/in_container/entrypoint_ci.sh
index 7a2ab15..111e7ca 100755
--- a/scripts/ci/in_container/entrypoint_ci.sh
+++ b/scripts/ci/in_container/entrypoint_ci.sh
@@ -29,7 +29,6 @@ BACKEND=${BACKEND:=sqlite}
 KUBERNETES_MODE=${KUBERNETES_MODE:=""}
 KUBERNETES_VERSION=${KUBERNETES_VERSION:=""}
 ENABLE_KIND_CLUSTER=${ENABLE_KIND_CLUSTER:="false"}
-RUNTIME=${RUNTIME:=""}
 
 export AIRFLOW_HOME=${AIRFLOW_HOME:=${HOME}}
 
@@ -143,54 +142,31 @@ if [[ ${INTEGRATION_KERBEROS:="false"} == "true" ]]; then
 fi
 
 
-if [[ "${RUNTIME}" == "" ]]; then
-    # Start MiniCluster
-    java -cp "/opt/minicluster-1.1-SNAPSHOT/*" com.ing.minicluster.MiniCluster \
-        >"${AIRFLOW_HOME}/logs/minicluster.log" 2>&1 &
+# Start MiniCluster
+java -cp "/opt/minicluster-1.1-SNAPSHOT/*" com.ing.minicluster.MiniCluster \
+    >"${AIRFLOW_HOME}/logs/minicluster.log" 2>&1 &
 
-    # Set up ssh keys
-    echo 'yes' | ssh-keygen -t rsa -C your_email@youremail.com -m PEM -P '' -f ~/.ssh/id_rsa \
-        >"${AIRFLOW_HOME}/logs/ssh-keygen.log" 2>&1
+# Set up ssh keys
+echo 'yes' | ssh-keygen -t rsa -C your_email@youremail.com -m PEM -P '' -f ~/.ssh/id_rsa \
+    >"${AIRFLOW_HOME}/logs/ssh-keygen.log" 2>&1
 
-    cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
-    ln -s -f ~/.ssh/authorized_keys ~/.ssh/authorized_keys2
-    chmod 600 ~/.ssh/*
+cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
+ln -s -f ~/.ssh/authorized_keys ~/.ssh/authorized_keys2
+chmod 600 ~/.ssh/*
 
-    # SSH Service
-    sudo service ssh restart >/dev/null 2>&1
+# SSH Service
+sudo service ssh restart >/dev/null 2>&1
 
-    # Sometimes the server is not quick enough to load the keys!
-    while [[ $(ssh-keyscan -H localhost 2>/dev/null | wc -l) != "3" ]] ; do
-        echo "Not all keys yet loaded by the server"
-        sleep 0.05
-    done
-
-    ssh-keyscan -H localhost >> ~/.ssh/known_hosts 2>/dev/null
-fi
+# Sometimes the server is not quick enough to load the keys!
+while [[ $(ssh-keyscan -H localhost 2>/dev/null | wc -l) != "3" ]] ; do
+    echo "Not all keys yet loaded by the server"
+    sleep 0.05
+done
 
+ssh-keyscan -H localhost >> ~/.ssh/known_hosts 2>/dev/null
 
-export KIND_CLUSTER_OPERATION="${KIND_CLUSTER_OPERATION:="start"}"
 export KUBERNETES_VERSION=${KUBERNETES_VERSION:=""}
 
-if [[ ${RUNTIME:=""} == "kubernetes" ]]; then
-    unset KRB5_CONFIG
-    unset KRB5_KTNAME
-    export AIRFLOW_KUBERNETES_IMAGE=${AIRFLOW_CI_IMAGE}-kubernetes
-    AIRFLOW_KUBERNETES_IMAGE_NAME=$(echo "${AIRFLOW_KUBERNETES_IMAGE}" | cut -f 1 -d ":")
-    export AIRFLOW_KUBERNETES_IMAGE_NAME
-    AIRFLOW_KUBERNETES_IMAGE_TAG=$(echo "${AIRFLOW_KUBERNETES_IMAGE}" | cut -f 2 -d ":")
-    export AIRFLOW_KUBERNETES_IMAGE_TAG
-fi
-
-
-if [[ "${ENABLE_KIND_CLUSTER}" == "true" ]]; then
-    export CLUSTER_NAME="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"
-    "${MY_DIR}/kubernetes/setup_kind_cluster.sh"
-    if [[ ${KIND_CLUSTER_OPERATION} == "stop" ]]; then
-        exit 1
-    fi
-fi
-
 # shellcheck source=scripts/ci/in_container/configure_environment.sh
 . "${MY_DIR}/configure_environment.sh"
 
@@ -240,9 +216,9 @@ if [[ ${#@} -gt 0 && -n "$1" ]]; then
 fi
 
 if [[ -n ${RUN_INTEGRATION_TESTS:=""} ]]; then
-    for INTEGRATION in ${RUN_INTEGRATION_TESTS}
+    for INT in ${RUN_INTEGRATION_TESTS}
     do
-        CI_ARGS+=("--integration" "${INTEGRATION}")
+        CI_ARGS+=("--integration" "${INT}")
     done
     CI_ARGS+=("-rpfExX")
 elif [[ ${ONLY_RUN_LONG_RUNNING_TESTS:=""} == "true" ]]; then
@@ -261,17 +237,6 @@ elif [[ ${ONLY_RUN_QUARANTINED_TESTS:=""} == "true" ]]; then
         "--timeout" "90")
 fi
 
-
-if [[ -n ${RUNTIME} ]]; then
-    CI_ARGS+=("--runtime" "${RUNTIME}" "-rpfExX")
-    TESTS_TO_RUN="tests/runtime"
-    if [[ ${RUNTIME} == "kubernetes" ]]; then
-        export SKIP_INIT_DB=true
-        "${MY_DIR}/deploy_airflow_to_kubernetes.sh"
-    fi
-fi
-
-
 ARGS=("${CI_ARGS[@]}" "${TESTS_TO_RUN}")
 
 if [[ ${RUN_SYSTEM_TESTS:="false"} == "true" ]]; then
diff --git a/scripts/ci/in_container/kubernetes/app/deploy_app.sh b/scripts/ci/in_container/kubernetes/app/deploy_app.sh
deleted file mode 100755
index 1f7832d..0000000
--- a/scripts/ci/in_container/kubernetes/app/deploy_app.sh
+++ /dev/null
@@ -1,210 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-set -euo pipefail
-
-MY_DIR=$(cd "$(dirname "$0")" && pwd)
-
-AIRFLOW_SOURCES=$(
-    cd "${MY_DIR}/../../../../../" || exit 1
-    pwd
-)
-export AIRFLOW_SOURCES
-
-# We keep _utils here because we are not in the in_container directory
-# shellcheck source=scripts/ci/in_container/_in_container_utils.sh
-. "${MY_DIR}/../../_in_container_utils.sh"
-
-assert_in_container
-
-in_container_script_start
-
-function end_and_dump_logs() {
-    dump_logs
-    in_container_script_end
-}
-
-trap in_container_script_end EXIT
-
-export TEMPLATE_DIRNAME="${MY_DIR}/templates"
-export BUILD_DIRNAME="${MY_DIR}/build"
-
-# shellcheck source=common/_image_variables.sh
-. "${AIRFLOW_SOURCES}/common/_image_variables.sh"
-
-if [[ ! -d "${BUILD_DIRNAME}" ]]; then
-    mkdir -p "${BUILD_DIRNAME}"
-fi
-
-rm -f "${BUILD_DIRNAME}"/*
-
-if [[ "${KUBERNETES_MODE}" == "persistent_mode" ]]; then
-    INIT_DAGS_VOLUME_NAME=airflow-dags
-    POD_AIRFLOW_DAGS_VOLUME_NAME=airflow-dags
-    CONFIGMAP_DAGS_FOLDER=/opt/airflow/dags
-    CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT=
-    CONFIGMAP_DAGS_VOLUME_CLAIM=airflow-dags
-else
-    INIT_DAGS_VOLUME_NAME=airflow-dags-fake
-    POD_AIRFLOW_DAGS_VOLUME_NAME=airflow-dags-git
-    CONFIGMAP_DAGS_FOLDER=/opt/airflow/dags/repo/airflow/example_dags
-    CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT=/opt/airflow/dags
-    CONFIGMAP_DAGS_VOLUME_CLAIM=
-fi
-
-CONFIGMAP_GIT_REPO=${GITHUB_REPOSITORY:-apache/airflow}
-CONFIGMAP_BRANCH=${GITHUB_BASE_REF:=master}
-
-if [[ "${KUBERNETES_MODE}" == "persistent_mode" ]]; then
-    sed -e "s/{{INIT_GIT_SYNC}}//g" \
-        "${TEMPLATE_DIRNAME}/airflow.template.yaml" >"${BUILD_DIRNAME}/airflow.yaml"
-else
-    sed -e "/{{INIT_GIT_SYNC}}/{r ${TEMPLATE_DIRNAME}/init_git_sync.template.yaml" -e 'd}' \
-        "${TEMPLATE_DIRNAME}/airflow.template.yaml" >"${BUILD_DIRNAME}/airflow.yaml"
-fi
-sed -i "s|{{AIRFLOW_KUBERNETES_IMAGE}}|${AIRFLOW_KUBERNETES_IMAGE}|g" "${BUILD_DIRNAME}/airflow.yaml"
-
-sed -i "s|{{CONFIGMAP_GIT_REPO}}|${CONFIGMAP_GIT_REPO}|g" "${BUILD_DIRNAME}/airflow.yaml"
-sed -i "s|{{CONFIGMAP_BRANCH}}|${CONFIGMAP_BRANCH}|g" "${BUILD_DIRNAME}/airflow.yaml"
-sed -i "s|{{INIT_DAGS_VOLUME_NAME}}|${INIT_DAGS_VOLUME_NAME}|g" "${BUILD_DIRNAME}/airflow.yaml"
-sed -i "s|{{POD_AIRFLOW_DAGS_VOLUME_NAME}}|${POD_AIRFLOW_DAGS_VOLUME_NAME}|g" \
-    "${BUILD_DIRNAME}/airflow.yaml"
-
-sed "s|{{CONFIGMAP_DAGS_FOLDER}}|${CONFIGMAP_DAGS_FOLDER}|g" \
-    "${TEMPLATE_DIRNAME}/configmaps.template.yaml" >"${BUILD_DIRNAME}/configmaps.yaml"
-sed -i "s|{{CONFIGMAP_GIT_REPO}}|${CONFIGMAP_GIT_REPO}|g" "${BUILD_DIRNAME}/configmaps.yaml"
-sed -i "s|{{CONFIGMAP_BRANCH}}|${CONFIGMAP_BRANCH}|g" "${BUILD_DIRNAME}/configmaps.yaml"
-sed -i "s|{{CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT}}|${CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT}|g" \
-    "${BUILD_DIRNAME}/configmaps.yaml"
-sed -i "s|{{CONFIGMAP_DAGS_VOLUME_CLAIM}}|${CONFIGMAP_DAGS_VOLUME_CLAIM}|g" \
-    "${BUILD_DIRNAME}/configmaps.yaml"
-sed -i "s|{{AIRFLOW_KUBERNETES_IMAGE_NAME}}|${AIRFLOW_KUBERNETES_IMAGE_NAME}|g" \
-    "${BUILD_DIRNAME}/configmaps.yaml"
-sed -i "s|{{AIRFLOW_KUBERNETES_IMAGE_TAG}}|${AIRFLOW_KUBERNETES_IMAGE_TAG}|g" \
-    "${BUILD_DIRNAME}/configmaps.yaml"
-
-cat "${BUILD_DIRNAME}/airflow.yaml"
-cat "${BUILD_DIRNAME}/configmaps.yaml"
-
-kubectl delete -f "${MY_DIR}/postgres.yaml" || true
-kubectl delete -f "${BUILD_DIRNAME}/airflow.yaml" || true
-kubectl delete -f "${MY_DIR}/secrets.yaml" || true
-kubectl apply -f "${MY_DIR}/secrets.yaml" -n test-namespace
-
-set -e
-
-kubectl apply -f "${MY_DIR}/secrets.yaml"
-kubectl apply -f "${MY_DIR}/secrets.yaml" -n test-namespace
-kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml"
-kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml" -n test-namespace
-kubectl apply -f "${MY_DIR}/postgres.yaml"
-kubectl apply -f "${MY_DIR}/volumes.yaml"
-kubectl apply -f "${MY_DIR}/volumes.yaml" -n test-namespace
-
-set +x
-set +o pipefail
-PODS_ARE_READY="0"
-for i in {1..150}; do
-    echo "------- Running kubectl get pods: $i -------"
-    PODS=$(kubectl get pods | awk 'NR>1 {print $0}')
-    echo "$PODS"
-    NUM_POSTGRES_READY=$(echo "${PODS}" | grep postgres | awk '{print $2}' | grep -cE '([1-9])\/(\1)' | xargs)
-    if [[ "${NUM_POSTGRES_READY}" == "1" ]]; then
-        PODS_ARE_READY="1"
-        break
-    fi
-    sleep 4
-done
-
-sleep 7
-
-kubectl apply -f "${BUILD_DIRNAME}/airflow.yaml"
-
-dump_logs() {
-    echo "dumping logs"
-    POD=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep airflow | head -1)
-    echo "------- pod description -------"
-    kubectl describe pod "${POD}"
-    echo "------- webserver init container logs - init -------"
-    kubectl logs "${POD}" -c init || true
-    if [[ "${KUBERNETES_MODE}" != "persistent_mode" ]]; then
-        echo "------- webserver init container logs - git-sync-clone -------"
-        kubectl logs "${POD}" -c git-sync-clone || true
-    fi
-    echo "------- webserver logs -------"
-    kubectl logs "${POD}" -c webserver || true
-    echo "------- scheduler logs -------"
-    kubectl logs "${POD}" -c scheduler || true
-    echo "--------------"
-}
-
-set +x
-set +o pipefail
-# wait for up to 10 minutes for everything to be deployed
-PODS_ARE_READY="0"
-for i in {1..20}; do
-    echo "------- Running kubectl get pods: $i -------"
-    PODS=$(kubectl get pods | awk 'NR>1 {print $0}')
-    echo "$PODS"
-    NUM_AIRFLOW_READY=$(echo "${PODS}" | grep airflow | awk '{print $2}' | grep -cE '([2-9])\/(\1)' | xargs)
-    NUM_POSTGRES_READY=$(echo "${PODS}" | grep postgres | awk '{print $2}' | grep -cE '([1-9])\/(\1)' | xargs)
-    if [[ "${NUM_AIRFLOW_READY}" == "1" && "${NUM_POSTGRES_READY}" == "1" ]]; then
-        PODS_ARE_READY="1"
-        break
-    fi
-    sleep 4
-done
-POD=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep airflow | head -1)
-dump_logs
-if [[ "${PODS_ARE_READY}" == "1" ]]; then
-    echo "PODS are ready."
-else
-    echo >&2 "PODS are not ready after waiting for a long time. Exiting..."
-    dump_logs
-    exit 1
-fi
-
-# Wait until Airflow webserver is up
-KUBERNETES_HOST=${CLUSTER_NAME}-worker
-AIRFLOW_WEBSERVER_IS_READY="0"
-CONSECUTIVE_SUCCESS_CALLS=0
-for i in {1..20}; do
-    echo "------- Wait until webserver is up: $i -------"
-    PODS=$(kubectl get pods | awk 'NR>1 {print $0}')
-    echo "$PODS"
-    HTTP_CODE=$(curl -LI "http://${KUBERNETES_HOST}:30809/health" -o /dev/null -w '%{http_code}\n' -sS) || true
-    if [[ "${HTTP_CODE}" == 200 ]]; then
-        ((CONSECUTIVE_SUCCESS_CALLS += 1))
-    else
-        CONSECUTIVE_SUCCESS_CALLS="0"
-    fi
-    if [[ "${CONSECUTIVE_SUCCESS_CALLS}" == 3 ]]; then
-        AIRFLOW_WEBSERVER_IS_READY="1"
-        break
-    fi
-    sleep 10
-done
-set -o pipefail
-
-if [[ "${AIRFLOW_WEBSERVER_IS_READY}" == "1" ]]; then
-    echo "Airflow webserver is ready."
-else
-    echo >&2 "Airflow webserver is not ready after waiting for a long time. Exiting..."
-    dump_logs
-    exit 1
-fi
diff --git a/scripts/ci/in_container/kubernetes/docker/rebuild_airflow_image.sh b/scripts/ci/in_container/kubernetes/docker/rebuild_airflow_image.sh
deleted file mode 100755
index 1df822e..0000000
--- a/scripts/ci/in_container/kubernetes/docker/rebuild_airflow_image.sh
+++ /dev/null
@@ -1,74 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-MY_DIR=$(cd "$(dirname "$0")" && pwd)
-AIRFLOW_SOURCES=$(cd "${MY_DIR}/../../../../../" || exit 1 ; pwd)
-export AIRFLOW_SOURCES
-
-# We keep _utils here because we are not in the in_container directory
-# shellcheck source=scripts/ci/in_container/_in_container_utils.sh
-. "${MY_DIR}/../../_in_container_utils.sh"
-
-export OUTPUT_LOG=${AIRFLOW_SOURCES}/logs/rebuild_airflow_image.log
-
-assert_in_container
-
-in_container_script_start
-
-cd "${AIRFLOW_SOURCES}" || exit 1
-
-# Required to rebuild images from inside container
-mkdir -pv scripts/docker/
-cp /entrypoint.sh scripts/docker/
-
-echo
-echo "Building image from ${AIRFLOW_CI_IMAGE} with latest sources"
-echo
-#export AIRFLOW_PROD_BASE_TAG="${DEFAULT_BRANCH}-python${PYTHON_MAJOR_MINOR_VERSION}"
-#export AIRFLOW_PROD_IMAGE="${DOCKERHUB_USER}/${DOCKERHUB_REPO}:${AIRFLOW_PROD_BASE_TAG}"
-export AIRFLOW_PROD_IMAGE="apache/airflow:v1-10-test-python3.6"
-echo "Adding kubernetes-specific scripts to basic CI image."
-echo "Building ${AIRFLOW_KUBERNETES_IMAGE} from ${AIRFLOW_PROD_IMAGE}"
-echo
-docker build \
-    --build-arg AIRFLOW_PROD_IMAGE="${AIRFLOW_PROD_IMAGE}" \
-    --cache-from "${AIRFLOW_PROD_IMAGE}" \
-    --tag="${AIRFLOW_KUBERNETES_IMAGE}" \
-    -f- .  <<EOF
-ARG AIRFLOW_PROD_IMAGE
-FROM ${AIRFLOW_PROD_IMAGE}
-
-COPY scripts/ci/in_container/kubernetes/docker/airflow-test-env-init.sh /tmp/airflow-test-env-init.sh
-
-ENV AIRFLOW__CORE__LOAD_EXAMPLES="true"
-ENTRYPOINT ["/usr/bin/dumb-init", "--", "/entrypoint"]
-EOF
-
-echo
-echo "Loading the ${AIRFLOW_KUBERNETES_IMAGE} to cluster ${CLUSTER_NAME} from docker"
-echo
-kind load docker-image --name "${CLUSTER_NAME}" "${AIRFLOW_KUBERNETES_IMAGE}"
-echo
-echo "Loaded the ${AIRFLOW_KUBERNETES_IMAGE} to cluster ${CLUSTER_NAME}"
-echo
-
-echo
-echo "Stopping output heartbeat"
-echo
-
-
-in_container_script_end
diff --git a/scripts/ci/in_container/kubernetes/setup_kind_cluster.sh b/scripts/ci/in_container/kubernetes/setup_kind_cluster.sh
deleted file mode 100755
index fc033da..0000000
--- a/scripts/ci/in_container/kubernetes/setup_kind_cluster.sh
+++ /dev/null
@@ -1,187 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-MY_DIR=$(cd "$(dirname "$0")" && pwd)
-
-AIRFLOW_SOURCES=$(
-    cd "${MY_DIR}/../../../../" || exit 1
-    pwd
-)
-export AIRFLOW_SOURCES
-
-# We keep _utils here because we are not in the in_container directory
-# shellcheck source=scripts/ci/in_container/_in_container_utils.sh
-. "${MY_DIR}/../_in_container_utils.sh"
-
-assert_in_container
-
-in_container_script_start
-
-# just in case it's not set
-export KUBERNETES_VERSION=${KUBERNETES_VERSION:="v1.15.3"}
-
-echo
-echo "Kubernetes version = ${KUBERNETES_VERSION}"
-echo
-
-function create_cluster() {
-    if [[ "${CI:="false"}" == "true" ]]; then
-        # Travis CI does not handle the nice output of Kind well, so we need to capture it
-        # And display only if kind fails to start
-        start_output_heartbeat "Creating kubernetes cluster" 10
-        set +e
-        if ! OUTPUT=$(kind create cluster \
-                                    --name "${CLUSTER_NAME}" \
-                                    --config "${MY_DIR}/kind-cluster-conf.yaml" \
-                                    --image "kindest/node:${KUBERNETES_VERSION}" 2>&1); then
-            echo "${OUTPUT}"
-        fi
-        stop_output_heartbeat
-    else
-        kind create cluster \
-            --name "${CLUSTER_NAME}" \
-            --config "${MY_DIR}/kind-cluster-conf.yaml" \
-            --image "kindest/node:${KUBERNETES_VERSION}"
-    fi
-    echo
-    echo "Created cluster ${CLUSTER_NAME}"
-    echo
-
-    echo
-    echo "Connecting Kubernetes' worker and ccontrol plane to the network used by Breeze"
-    echo
-    docker network connect docker-compose_default "${CLUSTER_NAME}-control-plane"
-    docker network connect docker-compose_default "${CLUSTER_NAME}-worker"
-    echo
-    echo "Connected Kubernetes' worker and ccontrol plane to the network used by Breeze"
-    echo
-
-
-    echo
-    echo "Replacing cluster host address with https://${CLUSTER_NAME}-control-plane:6443"
-    echo
-    kubectl config set "clusters.kind-${CLUSTER_NAME}.server" "https://${CLUSTER_NAME}-control-plane:6443"
-
-    echo
-    echo "Replaced cluster host address with https://${CLUSTER_NAME}-control-plane:6443"
-    echo
-
-    echo
-    echo "Patching CoreDNS to avoid loop and to use 8.8.8.8 DNS as forward address."
-    echo
-    echo "============================================================================"
-    echo "      Original coredns configmap:"
-    echo "============================================================================"
-    kubectl get configmaps --namespace=kube-system coredns -o yaml
-    kubectl get configmaps --namespace=kube-system coredns -o yaml | \
-        sed 's/forward \. .*$/forward . 8.8.8.8/' | kubectl apply -f -
-
-    echo
-    echo "============================================================================"
-    echo "      Updated coredns configmap with new forward directive:"
-    echo "============================================================================"
-    kubectl get configmaps --namespace=kube-system coredns -o yaml
-
-
-    echo
-    echo "Restarting CoreDNS"
-    echo
-    kubectl scale deployment --namespace=kube-system coredns --replicas=0
-    kubectl scale deployment --namespace=kube-system coredns --replicas=2
-    echo
-    echo "Restarted CoreDNS"
-    echo
-}
-
-function delete_cluster() {
-    kind delete cluster --name "${CLUSTER_NAME}"
-    echo
-    echo "Deleted cluster ${CLUSTER_NAME}"
-    echo
-    rm -rf "${HOME}/.kube/*"
-}
-
-ALL_CLUSTERS=$(kind get clusters || true)
-if [[ ${ALL_CLUSTERS} == *"${CLUSTER_NAME}"* ]]; then
-    echo
-    echo "Cluster ${CLUSTER_NAME} is already created"
-    echo
-    if [[ ${KIND_CLUSTER_OPERATION} == "start" ]]; then
-        echo
-        echo "Reusing previously created cluster"
-        echo
-    elif [[ ${KIND_CLUSTER_OPERATION} == "recreate" ]]; then
-        echo
-        echo "Recreating cluster"
-        echo
-        delete_cluster
-        create_cluster
-    elif [[ ${KIND_CLUSTER_OPERATION} == "stop" ]]; then
-        echo
-        echo "Deleting cluster"
-        echo
-        delete_cluster
-        exit
-    else
-        echo
-        echo "Wrong cluster operation: ${KIND_CLUSTER_OPERATION}. Should be one of start/stop/recreate"
-        echo
-        exit 1
-    fi
-else
-    if [[ ${KIND_CLUSTER_OPERATION} == "start" ]]; then
-        echo
-        echo "Creating cluster"
-        echo
-        create_cluster
-    elif [[ ${KIND_CLUSTER_OPERATION} == "restart" ]]; then
-        echo
-        echo "Cluster ${CLUSTER_NAME} does not exist. Creating rather than recreating"
-        echo
-        echo "Creating cluster"
-        echo
-        create_cluster
-    elif [[ ${KIND_CLUSTER_OPERATION} == "stop" ]]; then
-        echo
-        echo "Cluster ${CLUSTER_NAME} does not exist. It should exist for stop operation"
-        echo
-        exit 1
-    else
-        echo
-        echo "Wrong cluster operation: ${KIND_CLUSTER_OPERATION}. Should be one of start/stop/recreate"
-        echo
-        exit 1
-    fi
-fi
-
-kubectl cluster-info
-
-kubectl get nodes
-echo
-echo "Showing storageClass"
-echo
-kubectl get storageclass
-echo
-echo "Showing kube-system pods"
-echo
-kubectl get -n kube-system pods
-echo
-echo "Airflow environment on kubernetes is good to go!"
-echo
-kubectl create namespace test-namespace
-
-in_container_script_end
diff --git a/scripts/ci/in_container/run_ci_tests.sh b/scripts/ci/in_container/run_ci_tests.sh
index b046c08..f5f4340 100755
--- a/scripts/ci/in_container/run_ci_tests.sh
+++ b/scripts/ci/in_container/run_ci_tests.sh
@@ -41,8 +41,4 @@ if [[ ${CI} == "true" ]]; then
     send_airflow_logs_to_file_io
 fi
 
-if [[ ${CI} == "true" && ${ENABLE_KIND_CLUSTER} == "true" ]]; then
-    send_kubernetes_logs_to_file_io
-fi
-
 exit "${RES}"
diff --git a/scripts/ci/in_container/kubernetes/app/postgres.yaml b/scripts/ci/kubernetes/app/postgres.yaml
similarity index 100%
rename from scripts/ci/in_container/kubernetes/app/postgres.yaml
rename to scripts/ci/kubernetes/app/postgres.yaml
diff --git a/scripts/ci/in_container/kubernetes/app/secrets.yaml b/scripts/ci/kubernetes/app/secrets.yaml
similarity index 100%
rename from scripts/ci/in_container/kubernetes/app/secrets.yaml
rename to scripts/ci/kubernetes/app/secrets.yaml
diff --git a/scripts/ci/in_container/kubernetes/app/templates/airflow.template.yaml b/scripts/ci/kubernetes/app/templates/airflow.template.yaml
similarity index 70%
rename from scripts/ci/in_container/kubernetes/app/templates/airflow.template.yaml
rename to scripts/ci/kubernetes/app/templates/airflow.template.yaml
index 0ee9492..3f2465d 100644
--- a/scripts/ci/in_container/kubernetes/app/templates/airflow.template.yaml
+++ b/scripts/ci/kubernetes/app/templates/airflow.template.yaml
@@ -46,31 +46,53 @@ spec:
         runAsGroup: 50000
         fsGroup: 50000
       initContainers:
-      - name: "init"
-        image: {{AIRFLOW_KUBERNETES_IMAGE}}
-        imagePullPolicy: IfNotPresent
-        volumeMounts:
-        - name: airflow-configmap
-          mountPath: /opt/airflow/airflow.cfg
-          subPath: airflow.cfg
-        - name: {{INIT_DAGS_VOLUME_NAME}}
-          mountPath: /opt/airflow/dags
-        env:
-        - name: SQL_ALCHEMY_CONN
-          valueFrom:
-            secretKeyRef:
-              name: airflow-secrets
-              key: sql_alchemy_conn
-        command:
-          - "bash"
-        args:
-          - "-cx"
-          - "/tmp/airflow-test-env-init.sh"
+        - name: "init-dags"
+          image: {{AIRFLOW_KUBERNETES_IMAGE}}
+          imagePullPolicy: Never
+          securityContext:
+            runAsUser: 0
+          volumeMounts:
+            - name: airflow-configmap
+              mountPath: /opt/airflow/airflow.cfg
+              subPath: airflow.cfg
+            - name: {{INIT_DAGS_VOLUME_NAME}}
+              mountPath: /opt/airflow/dags
+          env:
+            - name: SQL_ALCHEMY_CONN
+              valueFrom:
+                secretKeyRef:
+                  name: airflow-secrets
+                  key: sql_alchemy_conn
+          command:
+            - "bash"
+          args:
+            - "-cx"
+            - "/tmp/airflow-test-env-init-dags.sh"
+        - name: "init-db"
+          image: {{AIRFLOW_KUBERNETES_IMAGE}}
+          imagePullPolicy: Never
+          volumeMounts:
+          - name: airflow-configmap
+            mountPath: /opt/airflow/airflow.cfg
+            subPath: airflow.cfg
+          - name: {{INIT_DAGS_VOLUME_NAME}}
+            mountPath: /opt/airflow/dags
+          env:
+          - name: SQL_ALCHEMY_CONN
+            valueFrom:
+              secretKeyRef:
+                name: airflow-secrets
+                key: sql_alchemy_conn
+          command:
+            - "bash"
+          args:
+            - "-cx"
+            - "/tmp/airflow-test-env-init-db.sh"
 {{INIT_GIT_SYNC}}
       containers:
       - name: webserver
         image: {{AIRFLOW_KUBERNETES_IMAGE}}
-        imagePullPolicy: IfNotPresent
+        imagePullPolicy: Never
         ports:
         - name: webserver
           containerPort: 8080
@@ -95,7 +117,7 @@ spec:
           mountPath: /opt/airflow/logs
       - name: scheduler
         image: {{AIRFLOW_KUBERNETES_IMAGE}}
-        imagePullPolicy: IfNotPresent
+        imagePullPolicy: Never
         args: ["scheduler"]
         env:
         - name: AIRFLOW__KUBERNETES__NAMESPACE
diff --git a/scripts/ci/in_container/kubernetes/app/templates/configmaps.template.yaml b/scripts/ci/kubernetes/app/templates/configmaps.template.yaml
similarity index 100%
rename from scripts/ci/in_container/kubernetes/app/templates/configmaps.template.yaml
rename to scripts/ci/kubernetes/app/templates/configmaps.template.yaml
diff --git a/scripts/ci/in_container/kubernetes/app/templates/init_git_sync.template.yaml b/scripts/ci/kubernetes/app/templates/init_git_sync.template.yaml
similarity index 55%
rename from scripts/ci/in_container/kubernetes/app/templates/init_git_sync.template.yaml
rename to scripts/ci/kubernetes/app/templates/init_git_sync.template.yaml
index 343ad8c..940b6ec 100644
--- a/scripts/ci/in_container/kubernetes/app/templates/init_git_sync.template.yaml
+++ b/scripts/ci/kubernetes/app/templates/init_git_sync.template.yaml
@@ -15,22 +15,22 @@
 # specific language governing permissions and limitations
 # under the License.
 
-      - name: git-sync-clone
-        env:
-        - name: GIT_SYNC_REPO
-          value: https://github.com/{{CONFIGMAP_GIT_REPO}}.git
-        - name: GIT_SYNC_BRANCH
-          value: {{CONFIGMAP_BRANCH}}
-        - name: GIT_SYNC_ROOT
-          value: /git
-        - name: GIT_SYNC_DEST
-          value: repo
-        - name: GIT_SYNC_ONE_TIME
-          value: "true"
-        image: gcr.io/google-containers/git-sync-amd64:v2.0.5
-        imagePullPolicy: IfNotPresent
-        securityContext:
-          runAsUser: 0
-        volumeMounts:
-        - mountPath: /git
-          name: airflow-dags-git
+        - name: "git-sync-clone"
+          env:
+          - name: GIT_SYNC_REPO
+            value: https://github.com/{{CONFIGMAP_GIT_REPO}}.git
+          - name: GIT_SYNC_BRANCH
+            value: {{CONFIGMAP_BRANCH}}
+          - name: GIT_SYNC_ROOT
+            value: /git
+          - name: GIT_SYNC_DEST
+            value: repo
+          - name: GIT_SYNC_ONE_TIME
+            value: "true"
+          image: gcr.io/google-containers/git-sync-amd64:v2.0.5
+          imagePullPolicy: IfNotPresent
+          securityContext:
+            runAsUser: 0
+          volumeMounts:
+          - mountPath: /git
+            name: airflow-dags-git
diff --git a/scripts/ci/in_container/kubernetes/app/volumes.yaml b/scripts/ci/kubernetes/app/volumes.yaml
similarity index 100%
rename from scripts/ci/in_container/kubernetes/app/volumes.yaml
rename to scripts/ci/kubernetes/app/volumes.yaml
diff --git a/scripts/ci/in_container/kubernetes/docker/airflow-test-env-init.sh b/scripts/ci/kubernetes/docker/airflow-test-env-init-dags.sh
similarity index 70%
rename from scripts/ci/in_container/kubernetes/docker/airflow-test-env-init.sh
rename to scripts/ci/kubernetes/docker/airflow-test-env-init-dags.sh
index 8e15f0f..9722958 100755
--- a/scripts/ci/in_container/kubernetes/docker/airflow-test-env-init.sh
+++ b/scripts/ci/kubernetes/docker/airflow-test-env-init-dags.sh
@@ -16,8 +16,21 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set -x
+set -euo pipefail
 
-cd /opt/airflow && \
-airflow initdb && \
-(airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true)
+echo
+echo "Copying airflow dags"
+echo
+
+
+# Create DAGS folder if it does not exist
+mkdir -pv "${AIRFLOW_HOME}/dags"
+ls -la "${AIRFLOW_HOME}/dags/"
+rm -rvf "${AIRFLOW_HOME}/dags/*"
+
+# Copy DAGS from current sources
+cp -Rv "${AIRFLOW_SOURCES}"/airflow/example_dags/* "${AIRFLOW_HOME}/dags/"
+
+echo
+echo "Copied airflow dags"
+echo
diff --git a/scripts/ci/docker-compose/runtime-kubernetes.yml b/scripts/ci/kubernetes/docker/airflow-test-env-init-db.sh
old mode 100644
new mode 100755
similarity index 60%
rename from scripts/ci/docker-compose/runtime-kubernetes.yml
rename to scripts/ci/kubernetes/docker/airflow-test-env-init-db.sh
index 3b96f46..c790160
--- a/scripts/ci/docker-compose/runtime-kubernetes.yml
+++ b/scripts/ci/kubernetes/docker/airflow-test-env-init-db.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,12 +15,32 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
----
-version: "2.2"
-services:
-  airflow:
-    environment:
-      - RUNTIME=kubernetes
-      - KUBERNETES_MODE
-      - KUBERNETES_VERSION
-      - KIND_CLUSTER_OPERATION
+
+set -euo pipefail
+
+echo
+echo "Initializing the Airflow db"
+echo
+
+
+# Init and upgrade the database to latest heads
+cd "${AIRFLOW_SOURCES}"/airflow || exit 1
+
+airflow db init
+alembic upgrade heads
+
+echo
+echo "Initialized the database"
+echo
+
+# Create Airflow User if it does not exist
+airflow create_user \
+    --username airflow \
+    --lastname airflow \
+    --firstname jon \
+    --email airflow@apache.org \
+    --role Admin --password airflow || true
+
+echo
+echo "Created airflow user"
+echo
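
    The trailing "|| true" makes the user-creation step tolerant of reruns, so the init script
    can be executed more than once against the same database. A minimal local sketch against a
    throw-away SQLite database; all paths and the connection string are assumptions:

        # Exercise the DB init script twice; the second run must not fail.
        export AIRFLOW_HOME="/tmp/airflow-db-init-test"
        export AIRFLOW_SOURCES="${HOME}/airflow"
        export AIRFLOW__CORE__SQL_ALCHEMY_CONN="sqlite:///${AIRFLOW_HOME}/airflow.db"
        bash scripts/ci/kubernetes/docker/airflow-test-env-init-db.sh
        bash scripts/ci/kubernetes/docker/airflow-test-env-init-db.sh  # create_user fails harmlessly on rerun
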
diff --git a/scripts/ci/kubernetes/docker/bootstrap.sh b/scripts/ci/kubernetes/docker/bootstrap.sh
new file mode 100755
index 0000000..9099b6b
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/bootstrap.sh
@@ -0,0 +1,74 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+set -euo pipefail
+
+# This part allows fast iteration on the kubernetes tests.
+# The Airflow pre-installed in the production image is removed and Airflow is re-installed
+# from the sources added while preparing the kubernetes image. This way, when we deploy the image
+# to KinD, only the most recent layer containing the latest sources is sent to the cluster,
+# and Airflow still runs with the pre-compiled web dist files that ship in the production image.
+
+
+echo
+echo "Save minimised web files"
+echo
+
+mv "$(python -m site | grep ^USER_SITE | awk '{print $2}' | tr -d "'")/airflow/www/static/dist/" \
+    "/tmp"
+
+echo
+echo "Uninstalling pre-installed airflow"
+echo
+
+# Uninstall pre-installed Apache Airflow
+pip uninstall -y apache-airflow
+
+
+echo
+echo "Installing airflow from the sources"
+echo
+
+# Installing airflow from the sources copied to the Kubernetes image
+pip install --user "${AIRFLOW_SOURCES}"
+
+echo
+echo "Restore minimised web files"
+echo
+
+mv "/tmp/dist" "$(python -m site | grep ^USER_SITE | awk '{print $2}' | tr -d "'")/airflow/www/static/"
+
+echo
+echo "Airflow prepared. Running ${1}"
+echo
+
+
+if [[ "$1" = "webserver" ]]
+then
+    exec airflow webserver
+fi
+
+if [[ "$1" = "scheduler" ]]
+then
+    exec airflow scheduler
+fi
+
+echo
+echo "Entering bash"
+echo
+
+exec /bin/bash
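
    The first argument of the bootstrap script selects which Airflow component is started, and
    any other argument drops to a shell, so it is meant to be wired up as the container
    entrypoint (or placed on PATH) in the Kubernetes test image. A hedged usage sketch with a
    placeholder image name:

        # The image name/tag is a placeholder, not something defined by this patch.
        docker run --rm my-airflow-kind-test-image:latest bootstrap.sh webserver
        docker run --rm my-airflow-kind-test-image:latest bootstrap.sh scheduler
        docker run --rm -it my-airflow-kind-test-image:latest bootstrap.sh bash  # falls through to /bin/bash
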
diff --git a/scripts/ci/in_container/kubernetes/kind-cluster-conf.yaml b/scripts/ci/kubernetes/kind-cluster-conf.yaml
similarity index 100%
rename from scripts/ci/in_container/kubernetes/kind-cluster-conf.yaml
rename to scripts/ci/kubernetes/kind-cluster-conf.yaml
diff --git a/tests/conftest.py b/tests/conftest.py
index f39eadd..e7de7fc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -83,12 +83,6 @@ def pytest_addoption(parser):
         help="only run tests matching the backend: [sqlite,postgres,mysql].",
     )
     group.addoption(
-        "--runtime",
-        action="store",
-        metavar="RUNTIME",
-        help="only run tests matching the runtime: [kubernetes].",
-    )
-    group.addoption(
         "--system",
         action="append",
         metavar="SYSTEMS",
@@ -174,9 +168,6 @@ def pytest_configure(config):
         "markers", "backend(name): mark test to run with named backend"
     )
     config.addinivalue_line(
-        "markers", "runtime(name): mark test to run with named runtime"
-    )
-    config.addinivalue_line(
         "markers", "system(name): mark test to run with named system"
     )
     config.addinivalue_line(
@@ -212,16 +203,6 @@ def skip_if_not_marked_with_backend(selected_backend, item):
                 format(backend=selected_backend, item=item))
 
 
-def skip_if_not_marked_with_runtime(selected_runtime, item):
-    for marker in item.iter_markers(name="runtime"):
-        runtime_name = marker.args[0]
-        if runtime_name == selected_runtime:
-            return
-    pytest.skip("The test is skipped because it has not been selected via --runtime switch. "
-                "Only tests marked with pytest.mark.runtime('{runtime}') are run: {item}".
-                format(runtime=selected_runtime, item=item))
-
-
 def skip_if_not_marked_with_system(selected_systems, item):
     for marker in item.iter_markers(name="system"):
         systems_name = marker.args[0]
@@ -268,19 +249,6 @@ def skip_if_integration_disabled(marker, item):
                            integration_name=integration_name, item=item))
 
 
-def skip_if_runtime_disabled(marker, item):
-    runtime_name = marker.args[0]
-    environment_variable_name = "RUNTIME"
-    environment_variable_value = os.environ.get(environment_variable_name)
-    if not environment_variable_value or environment_variable_value != runtime_name:
-        pytest.skip("The test requires {runtime_name} integration started and "
-                    "{name} environment variable to be set to true (it is '{value}')."
-                    " It can be set by specifying '--environment {runtime_name}' at breeze startup"
-                    ": {item}".
-                    format(name=environment_variable_name, value=environment_variable_value,
-                           runtime_name=runtime_name, item=item))
-
-
 def skip_if_wrong_backend(marker, item):
     valid_backend_names = marker.args
     environment_variable_name = "BACKEND"
@@ -323,11 +291,6 @@ def pytest_runtest_setup(item):
     selected_backend = item.config.getoption("--backend")
     if selected_backend:
         skip_if_not_marked_with_backend(selected_backend, item)
-    for marker in item.iter_markers(name="runtime"):
-        skip_if_runtime_disabled(marker, item)
-    selected_runtime = item.config.getoption("--runtime")
-    if selected_runtime:
-        skip_if_not_marked_with_runtime(selected_runtime, item)
     if not include_long_running:
         skip_long_running_test(item)
     if not include_quarantined:
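
    With the --runtime switch and the runtime markers removed, cluster-dependent tests are no
    longer selected through pytest options at all; they are meant to be run directly on the
    host against a KinD cluster. A hedged sketch (the directory name follows this commit's
    layout and the pytest flags are assumptions):

        # Run the relocated Kubernetes tests from the host, after the KinD cluster has been
        # started and the test image deployed to it.
        pytest kubernetes_tests/ -v
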
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
new file mode 100644
index 0000000..266ce05
--- /dev/null
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -0,0 +1,134 @@
+# pylint: disable=unused-argument
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+import kubernetes.client.models as k8s
+import pendulum
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, TaskInstance
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+from airflow.utils import timezone
+
+
+# noinspection PyUnusedLocal
+class TestKubernetesPodOperator(unittest.TestCase):
+
+    # noinspection DuplicatedCode
+    @staticmethod
+    def create_context(task):
+        dag = DAG(dag_id="dag")
+        tzinfo = pendulum.timezone("Europe/Amsterdam")
+        execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
+        task_instance = TaskInstance(task=task,
+                                     execution_date=execution_date)
+        return {
+            "dag": dag,
+            "ts": execution_date.isoformat(),
+            "task": task,
+            "ti": task_instance,
+        }
+
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_config_path(self, client_mock, monitor_mock, start_mock):  # pylint: disable=unused-argument
+        from airflow.utils.state import State
+
+        file_path = "/tmp/fake_file"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            config_file=file_path,
+            cluster_context='default',
+        )
+        monitor_mock.return_value = (State.SUCCESS, None)
+        client_mock.list_namespaced_pod.return_value = []
+        context = self.create_context(k)
+        k.execute(context=context)
+        client_mock.assert_called_once_with(
+            in_cluster=False,
+            cluster_context='default',
+            config_file=file_path,
+        )
+
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_image_pull_secrets_correctly_set(self, mock_client, monitor_mock, start_mock):
+        from airflow.utils.state import State
+
+        fake_pull_secrets = "fakeSecret"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            image_pull_secrets=fake_pull_secrets,
+            cluster_context='default',
+        )
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = self.create_context(k)
+        k.execute(context=context)
+        self.assertEqual(
+            start_mock.call_args[0][0].spec.image_pull_secrets,
+            [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
+        )
+
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_pod_delete_even_on_launcher_error(
+            self,
+            mock_client,
+            delete_pod_mock,
+            monitor_pod_mock,
+            start_pod_mock):
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            cluster_context='default',
+            is_delete_operator_pod=True,
+        )
+        monitor_pod_mock.side_effect = AirflowException('fake failure')
+        with self.assertRaises(AirflowException):
+            context = self.create_context(k)
+            k.execute(context=context)
+        assert delete_pod_mock.called
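
    Because PodLauncher and get_kube_client are fully mocked in this new module, these are
    plain unit tests and need no cluster at all. For example:

        # Runs without any Kubernetes cluster; the path matches the diff header above.
        pytest tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py -v
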
diff --git a/tests/runtime/__init__.py b/tests/runtime/__init__.py
deleted file mode 100644
index 114d189..0000000
--- a/tests/runtime/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.