You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2021/08/20 08:37:07 UTC

[airflow] branch main updated: Add logical_date to OpenAPI DAGRun schema (#17122)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 65684dd  Add logical_date to OpenAPI DAGRun schema (#17122)
65684dd is described below

commit 65684dda88d7eed32c89a8f06c0c5a848000bafe
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Fri Aug 20 16:36:42 2021 +0800

    Add logical_date to OpenAPI DAGRun schema (#17122)
    
    The idea is to make *both* logical_date and execution_date get
    serialized when a DAGRun is returned, but prefer logical_date
    from the user input (and fall back to execution_date when only it
    is provided).
---
 .../api_connexion/endpoints/dag_run_endpoint.py    |  15 ++-
 airflow/api_connexion/openapi/v1.yaml              |  18 +++-
 airflow/api_connexion/schemas/dag_run_schema.py    |  39 ++++++--
 .../endpoints/test_dag_run_endpoint.py             | 108 +++++++++++++++++----
 tests/api_connexion/schemas/test_dag_run_schema.py |   3 +
 5 files changed, 145 insertions(+), 38 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 80d04ab..feb6d24 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -243,13 +243,13 @@ def post_dag_run(dag_id, session):
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
-    execution_date = post_body["execution_date"]
+    logical_date = post_body["execution_date"]
     run_id = post_body["run_id"]
     dagrun_instance = (
         session.query(DagRun)
         .filter(
             DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == run_id, DagRun.execution_date == execution_date),
+            or_(DagRun.run_id == run_id, DagRun.execution_date == logical_date),
         )
         .first()
     )
@@ -257,7 +257,7 @@ def post_dag_run(dag_id, session):
         dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
             run_type=DagRunType.MANUAL,
             run_id=run_id,
-            execution_date=execution_date,
+            execution_date=logical_date,
             state=State.QUEUED,
             conf=post_body.get("conf"),
             external_trigger=True,
@@ -265,12 +265,9 @@ def post_dag_run(dag_id, session):
         )
         return dagrun_schema.dump(dag_run)
 
-    if dagrun_instance.execution_date == execution_date:
+    if dagrun_instance.execution_date == logical_date:
         raise AlreadyExists(
-            detail=f"DAGRun with DAG ID: '{dag_id}' and "
-            f"DAGRun ExecutionDate: '{post_body['execution_date']}' already exists"
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun logical date: '{logical_date}' already exists"
         )
 
-    raise AlreadyExists(
-        detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{post_body['run_id']}' already exists"
-    )
+    raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 9ff9484..05f73f6 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1946,17 +1946,27 @@ components:
         dag_id:
           type: string
           readOnly: true
-        execution_date:
-          description: >
-            The execution date. This is the time when the DAG run should be started according
-            to the DAG definition.
+        logical_date:
+          type: string
+          nullable: true
+          description: |
+            The logical date (previously called execution date). This is the time or interval covered by
+            this DAG run, according to the DAG definition.
 
             The value of this field can be set only when creating the object. If you try to modify the
             field of an existing object, the request fails with an BAD_REQUEST error.
 
             This together with DAG_ID are a unique key.
+          format: date-time
+        execution_date:
           type: string
+          nullable: true
+          description: |
+            The execution date. This is the same as logical_date, kept for backwards compatibility.
+            If both this field and logical_date are provided but with different values, the request
+            will fail with an BAD_REQUEST error.
           format: date-time
+          deprecated: true
         start_date:
           type: string
           format: date-time
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py
index fd1cb30..97258ab 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -18,7 +18,7 @@
 import json
 from typing import List, NamedTuple
 
-from marshmallow import fields, pre_load
+from marshmallow import fields, post_dump, pre_load
 from marshmallow.schema import Schema
 from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
 from pendulum.parsing import ParserError
@@ -45,6 +45,9 @@ class ConfObject(fields.Field):
         return value
 
 
+_MISSING = object()
+
+
 class DAGRunSchema(SQLAlchemySchema):
     """Schema for DAGRun"""
 
@@ -56,7 +59,7 @@ class DAGRunSchema(SQLAlchemySchema):
 
     run_id = auto_field(data_key='dag_run_id')
     dag_id = auto_field(dump_only=True)
-    execution_date = auto_field(validate=validate_istimezone)
+    execution_date = auto_field(data_key="logical_date", validate=validate_istimezone)
     start_date = auto_field(dump_only=True)
     end_date = auto_field(dump_only=True)
     state = DagStateField(dump_only=True)
@@ -65,18 +68,40 @@ class DAGRunSchema(SQLAlchemySchema):
 
     @pre_load
     def autogenerate(self, data, **kwargs):
