You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/08/06 22:50:41 UTC

[airflow] branch main updated: KEDA task count query should ignore k8s queue (#17433)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3136f22  KEDA task count query should ignore k8s queue (#17433)
3136f22 is described below

commit 3136f22bf7377ed177d7dcca795339c674459913
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Aug 6 15:50:17 2021 -0700

    KEDA task count query should ignore k8s queue (#17433)
    
    CeleryKubernetesExecutor lets us use both the Celery and Kubernetes executors.
    KEDA lets us scale Celery workers down to zero when there are no Celery tasks
    running.  However, the KEDA query counts every running or queued task instance
    regardless of queue, so if we have no Celery tasks running and we run a k8s
    task, KEDA will launch a worker even though there are still no Celery tasks.
    We can prevent this by ignoring the kubernetes queue in the KEDA query.
---
 chart/templates/workers/worker-kedaautoscaler.yaml |  5 ++-
 chart/tests/test_keda.py                           | 50 +++++++++++++++++++---
 chart/values.yaml                                  |  2 +
 3 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/chart/templates/workers/worker-kedaautoscaler.yaml b/chart/templates/workers/worker-kedaautoscaler.yaml
index 9c27861..3596552 100644
--- a/chart/templates/workers/worker-kedaautoscaler.yaml
+++ b/chart/templates/workers/worker-kedaautoscaler.yaml
@@ -49,5 +49,8 @@ spec:
         query: >-
           SELECT ceil(COUNT(*)::decimal / {{ .Values.config.celery.worker_concurrency }})
           FROM task_instance
-          WHERE state='running' OR state='queued'
+          WHERE (state='running' OR state='queued')
+          {{- if eq .Values.executor "CeleryKubernetesExecutor" }}
+          AND queue != '{{ .Values.config.celery_kubernetes_executor.kubernetes_queue }}'
+          {{- end }}
 {{- end }}
diff --git a/chart/tests/test_keda.py b/chart/tests/test_keda.py
index d9359c8..6ea63a3 100644
--- a/chart/tests/test_keda.py
+++ b/chart/tests/test_keda.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import jmespath
+import pytest
 from parameterized import parameterized
 
 from tests.helm_template_generator import render_chart
@@ -51,17 +52,29 @@ class TestKeda:
         else:
             assert docs == []
 
-    @parameterized.expand(
+    @staticmethod
+    def build_query(executor, concurrency=16, queue=None):
+        """Builds the query used by KEDA autoscaler to determine how many workers there should be"""
+        query = (
+            f"SELECT ceil(COUNT(*)::decimal / {concurrency}) "
+            "FROM task_instance WHERE (state='running' OR state='queued')"
+        )
+        if executor == 'CeleryKubernetesExecutor':
+            query += f" AND queue != '{queue or 'kubernetes'}'"
+        return query
+
+    @pytest.mark.parametrize(
+        'executor,concurrency',
         [
             ("CeleryExecutor", 8),
             ("CeleryExecutor", 16),
             ("CeleryKubernetesExecutor", 8),
             ("CeleryKubernetesExecutor", 16),
-        ]
+        ],
     )
     def test_keda_concurrency(self, executor, concurrency):
         """
-        ScaledObject should only be created when set to enabled and executor is Celery or CeleryKubernetes
+        Verify keda sql query uses configured concurrency
         """
         docs = render_chart(
             values={
@@ -71,10 +84,35 @@ class TestKeda:
             },
             show_only=["templates/workers/worker-kedaautoscaler.yaml"],
         )
-        expected_query = (
-            f"SELECT ceil(COUNT(*)::decimal / {concurrency}) "
-            "FROM task_instance WHERE state='running' OR state='queued'"
+        expected_query = self.build_query(executor=executor, concurrency=concurrency)
+        assert jmespath.search("spec.triggers[0].metadata.query", docs[0]) == expected_query
+
+    @pytest.mark.parametrize(
+        'executor,queue,should_filter',
+        [
+            ("CeleryExecutor", None, False),
+            ("CeleryExecutor", 'my_queue', False),
+            ("CeleryKubernetesExecutor", None, True),
+            ("CeleryKubernetesExecutor", 'my_queue', True),
+        ],
+    )
+    def test_keda_query_kubernetes_queue(self, executor, queue, should_filter):
+        """
+        Verify keda sql query ignores kubernetes queue when CKE is used.
+        Sometimes a user might want to use a different queue name for k8s executor tasks,
+        and we also verify here that we use the configured queue name in that case.
+        """
+        values = {
+            "workers": {"keda": {"enabled": True}, "persistence": {"enabled": False}},
+            "executor": executor,
+        }
+        if queue:
+            values.update({'config': {'celery_kubernetes_executor': {'kubernetes_queue': queue}}})
+        docs = render_chart(
+            values=values,
+            show_only=["templates/workers/worker-kedaautoscaler.yaml"],
         )
+        expected_query = self.build_query(executor=executor, queue=queue)
         assert jmespath.search("spec.triggers[0].metadata.query", docs[0]) == expected_query
 
     @parameterized.expand(
diff --git a/chart/values.yaml b/chart/values.yaml
index de91d4f..b18de81 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -1111,6 +1111,8 @@ config:
     reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
     principal: '{{ .Values.kerberos.principal }}'
     ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
+  celery_kubernetes_executor:
+    kubernetes_queue: 'kubernetes'
   kubernetes:
     namespace: '{{ .Release.Namespace }}'
     airflow_configmap: '{{ include "airflow_config" . }}'