Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/07 22:05:40 UTC

[GitHub] [airflow] dstandish opened a new pull request, #24908: WIP - Dataset event log

dstandish opened a new pull request, #24908:
URL: https://github.com/apache/airflow/pull/24908

   exploration: adding a dataset event [log] table, which stores all events, even those that are ignored for the purpose of dag run queue records (e.g. when the dataset has already been updated once and the dag run is still waiting on other datasets)
   
   opens up some possibilities for more elaborate triggering behavior. also, provides a way to scrutinize / surface / visualize all historical dataset events (the queue table's records are ephemeral)
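
To make the distinction concrete, here is a minimal sketch rather than the PR's implementation. The `QueueAndLog` class and its method names are hypothetical. It assumes the queue keeps at most one pending record per (dataset, target dag) pair, while the event table appends a row for every update.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class QueueAndLog:
    """Hypothetical in-memory stand-in for the queue table and the event table."""

    # Pending (dataset_id, target_dag_id) pairs; a set, so repeats are ignored.
    queue: set = field(default_factory=set)
    # Append-only list of (dataset_id, created_at) tuples; nothing is dropped.
    events: list = field(default_factory=list)

    def record_update(self, dataset_id: int, target_dag_ids: list) -> None:
        # Every update is logged, even if the queue already holds a record.
        self.events.append((dataset_id, datetime.now(timezone.utc)))
        for dag_id in target_dag_ids:
            self.queue.add((dataset_id, dag_id))

    def trigger(self, target_dag_id: str) -> None:
        # Queue records are ephemeral: they disappear once the dag run is created.
        self.queue = {rec for rec in self.queue if rec[1] != target_dag_id}


store = QueueAndLog()
store.record_update(1, ["dag3"])
store.record_update(1, ["dag3"])  # second update while dag3 is still waiting
queue_len_before = len(store.queue)
store.trigger("dag3")
```

After the trigger the queue is empty, but both updates remain in `store.events`, which is the history the new table is meant to preserve.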
   
   even before we have a chance to make enhancements beyond 2.4, i suspect it would be pretty easy for a user to write a long-running deferrable operator that would consume dataset events and do things with them, e.g. implement more complex dag run triggering rules based on the full event history.
   
   one component i'm just starting to play with and think about is how to get information, to the consuming dag, about what actually happened in the dataset event -- e.g. was it appended to, or replaced, this kind of thing. you can see in the change to example_datasets.py i'm using post_execute to insert some metadata in a dataset event payload.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917272208


##########
airflow/models/taskinstance.py:
##########
@@ -1513,10 +1513,10 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            self._create_dataset_dag_run_queue_records(context=context, session=session)
             session.commit()
 
-    def _create_dataset_dag_run_queue_records(self, *, session):
+    def _create_dataset_dag_run_queue_records(self, *, context: Context = None, session: Session):

Review Comment:
   SO @jedcunningham  this is a remnant that reveals part of the secret master plan.  
   
   We [ultimately, eventually] need some way to allow `the tasks that do the updates` to transmit metadata to `the tasks that handle the datasets that were updated`.
   
   When starting this branch I was playing around with injecting that metadata into the task context (from the _writing_ task), then grabbing that metadata out of the context here and storing it in the `extra` field when stamping the dataset event.
   
   BUT we're not adding that feature in this PR so, chopping is a good call. I just missed it when pulling that stuff out.
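
The idea described above, which this PR does not implement, could be sketched roughly as follows. The `dataset_event_extra` context key, the `Event` class, and both helper functions are hypothetical names chosen for illustration; only the `extra` field comes from the diff.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Event:
    """Simplified stand-in for a dataset event row."""

    dataset_id: int
    extra: Optional[Dict[str, Any]] = None


def writing_task(context: Dict[str, Any]) -> None:
    # The task that updates the dataset leaves metadata in its context.
    # "dataset_event_extra" is a made-up key, not an Airflow context variable.
    context["dataset_event_extra"] = {"mode": "append", "rows": 120}


def stamp_event(dataset_id: int, context: Dict[str, Any]) -> Event:
    # After the task finishes, copy the metadata (if any) into the event's extra field.
    return Event(dataset_id=dataset_id, extra=context.get("dataset_event_extra"))


context: Dict[str, Any] = {}
writing_task(context)
event = stamp_event(1, context)
empty_event = stamp_event(2, {})
```

A task that writes nothing to the context simply produces an event with `extra=None`, which matches the nullable column in the model.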





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #24908: Dataset event log

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917164753


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   Any reason why this has to have `source_` prefix? I feel it's good without the prefix
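
The docstring in the hunk above says events should outlive the rows they point at. Here is a minimal sketch of that behaviour using plain `sqlite3` rather than SQLAlchemy. The two-table schema is a simplified assumption, not the PR's actual migration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE task_instance (dag_id TEXT, task_id TEXT);
    -- No FOREIGN KEY constraint: the source columns are plain values.
    CREATE TABLE dataset_event (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        dataset_id INTEGER NOT NULL,
        source_dag_id TEXT,
        source_task_id TEXT
    );
    INSERT INTO task_instance VALUES ('dag1', 'upstream_task_1');
    INSERT INTO dataset_event (dataset_id, source_dag_id, source_task_id)
    VALUES (1, 'dag1', 'upstream_task_1');
    """
)

# Deleting the task instance does not cascade to the event.
conn.execute("DELETE FROM task_instance WHERE dag_id = 'dag1'")

# The join still works; the task instance side simply comes back as NULL.
rows = conn.execute(
    """
    SELECT e.dataset_id, e.source_task_id, ti.task_id
    FROM dataset_event AS e
    LEFT JOIN task_instance AS ti
      ON ti.dag_id = e.source_dag_id AND ti.task_id = e.source_task_id
    """
).fetchall()
conn.close()
```

The event row keeps its `source_*` values after the delete, so the history stays readable even when the originating task instance is gone.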





[GitHub] [airflow] jedcunningham commented on pull request #24908: Dataset event log

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on PR #24908:
URL: https://github.com/apache/airflow/pull/24908#issuecomment-1179299482

   Might be worth tossing `log` in the name as well, `dataset_event_log`, to help distinguish it from `dataset_dag_run_queue` for those less familiar with this area. 




[GitHub] [airflow] dstandish commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917271875


##########
tests/models/test_taskinstance.py:
##########
@@ -1499,10 +1499,25 @@ def test_outlet_datasets(self, create_task_instance):
         ti._run_raw_task()
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
+
+        # check that one queue record created for each dag that depends on dataset 1
         assert session.query(DatasetDagRunQueue.target_dag_id).filter(
             DatasetTaskRef.dag_id == dag1.dag_id, DatasetTaskRef.task_id == 'upstream_task_1'
         ).all() == [('dag3',), ('dag4',), ('dag5',)]
 
+        # check that one event record created for dataset1 and this TI
+        assert session.query(Dataset.uri).join(DatasetEvent.dataset).filter(
+            DatasetEvent.source_task_instance == ti
+        ).one() == ('s3://dag1/output_1.txt',)
+
+        # check that no other dataset events recorded
+        assert (
+                session.query(Dataset.uri)
+                .join(DatasetEvent.dataset)
+                .filter(DatasetEvent.source_task_instance == ti)
+                .count()
+            ) == 1

Review Comment:
   excellent -- thank you





[GitHub] [airflow] dstandish commented on pull request #24908: Dataset event log

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #24908:
URL: https://github.com/apache/airflow/pull/24908#issuecomment-1179357417

   > Might be worth tossing `log` in the name as well, `dataset_event_log`, to help distinguish it from `dataset_dag_run_queue` for those less familiar with this area.
   
   I thought about this too, and I think it would be a reasonable way to go.  However, the reason I leaned towards not using the word "log" is that logs tend to be things that are not needed for the operation of your app.  They are there for troubleshooting etc.  But I think this table is / will be more than that -- it's not merely a log, e.g. not just for auditing or troubleshooting.  It's going to be surfaced in the UI, for one, and I suspect these events will ultimately be accessible by TIs, sort of like XCom.
   
   They are events, operational in nature, more akin to XCom than Log.  And I think omitting the "log" word signals that aspect better.
   
   BUT, if general consensus is in the other direction, I am happy to go along.




[GitHub] [airflow] dstandish commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917280784


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   yeah like it's clarifying that this TI is the source of the event -- maybe in future we have events that have a specific target? dunno





[GitHub] [airflow] jedcunningham commented on a diff in pull request #24908: Dataset event log

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917218718


##########
airflow/models/taskinstance.py:
##########
@@ -1513,10 +1513,10 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            self._create_dataset_dag_run_queue_records(context=context, session=session)
             session.commit()
 
-    def _create_dataset_dag_run_queue_records(self, *, session):
+    def _create_dataset_dag_run_queue_records(self, *, context: Context = None, session: Session):

Review Comment:
   ```suggestion
       def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None:
   ```
   
   Wait, `context` isn't even used? Oh, and we missed the return type.



##########
airflow/models/taskinstance.py:
##########
@@ -1513,10 +1513,10 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            self._create_dataset_dag_run_queue_records(context=context, session=session)

Review Comment:
   ```suggestion
               self._create_dataset_dag_run_queue_records(session=session)
   ```



##########
tests/models/test_taskinstance.py:
##########
@@ -1499,10 +1499,25 @@ def test_outlet_datasets(self, create_task_instance):
         ti._run_raw_task()
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
+
+        # check that one queue record created for each dag that depends on dataset 1
         assert session.query(DatasetDagRunQueue.target_dag_id).filter(
             DatasetTaskRef.dag_id == dag1.dag_id, DatasetTaskRef.task_id == 'upstream_task_1'
         ).all() == [('dag3',), ('dag4',), ('dag5',)]
 
+        # check that one event record created for dataset1 and this TI
+        assert session.query(Dataset.uri).join(DatasetEvent.dataset).filter(
+            DatasetEvent.source_task_instance == ti
+        ).one() == ('s3://dag1/output_1.txt',)
+
+        # check that no other dataset events recorded
+        assert (
+                session.query(Dataset.uri)
+                .join(DatasetEvent.dataset)
+                .filter(DatasetEvent.source_task_instance == ti)
+                .count()
+            ) == 1

Review Comment:
   ```suggestion
               session.query(Dataset.uri)
               .join(DatasetEvent.dataset)
               .filter(DatasetEvent.source_task_instance == ti)
               .count()
           ) == 1
   ```
   
   This fixes one of the failing static checks.





[GitHub] [airflow] bbovenzi commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917277421


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   Speaking of, do we plan to record downstream events for when a dataset triggers a dag run?





[GitHub] [airflow] jedcunningham commented on a diff in pull request #24908: Dataset event log

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917081354


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
+    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+
+    __tablename__ = "dataset_event"
+    __table_args__ = (
+        Index('idx_dataset_id_created_at', dataset_id, created_at, mssql_clustered=True),
+        {'sqlite_autoincrement': True},  # ensures PK values not reused
+    )
+
+    source_task_instance = relationship(
+        "TaskInstance",
+        primaryjoin="""and_(
+            DatasetEvent.source_dag_id == foreign(TaskInstance.dag_id),
+            DatasetEvent.source_run_id == foreign(TaskInstance.run_id),
+            DatasetEvent.source_task_id == foreign(TaskInstance.task_id),
+            DatasetEvent.source_map_index == foreign(TaskInstance.map_index),
+        )""",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+    source_dag_run = relationship(
+        "DagRun",
+        primaryjoin="""and_(
+            DatasetEvent.source_dag_id == foreign(DagRun.dag_id),
+            DatasetEvent.source_run_id == foreign(DagRun.run_id),
+        )""",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+    dataset = relationship(
+        Dataset,
+        primaryjoin="DatasetEvent.dataset_id == foreign(Dataset.id)",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return self.dataset_id == other.dataset_id and self.created_at == other.created_at
+        else:
+            return NotImplemented
+
+    def __hash__(self):
+        return hash((self.dataset_id, self.created_at))
+
+    def __repr__(self):

Review Comment:
   ```suggestion
       def __repr__(self) -> str:
   ```



##########
airflow/models/taskinstance.py:
##########
@@ -1528,6 +1528,16 @@ def _create_dataset_dag_run_queue_records(self, *, session):
                     continue
                 downstream_dag_ids = [x.dag_id for x in dataset.dag_references]
                 self.log.debug("downstream dag ids %s", downstream_dag_ids)
+                session.add(
+                    DatasetEvent(
+                        dataset_id=dataset.id,
+                        extra=None,

Review Comment:
   ```suggestion
   ```
   
   This is the default, no?



##########
tests/models/test_taskinstance.py:
##########
@@ -1499,10 +1499,28 @@ def test_outlet_datasets(self, create_task_instance):
         ti._run_raw_task()
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
+
+        # check that one queue record created for each dag that depends on dataset 1
         assert session.query(DatasetDagRunQueue.target_dag_id).filter(
             DatasetTaskRef.dag_id == dag1.dag_id, DatasetTaskRef.task_id == 'upstream_task_1'
         ).all() == [('dag3',), ('dag4',), ('dag5',)]
 
+        # check that one event record created for dataset1 and this TI
+        assert session.query(Dataset.uri).join(DatasetEvent.dataset).filter(
+            DatasetEvent.source_task_instance == ti
+        ).one() == ('s3://dag1/output_1.txt',)
+
+        # check that no other dataset events recorded
+        assert (
+            len(
+                session.query(Dataset.uri)
+                .join(DatasetEvent.dataset)
+                .filter(DatasetEvent.source_task_instance == ti)
+                .all()
+            )
+            == 1
+        )

Review Comment:
   ```suggestion
           assert (
                   session.query(Dataset.uri)
                   .join(DatasetEvent.dataset)
                   .filter(DatasetEvent.source_task_instance == ti)
                   .count()
               ) == 1
   ```
   
   Not sure I got the formatting quite right...
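
The suggestion swaps `len(query.all())` for `query.count()`. The difference can be sketched with plain `sqlite3`: the first approach fetches every matching row and counts in Python, the second asks the database for the count. The table and data below are made up for illustration, and the SQL is only an approximation of what SQLAlchemy would emit.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataset_event (id INTEGER PRIMARY KEY, source_task_id TEXT)")
conn.executemany(
    "INSERT INTO dataset_event (source_task_id) VALUES (?)",
    [("upstream_task_1",), ("other_task",)],
)

# Equivalent of len(query.all()): pull the matching rows, then count them in Python.
fetched = conn.execute(
    "SELECT id FROM dataset_event WHERE source_task_id = ?", ("upstream_task_1",)
).fetchall()
count_in_python = len(fetched)

# Equivalent of query.count(): let the database do the counting.
(count_in_db,) = conn.execute(
    "SELECT COUNT(*) FROM dataset_event WHERE source_task_id = ?", ("upstream_task_1",)
).fetchone()
conn.close()
```

Both approaches give the same number; the second avoids transferring rows that the test never inspects.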



##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
+    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+
+    __tablename__ = "dataset_event"
+    __table_args__ = (
+        Index('idx_dataset_id_created_at', dataset_id, created_at, mssql_clustered=True),
+        {'sqlite_autoincrement': True},  # ensures PK values not reused
+    )
+
+    source_task_instance = relationship(
+        "TaskInstance",
+        primaryjoin="""and_(
+            DatasetEvent.source_dag_id == foreign(TaskInstance.dag_id),
+            DatasetEvent.source_run_id == foreign(TaskInstance.run_id),
+            DatasetEvent.source_task_id == foreign(TaskInstance.task_id),
+            DatasetEvent.source_map_index == foreign(TaskInstance.map_index),
+        )""",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+    source_dag_run = relationship(
+        "DagRun",
+        primaryjoin="""and_(
+            DatasetEvent.source_dag_id == foreign(DagRun.dag_id),
+            DatasetEvent.source_run_id == foreign(DagRun.run_id),
+        )""",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+    dataset = relationship(
+        Dataset,
+        primaryjoin="DatasetEvent.dataset_id == foreign(Dataset.id)",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return self.dataset_id == other.dataset_id and self.created_at == other.created_at
+        else:
+            return NotImplemented
+
+    def __hash__(self):

Review Comment:
   ```suggestion
       def __hash__(self) -> int:
   ```



##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
+    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+
+    __tablename__ = "dataset_event"
+    __table_args__ = (
+        Index('idx_dataset_id_created_at', dataset_id, created_at, mssql_clustered=True),
+        {'sqlite_autoincrement': True},  # ensures PK values not reused
+    )
+
+    source_task_instance = relationship(
+        "TaskInstance",
+        primaryjoin="""and_(
+            DatasetEvent.source_dag_id == foreign(TaskInstance.dag_id),
+            DatasetEvent.source_run_id == foreign(TaskInstance.run_id),
+            DatasetEvent.source_task_id == foreign(TaskInstance.task_id),
+            DatasetEvent.source_map_index == foreign(TaskInstance.map_index),
+        )""",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+    source_dag_run = relationship(
+        "DagRun",
+        primaryjoin="""and_(
+            DatasetEvent.source_dag_id == foreign(DagRun.dag_id),
+            DatasetEvent.source_run_id == foreign(DagRun.run_id),
+        )""",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+    dataset = relationship(
+        Dataset,
+        primaryjoin="DatasetEvent.dataset_id == foreign(Dataset.id)",
+        viewonly=True,
+        lazy="select",
+        uselist=False,
+    )
+
+    def __eq__(self, other):

Review Comment:
   ```suggestion
       def __eq__(self, other) -> bool:
   ```
   
   Assuming this passes mypy, that is; `__eq__` annotations can be a bit finicky.
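
For reference, a self-contained sketch of the annotated pair. `EventKey` is a hypothetical stand-in that mirrors the `__eq__`/`__hash__` logic in the diff without the SQLAlchemy base class. Typing `other` as `object` is an assumption about what keeps type checkers happy, not something confirmed in this thread.

```python
from datetime import datetime, timezone


class EventKey:
    """Hypothetical stand-in mirroring the __eq__/__hash__ pair in the diff."""

    def __init__(self, dataset_id: int, created_at: datetime) -> None:
        self.dataset_id = dataset_id
        self.created_at = created_at

    # Typing `other` as `object` matches the signature of object.__eq__.
    def __eq__(self, other: object) -> bool:
        if isinstance(other, self.__class__):
            return self.dataset_id == other.dataset_id and self.created_at == other.created_at
        # Signals "don't know how to compare"; Python then tries the reflected operation.
        return NotImplemented

    def __hash__(self) -> int:
        # Must agree with __eq__: equal objects hash equally.
        return hash((self.dataset_id, self.created_at))


ts = datetime(2022, 7, 7, tzinfo=timezone.utc)
a, b = EventKey(1, ts), EventKey(1, ts)
same = a == b
differs_from_int = a == 1
unique = len({a, b})
```

Comparing against an unrelated type falls back to `False` instead of raising, and two keys with the same `dataset_id` and `created_at` collapse to one entry in a set.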



##########
airflow/models/taskinstance.py:
##########
@@ -1513,10 +1513,10 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            self._create_dataset_dag_run_queue_records(context=context, session=session)
             session.commit()
 
-    def _create_dataset_dag_run_queue_records(self, *, session):
+    def _create_dataset_dag_run_queue_records(self, *, context=None, session=NEW_SESSION):

Review Comment:
   ```suggestion
       def _create_dataset_dag_run_queue_records(self, *, context: Context = None, session: Session):
   ```





[GitHub] [airflow] jedcunningham commented on pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on PR #24908:
URL: https://github.com/apache/airflow/pull/24908#issuecomment-1180618308

   I guess I just think of "events" as things to be actioned, not just a record of things that happened. I don't feel that strongly about this though.




[GitHub] [airflow] dstandish commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917273468


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   And I guess, the idea is just to make it clear, when looking at the dataset event, and when we see the references to a dag_id, for instance, it's clear we're referring to "the dag that wrote to the dataset".
   
   E.g. when you are in the downstream TI that is processing the dataset update, and you are looking at this dataset event object (e.g. to see what was done to the dataset), and you see dag or task references on it -- will it be clear enough that those references mean "this is the dag / task that updated the dataset"?
   
   Probably so but let me know what you think.





[GitHub] [airflow] dstandish commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917270799


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   Yeah, it's a good question.  So with dataset events, there tend to be multiple things implicated.  There's the task that updated the dataset.  But then there's also the dag or task that depends on the dataset and will be triggered by the update.  And e.g. with the queue table, we definitely needed to make clear we're talking about the target dag id.  Here though, I think you're probably right; we probably don't need it because it's only about writes.





[GitHub] [airflow] jedcunningham commented on pull request #24908: Dataset event log

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on PR #24908:
URL: https://github.com/apache/airflow/pull/24908#issuecomment-1179403622

   > However, the reason I leaned towards not using the word "log" is because, logs tend to be things that are not needed for the operation of your app.
   
   True, good point. Maybe "dataset-history" then?




[GitHub] [airflow] bbovenzi commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917277421


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   Speaking of, do we plan to record downstream events for when a dataset triggers a dag run?





[GitHub] [airflow] dstandish commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917286582


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   Yeah, we need to think through the best way to model this so that everything can be tied together.  We can discuss next week.





[GitHub] [airflow] bbovenzi commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917281992


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   This all looks good to me for source events. Some sort of target event will be really helpful for visualizing everything.





[GitHub] [airflow] dstandish commented on pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #24908:
URL: https://github.com/apache/airflow/pull/24908#issuecomment-1179550252

   >> However, the reason I leaned towards not using the word "log" is because, logs tend to be things that are not needed for the operation of your app.
   
   > True, good point. Maybe "dataset-history" then?
   
   Not to be Mr. Stick-in-the-Mud or anything.... _but_....
   
   Maybe help me understand the concern a little better?
   
   What's wrong with sticking with the standard convention?  We have a model, DatasetEvent, to represent "dataset events", so why not stick it in a `dataset_event` table -- like you would put "dags" in a `dag` table, "serialized dags" in a `serialized_dag` table, "XComs" in an `xcom` table, or for that matter "orders" in an `order` table etc?
   
   I like dataset_event.  Seems to be as plain and clear as possible -- it stores dataset events.
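
As a small illustration of the convention described above (singular, snake_case table names derived from the model name), here is a sketch of a naming helper. `table_name_for` is a hypothetical function written for this example; it is not Airflow code.

```python
import re


def table_name_for(model_name: str) -> str:
    """Convert a CamelCase model name to a snake_case table name.

    An underscore is inserted only where a lowercase letter or digit is
    followed by an uppercase letter, so runs of capitals stay together.
    """
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", model_name).lower()


for name in ["DatasetEvent", "SerializedDag", "XCom", "Order"]:
    print(name, "->", table_name_for(name))
```

Under this rule `DatasetEvent` maps to `dataset_event`, which matches the table name argued for here.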
   




[GitHub] [airflow] bbovenzi commented on a diff in pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on code in PR #24908:
URL: https://github.com/apache/airflow/pull/24908#discussion_r917277421


##########
airflow/models/dataset.py:
##########
@@ -199,3 +199,86 @@ def __repr__(self):
         for attr in [x.name for x in self.__mapper__.primary_key]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetEvent(Base):
+    """
+    A table to store datasets events.
+
+    :param dataset_id: reference to Dataset record
+    :param extra: JSON field for arbitrary extra info
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
+
+    We use relationships instead of foreign keys so that dataset events are not deleted even
+    if the foreign key object is.
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    dataset_id = Column(Integer, nullable=False)
+    extra = Column(ExtendedJSON, nullable=True)
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))

Review Comment:
   Source would be upstream. Do we plan to record downstream events for when a dataset triggers a dag run?





[GitHub] [airflow] dstandish merged pull request #24908: Dataset event table

Posted by GitBox <gi...@apache.org>.
dstandish merged PR #24908:
URL: https://github.com/apache/airflow/pull/24908