-        """Auto generate run_id and execution_date if they are not loaded"""
-        if "execution_date" not in data.keys():
-            data["execution_date"] = str(timezone.utcnow())
-        if "dag_run_id" not in data.keys():
+        """Auto generate run_id and logical_date if they are not provided.
+
+        For compatibility, if `execution_date` is submitted, it is converted
+        to `logical_date`.
+        """
+        logical_date = data.get("logical_date", _MISSING)
+        execution_date = data.pop("execution_date", _MISSING)
+        if logical_date is execution_date is _MISSING:  # Both missing.
+            data["logical_date"] = str(timezone.utcnow())
+        elif logical_date is _MISSING:  # Only logical_date missing.
+            data["logical_date"] = execution_date
+        elif execution_date is _MISSING:  # Only execution_date missing.
+            pass
+        elif logical_date != execution_date:  # Both provided but don't match.
+            raise BadRequest(
+                "logical_date conflicts with execution_date",
+                detail=f"{logical_date!r} != {execution_date!r}",
+            )
+
+        if "dag_run_id" not in data:
             try:
                 data["dag_run_id"] = DagRun.generate_run_id(
-                    DagRunType.MANUAL, timezone.parse(data["execution_date"])
+                    DagRunType.MANUAL, timezone.parse(data["logical_date"])
                 )
             except (ParserError, TypeError) as err:
                 raise BadRequest("Incorrect datetime argument", detail=str(err))
         return data
 
+    @post_dump
+    def autofill(self, data, **kwargs):
+        """Populate execution_date from logical_date for compatibility."""
+        data["execution_date"] = data["logical_date"]
+        return data
+
 
 class DAGRunCollection(NamedTuple):
     """List of DAGRuns with metadata"""
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 03bd7d8..71b5274 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 from datetime import timedelta
+from unittest import mock
 
 import pytest
 from parameterized import parameterized
@@ -211,6 +212,7 @@ class TestGetDagRun(TestDagRunEndpoint):
             'dag_run_id': 'TEST_DAG_RUN_ID',
             'end_date': None,
             'state': 'running',
+            'logical_date': self.default_time,
             'execution_date': self.default_time,
             'external_trigger': True,
             'start_date': self.default_time,
@@ -265,6 +267,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time,
+                    'logical_date': self.default_time,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -275,6 +278,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time_2,
+                    'logical_date': self.default_time_2,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -313,6 +317,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time_2,
+                    'logical_date': self.default_time_2,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -323,6 +328,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time,
+                    'logical_date': self.default_time,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -566,6 +572,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time,
+                    'logical_date': self.default_time,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -576,6 +583,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time_2,
+                    'logical_date': self.default_time_2,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -600,6 +608,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time_2,
+                    'logical_date': self.default_time_2,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -610,6 +619,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time,
+                    'logical_date': self.default_time,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -645,6 +655,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time,
+                    'logical_date': self.default_time,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -655,6 +666,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
                     'end_date': None,
                     'state': 'running',
                     'execution_date': self.default_time_2,
+                    'logical_date': self.default_time_2,
                     'external_trigger': True,
                     'start_date': self.default_time,
                     'conf': {},
@@ -905,41 +917,101 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
 
 
 class TestPostDagRun(TestDagRunEndpoint):
-    @parameterized.expand(
+    @pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"])
+    @pytest.mark.parametrize(
+        "dag_run_id, logical_date",
         [
-            (
-                "All fields present",
-                {
-                    "dag_run_id": "TEST_DAG_RUN",
-                    "execution_date": "2020-06-11T18:00:00+00:00",
-                },
-            ),
-            ("dag_run_id missing", {"execution_date": "2020-06-11T18:00:00+00:00"}),
-            ("dag_run_id and execution_date missing", {}),
-        ]
+            pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", id="both-present"),
+            pytest.param(None, "2020-06-11T18:00:00+00:00", id="only-date"),
+            pytest.param(None, None, id="both-missing"),
+        ],
     )
