You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/19 11:23:59 UTC

[GitHub] [airflow] ashb opened a new pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

ashb opened a new pull request #17719:
URL: https://github.com/apache/airflow/pull/17719


   Since TaskReschedule had an existing FK to TaskInstance we had to move
   change both of these at the same time.
   
   This puts an explicit FK constraint between TaskInstance and DagRun,
   meaning that we can remove a lot of "find TIs without DagRun" code in
   the scheduler too, as that is no longer a possible situation.
   
   Since there is now an explicit foreign key between TaskInstance and
   DagRun, we can remove a lot of the "cleanup" code in the scheduler that
   was dealing with this.
   
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692998718



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -865,13 +863,7 @@ def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
         if filter_by_dag_run is None:
             resettable_tis = (
                 session.query(TaskInstance)
-                .join(
-                    DagRun,
-                    and_(
-                        TaskInstance.dag_id == DagRun.dag_id,
-                        TaskInstance.execution_date == DagRun.execution_date,
-                    ),
-                )
+                .join(TaskInstance.dag_run)

Review comment:
       Worth using `contains_eager` here too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692481164



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -321,12 +247,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
         query = (
             session.query(TI)
             .outerjoin(TI.dag_run)

Review comment:
       ```suggestion
               .join(TI.dag_run)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-911561071


   @uranusjr Right, All the main tests are passing (and 2/4 of the helm ones failed with I think random failures).
   
   Do you want to give this a review/approve, then I'll rebase on to master to get the Up-to-date check happy (it doesn't like the merge)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692758448



##########
File path: airflow/models/baseoperator.py
##########
@@ -1257,24 +1257,46 @@ def get_flat_relatives(self, upstream: bool = False):
         dag: DAG = self._dag
         return list(map(lambda task_id: dag.task_dict[task_id], self.get_flat_relative_ids(upstream)))
 
+    @provide_session
     def run(
         self,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
         ignore_first_depends_on_past: bool = True,
         ignore_ti_state: bool = False,
         mark_success: bool = False,
+        session: Session = None,
     ) -> None:
         """Run a set of task instances for a date range."""
         start_date = start_date or self.start_date
         end_date = end_date or self.end_date or timezone.utcnow()
 
         for info in self.dag.iter_dagrun_infos_between(start_date, end_date, align=False):
             ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past
-            TaskInstance(self, info.logical_date).run(
+            try:
+                ti = TaskInstance(self, info.logical_date)
+            except DagRunNotFound:

Review comment:
       (`# Create a "fake" run…` -- it can't be a fake one, it needs to be a real one in the DB as the ti gets committed by `ti.run()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692776117



##########
File path: airflow/models/dagrun.py
##########
@@ -675,17 +671,15 @@ def verify_integrity(self, session: Session = None):
 
             if task.task_id not in task_ids:
                 Stats.incr(f"task_instance_created-{task.task_type}", 1, 1)
-                ti = TI(task, self.execution_date)
+                ti = TI(task, execution_date=None, run_id=self.run_id)
                 task_instance_mutation_hook(ti)
                 session.add(ti)
 
         try:
             session.flush()
         except IntegrityError as err:
             self.log.info(str(err))
-            self.log.info(
-                'Hit IntegrityError while creating the TIs for ' f'{dag.dag_id} - {self.execution_date}.'
-            )
+            self.log.info('Hit IntegrityError while creating the TIs for ' f'{dag.dag_id} - {self.run_id}.')

Review comment:
       2a7f8a411
   
   (I just changed the values, didn't look that closely at what it was previously doing.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692479872



##########
File path: tests/core/test_core.py
##########
@@ -314,64 +330,12 @@ def test_task_get_template(self):
             assert value == expected_value
             assert [str(m.message) for m in recorder] == [message]
 
-    def test_local_task_job(self):

Review comment:
       These tests are duplicating what was already in test_local_task_job -- we don't need them again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jedcunningham commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r701396620



##########
File path: UPDATING.md
##########
@@ -228,6 +228,14 @@ Now that the DAG parser syncs DAG permissions there is no longer a need for manu
 
 In addition, the `/refresh` and `/refresh_all` webserver endpoints have also been removed.
 
+### TaskInstances now *require* a DagRun
+
+Under normal operation every TaskInstance row in the database would have DagRun row too, but it was possible to manually delete the DagRun and Airflow would still execute the TaskInstances.
+
+In Airflow 2.2 we have changed this and now there is a database-level foreign key constraint ensuring that every TaskInstance has a DagRun row.
+
+Before updating to this 2.2 release you will have to manually resolve any inconsistencies (add back DagRun rows, or delete TaskInstances) if you have any "dangling" TaskInstance" rows.
+

Review comment:
       Should we also mention that `clean_tis_without_dagrun_interval` has been removed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692996103



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),

Review comment:
       in that case L645 needs to be updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692753929



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -51,15 +51,17 @@
 from airflow.utils.session import create_session, provide_session
 
 
-def _get_ti(task, exec_date_or_run_id):
+@provide_session
+def _get_ti(task, exec_date_or_run_id, session):
     """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
-    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id)
+    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id, session=session)
     if not dag_run:
         try:
             execution_date = timezone.parse(exec_date_or_run_id)
-            ti = TaskInstance(task, execution_date)
-            ti.refresh_from_db()
-            return ti
+            dag_run.session.query(DagRun).filter(
+                DagRun.dag_id == task.dag_id,
+                DagRun.execution_date == execution_date,
+            ).one()

Review comment:
       Hmmmm I _thought_ I fixed that already :/ Clearly not.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692767456



##########
File path: airflow/models/taskinstance.py
##########
@@ -389,27 +399,53 @@ class TaskInstance(Base, LoggingMixin):
         innerjoin=True,
     )
 
-    def __init__(self, task, execution_date: datetime, state: Optional[str] = None):
+    dag_run = relationship("DagRun", back_populates="task_instances")
+
+    execution_date = association_proxy("dag_run", "execution_date")
+
+    def __init__(
+        self, task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None
+    ):
         super().__init__()
         self.dag_id = task.dag_id
         self.task_id = task.task_id
         self.refresh_from_task(task)
         self._log = logging.getLogger("airflow.task")
 
-        # make sure we have a localized execution_date stored in UTC
-        if execution_date and not timezone.is_localized(execution_date):
-            self.log.warning(
-                "execution date %s has no timezone information. Using default from dag or system",
-                execution_date,
+        if execution_date:
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            warnings.warn(
+                "Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
+                DeprecationWarning,
+                # Stack level is 4 because SQLA adds some wrappers around the constructor
+                stacklevel=4,
             )
-            if self.task.has_dag():
-                execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
-            else:
-                execution_date = timezone.make_aware(execution_date)
+            # make sure we have a localized execution_date stored in UTC
+            if execution_date and not timezone.is_localized(execution_date):
+                self.log.warning(
+                    "execution date %s has no timezone information. Using default from dag or system",
+                    execution_date,
+                )
+                if self.task.has_dag():
+                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                else:
+                    execution_date = timezone.make_aware(execution_date)
 
-            execution_date = timezone.convert_to_utc(execution_date)
+                execution_date = timezone.convert_to_utc(execution_date)
+            with create_session() as session:
+                try:
+                    (run_id,) = (
+                        session.query(DagRun.run_id)
+                        .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                        .one()
+                    )
+                except NoResultFound:

Review comment:
       Oh yes that's nicer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692770126



##########
File path: tests/executors/test_celery_executor.py
##########
@@ -363,8 +360,8 @@ def test_check_for_stalled_adopted_tasks(self):
             task_1 = BaseOperator(task_id="task_1", start_date=start_date)
             task_2 = BaseOperator(task_id="task_2", start_date=start_date)
 
-        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, exec_date, try_number)
-        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, exec_date, try_number)
+        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, "runid", try_number)
+        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, "runid", try_number)

Review comment:
       Might be worth it, yes. Two reasons I did this: it was late, and nothing in the Executor cares about DagRun or strictly speaking executor_date.
   
   
   Thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r693048109



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -189,80 +189,6 @@ def is_alive(self, grace_multiplier: Optional[float] = None) -> bool:
             and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < scheduler_health_check_threshold
         )
 
-    @provide_session
-    def _change_state_for_tis_without_dagrun(

Review comment:
       Since tasks will no longer exist without dagrun, obviously the dag will fail or succeed and tis won't be stuck. I think we should remove these lines.:
   https://github.com/apache/airflow/blob/5d90bcb93a4318de7db7fd5527bb45c823fb0dab/airflow/jobs/scheduler_job.py#L603-L604




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692774478



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -189,80 +189,6 @@ def is_alive(self, grace_multiplier: Optional[float] = None) -> bool:
             and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < scheduler_health_check_threshold
         )
 
-    @provide_session
-    def _change_state_for_tis_without_dagrun(

Review comment:
       Huge, can clear the issue of tasks getting stuck!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb merged pull request #17719: Change TaskInstance and TaskReschedule PK from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #17719:
URL: https://github.com/apache/airflow/pull/17719


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692775729



##########
File path: airflow/models/skipmixin.py
##########
@@ -91,7 +79,28 @@ def skip(
         if not tasks:
             return
 
-        self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+        if execution_date and not dag_run:
+            from airflow.models.dagrun import DagRun
+
+            warnings.warn(
+                "Passing an execution_date to `skip()` is deprecated in favour of passing a dag_run",
+                DeprecationWarning,
+                stacklevel=2,
+            )

Review comment:
       2a7f8a411




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r693006135



##########
File path: airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
##########
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""TaskInstance keyed to DagRun
+
+Revision ID: 7b2661a43ba3
+Revises: 142555e44c17
+Create Date: 2021-07-15 15:26:12.710749
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.sql import and_, column, select, table
+
+from airflow.models.base import COLLATION_ARGS
+
+ID_LEN = 250
+
+# revision identifiers, used by Alembic.
+revision = '7b2661a43ba3'
+down_revision = '142555e44c17'
+branch_labels = None
+depends_on = None
+
+
+def _mssql_datetime():
+    from sqlalchemy.dialects import mssql
+
+    return mssql.DATETIME2(precision=6)
+
+
+# Just Enough Table to run the conditions for update.
+task_instance = table(
+    'task_instance',
+    column('task_id', sa.String),
+    column('dag_id', sa.String),
+    column('run_id', sa.String),
+    column('execution_date', sa.TIMESTAMP),
+)
+task_reschedule = table(
+    'task_reschedule',
+    column('task_id', sa.String),
+    column('dag_id', sa.String),
+    column('run_id', sa.String),
+    column('execution_date', sa.TIMESTAMP),
+)
+dag_run = table(
+    'dag_run',
+    column('dag_id', sa.String),
+    column('run_id', sa.String),
+    column('execution_date', sa.TIMESTAMP),
+)
+
+
+def upgrade():
+    """Apply TaskInstance keyed to DagRun"""
+    dialect_name = op.get_bind().dialect.name
+
+    run_id_col_type = sa.String(length=ID_LEN, **COLLATION_ARGS)
+
+    # First create column nullable
+    op.add_column('task_instance', sa.Column('run_id', run_id_col_type, nullable=True))
+    op.add_column('task_reschedule', sa.Column('run_id', run_id_col_type, nullable=True))
+
+    # Then update the new column by selecting the right value from DagRun
+    update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
+    op.execute(update_query)
+
+    #
+    # TaskReschedule has a FK to TaskInstance, so we have to update that before
+    # we can drop the TI.execution_date column
+
+    update_query = _multi_table_update(dialect_name, task_reschedule, task_reschedule.c.run_id)
+    op.execute(update_query)
+
+    with op.batch_alter_table('task_reschedule', schema=None) as batch_op:
+        batch_op.alter_column('run_id', existing_type=run_id_col_type, existing_nullable=True, nullable=False)
+
+        batch_op.drop_constraint('task_reschedule_dag_task_date_fkey', 'foreign')
+        batch_op.drop_index('idx_task_reschedule_dag_task_date')
+
+    with op.batch_alter_table('task_instance', schema=None) as batch_op:
+        # Then make it non-nullable
+        batch_op.alter_column('run_id', existing_type=run_id_col_type, existing_nullable=True, nullable=False)
+
+        # TODO: Is this right for non-postgres?
+        batch_op.drop_constraint('task_instance_pkey', type_='primary')

Review comment:
       https://alembic.sqlalchemy.org/en/latest/naming.html




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692766403



##########
File path: airflow/models/taskinstance.py
##########
@@ -1723,30 +1736,22 @@ def is_eligible_to_retry(self):
 
         return self.task.retries and self.try_number <= self.max_tries
 
-    @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, session: Session = None) -> Context:
         """Return TI Context"""
+        # Do not use provide_session here -- it expunges everything on exit!
+        if not session:
+            session = settings.Session()
         task = self.task
         from airflow import macros
 
         integrate_macros_plugins()
 
-        dag_run = self.get_dagrun()
-
-        # FIXME: Many tests don't create a DagRun. We should fix the tests.
-        if dag_run is None:
-            FakeDagRun = namedtuple(
-                "FakeDagRun",
-                # A minimal set of attributes to keep things working.
-                "conf data_interval_start data_interval_end external_trigger run_id",
-            )
-            dag_run = FakeDagRun(
-                conf=None,
-                data_interval_start=None,
-                data_interval_end=None,
-                external_trigger=False,
-                run_id="",
-            )
+        params = {}  # type: Dict[str, Any]
+        # Ensure that the dag_run is loaded -- otherwise `self.execution_date` may not work
+        dag_run = self.get_dagrun(session)
+        if hasattr(task, 'dag'):
+            if task.dag.params:
+                params.update(task.dag.params)
 
         params = {}  # type: Dict[str, Any]

Review comment:
       I think this was a bad conflict resolution.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692777320



##########
File path: airflow/ti_deps/deps/dagrun_exists_dep.py
##########
@@ -29,27 +29,9 @@ class DagrunRunningDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        dag = ti.task.dag
-        dagrun = ti.get_dagrun(session)
-        if not dagrun:
-            # The import is needed here to avoid a circular dependency
-            from airflow.models.dagrun import DagRun
-
-            running_dagruns = DagRun.find(
-                dag_id=dag.dag_id, state=State.RUNNING, external_trigger=False, session=session
+        dr = ti.get_dagrun(session)
+        if dr.state != State.RUNNING:
+            yield self._failing_status(
+                reason="Task instance's dagrun was not in the 'running' state but in "
+                "the state '{}'.".format(dr.state)
             )
-
-            if len(running_dagruns) >= dag.max_active_runs:
-                reason = (
-                    "The maximum number of active dag runs ({}) for this task "
-                    "instance's DAG '{}' has been reached.".format(dag.max_active_runs, ti.dag_id)
-                )
-            else:
-                reason = "Unknown reason"
-            yield self._failing_status(reason=f"Task instance's dagrun did not exist: {reason}.")

Review comment:
       This was only triggered for the case when there was no dag run, which is not possible now.
   
   (And the scheduler now manages the number of active DagRuns)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-912312182


   Now cleaning up or not cleaning up?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692765812



##########
File path: airflow/models/taskinstance.py
##########
@@ -1723,30 +1736,22 @@ def is_eligible_to_retry(self):
 
         return self.task.retries and self.try_number <= self.max_tries
 
-    @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, session: Session = None) -> Context:
         """Return TI Context"""
+        # Do not use provide_session here -- it expunges everything on exit!
+        if not session:
+            session = settings.Session()
         task = self.task
         from airflow import macros
 
         integrate_macros_plugins()
 
-        dag_run = self.get_dagrun()
-
-        # FIXME: Many tests don't create a DagRun. We should fix the tests.
-        if dag_run is None:
-            FakeDagRun = namedtuple(
-                "FakeDagRun",
-                # A minimal set of attributes to keep things working.
-                "conf data_interval_start data_interval_end external_trigger run_id",
-            )
-            dag_run = FakeDagRun(
-                conf=None,
-                data_interval_start=None,
-                data_interval_end=None,
-                external_trigger=False,
-                run_id="",
-            )
+        params = {}  # type: Dict[str, Any]
+        # Ensure that the dag_run is loaded -- otherwise `self.execution_date` may not work
+        dag_run = self.get_dagrun(session)
+        if hasattr(task, 'dag'):
+            if task.dag.params:
+                params.update(task.dag.params)
 
         params = {}  # type: Dict[str, Any]

Review comment:
       Yes, _very_.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692764203



##########
File path: airflow/models/baseoperator.py
##########
@@ -1257,24 +1257,46 @@ def get_flat_relatives(self, upstream: bool = False):
         dag: DAG = self._dag
         return list(map(lambda task_id: dag.task_dict[task_id], self.get_flat_relative_ids(upstream)))
 
+    @provide_session
     def run(
         self,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
         ignore_first_depends_on_past: bool = True,
         ignore_ti_state: bool = False,
         mark_success: bool = False,
+        session: Session = None,
     ) -> None:
         """Run a set of task instances for a date range."""
         start_date = start_date or self.start_date
         end_date = end_date or self.end_date or timezone.utcnow()
 
         for info in self.dag.iter_dagrun_infos_between(start_date, end_date, align=False):
             ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past
-            TaskInstance(self, info.logical_date).run(
+            try:
+                ti = TaskInstance(self, info.logical_date)
+            except DagRunNotFound:

Review comment:
       You’re right; by “fake” I meant “created manually to match the supposed internal db state”, sorry for the lax choice of word.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-912374909


   Fix coming -- was a problem in the test fixture


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692775040



##########
File path: tests/jobs/test_triggerer_job.py
##########
@@ -305,22 +303,14 @@ def test_invalid_trigger(session):
     session.commit()
 
     # Create the test DAG and task
-    with DAG(
-        dag_id='test_invalid_trigger',
-        start_date=timezone.datetime(2016, 1, 1),
-        schedule_interval='@once',
-        max_active_runs=1,
-    ):
-        task1 = DummyOperator(task_id='dummy1')
+    with dag_maker(dag_id='test_invalid_trigger'):
+        DummyOperator(task_id='dummy1')
 
+    dr = dag_maker.create_dagrun()

Review comment:
       Done in 2a7f8a411 (to the dag_maker call)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r693001038



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -865,13 +863,7 @@ def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
         if filter_by_dag_run is None:
             resettable_tis = (
                 session.query(TaskInstance)
-                .join(
-                    DagRun,
-                    and_(
-                        TaskInstance.dag_id == DagRun.dag_id,
-                        TaskInstance.execution_date == DagRun.execution_date,
-                    ),
-                )
+                .join(TaskInstance.dag_run)

Review comment:
       Yes probably -- didn't know about that when I first started.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692997854



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
             )
             return tabulate(sorted_ti_keys, headers=["DAG ID", "Task ID", "Execution date", "Try number"])
 
         def tabulate_tis_set(set_tis: Set[TaskInstance]) -> str:
             # Sorting by execution date first
-            sorted_tis = sorted(
-                set_tis, key=lambda ti: (ti.execution_date, ti.dag_id, ti.task_id, ti.try_number)
-            )
-            tis_values = ((ti.dag_id, ti.task_id, ti.execution_date, ti.try_number) for ti in sorted_tis)
+            sorted_tis = sorted(set_tis, key=lambda ti: (ti.run_id, ti.dag_id, ti.task_id, ti.try_number))
+            tis_values = ((ti.dag_id, ti.task_id, ti.run_id, ti.try_number) for ti in sorted_tis)

Review comment:
       Would be good to keep ordering by `execution_date` or even `start_date`, run_id feels odds for sorting here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r693005517



##########
File path: airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
##########
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""TaskInstance keyed to DagRun
+
+Revision ID: 7b2661a43ba3
+Revises: 142555e44c17
+Create Date: 2021-07-15 15:26:12.710749
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.sql import and_, column, select, table
+
+from airflow.models.base import COLLATION_ARGS
+
+ID_LEN = 250
+
+# revision identifiers, used by Alembic.
+revision = '7b2661a43ba3'
+down_revision = '142555e44c17'
+branch_labels = None
+depends_on = None
+
+
+def _mssql_datetime():
+    from sqlalchemy.dialects import mssql
+
+    return mssql.DATETIME2(precision=6)
+
+
+# Just Enough Table to run the conditions for update.
+task_instance = table(
+    'task_instance',
+    column('task_id', sa.String),
+    column('dag_id', sa.String),
+    column('run_id', sa.String),
+    column('execution_date', sa.TIMESTAMP),
+)
+task_reschedule = table(
+    'task_reschedule',
+    column('task_id', sa.String),
+    column('dag_id', sa.String),
+    column('run_id', sa.String),
+    column('execution_date', sa.TIMESTAMP),
+)
+dag_run = table(
+    'dag_run',
+    column('dag_id', sa.String),
+    column('run_id', sa.String),
+    column('execution_date', sa.TIMESTAMP),
+)
+
+
+def upgrade():
+    """Apply TaskInstance keyed to DagRun"""
+    dialect_name = op.get_bind().dialect.name
+
+    run_id_col_type = sa.String(length=ID_LEN, **COLLATION_ARGS)
+
+    # First create column nullable
+    op.add_column('task_instance', sa.Column('run_id', run_id_col_type, nullable=True))
+    op.add_column('task_reschedule', sa.Column('run_id', run_id_col_type, nullable=True))
+
+    # Then update the new column by selecting the right value from DagRun
+    update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
+    op.execute(update_query)
+
+    #
+    # TaskReschedule has a FK to TaskInstance, so we have to update that before
+    # we can drop the TI.execution_date column
+
+    update_query = _multi_table_update(dialect_name, task_reschedule, task_reschedule.c.run_id)
+    op.execute(update_query)
+
+    with op.batch_alter_table('task_reschedule', schema=None) as batch_op:
+        batch_op.alter_column('run_id', existing_type=run_id_col_type, existing_nullable=True, nullable=False)
+
+        batch_op.drop_constraint('task_reschedule_dag_task_date_fkey', 'foreign')
+        batch_op.drop_index('idx_task_reschedule_dag_task_date')
+
+    with op.batch_alter_table('task_instance', schema=None) as batch_op:
+        # Then make it non-nullable
+        batch_op.alter_column('run_id', existing_type=run_id_col_type, existing_nullable=True, nullable=False)
+
+        # TODO: Is this right for non-postgres?
+        batch_op.drop_constraint('task_instance_pkey', type_='primary')

Review comment:
       This constraint wasn't named so would be different for different database so we might have to do something like https://github.com/apache/airflow/blob/main/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py#L75-L80




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r693001015



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -321,12 +247,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
         query = (
             session.query(TI)
             .outerjoin(TI.dag_run)
-            .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB))
+            .filter(DR.run_type != DagRunType.BACKFILL_JOB)

Review comment:
       `contains_eager` migth help here too if I understand it correct




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692769520



##########
File path: airflow/www/views.py
##########
@@ -1096,7 +1097,8 @@ def rendered_k8s(self):
         logging.info("Retrieving rendered templates.")
         dag = current_app.dag_bag.get_dag(dag_id)
         task = dag.get_task(task_id)
-        ti = models.TaskInstance(task=task, execution_date=dttm)
+        dag_run = dag.get_dagrun(execution_date=execution_date)
+        ti = dag_run.get_task_instance(task_id=task.task_id)

Review comment:
       Yes, that involves changing all the URLs, and that felt like it should be another PR as this one is big enough already :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692877410



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),

Review comment:
       https://docs.sqlalchemy.org/en/13/orm/loading_relationships.html?highlight=joinedload#sqlalchemy.orm.contains_eager may be useful in various places (`joinedload` would join against the table _twice_)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692757334



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),

Review comment:
       This would change the order that errors are printed only so it's not "significant" to anything.
   
   (The session handling here is all over the shop, and if the TI is detached from the session and hasn't yet loaded `ti.dag_run` then you get a DetachedObject exception)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692480923



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -189,80 +189,6 @@ def is_alive(self, grace_multiplier: Optional[float] = None) -> bool:
             and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < scheduler_health_check_threshold
         )
 
-    @provide_session
-    def _change_state_for_tis_without_dagrun(

Review comment:
       This is deleted, as the DB won't let us have TIs without Dagruns anymore :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692030362



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),

Review comment:
       Since we have a fk constraint and relationship, can we do some join and keep `TaskInstanceKey.execution_date` to be backed by the DagRun’s `execution_date` instead? (i.e. don’t change this function, but change how `TaskInstanceKey` is created.) The current change would break ordering if a DagRun has a custom run ID.

##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
             )
             return tabulate(sorted_ti_keys, headers=["DAG ID", "Task ID", "Execution date", "Try number"])
 
         def tabulate_tis_set(set_tis: Set[TaskInstance]) -> str:
             # Sorting by execution date first
-            sorted_tis = sorted(
-                set_tis, key=lambda ti: (ti.execution_date, ti.dag_id, ti.task_id, ti.try_number)
-            )
-            tis_values = ((ti.dag_id, ti.task_id, ti.execution_date, ti.try_number) for ti in sorted_tis)
+            sorted_tis = sorted(set_tis, key=lambda ti: (ti.run_id, ti.dag_id, ti.task_id, ti.try_number))
+            tis_values = ((ti.dag_id, ti.task_id, ti.run_id, ti.try_number) for ti in sorted_tis)

Review comment:
       Similar to above, but here we need to do a join when these `TaskInstance` objects were fetched so `ti.dag_run.execution_date` can be cheap.

##########
File path: airflow/models/skipmixin.py
##########
@@ -91,7 +79,28 @@ def skip(
         if not tasks:
             return
 
-        self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+        if execution_date and not dag_run:
+            from airflow.models.dagrun import DagRun
+
+            warnings.warn(
+                "Passing an execution_date to `skip()` is deprecated in favour of passing a dag_run",
+                DeprecationWarning,
+                stacklevel=2,
+            )

Review comment:
       I think we also need a warning when `dag_run` and `execution_date` are *both* passed to say `execution_date` is now ignored and should be removed. Or maybe we can check the value matches `dagrun.execution_date` and raise a `ValueError`. Or both of these.

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -51,15 +51,17 @@
 from airflow.utils.session import create_session, provide_session
 
 
-def _get_ti(task, exec_date_or_run_id):
+@provide_session
+def _get_ti(task, exec_date_or_run_id, session):
     """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
-    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id)
+    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id, session=session)
     if not dag_run:
         try:
             execution_date = timezone.parse(exec_date_or_run_id)
