You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/05/29 15:44:39 UTC

[airflow] branch v1-10-test updated: Move k8sexecutor out of contrib to closer match master (#8904)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 4daccda  Move k8sexecutor out of contrib to closer match master (#8904)
4daccda is described below

commit 4daccda3f04ace370ef3f6bf99e88c6faa170e4f
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri May 29 08:44:07 2020 -0700

    Move k8sexecutor out of contrib to closer match master (#8904)
    
    Considering that the k8s executor is now supported by core
    committers, we should move it from contrib to the primary executor
    directory.
    
    Co-authored-by: Daniel Imberman <da...@astronomer.io>
---
 airflow/contrib/executors/__init__.py              | 19 ------
 airflow/contrib/hooks/spark_submit_hook.py         |  2 +-
 airflow/contrib/kubernetes/__init__.py             |  5 ++
 .../contrib/operators/kubernetes_pod_operator.py   | 12 ++--
 airflow/executors/__init__.py                      |  2 +-
 .../{contrib => }/executors/kubernetes_executor.py |  6 +-
 .../__init__.py                                    |  0
 airflow/{contrib => }/kubernetes/kube_client.py    |  2 +-
 .../kubernetes_request_factory/__init__.py         |  0
 .../kubernetes_request_factory.py                  |  0
 .../pod_request_factory.py                         | 10 +++-
 airflow/{contrib => }/kubernetes/pod.py            |  2 +-
 airflow/{contrib => }/kubernetes/pod_generator.py  | 13 +++--
 airflow/{contrib => }/kubernetes/pod_launcher.py   |  4 +-
 .../kubernetes/pod_runtime_info_env.py             |  0
 airflow/{contrib => }/kubernetes/refresh_config.py |  0
 airflow/{contrib => }/kubernetes/secret.py         |  0
 airflow/{contrib => }/kubernetes/volume.py         |  0
 airflow/{contrib => }/kubernetes/volume_mount.py   |  0
 .../kubernetes/worker_configuration.py             |  4 +-
 docs/autoapi_templates/index.rst                   |  3 -
 docs/conf.py                                       |  2 +
 docs/kubernetes.rst                                |  8 +--
 tests/contrib/hooks/test_spark_submit_hook.py      |  2 +-
 .../executors/test_kubernetes_executor.py          | 68 ++++++++++++----------
 .../__init__.py                                    |  0
 .../kubernetes_request_factory}/__init__.py        |  0
 .../test_kubernetes_request_factory.py             |  9 ++-
 .../test_pod_request_factory.py                    |  7 +--
 tests/kubernetes/test_client.py                    |  8 +--
 tests/kubernetes/test_pod_launcher.py              |  2 +-
 .../__init__.py                                    |  0
 .../kubernetes/test_kubernetes_pod_operator.py     | 32 +++++-----
 tests/test_local_settings.py                       |  2 +-
 34 files changed, 108 insertions(+), 116 deletions(-)

diff --git a/airflow/contrib/executors/__init__.py b/airflow/contrib/executors/__init__.py
deleted file mode 100644
index b7f8352..0000000
--- a/airflow/contrib/executors/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 32bbf91..8fd6c69 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -27,7 +27,7 @@ from airflow.exceptions import AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 try:
-    from airflow.contrib.kubernetes import kube_client
+    from airflow.kubernetes import kube_client
 except ImportError:
     pass
 
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
index 13a8339..ef7074c 100644
--- a/airflow/contrib/kubernetes/__init__.py
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,3 +16,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
+
+from airflow.kubernetes import *  # noqa
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index f692599..d121510 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -21,8 +21,8 @@ import warnings
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
-from airflow.contrib.kubernetes.pod import Resources
+from airflow.kubernetes import kube_client, pod_generator, pod_launcher
+from airflow.kubernetes.pod import Resources
 from airflow.utils.helpers import validate_key
 from airflow.utils.state import State
 from airflow.version import version as airflow_version
@@ -50,11 +50,11 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                                comma separated list: secret_a,secret_b
     :type image_pull_secrets: str
     :param ports: ports for launched pod.
-    :type ports: list[airflow.contrib.kubernetes.pod.Port]
+    :type ports: list[airflow.kubernetes.pod.Port]
     :param volume_mounts: volumeMounts for launched pod.
-    :type volume_mounts: list[airflow.contrib.kubernetes.volume_mount.VolumeMount]
+    :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
     :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
-    :type volumes: list[airflow.contrib.kubernetes.volume.Volume]
+    :type volumes: list[airflow.kubernetes.volume.Volume]
     :param labels: labels to apply to the Pod.
     :type labels: dict
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
@@ -66,7 +66,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
     :type env_vars: dict
     :param secrets: Kubernetes secrets to inject in the container.
         They can be exposed as environment vars or files in a volume
-    :type secrets: list[airflow.contrib.kubernetes.secret.Secret]
+    :type secrets: list[airflow.kubernetes.secret.Secret]
     :param in_cluster: run kubernetes client with in_cluster configuration.
     :type in_cluster: bool
     :param cluster_context: context that points to kubernetes cluster.
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index e324d4a..b1b9951 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -83,7 +83,7 @@ def _get_executor(executor_name):
         from airflow.contrib.executors.mesos_executor import MesosExecutor
         return MesosExecutor()
     elif executor_name == Executors.KubernetesExecutor:
-        from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
+        from airflow.executors.kubernetes_executor import KubernetesExecutor
         return KubernetesExecutor()
     elif executor_name == Executors.DebugExecutor:
         from airflow.executors.debug_executor import DebugExecutor
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
similarity index 99%
rename from airflow/contrib/executors/kubernetes_executor.py
rename to airflow/executors/kubernetes_executor.py
index 9e22edd..32f4fd5 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -33,9 +33,9 @@ from kubernetes.client.rest import ApiException
 from urllib3.exceptions import HTTPError, ReadTimeoutError
 
 from airflow.configuration import conf
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-from airflow.contrib.kubernetes.kube_client import get_kube_client
-from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
+from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.kube_client import get_kube_client
+from airflow.kubernetes.worker_configuration import WorkerConfiguration
 from airflow.executors.base_executor import BaseExecutor
 from airflow.executors import Executors
 from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/kubernetes/__init__.py
similarity index 100%
copy from airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
copy to airflow/kubernetes/__init__.py
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
similarity index 97%
rename from airflow/contrib/kubernetes/kube_client.py
rename to airflow/kubernetes/kube_client.py
index ab37f1d..cff817b 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -26,7 +26,7 @@ try:
     from kubernetes.client.rest import ApiException  # pylint: disable=unused-import
     from kubernetes.client.api_client import ApiClient
     from kubernetes.client import Configuration
-    from airflow.contrib.kubernetes.refresh_config import (  # pylint: disable=ungrouped-imports
+    from airflow.kubernetes.refresh_config import (  # pylint: disable=ungrouped-imports
         load_kube_config,
         RefreshConfiguration,
     )
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/kubernetes/kubernetes_request_factory/__init__.py
similarity index 100%
rename from airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
rename to airflow/kubernetes/kubernetes_request_factory/__init__.py
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
similarity index 100%
rename from airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
rename to airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
similarity index 96%
rename from airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
rename to airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 4c2ea25..3790948 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -16,14 +16,15 @@
 # under the License.
 
 import yaml
-from airflow.contrib.kubernetes.pod import Pod
-from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
+from airflow.kubernetes.pod import Pod
+from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
     import KubernetesRequestFactory
 
 
 class SimplePodRequestFactory(KubernetesRequestFactory):
     """
-    Request generator for a simple pod.
+    Request generator for a pod.
+
     """
     _yaml = """apiVersion: v1
 kind: Pod
@@ -38,6 +39,7 @@ spec:
     """
 
     def __init__(self):
+
         pass
 
     def create(self, pod):
@@ -71,7 +73,9 @@ spec:
 class ExtractXcomPodRequestFactory(KubernetesRequestFactory):
     """
     Request generator for a pod with sidecar container.
+
     """
+
     XCOM_MOUNT_PATH = '/airflow/xcom'
     SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
     _yaml = """apiVersion: v1
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/kubernetes/pod.py
similarity index 98%
rename from airflow/contrib/kubernetes/pod.py
rename to airflow/kubernetes/pod.py
index 4c10e66..a9f3dfe 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -66,7 +66,7 @@ class Pod:
     :param cmds: The command to be run on the pod
     :type cmds: list[str]
     :param secrets: Secrets to be launched to the pod
-    :type secrets: list[airflow.contrib.kubernetes.secret.Secret]
+    :type secrets: list[airflow.kubernetes.secret.Secret]
     :param result: The result that will be returned to the operator after
         successful execution of the pod
     :type result: any
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
similarity index 93%
rename from airflow/contrib/kubernetes/pod_generator.py
rename to airflow/kubernetes/pod_generator.py
index e55cc51..56b41d7 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -15,9 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.contrib.kubernetes.pod import Pod, Port
-from airflow.contrib.kubernetes.volume import Volume
-from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.kubernetes.pod import Pod, Port
+from airflow.kubernetes.volume import Volume
+from airflow.kubernetes.volume_mount import VolumeMount
+
 import uuid
 
 
@@ -70,7 +71,7 @@ class PodGenerator:
         Adds a Port to the generator
 
         :param port: ports for generated pod
-        :type port: airflow.contrib.kubernetes.pod.Port
+        :type port: airflow.kubernetes.pod.Port
         """
         self.ports.append({'name': port.name, 'containerPort': port.container_port})
 
@@ -79,7 +80,7 @@ class PodGenerator:
         Adds a Volume to the generator
 
         :param volume: volume for generated pod
-        :type volume: airflow.contrib.kubernetes.volume.Volume
+        :type volume: airflow.kubernetes.volume.Volume
         """
 
         self._add_volume(name=volume.name, configs=volume.configs)
@@ -140,7 +141,7 @@ class PodGenerator:
         Adds a VolumeMount to the generator
 
         :param volume_mount: volume for generated pod
-        :type volume_mount: airflow.contrib.kubernetes.volume_mount.VolumeMount
+        :type volume_mount: airflow.kubernetes.volume_mount.VolumeMount
         """
         self._add_mount(
             name=volume_mount.name,
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
similarity index 98%
rename from airflow/contrib/kubernetes/pod_launcher.py
rename to airflow/kubernetes/pod_launcher.py
index 51ee134..8a9028b 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -33,8 +33,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 from airflow import AirflowException
 
-from airflow.contrib.kubernetes.pod import Pod
-from airflow.contrib.kubernetes.kubernetes_request_factory import \
+from airflow.kubernetes.pod import Pod
+from airflow.kubernetes.kubernetes_request_factory import \
     pod_request_factory as pod_factory
 
 from .kube_client import get_kube_client
diff --git a/airflow/contrib/kubernetes/pod_runtime_info_env.py b/airflow/kubernetes/pod_runtime_info_env.py
similarity index 100%
rename from airflow/contrib/kubernetes/pod_runtime_info_env.py
rename to airflow/kubernetes/pod_runtime_info_env.py
diff --git a/airflow/contrib/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
similarity index 100%
rename from airflow/contrib/kubernetes/refresh_config.py
rename to airflow/kubernetes/refresh_config.py
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/kubernetes/secret.py
similarity index 100%
rename from airflow/contrib/kubernetes/secret.py
rename to airflow/kubernetes/secret.py
diff --git a/airflow/contrib/kubernetes/volume.py b/airflow/kubernetes/volume.py
similarity index 100%
rename from airflow/contrib/kubernetes/volume.py
rename to airflow/kubernetes/volume.py
diff --git a/airflow/contrib/kubernetes/volume_mount.py b/airflow/kubernetes/volume_mount.py
similarity index 100%
rename from airflow/contrib/kubernetes/volume_mount.py
rename to airflow/kubernetes/volume_mount.py
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
similarity index 99%
rename from airflow/contrib/kubernetes/worker_configuration.py
rename to airflow/kubernetes/worker_configuration.py
index 9f4c3ae..74e464a 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -19,8 +19,8 @@ import os
 import six
 
 from airflow.configuration import conf
-from airflow.contrib.kubernetes.pod import Pod, Resources
-from airflow.contrib.kubernetes.secret import Secret
+from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.secret import Secret
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.version import version as airflow_version
 
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index 1e5787b..d7f7a47 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -107,9 +107,6 @@ All executors are in the following packages:
 
   airflow/executors/index
 
-  airflow/contrib/executors/index
-
-
 Models
 ------
 Models are built on top of the SQLAlchemy ORM Base class, and instances are
diff --git a/docs/conf.py b/docs/conf.py
index 22126ab..b32a30a 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -213,6 +213,7 @@ exclude_patterns = [
     '_api/airflow/sentry',
     '_api/airflow/stats',
     '_api/airflow/task',
+    '_api/airflow/kubernetes',
     '_api/airflow/ti_deps',
     '_api/airflow/utils',
     '_api/airflow/version',
@@ -479,6 +480,7 @@ autoapi_ignore = [
     '*/airflow/contrib/operators/s3_to_gcs_transfer_operator.py',
     '*/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py',
     '*/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py',
+    '*/airflow/kubernetes/kubernetes_request_factory/*',
 
     '*/node_modules/*',
     '*/migrations/*',
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 7874506..64b3332 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -34,10 +34,10 @@ any type of executor.
 .. code:: python
 
     from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-    from airflow.contrib.kubernetes.secret import Secret
-    from airflow.contrib.kubernetes.volume import Volume
-    from airflow.contrib.kubernetes.volume_mount import VolumeMount
-    from airflow.contrib.kubernetes.pod import Port
+    from airflow.kubernetes.secret import Secret
+    from airflow.kubernetes.volume import Volume
+    from airflow.kubernetes.volume_mount import VolumeMount
+    from airflow.kubernetes.pod import Port
 
 
     secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index f4b44ea..2349e74 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -706,7 +706,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         self.assertEqual(kill_cmd[3], '--kill')
         self.assertEqual(kill_cmd[4], 'driver-20171128111415-0001')
 
-    @patch('airflow.contrib.kubernetes.kube_client.get_kube_client')
+    @patch('airflow.kubernetes.kube_client.get_kube_client')
     @patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
     def test_k8s_process_on_kill(self, mock_popen, mock_client_method):
         # Given
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
similarity index 95%
rename from tests/contrib/executors/test_kubernetes_executor.py
rename to tests/executors/test_kubernetes_executor.py
index 7d2fe39..95333fc 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -29,18 +29,22 @@ import six
 
 from tests.compat import mock
 from tests.test_utils.config import conf_vars
+
 try:
     from kubernetes.client.rest import ApiException
-    from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
-    from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
-    from airflow.contrib.executors.kubernetes_executor import KubeConfig
-    from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
-    from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
+    from airflow import configuration  # noqa: F401
+    from airflow.configuration import conf  # noqa: F401
+    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.executors.kubernetes_executor import KubernetesExecutor
+    from airflow.executors.kubernetes_executor import KubeConfig
+    from airflow.executors.kubernetes_executor import KubernetesExecutorConfig
+    from airflow.kubernetes.worker_configuration import WorkerConfiguration
     from airflow.exceptions import AirflowConfigException
-    from airflow.contrib.kubernetes.secret import Secret
     from airflow.utils.state import State
     from airflow.version import version as airflow_version
-except ImportError:
+    from airflow.kubernetes.secret import Secret
+except ImportError as e:
+    print(e)
     AirflowKubernetesScheduler = None  # type: ignore
 
 if six.PY2:
@@ -168,7 +172,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
             self.skipTest("kubernetes python package is not installed")
 
         self.resources = mock.patch(
-            'airflow.contrib.kubernetes.worker_configuration.Resources'
+            'airflow.kubernetes.worker_configuration.Resources'
         )
 
         for patcher in [self.resources]:
@@ -333,7 +337,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.assertTrue({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
         self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
         self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                        'value': '/etc/git-secret/known_hosts'} in env)
+                         'value': '/etc/git-secret/known_hosts'} in env)
         self.assertTrue({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
 
     def test_init_environment_using_git_sync_user_without_known_hosts(self):
@@ -385,7 +389,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.assertTrue({'name': 'GIT_SYNC_PASSWORD', 'value': 'git_password'} in env)
         self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
         self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                        'value': '/etc/git-secret/known_hosts'} in env)
+                         'value': '/etc/git-secret/known_hosts'} in env)
         self.assertFalse({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
 
     def test_make_pod_git_sync_credentials_secret(self):
@@ -538,9 +542,9 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         init_containers = worker_config._get_init_containers()
         git_ssh_key_file = next((x['value'] for x in init_containers[0]['env']
-                                if x['name'] == 'GIT_SSH_KEY_FILE'), None)
+                                 if x['name'] == 'GIT_SSH_KEY_FILE'), None)
         volume_mount_ssh_key = next((x['mountPath'] for x in init_containers[0]['volumeMounts']
-                                    if x['name'] == worker_config.git_sync_ssh_secret_volume_name),
+                                     if x['name'] == worker_config.git_sync_ssh_secret_volume_name),
                                     None)
         self.assertTrue(git_ssh_key_file)
         self.assertTrue(volume_mount_ssh_key)
