Posted to commits@airflow.apache.org by "mhenc (via GitHub)" <gi...@apache.org> on 2023/02/03 12:56:25 UTC

[GitHub] [airflow] mhenc opened a new pull request, #29355: AIP-44 Support TaskInstance serialization.

mhenc opened a new pull request, #29355:
URL: https://github.com/apache/airflow/pull/29355

   In AIP-44 we need to send the TaskInstance object to the worker for execution. Currently this object type doesn't support serialization (only SimpleTaskInstance does).
   
   This change mainly adds a new way to construct a TaskInstance, `from_dict`, which requires changing the constructor into `from_task` and refactoring all usages.
   
   related: #29320
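
A minimal sketch of the dictionary round trip described above. `DemoTaskInstance` is a hypothetical stand-in, not the Airflow class, and its attributes are chosen only for illustration; the real `TaskInstance` is an ORM model with more state to handle.

```python
from typing import Any, Dict


class DemoTaskInstance:
    """Hypothetical stand-in for TaskInstance; not the Airflow class."""

    def __init__(self, task_id: str = "", run_id: str = "", try_number: int = 0) -> None:
        self.task_id = task_id
        self.run_id = run_id
        self.try_number = try_number
        self._log = object()  # process-local state that must not be sent over the wire

    def to_dict(self) -> Dict[str, Any]:
        """Copy the instance attributes, dropping process-local ones."""
        data = self.__dict__.copy()
        data.pop("_log", None)
        return data

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "DemoTaskInstance":
        """Rebuild an instance from a dictionary produced by to_dict."""
        ti = DemoTaskInstance()
        ti.__dict__.update(data)
        return ti


original = DemoTaskInstance(task_id="extract", run_id="manual__1", try_number=2)
restored = DemoTaskInstance.from_dict(original.to_dict())
```

The restored object gets a fresh `_log` from its own constructor, so only the plain data attributes travel between processes.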
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1113026086


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   That's **precisely** what I am refactoring now (with my POC)





[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1100288882


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   We rely on serialized_objects in the Internal API in general:
   https://github.com/apache/airflow/blob/main/airflow/api_internal/internal_api_call.py#L107
   Do you think we could switch to serde now? Is it compatible with what serialized_objects offers?





[GitHub] [airflow] uranusjr commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1096633749


##########
airflow/models/taskinstance.py:
##########
@@ -448,21 +448,30 @@ class TaskInstance(Base, LoggingMixin):
     note = association_proxy("task_instance_note", "content", creator=_creator_note)
     task: Operator  # Not always set...
 
-    def __init__(
-        self,
+    @staticmethod
+    def from_task(

Review Comment:
   I like this, but this may not be viable. Creating a TaskInstance out-of-band has historically been used quite pervasively by users, and this could potentially break many installations. We could still do this (especially since we are trying to talk people out of calling various Airflow internals anyway), but we need to be aware of the possible implications.
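
One way to soften the compatibility concern, sketched under assumptions: keep the old constructor call working with a deprecation warning while offering `from_task` as the preferred entry point. `DemoTaskInstance` and `_populate` are hypothetical names for illustration; this is not what the PR implements.

```python
import warnings
from typing import Optional


class DemoTaskInstance:
    """Hypothetical stand-in showing a constructor kept for compatibility."""

    def __init__(self, task: Optional[str] = None, run_id: Optional[str] = None) -> None:
        self.task: Optional[str] = None
        self.run_id: Optional[str] = None
        if task is not None:
            # The old call style still works, but callers are nudged to from_task.
            warnings.warn(
                "Passing a task to the constructor is deprecated; use from_task().",
                DeprecationWarning,
                stacklevel=2,
            )
            self._populate(task, run_id)

    @classmethod
    def from_task(cls, task: str, run_id: Optional[str] = None) -> "DemoTaskInstance":
        """Preferred factory; builds an empty instance and fills it in."""
        ti = cls()
        ti._populate(task, run_id)
        return ti

    def _populate(self, task: str, run_id: Optional[str]) -> None:
        self.task = task
        self.run_id = run_id


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    old_style = DemoTaskInstance("extract", run_id="manual__1")
    new_style = DemoTaskInstance.from_task("extract", run_id="manual__1")
```

Only the old-style call emits a warning, so existing user code keeps running while the new factory stays quiet.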





[GitHub] [airflow] ashb commented on pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "ashb (via GitHub)" <gi...@apache.org>.
ashb commented on PR #29355:
URL: https://github.com/apache/airflow/pull/29355#issuecomment-1422304775

   @bolkedebruin could you take a look? I know you've recently overhauled all of the serialisation code




[GitHub] [airflow] mhenc commented on pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on PR #29355:
URL: https://github.com/apache/airflow/pull/29355#issuecomment-1482845355

   Closing in favor of https://github.com/apache/airflow/pull/30282




[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1112983708


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   But this works only for the client -> Internal API direction.
   
   What about Internal API -> client, e.g. the worker? We need to have a TaskInstance object in the worker to run the tasks.
   https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L187





[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1101582176


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   Am I incorrect in thinking that the migration is a 2-line change in `internal_api_call` and that you are not relying on any of the other serialized_objects (basically DAG)? If so, then I would say do not add technical debt and migrate now. This allows us to label `serialized_objects` as stale and soon to be deprecated.
   
   Otherwise, keep it and add it to the todo of AIP-44?





[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1099950413


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   Here you are extending legacy code; I suggest using the more generic `serialization` code from `serde`.



##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:

Review Comment:
   This should be called `deserialize` instead of `from_dict`, in line with the (newish) serialization code.



##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:

Review Comment:
   This should be called `serialize` instead of `to_dict`, in line with the (newish) serialization code.
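
A sketch of what the suggested naming could look like. It assumes a convention of an instance method `serialize`, a static `deserialize(data, version)`, and a class-level `__version__`; those details are assumptions here, not taken from this thread, and `DemoTaskInstance` is a hypothetical stand-in.

```python
from typing import Any, ClassVar, Dict


class DemoTaskInstance:
    """Hypothetical stand-in; method names follow the reviewer's suggestion."""

    # Assumed versioning hook, bumped whenever the serialized layout changes.
    __version__: ClassVar[int] = 1

    def __init__(self, task_id: str, run_id: str) -> None:
        self.task_id = task_id
        self.run_id = run_id

    def serialize(self) -> Dict[str, Any]:
        """Return only plain, transport-friendly fields."""
        return {"task_id": self.task_id, "run_id": self.run_id}

    @staticmethod
    def deserialize(data: Dict[str, Any], version: int) -> "DemoTaskInstance":
        """Rebuild the object, refusing payloads newer than this class understands."""
        if version > DemoTaskInstance.__version__:
            raise TypeError("serialized version is newer than this class")
        return DemoTaskInstance(**data)


payload = DemoTaskInstance("extract", "manual__1").serialize()
restored = DemoTaskInstance.deserialize(payload, 1)
```

Listing the fields explicitly, instead of copying `__dict__`, keeps the wire format stable when new attributes are added to the class.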





[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1101430076


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   Great. I think we will migrate to the new serializer/deserializer in this case, but probably outside of this PR.
   If you believe it would be better to migrate first, then I can revert this change and get back to it when using the new way.
   WDYT?





[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1101472519


##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:
+        ti_dict = self.__dict__.copy()
+        ti_dict.pop("_log", None)
+        ti_dict.pop("test_mode", None)
+        ti_dict.pop("dag_run", None)

Review Comment:
   I see; actually, running this in a test makes more sense to me.
   Test added, PTAL.





[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1104169696


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   Yes, we need to make a change there and on the server side:
   https://github.com/apache/airflow/blob/main/airflow/api_internal/endpoints/rpc_api_endpoint.py#L76
   
   There are more methods with the `internal_api_call` decorator. I did a quick check and I see that (besides primitives) we already need serialization for Dag, DagRun, BaseXCom, CallbackRequest (and probably more soon).





[GitHub] [airflow] potiuk commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1106550754


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   A question (and really sorry I have not looked at it earlier), which is also a follow-up on https://github.com/apache/airflow/pull/29513.
   
   Are we REALLY sure we need to serialize whole TaskInstance (and other) objects to pass them via internal_api_call?
   
   For me this is an indication that we have too narrow a scope for an internal_api_call. Generally speaking, the whole internal_api_call should span the whole DB transaction. Since we are trying to pass an ORM object (TaskInstance, DagRun etc.), that object must have been retrieved earlier within a transaction, so our internal_api_call should wrap the retrieval as well. That might simply mean that we need to do some refactoring and extract new methods (and then decorate them).
   
   That's of course a general statement and approach, and there might be cases that require a bit deeper refactoring.
   
   Which methods are affected, @mhenc (besides the https://github.com/apache/airflow/pull/29513 one)? Maybe we can look together and figure out an approach for all of them?
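
The idea above can be sketched as follows: the remote call receives only primitive keys and performs the retrieval itself, so no ORM object has to cross the wire. Everything here is hypothetical (`rpc_call`, `FAKE_DB`, `mark_running`); the decorator merely round-trips arguments through JSON to imitate a transport and is not Airflow's `internal_api_call`.

```python
import functools
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple


@dataclass
class DemoRow:
    """Hypothetical stand-in for an ORM row."""

    state: str


# Hypothetical in-memory "database", keyed by (dag_id, task_id, run_id).
FAKE_DB: Dict[Tuple[str, str, str], DemoRow] = {
    ("etl", "extract", "manual__1"): DemoRow(state="queued"),
}


def rpc_call(func: Callable[..., Any]) -> Callable[..., Any]:
    """Imitate a remote call: arguments must survive a JSON round trip."""

    @functools.wraps(func)
    def wrapper(**kwargs: Any) -> Any:
        payload = json.dumps(kwargs)  # raises TypeError for non-JSON objects
        return func(**json.loads(payload))

    return wrapper


@rpc_call
def mark_running(dag_id: str, task_id: str, run_id: str) -> str:
    """Retrieve and update the row inside the call, spanning one 'transaction'."""
    row = FAKE_DB[(dag_id, task_id, run_id)]
    row.state = "running"
    return row.state


new_state = mark_running(dag_id="etl", task_id="extract", run_id="manual__1")
```

Passing a `DemoRow` instead of the three keys would fail at `json.dumps`, which is the situation the comment suggests avoiding by widening the scope of the decorated method.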





[GitHub] [airflow] potiuk commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1106556102


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   For example, in https://github.com/apache/airflow/pull/29513#discussion_r1106553896 I proposed a solution that would avoid TaskInstance serialization altogether. I am reasonably convinced that a similar approach can be used for all of our ORM objects and that we do not need to serialize any of them (in which case the whole PR might not be needed).





[GitHub] [airflow] mhenc closed pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc closed pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.
URL: https://github.com/apache/airflow/pull/29355




[GitHub] [airflow] uranusjr commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1099675722


##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:
+        ti_dict = self.__dict__.copy()
+        ti_dict.pop("_log", None)
+        ti_dict.pop("test_mode", None)
+        ti_dict.pop("dag_run", None)

Review Comment:
   Making the list of keys a variable (e.g. a global `TASK_INSTANCE_TO_DICT_EXCLUDES`) and adding a pre-commit hook to check it is a common approach.
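
A minimal sketch of the exclude-list idea, assuming a plain Python stand-in class rather than the real `TaskInstance`. The constant name comes from the comment; `FakeTaskInstance` and its fields are hypothetical and exist only for illustration.

```python
from __future__ import annotations

from typing import Any

# Constant name taken from the review comment; the three entries are the
# attributes popped in the diff above.
TASK_INSTANCE_TO_DICT_EXCLUDES = frozenset({"_log", "test_mode", "dag_run"})


class FakeTaskInstance:
    """Hypothetical stand-in for TaskInstance, not the Airflow model."""

    def __init__(self, task_id: str, run_id: str) -> None:
        self.task_id = task_id
        self.run_id = run_id
        self.test_mode = False
        self._log = object()  # placeholder for a logger instance

    def to_dict(self) -> dict[str, Any]:
        # Copy every attribute except the explicitly excluded ones.
        return {
            key: value
            for key, value in self.__dict__.items()
            if key not in TASK_INSTANCE_TO_DICT_EXCLUDES
        }


ti_dict = FakeTaskInstance("extract", "manual__2023-02-03").to_dict()
print(sorted(ti_dict))  # ['run_id', 'task_id']
```

Keeping the excluded names in one place gives a hook or a test a single object to inspect, instead of parsing a sequence of `pop` calls.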





[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1101560326


##########
airflow/models/taskinstance.py:
##########
@@ -470,13 +477,43 @@ class TaskInstance(Base, LoggingMixin):
 
     def __init__(
         self,
-        task: Operator,
+        task: Operator | None = None,
         execution_date: datetime | None = None,
         run_id: str | None = None,
         state: str | None = None,
         map_index: int = -1,
+        ti_dict: dict[str, Any] | None = None,
+    ):
+        """
+        Constructs TaskInstance object.
+
+        Deprecated, prefer to use "from_task" or "deserialize" static methods.
+        """
+        if ti_dict is not None:
+            # Should only be used by deserialize method.

Review Comment:
   I don't think this is required anymore?



##########
airflow/models/taskinstance.py:
##########
@@ -532,6 +569,41 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def from_task(
+        task: Operator,
+        execution_date: datetime | None = None,
+        run_id: str | None = None,
+        state: str | None = None,
+        map_index: int = -1,
+    ):
+        """
+        Create TaskInstance from task operator.
+
+        :param task: task's Operator object.
+        :param execution_date: Optional execution time of the task.
+        :param run_id: Optional DAG run ID for the task.
+        :param state: Optional state of the task.
+        :param map_index: Optional map index. Defaults to -1 (non-mapped task).
+        """
+        return TaskInstance(task, execution_date, run_id, state, map_index)
+
+    @staticmethod
+    def deserialize(ti_dict: dict[str, Any], version: int) -> TaskInstance:
+        """Deserialize TaskInstance from dictionary."""
+        if version > TaskInstance.__version__:
+            raise TypeError("version too big, dont know hot to deserialize")

Review Comment:
   Mmm you have even copied my typo ;-)



##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   Am I incorrect in thinking that the migration is a two-line change in `internal_api_call` and that you are not relying on any of the other serialized_objects (basically DAG)? If so, then I would say do not add technical debt and migrate now. This allows us to mark `serialized_objects` as stale and soon to be deprecated.
   
   Otherwise, keep it and add it to the to-do list of AIP-44?



##########
airflow/models/taskinstance.py:
##########
@@ -532,6 +569,41 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def from_task(
+        task: Operator,
+        execution_date: datetime | None = None,
+        run_id: str | None = None,
+        state: str | None = None,
+        map_index: int = -1,
+    ):
+        """
+        Create TaskInstance from task operator.
+
+        :param task: task's Operator object.
+        :param execution_date: Optional execution time of the task.
+        :param run_id: Optional DAG run ID for the task.
+        :param state: Optional state of the task.
+        :param map_index: Optional map index. Defaults to -1 (non-mapped task).
+        """
+        return TaskInstance(task, execution_date, run_id, state, map_index)
+
+    @staticmethod
+    def deserialize(ti_dict: dict[str, Any], version: int) -> TaskInstance:
+        """Deserialize TaskInstance from dictionary."""
+        if version > TaskInstance.__version__:
+            raise TypeError("version too big, dont know hot to deserialize")

Review Comment:
   I suggest clarifying it a bit, by being more explicit (what version did you get and what do you support as a max)
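
One way the more explicit message could look, as a sketch only. `VersionedThing` is a hypothetical stand-in class, and the wording of the message is an assumption, not text from the PR.

```python
from __future__ import annotations

from typing import Any, ClassVar


class VersionedThing:
    """Hypothetical versioned class; illustrates the error message only."""

    __version__: ClassVar[int] = 1

    def __init__(self) -> None:
        self.payload: dict[str, Any] = {}

    @classmethod
    def deserialize(cls, data: dict[str, Any], version: int) -> VersionedThing:
        if version > cls.__version__:
            # State both the received version and the supported maximum.
            raise TypeError(
                f"Cannot deserialize {cls.__name__}: got serialized version {version}, "
                f"but this class supports versions up to {cls.__version__}"
            )
        obj = cls()
        obj.payload = dict(data)
        return obj


try:
    VersionedThing.deserialize({"task_id": "extract"}, version=2)
except TypeError as err:
    message = str(err)
    print(message)
```

With both numbers in the message, a version mismatch between two components can be diagnosed from the log line alone.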





[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1100488514


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   Given that you are going to rely on a lot of serialization, and that the new serializer/deserializer is significantly faster and more future-proof (versioning), I think everything that does not currently have a schema (that's everything except a DAG*) should switch.
   
   I am willing to help out to ease the migration if required.
   
   * I am working on DAG serialization/deserialization, but untangling how it is done now and improving the structure is taking time, especially with all the edge cases.





[GitHub] [airflow] kosteev commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "kosteev (via GitHub)" <gi...@apache.org>.
kosteev commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1098660105


##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:
+        ti_dict = self.__dict__.copy()
+        ti_dict.pop("_log", None)
+        ti_dict.pop("test_mode", None)
+        ti_dict.pop("dag_run", None)

Review Comment:
   I am curious whether we should care about keeping this in sync. Are there any consequences if we forget to remove some attribute here?





[GitHub] [airflow] bolkedebruin commented on pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on PR #29355:
URL: https://github.com/apache/airflow/pull/29355#issuecomment-1422478049

   > In order to achieve the goal of constructing TaskInstance objects from "task" and "dict" could we have constructor of TaskInstance extended to support new parameter, e.g. task_dict like this:
   > 
   > def __init__(
   >     self,
   >     task: Operator,
   >     execution_date: datetime | None = None,
   >     run_id: str | None = None,
   >     state: str | None = None,
   >     map_index: int = -1,
   >     task_dict: dict = None,
   > ):
   >     if task_dict is not None:
   >         # init from task_dict
   >         return
   > 
   >     # old implementation
   > 
   > and then TaskInstance(task=t), TaskInstance(task_dict={...})
   > 
   > What do you think about this approach?
   
   I don't think that makes sense; a staticmethod or classmethod `deserialize` should cover this.
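
A small sketch of the alternate-constructor pattern referred to here, assuming a plain Python class. `Record` is hypothetical; a SQLAlchemy-mapped class such as `TaskInstance` carries extra instance state that this sketch deliberately ignores.

```python
from __future__ import annotations

from typing import Any


class Record:
    """Hypothetical class; shows the pattern only, not TaskInstance."""

    def __init__(self, name: str, retries: int = 0) -> None:
        # The constructor keeps a single job: build from regular arguments.
        self.name = name
        self.retries = retries

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Record:
        # Create the object without running __init__, then restore its state.
        obj = cls.__new__(cls)
        obj.__dict__.update(data)
        return obj


restored = Record.deserialize({"name": "extract", "retries": 3})
print(restored.name, restored.retries)  # extract 3
```

The constructor signature stays unchanged, so existing callers are unaffected, and the dictionary path has its own name and its own place for validation.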




[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1100495876


##########
airflow/models/taskinstance.py:
##########
@@ -532,6 +560,42 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    def _init_from_dict(self, ti_dict: dict[str, Any]):
+        self.__dict__ = ti_dict.copy()
+        self.init_on_load()
+
+    @staticmethod
+    def from_task(
+        task: Operator,
+        execution_date: datetime | None = None,
+        run_id: str | None = None,
+        state: str | None = None,
+        map_index: int = -1,
+    ):
+        """
+        Create TaskInstance from task operator.
+
+        :param task: task's Operator object.
+        :param execution_date: Optional execution time of the task.
+        :param run_id: Optional DAG run ID for the task.
+        :param state: Optional state of the task.
+        :param map_index: Optional map index. Defaults to -1 (non-mapped task).
+        """
+        return TaskInstance(task, execution_date, run_id, state, map_index)
+
+    @staticmethod
+    def deserialize(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        return TaskInstance(ti_dict=ti_dict)

Review Comment:
   Why do this in the constructor? It seems to overload it and only calls `_init_from_dict` which does a dict copy and calls `init_on_load`? I would expect something like
   
   ```
   def deserialize(data: dict[str, Any], version: int) -> TaskInstance:
       if version > TaskInstance.__version__:
           raise TypeError("version too big, dont know hot to deserialize")

       ti = TaskInstance()
       ti.__dict__ = data.copy()
       ti.init_on_load()
       return ti
   ```



##########
airflow/models/taskinstance.py:
##########
@@ -532,6 +560,42 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    def _init_from_dict(self, ti_dict: dict[str, Any]):
+        self.__dict__ = ti_dict.copy()
+        self.init_on_load()
+
+    @staticmethod
+    def from_task(
+        task: Operator,
+        execution_date: datetime | None = None,
+        run_id: str | None = None,
+        state: str | None = None,
+        map_index: int = -1,
+    ):
+        """
+        Create TaskInstance from task operator.
+
+        :param task: task's Operator object.
+        :param execution_date: Optional execution time of the task.
+        :param run_id: Optional DAG run ID for the task.
+        :param state: Optional state of the task.
+        :param map_index: Optional map index. Defaults to -1 (non-mapped task).
+        """
+        return TaskInstance(task, execution_date, run_id, state, map_index)
+
+    @staticmethod
+    def deserialize(ti_dict: dict[str, Any]) -> TaskInstance:

Review Comment:
   `deserialize` takes two arguments: `data: dict[str, Any]` and `version: int`.



##########
airflow/models/taskinstance.py:
##########
@@ -470,13 +469,42 @@ class TaskInstance(Base, LoggingMixin):
 

Review Comment:
   here add a 
   
   `__version__: ClassVar[int] = 1`





[GitHub] [airflow] mhenc commented on pull request #29355: AIP-44 Support TaskInstance serialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on PR #29355:
URL: https://github.com/apache/airflow/pull/29355#issuecomment-1415931500

   cc: @potiuk @vincbeck  




[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1112983708


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   But this works only for the client -> Internal API direction.
   
   What about Internal API -> client, e.g. a worker? We need to have a TaskInstance object in the worker to run the task, e.g.
   https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L187
   
   Unless, of course, we are able to refactor it completely.





[GitHub] [airflow] ashb commented on pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "ashb (via GitHub)" <gi...@apache.org>.
ashb commented on PR #29355:
URL: https://github.com/apache/airflow/pull/29355#issuecomment-1422304840

   @bolkedebruin could you take a look? I know you've recently overhauled all of the serialisation code




[GitHub] [airflow] uranusjr commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1100345687


##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:
+        ti_dict = self.__dict__.copy()
+        ti_dict.pop("_log", None)
+        ti_dict.pop("test_mode", None)
+        ti_dict.pop("dag_run", None)

Review Comment:
   Basically you’d list out what fields you expect to be in the dict in the pre-commit check, and compare it with `__dict__` minus these explicitly excluded fields. So if `__dict__` is changed, someone must consciously decide whether the new field should go into the dict (by adding it to the list maintained in the check), or be excluded (by adding it to the exclude fields).
   
   This can also be done in a test. (There are several serialization tests that do exactly this.)
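
The check described above could be sketched as a test like the following. All names and field lists here are hypothetical; a real check would list the actual `TaskInstance` fields.

```python
from __future__ import annotations

from typing import Any

# Hypothetical field lists maintained by hand alongside the check.
EXPECTED_SERIALIZED_FIELDS = {"task_id", "run_id", "try_number"}
EXCLUDED_FIELDS = {"_log", "test_mode", "dag_run"}


class FakeTaskInstance:
    """Hypothetical stand-in for TaskInstance."""

    def __init__(self) -> None:
        self.task_id = "extract"
        self.run_id = "manual__2023-02-03"
        self.try_number = 0
        self.test_mode = False
        self._log = None


def unclassified_fields(obj: Any) -> set[str]:
    """Return attributes that are neither expected nor explicitly excluded."""
    return set(vars(obj)) - EXPECTED_SERIALIZED_FIELDS - EXCLUDED_FIELDS


def test_every_field_is_classified() -> None:
    # Fails as soon as someone adds an attribute without classifying it.
    assert unclassified_fields(FakeTaskInstance()) == set()


test_every_field_is_classified()
```

If a new attribute is added to the class without touching either list, `unclassified_fields` returns its name and the test fails, which forces the conscious decision the comment asks for.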





[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1112983708


##########
airflow/serialization/serialized_objects.py:
##########
@@ -502,6 +504,8 @@ def deserialize(cls, encoded_var: Any) -> Any:
             return Dataset(**var)
         elif type_ == DAT.SIMPLE_TASK_INSTANCE:
             return SimpleTaskInstance(**cls.deserialize(var))
+        elif type_ == DAT.TASK_INSTANCE:

Review Comment:
   But this is for the client -> Internal API side.
   What about Internal API -> client, e.g. a worker? We need to have a TaskInstance object in the worker to run the tasks.
   https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L187





[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1100289253


##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:

Review Comment:
   Done



##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:

Review Comment:
   Done





[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1100292662


##########
airflow/models/taskinstance.py:
##########
@@ -448,21 +448,30 @@ class TaskInstance(Base, LoggingMixin):
     note = association_proxy("task_instance_note", "content", creator=_creator_note)
     task: Operator  # Not always set...
 
-    def __init__(
-        self,
+    @staticmethod
+    def from_task(

Review Comment:
   I made the changes that @kosteev suggested, with an additional optional parameter to the constructor, while keeping the `from_task` static method so users can slowly migrate to it.
   On the other hand, @bolkedebruin has some concerns about this approach.





[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1099949583


##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:

Review Comment:
   This should be called `deserialize` instead of `from_dict`, in line with the (newish) serialization code.
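
A rough sketch of what a `serialize` / `deserialize` method pair could look like on a plain class. The thread does not show the exact signatures the serialization code expects, so the `__version__` attribute and the `version` argument are assumptions, and `Record` is a hypothetical class, not part of Airflow.

```python
from typing import Any, Dict


class Record:
    """Hypothetical class used only to illustrate the naming convention."""

    # Assumed version marker so that older payloads can be recognised.
    __version__ = 1

    def __init__(self, dag_id: str, task_id: str, run_id: str) -> None:
        self.dag_id = dag_id
        self.task_id = task_id
        self.run_id = run_id

    def serialize(self) -> Dict[str, Any]:
        # Return only plain, JSON-friendly values.
        return {"dag_id": self.dag_id, "task_id": self.task_id, "run_id": self.run_id}

    @staticmethod
    def deserialize(data: Dict[str, Any], version: int) -> "Record":
        # Refuse payloads written by a newer version of the class.
        if version > Record.__version__:
            raise TypeError(f"serialized version {version} is newer than {Record.__version__}")
        return Record(**data)
```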







[GitHub] [airflow] mhenc commented on a diff in pull request #29355: AIP-44 Support TaskInstance serialization/deserialization.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1100274604


##########
airflow/models/taskinstance.py:
##########
@@ -495,42 +503,59 @@ def __init__(
             )
             # make sure we have a localized execution_date stored in UTC
             if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
+                ti.log.warning(
                     "execution date %s has no timezone information. Using default from dag or system",
                     execution_date,
                 )
-                if self.task.has_dag():
+                if ti.task.has_dag():
                     if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                        assert ti.task.dag
+                    execution_date = timezone.make_aware(execution_date, ti.task.dag.timezone)
                 else:
                     execution_date = timezone.make_aware(execution_date)
 
                 execution_date = timezone.convert_to_utc(execution_date)
             with create_session() as session:
                 run_id = (
                     session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                    .filter_by(dag_id=ti.dag_id, execution_date=execution_date)
                     .scalar()
                 )
                 if run_id is None:
                     raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date {execution_date} not found"
+                        f"DagRun for {ti.dag_id!r} with date {execution_date} not found"
                     ) from None
 
-        self.run_id = run_id
+        ti.run_id = run_id
 
-        self.try_number = 0
-        self.max_tries = self.task.retries
-        self.unixname = getuser()
+        ti.try_number = 0
+        ti.max_tries = ti.task.retries
+        ti.unixname = getuser()
         if state:
-            self.state = state
-        self.hostname = ""
+            ti.state = state
+        ti.hostname = ""
         # Is this TaskInstance being currently running within `airflow tasks run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.raw = False
+        ti.raw = False
         # can be changed when calling 'run'
-        self.test_mode = False
+        ti.test_mode = False
+        return ti
+
+    @staticmethod
+    def from_dict(ti_dict: dict[str, Any]) -> TaskInstance:
+        """Create TaskInstance from dictionary."""
+        ti = TaskInstance()
+        ti.__dict__ = ti_dict.copy()
+        ti.init_on_load()
+        return ti
+
+    def to_dict(self) -> dict[str, Any]:
+        ti_dict = self.__dict__.copy()
+        ti_dict.pop("_log", None)
+        ti_dict.pop("test_mode", None)
+        ti_dict.pop("dag_run", None)

Review Comment:
   But what should it be kept in sync with?
   We exclude fields there that are not useful (like the logger or SQLAlchemy state) or that are calculated in the constructor anyway (`test_mode`).
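
A self-contained sketch of the round trip in the diff above: copy `__dict__`, drop the keys that are not worth sending, and re-create them on load. `Instance` is a hypothetical stand-in for `TaskInstance`, and the excluded key names simply mirror the ones visible in the diff.

```python
import logging
from typing import Any, Dict

# Keys that are process-local or recomputed on load, so they are not serialized.
_EXCLUDED_KEYS = ("_log", "test_mode", "dag_run")


class Instance:
    """Hypothetical stand-in for TaskInstance."""

    def __init__(self, task_id: str = "", run_id: str = "") -> None:
        self.task_id = task_id
        self.run_id = run_id
        self.init_on_load()

    def init_on_load(self) -> None:
        # Re-create the attributes that are never part of the serialized form.
        self._log = logging.getLogger(__name__)
        self.test_mode = False

    def to_dict(self) -> Dict[str, Any]:
        ti_dict = self.__dict__.copy()
        for key in _EXCLUDED_KEYS:
            ti_dict.pop(key, None)
        return ti_dict

    @staticmethod
    def from_dict(ti_dict: Dict[str, Any]) -> "Instance":
        ti = Instance()
        ti.__dict__ = ti_dict.copy()
        ti.init_on_load()
        return ti
```

Keeping the excluded keys in one tuple gives a single place to update when a new non-serializable attribute is added, which may be the kind of "keeping in sync" the question is about.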