-            ti = TaskInstance(task, execution_date)
-            ti.refresh_from_db()
-            return ti
+            dag_run.session.query(DagRun).filter(
+                DagRun.dag_id == task.dag_id,
+                DagRun.execution_date == execution_date,
+            ).one()

Review comment:
       ```suggestion
               session.query(DagRun).filter(
                   DagRun.dag_id == task.dag_id,
                   DagRun.execution_date == execution_date,
               ).one()
   ```
   
   ?

##########
File path: airflow/models/skipmixin.py
##########
@@ -37,36 +38,23 @@
 class SkipMixin(LoggingMixin):
     """A Mixin to skip Tasks Instances"""
 
-    def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
+    def _set_state_to_skipped(self, dag_run, tasks, session):

Review comment:
       ```suggestion
       def _set_state_to_skipped(self, dag_run: DagRun, tasks: Iterable[BaseOperator], session: Session):
   ```
   
   This might help catch lingering usages still not passing in a DagRun… (Also need to annotate `skip` below)

##########
File path: airflow/models/dagrun.py
##########
@@ -675,17 +671,15 @@ def verify_integrity(self, session: Session = None):
 
             if task.task_id not in task_ids:
                 Stats.incr(f"task_instance_created-{task.task_type}", 1, 1)
-                ti = TI(task, self.execution_date)
+                ti = TI(task, execution_date=None, run_id=self.run_id)
                 task_instance_mutation_hook(ti)
                 session.add(ti)
 
         try:
             session.flush()
         except IntegrityError as err:
             self.log.info(str(err))
