Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/30 17:41:04 UTC
[GitHub] [airflow] jedcunningham commented on a diff in pull request #24743: WIP Add other dataset models -- dag-based dataset deps
jedcunningham commented on code in PR #24743:
URL: https://github.com/apache/airflow/pull/24743#discussion_r911291025
##########
airflow/models/dataset.py:
##########
@@ -66,10 +68,36 @@ def __init__(self, uri: str, **kwargs):
         super().__init__(uri=uri, **kwargs)
     def __eq__(self, other):
-        return self.uri == other.uri
+        if isinstance(other, self.__class__):
+            return self.uri == other.uri
+        raise NotImplemented
     def __hash__(self):
         return hash(self.uri)
     def __repr__(self):
         return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"
+
+    @provide_session
+    def get_dataset_id(self, session=NEW_SESSION):
+        if self.id:
+            dataset_id = self.id
+        else:
+            stored = session.query(self.__class__).filter(self.__class__.uri == self.uri).first()
Review Comment:
```suggestion
stored = session.query(self.__class__.id).filter(self.__class__.uri == self.uri).first()
```
We can just bring back the id, no?
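A note on the `__eq__` change in the dataset.py hunk above. Python's rich-comparison protocol expects `__eq__` to *return* the `NotImplemented` singleton for operand types it does not handle. `NotImplemented` is not an exception class, so the hunk's `raise NotImplemented` would itself fail with a `TypeError`. The sketch below uses a hypothetical plain-Python `Dataset` stand-in rather than the ORM model. It shows the returning form, which keeps `__eq__` consistent with `__hash__`:

```python
class Dataset:
    """Hypothetical plain-Python stand-in for the ORM model in the hunk."""

    def __init__(self, uri: str, extra=None):
        self.uri = uri
        self.extra = extra

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.uri == other.uri
        # Returning (not raising) NotImplemented lets Python try the reflected
        # comparison and finally fall back to an identity check.
        return NotImplemented

    def __hash__(self):
        # Consistent with __eq__: datasets with equal URIs hash equally.
        return hash(self.uri)


def raising_not_implemented_fails() -> bool:
    """Show why `raise NotImplemented` is a bug: it raises TypeError instead."""
    try:
        raise NotImplemented  # NotImplemented does not derive from BaseException
    except TypeError:
        return True
    return False


a = Dataset("s3://bucket/key")
b = Dataset("s3://bucket/key", extra={"k": "v"})
print(a == b)                  # True: same URI, extra is ignored
print(a == "s3://bucket/key")  # False: both sides return NotImplemented, identity fallback
print(len({a, b}))             # 1
```

Because `extra` is excluded from both `__eq__` and `__hash__`, two datasets with the same URI collapse to one entry in a set, whatever their `extra` values.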
##########
airflow/models/taskinstance.py:
##########
@@ -1521,7 +1524,25 @@ def _run_raw_task(
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
-
+ for obj in getattr(self.task, '_outlets', []):
+ self.log.debug("outlet obj %s", obj)
+ if isinstance(obj, Dataset):
+ dataset = session.query(Dataset).filter(Dataset.uri == obj.uri).first()
+ if not dataset:
+ dataset = Dataset(uri=obj.uri, extra=obj.extra)
+ self.log.debug("adding dataset %s", dataset)
+ session.add(dataset)
+ session.flush()
Review Comment:
How can this branch be hit? Shouldn't the dataset already have been added during DAG parsing?
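The question above can be illustrated with a toy model. Suppose, as an assumption, that DAG parsing already stores every `Dataset` outlet. Then the runtime lookup in the taskinstance.py hunk always finds a row, and its create branch never runs. Everything here is hypothetical and is not Airflow code: `DatasetStore`, `register`, `lookup_or_create` and `record_outlets` are illustrative names, and an in-memory dict stands in for the database table.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Dataset:
    """Hypothetical stand-in for the Dataset model; only the URI matters here."""
    uri: str


@dataclass
class DatasetStore:
    """Hypothetical in-memory table mapping dataset URI -> id."""
    ids: dict = field(default_factory=dict)
    created_at_runtime: int = 0

    def register(self, uri: str) -> int:
        # Parse-time path (assumed): store each URI once, returning its id.
        return self.ids.setdefault(uri, len(self.ids) + 1)

    def lookup_or_create(self, uri: str) -> int:
        # Runtime path mirroring the hunk: look up first, create only if missing.
        if uri not in self.ids:
            self.created_at_runtime += 1
        return self.register(uri)


def record_outlets(task_outlets, store: DatasetStore) -> list:
    # Same filtering as the hunk: outlets that are not Dataset objects are skipped.
    return [store.lookup_or_create(o.uri) for o in task_outlets if isinstance(o, Dataset)]


outlets = [Dataset("s3://bucket/a"), "not-a-dataset", Dataset("s3://bucket/b")]
store = DatasetStore()
for o in outlets:
    if isinstance(o, Dataset):
        store.register(o.uri)         # simulated DAG parsing
ids = record_outlets(outlets, store)  # simulated task run
print(ids, store.created_at_runtime)  # [1, 2] 0
```

If parse-time registration is skipped (a fresh `DatasetStore`), `created_at_runtime` ends up equal to the number of dataset outlets. That is the only situation in which the create branch would be hit in this toy model.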
##########
airflow/models/dataset.py:
##########
@@ -66,10 +68,36 @@ def __init__(self, uri: str, **kwargs):
         super().__init__(uri=uri, **kwargs)
     def __eq__(self, other):
-        return self.uri == other.uri
+        if isinstance(other, self.__class__):
+            return self.uri == other.uri
+        raise NotImplemented
     def __hash__(self):
         return hash(self.uri)
     def __repr__(self):
         return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"
+
+    @provide_session
+    def get_dataset_id(self, session=NEW_SESSION):
+        if self.id:
+            dataset_id = self.id
+        else:
+            stored = session.query(self.__class__).filter(self.__class__.uri == self.uri).first()
+            if not stored:
+                session.add(self)
+                session.flush()
+                dataset_id = self.id
+            else:
+                dataset_id = stored.id
+        return dataset_id
Review Comment:
```suggestion
if stored:
return stored.id
session.add(self)
session.flush()
return self.id
```
nit
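The two suggestions in this review can be combined: select only the `id` column, and return early instead of assigning `dataset_id` in each branch. The sketch below shows the resulting get-or-create flow under stated assumptions. Python's stdlib `sqlite3` is swapped in for the SQLAlchemy session, `cursor.lastrowid` stands in for `session.flush()` populating `self.id`, and the `dataset` table schema and the `known_id` parameter (mirroring `if self.id:`) are hypothetical.

```python
import sqlite3
from typing import Optional


def get_dataset_id(conn: sqlite3.Connection, uri: str, known_id: Optional[int] = None) -> int:
    """Return the id of the dataset row with this URI, inserting it if missing.

    Hypothetical stdlib stand-in for the ORM method under review.
    """
    if known_id:
        # Mirrors the `if self.id:` short-circuit in the hunk.
        return known_id
    # First suggestion: fetch only the id column, not the whole row/entity.
    row = conn.execute("SELECT id FROM dataset WHERE uri = ?", (uri,)).fetchone()
    if row:
        # "nit" suggestion: return early when the dataset already exists.
        return row[0]
    cur = conn.execute("INSERT INTO dataset (uri) VALUES (?)", (uri,))
    return cur.lastrowid


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataset (id INTEGER PRIMARY KEY, uri TEXT UNIQUE NOT NULL)")
first = get_dataset_id(conn, "s3://bucket/key")    # inserted, id 1
again = get_dataset_id(conn, "s3://bucket/key")    # found, id 1
other = get_dataset_id(conn, "s3://bucket/other")  # inserted, id 2
```

The early returns remove the intermediate `dataset_id` variable. With a `UNIQUE` constraint on `uri`, a concurrent duplicate insert surfaces as `sqlite3.IntegrityError` rather than a second row.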