@@ -977,12 +981,12 @@ class TestKubernetesExecutor(unittest.TestCase):
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
+
     @unittest.skipIf(AirflowKubernetesScheduler is None,
                      'kubernetes python package is not installed')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
-
         # When a quota is exceeded this is the ApiException we get
         r = HTTPResponse(
             body='{"kind": "Status", "apiVersion": "v1", "metadata": {}, "status": "Failure", '
@@ -1019,8 +1023,8 @@ class TestKubernetesExecutor(unittest.TestCase):
         assert mock_kube_client.create_namespaced_pod.called
         self.assertTrue(kubernetesExecutor.task_queue.empty())
 
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubeConfig')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.sync')
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
     @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
     @mock.patch('airflow.settings.Stats.gauge')
     def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock_sync, mock_kube_config):
@@ -1031,9 +1035,9 @@ class TestKubernetesExecutor(unittest.TestCase):
                  mock.call('executor.running_tasks', mock.ANY)]
         mock_stats_gauge.assert_has_calls(calls)
 
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubeConfig')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_kube_config):
         executor = KubernetesExecutor()
         executor.start()
@@ -1041,9 +1045,9 @@ class TestKubernetesExecutor(unittest.TestCase):
         executor._change_state(key, State.RUNNING, 'pod_id', 'default')
         self.assertTrue(executor.event_buffer[key] == State.RUNNING)
 
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
         executor.start()
@@ -1053,9 +1057,9 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_called_once_with('pod_id', 'default')
 
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_failed_no_deletion(self, mock_delete_pod, mock_get_kube_client,
                                              mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
@@ -1068,9 +1072,9 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.FAILED)
         mock_delete_pod.assert_not_called()
 
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
                                             mock_kubernetes_job_watcher):
         test_time = timezone.utcnow()
@@ -1083,9 +1087,9 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_not_called()
 
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
-    @mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
                                               mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py b/tests/kubernetes/__init__.py
similarity index 100%
copy from tests/contrib/kubernetes/kubernetes_request_factory/__init__.py
copy to tests/kubernetes/__init__.py
diff --git a/tests/contrib/kubernetes/__init__.py b/tests/kubernetes/kubernetes_request_factory/__init__.py
similarity index 100%
rename from tests/contrib/kubernetes/__init__.py
rename to tests/kubernetes/kubernetes_request_factory/__init__.py
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
similarity index 97%
rename from tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
rename to tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
index edf406f..87d0073 100644
--- a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
+++ b/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
@@ -15,11 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.contrib.kubernetes.kubernetes_request_factory.\
-    kubernetes_request_factory import KubernetesRequestFactory
-from airflow.contrib.kubernetes.pod import Pod, Resources
-from airflow.contrib.kubernetes.secret import Secret
-from airflow.contrib.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
+from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory
+from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
+from airflow.kubernetes.secret import Secret
 from parameterized import parameterized
 import unittest
 import copy
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
similarity index 96%
rename from tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
rename to tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
index bca0d2e..68617de 100644
--- a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
+++ b/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
@@ -15,11 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.contrib.kubernetes.kubernetes_request_factory.\
-    pod_request_factory import SimplePodRequestFactory, \
+from airflow.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory, \
     ExtractXcomPodRequestFactory
-from airflow.contrib.kubernetes.pod import Pod, Resources
-from airflow.contrib.kubernetes.secret import Secret
+from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.secret import Secret
 from airflow.exceptions import AirflowConfigException
 import unittest
 
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index 9a59af6..8e60ef4 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -17,19 +17,19 @@
 
 import unittest
 
-from airflow.contrib.kubernetes.kube_client import RefreshConfiguration, get_kube_client
+from airflow.kubernetes.kube_client import RefreshConfiguration, get_kube_client
 from tests.compat import mock
 
 
 class TestClient(unittest.TestCase):
 
-    @mock.patch('airflow.contrib.kubernetes.kube_client.config')
+    @mock.patch('airflow.kubernetes.kube_client.config')
     def test_load_cluster_config(self, _):
         client = get_kube_client(in_cluster=True)
         assert not isinstance(client.api_client.configuration, RefreshConfiguration)
 
-    @mock.patch('airflow.contrib.kubernetes.kube_client.config')
-    @mock.patch('airflow.contrib.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
+    @mock.patch('airflow.kubernetes.kube_client.config')
+    @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
     def test_load_file_config(self, _, _2):
         client = get_kube_client(in_cluster=False)
         assert isinstance(client.api_client.configuration, RefreshConfiguration)
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 8943a10..422d3db 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -20,7 +20,7 @@ import mock
 from requests.exceptions import BaseHTTPError
 
 from airflow import AirflowException
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.pod_launcher import PodLauncher
 
 
 class TestPodLauncher(unittest.TestCase):
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py b/tests/minikube/__init__.py
similarity index 100%
rename from tests/contrib/kubernetes/kubernetes_request_factory/__init__.py
rename to tests/minikube/__init__.py
diff --git a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
index af3e7ef..96e06fe 100644
--- a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
+++ b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
@@ -27,11 +27,11 @@ from kubernetes.client.rest import ApiException
 
 from airflow import AirflowException
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow.contrib.kubernetes.pod import Port
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-from airflow.contrib.kubernetes.secret import Secret
-from airflow.contrib.kubernetes.volume import Volume
-from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.volume import Volume
+from airflow.kubernetes.volume_mount import VolumeMount
 from airflow.version import version as airflow_version
 
 
@@ -115,8 +115,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         k.execute(None)
 
-    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_config_path(self, client_mock, launcher_mock):
         from airflow.utils.state import State
 
@@ -142,8 +142,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             config_file=file_path
         )
 