-            self.log.info(
-                'Hit IntegrityError while creating the TIs for ' f'{dag.dag_id} - {self.execution_date}.'
-            )
+            self.log.info('Hit IntegrityError while creating the TIs for ' f'{dag.dag_id} - {self.run_id}.')

Review comment:
       ```suggestion
               self.log.info('Hit IntegrityError while creating the TIs for %s - %s.', dag.dag_id, self.run_id)
   ```
   
   ?

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -51,15 +51,17 @@
 from airflow.utils.session import create_session, provide_session
 
 
-def _get_ti(task, exec_date_or_run_id):
+@provide_session
+def _get_ti(task, exec_date_or_run_id, session):
     """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
-    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id)
+    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id, session=session)
     if not dag_run:
         try:
             execution_date = timezone.parse(exec_date_or_run_id)
-            ti = TaskInstance(task, execution_date)
-            ti.refresh_from_db()
-            return ti
+            dag_run.session.query(DagRun).filter(
+                DagRun.dag_id == task.dag_id,
+                DagRun.execution_date == execution_date,
+            ).one()
         except (ParserError, TypeError):

Review comment:
       Can these still happen or should we change this to `NoResultFound`?

##########
File path: airflow/models/baseoperator.py
##########
@@ -1257,24 +1257,46 @@ def get_flat_relatives(self, upstream: bool = False):
         dag: DAG = self._dag
         return list(map(lambda task_id: dag.task_dict[task_id], self.get_flat_relative_ids(upstream)))
 
+    @provide_session
     def run(
         self,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
         ignore_first_depends_on_past: bool = True,
         ignore_ti_state: bool = False,
         mark_success: bool = False,
+        session: Session = None,
     ) -> None:
         """Run a set of task instances for a date range."""
         start_date = start_date or self.start_date
         end_date = end_date or self.end_date or timezone.utcnow()
 
         for info in self.dag.iter_dagrun_infos_between(start_date, end_date, align=False):
             ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past
-            TaskInstance(self, info.logical_date).run(
+            try:
+                ti = TaskInstance(self, info.logical_date)
+            except DagRunNotFound:

Review comment:
       It’s pretty surprising for a class init to throw an exception. I understand it’s unavoidable if `logical_date` is passed in in general, but perhaps her we can avoid doing it by getting the `DagRun` instance beforehand instead of passing in `info.logical_date`? We can also bulk-read the `run_id`s before the loop instead of reading them one by one.
   
   Something like
   
   ```python
   logical_dates = [
       info.logical_date
       for info in self.dag.iter_dagrun_infos_between(start_date, end_date, align=False)
   ]
   dagruns = {
       run.execution_date: run
       for run in session.query(DagRun).filter(DagRun.execution_date.in_(logical_dates))
   }
   for date in logical_dates:
       try:
           run = dagruns[date]
       except KeyError:
           # Create a "fake" run…
       ti = TaskInstance(self, run_id=run.run_id)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692775884



##########
File path: airflow/models/skipmixin.py
##########
@@ -37,36 +38,23 @@
 class SkipMixin(LoggingMixin):
     """A Mixin to skip Tasks Instances"""
 
-    def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
+    def _set_state_to_skipped(self, dag_run, tasks, session):

Review comment:
       2a7f8a411




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692768322



##########
File path: airflow/models/taskinstance.py
##########
@@ -1092,13 +1102,16 @@ def get_dagrun(self, session: Session = None):
         :param session: SQLAlchemy ORM Session
         :return: DagRun
         """
