You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "kosteev (via GitHub)" <gi...@apache.org> on 2023/02/07 13:41:26 UTC

[GitHub] [airflow] kosteev commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

kosteev commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1098660105


##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:
+        ti_dict = self.__dict__.copy()
+        ti_dict.pop("_log", None)
+        ti_dict.pop("test_mode", None)
+        ti_dict.pop("dag_run", None)

Review Comment:
   I am curious whether we should care about keeping this in sync. Are there any consequences if we forget to remove some attribute here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org