You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/19 13:24:56 UTC
[airflow] 03/03: Change back example_kubernetes_executor_config to KubernetesExecutor
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 49d733223fe35016c421f7fc0ba41215427add68
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Wed Nov 18 20:25:32 2020 -0800
Change back example_kubernetes_executor_config to KubernetesExecutor
Fixes tests by removing pod_override, since users will have to wait
until Airflow 2.0 to use the pod_override feature.
---
.../example_kubernetes_executor_config.py | 96 ++++------------------
chart/requirements.lock | 6 +-
docs/executor/kubernetes.rst | 37 ---------
tests/serialization/test_dag_serialization.py | 8 --
4 files changed, 19 insertions(+), 128 deletions(-)
diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py
index e3f42d0..e06e8ab 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -1,3 +1,4 @@
+
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
@@ -22,8 +23,6 @@ This is an example dag for using a Kubernetes Executor Configuration.
from __future__ import print_function
import os
-from kubernetes.client import models as k8s
-
from airflow.contrib.example_dags.libs.helper import print_stuff
from airflow.models import DAG
@@ -42,20 +41,6 @@ with DAG(
schedule_interval=None
) as dag:
- def test_sharedvolume_mount():
- """
- Tests whether the volume has been mounted.
- """
- for i in range(5):
- try:
- return_code = os.system("cat /shared/test.txt")
- if return_code != 0:
- raise ValueError("Error when checking volume mount. Return code {return_code}"
- .format(return_code=return_code))
- except ValueError as e:
- if i > 4:
- raise e
-
def test_volume_mount():
"""
Tests whether the volume has been mounted.
@@ -77,74 +62,27 @@ with DAG(
}
)
- # [START task_with_volume]
-
# You can mount volume or secret to the worker pod
second_task = PythonOperator(
task_id="four_task",
python_callable=test_volume_mount,
executor_config={
- "pod_override": k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
- volume_mounts=[
- k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
- ],
- )
- ],
- volumes=[
- k8s.V1Volume(
- name="example-kubernetes-test-volume",
- host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
- )
- ],
- )
- ),
- },
- )
- # [END task_with_volume]
-
- # [START task_with_template]
- task_with_template = PythonOperator(
- task_id="task_with_template",
- python_callable=print_stuff,
- executor_config={
- "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
- "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
- },
- )
- # [END task_with_template]
-
- # [START task_with_sidecar]
- sidecar_task = PythonOperator(
- task_id="task_with_sidecar",
- python_callable=test_sharedvolume_mount,
- executor_config={
- "pod_override": k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
- volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
- ),
- k8s.V1Container(
- name="sidecar",
- image="ubuntu",
- args=["echo \"retrieved from mount\" > /shared/test.txt"],
- command=["bash", "-cx"],
- volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
- ),
- ],
- volumes=[
- k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
- ],
- )
- ),
- },
+ "KubernetesExecutor": {
+ "volumes": [
+ {
+ "name": "example-kubernetes-test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ },
+ ]
+ }
+ }
)
- # [END task_with_sidecar]
# Test that we can add labels to pods
third_task = PythonOperator(
@@ -174,5 +112,3 @@ with DAG(
start_task >> second_task >> third_task
start_task >> other_ns_task
- start_task >> sidecar_task
- start_task >> task_with_template
diff --git a/chart/requirements.lock b/chart/requirements.lock
index eb62c80..4999d6b 100644
--- a/chart/requirements.lock
+++ b/chart/requirements.lock
@@ -1,6 +1,6 @@
dependencies:
- name: postgresql
- repository: https://kubernetes-charts.storage.googleapis.com
+ repository: https://charts.helm.sh/stable/
version: 6.3.12
-digest: sha256:58d88cf56e78b2380091e9e16cc6ccf58b88b3abe4a1886dd47cd9faef5309af
-generated: "2020-11-04T15:59:36.967913-08:00"
+digest: sha256:1748aa702050d4e72ffba1b18960f49bfe5368757cf976116afeffbdedda1c98
+generated: "2020-11-18T20:19:23.9885-08:00"
diff --git a/docs/executor/kubernetes.rst b/docs/executor/kubernetes.rst
index ed172a8..3eabb51 100644
--- a/docs/executor/kubernetes.rst
+++ b/docs/executor/kubernetes.rst
@@ -92,43 +92,6 @@ pod_template_file using the ``dag_in_image`` setting:
:start-after: [START git_sync_template]
:end-before: [END git_sync_template]
-.. _concepts:pod_override:
-
-pod_override
-############
-
-When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis.
-To utilize this functionality, create a Kubernetes V1pod object and fill in your desired overrides.
-Please note that the scheduler will override the ``metadata.name`` of the V1pod before launching it.
-
-To overwrite the base container of the pod launched by the KubernetesExecutor,
-create a V1pod with a single container, and overwrite the fields as follows:
-
-.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
- :language: python
- :start-after: [START task_with_volume]
- :end-before: [END task_with_volume]
-
-Note that volume mounts environment variables, ports, and devices will all be extended instead of overwritten.
-
-To add a sidecar container to the launched pod, create a V1pod with an empty first container with the
-name ``base`` and a second container containing your desired sidecar.
-
-.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
- :language: python
- :start-after: [START task_with_sidecar]
- :end-before: [END task_with_sidecar]
-
-You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
-This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override_spec``.
-
-Here is an example of a task with both features:
-
-.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
- :language: python
- :start-after: [START task_with_template]
- :end-before: [END task_with_template]
-
KubernetesExecutor Architecture
################################
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 30493fd..d999cb0 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -259,14 +259,6 @@ class TestStringifiedDAGs(unittest.TestCase):
assert sorted_serialized_dag(ground_truth_dag) == sorted_serialized_dag(json_dag)
- def test_deser_k8s_pod_override(self):
- dag = collect_dags('airflow/example_dags')['example_kubernetes_executor_config']
- serialized = SerializedDAG.to_json(dag)
- deser_dag = SerializedDAG.from_json(serialized)
- p1 = dag.tasks[1].executor_config
- p2 = deser_dag.tasks[1].executor_config
- self.assertDictEqual(p1['pod_override'].to_dict(), p2['pod_override'].to_dict())
-
def test_deserialization_across_process(self):
"""A serialized DAG can be deserialized in another process."""