+        info = inspect(self)
+        if info.attrs.dag_run.loaded_value is not NO_VALUE:
+            return self.dag_run
+
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
-        dr = (
-            session.query(DagRun)
-            .filter(DagRun.dag_id == self.dag_id, DagRun.execution_date == self.execution_date)
-            .first()
-        )
+        dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
+
+        # Record it in the instance for next timme. This means that `self.execution_date` will work correctly
+        set_committed_value(self, 'dag_run', dr)

Review comment:
       Yes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-912219603


   ```
     ____________ ERROR at setup of TestSentryHook.test_add_breadcrumbs _____________
     
     self = <test_sentry.TestSentryHook object at 0x7fadb2918160>
     dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7fadb80762e0>
     
         @pytest.fixture
         def task_instance(self, dag_maker):
             # Mock the Dag
             with dag_maker(DAG_ID):
                 task = PythonOperator(task_id=TASK_ID, python_callable=int)
         
     >       dr = dag_maker.create_dagrun(execution_date=EXECUTION_DATE)
   
     tests/core/test_sentry.py:68: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
     tests/conftest.py:543: in create_dagrun
         self.dag_run = dag.create_dagrun(**kwargs)
     airflow/utils/session.py:67: in wrapper
         return func(*args, **kwargs)
     airflow/models/dag.py:2178: in create_dagrun
         run.verify_integrity(session=session)
     airflow/utils/session.py:67: in wrapper
         return func(*args, **kwargs)
     airflow/models/dagrun.py:683: in verify_integrity
         session.flush()
   ...
   sqlalchemy.orm.exc.FlushError: New instance <TaskInstance at 0x7fadb29727f0> with identity key (<class 'airflow.models.taskinstance.TaskInstance'>, ('test_task', 'test_dag', 'test'), None) conflicts with persistent instance <TaskInstance at 0x7fadb22b6e50>
   ```
   
   This probably needs to be fixed. I’m suspecting this is due to some test now cleaning up the db properly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692776689



