Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/12 04:28:30 UTC

[GitHub] [airflow] uranusjr opened a new pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

uranusjr opened a new pull request #16352:
URL: https://github.com/apache/airflow/pull/16352


   Close #16304. Todo:
   
   * [x] Add those fields to the actual `DagRun` model.
   * [x] Populate the fields when a `DagRun` instance is created (when `DagRun.execution_date` is populated). I think (hope) this field is never updated after a run is created?
   * [x] Add `logical_date` and `data_interval` to template context.
       * [x] Emit deprecation warning when the `execution_date` context variable is accessed. (fix #16372)
   * Change functions that create `DagRun` instances to use `data_interval`.
       * [x] `DAG.create_dagrun()`
       * [x] Connexion `post_dag_run()`
       * [ ] (Optional) A lot of tests
   * Change functions that call `DAG.create_dagrun()` to use `data_interval`.
       * [x] `BackfillJob`
       * [x] `SchedulerJob`
       * [x] `SubDagOperator`
       * [x] Web UI manual trigger view
       * [ ] (Optional) A lot of tests
       * [ ] (Optional) Experimental API
   * [x] Add documentation on how the template field meanings change and how they map to the previous values.
   * Test data interval is correctly populated when triggered
       * [x] In the scheduler (manual, scheduled, and backfilled, etc.)
       * [x] From the web UI
       * [x] From the CLI
       * From the stable API (this needs things in #17122 and will be a part of that PR instead)
   * Add documentation on data intervals. (Broken into another PR since I think there are many things to change)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678402962



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -242,16 +242,23 @@ def post_dag_run(dag_id, session):
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
+    execution_date = post_body["execution_date"]
     dagrun_instance = (
         session.query(DagRun)
         .filter(
             DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]),
+            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == execution_date),
         )
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
+        dag = current_app.dag_bag.get_dag(dag_id)
+        dag_run = DagRun(
+            dag_id=dag_id,
+            run_type=DagRunType.MANUAL,
+            data_interval=dag.timetable.infer_data_interval(execution_date),
+            **post_body,
+        )

Review comment:
       No; I can change this.







[GitHub] [airflow] ashb commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r657801000



##########
File path: airflow/models/dag.py
##########
@@ -1812,19 +1815,40 @@ def create_dagrun(
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
             run_type: DagRunType = DagRunType.from_run_id(run_id)
-        elif run_type and execution_date:
+
+        # The preferred signature *requires* the data_interval argument. The
+        # legacy form of accepting an optional execution_date (and disallowing
+        # data_interval) is deprecated but accepted for compatibility.
+        if data_interval is None:
+            warnings.warn(
+                "Creating a DagRun without data_interval is deprecated.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if execution_date is None:
+                start = timezone.utcnow()
+            else:
+                start = execution_date
+            if run_type == DagRunType.MANUAL:
+                data_interval = (start, start)
+            else:
+                data_interval = (start, self.following_schedule(start))
+        elif execution_date is not None:
+            raise TypeError("cannot set data_interval and execution_date together")
+
+        if not run_id:
+            if not run_type or not data_interval:
+                raise AirflowException(
+                    "Creating DagRun needs either `run_id` or both `run_type` and `data_interval`"
+                )
             if not isinstance(run_type, DagRunType):
                 raise ValueError(f"`run_type` expected to be a DagRunType is {type(run_type)}")
-            run_id = DagRun.generate_run_id(run_type, execution_date)
-        elif not run_id:
-            raise AirflowException(
-                "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
-            )
+            run_id = DagRun.generate_run_id(run_type, data_interval[0])
 
         run = DagRun(
             dag_id=self.dag_id,
             run_id=run_id,
-            execution_date=execution_date,

Review comment:
       The AIP calls for a column called `schedule_date` https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval#AIP39Richerscheduler_interval-RenameDagRun.execution_datetoschedule_date
   
   I'm happy to change the name of that column, but I still think we need _a_ canonical date on the DagRun table to enable chronological sorting, don't we?
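
For context, the deprecated fallback in the diff above (the branch taken when no `data_interval` is given) can be sketched outside Airflow roughly as follows. This is an illustration only: `infer_legacy_data_interval`, `is_manual`, and `next_day` are hypothetical names, and `next_day` stands in for `DAG.following_schedule` on an assumed `@daily` schedule.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional, Tuple


def infer_legacy_data_interval(
    execution_date: Optional[datetime],
    is_manual: bool,
    following_schedule: Callable[[datetime], datetime],
) -> Tuple[datetime, datetime]:
    """Mirror the deprecated branch of create_dagrun() shown in the diff."""
    # Without an execution_date, the run is treated as starting "now" (UTC).
    start = execution_date if execution_date is not None else datetime.now(timezone.utc)
    if is_manual:
        # Manual runs get a zero-length interval.
        return (start, start)
    # Other runs cover one schedule period starting at the execution date.
    return (start, following_schedule(start))


def next_day(moment: datetime) -> datetime:
    """Hypothetical stand-in for DAG.following_schedule with @daily."""
    return moment + timedelta(days=1)


run_date = datetime(2021, 6, 9, tzinfo=timezone.utc)
scheduled = infer_legacy_data_interval(run_date, False, next_day)
manual = infer_legacy_data_interval(run_date, True, next_day)
print(scheduled[0].isoformat(), scheduled[1].isoformat())
print(manual[0] == manual[1])
```

In this sketch the interval start always equals the old `execution_date`, so sorting runs by the interval start gives the same order as sorting by `execution_date`.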







[GitHub] [airflow] ashb commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r657797382



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import json
+
+from alembic import op
+from sqlalchemy import Column, Integer, String, or_
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import foreign, relationship
+from sqlalchemy_jsonfield import JSONField
+
+from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.utils.session import create_session
+from airflow.utils.sqlalchemy import Interval, UtcDateTime
+from airflow.utils.types import DagRunType
+
+# Revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "30867afad44a"
+branch_labels = None
+depends_on = None
+
+ID_LEN = 250
+
+Base = declarative_base()
+
+
+class DagModel(Base):
+    """A partially frozen ``airflow.models.DagModel`` class."""
+
+    __tablename__ = "dag"
+
+    dag_id = Column(String(ID_LEN), primary_key=True)
+    schedule_interval = Column(Interval)
+
+
+class DagRun(Base):
+    """A partially frozen ``airflow.models.DagRun`` class."""
+
+    __tablename__ = "dag_run"
+
+    id = Column(Integer, primary_key=True)
+    dag_id = Column(String(ID_LEN))
+    execution_date = Column(UtcDateTime)
+    data_interval_start = Column(UtcDateTime)
+    data_interval_end = Column(UtcDateTime)
+    run_id = Column(String(ID_LEN))
+    run_type = Column(String(50))
+
+    dag = relationship(DagModel, primaryjoin=(foreign(DagModel.dag_id) == dag_id))
+
+
+class SerializedDagModel(Base):
+    """A partially frozen ``airflow.models.SerializedDagModel`` class."""
+
+    __tablename__ = "serialized_dag"
+
+    dag_id = Column(String(ID_LEN), primary_key=True)
+    data = Column(JSONField(json=json))
+
+    @property
+    def dag(self):
+        """Copied from the original model class."""
+        SerializedDAG._load_operator_extra_links = self.load_op_links  # pylint: disable=protected-access
+        if isinstance(self.data, dict):
+            return SerializedDAG.from_dict(self.data)
+        return SerializedDAG.from_json(self.data)
+
+
+# These kinds of runs don't have a data interval and can be bulk-updated with
+# one SQL call, so we do them separately. Other "real" schedule intervals are
+# too complicated and need to be populated manually.
+NO_SCHEDULE_FILTER = or_(
+    DagModel.schedule_interval.in_([None, "@once"]),
+    DagRun.run_type == DagRunType.MANUAL,
+)
+
+
+def _populate_simple_dagrun_intervals(session):
+    """Handle DAG runs with simple schedule intervals."""
+    updates = {
+        DagRun.data_interval_start: DagRun.execution_date,
+        DagRun.data_interval_end: DagRun.execution_date,
+    }
+    # SQLite doesn't support UPDATE ... WHERE with multiple tables, and MySQL
+    # doesn't support subqeury in UPDATE, so...
+    if op.get_bind().dialect.name != 'sqlite':
+        dag_runs = session.query(DagRun).filter(DagRun.dag, NO_SCHEDULE_FILTER)
+    else:
+        dag_run_ids = session.query(DagRun.id).filter(NO_SCHEDULE_FILTER)
+        dag_runs = session.query(DagRun).filter(DagRun.id.in_(dag_run_ids.subquery()))
+    dag_runs.update(updates, synchronize_session=False)
+
+
+def _populate_complex_dagrun_intervals(session):
+    """Handle DAG runs with "real" schedule intervals."""
+    dag_runs_with_serialized = session.query(DagRun, SerializedDagModel).filter(
+        DagModel.dag_id == DagRun.dag_id,
+        SerializedDagModel.dag_id == DagRun.dag_id,
+        ~NO_SCHEDULE_FILTER,
+    )
+    for dag_run, serialized in dag_runs_with_serialized:
+        dag = serialized.dag
+        data_interval_start = dag_run.execution_date
+        dag_run.data_interval_start = data_interval_start
+        dag_run.data_interval_end = dag.following_schedule(data_interval_start)
+        session.merge(dag_run, load=False)
+
+
+def upgrade():
+    """Apply add data_interval_[start|end] to DagRun."""
+    # Create aolumns with NULL as default.
+    with op.batch_alter_table("dag_run") as batch_op:
+        batch_op.add_column(Column("data_interval_start", UtcDateTime))

Review comment:
       Is this the right type for mssql? Paging @aneesh-joseph 







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679620135



##########
File path: airflow/models/taskinstance.py
##########
@@ -1670,37 +1667,97 @@ def get(
                 except AirflowNotFoundException:
                     return default_conn
 
+        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."

Review comment:
       I like having an extra f here since it aligns the literals :p
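
A small sketch showing that the extra `f` prefix is purely cosmetic: an f-string with no replacement fields evaluates to the same value as the plain literal, so the two spellings discussed here produce identical messages. The variable names are made up for this illustration.

```python
key = "execution_date"

# Both literals carry the f prefix, as in the PR.
with_prefix = (
    f"Accessing {key!r} from the template is deprecated and "
    f"will be removed in a future version."
)

# Only the literal that actually interpolates carries the f prefix.
without_prefix = (
    f"Accessing {key!r} from the template is deprecated and "
    "will be removed in a future version."
)

print(with_prefix == without_prefix)
```

The choice is therefore stylistic; some linters flag f-strings without placeholders, which is one reason to drop the prefix.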







[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-891859321


   Note: I’ve changed `schedule_date` in this PR to `logical_date` as I mentioned on the dev mailing list: lists.apache.org/thread.html/r00d88d22ae15f9c98b91c8f28163c09e59fb4a6436586861c69a66b7%40%3Cdev.airflow.apache.org%3E
   
   @eladkal said he doesn’t particularly like the name and plans to share some thoughts on the mailing list. But I’m going to use `logical_date` for now since we need to use _something_ (AIP-39 implementation would be greatly delayed if we block this PR until the name issue is fully resolved), and `logical_date` is already better than `schedule_date`. We can always mass-replace all the occurrences (like I did in the rename commit) when we settle on something else 🙂 





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r681202782



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -242,21 +243,29 @@ def post_dag_run(dag_id, session):
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
+    execution_date = post_body["execution_date"]
+    run_id = post_body["run_id"]
     dagrun_instance = (
         session.query(DagRun)
         .filter(
             DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]),
+            or_(DagRun.run_id == run_id, DagRun.execution_date == execution_date),
         )
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
-        session.add(dag_run)
-        session.commit()
+        dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            execution_date=execution_date,
+            state=State.QUEUED,
+            conf=post_body.get("conf"),
+            external_trigger=True,
+            dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
+        )

Review comment:
       In line 239 above, we query `DagModel` to check whether the DAG exists. Instead of calling `current_app.dag_bag.get_dag(dag_id).create_dagrun`, I think we should reuse the DAG fetched on line 239:
   ```dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()``` and then call `dag.create_dagrun`. What do you think?







[GitHub] [airflow] ashb commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-858663634


   Oh, we'll need to add these to the OpenAPI defn too.





[GitHub] [airflow] ephraimbuddy commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-860746684


   > > Oh, we'll need to add these to the OpenAPI defn too.
   > 
   > Yeah, I’m working on it, the `post_dag_run` endpoint also needs updating. How should we handle backward compatibility in API? My current approach is to allow the caller to pass _either_ `execution_date` or `data_interval_[start|end]`, but how do I make this show in the schema? Also, is there a way to emit a message to tell the client to migrate?
   
   For it to show on the schema, you need to update the DagRun Component https://github.com/apache/airflow/blob/943292b4e0c494f023c86d648289b1f23ccb0ee9/airflow/api_connexion/openapi/v1.yaml#L1847
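
The "either `execution_date` or `data_interval_[start|end]`" approach quoted above could be checked at request time along these lines. This is a hedged sketch, not the endpoint's actual code: `uses_data_interval` is a hypothetical helper, the payload is treated as a plain mapping, and allowing a payload with neither field is an assumption. It does not address how the choice would be expressed in the OpenAPI schema.

```python
from typing import Any, Mapping


def uses_data_interval(payload: Mapping[str, Any]) -> bool:
    """Return True if the payload uses the new fields, False for the legacy form."""
    has_date = "execution_date" in payload
    has_start = "data_interval_start" in payload
    has_end = "data_interval_end" in payload

    # The two interval fields only make sense as a pair.
    if has_start != has_end:
        raise ValueError(
            "data_interval_start and data_interval_end must be given together"
        )
    # Mixing the legacy field with the new pair is ambiguous.
    if has_date and has_start:
        raise ValueError(
            "pass either execution_date or data_interval_[start|end], not both"
        )
    return has_start


legacy = {"execution_date": "2021-06-09T00:00:00+00:00"}
modern = {
    "data_interval_start": "2021-06-09T00:00:00+00:00",
    "data_interval_end": "2021-06-10T00:00:00+00:00",
}
print(uses_data_interval(legacy), uses_data_interval(modern))
```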





[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-883382159


   Alright, I think this is ready (if the tests pass as expected).
   
   To keep the PR at a reasonable size, I did *not* do the following:
   
   * Rename `execution_date` on SQLAlchemy models to `schedule_date` (including DagRun, TaskInstance, and many more)
   * Allow `schedule_date` to be passed from Connexion API.
   
   These will be done in later PRs.





[GitHub] [airflow] ashb commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r683390362



##########
File path: docs/apache-airflow/macros-ref.rst
##########
@@ -35,28 +35,23 @@ in all templates
 =====================================   ====================================
 Variable                                Description
 =====================================   ====================================
-``{{ ds }}``                            the execution date as ``YYYY-MM-DD``
-``{{ ds_nodash }}``                     the execution date as ``YYYYMMDD``
-``{{ prev_ds }}``                       the previous execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ prev_ds }}`` will be ``2018-01-01``
-``{{ prev_ds_nodash }}``                the previous execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ next_ds }}``                       the next execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ next_ds }}`` will be ``2018-01-08``
-``{{ next_ds_nodash }}``                the next execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ yesterday_ds }}``                  the day before the execution date as ``YYYY-MM-DD``
-``{{ yesterday_ds_nodash }}``           the day before the execution date as ``YYYYMMDD``
-``{{ tomorrow_ds }}``                   the day after the execution date as ``YYYY-MM-DD``
-``{{ tomorrow_ds_nodash }}``            the day after the execution date as ``YYYYMMDD``
-``{{ ts }}``                            same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
+``{{ logical_date }}``                  the logical date of the DAG run (`pendulum.Pendulum`_)
+``{{ ds }}``                            the logical date as ``YYYY-MM-DD``

Review comment:
       Maybe we need to leave this as it is but deprecate it, and replace with `{{ data_interval_start | ds }}` (i.e. add new `ds`, `ds_nodash`, `ts`, etc filters) -- separate PR there.
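
The filters suggested here (`ds`, `ds_nodash`, `ts`) could be as small as the functions below. This is only a sketch of the idea: the function names are hypothetical, standard-library `datetime` is used in place of pendulum, and wiring them into the template engine is left out (in Jinja that would typically go through the environment's `filters` mapping).

```python
from datetime import datetime, timezone


def ds_filter(value: datetime) -> str:
    """Format a datetime as YYYY-MM-DD."""
    return value.strftime("%Y-%m-%d")


def ds_nodash_filter(value: datetime) -> str:
    """Format a datetime as YYYYMMDD."""
    return value.strftime("%Y%m%d")


def ts_filter(value: datetime) -> str:
    """Format a datetime as an ISO 8601 timestamp."""
    return value.isoformat()


data_interval_start = datetime(2018, 1, 1, tzinfo=timezone.utc)
print(ds_filter(data_interval_start))
print(ds_nodash_filter(data_interval_start))
print(ts_filter(data_interval_start))
```

With filters like these, a template would apply the formatting to whichever date it wants, for example `{{ data_interval_start | ds }}`, instead of relying on a pre-formatted variable tied to one specific date.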







[GitHub] [airflow] ashb commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r649234817



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import alembic
+import sqlalchemy
+
+from airflow.models import DagModel, DagRun
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.session import create_session
+from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.timezone import utcnow
+
+# revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "e9304a3141f0"
+branch_labels = None
+depends_on = None
+
+
+# None and "@once" schedule intervals don't have a data interval and can be
+# bulk-updated with one SQL call, so we do them separately. Other "real"
+# schedule intervals are too complicated and need to be populated manually.
+NO_SCHEDULE_FILTER = DagModel.schedule_interval.in_([None, "@once"])
+
+
+def _populate_simple_dagrun_intervals(session):
+    """Handle DAG runs with simple schedule intervals."""
+    updates = {
+        DagRun.data_interval_start: DagRun.execution_date,
+        DagRun.data_interval_end: DagRun.execution_date,
+    }
+    joined = session.query(DagRun).join(DagModel, DagModel.dag_id == DagRun.dag_id)
+    joined.filter(NO_SCHEDULE_FILTER).update(updates)
+
+
+def _populate_complex_dagrun_intervals(session):
+    """Handle DAG runs with "real" schedule intervals."""
+    joined = session.query(DagRun, SerializedDagModel).join(
+        SerializedDagModel,
+        DagRun.dag_id == SerializedDagModel.dag_id,
+    )
+    for dag_run, serialized in joined.filter(~NO_SCHEDULE_FILTER):
+        dag = serialized.dag
+        data_interval_start = dag_run.execution_date
+        dag_run.data_interval_start = data_interval_start
+        dag_run.data_interval_end = dag.following_schedule(data_interval_start)

Review comment:
       Is this at all possible without requiring the serialized dag table? -- inflating every dag, and walking over every dag run is not going to be quick.
   
   There is _a_ `schedule_interval` column on DagModel currently, that I _think_ gives us enough info? Not sure it saves us much run time though.
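
To make the question concrete, here is a rough sketch of what could be derived from the `schedule_interval` value alone, without inflating the serialized DAG. It is an assumption-laden illustration: `interval_from_schedule` is a hypothetical helper, only `None`, `"@once"`, and `timedelta` schedules are handled, and cron expressions (which would need a cron parser) return `None` so the caller can fall back to the slower path.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple, Union


def interval_from_schedule(
    execution_date: datetime,
    schedule_interval: Union[None, str, timedelta],
) -> Optional[Tuple[datetime, datetime]]:
    """Derive (start, end) from the schedule value alone, if possible."""
    # Same rule as the bulk update in the migration: no real interval.
    if schedule_interval is None or schedule_interval == "@once":
        return (execution_date, execution_date)
    # Fixed-length schedules need no DAG object at all.
    if isinstance(schedule_interval, timedelta):
        return (execution_date, execution_date + schedule_interval)
    # Cron strings and anything else are not handled in this sketch.
    return None


when = datetime(2021, 6, 9, tzinfo=timezone.utc)
print(interval_from_schedule(when, timedelta(hours=6)))
print(interval_from_schedule(when, "@once"))
print(interval_from_schedule(when, "0 0 * * *"))
```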







[GitHub] [airflow] kaxil commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679537509



##########
File path: airflow/models/dag.py
##########
@@ -596,23 +649,17 @@ def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
         :return: A list of dates within the interval following the dag's schedule.

Review comment:
       We should remove `align` from the docstring.
   
   https://github.com/apache/airflow/blob/47908878a96775db49eac49339926c0ccff1e3be/airflow/models/dag.py#L644-L648

##########
File path: airflow/models/dag.py
##########
@@ -579,7 +580,59 @@ def timetable(self) -> Timetable:
         type_name = type(interval).__name__
         raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")
 
-    def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
+    def iter_dagrun_infos_between(
+        self,
+        earliest: Optional[pendulum.DateTime],
+        latest: pendulum.DateTime,
+        *,
+        align: bool = True,
+    ) -> Iterable[DagRunInfo]:
+        """Yield DagRunInfo using this DAG's timetable between given interval.
+
+        DagRunInfo instances yielded if their ``schedule_date`` is not earlier
+        than ``earliest``, nor later than ``latest``. The instances are ordered
+        by their ``schedule_date`` from earliest to latest.
+
+        If ``align`` is ``False``, the first run will happen immediately on
+        ``ealiest``, even if it does not fall on the logical timetable schedule.

Review comment:
       ```suggestion
           ``earliest``, even if it does not fall on the logical timetable schedule.
   ```

##########
File path: airflow/timetables/base.py
##########
@@ -79,6 +74,18 @@ def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
         """
         return cls(run_after=end, data_interval=DataInterval(start, end))
 
+    @property
+    def schedule_date(self) -> DateTime:

Review comment:
       Shouldn't this be `run_date` based on our proposal?
   
   https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval#AIP39Richerscheduler_interval-AbstractTimetableinterface

##########
File path: airflow/models/taskinstance.py
##########
@@ -1670,37 +1667,97 @@ def get(
                 except AirflowNotFoundException:
                     return default_conn
 
+        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."

Review comment:
       ```suggestion
                       "will be removed in a future version."
   ```

##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually trioggered

Review comment:
       ```suggestion
           # scheduled at each midnight, the data interval of a manually triggered
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2403,6 +2461,24 @@ def __init__(self, concurrency=None, **kwargs):
     def __repr__(self):
         return f"<DAG: {self.dag_id}>"
 
+    @property
+    def next_data_interval(self) -> Optional[Tuple[datetime, datetime]]:
+        if self.next_dagrun_data_interval_start is None:
+            if self.next_dagrun_data_interval_end is not None:
+                raise AirflowException(
+                    "Inconsistent DagModel: next_dagrun_data_interval_start and "
+                    "next_dagrun_data_interval_end must be either both None or both datetime"
+                )

Review comment:
       Let's show the values that are passed too, should help in debugging
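
One way to include the values, sketched outside Airflow. `check_interval_pair` is a hypothetical standalone function, `ValueError` stands in for `AirflowException` to keep the example self-contained, and the check is made symmetric here, which goes slightly beyond the lines shown in the diff.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def check_interval_pair(
    start: Optional[datetime],
    end: Optional[datetime],
) -> Optional[Tuple[datetime, datetime]]:
    """Return the interval, or None if unset; reject half-populated pairs."""
    if (start is None) != (end is None):
        # Echo both values so the inconsistent row is easy to identify.
        raise ValueError(
            "Inconsistent DagModel: next_dagrun_data_interval_start and "
            "next_dagrun_data_interval_end must be either both None or both "
            f"datetime, got start={start!r}, end={end!r}"
        )
    if start is None:
        return None
    return (start, end)


end_only = datetime(2021, 6, 10, tzinfo=timezone.utc)
try:
    check_interval_pair(None, end_only)
except ValueError as err:
    print(err)
```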

##########
File path: airflow/models/dag.py
##########
@@ -579,7 +580,59 @@ def timetable(self) -> Timetable:
         type_name = type(interval).__name__
         raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")
 
-    def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
+    def iter_dagrun_infos_between(
+        self,
+        earliest: Optional[pendulum.DateTime],
+        latest: pendulum.DateTime,
+        *,
+        align: bool = True,
+    ) -> Iterable[DagRunInfo]:
+        """Yield DagRunInfo using this DAG's timetable between given interval.
+
+        DagRunInfo instances yielded if their ``schedule_date`` is not earlier
+        than ``earliest``, nor later than ``latest``. The instances are ordered
+        by their ``schedule_date`` from earliest to latest.
+
+        If ``align`` is ``False``, the first run will happen immediately on
+        ``ealiest``, even if it does not fall on the logical timetable schedule.
+        The default is ``True``, but subdags will ignore this value and always
+        behave as if this is set to ``False`` for backward compatibility.
+        """
+        if earliest is None:
+            earliest = self._time_restriction.earliest
+        earliest = timezone.coerce_datetime(earliest)
+        latest = timezone.coerce_datetime(latest)
+
+        restriction = TimeRestriction(earliest, latest, catchup=True)
+
+        # HACK: Sub-DAGs are currently scheduled differently. For example, say
+        # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level
+        # DAG should be first scheduled to run on midnight 2021-06-04, but a
+        # sub-DAG should be first scheduled to run RIGHT NOW. We can change
+        # this, but since sub-DAGs are going away in 3.0 anyway, let's keep
+        # compatibility for now and remove this entirely later.
+        if self.is_subdag:
+            align = False
+
+        info = self.timetable.next_dagrun_info(None, restriction)
+        if info is None:
+            # No runs to be scheduled between the user-supplied timeframe. But
+            # if align=False, "invent" a data interval for the timeframe itself.
+            if not align:
+                yield DagRunInfo.interval(earliest, latest)
+            return
+
+        # If align=False and earliest is not a logical schedule date, "invent"
+        # a data interval betwwen it and the first schedule date.

Review comment:
       ```suggestion
           # a data interval between it and the first schedule date.
   ```

##########
File path: airflow/jobs/backfill_job.py
##########
@@ -755,8 +759,13 @@ def _execute(self, session=None):
 
         start_date = self.bf_start_date
 
-        # Get intervals between the start/end dates, which will turn into dag runs
-        run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date, align=True)
+        # Get DagRun schedule between the start/end dates, which will turn into dag runs.
+        dagrun_start_date = timezone.coerce_datetime(start_date)
+        if self.bf_end_date is None:
+            dagrun_end_date = pendulum.now(timezone.utc)
+        else:
+            dagrun_end_date = pendulum.instance(self.bf_end_date)

Review comment:
       What do you think about handling this inside `iter_dagrun_infos_between` (similar to what we had earlier in `get_run_dates`) rather than here?

##########
File path: airflow/models/dag.py
##########
@@ -579,7 +580,59 @@ def timetable(self) -> Timetable:
         type_name = type(interval).__name__
         raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")
 
-    def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
+    def iter_dagrun_infos_between(
+        self,
+        earliest: Optional[pendulum.DateTime],
+        latest: pendulum.DateTime,
+        *,
+        align: bool = True,
+    ) -> Iterable[DagRunInfo]:
+        """Yield DagRunInfo using this DAG's timetable between given interval.
+
+        DagRunInfo instances yielded if their ``schedule_date`` is not earlier
+        than ``earliest``, nor later than ``latest``. The instances are ordered
+        by their ``schedule_date`` from earliest to latest.
+
+        If ``align`` is ``False``, the first run will happen immediately on
+        ``earliest``, even if it does not fall on the logical timetable schedule.

Review comment:
       Also, an example here with the actual dates might make it easier to understand
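
   As a rough illustration of what such an example could show, here is a toy model of a midnight-daily schedule using the dates from the sub-DAG comment in the diff (start at 2021-06-03 22:16). `daily_intervals` is a hypothetical stand-in written with the standard library only; it mimics the documented `align` behavior and is not the timetable implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple

Interval = Tuple[datetime, datetime]


def daily_intervals(
    earliest: datetime,
    latest: datetime,
    *,
    align: bool = True,
) -> Iterator[Interval]:
    """Toy model of a midnight-daily schedule (not Airflow's implementation)."""
    midnight = earliest.replace(hour=0, minute=0, second=0, microsecond=0)
    # First schedule date at or after ``earliest``.
    first = midnight if midnight == earliest else midnight + timedelta(days=1)
    if first > latest:
        # Nothing falls on the schedule; with align=False, cover the range itself.
        if not align:
            yield (earliest, latest)
        return
    if not align and first != earliest:
        # "Invent" an interval between ``earliest`` and the first schedule date.
        yield (earliest, first)
    start = first
    while start <= latest:
        yield (start, start + timedelta(days=1))
        start += timedelta(days=1)


earliest = datetime(2021, 6, 3, 22, 16, tzinfo=timezone.utc)
latest = datetime(2021, 6, 5, tzinfo=timezone.utc)
aligned = list(daily_intervals(earliest, latest, align=True))
unaligned = list(daily_intervals(earliest, latest, align=False))
```

   In this model, `aligned` starts at midnight on 2021-06-04, while `unaligned` has one extra leading interval running from 22:16 on 2021-06-03 to that midnight.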

##########
File path: airflow/models/taskinstance.py
##########
@@ -1530,21 +1532,25 @@ def get_template_context(self, session=None) -> Context:
         integrate_macros_plugins()
 
         params = {}  # type: Dict[str, Any]
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(dag_id=task.dag.dag_id, execution_date=self.execution_date)
-                .first()
+        with contextlib.suppress(AttributeError):
+            params.update(task.dag.params)
+
+        dag_run = self.get_dagrun()

Review comment:
       ```suggestion
           dag_run = self.get_dagrun(session=session)
   ```

##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+from alembic import op
+from sqlalchemy import Column
+from sqlalchemy.dialects import mssql
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+# Revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "97cdd93827b8"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply data_interval fields to DagModel and DagRun."""
+    if op.get_bind().dialect.name == "mssql":
+        column_type = mssql.DATETIME2(precision=6)
+    else:
+        column_type = UtcDateTime

Review comment:
       https://github.com/apache/airflow/blob/c384f9b0f509bab704a70380465be18754800a52/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py#L48-L55
   
   Do we need something like the above to take care of precision on MySQL too? Otherwise we may hit issues like https://github.com/apache/airflow/pull/9336 . It is worth testing this with MySQL.

##########
File path: airflow/models/taskinstance.py
##########
@@ -1530,21 +1532,25 @@ def get_template_context(self, session=None) -> Context:
         integrate_macros_plugins()
 
         params = {}  # type: Dict[str, Any]
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(dag_id=task.dag.dag_id, execution_date=self.execution_date)
-                .first()
+        with contextlib.suppress(AttributeError):
+            params.update(task.dag.params)
+
+        dag_run = self.get_dagrun()
+
+        # FIXME: Many tests don't create a DagRun. We should fix the tests.
+        if dag_run is None:
+            FakeDagRun = namedtuple(
+                "FakeDagRun",
+                # A minimal set of attributes to keep things working.
+                "conf data_interval_start data_interval_end external_trigger run_id",
+            )
+            dag_run = FakeDagRun(
+                conf=None,
+                data_interval_start=None,
+                data_interval_end=None,
+                external_trigger=False,
+                run_id="",

Review comment:
       ```suggestion
                   run_id=None,
   ```

##########
File path: airflow/models/taskinstance.py
##########
@@ -1670,37 +1667,97 @@ def get(
                 except AirflowNotFoundException:
                     return default_conn
 
+        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."
+                )
+                if replacement:
+                    message += f" Please use {replacement!r} instead."
+                warnings.warn(message, DeprecationWarning)
+                return func()
+
+            return lazy_object_proxy.Proxy(deprecated_func)
+
+        def get_previous_schedule_date() -> Optional[pendulum.DateTime]:
+            ti = self.get_previous_ti(None)

Review comment:
       ```suggestion
               ti = self.get_previous_ti(session=session)
   ```

##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually triggered
+        # run at 1am 25th is between 0am 24th and 0am 25th.

Review comment:
       Options:
   
   (1) Shouldn't `data_interval_start_date` be 1am on the 24th and `data_interval_end_date` be 1am on the 25th, so that `data_interval_end_date` = `schedule_date` / `run_date`? Why the one-hour offset?
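
   To make the two readings concrete, the sketch below computes the interval the code comment describes (the last complete midnight-to-midnight period) for a run at 1am on the 25th. `infer_daily_data_interval` is a hypothetical helper for a midnight-daily schedule, and the month and year are arbitrary; under option (1) the interval would instead be `(run_after - 1 day, run_after)`.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def infer_daily_data_interval(run_after: datetime) -> Tuple[datetime, datetime]:
    """Last complete midnight-to-midnight period ending at or before run_after.

    Hypothetical helper for a midnight-daily schedule only.
    """
    end = run_after.replace(hour=0, minute=0, second=0, microsecond=0)
    return (end - timedelta(days=1), end)


# Manually triggered at 1am on the 25th (month and year chosen arbitrarily).
run_after = datetime(2021, 6, 25, 1, 0, tzinfo=timezone.utc)
start, end = infer_daily_data_interval(run_after)
```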

##########
File path: tests/core/test_core.py
##########
@@ -279,23 +281,16 @@ def test_task_get_template(self):
         assert context['ds'] == '2015-01-01'
         assert context['ds_nodash'] == '20150101'
 
-        # next_ds is 2015-01-02 as the dag interval is daily
+        # next_ds is 2015-01-02 as the dag schedule is daily.
         assert context['next_ds'] == '2015-01-02'
         assert context['next_ds_nodash'] == '20150102'
 
-        # prev_ds is 2014-12-31 as the dag interval is daily
-        assert context['prev_ds'] == '2014-12-31'
-        assert context['prev_ds_nodash'] == '20141231'
-
         assert context['ts'] == '2015-01-01T00:00:00+00:00'
         assert context['ts_nodash'] == '20150101T000000'
         assert context['ts_nodash_with_tz'] == '20150101T000000+0000'
 
-        assert context['yesterday_ds'] == '2014-12-31'
-        assert context['yesterday_ds_nodash'] == '20141231'
-
-        assert context['tomorrow_ds'] == '2015-01-02'
-        assert context['tomorrow_ds_nodash'] == '20150102'
+        assert context['data_interval_start'].isoformat() == '2015-01-01T00:00:00+00:00'
+        assert context['data_interval_end'].isoformat() == '2015-01-02T00:00:00+00:00'

Review comment:
       Let's add a test for the deprecated configs too.
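
   One possible shape for such a test, sketched with the standard library only. `WarningContext` and `DEPRECATED_KEYS` are hypothetical stand-ins for the real template context (which uses lazy proxies); the point is only how the warning can be asserted.

```python
import warnings

# Hypothetical subset of the deprecated template keys.
DEPRECATED_KEYS = {"yesterday_ds", "tomorrow_ds"}


class WarningContext(dict):
    """Stand-in context that warns when a deprecated key is read."""

    def __getitem__(self, key):
        if key in DEPRECATED_KEYS:
            warnings.warn(
                f"Accessing {key!r} from the template is deprecated and "
                f"will be removed in a future version.",
                DeprecationWarning,
                stacklevel=2,
            )
        return super().__getitem__(key)


def test_deprecated_key_warns():
    context = WarningContext(ds="2015-01-01", yesterday_ds="2014-12-31")
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        assert context["ds"] == "2015-01-01"  # not deprecated, no warning
        assert context["yesterday_ds"] == "2014-12-31"
    assert len(caught) == 1
    assert issubclass(caught[0].category, DeprecationWarning)
    assert "yesterday_ds" in str(caught[0].message)
```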

##########
File path: tests/models/test_dag.py
##########
@@ -2015,3 +2016,50 @@ def get_task_instance(session, task):
         assert dagrun.get_state() == State.QUEUED
 
     assert {t.key for t in altered} == {('test_set_task_instance_state', 'task_1', start_date, 1)}
+
+
+@pytest.mark.parametrize(
+    "start_date, expected_infos",
+    [
+        (
+            DEFAULT_DATE,
+            [DagRunInfo.interval(DEFAULT_DATE, DEFAULT_DATE + datetime.timedelta(hours=1))],
+        ),
+        (
+            DEFAULT_DATE - datetime.timedelta(hours=3),
+            [
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=3),
+                    DEFAULT_DATE - datetime.timedelta(hours=2),
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=2),
+                    DEFAULT_DATE - datetime.timedelta(hours=1),
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=1),
+                    DEFAULT_DATE,
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE,
+                    DEFAULT_DATE + datetime.timedelta(hours=1),
+                ),
+            ],
+        ),
+    ],
+    ids=["in-dag-restriction", "out-of-dag-restriction"],
+)
+def test_iter_dagrun_infos_between(start_date, expected_infos):
+    dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval="@hourly")
+    DummyOperator(
+        task_id='dummy',
+        dag=dag,
+        owner='airflow',
+    )

Review comment:
       ```suggestion
       DummyOperator(task_id='dummy', dag=dag)
   ```
   
   for simplicity -- we don't need owner field

##########
File path: airflow/models/taskinstance.py
##########
@@ -1670,37 +1667,97 @@ def get(
                 except AirflowNotFoundException:
                     return default_conn
 
+        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."
+                )
+                if replacement:
+                    message += f" Please use {replacement!r} instead."
+                warnings.warn(message, DeprecationWarning)
+                return func()
+
+            return lazy_object_proxy.Proxy(deprecated_func)
+
+        def get_previous_schedule_date() -> Optional[pendulum.DateTime]:
+            ti = self.get_previous_ti(None)
+            if ti is None:
+                return None
+            return timezone.coerce_datetime(ti.execution_date)
+
+        def get_prev_execution_date():
+            if dag_run.external_trigger:
+                dt = self.execution_date
+            else:
+                dt = task.dag.previous_schedule(self.execution_date)
+            return timezone.coerce_datetime(dt)

Review comment:
       Should we suppress the deprecation warnings triggered by the code below? Otherwise users will see the warnings but have no way to act on them, since the context dict is passed to every operator.
   
   https://github.com/apache/airflow/blob/47908878a96775db49eac49339926c0ccff1e3be/airflow/models/taskinstance.py#L812-L819
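
   A minimal sketch of one way to do this, assuming the internal code path can wrap its own reads of the context. `deprecated_value` and `read_quietly` are hypothetical names; only the standard `warnings` module is used.

```python
import warnings


def deprecated_value() -> str:
    """Stand-in for a deprecated context accessor."""
    warnings.warn(
        "Accessing 'execution_date' from the template is deprecated.",
        DeprecationWarning,
    )
    return "2021-08-12"


def read_quietly(getter):
    """Call ``getter`` with DeprecationWarning silenced for this call only."""
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        return getter()
```

   The filter is restored when the `with` block exits, so user code that reads the same key from a template would still get the warning. Note that `warnings.catch_warnings` changes process-wide state and is not thread-safe.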




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678403607



##########
File path: airflow/models/dag.py
##########
@@ -596,23 +697,17 @@ def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
         :return: A list of dates within the interval following the dag's schedule.
         :rtype: list
         """
-        if start_date is None:
-            start = self._time_restriction.earliest
-        else:
-            start = pendulum.instance(start_date)
+        warnings.warn(
+            "`DAG.get_run_dates()` is deprecated. " "Please use `DAG.iter_dagrun_infos_between()` instead.",

Review comment:
       Black 🤦 







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678828040



##########
File path: airflow/models/dag.py
##########
@@ -112,6 +113,51 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class _NextDagRunInfoLegacy(NamedTuple):
+    """Legacy return format for ``DAG.next_dagrun_info()``.
+
+    In the pre-AIP-39 implementation, ``DAG.next_dagrun_info()`` returns a
+    2-tuple ``(execution_date, run_after)``.
+    """
+
+    execution_date: Optional[pendulum.DateTime]
+    run_after: Optional[pendulum.DateTime]
+
+
+class _NextDagRunInfoCompat(_NextDagRunInfoLegacy):

Review comment:
       If it’s hard to describe, it’s probably a bad idea 🙂 I’m removing this altogether, we only need to fix a few tests anyway.







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r681400265



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -242,21 +243,29 @@ def post_dag_run(dag_id, session):
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
+    execution_date = post_body["execution_date"]
+    run_id = post_body["run_id"]
     dagrun_instance = (
         session.query(DagRun)
         .filter(
             DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]),
+            or_(DagRun.run_id == run_id, DagRun.execution_date == execution_date),
         )
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
-        session.add(dag_run)
-        session.commit()
+        dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            execution_date=execution_date,
+            state=State.QUEUED,
+            conf=post_body.get("conf"),
+            external_trigger=True,
+            dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
+        )

Review comment:
       `current_app.dag_bag.get_dag(dag_id)` returns a `DAG`, not a `DagModel`. Only a `DAG` can call `create_dagrun` (yes, it’s a bit confusing; it took me quite a while to sort these out).
   
   The new implementation more closely matches what the Trigger Run web UI does.
   
   https://github.com/apache/airflow/blob/ff75cbcac9ec0b1992b4fddd6c160901f23e0c2a/airflow/www/views.py#L1570-L1621







[GitHub] [airflow] kaxil commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897602865


   Failing on all Python 3.8 cases, though?





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r649340278



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import alembic
+import sqlalchemy
+
+from airflow.models import DagModel, DagRun
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.session import create_session
+from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.timezone import utcnow
+
+# revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "e9304a3141f0"
+branch_labels = None
+depends_on = None
+
+
+# None and "@once" schedule intervals don't have a data interval and can be
+# bulk-updated with one SQL call, so we do them separately. Other "real"
+# schedule intervals are too complicated and need to be populated manually.
+NO_SCHEDULE_FILTER = DagModel.schedule_interval.in_([None, "@once"])
+
+
+def _populate_simple_dagrun_intervals(session):
+    """Handle DAG runs with simple schedule intervals."""
+    updates = {
+        DagRun.data_interval_start: DagRun.execution_date,
+        DagRun.data_interval_end: DagRun.execution_date,
+    }
+    joined = session.query(DagRun).join(DagModel, DagModel.dag_id == DagRun.dag_id)
+    joined.filter(NO_SCHEDULE_FILTER).update(updates)
+
+
+def _populate_complex_dagrun_intervals(session):
+    """Handle DAG runs with "real" schedule intervals."""
+    joined = session.query(DagRun, SerializedDagModel).join(
+        SerializedDagModel,
+        DagRun.dag_id == SerializedDagModel.dag_id,
+    )
+    for dag_run, serialized in joined.filter(~NO_SCHEDULE_FILTER):
+        dag = serialized.dag
+        data_interval_start = dag_run.execution_date
+        dag_run.data_interval_start = data_interval_start
+        dag_run.data_interval_end = dag.following_schedule(data_interval_start)

Review comment:
       Hmm, I just tried that, but… 
   
   https://github.com/apache/airflow/blob/6e9e56246b216a43eabb050c5b220f3665de6305/airflow/models/dag.py#L481-L491
   
   When calculating the data interval’s end (i.e. `next_execution_date`), we need to “fix” DST, which needs to know what timezone the DAG is in. But that information is only available in `DAG` (by inspecting the original `DAG.start_date` input) 😢
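
   To illustrate why the timezone matters here, the toy example below steps one day forward across a DST switch in two ways. `ToyZone` is a hypothetical zone (UTC+1, moving to UTC+2 at 02:00 wall-clock time on 2021-03-28) defined only so the snippet is self-contained; it is not how `DAG.following_schedule` is implemented.

```python
from datetime import datetime, timedelta, timezone, tzinfo

# Wall-clock time at which the toy zone switches to DST.
SWITCH = datetime(2021, 3, 28, 2, 0)


class ToyZone(tzinfo):
    """Toy zone: UTC+1 before SWITCH, UTC+2 from SWITCH on (illustration only)."""

    def utcoffset(self, dt):
        return timedelta(hours=1) + self.dst(dt)

    def dst(self, dt):
        return timedelta(hours=1 if dt.replace(tzinfo=None) >= SWITCH else 0)

    def tzname(self, dt):
        return "TOY"


local = ToyZone()
run = datetime(2021, 3, 28, 0, 0, tzinfo=local)  # local midnight before the switch

# Same wall-clock time on the next day, as a daily schedule in this zone expects.
next_local = (run + timedelta(days=1)).astimezone(timezone.utc)
# Plain 24-hour step on the UTC value, with no knowledge of the zone.
next_utc = run.astimezone(timezone.utc) + timedelta(days=1)
```

   The two results differ by one hour, which is the gap a migration working only from UTC values stored in the database cannot correct without knowing the DAG's timezone.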







[GitHub] [airflow] uranusjr commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-860749604


   Is it possible to supply two API schemas for the endpoint? I want to allow both supplying `execution_date` *or* `data_interval_[start|end]`, but not both. And is there a way to somehow mark the `execution_date` schema as deprecated?
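
   Whatever the schema layer ends up supporting, the intended rule can be stated as a plain check. The sketch below is a hypothetical validator, independent of the API framework, and the returned labels are made up for illustration.

```python
from typing import Any, Mapping

INTERVAL_KEYS = ("data_interval_start", "data_interval_end")


def validate_run_time_fields(body: Mapping[str, Any]) -> str:
    """Return which form the request uses, or raise if the two are mixed.

    Hypothetical validator; the return labels are illustrative only.
    """
    has_date = "execution_date" in body
    given = [key for key in INTERVAL_KEYS if key in body]
    if has_date and given:
        raise ValueError(
            "Provide either execution_date or data_interval_[start|end], not both"
        )
    if has_date:
        return "execution_date"  # the form that would be marked deprecated
    if len(given) == len(INTERVAL_KEYS):
        return "data_interval"
    raise ValueError(
        "Provide execution_date, or both data_interval_start and data_interval_end"
    )
```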





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678836464



##########
File path: airflow/www/views.py
##########
@@ -1537,11 +1537,12 @@ def trigger(self, session=None):
                     'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf, form=form
                 )
 
-        dag = current_app.dag_bag.get_dag(dag_id)
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=execution_date,
-            state=State.QUEUED,
+            data_interval=dag.timetable.infer_data_interval(execution_date),
+            state=State.RUNNING,

Review comment:
       Indeed.







[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897340148


   Doesn’t repro locally. Let’s try again.





[GitHub] [airflow] uranusjr closed pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr closed pull request #16352:
URL: https://github.com/apache/airflow/pull/16352


   





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r658017437



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import json
+
+from alembic import op
+from sqlalchemy import Column, Integer, String, or_
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import foreign, relationship
+from sqlalchemy_jsonfield import JSONField
+
+from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.utils.session import create_session
+from airflow.utils.sqlalchemy import Interval, UtcDateTime
+from airflow.utils.types import DagRunType
+
+# Revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "30867afad44a"
+branch_labels = None
+depends_on = None
+
+ID_LEN = 250
+
+Base = declarative_base()
+
+
+class DagModel(Base):
+    """A partially frozen ``airflow.models.DagModel`` class."""
+
+    __tablename__ = "dag"
+
+    dag_id = Column(String(ID_LEN), primary_key=True)
+    schedule_interval = Column(Interval)
+
+
+class DagRun(Base):
+    """A partially frozen ``airflow.models.DagRun`` class."""
+
+    __tablename__ = "dag_run"
+
+    id = Column(Integer, primary_key=True)
+    dag_id = Column(String(ID_LEN))
+    execution_date = Column(UtcDateTime)
+    data_interval_start = Column(UtcDateTime)
+    data_interval_end = Column(UtcDateTime)
+    run_id = Column(String(ID_LEN))
+    run_type = Column(String(50))
+
+    dag = relationship(DagModel, primaryjoin=(foreign(DagModel.dag_id) == dag_id))
+
+
+class SerializedDagModel(Base):
+    """A partially frozen ``airflow.models.SerializedDagModel`` class."""
+
+    __tablename__ = "serialized_dag"
+
+    dag_id = Column(String(ID_LEN), primary_key=True)
+    data = Column(JSONField(json=json))
+
+    @property
+    def dag(self):
+        """Copied from the original model class."""
+        SerializedDAG._load_operator_extra_links = self.load_op_links  # pylint: disable=protected-access
+        if isinstance(self.data, dict):
+            return SerializedDAG.from_dict(self.data)
+        return SerializedDAG.from_json(self.data)
+
+
+# These kinds of runs don't have a data interval and can be bulk-updated with
+# one SQL call, so we do them separately. Other "real" schedule intervals are
+# too complicated and need to be populated manually.
+NO_SCHEDULE_FILTER = or_(
+    DagModel.schedule_interval.in_([None, "@once"]),
+    DagRun.run_type == DagRunType.MANUAL,
+)
+
+
+def _populate_simple_dagrun_intervals(session):
+    """Handle DAG runs with simple schedule intervals."""
+    updates = {
+        DagRun.data_interval_start: DagRun.execution_date,
+        DagRun.data_interval_end: DagRun.execution_date,
+    }
+    # SQLite doesn't support UPDATE ... WHERE with multiple tables, and MySQL
+    # doesn't support subquery in UPDATE, so...
+    if op.get_bind().dialect.name != 'sqlite':
+        dag_runs = session.query(DagRun).filter(DagRun.dag, NO_SCHEDULE_FILTER)
+    else:
+        dag_run_ids = session.query(DagRun.id).filter(NO_SCHEDULE_FILTER)
+        dag_runs = session.query(DagRun).filter(DagRun.id.in_(dag_run_ids.subquery()))
+    dag_runs.update(updates, synchronize_session=False)
+
+
+def _populate_complex_dagrun_intervals(session):
+    """Handle DAG runs with "real" schedule intervals."""
+    dag_runs_with_serialized = session.query(DagRun, SerializedDagModel).filter(
+        DagModel.dag_id == DagRun.dag_id,
+        SerializedDagModel.dag_id == DagRun.dag_id,
+        ~NO_SCHEDULE_FILTER,
+    )
+    for dag_run, serialized in dag_runs_with_serialized:
+        dag = serialized.dag
+        data_interval_start = dag_run.execution_date
+        dag_run.data_interval_start = data_interval_start
+        dag_run.data_interval_end = dag.following_schedule(data_interval_start)
+        session.merge(dag_run, load=False)
+
+
+def upgrade():
+    """Apply add data_interval_[start|end] to DagRun."""
+    # Create columns with NULL as default.
+    with op.batch_alter_table("dag_run") as batch_op:
+        batch_op.add_column(Column("data_interval_start", UtcDateTime))

Review comment:
       Hmm, CI for MSSQL is stuck during this migration, so I am likely doing something wrong.







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678404313



##########
File path: airflow/models/dag.py
##########
@@ -2403,6 +2507,24 @@ def __init__(self, concurrency=None, **kwargs):
     def __repr__(self):
         return f"<DAG: {self.dag_id}>"
 
+    @property
+    def data_interval(self) -> Optional[Tuple[datetime, datetime]]:
+        if self.next_dagrun_data_interval_start is None:
+            if self.next_dagrun_data_interval_end is not None:
+                raise AirflowException(
+                    "Inconsistent DagModel: next_dagrun_data_interval_start and "
+                    "next_dagrun_data_interval_end must be either both None or both datetime"
+                )
+            return None
+        return (self.next_dagrun_data_interval_start, self.next_dagrun_data_interval_end)

Review comment:
       Yes, this should be `next_data_interval` instead.
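
A standalone sketch of the both-or-neither check in the property above. The free function is an illustrative assumption (the real code is a model property), and unlike the hunk it also rejects the case where only the end is missing.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def next_data_interval(
    start: Optional[datetime],
    end: Optional[datetime],
) -> Optional[Tuple[datetime, datetime]]:
    """Return (start, end), or None when both are unset."""
    # Exactly one side being None means the stored row is inconsistent.
    if (start is None) != (end is None):
        raise ValueError("start and end must be either both None or both datetime")
    if start is None or end is None:
        return None
    return (start, end)


june_1 = datetime(2021, 6, 1, tzinfo=timezone.utc)
june_2 = datetime(2021, 6, 2, tzinfo=timezone.utc)
interval = next_data_interval(june_1, june_2)
```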







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678836464



##########
File path: airflow/www/views.py
##########
@@ -1537,11 +1537,12 @@ def trigger(self, session=None):
                     'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf, form=form
                 )
 
-        dag = current_app.dag_bag.get_dag(dag_id)
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=execution_date,
-            state=State.QUEUED,
+            data_interval=dag.timetable.infer_data_interval(execution_date),
+            state=State.RUNNING,

Review comment:
       Indeed. Nice catch, thanks.







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679746669



##########
File path: airflow/models/taskinstance.py
##########
@@ -1530,21 +1532,25 @@ def get_template_context(self, session=None) -> Context:
         integrate_macros_plugins()
 
         params = {}  # type: Dict[str, Any]
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(dag_id=task.dag.dag_id, execution_date=self.execution_date)
-                .first()
+        with contextlib.suppress(AttributeError):
+            params.update(task.dag.params)
+
+        dag_run = self.get_dagrun()
+
+        # FIXME: Many tests don't create a DagRun. We should fix the tests.
+        if dag_run is None:
+            FakeDagRun = namedtuple(
+                "FakeDagRun",
+                # A minimal set of attributes to keep things working.
+                "conf data_interval_start data_interval_end external_trigger run_id",
+            )
+            dag_run = FakeDagRun(
+                conf=None,
+                data_interval_start=None,
+                data_interval_end=None,
+                external_trigger=False,
+                run_id="",

Review comment:
       Sticking with an empty string for now; we can always change later if None is more advantageous.







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678836464



##########
File path: airflow/www/views.py
##########
@@ -1537,11 +1537,12 @@ def trigger(self, session=None):
                     'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf, form=form
                 )
 
-        dag = current_app.dag_bag.get_dag(dag_id)
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=execution_date,
-            state=State.QUEUED,
+            data_interval=dag.timetable.infer_data_interval(execution_date),
+            state=State.RUNNING,

Review comment:
       Indeed. Thanks for catching this.







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679734779



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+from alembic import op
+from sqlalchemy import Column
+from sqlalchemy.dialects import mssql
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+# Revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "97cdd93827b8"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply data_interval fields to DagModel and DagRun."""
+    if op.get_bind().dialect.name == "mssql":
+        column_type = mssql.DATETIME2(precision=6)
+    else:
+        column_type = UtcDateTime

Review comment:
       No idea, I’m doing it 🙂 







[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897654921


   Yeah, they are failing on `main` as well. I think we recently changed something that caused a SQL incompatibility in the Python 3.6 + MySQL combination. I plan to take a look on Friday or next week if someone doesn’t do it before me.





[GitHub] [airflow] ashb commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r649235374



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import alembic
+import sqlalchemy
+
+from airflow.models import DagModel, DagRun
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.session import create_session
+from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.timezone import utcnow
+
+# revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "e9304a3141f0"
+branch_labels = None
+depends_on = None
+
+
+# None and "@once" schedule intervals don't have a data interval and can be
+# bulk-updated with one SQL call, so we do them separately. Other "real"
+# schedule intervals are too complicated and need to be populated manually.
+NO_SCHEDULE_FILTER = DagModel.schedule_interval.in_([None, "@once"])
+
+
+def _populate_simple_dagrun_intervals(session):
+    """Handle DAG runs with simple schedule intervals."""
+    updates = {
+        DagRun.data_interval_start: DagRun.execution_date,
+        DagRun.data_interval_end: DagRun.execution_date,
+    }
+    joined = session.query(DagRun).join(DagModel, DagModel.dag_id == DagRun.dag_id)
+    joined.filter(NO_SCHEDULE_FILTER).update(updates)

Review comment:
       Add `synchronize_session=False` -- we don't need to update objects in the session here.







[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-888857967


   Are there existing tests on manual runs? I know there are tests on scheduled and backfill runs (I modified some of those in this PR), but not manual ones. Or should I just add unit tests on `infer_data_interval`?





[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897360425


   Hmm, it’s still failing, but now for a different version-db combination. I’m going to say it’s flaky and not related…





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r660292671



##########
File path: airflow/models/dag.py
##########
@@ -1812,19 +1815,40 @@ def create_dagrun(
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
             run_type: DagRunType = DagRunType.from_run_id(run_id)
-        elif run_type and execution_date:
+
+        # The preferred signature *requires* the data_interval argument. The
+        # legacy form of accepting an optional execution_date (and disallowing
+        # data_interval) is deprecated but accepted for compatibility.
+        if data_interval is None:
+            warnings.warn(
+                "Creating a DagRun without data_interval is deprecated.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if execution_date is None:
+                start = timezone.utcnow()
+            else:
+                start = execution_date
+            if run_type == DagRunType.MANUAL:
+                data_interval = (start, start)
+            else:
+                data_interval = (start, self.following_schedule(start))

Review comment:
       Now it is, so I’ll change this.
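
The fallback in the hunk above can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the code in `DAG.create_dagrun()`: `resolve_data_interval` is a hypothetical helper, `manual` stands in for the `run_type == DagRunType.MANUAL` check, and `following_schedule` is passed in as a plain callable.

```python
import warnings
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional, Tuple

Interval = Tuple[datetime, datetime]


def resolve_data_interval(
    data_interval: Optional[Interval],
    execution_date: Optional[datetime],
    manual: bool,
    following_schedule: Callable[[datetime], datetime],
) -> Interval:
    """Prefer an explicit data_interval; fall back to the legacy form."""
    if data_interval is not None:
        if execution_date is not None:
            raise TypeError("cannot set data_interval and execution_date together")
        return data_interval

    # Legacy path: accepted for compatibility, but flagged as deprecated.
    warnings.warn(
        "Creating a DagRun without data_interval is deprecated.",
        DeprecationWarning,
        stacklevel=2,
    )
    start = execution_date if execution_date is not None else datetime.now(timezone.utc)
    if manual:
        # Manual runs get a zero-length interval.
        return (start, start)
    return (start, following_schedule(start))


def next_day(moment: datetime) -> datetime:
    """Hypothetical daily schedule used only for this example."""
    return moment + timedelta(days=1)


june_1 = datetime(2021, 6, 1, tzinfo=timezone.utc)
explicit = resolve_data_interval((june_1, next_day(june_1)), None, False, next_day)
```

Passing the interval explicitly, as in the last line, takes the preferred path and emits no warning.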







[GitHub] [airflow] ashb commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-858663276


   >  I think (hope) this field is never updated after a run is created?
   
   It is not (not by Airflow anyway 😁)





[GitHub] [airflow] ashb commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r683389711



##########
File path: docs/apache-airflow/macros-ref.rst
##########
@@ -35,28 +35,23 @@ in all templates
 =====================================   ====================================
 Variable                                Description
 =====================================   ====================================
-``{{ ds }}``                            the execution date as ``YYYY-MM-DD``
-``{{ ds_nodash }}``                     the execution date as ``YYYYMMDD``
-``{{ prev_ds }}``                       the previous execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ prev_ds }}`` will be ``2018-01-01``
-``{{ prev_ds_nodash }}``                the previous execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ next_ds }}``                       the next execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ next_ds }}`` will be ``2018-01-08``
-``{{ next_ds_nodash }}``                the next execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ yesterday_ds }}``                  the day before the execution date as ``YYYY-MM-DD``
-``{{ yesterday_ds_nodash }}``           the day before the execution date as ``YYYYMMDD``
-``{{ tomorrow_ds }}``                   the day after the execution date as ``YYYY-MM-DD``
-``{{ tomorrow_ds_nodash }}``            the day after the execution date as ``YYYYMMDD``
-``{{ ts }}``                            same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
+``{{ logical_date }}``                  the logical date of the DAG run (`pendulum.Pendulum`_)
+``{{ ds }}``                            the logical date as ``YYYY-MM-DD``

Review comment:
       Hmmmmmm, _Probably_, but that would make it a breaking change :(
   
   







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679619000



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -755,8 +759,13 @@ def _execute(self, session=None):
 
         start_date = self.bf_start_date
 
-        # Get intervals between the start/end dates, which will turn into dag runs
-        run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date, align=True)
+        # Get DagRun schedule between the start/end dates, which will turn into dag runs.
+        dagrun_start_date = timezone.coerce_datetime(start_date)
+        if self.bf_end_date is None:
+            dagrun_end_date = pendulum.now(timezone.utc)
+        else:
+            dagrun_end_date = pendulum.instance(self.bf_end_date)

Review comment:
       Personally I dislike the pattern “if end is None, default to now” since it’s too implicit, and users (other contributors of Airflow) can accidentally forget to handle a possibly-None input and cause subtle bugs. For example, say there’s a view to return scheduled run dates of a DAG:
   
   ```python
   # If the user does not specify, defaults to the DAG’s start_date.
   start_date = request.values.get("start_date")
   
   # If the user does not specify, defaults to the DAG’s end_date.
   end_date = request.values.get("end_date")
   
   # Oops, I forgot to actually check whether the user-supplied start_date and end_date are None.
   
   # Now the runs returned are wrong.
   runs = list(dag.iter_dagrun_infos_between(start_date, end_date))
   ```
   
   This can manage to slip through unit tests (if you forgot to do this in code, chances are you wouldn’t remember to test it) and cause either bugs or (worse) incorrect user expectations we need to support for backward compatibility onwards (Hyrum’s law).
   
   Making `iter_dagrun_infos_between` accept only datetimes forces the caller to think through exactly when the iterator should end, which is a good thing since this can potentially run for a *long* time if `earliest` is a long time in the past.
   
   ```python
   # Mypy now raises an error here and I’m reminded of the None case.
   runs = list(dag.iter_dagrun_infos_between(start_date, end_date))
   ```
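
To make the argument above concrete, here is a hedged sketch of the same idea outside Airflow. `iter_daily_between` and `parse_bound` are hypothetical stand-ins, not Airflow APIs. The iterator accepts only `datetime` bounds, so the caller has to resolve a missing value before calling it, and a type checker can flag an unhandled `Optional`.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional


def iter_daily_between(earliest: datetime, latest: datetime) -> Iterator[datetime]:
    """Yield one datetime per day in [earliest, latest]; both bounds are required."""
    current = earliest
    while current <= latest:
        yield current
        current += timedelta(days=1)


def parse_bound(raw: Optional[str], default: datetime) -> datetime:
    """Resolve a possibly missing ISO 8601 string to a concrete datetime."""
    if raw is None:
        return default
    return datetime.fromisoformat(raw)


# Assumed DAG bounds, used only when the request omits a value.
dag_start = datetime(2021, 6, 1, tzinfo=timezone.utc)
dag_end = datetime(2021, 6, 30, tzinfo=timezone.utc)

# The None case is handled here, at the call site, where it is visible.
start = parse_bound(None, dag_start)
end = parse_bound("2021-06-03T00:00:00+00:00", dag_end)
runs = list(iter_daily_between(start, end))
```

The default is chosen where the request is parsed, so the iterator itself never has to guess what a missing bound means.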







[GitHub] [airflow] ashb commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-861270280


   In JSON Schema (but I don't know about OpenAPI) you can do "one of" https://json-schema.org/understanding-json-schema/reference/combining.html#oneof





[GitHub] [airflow] uranusjr edited a comment on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897654921


   Yeah, they are failing on `main` as well. I think we recently changed something that caused a SQL incompatibility in the Python 3.8 + MySQL combination. I plan to take a look on Friday or next week if someone doesn’t do it before me.





[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897326965


   `TestSchedulerJob.test_verify_integrity_if_dag_not_changed` Huh that’s new, need to investigate a bit.





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679613989



##########
File path: airflow/models/dag.py
##########
@@ -596,23 +649,17 @@ def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
         :return: A list of dates within the interval following the dag's schedule.

Review comment:
       Thanks for catching this!







[GitHub] [airflow] uranusjr edited a comment on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-891859321


   Note: I’ve changed `schedule_date` in this PR to `logical_date` as I mentioned on the dev mailing list: https://lists.apache.org/thread.html/r00d88d22ae15f9c98b91c8f28163c09e59fb4a6436586861c69a66b7%40%3Cdev.airflow.apache.org%3E
   
   @eladkal said he doesn’t particularly like the name and plans to share some thoughts on the mailing list. But I’m going to use `logical_date` for now since we need to use _something_ (AIP-39 implementation would be greatly delayed if we block this PR until the name issue is fully resolved), and `logical_date` is already better than `schedule_date`. We can always mass-replace all the occurrences (like I did in the rename commit) when we settle on something else 🙂 





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679619712



##########
File path: airflow/models/taskinstance.py
##########
@@ -1530,21 +1532,25 @@ def get_template_context(self, session=None) -> Context:
         integrate_macros_plugins()
 
         params = {}  # type: Dict[str, Any]
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(dag_id=task.dag.dag_id, execution_date=self.execution_date)
-                .first()
+        with contextlib.suppress(AttributeError):
+            params.update(task.dag.params)
+
+        dag_run = self.get_dagrun()
+
+        # FIXME: Many tests don't create a DagRun. We should fix the tests.
+        if dag_run is None:
+            FakeDagRun = namedtuple(
+                "FakeDagRun",
+                # A minimal set of attributes to keep things working.
+                "conf data_interval_start data_interval_end external_trigger run_id",
+            )
+            dag_run = FakeDagRun(
+                conf=None,
+                data_interval_start=None,
+                data_interval_end=None,
+                external_trigger=False,
+                run_id="",

Review comment:
       What’s the advantage of None here? IIUC a “real” DagRun cannot have `run_id=None`, so I’m using a consistent type here in case some function call expects `str` (of course `run_id` cannot be empty either, but at least the type is correct).
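
A small sketch of the type-consistency point, using the same `namedtuple` stand-in as the hunk above. `is_manual_run_id` is a hypothetical helper invented for this example, standing in for any caller that assumes `run_id` is a `str`.

```python
from collections import namedtuple

# Minimal stand-in with the same field names as in the hunk above.
FakeDagRun = namedtuple(
    "FakeDagRun",
    "conf data_interval_start data_interval_end external_trigger run_id",
)


def is_manual_run_id(run_id: str) -> bool:
    """Hypothetical helper that relies on run_id being a str."""
    return run_id.startswith("manual__")


placeholder = FakeDagRun(
    conf=None,
    data_interval_start=None,
    data_interval_end=None,
    external_trigger=False,
    run_id="",  # empty, but still a str
)

# Works with the empty string; passing None instead would raise AttributeError.
looks_manual = is_manual_run_id(placeholder.run_id)
```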







[GitHub] [airflow] kaxil commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897010527


   Some minor conflicts @uranusjr 





[GitHub] [airflow] uranusjr merged pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr merged pull request #16352:
URL: https://github.com/apache/airflow/pull/16352


   





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r657823002



##########
File path: airflow/models/dag.py
##########
@@ -1812,19 +1815,40 @@ def create_dagrun(
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
             run_type: DagRunType = DagRunType.from_run_id(run_id)
-        elif run_type and execution_date:
+
+        # The preferred signature *requires* the data_interval argument. The
+        # legacy form of accepting an optional execution_date (and disallowing
+        # data_interval) is deprecated but accepted for compatibility.
+        if data_interval is None:
+            warnings.warn(
+                "Creating a DagRun without data_interval is deprecated.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if execution_date is None:
+                start = timezone.utcnow()
+            else:
+                start = execution_date
+            if run_type == DagRunType.MANUAL:
+                data_interval = (start, start)
+            else:
+                data_interval = (start, self.following_schedule(start))
+        elif execution_date is not None:
+            raise TypeError("cannot set data_interval and execution_date together")
+
+        if not run_id:
+            if not run_type or not data_interval:
+                raise AirflowException(
+                    "Creating DagRun needs either `run_id` or both `run_type` and `data_interval`"
+                )
             if not isinstance(run_type, DagRunType):
                 raise ValueError(f"`run_type` expected to be a DagRunType is {type(run_type)}")
-            run_id = DagRun.generate_run_id(run_type, execution_date)
-        elif not run_id:
-            raise AirflowException(
-                "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
-            )
+            run_id = DagRun.generate_run_id(run_type, data_interval[0])
 
         run = DagRun(
             dag_id=self.dag_id,
             run_id=run_id,
-            execution_date=execution_date,

Review comment:
       We should, I just haven’t reached that part yet.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r657801655



##########
File path: airflow/models/dagrun.py
##########
@@ -123,10 +127,47 @@ def __init__(
         run_type: Optional[str] = None,
         dag_hash: Optional[str] = None,
         creating_job_id: Optional[int] = None,
+        data_interval: Optional[Tuple[datetime, datetime]] = None,
     ):
+        # The preferred signature *requires* the data_interval argument. The
+        # legacy form of accepting an optional execution_date (and disallowing
+        # data_interval) is deprecated but accepted for compatibility.
+        if data_interval is None:
+            warnings.warn(
+                "Creating a DagRun without data_interval is deprecated.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if execution_date is None:
+                execution_date = timezone.utcnow()
+            self.data_interval_start = execution_date
+            if run_type == DagRunType.MANUAL:
+                self.data_interval_end = execution_date
+            else:
+                # This is terribly inefficient, but the caller should fix this :)
+                # The local import is necessary to avoid circular reference.
+                from airflow.models.serialized_dag import SerializedDagModel
+
+                with create_session() as session:
+                    serialized = session.query(SerializedDagModel)
+                    dag = serialized.filter(SerializedDagModel.dag_id == dag_id).first().dag

Review comment:
       ```suggestion
                       dag = serialized.filter(SerializedDagModel.dag_id == dag_id).one().dag
   ```







[GitHub] [airflow] ashb commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r683364465



##########
File path: docs/apache-airflow/macros-ref.rst
##########
@@ -83,6 +78,27 @@ Variable                                Description
                                         the CLI's test subcommand
 =====================================   ====================================
 
+The following variables are deprecated. They are kept for backward compatibility,
+but you should convert existing code to use other variables instead.
+
+=====================================   ====================================
+Deprecated Variable                     Description
+=====================================   ====================================
+``{{ execution_date }}``                the execution date (logical date), same as ``logical_date``
+``{{ prev_ds }}``                       the previous execution date as ``YYYY-MM-DD``
+                                        if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
+                                        ``{{ prev_ds }}`` will be ``2018-01-01``
+``{{ prev_ds_nodash }}``                the previous execution date as ``YYYYMMDD`` if exists, else ``None``
+``{{ yesterday_ds }}``                  the day before the execution date as ``YYYY-MM-DD``
+``{{ yesterday_ds_nodash }}``           the day before the execution date as ``YYYYMMDD``
+``{{ tomorrow_ds }}``                   the day after the execution date as ``YYYY-MM-DD``
+``{{ tomorrow_ds_nodash }}``            the day after the execution date as ``YYYYMMDD``
+``{{ prev_execution_date }}``           the previous execution date (if available) (`pendulum.Pendulum`_)
+``{{ prev_execution_date_success }}``   execution date from prior successful dag run,
+                                        same as ``prev_logical_date_success``
+``{{ next_execution_date }}``           the next execution date, same as ``next_logical_date``

Review comment:
       ```suggestion
   ``{{ next_execution_date }}``           the next execution date, same as ``data_interval_end``
   ```
   
   (Yes, it would be the same, but I think we should point people to use the explicit data interval properties)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-888915907


   Turns out I already indirectly test that the `data_interval` fields are populated in `test_task_get_template` (this tests that the template context is correctly populated, and since the context is populated from a DagRun instance, it means the data interval is correctly written).





[GitHub] [airflow] ashb commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678343708



##########
File path: airflow/models/dag.py
##########
@@ -112,6 +113,51 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class _NextDagRunInfoLegacy(NamedTuple):
+    """Legacy return format for ``DAG.next_dagrun_info()``.
+
+    In the pre-AIP-39 implementation, ``DAG.next_dagrun_info()`` returns a
+    2-tuple ``(execution_date, run_after)``.
+    """
+
+    execution_date: Optional[pendulum.DateTime]
+    run_after: Optional[pendulum.DateTime]
+
+
+class _NextDagRunInfoCompat(_NextDagRunInfoLegacy):

Review comment:
       We don't need to maintain compat here (though doing so isn't all that hard) but I think we should give this a better/non-compat-or-private name.
   
   I don't have any brilliant ideas for a name though.

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -242,16 +242,23 @@ def post_dag_run(dag_id, session):
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
+    execution_date = post_body["execution_date"]
     dagrun_instance = (
         session.query(DagRun)
         .filter(
             DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]),
+            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == execution_date),
         )
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
+        dag = current_app.dag_bag.get_dag(dag_id)
+        dag_run = DagRun(
+            dag_id=dag_id,
+            run_type=DagRunType.MANUAL,
+            data_interval=dag.timetable.infer_data_interval(execution_date),
+            **post_body,
+        )

Review comment:
       Is there a reason (other than we didn't before) that we don't use `dag.create_dagrun()`?

##########
File path: airflow/models/dag.py
##########
@@ -2372,8 +2471,13 @@ class DagModel(Base):
 
     has_task_concurrency_limits = Column(Boolean, nullable=False)
 
-    # The execution_date of the next dag run
-    next_dagrun = Column(UtcDateTime)
+    # Kept for backwards compatibility. New code should use data_interval columns.

Review comment:
       I think we will still need this -- it is the execution_date/schedule_date of the new DagRun to create -- which may be different to interval start or end.
   
   ```suggestion
   ```

##########
File path: airflow/models/dag.py
##########
@@ -596,23 +697,17 @@ def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
         :return: A list of dates within the interval following the dag's schedule.
         :rtype: list
         """
-        if start_date is None:
-            start = self._time_restriction.earliest
-        else:
-            start = pendulum.instance(start_date)
+        warnings.warn(
+            "`DAG.get_run_dates()` is deprecated. " "Please use `DAG.iter_dagrun_infos_between()` instead.",

Review comment:
       ```suggestion
               "`DAG.get_run_dates()` is deprecated. Please use `DAG.iter_dagrun_infos_between()` instead.",
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2403,6 +2507,24 @@ def __init__(self, concurrency=None, **kwargs):
     def __repr__(self):
         return f"<DAG: {self.dag_id}>"
 
+    @property
+    def data_interval(self) -> Optional[Tuple[datetime, datetime]]:
+        if self.next_dagrun_data_interval_start is None:
+            if self.next_dagrun_data_interval_end is not None:
+                raise AirflowException(
+                    "Inconsistent DagModel: next_dagrun_data_interval_start and "
+                    "next_dagrun_data_interval_end must be either both None or both datetime"
+                )
+            return None
+        return (self.next_dagrun_data_interval_start, self.next_dagrun_data_interval_end)

Review comment:
       This seems... odd. What does `dag.data_interval` _mean_? I would expect this at least to be called `next_data_interval` based on the implementation.

##########
File path: airflow/www/views.py
##########
@@ -1537,11 +1537,12 @@ def trigger(self, session=None):
                     'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf, form=form
                 )
 
-        dag = current_app.dag_bag.get_dag(dag_id)
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=execution_date,
-            state=State.QUEUED,
+            data_interval=dag.timetable.infer_data_interval(execution_date),
+            state=State.RUNNING,

Review comment:
       ```suggestion
               state=State.QUEUED,
   ```
   
   Bad conflict resolve?

##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually triggered
+        # run at 1am 25th is between 0am 24th and 0am 25th.

Review comment:
       (Without thinking about this in much detail) I would have expected such a manually triggered run to be from midnight 25th to 1am 25th.
   
   But maybe this way does make more sense. Data intervals aren't (currently) well defined in the case of manual triggers.
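
The two readings discussed here can be compared with a minimal sketch. It assumes a DAG scheduled at each midnight and uses plain `datetime` values; `last_midnight`, `last_complete_period` and `partial_period` are hypothetical helpers for illustration, not Airflow APIs.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

Interval = Tuple[datetime, datetime]


def last_midnight(moment: datetime) -> datetime:
    """Most recent midnight at or before ``moment`` (hypothetical helper)."""
    return moment.replace(hour=0, minute=0, second=0, microsecond=0)


def last_complete_period(run_after: datetime) -> Interval:
    """Strategy described in the diff: the last full daily period before ``run_after``."""
    end = last_midnight(run_after)
    return (end - timedelta(days=1), end)


def partial_period(run_after: datetime) -> Interval:
    """Alternative raised in the comment: from the last midnight up to the trigger time."""
    return (last_midnight(run_after), run_after)


# A manual trigger at 1am on the 25th (the date itself is illustrative).
triggered_at = datetime(2021, 7, 25, 1, 0, tzinfo=timezone.utc)
complete = last_complete_period(triggered_at)
partial = partial_period(triggered_at)
```

Under these assumptions `complete` covers 0am 24th to 0am 25th, which always lines up with a scheduled period, while `partial` covers 0am 25th to 1am 25th and matches no scheduled period.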







[GitHub] [airflow] ashb commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r683362607



##########
File path: airflow/www/views.py
##########
@@ -1602,7 +1602,7 @@ def trigger(self, session=None):
                     'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf, form=form
                 )
 
-        dag = current_app.dag_bag.get_dag(dag_id)
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)

Review comment:
       ```suggestion
           dag = current_app.dag_bag.get_dag(dag_id)
   ```
   
   Revert this as we don't otherwise touch this file.

##########
File path: docs/apache-airflow/macros-ref.rst
##########
@@ -35,28 +35,23 @@ in all templates
 =====================================   ====================================
 Variable                                Description
 =====================================   ====================================
-``{{ ds }}``                            the execution date as ``YYYY-MM-DD``
-``{{ ds_nodash }}``                     the execution date as ``YYYYMMDD``
-``{{ prev_ds }}``                       the previous execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ prev_ds }}`` will be ``2018-01-01``
-``{{ prev_ds_nodash }}``                the previous execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ next_ds }}``                       the next execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ next_ds }}`` will be ``2018-01-08``
-``{{ next_ds_nodash }}``                the next execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ yesterday_ds }}``                  the day before the execution date as ``YYYY-MM-DD``
-``{{ yesterday_ds_nodash }}``           the day before the execution date as ``YYYYMMDD``
-``{{ tomorrow_ds }}``                   the day after the execution date as ``YYYY-MM-DD``
-``{{ tomorrow_ds_nodash }}``            the day after the execution date as ``YYYYMMDD``
-``{{ ts }}``                            same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
+``{{ logical_date }}``                  the logical date of the DAG run (`pendulum.Pendulum`_)
+``{{ ds }}``                            the logical date as ``YYYY-MM-DD``
+``{{ ds_nodash }}``                     the logical date as ``YYYYMMDD``
+``{{ ts }}``                            same as ``schedule_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
 ``{{ ts_nodash }}``                     same as ``ts`` without ``-``, ``:`` and TimeZone info. Example: ``20180101T000000``
 ``{{ ts_nodash_with_tz }}``             same as ``ts`` without ``-`` and ``:``. Example: ``20180101T000000+0000``
-``{{ execution_date }}``                the execution_date (logical date) (`pendulum.Pendulum`_)
-``{{ prev_execution_date }}``           the previous execution date (if available) (`pendulum.Pendulum`_)
-``{{ prev_execution_date_success }}``   execution date from prior successful dag run (if available) (`pendulum.Pendulum`_)
+``{{ next_logical_date }}``             the next logical date if exists (`pendulum.Pendulum`_ or ``None``)
+``{{ next_ds }}``                       the next logical date as ``YYYY-MM-DD``
+                                        if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
+                                        ``{{ next_ds }}`` will be ``2018-01-08``
+``{{ next_ds_nodash }}``                the next schedule date as ``YYYYMMDD`` if exists, else ``None``

Review comment:
       ```suggestion
   ``{{ next_ds_nodash }}``                the next logical date as ``YYYYMMDD`` if exists, else ``None``
   ```

##########
File path: docs/apache-airflow/macros-ref.rst
##########
@@ -83,6 +78,27 @@ Variable                                Description
                                         the CLI's test subcommand
 =====================================   ====================================
 
+The following variables are deprecated. They are kept for backward compatibility,
+but you should convert existing code to use other variables instead.
+
+=====================================   ====================================
+Deprecated Variable                     Description
+=====================================   ====================================
+``{{ execution_date }}``                the execution date (logical date), same as ``logical_date``
+``{{ prev_ds }}``                       the previous execution date as ``YYYY-MM-DD``
+                                        if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
+                                        ``{{ prev_ds }}`` will be ``2018-01-01``
+``{{ prev_ds_nodash }}``                the previous execution date as ``YYYYMMDD`` if exists, else ``None``
+``{{ yesterday_ds }}``                  the day before the execution date as ``YYYY-MM-DD``
+``{{ yesterday_ds_nodash }}``           the day before the execution date as ``YYYYMMDD``
+``{{ tomorrow_ds }}``                   the day after the execution date as ``YYYY-MM-DD``
+``{{ tomorrow_ds_nodash }}``            the day after the execution date as ``YYYYMMDD``
+``{{ prev_execution_date }}``           the previous execution date (if available) (`pendulum.Pendulum`_)
+``{{ prev_execution_date_success }}``   execution date from prior successful dag run,
+                                        same as ``prev_logical_date_success``
+``{{ next_execution_date }}``           the next execution date, same as ``next_logical_date``

Review comment:
       ```suggestion
   ``{{ next_execution_date }}``           the next execution date, same as ``data_interval_end``
   ```
   
   (Yes, it would be the same, but I think we should point people to use the explicit data interval properties)

##########
File path: docs/apache-airflow/macros-ref.rst
##########
@@ -35,28 +35,23 @@ in all templates
 =====================================   ====================================
 Variable                                Description
 =====================================   ====================================
-``{{ ds }}``                            the execution date as ``YYYY-MM-DD``
-``{{ ds_nodash }}``                     the execution date as ``YYYYMMDD``
-``{{ prev_ds }}``                       the previous execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ prev_ds }}`` will be ``2018-01-01``
-``{{ prev_ds_nodash }}``                the previous execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ next_ds }}``                       the next execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ next_ds }}`` will be ``2018-01-08``
-``{{ next_ds_nodash }}``                the next execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ yesterday_ds }}``                  the day before the execution date as ``YYYY-MM-DD``
-``{{ yesterday_ds_nodash }}``           the day before the execution date as ``YYYYMMDD``
-``{{ tomorrow_ds }}``                   the day after the execution date as ``YYYY-MM-DD``
-``{{ tomorrow_ds_nodash }}``            the day after the execution date as ``YYYYMMDD``
-``{{ ts }}``                            same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
+``{{ logical_date }}``                  the logical date of the DAG run (`pendulum.Pendulum`_)
+``{{ ds }}``                            the logical date as ``YYYY-MM-DD``
+``{{ ds_nodash }}``                     the logical date as ``YYYYMMDD``
+``{{ ts }}``                            same as ``schedule_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``

Review comment:
       ```suggestion
   ``{{ ts }}``                            same as ``logical_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
   ```
   
   But see previous comment about deriving from data interval.

##########
File path: docs/apache-airflow/macros-ref.rst
##########
@@ -35,28 +35,23 @@ in all templates
 =====================================   ====================================
 Variable                                Description
 =====================================   ====================================
-``{{ ds }}``                            the execution date as ``YYYY-MM-DD``
-``{{ ds_nodash }}``                     the execution date as ``YYYYMMDD``
-``{{ prev_ds }}``                       the previous execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ prev_ds }}`` will be ``2018-01-01``
-``{{ prev_ds_nodash }}``                the previous execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ next_ds }}``                       the next execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ next_ds }}`` will be ``2018-01-08``
-``{{ next_ds_nodash }}``                the next execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ yesterday_ds }}``                  the day before the execution date as ``YYYY-MM-DD``
-``{{ yesterday_ds_nodash }}``           the day before the execution date as ``YYYYMMDD``
-``{{ tomorrow_ds }}``                   the day after the execution date as ``YYYY-MM-DD``
-``{{ tomorrow_ds_nodash }}``            the day after the execution date as ``YYYYMMDD``
-``{{ ts }}``                            same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
+``{{ logical_date }}``                  the logical date of the DAG run (`pendulum.Pendulum`_)
+``{{ ds }}``                            the logical date as ``YYYY-MM-DD``

Review comment:
       I think this should be derived from data_interval_start, rather than logical date.
   
   Previously the implicit data interval was execution_date...next_execution_date
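
As a rough illustration of the point above, the sketch below assumes a scheduled run of a daily DAG, where the legacy implicit interval and the explicit data interval coincide. All values and the `as_ds` helper are hypothetical; this is not Airflow's template code.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical scheduled run of a daily DAG; the dates are illustrative only.
execution_date = datetime(2018, 1, 1, tzinfo=timezone.utc)
next_execution_date = execution_date + timedelta(days=1)

# The legacy implicit interval, unpacked into the explicit field names.
legacy_interval = (execution_date, next_execution_date)
data_interval_start, data_interval_end = legacy_interval


def as_ds(moment: datetime) -> str:
    """Format a datetime the way ``ds`` is documented: ``YYYY-MM-DD``."""
    return moment.strftime("%Y-%m-%d")


ds_from_logical_date = as_ds(execution_date)
ds_from_interval = as_ds(data_interval_start)
```

For a scheduled run the two derivations give the same string. They can differ for a manually triggered run if its data interval is inferred as the last complete period, as discussed for `infer_data_interval` in this PR.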







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679621592



##########
File path: airflow/models/taskinstance.py
##########
@@ -1670,37 +1667,97 @@ def get(
                 except AirflowNotFoundException:
                     return default_conn
 
+        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."
+                )
+                if replacement:
+                    message += f" Please use {replacement!r} instead."
+                warnings.warn(message, DeprecationWarning)
+                return func()
+
+            return lazy_object_proxy.Proxy(deprecated_func)
+
+        def get_previous_schedule_date() -> Optional[pendulum.DateTime]:
+            ti = self.get_previous_ti(None)
+            if ti is None:
+                return None
+            return timezone.coerce_datetime(ti.execution_date)
+
+        def get_prev_execution_date():
+            if dag_run.external_trigger:
+                dt = self.execution_date
+            else:
+                dt = task.dag.previous_schedule(self.execution_date)
+            return timezone.coerce_datetime(dt)

Review comment:
       This function is used to populate `prev_execution_date`, `prev_ds`, etc., all of them also deprecated. If the user fixes them, this one will go away as well 🙂 
   
   But yeah it’s not a bad idea to suppress this, there’s no need to show two deprecation messages for one thing.
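
One way to avoid the double message is to silence the inner warning where the deprecated accessor calls the deprecated helper. The sketch below uses only the standard `warnings` module; both functions are stand-ins with made-up return values, not the code in `taskinstance.py`.

```python
import warnings


def get_prev_execution_date() -> str:
    """Stand-in for a deprecated helper (not Airflow's implementation)."""
    warnings.warn("get_prev_execution_date is deprecated", DeprecationWarning, stacklevel=2)
    return "2018-01-01"


def get_prev_ds() -> str:
    """Deprecated accessor built on top of another deprecated helper."""
    warnings.warn("'prev_ds' is deprecated", DeprecationWarning, stacklevel=2)
    with warnings.catch_warnings():
        # Silence only the helper's warning; the caller has already been warned once.
        warnings.simplefilter("ignore", DeprecationWarning)
        return get_prev_execution_date()


# Record every warning raised while the accessor runs.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    value = get_prev_ds()
```

With this arrangement only the accessor's own warning is recorded in `caught`; the helper's warning is filtered out inside the inner `catch_warnings()` block, and the previous filters are restored when that block exits.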







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678841333



##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually triggered
+        # run at 1am 25th is between 0am 24th and 0am 25th.

Review comment:
       What I like about this strategy is we set data interval to a period that always makes sense, instead of some in-between period that doesn’t conform to any DAG scheduling rules. But admittedly I don’t really know what people use manual triggers for (except for testing, for which the data interval doesn’t matter anyway) so maybe this is not what they want.







[GitHub] [airflow] kaxil commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679741857



##########
File path: airflow/models/taskinstance.py
##########
@@ -1530,21 +1532,25 @@ def get_template_context(self, session=None) -> Context:
         integrate_macros_plugins()
 
         params = {}  # type: Dict[str, Any]
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(dag_id=task.dag.dag_id, execution_date=self.execution_date)
-                .first()
+        with contextlib.suppress(AttributeError):
+            params.update(task.dag.params)
+
+        dag_run = self.get_dagrun()
+
+        # FIXME: Many tests don't create a DagRun. We should fix the tests.
+        if dag_run is None:
+            FakeDagRun = namedtuple(
+                "FakeDagRun",
+                # A minimal set of attributes to keep things working.
+                "conf data_interval_start data_interval_end external_trigger run_id",
+            )
+            dag_run = FakeDagRun(
+                conf=None,
+                data_interval_start=None,
+                data_interval_end=None,
+                external_trigger=False,
+                run_id="",

Review comment:
       No advantage, was just setting it to the default value, as its default is None in the ctor







[GitHub] [airflow] ashb commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897678067


   🎉 





[GitHub] [airflow] ashb commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r650123447



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import alembic
+import sqlalchemy
+
+from airflow.models import DagModel, DagRun
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.session import create_session
+from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.timezone import utcnow
+
+# revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "e9304a3141f0"
+branch_labels = None
+depends_on = None
+
+
+# None and "@once" schedule intervals don't have a data interval and can be
+# bulk-updated with one SQL call, so we do them separately. Other "real"
+# schedule intervals are too complicated and need to be populated manually.
+NO_SCHEDULE_FILTER = DagModel.schedule_interval.in_([None, "@once"])
+
+
+def _populate_simple_dagrun_intervals(session):
+    """Handle DAG runs with simple schedule intervals."""
+    updates = {
+        DagRun.data_interval_start: DagRun.execution_date,
+        DagRun.data_interval_end: DagRun.execution_date,
+    }
+    joined = session.query(DagRun).join(DagModel, DagModel.dag_id == DagRun.dag_id)
+    joined.filter(NO_SCHEDULE_FILTER).update(updates)
+
+
+def _populate_complex_dagrun_intervals(session):
+    """Handle DAG runs with "real" schedule intervals."""
+    joined = session.query(DagRun, SerializedDagModel).join(
+        SerializedDagModel,
+        DagRun.dag_id == SerializedDagModel.dag_id,
+    )
+    for dag_run, serialized in joined.filter(~NO_SCHEDULE_FILTER):
+        dag = serialized.dag
+        data_interval_start = dag_run.execution_date
+        dag_run.data_interval_start = data_interval_start
+        dag_run.data_interval_end = dag.following_schedule(data_interval_start)

Review comment:
       Dang.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r684099331



##########
File path: airflow/models/dag.py
##########
@@ -537,30 +540,31 @@ def next_dagrun_info(
         :param date_last_automated_dagrun: The ``max(execution_date)`` of
             existing "automated" DagRuns for this dag (scheduled or backfill,
             but not manual).
-        :return: A 2-tuple containing the DagRun's execution date, and the
-            earliest it could be scheduled.
+        :returns: DagRunInfo of the next dagrun, or None if a dagrun is not

Review comment:
       `:returns:` vs `:return:`
   ```suggestion
           :return: DagRunInfo of the next dagrun, or None if a dagrun is not
   ```
   I think?







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r657812202



##########
File path: airflow/models/dag.py
##########
@@ -1812,19 +1815,40 @@ def create_dagrun(
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
             run_type: DagRunType = DagRunType.from_run_id(run_id)
-        elif run_type and execution_date:
+
+        # The preferred signature *requires* the data_interval argument. The
+        # legacy form of accepting an optional execution_date (and disallowing
+        # data_interval) is deprecated but accepted for compatibility.
+        if data_interval is None:
+            warnings.warn(
+                "Creating a DagRun without data_interval is deprecated.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if execution_date is None:
+                start = timezone.utcnow()
+            else:
+                start = execution_date
+            if run_type == DagRunType.MANUAL:
+                data_interval = (start, start)
+            else:
+                data_interval = (start, self.following_schedule(start))

Review comment:
       It should, but that’s not in `main` yet 🙂 







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16352: AIP-39: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r663722499



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import json
+
+from alembic import op
+from sqlalchemy import Column, Integer, String, or_
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import foreign, relationship
+from sqlalchemy_jsonfield import JSONField
+
+from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.utils.session import create_session
+from airflow.utils.sqlalchemy import Interval, UtcDateTime
+from airflow.utils.types import DagRunType
+
+# Revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "30867afad44a"
+branch_labels = None
+depends_on = None
+
+ID_LEN = 250
+
+Base = declarative_base()
+
+
+class DagModel(Base):
+    """A partially frozen ``airflow.models.DagModel`` class."""
+
+    __tablename__ = "dag"
+
+    dag_id = Column(String(ID_LEN), primary_key=True)
+    schedule_interval = Column(Interval)
+
+
+class DagRun(Base):
+    """A partially frozen ``airflow.models.DagRun`` class."""
+
+    __tablename__ = "dag_run"
+
+    id = Column(Integer, primary_key=True)
+    dag_id = Column(String(ID_LEN))
+    execution_date = Column(UtcDateTime)
+    data_interval_start = Column(UtcDateTime)
+    data_interval_end = Column(UtcDateTime)
+    run_id = Column(String(ID_LEN))
+    run_type = Column(String(50))
+
+    dag = relationship(DagModel, primaryjoin=(foreign(DagModel.dag_id) == dag_id))
+
+
+class SerializedDagModel(Base):
+    """A partially frozen ``airflow.models.SerializedDagModel`` class."""
+
+    __tablename__ = "serialized_dag"
+
+    dag_id = Column(String(ID_LEN), primary_key=True)
+    data = Column(JSONField(json=json))
+
+    @property
+    def dag(self):
+        """Copied from the original model class."""
+        SerializedDAG._load_operator_extra_links = self.load_op_links  # pylint: disable=protected-access
+        if isinstance(self.data, dict):
+            return SerializedDAG.from_dict(self.data)
+        return SerializedDAG.from_json(self.data)
+
+
+# These kinds of runs don't have a data interval and can be bulk-updated with
+# one SQL call, so we do them separately. Other "real" schedule intervals are
+# too complicated and need to be populated manually.
+NO_SCHEDULE_FILTER = or_(
+    DagModel.schedule_interval.in_([None, "@once"]),
+    DagRun.run_type == DagRunType.MANUAL,
+)
+
+
+def _populate_simple_dagrun_intervals(session):
+    """Handle DAG runs with simple schedule intervals."""
+    updates = {
+        DagRun.data_interval_start: DagRun.execution_date,
+        DagRun.data_interval_end: DagRun.execution_date,
+    }
+    # SQLite doesn't support UPDATE ... WHERE with multiple tables, and MySQL
+    # doesn't support subquery in UPDATE, so...
+    if op.get_bind().dialect.name != 'sqlite':
+        dag_runs = session.query(DagRun).filter(DagRun.dag, NO_SCHEDULE_FILTER)
+    else:
+        dag_run_ids = session.query(DagRun.id).filter(NO_SCHEDULE_FILTER)
+        dag_runs = session.query(DagRun).filter(DagRun.id.in_(dag_run_ids.subquery()))
+    dag_runs.update(updates, synchronize_session=False)
+
+
+def _populate_complex_dagrun_intervals(session):
+    """Handle DAG runs with "real" schedule intervals."""
+    dag_runs_with_serialized = session.query(DagRun, SerializedDagModel).filter(
+        DagModel.dag_id == DagRun.dag_id,
+        SerializedDagModel.dag_id == DagRun.dag_id,
+        ~NO_SCHEDULE_FILTER,
+    )
+    for dag_run, serialized in dag_runs_with_serialized:
+        dag = serialized.dag
+        data_interval_start = dag_run.execution_date
+        dag_run.data_interval_start = data_interval_start
+        dag_run.data_interval_end = dag.following_schedule(data_interval_start)
+        session.merge(dag_run, load=False)
+
+
+def upgrade():
+    """Apply add data_interval_[start|end] to DagRun."""
+    # Create columns with NULL as default.
+    with op.batch_alter_table("dag_run") as batch_op:
+        batch_op.add_column(Column("data_interval_start", UtcDateTime))

Review comment:
       MSSQL needs `mssql.DATETIME2(precision=6)` for the column.







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679620955



##########
File path: airflow/timetables/base.py
##########
@@ -79,6 +74,18 @@ def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
         """
         return cls(run_after=end, data_interval=DataInterval(start, end))
 
+    @property
+    def schedule_date(self) -> DateTime:

Review comment:
       No, this is the “logical” date of the DagRun, not the earliest time to schedule it.
   
   The equivalent of `run_date` in the proposal is `run_after` in the implementation (because I simply forgot `run_date` was in the proposal…); we can change that if needed.
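
To make the distinction concrete, here is a minimal, self-contained sketch of the two values. It is not the code in this PR: the `logical_date` property name and the assumption that the logical date equals the start of the data interval are illustrative only, and plain `datetime` stands in for Pendulum's `DateTime`.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class DataInterval(NamedTuple):
    start: datetime
    end: datetime


class DagRunInfo(NamedTuple):
    run_after: datetime  # earliest time the run may be scheduled
    data_interval: DataInterval

    @classmethod
    def interval(cls, start: datetime, end: datetime) -> "DagRunInfo":
        # Same shape as the classmethod in the diff: the run becomes
        # schedulable once its data interval has ended.
        return cls(run_after=end, data_interval=DataInterval(start, end))

    @property
    def logical_date(self) -> datetime:
        # Assumption for illustration: the "logical" date is the start of
        # the data interval, not the time the run may be scheduled.
        return self.data_interval.start


info = DagRunInfo.interval(
    datetime(2021, 7, 24, tzinfo=timezone.utc),
    datetime(2021, 7, 25, tzinfo=timezone.utc),
)
```

With these definitions, `info.logical_date` is midnight on the 24th while `info.run_after` is midnight on the 25th, which is why the two should not share a name.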







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r649376333



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import alembic
+import sqlalchemy
+
+from airflow.models import DagModel, DagRun

Review comment:
       I chose to only pull out a minimal set of fields we need in the migration (a trick I sometimes do to make the migrations more readable).







[GitHub] [airflow] ephraimbuddy commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-861691562


   > In JSON Schema (but I don't know about OpenAPI) you can do "one of" https://json-schema.org/understanding-json-schema/reference/combining.html#oneof
   
   That's true, there's also a `oneOf` schema in OpenAPI 3 https://swagger.io/specification/
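
As a rough sketch of what `oneOf` would mean for this endpoint, the request body could be described by two alternative branches, exactly one of which must match. The schema below is written as a Python dict for illustration; the field grouping is an assumption based on this discussion, and the hand-rolled check only looks at `required` keys, so it is not a substitute for a real JSON Schema validator.

```python
from typing import Any, Dict

# Hypothetical request-body schema: either the legacy field or the new pair.
POST_DAG_RUN_BODY: Dict[str, Any] = {
    "oneOf": [
        {"required": ["execution_date"]},
        {"required": ["data_interval_start", "data_interval_end"]},
    ]
}


def count_matching_branches(body: Dict[str, Any], schema: Dict[str, Any]) -> int:
    """Count how many ``oneOf`` branches have all their required keys present."""
    return sum(
        all(key in body for key in branch["required"])
        for branch in schema["oneOf"]
    )


def is_valid(body: Dict[str, Any], schema: Dict[str, Any] = POST_DAG_RUN_BODY) -> bool:
    # ``oneOf`` semantics: exactly one branch may match, not zero and not both.
    return count_matching_branches(body, schema) == 1
```

A body carrying both `execution_date` and the full data interval matches both branches and is therefore rejected, which is the "either, but not both" behaviour asked about earlier in the thread.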





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r684102232



##########
File path: airflow/models/dag.py
##########
@@ -537,30 +540,31 @@ def next_dagrun_info(
         :param date_last_automated_dagrun: The ``max(execution_date)`` of
             existing "automated" DagRuns for this dag (scheduled or backfill,
             but not manual).
-        :return: A 2-tuple containing the DagRun's execution date, and the
-            earliest it could be scheduled.
+        :returns: DagRunInfo of the next dagrun, or None if a dagrun is not

Review comment:
       Both are used in the code base so I think Sphinx is fine with either. But this is indeed unintended 🙂 







[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678841333



##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually triggered
+        # run at 1am 25th is between 0am 24th and 0am 25th.

Review comment:
       What I like about this strategy is we set data interval to a period that always makes sense, instead of some in-between period that doesn’t conform to any DAG scheduling rules. But admittedly I don’t really know what people use manual triggers for (except for testing, for which the data interval doesn’t matter either way since the result is not too important) so maybe this is not what they want.
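
The example in the code comment can be sketched as follows for the simplest case, a DAG scheduled at every midnight. This is only an illustration of the "last complete period before `run_after`" idea; the function name is hypothetical, it hard-codes a daily schedule, and it uses the standard library instead of the timetable classes in this PR.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def infer_daily_data_interval(run_after: datetime) -> Tuple[datetime, datetime]:
    """Return the last complete midnight-to-midnight period before run_after."""
    # The interval ends at the most recent midnight at or before run_after...
    end = run_after.replace(hour=0, minute=0, second=0, microsecond=0)
    # ...and starts exactly one schedule period (one day) earlier.
    start = end - timedelta(days=1)
    return start, end


# A manual run triggered at 1am on the 25th covers the 24th.
start, end = infer_daily_data_interval(
    datetime(2021, 7, 25, 1, 0, tzinfo=timezone.utc)
)
```

Here `start` is 00:00 on the 24th and `end` is 00:00 on the 25th, so the manual run gets an interval that a scheduled run could also have had.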







[GitHub] [airflow] kaxil commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897664981


   We can probably merge this PR in that case





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r683384487



##########
File path: docs/apache-airflow/macros-ref.rst
##########
@@ -35,28 +35,23 @@ in all templates
 =====================================   ====================================
 Variable                                Description
 =====================================   ====================================
-``{{ ds }}``                            the execution date as ``YYYY-MM-DD``
-``{{ ds_nodash }}``                     the execution date as ``YYYYMMDD``
-``{{ prev_ds }}``                       the previous execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ prev_ds }}`` will be ``2018-01-01``
-``{{ prev_ds_nodash }}``                the previous execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ next_ds }}``                       the next execution date as ``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
-                                        ``{{ next_ds }}`` will be ``2018-01-08``
-``{{ next_ds_nodash }}``                the next execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ yesterday_ds }}``                  the day before the execution date as ``YYYY-MM-DD``
-``{{ yesterday_ds_nodash }}``           the day before the execution date as ``YYYYMMDD``
-``{{ tomorrow_ds }}``                   the day after the execution date as ``YYYY-MM-DD``
-``{{ tomorrow_ds_nodash }}``            the day after the execution date as ``YYYYMMDD``
-``{{ ts }}``                            same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
+``{{ logical_date }}``                  the logical date of the DAG run (`pendulum.Pendulum`_)
+``{{ ds }}``                            the logical date as ``YYYY-MM-DD``

Review comment:
       How about `@once` DAGs? We decided to set its data interval to None, should this also become None?







[GitHub] [airflow] kaxil edited a comment on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-897602865


   Failing on all Python 3.8 cases, though still unrelated to the change





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678841333



##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually triggered
+        # run at 1am 25th is between 0am 24th and 0am 25th.

Review comment:
       What I like about this strategy is we set data interval to a period that always makes sense, instead of some in-between period that doesn’t conform to any DAG scheduling rules. But admittedly I don’t really know what people use manual triggers for (I only use it to check my DAG “can run from start to finish”, for which the data interval doesn’t matter either way since I discard the result anyway) so maybe this is not what they want.







[GitHub] [airflow] uranusjr commented on pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-891181187


   I think this is ready for another round of reviews!





[GitHub] [airflow] uranusjr commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-858670073


   > Oh, we'll need to add these to the OpenAPI defn too.
   
   Yeah, I’m working on it, the `post_dag_run` endpoint also needs updating. How should we handle backward compatibility in the API? My current approach is to allow the caller to pass *either* `execution_date` or `data_interval_[start|end]`, but how do I make this show in the schema? Also, is there a way to emit a message to tell the client to migrate?





[GitHub] [airflow] ashb commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-858717810


   Paging @ephraimbuddy as our resident Connexion expert - any idea?
   
   I suspect there is no way (other than perhaps a custom header) to have a deprecation warning shown to the client.
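
For what the custom-header idea might look like, here is a minimal sketch, assuming the endpoint handler can attach extra response headers. The header name `X-Deprecation-Warning` and the helper are hypothetical, nothing here is part of Connexion or Airflow, and a client would still have to inspect the header to notice the warning.

```python
from typing import Any, Dict

# Hypothetical header name; chosen only for this illustration.
DEPRECATION_HEADER = "X-Deprecation-Warning"


def deprecation_headers(body: Dict[str, Any]) -> Dict[str, str]:
    """Return extra response headers when the legacy field is used."""
    if "execution_date" in body:
        return {
            DEPRECATION_HEADER: (
                "execution_date is deprecated; pass data_interval_start "
                "and data_interval_end instead."
            )
        }
    # New-style requests get no extra headers.
    return {}


legacy_headers = deprecation_headers({"execution_date": "2021-06-09T00:00:00+00:00"})
modern_headers = deprecation_headers(
    {"data_interval_start": "2021-06-08T00:00:00+00:00",
     "data_interval_end": "2021-06-09T00:00:00+00:00"}
)
```

The handler would merge the returned dict into its normal response headers, so requests using the new fields are unaffected.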





[GitHub] [airflow] ephraimbuddy commented on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-860782292


   > Is it possible to supply two API schemas for the endpoint? I want to allow supplying either `execution_date` or `data_interval_[start|end]`, but not both. And if I do that, is there a way to somehow mark the `execution_date` schema as deprecated?
   
   I don't think it's possible to supply two API schemas. We can mark the endpoint as deprecated and have a new endpoint with the data_interval_[start|end]. 
   ```
      post:
             deprecated: true
   ```
   
   





[GitHub] [airflow] uranusjr edited a comment on pull request #16352: DagRun.date_interval_start and date_interval_start

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#issuecomment-860749604


   Is it possible to supply two API schemas for the endpoint? I want to allow supplying either `execution_date` or `data_interval_[start|end]`, but not both. And if I do that, is there a way to somehow mark the `execution_date` schema as deprecated?





[GitHub] [airflow] uranusjr commented on a change in pull request #16352: AIP-39: DagRun.data_interval_start and data_interval_end

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r660292671



##########
File path: airflow/models/dag.py
##########
@@ -1812,19 +1815,40 @@ def create_dagrun(
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
             run_type: DagRunType = DagRunType.from_run_id(run_id)
-        elif run_type and execution_date:
+
+        # The preferred signature *requires* the data_interval argument. The
+        # legacy form of accepting an optional execution_date (and disallowing
+        # data_interval) is deprecated but accepted for compatibility.
+        if data_interval is None:
+            warnings.warn(
+                "Creating a DagRun without data_interval is deprecated.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if execution_date is None:
+                start = timezone.utcnow()
+            else:
+                start = execution_date
+            if run_type == DagRunType.MANUAL:
+                data_interval = (start, start)
+            else:
+                data_interval = (start, self.following_schedule(start))

Review comment:
       Now it is, so I’ll change this.







[GitHub] [airflow] ashb commented on a change in pull request #16352: DagRun.data_interval_start and data_interval_end

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r649230997



##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+import alembic
+import sqlalchemy
+
+from airflow.models import DagModel, DagRun

Review comment:
       Rather than importing the model here, we should define a minimal version of it in this migration. Otherwise the migration will _always_ use the latest version of the model, which means it might fail in the future once the model changes.
   
   See `airflow/migrations/versions/6e96a59344a4_make_taskinstance_pool_not_nullable.py` for an example.
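
As a rough illustration of the idea, a migration can carry its own frozen description of the table instead of importing the live model. The sketch below is an assumption-laden example, not the code that landed in this PR: it uses a lightweight SQLAlchemy `Table` rather than a declarative class, the column types are stand-ins, and seeding both new columns from `execution_date` is only a placeholder for whatever the real migration computes.

```python
import sqlalchemy as sa

# Frozen, migration-local description of the table. Only the columns this
# migration touches are declared, so later changes to the real DagRun model
# cannot break it. Column types are assumptions for illustration.
dag_run = sa.Table(
    "dag_run",
    sa.MetaData(),
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("execution_date", sa.DateTime(timezone=True)),
    sa.Column("data_interval_start", sa.DateTime(timezone=True)),
    sa.Column("data_interval_end", sa.DateTime(timezone=True)),
)


def backfill_statement():
    """Build an UPDATE that seeds the new columns from execution_date.

    Placeholder logic: the real migration may derive the interval differently.
    """
    return sa.update(dag_run).values(
        data_interval_start=dag_run.c.execution_date,
        data_interval_end=dag_run.c.execution_date,
    )
```

Inside `upgrade()`, a statement like this would typically be run through the migration's bind connection, so nothing from `airflow.models` needs to be imported.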







[GitHub] [airflow] ashb commented on a change in pull request #16352: DagRun.data_interval_start and data_interval_end

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r657799820



##########
File path: airflow/models/dag.py
##########
@@ -1812,19 +1815,40 @@ def create_dagrun(
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
             run_type: DagRunType = DagRunType.from_run_id(run_id)
-        elif run_type and execution_date:
+
+        # The preferred signature *requires* the data_interval argument. The
+        # legacy form of accepting an optional execution_date (and disallowing
+        # data_interval) is deprecated but accepted for compatibility.
+        if data_interval is None:
+            warnings.warn(
+                "Creating a DagRun without data_interval is deprecated.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if execution_date is None:
+                start = timezone.utcnow()
+            else:
+                start = execution_date
+            if run_type == DagRunType.MANUAL:
+                data_interval = (start, start)
+            else:
+                data_interval = (start, self.following_schedule(start))

Review comment:
       Shouldn't this go directly to a method on `self.timetable`?
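
To make the suggestion concrete, the fallback could live on the timetable so that `create_dagrun()` only delegates to it. The following is a hedged sketch under stated assumptions: `SimpleTimetable`, its `infer_data_interval()` method, the fixed `period`, and the `legacy_data_interval()` wrapper are hypothetical stand-ins, not the timetable API from this PR.

```python
import warnings
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

DataInterval = Tuple[datetime, datetime]


class SimpleTimetable:
    """Hypothetical timetable with a fixed schedule period."""

    def __init__(self, period: timedelta) -> None:
        self.period = period

    def infer_data_interval(self, start: datetime, manual: bool) -> DataInterval:
        # Manual runs cover no data; scheduled runs cover one period.
        if manual:
            return (start, start)
        return (start, start + self.period)


def legacy_data_interval(
    timetable: SimpleTimetable,
    execution_date: Optional[datetime],
    manual: bool,
) -> DataInterval:
    """Mimic the deprecated path: warn, then ask the timetable."""
    warnings.warn(
        "Creating a DagRun without data_interval is deprecated.",
        DeprecationWarning,
        stacklevel=2,
    )
    # Fall back to the current UTC time when no execution_date is given.
    start = execution_date if execution_date is not None else datetime.now(timezone.utc)
    return timetable.infer_data_interval(start, manual)
```

With this shape, the manual versus scheduled branching from the diff lives in one place, and a custom timetable can override it without touching `create_dagrun()`.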







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r681482907



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -242,21 +243,29 @@ def post_dag_run(dag_id, session):
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
+    execution_date = post_body["execution_date"]
+    run_id = post_body["run_id"]
     dagrun_instance = (
         session.query(DagRun)
         .filter(
             DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]),
+            or_(DagRun.run_id == run_id, DagRun.execution_date == execution_date),
         )
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
-        session.add(dag_run)
-        session.commit()
+        dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            execution_date=execution_date,
+            state=State.QUEUED,
+            conf=post_body.get("conf"),
+            external_trigger=True,
+            dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
+        )

Review comment:
       Oh, nice



