You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/07/14 18:42:43 UTC
[airflow] branch main updated: Add `source_` prefix to DatasetEvent columns (#25068)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 61fc4899d7 Add `source_` prefix to DatasetEvent columns (#25068)
61fc4899d7 is described below
commit 61fc4899d71821fd051944d5d9732f7d402edf6c
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Jul 14 12:42:33 2022 -0600
Add `source_` prefix to DatasetEvent columns (#25068)
This helps clarify that the task instance (TI) identified by these columns was the source of the event.
---
.../versions/0114_2_4_0_add_dataset_model.py | 8 ++---
airflow/models/dataset.py | 36 +++++++++++-----------
airflow/models/taskinstance.py | 8 ++---
3 files changed, 26 insertions(+), 26 deletions(-)
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
index ba4be54451..deb6c3c33f 100644
--- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -124,10 +124,10 @@ def _create_dataset_event_table():
sa.Column('id', Integer, primary_key=True, autoincrement=True),
sa.Column('dataset_id', Integer, nullable=False),
sa.Column('extra', ExtendedJSON, nullable=True),
- sa.Column('task_id', String(250), nullable=True),
- sa.Column('dag_id', String(250), nullable=True),
- sa.Column('run_id', String(250), nullable=True),
- sa.Column('map_index', sa.Integer(), nullable=True, server_default='-1'),
+ sa.Column('source_task_id', String(250), nullable=True),
+ sa.Column('source_dag_id', String(250), nullable=True),
+ sa.Column('source_run_id', String(250), nullable=True),
+ sa.Column('source_map_index', sa.Integer(), nullable=True, server_default='-1'),
sa.Column('created_at', TIMESTAMP, nullable=False),
sqlite_autoincrement=True, # ensures PK values not reused
)
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index 7646eaae40..e6db473a88 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -207,10 +207,10 @@ class DatasetEvent(Base):
:param dataset_id: reference to Dataset record
:param extra: JSON field for arbitrary extra info
- :param task_id: the task_id of the TI which updated the dataset
- :param dag_id: the dag_id of the TI which updated the dataset
- :param run_id: the run_id of the TI which updated the dataset
- :param map_index: the map_index of the TI which updated the dataset
+ :param source_task_id: the task_id of the TI which updated the dataset
+ :param source_dag_id: the dag_id of the TI which updated the dataset
+ :param source_run_id: the run_id of the TI which updated the dataset
+ :param source_map_index: the map_index of the TI which updated the dataset
We use relationships instead of foreign keys so that dataset events are not deleted even
if the foreign key object is.
@@ -219,10 +219,10 @@ class DatasetEvent(Base):
id = Column(Integer, primary_key=True, autoincrement=True)
dataset_id = Column(Integer, nullable=False)
extra = Column(ExtendedJSON, nullable=True)
- task_id = Column(StringID(), nullable=True)
- dag_id = Column(StringID(), nullable=True)
- run_id = Column(StringID(), nullable=True)
- map_index = Column(Integer, nullable=True, server_default=text("-1"))
+ source_task_id = Column(StringID(), nullable=True)
+ source_dag_id = Column(StringID(), nullable=True)
+ source_run_id = Column(StringID(), nullable=True)
+ source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
__tablename__ = "dataset_event"
@@ -234,10 +234,10 @@ class DatasetEvent(Base):
source_task_instance = relationship(
"TaskInstance",
primaryjoin="""and_(
- DatasetEvent.dag_id == foreign(TaskInstance.dag_id),
- DatasetEvent.run_id == foreign(TaskInstance.run_id),
- DatasetEvent.task_id == foreign(TaskInstance.task_id),
- DatasetEvent.map_index == foreign(TaskInstance.map_index),
+ DatasetEvent.source_dag_id == foreign(TaskInstance.dag_id),
+ DatasetEvent.source_run_id == foreign(TaskInstance.run_id),
+ DatasetEvent.source_task_id == foreign(TaskInstance.task_id),
+ DatasetEvent.source_map_index == foreign(TaskInstance.map_index),
)""",
viewonly=True,
lazy="select",
@@ -246,8 +246,8 @@ class DatasetEvent(Base):
source_dag_run = relationship(
"DagRun",
primaryjoin="""and_(
- DatasetEvent.dag_id == foreign(DagRun.dag_id),
- DatasetEvent.run_id == foreign(DagRun.run_id),
+ DatasetEvent.source_dag_id == foreign(DagRun.dag_id),
+ DatasetEvent.source_run_id == foreign(DagRun.run_id),
)""",
viewonly=True,
lazy="select",
@@ -276,10 +276,10 @@ class DatasetEvent(Base):
'id',
'dataset_id',
'extra',
- 'task_id',
- 'dag_id',
- 'run_id',
- 'map_index',
+ 'source_task_id',
+ 'source_dag_id',
+ 'source_run_id',
+ 'source_map_index',
]:
args.append(f"{attr}={getattr(self, attr)!r}")
return f"{self.__class__.__name__}({', '.join(args)})"
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 7db364be2f..4dd3e2545d 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1536,10 +1536,10 @@ class TaskInstance(Base, LoggingMixin):
session.add(
DatasetEvent(
dataset_id=dataset.id,
- task_id=self.task_id,
- dag_id=self.dag_id,
- run_id=self.run_id,
- map_index=self.map_index,
+ source_task_id=self.task_id,
+ source_dag_id=self.dag_id,
+ source_run_id=self.run_id,
+ source_map_index=self.map_index,
)
)
for dag_id in downstream_dag_ids: