You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/05/29 15:44:39 UTC
[airflow] branch v1-10-test updated: Move k8sexecutor out of
contrib to closer match master (#8904)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 4daccda Move k8sexecutor out of contrib to closer match master (#8904)
4daccda is described below
commit 4daccda3f04ace370ef3f6bf99e88c6faa170e4f
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri May 29 08:44:07 2020 -0700
Move k8sexecutor out of contrib to closer match master (#8904)
Considering that the k8s executor is now supported by core
committers, we should move it from contrib to the primary executor
directory.
Co-authored-by: Daniel Imberman <da...@astronomer.io>
---
airflow/contrib/executors/__init__.py | 19 ------
airflow/contrib/hooks/spark_submit_hook.py | 2 +-
airflow/contrib/kubernetes/__init__.py | 5 ++
.../contrib/operators/kubernetes_pod_operator.py | 12 ++--
airflow/executors/__init__.py | 2 +-
.../{contrib => }/executors/kubernetes_executor.py | 6 +-
.../__init__.py | 0
airflow/{contrib => }/kubernetes/kube_client.py | 2 +-
.../kubernetes_request_factory/__init__.py | 0
.../kubernetes_request_factory.py | 0
.../pod_request_factory.py | 10 +++-
airflow/{contrib => }/kubernetes/pod.py | 2 +-
airflow/{contrib => }/kubernetes/pod_generator.py | 13 +++--
airflow/{contrib => }/kubernetes/pod_launcher.py | 4 +-
.../kubernetes/pod_runtime_info_env.py | 0
airflow/{contrib => }/kubernetes/refresh_config.py | 0
airflow/{contrib => }/kubernetes/secret.py | 0
airflow/{contrib => }/kubernetes/volume.py | 0
airflow/{contrib => }/kubernetes/volume_mount.py | 0
.../kubernetes/worker_configuration.py | 4 +-
docs/autoapi_templates/index.rst | 3 -
docs/conf.py | 2 +
docs/kubernetes.rst | 8 +--
tests/contrib/hooks/test_spark_submit_hook.py | 2 +-
.../executors/test_kubernetes_executor.py | 68 ++++++++++++----------
.../__init__.py | 0
.../kubernetes_request_factory}/__init__.py | 0
.../test_kubernetes_request_factory.py | 9 ++-
.../test_pod_request_factory.py | 7 +--
tests/kubernetes/test_client.py | 8 +--
tests/kubernetes/test_pod_launcher.py | 2 +-
.../__init__.py | 0
.../kubernetes/test_kubernetes_pod_operator.py | 32 +++++-----
tests/test_local_settings.py | 2 +-
34 files changed, 108 insertions(+), 116 deletions(-)
diff --git a/airflow/contrib/executors/__init__.py b/airflow/contrib/executors/__init__.py
deleted file mode 100644
index b7f8352..0000000
--- a/airflow/contrib/executors/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 32bbf91..8fd6c69 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -27,7 +27,7 @@ from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
try:
- from airflow.contrib.kubernetes import kube_client
+ from airflow.kubernetes import kube_client
except ImportError:
pass
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
index 13a8339..ef7074c 100644
--- a/airflow/contrib/kubernetes/__init__.py
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,3 +16,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
+
+from airflow.kubernetes import * # noqa
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index f692599..d121510 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -21,8 +21,8 @@ import warnings
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
-from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
-from airflow.contrib.kubernetes.pod import Resources
+from airflow.kubernetes import kube_client, pod_generator, pod_launcher
+from airflow.kubernetes.pod import Resources
from airflow.utils.helpers import validate_key
from airflow.utils.state import State
from airflow.version import version as airflow_version
@@ -50,11 +50,11 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
:param ports: ports for launched pod.
- :type ports: list[airflow.contrib.kubernetes.pod.Port]
+ :type ports: list[airflow.kubernetes.pod.Port]
:param volume_mounts: volumeMounts for launched pod.
- :type volume_mounts: list[airflow.contrib.kubernetes.volume_mount.VolumeMount]
+ :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
- :type volumes: list[airflow.contrib.kubernetes.volume.Volume]
+ :type volumes: list[airflow.kubernetes.volume.Volume]
:param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
@@ -66,7 +66,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:type env_vars: dict
:param secrets: Kubernetes secrets to inject in the container.
They can be exposed as environment vars or files in a volume
- :type secrets: list[airflow.contrib.kubernetes.secret.Secret]
+ :type secrets: list[airflow.kubernetes.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration.
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index e324d4a..b1b9951 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -83,7 +83,7 @@ def _get_executor(executor_name):
from airflow.contrib.executors.mesos_executor import MesosExecutor
return MesosExecutor()
elif executor_name == Executors.KubernetesExecutor:
- from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
+ from airflow.executors.kubernetes_executor import KubernetesExecutor
return KubernetesExecutor()
elif executor_name == Executors.DebugExecutor:
from airflow.executors.debug_executor import DebugExecutor
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
similarity index 99%
rename from airflow/contrib/executors/kubernetes_executor.py
rename to airflow/executors/kubernetes_executor.py
index 9e22edd..32f4fd5 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -33,9 +33,9 @@ from kubernetes.client.rest import ApiException
from urllib3.exceptions import HTTPError, ReadTimeoutError
from airflow.configuration import conf
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-from airflow.contrib.kubernetes.kube_client import get_kube_client
-from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
+from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.kube_client import get_kube_client
+from airflow.kubernetes.worker_configuration import WorkerConfiguration
from airflow.executors.base_executor import BaseExecutor
from airflow.executors import Executors
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/kubernetes/__init__.py
similarity index 100%
copy from airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
copy to airflow/kubernetes/__init__.py
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
similarity index 97%
rename from airflow/contrib/kubernetes/kube_client.py
rename to airflow/kubernetes/kube_client.py
index ab37f1d..cff817b 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -26,7 +26,7 @@ try:
from kubernetes.client.rest import ApiException # pylint: disable=unused-import
from kubernetes.client.api_client import ApiClient
from kubernetes.client import Configuration
- from airflow.contrib.kubernetes.refresh_config import ( # pylint: disable=ungrouped-imports
+ from airflow.kubernetes.refresh_config import ( # pylint: disable=ungrouped-imports
load_kube_config,
RefreshConfiguration,
)
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/kubernetes/kubernetes_request_factory/__init__.py
similarity index 100%
rename from airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
rename to airflow/kubernetes/kubernetes_request_factory/__init__.py
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
similarity index 100%
rename from airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
rename to airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
similarity index 96%
rename from airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
rename to airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 4c2ea25..3790948 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -16,14 +16,15 @@
# under the License.
import yaml
-from airflow.contrib.kubernetes.pod import Pod
-from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
+from airflow.kubernetes.pod import Pod
+from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
import KubernetesRequestFactory
class SimplePodRequestFactory(KubernetesRequestFactory):
"""
- Request generator for a simple pod.
+ Request generator for a pod.
+
"""
_yaml = """apiVersion: v1
kind: Pod
@@ -38,6 +39,7 @@ spec:
"""
def __init__(self):
+
pass
def create(self, pod):
@@ -71,7 +73,9 @@ spec:
class ExtractXcomPodRequestFactory(KubernetesRequestFactory):
"""
Request generator for a pod with sidecar container.
+
"""
+
XCOM_MOUNT_PATH = '/airflow/xcom'
SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
_yaml = """apiVersion: v1
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/kubernetes/pod.py
similarity index 98%
rename from airflow/contrib/kubernetes/pod.py
rename to airflow/kubernetes/pod.py
index 4c10e66..a9f3dfe 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -66,7 +66,7 @@ class Pod:
:param cmds: The command to be run on the pod
:type cmds: list[str]
:param secrets: Secrets to be launched to the pod
- :type secrets: list[airflow.contrib.kubernetes.secret.Secret]
+ :type secrets: list[airflow.kubernetes.secret.Secret]
:param result: The result that will be returned to the operator after
successful execution of the pod
:type result: any
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
similarity index 93%
rename from airflow/contrib/kubernetes/pod_generator.py
rename to airflow/kubernetes/pod_generator.py
index e55cc51..56b41d7 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -15,9 +15,10 @@
# specific language governing permissions and limitations
# under the License.
-from airflow.contrib.kubernetes.pod import Pod, Port
-from airflow.contrib.kubernetes.volume import Volume
-from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.kubernetes.pod import Pod, Port
+from airflow.kubernetes.volume import Volume
+from airflow.kubernetes.volume_mount import VolumeMount
+
import uuid
@@ -70,7 +71,7 @@ class PodGenerator:
Adds a Port to the generator
:param port: ports for generated pod
- :type port: airflow.contrib.kubernetes.pod.Port
+ :type port: airflow.kubernetes.pod.Port
"""
self.ports.append({'name': port.name, 'containerPort': port.container_port})
@@ -79,7 +80,7 @@ class PodGenerator:
Adds a Volume to the generator
:param volume: volume for generated pod
- :type volume: airflow.contrib.kubernetes.volume.Volume
+ :type volume: airflow.kubernetes.volume.Volume
"""
self._add_volume(name=volume.name, configs=volume.configs)
@@ -140,7 +141,7 @@ class PodGenerator:
Adds a VolumeMount to the generator
:param volume_mount: volume for generated pod
- :type volume_mount: airflow.contrib.kubernetes.volume_mount.VolumeMount
+ :type volume_mount: airflow.kubernetes.volume_mount.VolumeMount
"""
self._add_mount(
name=volume_mount.name,
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
similarity index 98%
rename from airflow/contrib/kubernetes/pod_launcher.py
rename to airflow/kubernetes/pod_launcher.py
index 51ee134..8a9028b 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -33,8 +33,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from airflow import AirflowException
-from airflow.contrib.kubernetes.pod import Pod
-from airflow.contrib.kubernetes.kubernetes_request_factory import \
+from airflow.kubernetes.pod import Pod
+from airflow.kubernetes.kubernetes_request_factory import \
pod_request_factory as pod_factory
from .kube_client import get_kube_client
diff --git a/airflow/contrib/kubernetes/pod_runtime_info_env.py b/airflow/kubernetes/pod_runtime_info_env.py
similarity index 100%
rename from airflow/contrib/kubernetes/pod_runtime_info_env.py
rename to airflow/kubernetes/pod_runtime_info_env.py
diff --git a/airflow/contrib/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
similarity index 100%
rename from airflow/contrib/kubernetes/refresh_config.py
rename to airflow/kubernetes/refresh_config.py
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/kubernetes/secret.py
similarity index 100%
rename from airflow/contrib/kubernetes/secret.py
rename to airflow/kubernetes/secret.py
diff --git a/airflow/contrib/kubernetes/volume.py b/airflow/kubernetes/volume.py
similarity index 100%
rename from airflow/contrib/kubernetes/volume.py
rename to airflow/kubernetes/volume.py
diff --git a/airflow/contrib/kubernetes/volume_mount.py b/airflow/kubernetes/volume_mount.py
similarity index 100%
rename from airflow/contrib/kubernetes/volume_mount.py
rename to airflow/kubernetes/volume_mount.py
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
similarity index 99%
rename from airflow/contrib/kubernetes/worker_configuration.py
rename to airflow/kubernetes/worker_configuration.py
index 9f4c3ae..74e464a 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -19,8 +19,8 @@ import os
import six
from airflow.configuration import conf
-from airflow.contrib.kubernetes.pod import Pod, Resources
-from airflow.contrib.kubernetes.secret import Secret
+from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.secret import Secret
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.version import version as airflow_version
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index 1e5787b..d7f7a47 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -107,9 +107,6 @@ All executors are in the following packages:
airflow/executors/index
- airflow/contrib/executors/index
-
-
Models
------
Models are built on top of the SQLAlchemy ORM Base class, and instances are
diff --git a/docs/conf.py b/docs/conf.py
index 22126ab..b32a30a 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -213,6 +213,7 @@ exclude_patterns = [
'_api/airflow/sentry',
'_api/airflow/stats',
'_api/airflow/task',
+ '_api/airflow/kubernetes',
'_api/airflow/ti_deps',
'_api/airflow/utils',
'_api/airflow/version',
@@ -479,6 +480,7 @@ autoapi_ignore = [
'*/airflow/contrib/operators/s3_to_gcs_transfer_operator.py',
'*/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py',
'*/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py',
+ '*/airflow/kubernetes/kubernetes_request_factory/*',
'*/node_modules/*',
'*/migrations/*',
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 7874506..64b3332 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -34,10 +34,10 @@ any type of executor.
.. code:: python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
- from airflow.contrib.kubernetes.secret import Secret
- from airflow.contrib.kubernetes.volume import Volume
- from airflow.contrib.kubernetes.volume_mount import VolumeMount
- from airflow.contrib.kubernetes.pod import Port
+ from airflow.kubernetes.secret import Secret
+ from airflow.kubernetes.volume import Volume
+ from airflow.kubernetes.volume_mount import VolumeMount
+ from airflow.kubernetes.pod import Port
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index f4b44ea..2349e74 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -706,7 +706,7 @@ class TestSparkSubmitHook(unittest.TestCase):
self.assertEqual(kill_cmd[3], '--kill')
self.assertEqual(kill_cmd[4], 'driver-20171128111415-0001')
- @patch('airflow.contrib.kubernetes.kube_client.get_kube_client')
+ @patch('airflow.kubernetes.kube_client.get_kube_client')
@patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
def test_k8s_process_on_kill(self, mock_popen, mock_client_method):
# Given
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
similarity index 95%
rename from tests/contrib/executors/test_kubernetes_executor.py
rename to tests/executors/test_kubernetes_executor.py
index 7d2fe39..95333fc 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -29,18 +29,22 @@ import six
from tests.compat import mock
from tests.test_utils.config import conf_vars
+
try:
from kubernetes.client.rest import ApiException
- from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
- from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
- from airflow.contrib.executors.kubernetes_executor import KubeConfig
- from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
- from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
+ from airflow import configuration # noqa: F401
+ from airflow.configuration import conf # noqa: F401
+ from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
+ from airflow.executors.kubernetes_executor import KubernetesExecutor
+ from airflow.executors.kubernetes_executor import KubeConfig
+ from airflow.executors.kubernetes_executor import KubernetesExecutorConfig
+ from airflow.kubernetes.worker_configuration import WorkerConfiguration
from airflow.exceptions import AirflowConfigException
- from airflow.contrib.kubernetes.secret import Secret
from airflow.utils.state import State
from airflow.version import version as airflow_version
-except ImportError:
+ from airflow.kubernetes.secret import Secret
+except ImportError as e:
+ print(e)
AirflowKubernetesScheduler = None # type: ignore
if six.PY2:
@@ -168,7 +172,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.skipTest("kubernetes python package is not installed")
self.resources = mock.patch(
- 'airflow.contrib.kubernetes.worker_configuration.Resources'
+ 'airflow.kubernetes.worker_configuration.Resources'
)
for patcher in [self.resources]:
@@ -333,7 +337,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.assertTrue({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
- 'value': '/etc/git-secret/known_hosts'} in env)
+ 'value': '/etc/git-secret/known_hosts'} in env)
self.assertTrue({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
def test_init_environment_using_git_sync_user_without_known_hosts(self):
@@ -385,7 +389,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.assertTrue({'name': 'GIT_SYNC_PASSWORD', 'value': 'git_password'} in env)
self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
- 'value': '/etc/git-secret/known_hosts'} in env)
+ 'value': '/etc/git-secret/known_hosts'} in env)
self.assertFalse({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
def test_make_pod_git_sync_credentials_secret(self):
@@ -538,9 +542,9 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
init_containers = worker_config._get_init_containers()
git_ssh_key_file = next((x['value'] for x in init_containers[0]['env']
- if x['name'] == 'GIT_SSH_KEY_FILE'), None)
+ if x['name'] == 'GIT_SSH_KEY_FILE'), None)
volume_mount_ssh_key = next((x['mountPath'] for x in init_containers[0]['volumeMounts']
- if x['name'] == worker_config.git_sync_ssh_secret_volume_name),
+ if x['name'] == worker_config.git_sync_ssh_secret_volume_name),
None)
self.assertTrue(git_ssh_key_file)
self.assertTrue(volume_mount_ssh_key)
@@ -977,12 +981,12 @@ class TestKubernetesExecutor(unittest.TestCase):
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
+
@unittest.skipIf(AirflowKubernetesScheduler is None,
'kubernetes python package is not installed')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
-
# When a quota is exceeded this is the ApiException we get
r = HTTPResponse(
body='{"kind": "Status", "apiVersion": "v1", "metadata": {}, "status": "Failure", '
@@ -1019,8 +1023,8 @@ class TestKubernetesExecutor(unittest.TestCase):
assert mock_kube_client.create_namespaced_pod.called
self.assertTrue(kubernetesExecutor.task_queue.empty())
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubeConfig')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.sync')
+ @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
@mock.patch('airflow.settings.Stats.gauge')
def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock_sync, mock_kube_config):
@@ -1031,9 +1035,9 @@ class TestKubernetesExecutor(unittest.TestCase):
mock.call('executor.running_tasks', mock.ANY)]
mock_stats_gauge.assert_has_calls(calls)
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubeConfig')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
+ @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_kube_config):
executor = KubernetesExecutor()
executor.start()
@@ -1041,9 +1045,9 @@ class TestKubernetesExecutor(unittest.TestCase):
executor._change_state(key, State.RUNNING, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.RUNNING)
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+ @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = KubernetesExecutor()
executor.start()
@@ -1053,9 +1057,9 @@ class TestKubernetesExecutor(unittest.TestCase):
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_called_once_with('pod_id', 'default')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+ @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed_no_deletion(self, mock_delete_pod, mock_get_kube_client,
mock_kubernetes_job_watcher):
executor = KubernetesExecutor()
@@ -1068,9 +1072,9 @@ class TestKubernetesExecutor(unittest.TestCase):
self.assertTrue(executor.event_buffer[key] == State.FAILED)
mock_delete_pod.assert_not_called()
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+ @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
mock_kubernetes_job_watcher):
test_time = timezone.utcnow()
@@ -1083,9 +1087,9 @@ class TestKubernetesExecutor(unittest.TestCase):
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_not_called()
- @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
- @mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+ @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
mock_kubernetes_job_watcher):
executor = KubernetesExecutor()
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py b/tests/kubernetes/__init__.py
similarity index 100%
copy from tests/contrib/kubernetes/kubernetes_request_factory/__init__.py
copy to tests/kubernetes/__init__.py
diff --git a/tests/contrib/kubernetes/__init__.py b/tests/kubernetes/kubernetes_request_factory/__init__.py
similarity index 100%
rename from tests/contrib/kubernetes/__init__.py
rename to tests/kubernetes/kubernetes_request_factory/__init__.py
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
similarity index 97%
rename from tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
rename to tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
index edf406f..87d0073 100644
--- a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
+++ b/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
@@ -15,11 +15,10 @@
# specific language governing permissions and limitations
# under the License.
-from airflow.contrib.kubernetes.kubernetes_request_factory.\
- kubernetes_request_factory import KubernetesRequestFactory
-from airflow.contrib.kubernetes.pod import Pod, Resources
-from airflow.contrib.kubernetes.secret import Secret
-from airflow.contrib.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
+from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory
+from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
+from airflow.kubernetes.secret import Secret
from parameterized import parameterized
import unittest
import copy
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
similarity index 96%
rename from tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
rename to tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
index bca0d2e..68617de 100644
--- a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
+++ b/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
@@ -15,11 +15,10 @@
# specific language governing permissions and limitations
# under the License.
-from airflow.contrib.kubernetes.kubernetes_request_factory.\
- pod_request_factory import SimplePodRequestFactory, \
+from airflow.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory, \
ExtractXcomPodRequestFactory
-from airflow.contrib.kubernetes.pod import Pod, Resources
-from airflow.contrib.kubernetes.secret import Secret
+from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.secret import Secret
from airflow.exceptions import AirflowConfigException
import unittest
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index 9a59af6..8e60ef4 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -17,19 +17,19 @@
import unittest
-from airflow.contrib.kubernetes.kube_client import RefreshConfiguration, get_kube_client
+from airflow.kubernetes.kube_client import RefreshConfiguration, get_kube_client
from tests.compat import mock
class TestClient(unittest.TestCase):
- @mock.patch('airflow.contrib.kubernetes.kube_client.config')
+ @mock.patch('airflow.kubernetes.kube_client.config')
def test_load_cluster_config(self, _):
client = get_kube_client(in_cluster=True)
assert not isinstance(client.api_client.configuration, RefreshConfiguration)
- @mock.patch('airflow.contrib.kubernetes.kube_client.config')
- @mock.patch('airflow.contrib.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
+ @mock.patch('airflow.kubernetes.kube_client.config')
+ @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
def test_load_file_config(self, _, _2):
client = get_kube_client(in_cluster=False)
assert isinstance(client.api_client.configuration, RefreshConfiguration)
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 8943a10..422d3db 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -20,7 +20,7 @@ import mock
from requests.exceptions import BaseHTTPError
from airflow import AirflowException
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.pod_launcher import PodLauncher
class TestPodLauncher(unittest.TestCase):
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py b/tests/minikube/__init__.py
similarity index 100%
rename from tests/contrib/kubernetes/kubernetes_request_factory/__init__.py
rename to tests/minikube/__init__.py
diff --git a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
index af3e7ef..96e06fe 100644
--- a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
+++ b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
@@ -27,11 +27,11 @@ from kubernetes.client.rest import ApiException
from airflow import AirflowException
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow.contrib.kubernetes.pod import Port
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-from airflow.contrib.kubernetes.secret import Secret
-from airflow.contrib.kubernetes.volume import Volume
-from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.volume import Volume
+from airflow.kubernetes.volume_mount import VolumeMount
from airflow.version import version as airflow_version
@@ -115,8 +115,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
)
k.execute(None)
- @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
- @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_config_path(self, client_mock, launcher_mock):
from airflow.utils.state import State
@@ -142,8 +142,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
config_file=file_path
)
- @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
- @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_image_pull_secrets_correctly_set(self, mock_client, mock_launcher):
from airflow.utils.state import State
@@ -166,9 +166,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
self.assertEqual(mock_launcher.call_args[0][0].image_pull_secrets,
fake_pull_secrets)
- @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
- @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.delete_pod")
- @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_pod_delete_even_on_launcher_error(self, mock_client, delete_pod_mock, run_pod_mock):
k = KubernetesPodOperator(
namespace='default',
@@ -501,8 +501,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
)
self.assertEqual(k.execute(None), json.loads(return_value))
- @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
- @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_envs_from_configmaps(self, mock_client, mock_launcher):
# GIVEN
from airflow.utils.state import State
@@ -525,8 +525,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
k.execute(None)
self.assertEqual(mock_launcher.call_args[0][0].configmaps, configmaps)
- @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
- @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_envs_from_secrets(self, mock_client, mock_launcher):
# GIVEN
from airflow.utils.state import State
diff --git a/tests/test_local_settings.py b/tests/test_local_settings.py
index aeba290..fdc6cb9 100644
--- a/tests/test_local_settings.py
+++ b/tests/test_local_settings.py
@@ -23,7 +23,7 @@ import tempfile
import unittest
from tests.compat import MagicMock, Mock, call, patch
-from airflow.contrib.kubernetes.pod import Pod
+from airflow.kubernetes.pod import Pod
SETTINGS_FILE_POLICY = """