##########
File path: airflow/models/baseoperator.py
##########
@@ -1257,24 +1257,46 @@ def get_flat_relatives(self, upstream: bool = False):
         dag: DAG = self._dag
         return list(map(lambda task_id: dag.task_dict[task_id], self.get_flat_relative_ids(upstream)))
 
+    @provide_session
     def run(
         self,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
         ignore_first_depends_on_past: bool = True,
         ignore_ti_state: bool = False,
         mark_success: bool = False,
+        session: Session = None,
     ) -> None:
         """Run a set of task instances for a date range."""
         start_date = start_date or self.start_date
         end_date = end_date or self.end_date or timezone.utcnow()
 
         for info in self.dag.iter_dagrun_infos_between(start_date, end_date, align=False):
             ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past
-            TaskInstance(self, info.logical_date).run(
+            try:
+                ti = TaskInstance(self, info.logical_date)
+            except DagRunNotFound:

Review comment:
       https://github.com/apache/airflow/commit/2a7f8a411ee43b0ee006e3fb046f559c2087b1d9#diff-848f325ace55b3504e8052fecdb53c0f295c891b67a6d90e9341cbe79cc545fbR1281
   
   Ah I thought you meant FakeDagRun like we had previously in get_template_context.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-902667086


   I'll (try) and keep my changes to this as fixup commits, so that you can review-commit-by-commit and not have to rereview everyhing as I fix up more of tests.
   
   Thanks for woking through this @uranusjr 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692775501



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,6 +23,8 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``7b2661a43ba3`` (head)        | ``142555e44c17`` |                 | Change TaskInstance and TaskReschedule tables from execution_date to run_id.          |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``142555e44c17`` (head)        | ``54bebd308c5f`` |                 | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun``                   |

Review comment:
       2a7f8a411

##########
File path: airflow/models/taskinstance.py
##########
@@ -389,27 +399,53 @@ class TaskInstance(Base, LoggingMixin):
         innerjoin=True,
     )
 
