You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/12 21:10:56 UTC
[airflow] 01/08: Modify helm chart to use pod_template_file (#10872)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cd5f064d9fb904a0b4981919ed41bea9afb79897
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri Sep 11 10:47:59 2020 -0700
Modify helm chart to use pod_template_file (#10872)
* Modify helm chart to use pod_template_file
Since we are deprecating most k8sexecutor arguments
we should use the pod_template_file when launching airflow
using the KubernetesExecutor
* fix tests
* one more nit
* fix dag command
* fix pylint
(cherry picked from commit 56bd9b7d6b494251fa728ff6a7eb06d6d7eeb2c8)
---
.pre-commit-config.yaml | 2 +-
airflow/executors/kubernetes_executor.py | 7 +-
airflow/kubernetes/pod_generator.py | 47 ++++++-
chart/files/pod-template-file.yaml | 105 +++++++++++++++
chart/templates/_helpers.yaml | 4 +
chart/templates/configmap.yaml | 8 ++
.../templates/scheduler/scheduler-deployment.yaml | 4 +
chart/tests/pod-template-file_test.yaml | 149 +++++++++++++++++++++
docs/production-deployment.rst | 5 -
scripts/ci/kubernetes/ci_run_helm_testing.sh | 7 +-
tests/kubernetes/test_pod_generator.py | 73 ++++++----
tests/kubernetes/test_worker_configuration.py | 7 +-
12 files changed, 371 insertions(+), 47 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1d278e5..eee592e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -25,7 +25,7 @@ repos:
rev: v1.1.9
hooks:
- id: forbid-tabs
- exclude: ^docs/Makefile$
+ exclude: ^docs/Makefile$|^clients/gen/go.sh
- id: insert-license
name: Add license for all SQL files
files: \.sql$
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 0ae8f16..73dd91e 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -440,10 +440,11 @@ class AirflowKubernetesScheduler(LoggingMixin):
dag_id=pod_generator.make_safe_label_value(dag_id),
task_id=pod_generator.make_safe_label_value(task_id),
try_number=try_number,
- date=self._datetime_to_label_safe_datestring(execution_date),
+ kube_image=self.kube_config.kube_image,
+ date=execution_date,
command=command,
- kube_executor_config=kube_executor_config,
- worker_config=self.worker_configuration_pod
+ pod_override_object=kube_executor_config,
+ base_worker_pod=self.worker_configuration_pod
)
sanitized_pod = self.launcher._client.api_client.sanitize_for_serialization(pod)
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 4fbfec1..5a57230 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -30,6 +30,7 @@ from functools import reduce
import kubernetes.client.models as k8s
import yaml
+from dateutil import parser
from kubernetes.client.api_client import ApiClient
from airflow.contrib.kubernetes.pod import _extract_volume_mounts
@@ -92,6 +93,30 @@ def make_safe_label_value(string):
return safe_label
+def datetime_to_label_safe_datestring(datetime_obj):
+ """
+ Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but
+ not "_" let's
+ replace ":" with "_"
+
+ :param datetime_obj: datetime.datetime object
+ :return: ISO-like string representing the datetime
+ """
+ return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')
+
+
+def label_safe_datestring_to_datetime(string):
+ """
+ Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not
+ "_", let's
+ replace ":" with "_"
+
+ :param string: str
+ :return: datetime.datetime object
+ """
+ return parser.parse(string.replace('_plus_', '+').replace("_", ":"))
+
+
class PodGenerator(object):
"""
Contains Kubernetes Airflow Worker configuration logic
@@ -496,10 +521,11 @@ class PodGenerator(object):
task_id,
pod_id,
try_number,
+ kube_image,
date,
command,
- kube_executor_config,
- worker_config,
+ pod_override_object,
+ base_worker_pod,
namespace,
worker_uuid
):
@@ -511,22 +537,29 @@ class PodGenerator(object):
"""
dynamic_pod = PodGenerator(
namespace=namespace,
+ image=kube_image,
labels={
'airflow-worker': worker_uuid,
- 'dag_id': dag_id,
- 'task_id': task_id,
- 'execution_date': date,
+ 'dag_id': make_safe_label_value(dag_id),
+ 'task_id': make_safe_label_value(task_id),
+ 'execution_date': datetime_to_label_safe_datestring(date),
'try_number': str(try_number),
'airflow_version': airflow_version.replace('+', '-'),
'kubernetes_executor': 'True',
},
+ annotations={
+ 'dag_id': dag_id,
+ 'task_id': task_id,
+ 'execution_date': date.isoformat(),
+ 'try_number': str(try_number),
+ },
cmds=command,
name=pod_id
).gen_pod()
# Reconcile the pods starting with the first chronologically,
- # Pod from the airflow.cfg -> Pod from executor_config arg -> Pod from the K8s executor
- pod_list = [worker_config, kube_executor_config, dynamic_pod]
+ # Pod from the pod_template_File -> Pod from executor_config arg -> Pod from the K8s executor
+ pod_list = [base_worker_pod, pod_override_object, dynamic_pod]
return reduce(PodGenerator.reconcile_pods, pod_list)
diff --git a/chart/files/pod-template-file.yaml b/chart/files/pod-template-file.yaml
new file mode 100644
index 0000000..b19edf1
--- /dev/null
+++ b/chart/files/pod-template-file.yaml
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+apiVersion: v1
+kind: Pod
+metadata:
+ name: dummy-name
+spec:
+{{- if .Values.dags.gitSync.enabled }}
+ initContainers:
+{{- include "git_sync_container" . | indent 8 }}
+{{- end }}
+ containers:
+ - args: []
+ command: []
+ env:
+ - name: AIRFLOW__CORE__EXECUTOR
+ value: LocalExecutor
+{{- include "standard_airflow_environment" . | indent 4 }}
+ envFrom: []
+ image: dummy_image
+ imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
+ name: base
+ ports: []
+ volumeMounts:
+ - mountPath: {{ template "airflow_logs" . }}
+ name: airflow-logs
+{{- if or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled }}
+ - mountPath: {{ template "airflow_dags_mount_path" . }}
+ name: airflow-dags
+ readOnly: false
+{{- end }}
+{{- if .Values.dags.gitSync.sshKeySecret }}
+ - mountPath: /etc/git-secret/known_hosts
+ name: {{ .Values.dags.gitSync.knownHosts }}
+ subPath: known_hosts
+{{- end }}
+{{- if .Values.dags.gitSync.sshKeySecret }}
+ - mountPath: /etc/git-secret/ssh
+ name: git-sync-ssh-key
+ subPath: ssh
+{{- end }}
+{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}
+ - mountPath: {{ include "airflow_dags_mount_path" . }}
+ name: airflow-dags
+ readOnly: true
+{{- if .Values.dags.persistence.enabled }}
+ subPath: {{.Values.dags.gitSync.dest }}/{{ .Values.dags.gitSync.subPath }}
+{{- end }}
+{{- end }}
+ hostNetwork: false
+ {{- if or .Values.registry.secretName .Values.registry.connection }}
+ imagePullSecrets:
+ - name: {{ template "registry_secret" . }}
+ {{- end }}
+ restartPolicy: Never
+ securityContext:
+ runAsUser: {{ .Values.uid }}
+ nodeSelector:
+ {{ toYaml .Values.nodeSelector | indent 8 }}
+ affinity:
+ {{ toYaml .Values.affinity | indent 8 }}
+ tolerations:
+ {{ toYaml .Values.tolerations | indent 8 }}
+ serviceAccountName: '{{ .Release.Name }}-worker-serviceaccount'
+ volumes:
+ {{- if .Values.dags.persistence.enabled }}
+ - name: dags
+ persistentVolumeClaim:
+ claimName: {{ template "airflow_dags_volume_claim" . }}
+ {{- else if .Values.dags.gitSync.enabled }}
+ - name: dags
+ emptyDir: {}
+ {{- end }}
+ {{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }}
+{{- include "git_sync_ssh_key_volume" . | indent 2 }}
+ {{- end }}
+ - emptyDir: {}
+ name: airflow-logs
+{{- if .Values.dags.gitSync.knownHosts }}
+ - configMap:
+ defaultMode: 288
+ name: {{ include "airflow_config" . }}
+ name: git-sync-known-hosts
+{{- end }}
+ - configMap:
+ name: {{ include "airflow_config" . }}
+ name: airflow-config
+ - configMap:
+ name: {{ include "airflow_config" . }}
+ name: airflow-local-settings
diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml
index 898924f..49c3b3f 100644
--- a/chart/templates/_helpers.yaml
+++ b/chart/templates/_helpers.yaml
@@ -213,6 +213,10 @@
{{ default (printf "%s-airflow-result-backend" .Release.Name) .Values.data.resultBackendSecretName }}
{{- end }}
+{{ define "airflow_pod_template_file" -}}
+{{ (printf "%s/pod_templates" .Values.airflowHome) }}
+{{- end }}
+
{{ define "pgbouncer_config_secret" -}}
{{ .Release.Name }}-pgbouncer-config
{{- end }}
diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml
index f0e09a0..cc9a388 100644
--- a/chart/templates/configmap.yaml
+++ b/chart/templates/configmap.yaml
@@ -55,3 +55,11 @@ data:
known_hosts: |
{{ .Values.dags.gitSync.knownHosts | nindent 4 }}
{{- end }}
+{{- if eq .Values.executor "KubernetesExecutor" }}
+ pod_template_file.yaml: |-
+{{- if .Values.podTemplate }}
+ {{ .Values.podTemplate | nindent 4 }}
+{{- else }}
+{{ tpl (.Files.Get "files/pod-template-file.yaml") . | nindent 4 }}
+{{- end }}
+{{- end }}
diff --git a/chart/templates/scheduler/scheduler-deployment.yaml b/chart/templates/scheduler/scheduler-deployment.yaml
index 9331556..f2b4a99 100644
--- a/chart/templates/scheduler/scheduler-deployment.yaml
+++ b/chart/templates/scheduler/scheduler-deployment.yaml
@@ -133,6 +133,10 @@ spec:
resources:
{{ toYaml .Values.scheduler.resources | indent 12 }}
volumeMounts:
+ - name: config
+ mountPath: {{ include "airflow_pod_template_file" . }}/pod_template_file.yaml
+ subPath: pod_template_file.yaml
+ readOnly: true
- name: logs
mountPath: {{ template "airflow_logs" . }}
- name: config
diff --git a/chart/tests/pod-template-file_test.yaml b/chart/tests/pod-template-file_test.yaml
new file mode 100644
index 0000000..64e99f8
--- /dev/null
+++ b/chart/tests/pod-template-file_test.yaml
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+templates:
+ - pod-template-file.yaml
+tests:
+ - it: should work
+ asserts:
+ - isKind:
+ of: Pod
+ - equal:
+ path: spec.containers[0].image
+ value: dummy_image
+ - equal:
+ path: spec.containers[0].name
+ value: base
+ - it: should add an initContainer if gitSync is true
+ set:
+ dags:
+ gitSync:
+ enabled: true
+ containerName: git-sync-test
+ containerTag: test-tag
+ containerRepository: test-registry/test-repo
+ wait: 66
+ maxFailures: 70
+ subPath: "path1/path2"
+ dest: "test-dest"
+ root: "/git-root"
+ rev: HEAD
+ depth: 1
+ repo: https://github.com/apache/airflow.git
+ branch: test-branch
+ sshKeySecret: ~
+ credentialsSecret: ~
+ knownHosts: ~
+ asserts:
+ - isKind:
+ of: Pod
+ - equal:
+ path: spec.initContainers[0]
+ value:
+ name: git-sync-test
+ image: test-registry/test-repo:test-tag
+ env:
+ - name: GIT_SYNC_REV
+ value: HEAD
+ - name: GIT_SYNC_BRANCH
+ value: test-branch
+ - name: GIT_SYNC_REPO
+ value: https://github.com/apache/airflow.git
+ - name: GIT_SYNC_DEPTH
+ value: "1"
+ - name: GIT_SYNC_ROOT
+ value: /git-root
+ - name: GIT_SYNC_DEST
+ value: test-dest
+ - name: GIT_SYNC_ADD_USER
+ value: "true"
+ - name: GIT_SYNC_WAIT
+ value: "66"
+ - name: GIT_SYNC_MAX_SYNC_FAILURES
+ value: "70"
+ volumeMounts:
+ - mountPath: /git-root
+ name: dags
+ - it: validate if ssh params are added
+ set:
+ dags:
+ gitSync:
+ enabled: true
+ containerName: git-sync-test
+ sshKeySecret: ssh-secret
+ knownHosts: ~
+ branch: test-branch
+ asserts:
+ - contains:
+ path: spec.initContainers[0].env
+ content:
+ name: GIT_SSH_KEY_FILE
+ value: "/etc/git-secret/ssh"
+ - contains:
+ path: spec.initContainers[0].env
+ content:
+ name: GIT_SYNC_SSH
+ value: "true"
+ - contains:
+ path: spec.initContainers[0].env
+ content:
+ name: GIT_KNOWN_HOSTS
+ value: "false"
+ - contains:
+ path: spec.volumes
+ content:
+ name: git-sync-ssh-key
+ secret:
+ secretName: ssh-secret
+ defaultMode: 288
+ - it: should set username and pass env variables
+ set:
+ dags:
+ gitSync:
+ enabled: true
+ credentialsSecret: user-pass-secret
+ sshKeySecret: ~
+ asserts:
+ - contains:
+ path: spec.initContainers[0].env
+ content:
+ name: GIT_SYNC_USERNAME
+ valueFrom:
+ secretKeyRef:
+ name: user-pass-secret
+ key: GIT_SYNC_USERNAME
+ - contains:
+ path: spec.initContainers[0].env
+ content:
+ name: GIT_SYNC_PASSWORD
+ valueFrom:
+ secretKeyRef:
+ name: user-pass-secret
+ key: GIT_SYNC_PASSWORD
+ - it: should set the volume claim correctly when using an existing claim
+ set:
+ dags:
+ persistence:
+ enabled: true
+ existingClaim: test-claim
+ asserts:
+ - contains:
+ path: spec.volumes
+ content:
+ name: dags
+ persistentVolumeClaim:
+ claimName: test-claim
diff --git a/docs/production-deployment.rst b/docs/production-deployment.rst
index 7c4bfab..de974a6 100644
--- a/docs/production-deployment.rst
+++ b/docs/production-deployment.rst
@@ -630,8 +630,3 @@ Keytab secret and both containers in the same Pod share the volume, where tempor
the side-care container and read by the worker container.
This concept is implemented in the development version of the Helm Chart that is part of Airflow source code.
-
-
-.. spelling::
-
- pypirc
diff --git a/scripts/ci/kubernetes/ci_run_helm_testing.sh b/scripts/ci/kubernetes/ci_run_helm_testing.sh
index e5308db..224cc9e 100755
--- a/scripts/ci/kubernetes/ci_run_helm_testing.sh
+++ b/scripts/ci/kubernetes/ci_run_helm_testing.sh
@@ -20,9 +20,10 @@ echo "Running helm tests"
chart_directory="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/../../../chart/"
-echo "Chart directory is ${chart_directory}"
+cat chart/files/pod-template-file.yaml > chart/templates/pod-template-file.yaml
-docker run -w /airflow-chart -v "${chart_directory}":/airflow-chart \
+docker run -w /airflow-chart -v "$chart_directory":/airflow-chart \
--entrypoint /bin/sh \
aneeshkj/helm-unittest \
- -c "helm repo add stable https://kubernetes-charts.storage.googleapis.com; helm dependency update ; helm unittest ."
+ -c "helm repo add stable https://kubernetes-charts.storage.googleapis.com; helm dependency update ; helm unittest ." \
+ && rm chart/templates/pod-template-file.yaml
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 0c9d722..fed7c97 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -19,12 +19,14 @@ import unittest
import sys
from tests.compat import mock
import uuid
-import kubernetes.client.models as k8s
-from kubernetes.client import ApiClient
+
+from dateutil import parser
+from kubernetes.client import ApiClient, models as k8s
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes.pod import Resources
-from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects
+from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects, \
+ datetime_to_label_safe_datestring
from airflow.kubernetes.secret import Secret
@@ -63,16 +65,25 @@ class TestPodGenerator(unittest.TestCase):
Secret('env', 'TARGET', 'secret_b', 'source_b'),
]
+ self.execution_date = parser.parse('2020-08-24 00:00:00.000000')
+ self.execution_date_label = datetime_to_label_safe_datestring(self.execution_date)
+ self.dag_id = 'dag_id'
+ self.task_id = 'task_id'
+ self.try_number = 3
self.labels = {
'airflow-worker': 'uuid',
'dag_id': 'dag_id',
- 'execution_date': 'date',
+ 'execution_date': mock.ANY,
'task_id': 'task_id',
'try_number': '3',
'airflow_version': mock.ANY,
'kubernetes_executor': 'True'
}
self.metadata = {
+ 'annotations': {'dag_id': 'dag_id',
+ 'execution_date': '2020-08-24T00:00:00',
+ 'task_id': 'task_id',
+ 'try_number': '3'},
'labels': self.labels,
'name': 'pod_id-' + self.static_uuid.hex,
'namespace': 'namespace'
@@ -646,8 +657,9 @@ class TestPodGenerator(unittest.TestCase):
'dag_id',
'task_id',
'pod_id',
- 3,
- 'date',
+ self.try_number,
+ "kube_image",
+ self.execution_date,
['command'],
executor_config,
worker_config,
@@ -667,6 +679,7 @@ class TestPodGenerator(unittest.TestCase):
'env': [],
'envFrom': [],
'name': 'base',
+ 'image': 'kube_image',
'ports': [],
'resources': {
'limits': {
@@ -706,8 +719,9 @@ class TestPodGenerator(unittest.TestCase):
'dag_id',
'task_id',
'pod_id',
- 3,
- 'date',
+ self.try_number,
+ "kube_image",
+ self.execution_date,
['command'],
executor_config,
worker_config,
@@ -727,6 +741,7 @@ class TestPodGenerator(unittest.TestCase):
'env': [],
'envFrom': [],
'name': 'base',
+ 'image': 'kube_image',
'ports': [],
'resources': {
'limits': {
@@ -789,8 +804,9 @@ class TestPodGenerator(unittest.TestCase):
'dag_id',
'task_id',
'pod_id',
- 3,
- 'date',
+ self.try_number,
+ "kube_image",
+ self.execution_date,
['command'],
executor_config,
worker_config,
@@ -799,7 +815,7 @@ class TestPodGenerator(unittest.TestCase):
)
sanitized_result = self.k8s_client.sanitize_for_serialization(result)
- self.metadata.update({'annotations': {'should': 'stay'}})
+ self.metadata['annotations']['should'] = 'stay'
self.assertEqual({
'apiVersion': 'v1',
@@ -811,6 +827,7 @@ class TestPodGenerator(unittest.TestCase):
'command': ['command'],
'env': [],
'envFrom': [],
+ 'image': 'kube_image',
'name': 'base',
'ports': [],
'resources': {
@@ -872,20 +889,21 @@ class TestPodGenerator(unittest.TestCase):
)
result = PodGenerator.construct_pod(
- 'dag_id',
- 'task_id',
- 'pod_id',
- 3,
- 'date',
- ['command'],
- executor_config,
- worker_config,
- 'namespace',
- 'uuid',
+ dag_id='dag_id',
+ task_id='task_id',
+ pod_id='pod_id',
+ try_number=3,
+ kube_image='kube_image',
+ date=self.execution_date,
+ command=['command'],
+ pod_override_object=executor_config,
+ base_worker_pod=worker_config,
+ namespace='namespace',
+ worker_uuid='uuid',
)
sanitized_result = self.k8s_client.sanitize_for_serialization(result)
- self.metadata.update({'annotations': {'should': 'stay'}})
+ self.metadata['annotations']['should'] = 'stay'
self.assertEqual({
'apiVersion': 'v1',
@@ -898,6 +916,7 @@ class TestPodGenerator(unittest.TestCase):
'env': [],
'envFrom': [],
'name': 'base',
+ 'image': 'kube_image',
'ports': [],
'resources': {
'limits': {
@@ -1081,12 +1100,14 @@ spec:
worker_uuid="test",
pod_id="test",
dag_id="test",
+ kube_image="foo",
task_id="test",
try_number=1,
- date="23-07-2020",
+ date=parser.parse("23-07-2020"),
command="test",
- kube_executor_config=None,
- worker_config=k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"},
- annotations={"my.annotation": "foo"})))
+ pod_override_object=None,
+ base_worker_pod=k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"},
+ annotations={"my.annotation": "foo"})))
self.assertIn("airflow-test", pod.metadata.labels)
self.assertIn("my.annotation", pod.metadata.annotations)
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 40271dc..0ac7940 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -18,6 +18,7 @@
import unittest
import six
+from dateutil import parser
from parameterized import parameterized
from tests.compat import mock
@@ -373,12 +374,14 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.kube_config.base_log_folder = '/logs'
worker_config = WorkerConfiguration(self.kube_config)
+ execution_date = parser.parse('2019-11-21 11:08:22.920875')
pod = PodGenerator.construct_pod(
"test_dag_id",
"test_task_id",
"test_pod_id",
1,
- "2019-11-21 11:08:22.920875",
+ 'kube_image',
+ execution_date,
["bash -c 'ls /'"],
None,
worker_config.as_pod(),
@@ -389,7 +392,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
'airflow-worker': 'sample-uuid',
'airflow_version': airflow_version.replace('+', '-'),
'dag_id': 'test_dag_id',
- 'execution_date': '2019-11-21 11:08:22.920875',
+ 'execution_date': '2019-11-21T11_08_22.920875',
'kubernetes_executor': 'True',
'my_label': 'label_id',
'task_id': 'test_task_id',