Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/30 17:41:04 UTC

[GitHub] [airflow] jedcunningham commented on a diff in pull request #24743: WIP Add other dataset models -- dag-based dataset deps

jedcunningham commented on code in PR #24743:
URL: https://github.com/apache/airflow/pull/24743#discussion_r911291025


##########
airflow/models/dataset.py:
##########
@@ -66,10 +68,36 @@ def __init__(self, uri: str, **kwargs):
         super().__init__(uri=uri, **kwargs)
 
     def __eq__(self, other):
-        return self.uri == other.uri
+        if isinstance(other, self.__class__):
+            return self.uri == other.uri
+        return NotImplemented
 
     def __hash__(self):
         return hash(self.uri)
 
     def __repr__(self):
         return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"
+
+    @provide_session
+    def get_dataset_id(self, session=NEW_SESSION):
+        if self.id:
+            dataset_id = self.id
+        else:
+            stored = session.query(self.__class__).filter(self.__class__.uri == self.uri).first()

Review Comment:
   ```suggestion
               stored = session.query(self.__class__.id).filter(self.__class__.uri == self.uri).first()
   ```
   
   We only need the id here, so we can query just that column, no?
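
The point above is that the lookup only needs one column. A minimal sketch of the idea follows, using the standard library `sqlite3` as a stand-in for the SQLAlchemy session so that it runs on its own. The table layout and the helper name `lookup_dataset_id` are assumptions for illustration and are not taken from the PR.

```python
import sqlite3

# Hypothetical stand-in for the dataset table; the column names are assumptions.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dataset (id INTEGER PRIMARY KEY, uri TEXT UNIQUE, extra TEXT)"
)
conn.execute(
    "INSERT INTO dataset (uri, extra) VALUES (?, ?)", ("s3://bucket/key", "{}")
)


def lookup_dataset_id(conn, uri):
    """Return the id stored for ``uri``, or None if there is no such row.

    Only the id column is selected, so the uri and extra values are never
    read back from the database.
    """
    row = conn.execute("SELECT id FROM dataset WHERE uri = ?", (uri,)).fetchone()
    return row[0] if row is not None else None
```

With SQLAlchemy, passing a column such as `Model.id` to `session.query()` instead of the mapped class has the same effect: the result row carries only that column.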



##########
airflow/models/taskinstance.py:
##########
@@ -1521,7 +1524,25 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-
+            for obj in getattr(self.task, '_outlets', []):
+                self.log.debug("outlet obj %s", obj)
+                if isinstance(obj, Dataset):
+                    dataset = session.query(Dataset).filter(Dataset.uri == obj.uri).first()
+                    if not dataset:
+                        dataset = Dataset(uri=obj.uri, extra=obj.extra)
+                        self.log.debug("adding dataset %s", dataset)
+                        session.add(dataset)
+                        session.flush()

Review Comment:
   How can this branch be hit? Shouldn't the dataset already have been added when the DAG was parsed?



##########
airflow/models/dataset.py:
##########
@@ -66,10 +68,36 @@ def __init__(self, uri: str, **kwargs):
         super().__init__(uri=uri, **kwargs)
 
     def __eq__(self, other):
-        return self.uri == other.uri
+        if isinstance(other, self.__class__):
+            return self.uri == other.uri
+        return NotImplemented
 
     def __hash__(self):
         return hash(self.uri)
 
     def __repr__(self):
         return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"
+
+    @provide_session
+    def get_dataset_id(self, session=NEW_SESSION):
+        if self.id:
+            dataset_id = self.id
+        else:
+            stored = session.query(self.__class__).filter(self.__class__.uri == self.uri).first()
+            if not stored:
+                session.add(self)
+                session.flush()
+                dataset_id = self.id
+            else:
+                dataset_id = stored.id
+        return dataset_id

Review Comment:
   ```suggestion
               if stored:
                   return stored.id
               
               session.add(self)
               session.flush()
               return self.id
   ```
   
   Nit: return early instead of nesting the if/else.
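
The early-return shape suggested above can be sketched as a small get-or-create helper. This is an illustration under assumptions, not the PR's code: it uses the standard library `sqlite3` in place of the SQLAlchemy session, and `make_conn` and this standalone `get_dataset_id` are hypothetical names.

```python
import sqlite3


def make_conn():
    """Create an in-memory database with a hypothetical dataset table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dataset (id INTEGER PRIMARY KEY, uri TEXT UNIQUE)")
    return conn


def get_dataset_id(conn, uri):
    """Return the id for ``uri``, inserting a row first if none exists."""
    row = conn.execute("SELECT id FROM dataset WHERE uri = ?", (uri,)).fetchone()
    if row is not None:
        # Found an existing row: return right away, no else branch needed.
        return row[0]

    # Not stored yet: insert and return the id the database assigned.
    cursor = conn.execute("INSERT INTO dataset (uri) VALUES (?)", (uri,))
    return cursor.lastrowid
```

Each path returns its own value, so no intermediate `dataset_id` variable has to be carried to a single return statement at the end.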