-    def __init__(self, task, execution_date: datetime, state: Optional[str] = None):
+    dag_run = relationship("DagRun", back_populates="task_instances")
+
+    execution_date = association_proxy("dag_run", "execution_date")
+
+    def __init__(
+        self, task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None
+    ):
         super().__init__()
         self.dag_id = task.dag_id
         self.task_id = task.task_id
         self.refresh_from_task(task)
         self._log = logging.getLogger("airflow.task")
 
-        # make sure we have a localized execution_date stored in UTC
-        if execution_date and not timezone.is_localized(execution_date):
-            self.log.warning(
-                "execution date %s has no timezone information. Using default from dag or system",
-                execution_date,
+        if execution_date:
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            warnings.warn(
+                "Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
+                DeprecationWarning,
+                # Stack level is 4 because SQLA adds some wrappers around the constructor
+                stacklevel=4,
             )
-            if self.task.has_dag():
-                execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
-            else:
-                execution_date = timezone.make_aware(execution_date)
+            # make sure we have a localized execution_date stored in UTC
+            if execution_date and not timezone.is_localized(execution_date):
+                self.log.warning(
+                    "execution date %s has no timezone information. Using default from dag or system",
+                    execution_date,
+                )
+                if self.task.has_dag():
+                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                else:
+                    execution_date = timezone.make_aware(execution_date)
 
-            execution_date = timezone.convert_to_utc(execution_date)
+                execution_date = timezone.convert_to_utc(execution_date)
+            with create_session() as session:
+                try:
+                    (run_id,) = (
+                        session.query(DagRun.run_id)
+                        .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                        .one()
+                    )
+                except NoResultFound:

Review comment:
       2a7f8a411

##########
File path: airflow/models/taskinstance.py
##########
@@ -1723,30 +1736,22 @@ def is_eligible_to_retry(self):
 
         return self.task.retries and self.try_number <= self.max_tries
 
-    @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, session: Session = None) -> Context:
         """Return TI Context"""
+        # Do not use provide_session here -- it expunges everything on exit!
+        if not session:
+            session = settings.Session()
         task = self.task
         from airflow import macros
 
         integrate_macros_plugins()
 
-        dag_run = self.get_dagrun()
-
-        # FIXME: Many tests don't create a DagRun. We should fix the tests.
-        if dag_run is None:
-            FakeDagRun = namedtuple(
-                "FakeDagRun",
-                # A minimal set of attributes to keep things working.
-                "conf data_interval_start data_interval_end external_trigger run_id",
-            )
-            dag_run = FakeDagRun(
-                conf=None,
-                data_interval_start=None,
-                data_interval_end=None,
-                external_trigger=False,
-                run_id="",
-            )
+        params = {}  # type: Dict[str, Any]
+        # Ensure that the dag_run is loaded -- otherwise `self.execution_date` may not work
+        dag_run = self.get_dagrun(session)
+        if hasattr(task, 'dag'):
+            if task.dag.params:
+                params.update(task.dag.params)
 
         params = {}  # type: Dict[str, Any]

Review comment:
       2a7f8a411




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692998460



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),

Review comment:
       Already updated in follow on pr




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692768914



##########
File path: airflow/www/utils.py
##########
@@ -473,6 +474,12 @@ def is_extendedjson(self, col_name):
             )
         return False
 
+    def get_col_default(self, col_name: str) -> Any:
+        if col_name not in self.list_columns:
+            # Handle AssociationProxy etc, or anything that isn't a "real" column
+            return None
+        return super().get_col_default(col_name)

Review comment:
       Quite.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #17719: Change TaskInstance and TaskReschedule PK from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r702845592



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -434,15 +433,15 @@ def _process_backfill_task_instances(
             # determined deadlocked while they are actually
             # waiting for their upstream to finish
             @provide_session
-            def _per_task_process(key, ti, session=None):
+            def _per_task_process(key, ti: TaskInstance, session=None):
                 ti.refresh_from_db(lock_for_update=True, session=session)
 
                 task = self.dag.get_task(ti.task_id, include_subdags=True)
                 ti.task = task
 
-                ignore_depends_on_past = self.ignore_first_depends_on_past and ti.execution_date == (
-                    start_date or ti.start_date
-                )
+                ignore_depends_on_past = self.ignore_first_depends_on_past and ti.get_dagrun(
+                    session
+                ).execution_date == (start_date or ti.start_date)

Review comment:
       ```suggestion
                   dagrun = self.ignore_first_depends_on_past and ti.get_dagrun(session=session)
                   ignore_depends_on_past = dagrun.execution_date == (start_date or ti.start_date)
   ```
   
   Gosh that Black formatting is awful.

##########
File path: airflow/dag_processing/processor.py
##########
@@ -387,14 +389,10 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             .subquery('sq')
         )
 
-        max_tis: List[TI] = (
-            session.query(TI)
-            .filter(
-                TI.dag_id == dag.dag_id,
-                TI.task_id == qry.c.task_id,
-                TI.execution_date == qry.c.max_ti,
-            )
-            .all()
+        max_tis: Iterator[TI] = session.query(TI).filter(
+            TI.dag_id == dag.dag_id,
+            TI.task_id == qry.c.task_id,
+            TI.execution_date == qry.c.max_ti,
         )

Review comment:
       ```suggestion
           max_tis: Iterator[TI] = (
               session.query(TI)
               .join(TI.dag_run)
               .filter(
                   TI.dag_id == dag.dag_id,
                   TI.task_id == qry.c.task_id,
                   DR.execution_date == qry.c.max_ti,
               )
          )
   ```
   
   I think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692997854



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
             )
             return tabulate(sorted_ti_keys, headers=["DAG ID", "Task ID", "Execution date", "Try number"])
 
         def tabulate_tis_set(set_tis: Set[TaskInstance]) -> str:
             # Sorting by execution date first
-            sorted_tis = sorted(
-                set_tis, key=lambda ti: (ti.execution_date, ti.dag_id, ti.task_id, ti.try_number)
-            )
-            tis_values = ((ti.dag_id, ti.task_id, ti.execution_date, ti.try_number) for ti in sorted_tis)
+            sorted_tis = sorted(set_tis, key=lambda ti: (ti.run_id, ti.dag_id, ti.task_id, ti.try_number))
+            tis_values = ((ti.dag_id, ti.task_id, ti.run_id, ti.try_number) for ti in sorted_tis)

Review comment:
       Would be good to keep ordering by execution_date or start_date, run_id feels odds for sorting here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692754650



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -51,15 +51,17 @@
 from airflow.utils.session import create_session, provide_session
 
 
