Posted to commits@airflow.apache.org by "potiuk (via GitHub)" <gi...@apache.org> on 2023/02/27 06:45:44 UTC

[GitHub] [airflow] potiuk commented on a diff in pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

potiuk commented on code in PR #29776:
URL: https://github.com/apache/airflow/pull/29776#discussion_r1118339128


##########
tests/models/test_pydantic_models.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import parse_raw_as
+
+from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.pydantic.base_job import BaseJobPydantic
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
+from airflow.models.pydantic.dag_run import DagRunPydantic
+from airflow.models.pydantic.dataset import DatasetEventPydantic
+from airflow.models.pydantic.taskinstance import TaskInstancePydantic
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.models import DEFAULT_DATE
+
+
+def test_serializing_pydantic_task_instance(session, create_task_instance):
+    dag_id = "test-dag"
+    ti = create_task_instance(dag_id=dag_id, session=session)
+    ti.state = State.RUNNING
+    ti.next_kwargs = {"foo": "bar"}
+    session.commit()
+
+    pydantic_task_instance = TaskInstancePydantic.from_orm(ti)
+
+    json_string = pydantic_task_instance.json()
+    print(json_string)

Review Comment:
   Yeah, I looked at pydantic-sqlalchemy and for now decided not to use it. I will try it and see what the quality of the generated code is vs. the manually written one.
   
   But I can see if it can be integrated to generate the Pydantic classes automatically - that would save some effort (though not a lot, to be honest). There are a few risks:
   
   * Its 'still experimental' status.
   * We likely do not want to convert all of the models and fields automatically - we will want to skip some fields or treat them a bit differently, and decide per field whether or not to add validations. We might also want to exclude certain models from serialization if the serialized form would drag along too much useless data (there are some back references in our models, and they might cause recursive serialization, attempting to serialize more than we need).
   * The models of ours that we want to serialize change extremely rarely, so the benefit of automating the conversion is very small vs. the cost of fixing potential 'experimental' issues.
   
   But I will try :) 
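
   To give a feel for the manually written variant, here is a minimal sketch (an illustrative subset of fields, not the exact classes from this PR):

   ```python
   from datetime import datetime
   from typing import Optional

   from pydantic import BaseModel


   class TaskInstancePydantic(BaseModel):
       """Hand-picked subset of TaskInstance fields - we decide per field what to expose."""

       task_id: str
       dag_id: str
       run_id: str
       state: Optional[str]
       start_date: Optional[datetime]

       class Config:
           orm_mode = True  # enables TaskInstancePydantic.from_orm(ti)
   ```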
   
   The print statements are there for now to show the serialized form, so that you can see it manually when running - yes, we can remove them eventually; the asserts are good enough to show that serialization/deserialization works.
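
   For instance, the round-trip the asserts cover looks roughly like this (assuming the sketch model above):

   ```python
   from pydantic import parse_raw_as

   json_string = pydantic_task_instance.json()
   deserialized = parse_raw_as(TaskInstancePydantic, json_string)

   # fields we chose to serialize survive the round-trip
   assert deserialized.dag_id == ti.dag_id
   assert deserialized.state == ti.state
   ```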
   
   
   Re: how to use it - yes, in all internal_api calls where we need to pass the models, we should make sure that the DB model gets created or retrieved on the 'other' side of the call and returned in serialized/Pydantic form.
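
   That is, the server-side handler of such a call would do something like this (a hypothetical handler, just to show the shape - the DB model only ever lives on the server):

   ```python
   from sqlalchemy.orm import Session

   from airflow.jobs.local_task_job import LocalTaskJob
   from airflow.jobs.pydantic.base_job import BaseJobPydantic
   from airflow.models.taskinstance import TaskInstance


   def handle_create_local_task_job(ti: TaskInstance, session: Session) -> str:
       """Hypothetical internal-API handler: create the DB model here, ship it back as JSON."""
       job = LocalTaskJob(task_instance=ti)
       session.add(job)
       session.commit()
       return BaseJobPydantic.from_orm(job).json()
   ```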
   
   And we can even optimise it in some cases where we see an actual need for performance: we can decide to return the DB model instead, because the two are technically equivalent and using them is the same, as long as the model would be detached anyway and is read-only (or as long as we are sending modifications back). So if you look at context.pyi - we can have either the DB model passed to it, or the Pydantic equivalent in case DB access is disabled.
   
   That is not 100% compatible, but for reading it should be (minus the fields we decide not to serialize), and we already know that the internal API case will be slightly less performant and that not everything users could do before will be possible - that's the whole point of the internal API: to limit them.
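
   A sketch of that union idea (illustrative, not the actual contents of context.pyi):

   ```python
   from typing import TypedDict, Union

   from airflow.models.pydantic.taskinstance import TaskInstancePydantic
   from airflow.models.taskinstance import TaskInstance


   class Context(TypedDict, total=False):
       # with DB access: the live ORM object; with the internal API: the
       # detached, read-only Pydantic mirror - reads work the same on either
       ti: Union[TaskInstance, TaskInstancePydantic]
   ```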
   
   For example, I have a WIP/POC where I slightly modify the LocalTaskJob interface and use this technique to send BaseJobPydantic back from the server after creating the LocalTaskJob instance (LocalTaskJob has no DB fields other than BaseJob's), and use it. With the Pydantic classes, I think I will be able to do it with zero impact on the 'regular' case (LocalTaskJob will be used everywhere) - only in the 'internal api' case will we serialize/deserialize the created object. So performance and behaviour for the regular case will not change.
   
   In those cases the return type of the methods will be LocalTaskJob | BaseJobPydantic, and we will have MyPy to make sure that we have all the fields in the Pydantic model. So, for example, when we try to use a new field that is not defined in the Pydantic version, MyPy will complain.
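
   Roughly like this (the helper functions are hypothetical, just to show the typing):

   ```python
   from __future__ import annotations

   from airflow.jobs.local_task_job import LocalTaskJob
   from airflow.jobs.pydantic.base_job import BaseJobPydantic


   def _internal_api_call(method: str) -> str:
       """Hypothetical RPC to the internal API server; returns a JSON payload."""
       raise NotImplementedError


   def _get_job_from_db() -> LocalTaskJob:
       """Hypothetical direct DB lookup (the 'regular', DB-access-enabled case)."""
       raise NotImplementedError


   def get_job(over_internal_api: bool) -> LocalTaskJob | BaseJobPydantic:
       if over_internal_api:
           # internal API case: deserialize the Pydantic mirror from JSON
           return BaseJobPydantic.parse_raw(_internal_api_call("get_job"))
       # regular case: hand back the ORM object directly, no extra cost
       return _get_job_from_db()
   ```

   MyPy then only allows attribute access that is valid for *both* types, so a field used in code but missing from BaseJobPydantic becomes a type error rather than a runtime surprise.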
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org