You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/23 19:38:20 UTC
[airflow] 01/02: Move KubernetesPodOperator into providers package
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2324175fab7aff0d35651825e48eb7d651c8a845
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Tue Jun 23 11:49:51 2020 -0700
Move KubernetesPodOperator into providers package
This commit will make merging PRs simpler while also
maintaining existing paths.
---
.../contrib/operators/kubernetes_pod_operator.py | 276 +--------------------
.../cncf/kubernetes/operators/kubernetes_pod.py} | 0
chart/charts/postgresql-6.3.12.tgz | Bin 0 -> 22754 bytes
3 files changed, 12 insertions(+), 264 deletions(-)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index b89a37f..382f965 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -17,272 +17,20 @@
"""Executes task in a Kubernetes POD"""
import warnings
-import re
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator as K8sPodOp
-from airflow.exceptions import AirflowException
-from airflow.kubernetes import kube_client, pod_generator, pod_launcher
-from airflow.kubernetes.k8s_model import append_to_pod
-from airflow.kubernetes.pod import Resources
-from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.helpers import validate_key
-from airflow.utils.state import State
-from airflow.version import version as airflow_version
-
-class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-attributes
+class KubernetesPodOperator(K8sPodOp):
"""
- Execute a task in a Kubernetes Pod
-
- .. note::
- If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, use
- :class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
- simplifies the authorization process.
-
- :param image: Docker image you wish to launch. Defaults to hub.docker.com,
- but fully qualified URLS will point to custom repositories.
- :type image: str
- :param name: name of the pod in which the task will run, will be used (plus a random
- suffix) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
- :type name: str
- :param cmds: entrypoint of the container. (templated)
- The docker images's entrypoint is used if this is not provided.
- :type cmds: list[str]
- :param arguments: arguments of the entrypoint. (templated)
- The docker image's CMD is used if this is not provided.
- :type arguments: list[str]
- :param image_pull_policy: Specify a policy to cache or always pull an image.
- :type image_pull_policy: str
- :param image_pull_secrets: Any image pull secrets to be given to the pod.
- If more than one secret is required, provide a
- comma separated list: secret_a,secret_b
- :type image_pull_secrets: str
- :param ports: ports for launched pod.
- :type ports: list[airflow.kubernetes.pod.Port]
- :param volume_mounts: volumeMounts for launched pod.
- :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
- :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
- :type volumes: list[airflow.kubernetes.volume.Volume]
- :param labels: labels to apply to the Pod.
- :type labels: dict
- :param startup_timeout_seconds: timeout in seconds to startup the pod.
- :type startup_timeout_seconds: int
- :param name: name of the pod in which the task will run, will be used to
- generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
- :type name: str
- :param env_vars: Environment variables initialized in the container. (templated)
- :type env_vars: dict
- :param secrets: Kubernetes secrets to inject in the container.
- They can be exposed as environment vars or files in a volume.
- :type secrets: list[airflow.kubernetes.secret.Secret]
- :param in_cluster: run kubernetes client with in_cluster configuration.
- :type in_cluster: bool
- :param cluster_context: context that points to kubernetes cluster.
- Ignored when in_cluster is True. If None, current-context is used.
- :type cluster_context: str
- :param get_logs: get the stdout of the container as logs of the tasks.
- :type get_logs: bool
- :param annotations: non-identifying metadata you can attach to the Pod.
- Can be a large range of data, and can include characters
- that are not permitted by labels.
- :type annotations: dict
- :param resources: A dict containing resources requests and limits.
- Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
- and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources.
- See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
- :type resources: dict
- :param affinity: A dict containing a group of affinity scheduling rules.
- :type affinity: dict
- :param node_selectors: A dict containing a group of scheduling rules.
- :type node_selectors: dict
- :param config_file: The path to the Kubernetes config file. (templated)
- :param config_file: The path to the Kubernetes config file. (templated)
- If not specified, default value is ``~/.kube/config``
- :type config_file: str
- :param do_xcom_push: If do_xcom_push is True, the content of the file
- /airflow/xcom/return.json in the container will also be pushed to an
- XCom when the container completes.
- :type do_xcom_push: bool
- :param is_delete_operator_pod: What to do when the pod reaches its final
- state, or the execution is interrupted.
- If False (default): do nothing, If True: delete the pod
- :type is_delete_operator_pod: bool
- :param hostnetwork: If True enable host networking on the pod.
- :type hostnetwork: bool
- :param tolerations: A list of kubernetes tolerations.
- :type tolerations: list tolerations
- :param configmaps: A list of configmap names objects that we
- want mount as env variables.
- :type configmaps: list[str]
- :param pod_runtime_info_envs: environment variables about
- pod runtime information (ip, namespace, nodeName, podName).
- :type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
- :param security_context: security options the pod should run with (PodSecurityContext).
- :type security_context: dict
- :param dnspolicy: dnspolicy for the pod.
- :type dnspolicy: str
- :param full_pod_spec: The complete podSpec
- :type full_pod_spec: kubernetes.client.models.V1Pod
+ This class is deprecated.
+ Please use `airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`.
"""
- template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
-
- def execute(self, context):
- try:
- client = kube_client.get_kube_client(in_cluster=self.in_cluster,
- cluster_context=self.cluster_context,
- config_file=self.config_file)
- # Add Airflow Version to the label
- # And a label to identify that pod is launched by KubernetesPodOperator
- self.labels.update(
- {
- 'airflow_version': airflow_version.replace('+', '-'),
- 'kubernetes_pod_operator': 'True',
- }
- )
-
- pod = pod_generator.PodGenerator(
- image=self.image,
- namespace=self.namespace,
- cmds=self.cmds,
- args=self.arguments,
- labels=self.labels,
- name=self.name,
- envs=self.env_vars,
- extract_xcom=self.do_xcom_push,
- image_pull_policy=self.image_pull_policy,
- node_selectors=self.node_selectors,
- priority_class_name=self.priority_class_name,
- annotations=self.annotations,
- affinity=self.affinity,
- init_containers=self.init_containers,
- image_pull_secrets=self.image_pull_secrets,
- service_account_name=self.service_account_name,
- hostnetwork=self.hostnetwork,
- tolerations=self.tolerations,
- configmaps=self.configmaps,
- security_context=self.security_context,
- dnspolicy=self.dnspolicy,
- pod=self.full_pod_spec,
- ).gen_pod()
-
- pod = append_to_pod(
- pod,
- self.pod_runtime_info_envs +
- self.ports +
- self.resources +
- self.secrets +
- self.volumes +
- self.volume_mounts
- )
-
- self.pod = pod
-
- launcher = pod_launcher.PodLauncher(kube_client=client,
- extract_xcom=self.do_xcom_push)
-
- try:
- (final_state, result) = launcher.run_pod(
- pod,
- startup_timeout=self.startup_timeout_seconds,
- get_logs=self.get_logs)
- finally:
- if self.is_delete_operator_pod:
- launcher.delete_pod(pod)
-
- if final_state != State.SUCCESS:
- raise AirflowException(
- 'Pod returned a failure: {state}'.format(state=final_state)
- )
-
- return result
- except AirflowException as ex:
- raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
-
- def _set_resources(self, resources):
- return [Resources(**resources) if resources else Resources()]
-
- def _set_name(self, name):
- validate_key(name, max_length=220)
- return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
- @apply_defaults
- def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
- namespace,
- image,
- name,
- cmds=None,
- arguments=None,
- ports=None,
- volume_mounts=None,
- volumes=None,
- env_vars=None,
- secrets=None,
- in_cluster=None,
- cluster_context=None,
- labels=None,
- startup_timeout_seconds=120,
- get_logs=True,
- image_pull_policy='IfNotPresent',
- annotations=None,
- resources=None,
- affinity=None,
- init_containers=None,
- config_file=None,
- do_xcom_push=False,
- node_selectors=None,
- image_pull_secrets=None,
- service_account_name='default',
- is_delete_operator_pod=False,
- hostnetwork=False,
- tolerations=None,
- configmaps=None,
- security_context=None,
- pod_runtime_info_envs=None,
- dnspolicy=None,
- full_pod_spec=None,
- priority_class_name=None,
- *args,
- **kwargs):
- # https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
- if kwargs.get('xcom_push') is not None:
- kwargs['do_xcom_push'] = kwargs.pop('xcom_push')
- warnings.warn(
- "`xcom_push` will be deprecated. Use `do_xcom_push` instead.",
- DeprecationWarning, stacklevel=2
- )
- super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
- self.pod = None
- self.do_xcom_push = do_xcom_push
- self.image = image
- self.namespace = namespace
- self.cmds = cmds or []
- self.arguments = arguments or []
- self.labels = labels or {}
- self.startup_timeout_seconds = startup_timeout_seconds
- self.name = self._set_name(name)
- self.env_vars = env_vars or {}
- self.ports = ports or []
- self.init_containers = init_containers or []
- self.priority_class_name = priority_class_name
- self.volume_mounts = volume_mounts or []
- self.volumes = volumes or []
- self.secrets = secrets or []
- self.in_cluster = in_cluster
- self.cluster_context = cluster_context
- self.get_logs = get_logs
- self.image_pull_policy = image_pull_policy
- self.node_selectors = node_selectors or {}
- self.annotations = annotations or {}
- self.affinity = affinity or {}
- self.resources = self._set_resources(resources)
- self.config_file = config_file
- self.image_pull_secrets = image_pull_secrets
- self.service_account_name = service_account_name
- self.is_delete_operator_pod = is_delete_operator_pod
- self.hostnetwork = hostnetwork
- self.tolerations = tolerations or []
- self.configmaps = configmaps or []
- self.security_context = security_context or {}
- self.pod_runtime_info_envs = pod_runtime_info_envs or []
- self.dnspolicy = dnspolicy
- self.full_pod_spec = full_pod_spec
+ def __init__(self, *args, **kwargs):
+ warnings.warn(
+ """This class is deprecated
+ Please use
+ `airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`.""",
+ DeprecationWarning, stacklevel=2
+ )
+ super(KubernetesPodOperator, self).__init__(*args, **kwargs)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
similarity index 100%
copy from airflow/contrib/operators/kubernetes_pod_operator.py
copy to airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
diff --git a/chart/charts/postgresql-6.3.12.tgz b/chart/charts/postgresql-6.3.12.tgz
new file mode 100644
index 0000000..51751d7
Binary files /dev/null and b/chart/charts/postgresql-6.3.12.tgz differ