-def _get_ti(task, exec_date_or_run_id):
+@provide_session
+def _get_ti(task, exec_date_or_run_id, session):
     """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
-    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id)
+    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id, session=session)
     if not dag_run:
         try:
             execution_date = timezone.parse(exec_date_or_run_id)
-            ti = TaskInstance(task, execution_date)
-            ti.refresh_from_db()
-            return ti
+            dag_run.session.query(DagRun).filter(
+                DagRun.dag_id == task.dag_id,
+                DagRun.execution_date == execution_date,
+            ).one()
         except (ParserError, TypeError):

Review comment:
       These can happen from the `timezone.parse` on L60, but yes we should add NoResultFound too




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r693398216



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -189,80 +189,6 @@ def is_alive(self, grace_multiplier: Optional[float] = None) -> bool:
             and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < scheduler_health_check_threshold
         )
 
-    @provide_session
-    def _change_state_for_tis_without_dagrun(

Review comment:
       I think this can happen if (for example) there is a crash in the worker early on in the "boot" process, so it can't report it's own status in the DB.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692996827



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -645,16 +645,14 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.execution_date, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
             )
             return tabulate(sorted_ti_keys, headers=["DAG ID", "Task ID", "Execution date", "Try number"])
 
         def tabulate_tis_set(set_tis: Set[TaskInstance]) -> str:
             # Sorting by execution date first
-            sorted_tis = sorted(
-                set_tis, key=lambda ti: (ti.execution_date, ti.dag_id, ti.task_id, ti.try_number)
-            )
-            tis_values = ((ti.dag_id, ti.task_id, ti.execution_date, ti.try_number) for ti in sorted_tis)
+            sorted_tis = sorted(set_tis, key=lambda ti: (ti.run_id, ti.dag_id, ti.task_id, ti.try_number))
+            tis_values = ((ti.dag_id, ti.task_id, ti.run_id, ti.try_number) for ti in sorted_tis)

Review comment:
       same as above L653 needs to be updated if we sort by run_id 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-902669703


   `+1,423 −1,576`
   
   Still managed to delete more than I've added. So far anyway :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr closed pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
uranusjr closed pull request #17719:
URL: https://github.com/apache/airflow/pull/17719


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-911728778


   🤞🏻 Lets see how the full matrix goes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-902237213


   I'm not quite expecting _all_ tests to pass on this yet, but I'd like a review on this, so I'm marking it ready while I work on fixing up the remaining tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692719030



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,6 +23,8 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``7b2661a43ba3`` (head)        | ``142555e44c17`` |                 | Change TaskInstance and TaskReschedule tables from execution_date to run_id.          |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``142555e44c17`` (head)        | ``54bebd308c5f`` |                 | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun``                   |

Review comment:
       ```suggestion
   | ``142555e44c17``               | ``54bebd308c5f`` |                 | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun``                   |
   ```

##########
File path: airflow/models/taskinstance.py
##########
@@ -1723,30 +1736,22 @@ def is_eligible_to_retry(self):
 
         return self.task.retries and self.try_number <= self.max_tries
 
-    @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, session: Session = None) -> Context:
         """Return TI Context"""
+        # Do not use provide_session here -- it expunges everything on exit!
+        if not session:
+            session = settings.Session()
         task = self.task
         from airflow import macros
 
         integrate_macros_plugins()
 
-        dag_run = self.get_dagrun()
-
-        # FIXME: Many tests don't create a DagRun. We should fix the tests.
-        if dag_run is None:
-            FakeDagRun = namedtuple(
-                "FakeDagRun",
-                # A minimal set of attributes to keep things working.
-                "conf data_interval_start data_interval_end external_trigger run_id",
-            )
-            dag_run = FakeDagRun(
-                conf=None,
-                data_interval_start=None,
-                data_interval_end=None,
-                external_trigger=False,
-                run_id="",
-            )
+        params = {}  # type: Dict[str, Any]
+        # Ensure that the dag_run is loaded -- otherwise `self.execution_date` may not work
+        dag_run = self.get_dagrun(session)
+        if hasattr(task, 'dag'):
+            if task.dag.params:
+                params.update(task.dag.params)
 
         params = {}  # type: Dict[str, Any]

Review comment:
       The first `params` is shadowed. Accidental?

##########
File path: airflow/models/taskinstance.py
##########
@@ -389,27 +399,53 @@ class TaskInstance(Base, LoggingMixin):
         innerjoin=True,
     )
 
-    def __init__(self, task, execution_date: datetime, state: Optional[str] = None):
+    dag_run = relationship("DagRun", back_populates="task_instances")
+
+    execution_date = association_proxy("dag_run", "execution_date")
+
+    def __init__(
+        self, task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None
+    ):
         super().__init__()
         self.dag_id = task.dag_id
         self.task_id = task.task_id
         self.refresh_from_task(task)
         self._log = logging.getLogger("airflow.task")
 
-        # make sure we have a localized execution_date stored in UTC
-        if execution_date and not timezone.is_localized(execution_date):
-            self.log.warning(
-                "execution date %s has no timezone information. Using default from dag or system",
-                execution_date,
+        if execution_date:
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            warnings.warn(
+                "Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
+                DeprecationWarning,
+                # Stack level is 4 because SQLA adds some wrappers around the constructor
+                stacklevel=4,
             )
-            if self.task.has_dag():
-                execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
-            else:
-                execution_date = timezone.make_aware(execution_date)
+            # make sure we have a localized execution_date stored in UTC
+            if execution_date and not timezone.is_localized(execution_date):
+                self.log.warning(
+                    "execution date %s has no timezone information. Using default from dag or system",
+                    execution_date,
+                )
+                if self.task.has_dag():
+                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                else:
+                    execution_date = timezone.make_aware(execution_date)
 
-            execution_date = timezone.convert_to_utc(execution_date)
+                execution_date = timezone.convert_to_utc(execution_date)
+            with create_session() as session:
+                try:
+                    (run_id,) = (
+                        session.query(DagRun.run_id)
+                        .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                        .one()
+                    )
+                except NoResultFound:

Review comment:
       ```suggestion
                   run_id = (
                       session.query(DagRun.run_id)
                       .filter_by(dag_id=self.dag_id, execution_date=execution_date)
                       .scalar()
                   )
                   if run_id is None:
   ```

##########
File path: airflow/ti_deps/deps/dagrun_exists_dep.py
##########
@@ -29,27 +29,9 @@ class DagrunRunningDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        dag = ti.task.dag
-        dagrun = ti.get_dagrun(session)
-        if not dagrun:
-            # The import is needed here to avoid a circular dependency
-            from airflow.models.dagrun import DagRun
-
-            running_dagruns = DagRun.find(
-                dag_id=dag.dag_id, state=State.RUNNING, external_trigger=False, session=session
+        dr = ti.get_dagrun(session)
+        if dr.state != State.RUNNING:
+            yield self._failing_status(
+                reason="Task instance's dagrun was not in the 'running' state but in "
+                "the state '{}'.".format(dr.state)
             )
-
-            if len(running_dagruns) >= dag.max_active_runs:
-                reason = (
-                    "The maximum number of active dag runs ({}) for this task "
-                    "instance's DAG '{}' has been reached.".format(dag.max_active_runs, ti.dag_id)
-                )
-            else:
-                reason = "Unknown reason"
-            yield self._failing_status(reason=f"Task instance's dagrun did not exist: {reason}.")

Review comment:
       Why is it OK to drop this logic?

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -60,8 +61,12 @@ class BigQueryConsoleLink(BaseOperatorLink):
     name = 'BigQuery Console'
 
     def get_link(self, operator, dttm):
-        ti = TaskInstance(task=operator, execution_date=dttm)
-        job_id = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
+        job_id = XCom.get_one(
+            dag_ids=operator.dag.dag_id,
+            task_ids=operator.task_id,
+            execution_date=dttm,
+            key='job_id',
+        )

Review comment:
       Also #16370. Should we deprecate using `execution_date` in `XCom.get_one()` and `XCom.get_many()`? If we do, how do we keep backward compatibility here while avoid emitting the DeprecationWarning?

##########
File path: airflow/models/taskinstance.py
##########
@@ -1092,13 +1102,16 @@ def get_dagrun(self, session: Session = None):
         :param session: SQLAlchemy ORM Session
         :return: DagRun
         """
