You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/30 14:03:28 UTC

[airflow] 01/14: Don't crash scheduler if exec config has old k8s objects (#24117)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b1be02473b2ad04dde8d1268a47f18a22eb89faa
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Jun 14 21:30:53 2022 -0700

    Don't crash scheduler if exec config has old k8s objects (#24117)
    
    From time to time, k8s library objects change their attributes. If an executor config is pickled with an old version of the library and unpickled with a new version, we can get attribute errors that crash the scheduler (see https://github.com/apache/airflow/issues/23727).
    
    Here we update the error handling so that, in this case, we fail the task but don't crash the scheduler.
    
    (cherry picked from commit 0c41f437674f135fe7232a368bf9c198b0ecd2f0)
---
 airflow/exceptions.py                       |  4 +++
 airflow/executors/kubernetes_executor.py    | 12 +++++++--
 airflow/kubernetes/pod_generator.py         |  7 ++++--
 airflow/models/taskinstance.py              | 16 +++++++++++-
 tests/executors/test_kubernetes_executor.py | 39 +++++++++++++++++++++++++++++
 tests/kubernetes/test_pod_generator.py      | 30 +++++++++++++++++++++-
 tests/models/test_taskinstance.py           | 26 ++++++++++++++++++-
 7 files changed, 127 insertions(+), 7 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 6a8eed35a3..f1a8c1cb66 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -327,3 +327,7 @@ class TaskDeferred(BaseException):
 
 class TaskDeferralError(AirflowException):
     """Raised when a task failed during deferral for some reason."""
+
+
+class PodReconciliationError(AirflowException):
+    """Raised when an error is encountered while trying to merge pod configs."""
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index c76cf58f41..e510da2b31 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -35,7 +35,7 @@ from kubernetes.client import Configuration, models as k8s
 from kubernetes.client.rest import ApiException
 from urllib3.exceptions import ReadTimeoutError
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, PodReconciliationError
 from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
 from airflow.kubernetes import pod_generator
 from airflow.kubernetes.kube_client import get_kube_client
@@ -300,8 +300,9 @@ class AirflowKubernetesScheduler(LoggingMixin):
         and store relevant info in the current_jobs map so we can track the job's
         status
         """
-        self.log.info('Kubernetes job is %s', str(next_job).replace("\n", " "))
         key, command, kube_executor_config, pod_template_file = next_job
+        self.log.info('Kubernetes job is %s', key)
+
         dag_id, task_id, run_id, try_number, map_index = key
 
         if command[0:3] != ["airflow", "tasks", "run"]:
@@ -617,6 +618,13 @@ class KubernetesExecutor(BaseExecutor):
                 task = self.task_queue.get_nowait()
                 try:
                     self.kube_scheduler.run_next(task)
+                except PodReconciliationError as e:
+                    self.log.error(
+                        "Pod reconciliation failed, likely due to kubernetes library upgrade. "
+                        "Try clearing the task to re-run.",
+                        exc_info=True,
+                    )
+                    self.fail(task[0], e)
                 except ApiException as e:
 
                     # These codes indicate something is wrong with pod definition; otherwise we assume pod
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 52b45801cc..8a86919a65 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -34,7 +34,7 @@ from dateutil import parser
 from kubernetes.client import models as k8s
 from kubernetes.client.api_client import ApiClient
 
-from airflow.exceptions import AirflowConfigException
+from airflow.exceptions import AirflowConfigException, PodReconciliationError
 from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated
 from airflow.utils import yaml
 from airflow.version import version as airflow_version
@@ -389,7 +389,10 @@ class PodGenerator:
         # Pod from the pod_template_File -> Pod from executor_config arg -> Pod from the K8s executor
         pod_list = [base_worker_pod, pod_override_object, dynamic_pod]
 
-        return reduce(PodGenerator.reconcile_pods, pod_list)
+        try:
+            return reduce(PodGenerator.reconcile_pods, pod_list)
+        except Exception as e:
+            raise PodReconciliationError from e
 
     @staticmethod
     def serialize_pod(pod: k8s.V1Pod) -> dict:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 95ddb2dacd..fe3387ecf0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -428,6 +428,20 @@ class TaskInstanceKey(NamedTuple):
         return self
 
 
+def _executor_config_comparator(x, y):
+    """
+    The TaskInstance.executor_config attribute is a pickled object that may contain
+    kubernetes objects.  If the installed library version has changed since the
+    object was originally pickled, due to the underlying ``__eq__`` method on these
+    objects (which converts them to JSON), we may encounter attribute errors. In this
+    case we should replace the stored object.
+    """
+    try:
+        return x == y
+    except AttributeError:
+        return False
+
+
 class TaskInstance(Base, LoggingMixin):
     """
     Task instances store the state of a task instance. This table is the
@@ -470,7 +484,7 @@ class TaskInstance(Base, LoggingMixin):
     queued_dttm = Column(UtcDateTime)
     queued_by_job_id = Column(Integer)
     pid = Column(Integer)
-    executor_config = Column(PickleType(pickler=dill))
+    executor_config = Column(PickleType(pickler=dill, comparator=_executor_config_comparator))
 
     external_executor_id = Column(String(ID_LEN, **COLLATION_ARGS))
 
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 954f4f0600..8ffeb5624b 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -29,6 +29,7 @@ from kubernetes.client.rest import ApiException
 from urllib3 import HTTPResponse
 
 from airflow import AirflowException
+from airflow.exceptions import PodReconciliationError
 from airflow.models.taskinstance import TaskInstanceKey
 from airflow.operators.bash import BashOperator
 from airflow.utils import timezone
@@ -272,6 +273,44 @@ class TestKubernetesExecutor:
                 assert kubernetes_executor.task_queue.empty()
                 assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
 
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_run_next_pod_reconciliation_error(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+        """
+        When construct_pod raises PodReconciliationError, we should fail the task.
+        """
+        import sys
+
+        path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        fail_msg = 'test message'
+        mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=PodReconciliationError(fail_msg))
+        mock_get_kube_client.return_value = mock_kube_client
+        mock_api_client = mock.MagicMock()
+        mock_api_client.sanitize_for_serialization.return_value = {}
+        mock_kube_client.api_client = mock_api_client
+        config = {('kubernetes', 'pod_template_file'): path}
+        with conf_vars(config):
+            kubernetes_executor = self.kubernetes_executor
+            kubernetes_executor.start()
+            # Execute a task while the Api Throws errors
+            try_number = 1
+            task_instance_key = TaskInstanceKey('dag', 'task', 'run_id', try_number)
+            kubernetes_executor.execute_async(
+                key=task_instance_key,
+                queue=None,
+                command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+            )
+            kubernetes_executor.sync()
+
+            assert kubernetes_executor.task_queue.empty()
+            assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+            assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg
+
     @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
     @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index d220872187..df5efb0d06 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -19,6 +19,7 @@ import re
 import sys
 import uuid
 from unittest import mock
+from unittest.mock import MagicMock
 
 import pytest
 from dateutil import parser
@@ -26,7 +27,7 @@ from kubernetes.client import ApiClient, models as k8s
 from parameterized import parameterized
 
 from airflow import __version__
-from airflow.exceptions import AirflowConfigException
+from airflow.exceptions import AirflowConfigException, PodReconciliationError
 from airflow.kubernetes.pod_generator import (
     PodDefaults,
     PodGenerator,
@@ -520,6 +521,33 @@ class TestPodGenerator:
         worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
         assert worker_config_result == sanitized_result
 
+    @mock.patch('uuid.uuid4')
+    def test_construct_pod_attribute_error(self, mock_uuid):
+        """
+        After upgrading k8s library we might get attribute error.
+        In this case it should raise PodReconciliationError
+        """
+        path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
+        worker_config = PodGenerator.deserialize_model_file(path)
+        mock_uuid.return_value = self.static_uuid
+        executor_config = MagicMock()
+        executor_config.side_effect = AttributeError('error')
+
+        with pytest.raises(PodReconciliationError):
+            PodGenerator.construct_pod(
+                dag_id='dag_id',
+                task_id='task_id',
+                pod_id='pod_id',
+                kube_image='test-image',
+                try_number=3,
+                date=self.execution_date,
+                args=['command'],
+                pod_override_object=executor_config,
+                base_worker_pod=worker_config,
+                namespace='namespace',
+                scheduler_job_id='uuid',
+            )
+
     @mock.patch('uuid.uuid4')
     def test_ensure_max_label_length(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index b8e7534561..3990c3cbf5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -57,7 +57,12 @@ from airflow.models import (
     XCom,
 )
 from airflow.models.taskfail import TaskFail
-from airflow.models.taskinstance import TaskInstance, load_error_file, set_error_file
+from airflow.models.taskinstance import (
+    TaskInstance,
+    _executor_config_comparator,
+    load_error_file,
+    set_error_file,
+)
 from airflow.models.taskmap import TaskMap
 from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.operators.bash import BashOperator
@@ -2868,3 +2873,22 @@ def test_expand_non_templated_field(dag_maker, session):
 
     echo_task = dag.get_task("echo")
     assert "get_extra_env" in echo_task.upstream_task_ids
+
+
+def test_executor_config_comparator():
+    """
+    When comparison raises AttributeError, return False.
+    This can happen when executor config contains kubernetes objects pickled
+    under older kubernetes library version.
+    """
+
+    class MockAttrError:
+        def __eq__(self, other):
+            raise AttributeError('hello')
+
+    a = MockAttrError()
+    with pytest.raises(AttributeError):
+        # just verify for ourselves that this throws
+        assert a == a
+    assert _executor_config_comparator(a, a) is False
+    assert _executor_config_comparator('a', 'a') is True