You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/12/17 07:21:04 UTC

(airflow) branch main updated: Bugfix/logging for pausing (#36182)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c884f3ce32 Bugfix/logging for pausing (#36182)
c884f3ce32 is described below

commit c884f3ce3250bb9dd58cf3dd8dde7c2555e664a5
Author: rom sharon <33...@users.noreply.github.com>
AuthorDate: Sun Dec 17 09:20:58 2023 +0200

    Bugfix/logging for pausing (#36182)
    
    
    ---------
    
    Co-authored-by: Aleph Melo <al...@icloud.com>
---
 airflow/www/decorators.py            |  4 ++-
 tests/www/views/test_views_paused.py | 50 ++++++++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/airflow/www/decorators.py b/airflow/www/decorators.py
index a4b613170c..9e455f7e5f 100644
--- a/airflow/www/decorators.py
+++ b/airflow/www/decorators.py
@@ -93,7 +93,7 @@ def action_logging(func: Callable | None = None, event: str | None = None) -> Ca
                     user = get_auth_manager().get_user_name()
                     user_display = get_auth_manager().get_user_display_name()
 
-                fields_skip_logging = {"csrf_token", "_csrf_token"}
+                fields_skip_logging = {"csrf_token", "_csrf_token", "is_paused"}
                 extra_fields = [
                     (k, secrets_masker.redact(v, k))
                     for k, v in itertools.chain(request.values.items(multi=True), request.view_args.items())
@@ -106,6 +106,8 @@ def action_logging(func: Callable | None = None, event: str | None = None) -> Ca
 
                 params = {**request.values, **request.view_args}
 
+                if params and "is_paused" in params:
+                    extra_fields.append(("is_paused", params["is_paused"] == "false"))
                 log = Log(
                     event=event or f.__name__,
                     task_instance=None,
diff --git a/tests/www/views/test_views_paused.py b/tests/www/views/test_views_paused.py
new file mode 100644
index 0000000000..1f9ac1f4d8
--- /dev/null
+++ b/tests/www/views/test_views_paused.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.models.log import Log
+from tests.test_utils.db import clear_db_dags
+
+pytestmark = pytest.mark.db_test
+
+
+@pytest.fixture(autouse=True)
+def dags(create_dummy_dag):
+    paused_dag, _ = create_dummy_dag(dag_id="paused_dag", is_paused_upon_creation=True)
+    dag, _ = create_dummy_dag(dag_id="unpaused_dag")
+
+    yield dag, paused_dag
+
+    clear_db_dags()
+
+
+def test_logging_pause_dag(admin_client, dags, session):
+    dag, _ = dags
+    # is_paused=false mean pause the dag
+    admin_client.post(f"/paused?is_paused=false&dag_id={dag.dag_id}", follow_redirects=True)
+    dag_query = session.query(Log).filter(Log.dag_id == dag.dag_id)
+    assert "('is_paused', True)" in dag_query.first().extra
+
+
+def test_logging_unpuase_dag(admin_client, dags, session):
+    _, paused_dag = dags
+    # is_paused=true mean unpause the dag
+    admin_client.post(f"/paused?is_paused=true&dag_id={paused_dag.dag_id}", follow_redirects=True)
+    dag_query = session.query(Log).filter(Log.dag_id == paused_dag.dag_id)
+    assert "('is_paused', False)" in dag_query.first().extra