You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:47:14 UTC
[airflow] 30/37: Fix dag run trigger with a note. (#29228)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9891eef2abdade85c2956f99e29405fa7ecc960b
Author: Vedant Lodha <ve...@hotmail.com>
AuthorDate: Thu Feb 2 01:07:39 2023 +0530
Fix dag run trigger with a note. (#29228)
* Fix dag run trigger with a note.
Currently, triggering dag run with note gives 400. This PR fixes it.
Closes: #28825
* make flask_login.current_user a global import
* precommit error fixes.
* add test coverage for the fix
(cherry picked from commit b94f36bf563f5c8372086cec63b74eadef638ef8)
---
airflow/api_connexion/endpoints/dag_run_endpoint.py | 7 +++++--
airflow/api_connexion/schemas/dag_run_schema.py | 2 +-
tests/api_connexion/endpoints/test_dag_run_endpoint.py | 14 +++++++-------
3 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index b566ba2df7..2b3ee16b2f 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -21,6 +21,7 @@ from http import HTTPStatus
import pendulum
from connexion import NoContent
from flask import g
+from flask_login import current_user
from marshmallow import ValidationError
from sqlalchemy import or_
from sqlalchemy.orm import Query, Session
@@ -319,6 +320,10 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
session=session,
)
+ dag_run_note = post_body.get("note")
+ if dag_run_note:
+ current_user_id = getattr(current_user, "id", None)
+ dag_run.note = (dag_run_note, current_user_id)
return dagrun_schema.dump(dag_run)
except ValueError as ve:
raise BadRequest(detail=str(ve))
@@ -438,8 +443,6 @@ def set_dag_run_note(*, dag_id: str, dag_run_id: str, session: Session = NEW_SES
except ValidationError as err:
raise BadRequest(detail=str(err))
- from flask_login import current_user
-
current_user_id = getattr(current_user, "id", None)
if dag_run.dag_run_note is None:
dag_run.note = (new_note, current_user_id)
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py
index 7ca857951b..999b533728 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -73,7 +73,7 @@ class DAGRunSchema(SQLAlchemySchema):
data_interval_end = auto_field(dump_only=True)
last_scheduling_decision = auto_field(dump_only=True)
run_type = auto_field(dump_only=True)
- note = auto_field(dump_only=True)
+ note = auto_field(dump_only=False)
@pre_load
def autogenerate(self, data, **kwargs):
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 8b02bb321f..b645e601e9 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1018,14 +1018,14 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
class TestPostDagRun(TestDagRunEndpoint):
@pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"])
@pytest.mark.parametrize(
- "dag_run_id, logical_date",
+ "dag_run_id, logical_date, note",
[
- pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", id="both-present"),
- pytest.param(None, "2020-06-11T18:00:00+00:00", id="only-date"),
- pytest.param(None, None, id="both-missing"),
+ pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", id="all-present"),
+ pytest.param(None, "2020-06-11T18:00:00+00:00", None, id="only-date"),
+ pytest.param(None, None, None, id="all-missing"),
],
)
- def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date):
+ def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date, note):
self._create_dag("TEST_DAG_ID")
# We'll patch airflow.utils.timezone.utcnow to always return this so we
@@ -1037,7 +1037,7 @@ class TestPostDagRun(TestDagRunEndpoint):
request_json[logical_date_field_name] = logical_date
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id
-
+ request_json["note"] = note
with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
@@ -1068,7 +1068,7 @@ class TestPostDagRun(TestDagRunEndpoint):
"data_interval_start": expected_logical_date,
"last_scheduling_decision": None,
"run_type": "manual",
- "note": None,
+ "note": note,
}
def test_should_respond_400_if_a_dag_has_import_errors(self, session):