-    def test_should_respond_200(self, name, request_json):
-        del name
+    def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date):
+        self._create_dag("TEST_DAG_ID")
+
+        # We'll patch airflow.utils.timezone.utcnow to always return this so we
+        # can check the returned dates.
+        fixed_now = timezone.utcnow()
+
+        request_json = {}
+        if logical_date is not None:
+            request_json[logical_date_field_name] = logical_date
+        if dag_run_id is not None:
+            request_json["dag_run_id"] = dag_run_id
+
+        with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
+            response = self.client.post(
+                "api/v1/dags/TEST_DAG_ID/dagRuns",
+                json=request_json,
+                environ_overrides={"REMOTE_USER": "test"},
+            )
+        assert response.status_code == 200
+
+        if logical_date is None:
+            expected_logical_date = fixed_now.isoformat()
+        else:
+            expected_logical_date = logical_date
+        if dag_run_id is None:
+            expected_dag_run_id = f"manual__{expected_logical_date}"
+        else:
+            expected_dag_run_id = dag_run_id
+        assert {
+            "conf": {},
+            "dag_id": "TEST_DAG_ID",
+            "dag_run_id": expected_dag_run_id,
+            "end_date": None,
+            "execution_date": expected_logical_date,
+            "logical_date": expected_logical_date,
+            "external_trigger": True,
+            "start_date": None,
+            "state": "queued",
+        } == response.json
+
+    def test_should_response_200_for_matching_execution_date_logical_date(self):
         self._create_dag("TEST_DAG_ID")
         response = self.client.post(
-            "api/v1/dags/TEST_DAG_ID/dagRuns", json=request_json, environ_overrides={'REMOTE_USER': "test"}
+            "api/v1/dags/TEST_DAG_ID/dagRuns",
+            json={
+                "execution_date": "2020-11-10T08:25:56.939143+00:00",
+                "logical_date": "2020-11-10T08:25:56.939143+00:00",
+            },
+            environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 200
         assert {
             "conf": {},
             "dag_id": "TEST_DAG_ID",
-            "dag_run_id": response.json["dag_run_id"],
+            "dag_run_id": "manual__2020-11-10T08:25:56.939143+00:00",
             "end_date": None,
-            "execution_date": response.json["execution_date"],
+            "execution_date": "2020-11-10T08:25:56.939143+00:00",
+            "logical_date": "2020-11-10T08:25:56.939143+00:00",
             "external_trigger": True,
             "start_date": None,
             "state": "queued",
         } == response.json
 
+    def test_should_response_400_for_conflicting_execution_date_logical_date(self):
+        self._create_dag("TEST_DAG_ID")
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns",
+            json={
+                "execution_date": "2020-11-10T08:25:56.939143+00:00",
+                "logical_date": "2020-11-11T08:25:56.939143+00:00",
+            },
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 400
+        assert response.json["title"] == "logical_date conflicts with execution_date"
+        assert response.json["detail"] == (
+            "'2020-11-11T08:25:56.939143+00:00' != '2020-11-10T08:25:56.939143+00:00'"
+        )
+
     @parameterized.expand(
         [
             ({'execution_date': "2020-11-10T08:25:56.939143"}, 'Naive datetime is disallowed'),
-            ({'execution_date': "2020-11-10T08:25:56P"}, "{'execution_date': ['Not a valid datetime.']}"),
+            ({'execution_date': "2020-11-10T08:25:56P"}, "{'logical_date': ['Not a valid datetime.']}"),
+            ({'logical_date': "2020-11-10T08:25:56.939143"}, 'Naive datetime is disallowed'),
+            ({'logical_date': "2020-11-10T08:25:56P"}, "{'logical_date': ['Not a valid datetime.']}"),
         ]
     )
     def test_should_response_400_for_naive_datetime_and_bad_datetime(self, data, expected):
@@ -1054,7 +1126,7 @@ class TestPostDagRun(TestDagRunEndpoint):
         assert response.status_code == 409, response.data
         assert response.json == {
             "detail": "DAGRun with DAG ID: 'TEST_DAG_ID' and "
-            "DAGRun ExecutionDate: '2020-06-11 18:00:00+00:00' already exists",
+            "DAGRun logical date: '2020-06-11 18:00:00+00:00' already exists",
             "status": 409,
             "title": "Conflict",
             "type": EXCEPTIONS_LINK_MAP[409],
diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py
index 9e4a9e8..b5333f1 100644
--- a/tests/api_connexion/schemas/test_dag_run_schema.py
+++ b/tests/api_connexion/schemas/test_dag_run_schema.py
@@ -66,6 +66,7 @@ class TestDAGRunSchema(TestDAGRunBase):
             "end_date": None,
             "state": "running",
             "execution_date": self.default_time,
+            "logical_date": self.default_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {"start": "stop"},
@@ -150,6 +151,7 @@ class TestDagRunCollection(TestDAGRunBase):
                     "dag_run_id": "my-dag-run",
                     "end_date": None,
                     "execution_date": self.default_time,
+                    "logical_date": self.default_time,
                     "external_trigger": True,
                     "state": "running",
                     "start_date": self.default_time,
@@ -161,6 +163,7 @@ class TestDagRunCollection(TestDAGRunBase):
                     "end_date": None,
                     "state": "running",
                     "execution_date": self.default_time,
+                    "logical_date": self.default_time,
                     "external_trigger": True,
                     "start_date": self.default_time,
                     "conf": {},