Posted to commits@airflow.apache.org by je...@apache.org on 2022/07/14 18:42:43 UTC

[airflow] branch main updated: Add `source_` prefix to DatasetEvent columns (#25068)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 61fc4899d7 Add `source_` prefix to DatasetEvent columns (#25068)
61fc4899d7 is described below

commit 61fc4899d71821fd051944d5d9732f7d402edf6c
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Jul 14 12:42:33 2022 -0600

    Add `source_` prefix to DatasetEvent columns (#25068)
    
    This helps clarify that the TI key was the source of the event.
---
 .../versions/0114_2_4_0_add_dataset_model.py       |  8 ++---
 airflow/models/dataset.py                          | 36 +++++++++++-----------
 airflow/models/taskinstance.py                     |  8 ++---
 3 files changed, 26 insertions(+), 26 deletions(-)
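
For code outside this diff that constructs or filters DatasetEvent rows, the rename is a straight attribute substitution. A minimal query sketch using the new names (the session helper is existing Airflow API, but the dag and task IDs are made up for illustration):

    from airflow.models.dataset import DatasetEvent
    from airflow.utils.session import create_session

    # Find every dataset event emitted by one task, via the renamed
    # source_* columns introduced in this commit.
    with create_session() as session:
        events = (
            session.query(DatasetEvent)
            .filter(
                DatasetEvent.source_dag_id == "example_dag",
                DatasetEvent.source_task_id == "produce_dataset",
            )
            .all()
        )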

diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
index ba4be54451..deb6c3c33f 100644
--- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -124,10 +124,10 @@ def _create_dataset_event_table():
         sa.Column('id', Integer, primary_key=True, autoincrement=True),
         sa.Column('dataset_id', Integer, nullable=False),
         sa.Column('extra', ExtendedJSON, nullable=True),
-        sa.Column('task_id', String(250), nullable=True),
-        sa.Column('dag_id', String(250), nullable=True),
-        sa.Column('run_id', String(250), nullable=True),
-        sa.Column('map_index', sa.Integer(), nullable=True, server_default='-1'),
+        sa.Column('source_task_id', String(250), nullable=True),
+        sa.Column('source_dag_id', String(250), nullable=True),
+        sa.Column('source_run_id', String(250), nullable=True),
+        sa.Column('source_map_index', sa.Integer(), nullable=True, server_default='-1'),
         sa.Column('created_at', TIMESTAMP, nullable=False),
         sqlite_autoincrement=True,  # ensures PK values not reused
     )
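
The rename is applied in place to the not-yet-released 2.4.0 migration, so no follow-up migration ships with this commit. For comparison, an equivalent standalone rename would look roughly like this (a sketch, not part of the commit; some backends additionally require existing_type on alter_column):

    from alembic import op

    def upgrade():
        # batch_alter_table recreates the table on SQLite, which cannot
        # rename columns with a plain ALTER.
        with op.batch_alter_table('dataset_event') as batch_op:
            for old in ('task_id', 'dag_id', 'run_id', 'map_index'):
                batch_op.alter_column(old, new_column_name=f'source_{old}')

    def downgrade():
        with op.batch_alter_table('dataset_event') as batch_op:
            for old in ('task_id', 'dag_id', 'run_id', 'map_index'):
                batch_op.alter_column(f'source_{old}', new_column_name=old)
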
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index 7646eaae40..e6db473a88 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -207,10 +207,10 @@ class DatasetEvent(Base):
 
     :param dataset_id: reference to Dataset record
     :param extra: JSON field for arbitrary extra info
-    :param task_id: the task_id of the TI which updated the dataset
-    :param dag_id: the dag_id of the TI which updated the dataset
-    :param run_id: the run_id of the TI which updated the dataset
-    :param map_index: the map_index of the TI which updated the dataset
+    :param source_task_id: the task_id of the TI which updated the dataset
+    :param source_dag_id: the dag_id of the TI which updated the dataset
+    :param source_run_id: the run_id of the TI which updated the dataset
+    :param source_map_index: the map_index of the TI which updated the dataset
 
     We use relationships instead of foreign keys so that dataset events are not deleted even
     if the foreign key object is.
@@ -219,10 +219,10 @@ class DatasetEvent(Base):
     id = Column(Integer, primary_key=True, autoincrement=True)
     dataset_id = Column(Integer, nullable=False)
     extra = Column(ExtendedJSON, nullable=True)
-    task_id = Column(StringID(), nullable=True)
-    dag_id = Column(StringID(), nullable=True)
-    run_id = Column(StringID(), nullable=True)
-    map_index = Column(Integer, nullable=True, server_default=text("-1"))
+    source_task_id = Column(StringID(), nullable=True)
+    source_dag_id = Column(StringID(), nullable=True)
+    source_run_id = Column(StringID(), nullable=True)
+    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
     created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
 
     __tablename__ = "dataset_event"
@@ -234,10 +234,10 @@ class DatasetEvent(Base):
     source_task_instance = relationship(
         "TaskInstance",
         primaryjoin="""and_(
-            DatasetEvent.dag_id == foreign(TaskInstance.dag_id),
-            DatasetEvent.run_id == foreign(TaskInstance.run_id),
-            DatasetEvent.task_id == foreign(TaskInstance.task_id),
-            DatasetEvent.map_index == foreign(TaskInstance.map_index),
+            DatasetEvent.source_dag_id == foreign(TaskInstance.dag_id),
+            DatasetEvent.source_run_id == foreign(TaskInstance.run_id),
+            DatasetEvent.source_task_id == foreign(TaskInstance.task_id),
+            DatasetEvent.source_map_index == foreign(TaskInstance.map_index),
         )""",
         viewonly=True,
         lazy="select",
@@ -246,8 +246,8 @@ class DatasetEvent(Base):
     source_dag_run = relationship(
         "DagRun",
         primaryjoin="""and_(
-            DatasetEvent.dag_id == foreign(DagRun.dag_id),
-            DatasetEvent.run_id == foreign(DagRun.run_id),
+            DatasetEvent.source_dag_id == foreign(DagRun.dag_id),
+            DatasetEvent.source_run_id == foreign(DagRun.run_id),
         )""",
         viewonly=True,
         lazy="select",
@@ -276,10 +276,10 @@ class DatasetEvent(Base):
             'id',
             'dataset_id',
             'extra',
-            'task_id',
-            'dag_id',
-            'run_id',
-            'map_index',
+            'source_task_id',
+            'source_dag_id',
+            'source_run_id',
+            'source_map_index',
         ]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 7db364be2f..4dd3e2545d 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1536,10 +1536,10 @@ class TaskInstance(Base, LoggingMixin):
                 session.add(
                     DatasetEvent(
                         dataset_id=dataset.id,
-                        task_id=self.task_id,
-                        dag_id=self.dag_id,
-                        run_id=self.run_id,
-                        map_index=self.map_index,
+                        source_task_id=self.task_id,
+                        source_dag_id=self.dag_id,
+                        source_run_id=self.run_id,
+                        source_map_index=self.map_index,
                     )
                 )
                 for dag_id in downstream_dag_ids:
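
With the renamed keyword arguments above, each event row records the full TaskInstance key of the producer; nothing else about the write path changes. A throwaway check of that invariant (a sketch; ti and session are assumed to come from a test fixture after the task has run):

    event = session.query(DatasetEvent).order_by(DatasetEvent.id.desc()).first()
    assert event.source_dag_id == ti.dag_id
    assert event.source_task_id == ti.task_id
    assert event.source_run_id == ti.run_id
    assert event.source_map_index == ti.map_index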