You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "bolkedebruin (via GitHub)" <gi...@apache.org> on 2023/02/08 10:47:19 UTC

[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

bolkedebruin commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1099950413


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   Here you are extending legacy code; I suggest using the more generic `serialization` code from `serde` instead.



##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:

Review Comment:
   This should be called `deserialize` instead of `from_dict`, in line with the (newish) serialization code.



##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:

Review Comment:
   This should be called `serialize` instead of `to_dict`, in line with the (newish) serialization code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org