You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/12 21:10:56 UTC

[airflow] 01/08: Modify helm chart to use pod_template_file (#10872)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cd5f064d9fb904a0b4981919ed41bea9afb79897
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri Sep 11 10:47:59 2020 -0700

    Modify helm chart to use pod_template_file (#10872)
    
    * Modify helm chart to use pod_template_file
    
    Since we are deprecating most k8sexecutor arguments
    we should use the pod_template_file when launching airflow
    using the KubernetesExecutor
    
    * fix tests
    
    * one more nit
    
    * fix dag command
    
    * fix pylint
    
    (cherry picked from commit 56bd9b7d6b494251fa728ff6a7eb06d6d7eeb2c8)
---
 .pre-commit-config.yaml                            |   2 +-
 airflow/executors/kubernetes_executor.py           |   7 +-
 airflow/kubernetes/pod_generator.py                |  47 ++++++-
 chart/files/pod-template-file.yaml                 | 105 +++++++++++++++
 chart/templates/_helpers.yaml                      |   4 +
 chart/templates/configmap.yaml                     |   8 ++
 .../templates/scheduler/scheduler-deployment.yaml  |   4 +
 chart/tests/pod-template-file_test.yaml            | 149 +++++++++++++++++++++
 docs/production-deployment.rst                     |   5 -
 scripts/ci/kubernetes/ci_run_helm_testing.sh       |   7 +-
 tests/kubernetes/test_pod_generator.py             |  73 ++++++----
 tests/kubernetes/test_worker_configuration.py      |   7 +-
 12 files changed, 371 insertions(+), 47 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1d278e5..eee592e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -25,7 +25,7 @@ repos:
     rev: v1.1.9
     hooks:
       - id: forbid-tabs
-        exclude: ^docs/Makefile$
+        exclude: ^docs/Makefile$|^clients/gen/go.sh
       - id: insert-license
         name: Add license for all SQL files
         files: \.sql$
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 0ae8f16..73dd91e 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -440,10 +440,11 @@ class AirflowKubernetesScheduler(LoggingMixin):
             dag_id=pod_generator.make_safe_label_value(dag_id),
             task_id=pod_generator.make_safe_label_value(task_id),
             try_number=try_number,
-            date=self._datetime_to_label_safe_datestring(execution_date),
+            kube_image=self.kube_config.kube_image,
+            date=execution_date,
             command=command,
-            kube_executor_config=kube_executor_config,
-            worker_config=self.worker_configuration_pod
+            pod_override_object=kube_executor_config,
+            base_worker_pod=self.worker_configuration_pod
         )
 
         sanitized_pod = self.launcher._client.api_client.sanitize_for_serialization(pod)
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 4fbfec1..5a57230 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -30,6 +30,7 @@ from functools import reduce
 
 import kubernetes.client.models as k8s
 import yaml
+from dateutil import parser
 from kubernetes.client.api_client import ApiClient
 from airflow.contrib.kubernetes.pod import _extract_volume_mounts
 
@@ -92,6 +93,30 @@ def make_safe_label_value(string):
     return safe_label
 
 
+def datetime_to_label_safe_datestring(datetime_obj):
+    """
+    Kubernetes doesn't permit ":" or "+" in label values. ISO datetime format uses
+    both but never "_", so replace ":" with "_" and "+" with "_plus_" to get a
+    label-safe string.
+
+    :param datetime_obj: datetime.datetime object
+    :return: ISO-like string representing the datetime
+    """
+    return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')
+
+
+def label_safe_datestring_to_datetime(string):
+    """
+    Reverse of datetime_to_label_safe_datestring: Kubernetes doesn't permit ":" in
+    labels, so the label stores "_" for ":" and "_plus_" for "+". Restore those
+    characters and parse the result back into a datetime.
+
+    :param string: str
+    :return: datetime.datetime object
+    """
+    return parser.parse(string.replace('_plus_', '+').replace("_", ":"))
+
+
 class PodGenerator(object):
     """
     Contains Kubernetes Airflow Worker configuration logic
@@ -496,10 +521,11 @@ class PodGenerator(object):
         task_id,
         pod_id,
         try_number,
+        kube_image,
         date,
         command,
-        kube_executor_config,
-        worker_config,
+        pod_override_object,
+        base_worker_pod,
         namespace,
         worker_uuid
     ):
@@ -511,22 +537,29 @@ class PodGenerator(object):
         """
         dynamic_pod = PodGenerator(
             namespace=namespace,
+            image=kube_image,
             labels={
                 'airflow-worker': worker_uuid,
-                'dag_id': dag_id,
-                'task_id': task_id,
-                'execution_date': date,
+                'dag_id': make_safe_label_value(dag_id),
+                'task_id': make_safe_label_value(task_id),
+                'execution_date': datetime_to_label_safe_datestring(date),
                 'try_number': str(try_number),
                 'airflow_version': airflow_version.replace('+', '-'),
                 'kubernetes_executor': 'True',
             },
+            annotations={
+                'dag_id': dag_id,
+                'task_id': task_id,
+                'execution_date': date.isoformat(),
+                'try_number': str(try_number),
+            },
             cmds=command,
             name=pod_id
         ).gen_pod()
 
         # Reconcile the pods starting with the first chronologically,
-        # Pod from the airflow.cfg -> Pod from executor_config arg -> Pod from the K8s executor
-        pod_list = [worker_config, kube_executor_config, dynamic_pod]
+        # Pod from the pod_template_file -> Pod from executor_config arg -> Pod from the K8s executor
+        pod_list = [base_worker_pod, pod_override_object, dynamic_pod]
 
         return reduce(PodGenerator.reconcile_pods, pod_list)
 
diff --git a/chart/files/pod-template-file.yaml b/chart/files/pod-template-file.yaml
new file mode 100644
index 0000000..b19edf1
--- /dev/null
+++ b/chart/files/pod-template-file.yaml
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+{{- if .Values.dags.gitSync.enabled }}
+  initContainers:
+{{- include "git_sync_container" . | indent 8 }}
+{{- end }}
+  containers:
+    - args: []
+      command: []
+      env:
+      - name: AIRFLOW__CORE__EXECUTOR
+        value: LocalExecutor
+{{- include "standard_airflow_environment" . | indent 4 }}
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: {{ template "airflow_logs" . }}
+          name: airflow-logs
+{{- if or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled }}
+        - mountPath: {{ template "airflow_dags_mount_path" . }}
+          name: airflow-dags
+          readOnly: false
+{{- end }}
+{{- if .Values.dags.gitSync.sshKeySecret }}
+        - mountPath: /etc/git-secret/known_hosts
+          name: {{ .Values.dags.gitSync.knownHosts }}
+          subPath: known_hosts
+{{- end }}
+{{- if .Values.dags.gitSync.sshKeySecret }}
+        - mountPath: /etc/git-secret/ssh
+          name: git-sync-ssh-key
+          subPath: ssh
+{{- end }}
+{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}
+        - mountPath: {{ include "airflow_dags_mount_path" . }}
+          name: airflow-dags
+          readOnly: true
+{{- if .Values.dags.persistence.enabled }}
+          subPath: {{.Values.dags.gitSync.dest }}/{{ .Values.dags.gitSync.subPath }}
+{{- end }}
+{{- end }}
+  hostNetwork: false
+  {{- if or .Values.registry.secretName .Values.registry.connection }}
+  imagePullSecrets:
+    - name: {{ template "registry_secret" . }}
+  {{- end }}
+  restartPolicy: Never
+  securityContext:
+    runAsUser: {{ .Values.uid }}
+  nodeSelector:
+    {{ toYaml .Values.nodeSelector | indent 8 }}
+  affinity:
+    {{ toYaml .Values.affinity | indent 8 }}
+  tolerations:
+    {{ toYaml .Values.tolerations | indent 8 }}
+  serviceAccountName: '{{ .Release.Name }}-worker-serviceaccount'
+  volumes:
+  {{- if .Values.dags.persistence.enabled }}
+  - name: dags
+    persistentVolumeClaim:
+      claimName: {{ template "airflow_dags_volume_claim" . }}
+  {{- else if .Values.dags.gitSync.enabled }}
+  - name: dags
+    emptyDir: {}
+  {{- end }}
+  {{- if and  .Values.dags.gitSync.enabled  .Values.dags.gitSync.sshKeySecret }}
+{{- include "git_sync_ssh_key_volume" . | indent 2 }}
+  {{- end }}
+  - emptyDir: {}
+    name: airflow-logs
+{{- if .Values.dags.gitSync.knownHosts }}
+  - configMap:
+      defaultMode: 288
+      name: {{ include "airflow_config" . }}
+    name: git-sync-known-hosts
+{{- end }}
+  - configMap:
+      name: {{ include "airflow_config" . }}
+    name: airflow-config
+  - configMap:
+      name: {{ include "airflow_config" . }}
+    name: airflow-local-settings
diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml
index 898924f..49c3b3f 100644
--- a/chart/templates/_helpers.yaml
+++ b/chart/templates/_helpers.yaml
@@ -213,6 +213,10 @@
 {{ default (printf "%s-airflow-result-backend" .Release.Name) .Values.data.resultBackendSecretName }}
 {{- end }}
 
+{{ define "airflow_pod_template_file" -}}
+{{ (printf "%s/pod_templates" .Values.airflowHome) }}
+{{- end }}
+
 {{ define "pgbouncer_config_secret" -}}
 {{ .Release.Name }}-pgbouncer-config
 {{- end }}
diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml
index f0e09a0..cc9a388 100644
--- a/chart/templates/configmap.yaml
+++ b/chart/templates/configmap.yaml
@@ -55,3 +55,11 @@ data:
   known_hosts: |
     {{ .Values.dags.gitSync.knownHosts | nindent 4 }}
 {{- end }}
+{{- if eq .Values.executor "KubernetesExecutor" }}
+  pod_template_file.yaml: |-
+{{- if .Values.podTemplate }}
+    {{ .Values.podTemplate | nindent 4 }}
+{{- else }}
+{{ tpl (.Files.Get "files/pod-template-file.yaml") . | nindent 4 }}
+{{- end }}
+{{- end }}
diff --git a/chart/templates/scheduler/scheduler-deployment.yaml b/chart/templates/scheduler/scheduler-deployment.yaml
index 9331556..f2b4a99 100644
--- a/chart/templates/scheduler/scheduler-deployment.yaml
+++ b/chart/templates/scheduler/scheduler-deployment.yaml
@@ -133,6 +133,10 @@ spec:
           resources:
 {{ toYaml .Values.scheduler.resources | indent 12 }}
           volumeMounts:
+            - name: config
+              mountPath: {{ include "airflow_pod_template_file" . }}/pod_template_file.yaml
+              subPath: pod_template_file.yaml
+              readOnly: true
             - name: logs
               mountPath: {{ template "airflow_logs" . }}
             - name: config
diff --git a/chart/tests/pod-template-file_test.yaml b/chart/tests/pod-template-file_test.yaml
new file mode 100644
index 0000000..64e99f8
--- /dev/null
+++ b/chart/tests/pod-template-file_test.yaml
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+templates:
+  - pod-template-file.yaml
+tests:
+  - it: should work
+    asserts:
+      - isKind:
+          of: Pod
+      - equal:
+          path: spec.containers[0].image
+          value: dummy_image
+      - equal:
+          path: spec.containers[0].name
+          value: base
+  - it: should add an initContainer if gitSync is true
+    set:
+      dags:
+        gitSync:
+          enabled: true
+          containerName: git-sync-test
+          containerTag: test-tag
+          containerRepository: test-registry/test-repo
+          wait: 66
+          maxFailures: 70
+          subPath: "path1/path2"
+          dest: "test-dest"
+          root: "/git-root"
+          rev: HEAD
+          depth: 1
+          repo: https://github.com/apache/airflow.git
+          branch: test-branch
+          sshKeySecret: ~
+          credentialsSecret: ~
+          knownHosts: ~
+    asserts:
+      - isKind:
+          of: Pod
+      - equal:
+          path: spec.initContainers[0]
+          value:
+            name: git-sync-test
+            image: test-registry/test-repo:test-tag
+            env:
+              - name: GIT_SYNC_REV
+                value: HEAD
+              - name: GIT_SYNC_BRANCH
+                value: test-branch
+              - name: GIT_SYNC_REPO
+                value: https://github.com/apache/airflow.git
+              - name: GIT_SYNC_DEPTH
+                value: "1"
+              - name: GIT_SYNC_ROOT
+                value: /git-root
+              - name: GIT_SYNC_DEST
+                value: test-dest
+              - name: GIT_SYNC_ADD_USER
+                value: "true"
+              - name: GIT_SYNC_WAIT
+                value: "66"
+              - name: GIT_SYNC_MAX_SYNC_FAILURES
+                value: "70"
+            volumeMounts:
+              - mountPath: /git-root
+                name: dags
+  - it: validate if ssh params are added
+    set:
+      dags:
+        gitSync:
+          enabled: true
+          containerName: git-sync-test
+          sshKeySecret: ssh-secret
+          knownHosts: ~
+          branch: test-branch
+    asserts:
+      - contains:
+          path: spec.initContainers[0].env
+          content:
+            name: GIT_SSH_KEY_FILE
+            value: "/etc/git-secret/ssh"
+      - contains:
+          path: spec.initContainers[0].env
+          content:
+            name: GIT_SYNC_SSH
+            value: "true"
+      - contains:
+          path: spec.initContainers[0].env
+          content:
+            name: GIT_KNOWN_HOSTS
+            value: "false"
+      - contains:
+          path: spec.volumes
+          content:
+            name: git-sync-ssh-key
+            secret:
+              secretName: ssh-secret
+              defaultMode: 288
+  - it: should set username and pass env variables
+    set:
+      dags:
+        gitSync:
+          enabled: true
+          credentialsSecret: user-pass-secret
+          sshKeySecret: ~
+    asserts:
+      - contains:
+          path: spec.initContainers[0].env
+          content:
+            name: GIT_SYNC_USERNAME
+            valueFrom:
+              secretKeyRef:
+                name: user-pass-secret
+                key: GIT_SYNC_USERNAME
+      - contains:
+          path: spec.initContainers[0].env
+          content:
+            name: GIT_SYNC_PASSWORD
+            valueFrom:
+              secretKeyRef:
+                name: user-pass-secret
+                key: GIT_SYNC_PASSWORD
+  - it: should set the volume claim correctly when using an existing claim
+    set:
+      dags:
+        persistence:
+          enabled: true
+          existingClaim: test-claim
+    asserts:
+      - contains:
+          path: spec.volumes
+          content:
+            name: dags
+            persistentVolumeClaim:
+              claimName: test-claim
diff --git a/docs/production-deployment.rst b/docs/production-deployment.rst
index 7c4bfab..de974a6 100644
--- a/docs/production-deployment.rst
+++ b/docs/production-deployment.rst
@@ -630,8 +630,3 @@ Keytab secret and both containers in the same Pod share the volume, where tempor
 the side-care container and read by the worker container.
 
 This concept is implemented in the development version of the Helm Chart that is part of Airflow source code.
-
-
-.. spelling::
-
-   pypirc
diff --git a/scripts/ci/kubernetes/ci_run_helm_testing.sh b/scripts/ci/kubernetes/ci_run_helm_testing.sh
index e5308db..224cc9e 100755
--- a/scripts/ci/kubernetes/ci_run_helm_testing.sh
+++ b/scripts/ci/kubernetes/ci_run_helm_testing.sh
@@ -20,9 +20,10 @@ echo "Running helm tests"
 
 chart_directory="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/../../../chart/"
 
-echo "Chart directory is ${chart_directory}"
+cat chart/files/pod-template-file.yaml > chart/templates/pod-template-file.yaml
 
-docker run -w /airflow-chart -v "${chart_directory}":/airflow-chart \
+docker run -w /airflow-chart -v "$chart_directory":/airflow-chart \
   --entrypoint /bin/sh \
   aneeshkj/helm-unittest \
-  -c "helm repo add stable https://kubernetes-charts.storage.googleapis.com; helm dependency update ; helm unittest ."
+  -c "helm repo add stable https://kubernetes-charts.storage.googleapis.com; helm dependency update ; helm unittest ." \
+  && rm chart/templates/pod-template-file.yaml
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 0c9d722..fed7c97 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -19,12 +19,14 @@ import unittest
 import sys
 from tests.compat import mock
 import uuid
-import kubernetes.client.models as k8s
-from kubernetes.client import ApiClient
+
+from dateutil import parser
+from kubernetes.client import ApiClient, models as k8s
 
 from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.kubernetes.pod import Resources
-from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects
+from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects, \
+    datetime_to_label_safe_datestring
 from airflow.kubernetes.secret import Secret
 
 
@@ -63,16 +65,25 @@ class TestPodGenerator(unittest.TestCase):
             Secret('env', 'TARGET', 'secret_b', 'source_b'),
         ]
 