-    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_image_pull_secrets_correctly_set(self, mock_client, mock_launcher):
         from airflow.utils.state import State
 
@@ -166,9 +166,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
         self.assertEqual(mock_launcher.call_args[0][0].image_pull_secrets,
                          fake_pull_secrets)
 
-    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.delete_pod")
-    @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_pod_delete_even_on_launcher_error(self, mock_client, delete_pod_mock, run_pod_mock):
         k = KubernetesPodOperator(
             namespace='default',
@@ -501,8 +501,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         self.assertEqual(k.execute(None), json.loads(return_value))
 
-    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_envs_from_configmaps(self, mock_client, mock_launcher):
         # GIVEN
         from airflow.utils.state import State
@@ -525,8 +525,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
         k.execute(None)
         self.assertEqual(mock_launcher.call_args[0][0].configmaps, configmaps)
 
-    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_envs_from_secrets(self, mock_client, mock_launcher):
         # GIVEN
         from airflow.utils.state import State
diff --git a/tests/test_local_settings.py b/tests/test_local_settings.py
index aeba290..fdc6cb9 100644
--- a/tests/test_local_settings.py
+++ b/tests/test_local_settings.py
@@ -23,7 +23,7 @@ import tempfile
 import unittest
 from tests.compat import MagicMock, Mock, call, patch
 
-from airflow.contrib.kubernetes.pod import Pod
+from airflow.kubernetes.pod import Pod
 
 
 SETTINGS_FILE_POLICY = """