+        info = inspect(self)
+        if info.attrs.dag_run.loaded_value is not NO_VALUE:
+            return self.dag_run
+
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
-        dr = (
-            session.query(DagRun)
-            .filter(DagRun.dag_id == self.dag_id, DagRun.execution_date == self.execution_date)
-            .first()
-        )
+        dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
+
+        # Record it in the instance for next timme. This means that `self.execution_date` will work correctly
+        set_committed_value(self, 'dag_run', dr)

Review comment:
       ```suggestion
           # Record it in the instance for next time. This means that `self.execution_date` will work correctly
           set_committed_value(self, 'dag_run', dr)
   ```
   
   This also means `info.attrs.dag_run` will become loaded next time this is called, right?

##########
File path: airflow/ti_deps/dep_context.py
##########
@@ -85,23 +84,16 @@ def __init__(
         self.ignore_ti_state = ignore_ti_state
         self.finished_tasks = finished_tasks
 
-    def ensure_finished_tasks(self, dag, execution_date: pendulum.DateTime, session: Session):
+    def ensure_finished_tasks(self, dag_run, session: Session):

Review comment:
       ```suggestion
       def ensure_finished_tasks(self, dag_run: "DagRun", session: Session):
   ```
   
   (And an import of course.)

##########
File path: airflow/models/taskinstance.py
##########
@@ -2241,8 +2247,10 @@ def xcom_pull(
         if dag_id is None:
             dag_id = self.dag_id
 
+        execution_date = self.get_dagrun(session).execution_date
+
         query = XCom.get_many(
-            execution_date=self.execution_date,
+            execution_date=execution_date,

Review comment:
       Backref-ing #16370 so we remember to change this when XCom uses run_id.

##########
File path: tests/executors/test_celery_executor.py
##########
@@ -324,11 +322,11 @@ def test_try_adopt_task_instances(self):
             task_1 = BaseOperator(task_id="task_1", start_date=start_date)
             task_2 = BaseOperator(task_id="task_2", start_date=start_date)
 
-        ti1 = TaskInstance(task=task_1, execution_date=exec_date)
+        ti1 = TaskInstance(task=task_1, run_id=None)

Review comment:
       I feel we should create a real DagRun here that uses `exec_date` instead.

##########
File path: airflow/www/utils.py
##########
@@ -473,6 +474,12 @@ def is_extendedjson(self, col_name):
             )
         return False
 
+    def get_col_default(self, col_name: str) -> Any:
+        if col_name not in self.list_columns:
+            # Handle AssociationProxy etc, or anything that isn't a "real" column
+            return None
+        return super().get_col_default(col_name)

Review comment:
       I’m not sure what to think about this (not your code but the fact we need this block of code in the first place).

##########
File path: tests/executors/test_celery_executor.py
##########
@@ -363,8 +360,8 @@ def test_check_for_stalled_adopted_tasks(self):
             task_1 = BaseOperator(task_id="task_1", start_date=start_date)
             task_2 = BaseOperator(task_id="task_2", start_date=start_date)
 
-        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, exec_date, try_number)
-        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, exec_date, try_number)
+        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, "runid", try_number)
+        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, "runid", try_number)

Review comment:
       Less sure about this; is creating real DagRun and TaskInstance objects worthwhile?

##########
File path: tests/jobs/test_triggerer_job.py
##########
@@ -305,22 +303,14 @@ def test_invalid_trigger(session):
     session.commit()
 
     # Create the test DAG and task
-    with DAG(
-        dag_id='test_invalid_trigger',
-        start_date=timezone.datetime(2016, 1, 1),
-        schedule_interval='@once',
-        max_active_runs=1,
-    ):
-        task1 = DummyOperator(task_id='dummy1')
+    with dag_maker(dag_id='test_invalid_trigger'):
+        DummyOperator(task_id='dummy1')
 
+    dr = dag_maker.create_dagrun()

Review comment:
       Do we need to pass `session` in?

##########
File path: airflow/www/views.py
##########
@@ -1096,7 +1097,8 @@ def rendered_k8s(self):
         logging.info("Retrieving rendered templates.")
         dag = current_app.dag_bag.get_dag(dag_id)
         task = dag.get_task(task_id)
-        ti = models.TaskInstance(task=task, execution_date=dttm)
+        dag_run = dag.get_dagrun(execution_date=execution_date)
+        ti = dag_run.get_task_instance(task_id=task.task_id)

Review comment:
       At some point we should change these views to use `run_id` right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-912339057


   Never cleaned up, but previously the test was not saving the TaskInstance to db at all and now it is (and generating a constraint error).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-901853325


   > I'm wondering if https://github.com/apache/airflow/blob/4da4c186ecdcdae308fe8b4a7994c21faf42bc96/airflow/www/views.py#L1485-L1486 need a change too?
   
   Yes it should. It'll still work as it is but that relies on the compat shims


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r701408236



##########
File path: UPDATING.md
##########
@@ -228,6 +228,14 @@ Now that the DAG parser syncs DAG permissions there is no longer a need for manu
 
 In addition, the `/refresh` and `/refresh_all` webserver endpoints have also been removed.
 
+### TaskInstances now *require* a DagRun
+
+Under normal operation every TaskInstance row in the database would have DagRun row too, but it was possible to manually delete the DagRun and Airflow would still execute the TaskInstances.
+
+In Airflow 2.2 we have changed this and now there is a database-level foreign key constraint ensuring that every TaskInstance has a DagRun row.
+
+Before updating to this 2.2 release you will have to manually resolve any inconsistencies (add back DagRun rows, or delete TaskInstances) if you have any "dangling" TaskInstance" rows.
+

Review comment:
       Probably should yes (though having "extra" config items doesn't cause any harm)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr closed pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
uranusjr closed pull request #17719:
URL: https://github.com/apache/airflow/pull/17719


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #17719: Change TaskInstance and TaskReschedule from execution_date to run_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#issuecomment-903175600


   ```
     E             Differing items:
     E             {'is_extra_encrypted': True} != {'is_extra_encrypted': False}
     E             {'is_encrypted': True} != {'is_encrypted': False}
   ```
   
   I don't see how I could have caused a failure in `tests/cli/commands/test_connection_command.py` 🤔 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org