+        self.execution_date = parser.parse('2020-08-24 00:00:00.000000')
+        self.execution_date_label = datetime_to_label_safe_datestring(self.execution_date)
+        self.dag_id = 'dag_id'
+        self.task_id = 'task_id'
+        self.try_number = 3
         self.labels = {
             'airflow-worker': 'uuid',
             'dag_id': 'dag_id',
-            'execution_date': 'date',
+            'execution_date': mock.ANY,
             'task_id': 'task_id',
             'try_number': '3',
             'airflow_version': mock.ANY,
             'kubernetes_executor': 'True'
         }
         self.metadata = {
+            'annotations': {'dag_id': 'dag_id',
+                            'execution_date': '2020-08-24T00:00:00',
+                            'task_id': 'task_id',
+                            'try_number': '3'},
             'labels': self.labels,
             'name': 'pod_id-' + self.static_uuid.hex,
             'namespace': 'namespace'
@@ -646,8 +657,9 @@ class TestPodGenerator(unittest.TestCase):
             'dag_id',
             'task_id',
             'pod_id',
-            3,
-            'date',
+            self.try_number,
+            "kube_image",
+            self.execution_date,
             ['command'],
             executor_config,
             worker_config,
@@ -667,6 +679,7 @@ class TestPodGenerator(unittest.TestCase):
                     'env': [],
                     'envFrom': [],
                     'name': 'base',
+                    'image': 'kube_image',
                     'ports': [],
                     'resources': {
                         'limits': {
@@ -706,8 +719,9 @@ class TestPodGenerator(unittest.TestCase):
             'dag_id',
             'task_id',
             'pod_id',
-            3,
-            'date',
+            self.try_number,
+            "kube_image",
+            self.execution_date,
             ['command'],
             executor_config,
             worker_config,
@@ -727,6 +741,7 @@ class TestPodGenerator(unittest.TestCase):
                     'env': [],
                     'envFrom': [],
                     'name': 'base',
+                    'image': 'kube_image',
                     'ports': [],
                     'resources': {
                         'limits': {
@@ -789,8 +804,9 @@ class TestPodGenerator(unittest.TestCase):
             'dag_id',
             'task_id',
             'pod_id',
-            3,
-            'date',
+            self.try_number,
+            "kube_image",
+            self.execution_date,
             ['command'],
             executor_config,
             worker_config,
@@ -799,7 +815,7 @@ class TestPodGenerator(unittest.TestCase):
         )
         sanitized_result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.metadata.update({'annotations': {'should': 'stay'}})
+        self.metadata['annotations']['should'] = 'stay'
 
         self.assertEqual({
             'apiVersion': 'v1',
@@ -811,6 +827,7 @@ class TestPodGenerator(unittest.TestCase):
                     'command': ['command'],
                     'env': [],
                     'envFrom': [],
+                    'image': 'kube_image',
                     'name': 'base',
                     'ports': [],
                     'resources': {
@@ -872,20 +889,21 @@ class TestPodGenerator(unittest.TestCase):
         )
 
         result = PodGenerator.construct_pod(
-            'dag_id',
-            'task_id',
-            'pod_id',
-            3,
-            'date',
-            ['command'],
-            executor_config,
-            worker_config,
-            'namespace',
-            'uuid',
+            dag_id='dag_id',
+            task_id='task_id',
+            pod_id='pod_id',
+            try_number=3,
+            kube_image='kube_image',
+            date=self.execution_date,
+            command=['command'],
+            pod_override_object=executor_config,
+            base_worker_pod=worker_config,
+            namespace='namespace',
+            worker_uuid='uuid',
         )
         sanitized_result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.metadata.update({'annotations': {'should': 'stay'}})
+        self.metadata['annotations']['should'] = 'stay'
 
         self.assertEqual({
             'apiVersion': 'v1',
@@ -898,6 +916,7 @@ class TestPodGenerator(unittest.TestCase):
                     'env': [],
                     'envFrom': [],
                     'name': 'base',
+                    'image': 'kube_image',
                     'ports': [],
                     'resources': {
                         'limits': {
@@ -1081,12 +1100,14 @@ spec:
             worker_uuid="test",
             pod_id="test",
             dag_id="test",
+            kube_image="foo",
             task_id="test",
             try_number=1,
-            date="23-07-2020",
+            date=parser.parse("23-07-2020"),
             command="test",
-            kube_executor_config=None,
-            worker_config=k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"},
-                                                              annotations={"my.annotation": "foo"})))
+            pod_override_object=None,
+            base_worker_pod=k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"},
+                                          annotations={"my.annotation": "foo"})))
         self.assertIn("airflow-test", pod.metadata.labels)
         self.assertIn("my.annotation", pod.metadata.annotations)
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 40271dc..0ac7940 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -18,6 +18,7 @@
 
 import unittest
 import six
+from dateutil import parser
 from parameterized import parameterized
 
 from tests.compat import mock
@@ -373,12 +374,14 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.base_log_folder = '/logs'
 
         worker_config = WorkerConfiguration(self.kube_config)
+        execution_date = parser.parse('2019-11-21 11:08:22.920875')
         pod = PodGenerator.construct_pod(
             "test_dag_id",
             "test_task_id",
             "test_pod_id",
             1,
-            "2019-11-21 11:08:22.920875",
+            'kube_image',
+            execution_date,
             ["bash -c 'ls /'"],
             None,
             worker_config.as_pod(),
@@ -389,7 +392,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
             'airflow-worker': 'sample-uuid',
             'airflow_version': airflow_version.replace('+', '-'),
             'dag_id': 'test_dag_id',
-            'execution_date': '2019-11-21 11:08:22.920875',
+            'execution_date': '2019-11-21T11_08_22.920875',
             'kubernetes_executor': 'True',
             'my_label': 'label_id',
             'task_id': 'test_task_id',