Posted to commits@airflow.apache.org by "potiuk (via GitHub)" <gi...@apache.org> on 2023/02/26 14:51:10 UTC

[GitHub] [airflow] potiuk opened a new pull request, #29776: Add Pydantic-powered ORM models serialization for internal API.

potiuk opened a new pull request, #29776:
URL: https://github.com/apache/airflow/pull/29776

   Add basic serialization capabilities for the ORM SQLAlchemy models that we use on the client side of the Internal API. Serializing the whole ORM models is rather complex, so it seems much more reasonable to convert the ORM models into a serializable form and use that, rather than the SQLAlchemy models themselves.
   
   There are just a handful of models that we need to serialize, and it is important to maintain the typing of the fields in the objects for MyPy verification, so we can allow some level of duplication and redefine the models as pure Python objects.
   
   We only need one-way conversion (from database models to Python models), because all the DB operations and modifications of the database entries will be done in the internal API server, so the server side of any method will be able to use the primary key stored in the serializable object to retrieve the actual DB model to update.
   
   We also need serialization to work both ways - an easy way to convert such Python classes to JSON and back - including validation.
   
   We could serialize those models manually, but that would be quite an overhead to develop and maintain - therefore we are harnessing the power of Pydantic, which already has ORM mapping to plain Python (Pydantic) classes built in.
   
   This PR implements the definitions of the Pydantic classes, with tests covering:
   
   * conversion of the ORM models to Pydantic objects
   * serialization of the Pydantic objects to JSON
   * deserialization of the JSON back to Pydantic objects
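   
   For illustration, a minimal sketch of the intended round trip (a hypothetical stand-in model, not one of the actual Airflow classes, using the Pydantic 1.x API this PR targets):
   
       from typing import Optional
   
       from pydantic import BaseModel, parse_raw_as
       from sqlalchemy import Column, Integer, String
       from sqlalchemy.orm import declarative_base
   
       Base = declarative_base()
   
       class SimpleModel(Base):  # stand-in ORM model
           __tablename__ = "simple"
           id = Column(Integer, primary_key=True)
           state = Column(String)
   
       class SimplePydantic(BaseModel):  # stand-in Pydantic counterpart
           id: int
           state: Optional[str]
   
           class Config:
               orm_mode = True  # enables from_orm() on SQLAlchemy objects
   
       # ORM model -> Pydantic object -> JSON string -> Pydantic object
       orm_object = SimpleModel(id=1, state="running")
       pydantic_object = SimplePydantic.from_orm(orm_object)
       json_string = pydantic_object.json()
       assert parse_raw_as(SimplePydantic, json_string) == pydantic_object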
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Taragolis commented on a diff in pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29776:
URL: https://github.com/apache/airflow/pull/29776#discussion_r1118547021


##########
tests/models/test_pydantic_models.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import parse_raw_as
+
+from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.pydantic.base_job import BaseJobPydantic
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
+from airflow.models.pydantic.dag_run import DagRunPydantic
+from airflow.models.pydantic.dataset import DatasetEventPydantic
+from airflow.models.pydantic.taskinstance import TaskInstancePydantic
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.models import DEFAULT_DATE
+
+
+def test_serializing_pydantic_task_instance(session, create_task_instance):
+    dag_id = "test-dag"
+    ti = create_task_instance(dag_id=dag_id, session=session)
+    ti.state = State.RUNNING
+    ti.next_kwargs = {"foo": "bar"}
+    session.commit()
+
+    pydantic_task_instance = TaskInstancePydantic.from_orm(ti)
+
+    json_string = pydantic_task_instance.json()
+    print(json_string)

Review Comment:
   Just wondering why the choice is `pydantic` and not `attrs` or `dataclasses`, which we already use in Airflow and which are supported in SQLA:
   - [2.0.x](https://docs.sqlalchemy.org/en/20/orm/dataclasses.html#integration-with-dataclasses-and-attrs)
   - [1.4.x](https://docs.sqlalchemy.org/en/14/orm/dataclasses.html#integration-with-dataclasses-and-attrs)





[GitHub] [airflow] potiuk merged pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #29776:
URL: https://github.com/apache/airflow/pull/29776




[GitHub] [airflow] potiuk commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1447131918

   > Quick question. If we take this approach, why do we then need to send the whole serialized object and not the primary keys only? Is it to avoid having to refactor too many functions and have a backward-compatible solution? I am not saying it is wrong, I am just curious :)
   
   Because much of our code that runs on the "client" side accesses those objects' fields, and even the fields of related instances. For example, when a local task job is run, it accesses a number of local task job fields. A good example is our "context" that each task's "execute" method gets: https://github.com/apache/airflow/blob/main/airflow/utils/context.pyi -> if you look there, the task_instance field is actually a TaskInstance object, and you would like all TaskInstance fields to be available in the operator's execute method.
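   
   To make that concrete, here is an illustrative sketch (a made-up operator, not code from this PR) of the kind of field access that happens on the client side - the reason a primary key alone would not be enough:
   
       from airflow.models.baseoperator import BaseOperator
   
       class PrintTiInfoOperator(BaseOperator):
           def execute(self, context):
               # The context carries a full TaskInstance, not just its
               # primary key; operators routinely read several fields.
               ti = context["task_instance"]
               self.log.info("dag=%s task=%s try=%s", ti.dag_id, ti.task_id, ti.try_number)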




[GitHub] [airflow] potiuk commented on a diff in pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29776:
URL: https://github.com/apache/airflow/pull/29776#discussion_r1118339128


##########
tests/models/test_pydantic_models.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import parse_raw_as
+
+from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.pydantic.base_job import BaseJobPydantic
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
+from airflow.models.pydantic.dag_run import DagRunPydantic
+from airflow.models.pydantic.dataset import DatasetEventPydantic
+from airflow.models.pydantic.taskinstance import TaskInstancePydantic
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.models import DEFAULT_DATE
+
+
+def test_serializing_pydantic_task_instance(session, create_task_instance):
+    dag_id = "test-dag"
+    ti = create_task_instance(dag_id=dag_id, session=session)
+    ti.state = State.RUNNING
+    ti.next_kwargs = {"foo": "bar"}
+    session.commit()
+
+    pydantic_task_instance = TaskInstancePydantic.from_orm(ti)
+
+    json_string = pydantic_task_instance.json()
+    print(json_string)

Review Comment:
   Yeah, I looked at pydantic-sqlalchemy and for now I decided not to use it. I will try it and see what the quality of the generated code is vs. the manually written one.
   
   But I can see if it can be integrated to generate the Pydantic classes automatically - that would save some effort (but not a lot, to be honest). There are a few risks:
   
   * The 'still experimental' status of it
   
   * We likely do not want to convert all of the models and fields automatically - we will want to skip some fields or treat them a bit differently, and decide whether or not to add some validations. Also, we might want to exclude certain models from serializing if the serialized form would drag along too much useless data. There are some back-references in our models, and they might cause recursive serialization and attempting to serialize more than we need.
   * Those models of ours that we want to serialize change extremely rarely, so the benefits from automating the conversion are very small vs. the cost of fixing potential 'experimental' issues.
   
   But I will try :) 
   
   The print statements are there for now to show the serialized form, so that you can see it manually when running - yes, we can remove them eventually; the asserts are good enough to show that serialization/deserialization works.
   
   
   Re: how to use it - yes, in all internal API calls where we need to pass the models, we should make sure that the DB model gets created or retrieved on the 'other' side of the call and returned in serialized/Pydantic form.
   
   And we can even optimise it in some cases where we see a need for performance: we can decide to return the DB model instead (in regular DB mode), because the two are technically equivalent, and using them is the same as long as the model would be detached anyway and is read-only (or as long as we implement sending modifications back). So if you look at context.pyi - we can have either the DB model passed to the Context or the Pydantic equivalent in case DB access is disabled.
   
   That is not 100% compatible, but for reading it should be (minus the fields we decide not to serialize), and we already know that the internal API case will be slightly less performant and that not everything the users could do before will be possible - that's the whole point of the internal API: to limit them.
   
   I have, for example, a WIP/POC where I slightly modify the LocalTaskJob interface and use this technique to send a BaseJobPydantic back from the server after creating the LocalTaskJob instance (LocalTaskJob does not have DB fields other than BaseJob's). With the Pydantic classes, I think I will be able to do it with zero impact on the 'regular' case (LocalTaskJob will be used everywhere) - only in the 'internal API' case will we serialize/deserialize the created object. So the impact on performance and behaviour for the regular case will be unchanged.
   
   In those cases the return type of the methods will be LocalTaskJob | BaseJobPydantic, and we will have MyPy to make sure that we have all the fields in the Pydantic model. So, for example, when we try to use a new field that is not defined in the Pydantic version, MyPy will complain.
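   
   A rough sketch of that union-typed pattern (illustrative only; `latest_heartbeat` is assumed here to be a field defined on both classes):
   
       from __future__ import annotations
   
       from datetime import datetime
   
       from airflow.jobs.local_task_job import LocalTaskJob
       from airflow.jobs.pydantic.base_job import BaseJobPydantic
   
       def job_heartbeat(job: LocalTaskJob | BaseJobPydantic) -> datetime | None:
           # MyPy checks the attribute against *both* sides of the union, so a
           # field missing from BaseJobPydantic is flagged at type-check time.
           return job.latest_heartbeat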
   
   
   





[GitHub] [airflow] potiuk commented on a diff in pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29776:
URL: https://github.com/apache/airflow/pull/29776#discussion_r1118647851


##########
tests/models/test_pydantic_models.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import parse_raw_as
+
+from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.pydantic.base_job import BaseJobPydantic
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
+from airflow.models.pydantic.dag_run import DagRunPydantic
+from airflow.models.pydantic.dataset import DatasetEventPydantic
+from airflow.models.pydantic.taskinstance import TaskInstancePydantic
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.models import DEFAULT_DATE
+
+
+def test_serializing_pydantic_task_instance(session, create_task_instance):
+    dag_id = "test-dag"
+    ti = create_task_instance(dag_id=dag_id, session=session)
+    ti.state = State.RUNNING
+    ti.next_kwargs = {"foo": "bar"}
+    session.commit()
+
+    pydantic_task_instance = TaskInstancePydantic.from_orm(ti)
+
+    json_string = pydantic_task_instance.json()
+    print(json_string)

Review Comment:
   I did try to make them (both) work. But either I am too stupid or the documentation is completely misleading - I simply don't know how to make sure the classes and related entities (if they have them) can be serialized in a single go with them. But if someone would like to make a parallel POC with them, I would love to see it. I think with this PR it's rather straightforward what we want to achieve and which entities should be made serializable.
   
   If we can make them work without Pydantic, I am all ears (note that Pydantic **just works** with just defining the entities and marking them to work in `orm_mode`). I wanted to avoid writing any code on our side to do the serialization, so if we could make it even simpler than that - I am all ears actually :)



[GitHub] [airflow] mhenc commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1456141063

   Thank you! This looks really cool.
   
   Just to confirm I understand it correctly:
   After this PR is merged, we need to change methods using a TaskInstance parameter to use `TaskInstance | TaskInstancePydantic`,
   e.g. in run_raw_task:
   https://github.com/apache/airflow/blob/f1b565f1175121451a5bdfaaa19831f0ed8b4d54/airflow/cli/commands/task_command.py#L277
   I think `ti._run_raw_task` also needs to be updated (to a static method?) to work with both TaskInstance and TaskInstancePydantic.




[GitHub] [airflow] potiuk commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1454803956

   Any more comments, @mhenc? Can we merge this one and proceed with some refactors of the existing methods?




[GitHub] [airflow] potiuk commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1459080297

   Merged. I will try to complete my Local Task Job refactor with it - but feel free to use it to implement the other (simpler) parts of AIP-44 (happy to help with reviews).




[GitHub] [airflow] Taragolis commented on a diff in pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29776:
URL: https://github.com/apache/airflow/pull/29776#discussion_r1118793268


##########
tests/models/test_pydantic_models.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import parse_raw_as
+
+from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.pydantic.base_job import BaseJobPydantic
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
+from airflow.models.pydantic.dag_run import DagRunPydantic
+from airflow.models.pydantic.dataset import DatasetEventPydantic
+from airflow.models.pydantic.taskinstance import TaskInstancePydantic
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.models import DEFAULT_DATE
+
+
+def test_serializing_pydantic_task_instance(session, create_task_instance):
+    dag_id = "test-dag"
+    ti = create_task_instance(dag_id=dag_id, session=session)
+    ti.state = State.RUNNING
+    ti.next_kwargs = {"foo": "bar"}
+    session.commit()
+
+    pydantic_task_instance = TaskInstancePydantic.from_orm(ti)
+
+    json_string = pydantic_task_instance.json()
+    print(json_string)

Review Comment:
   > I did try to make them (both) work.
   
   That is basically what I wanted to know. 🤣 I do not have any concerns about `pydantic`, especially if it is more straightforward to configure than the other options.
   





[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "pierrejeambrun (via GitHub)" <gi...@apache.org>.
pierrejeambrun commented on code in PR #29776:
URL: https://github.com/apache/airflow/pull/29776#discussion_r1118172885


##########
tests/models/test_pydantic_models.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import parse_raw_as
+
+from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.pydantic.base_job import BaseJobPydantic
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
+from airflow.models.pydantic.dag_run import DagRunPydantic
+from airflow.models.pydantic.dataset import DatasetEventPydantic
+from airflow.models.pydantic.taskinstance import TaskInstancePydantic
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.models import DEFAULT_DATE
+
+
+def test_serializing_pydantic_task_instance(session, create_task_instance):
+    dag_id = "test-dag"
+    ti = create_task_instance(dag_id=dag_id, session=session)
+    ti.state = State.RUNNING
+    ti.next_kwargs = {"foo": "bar"}
+    session.commit()
+
+    pydantic_task_instance = TaskInstancePydantic.from_orm(ti)
+
+    json_string = pydantic_task_instance.json()
+    print(json_string)

Review Comment:
   There are a few print statements in the test that should be removed, I think.





[GitHub] [airflow] potiuk commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1445380776

   cc: @vincbeck 




[GitHub] [airflow] vincbeck commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1447074961

   > We only need one-way conversion (from database models to Python models), because all the DB operations and modifications of the database entries will be done in the internal API server, so the server side of any method will be able to use the primary key stored in the serializable object to retrieve the actual DB model to update.
   
   Quick question. If we take this approach, why do we then need to send the whole serialized object and not the primary keys only? Is it to avoid having to refactor too many functions and have a backward-compatible solution? I am not saying it is wrong, I am just curious :)


[GitHub] [airflow] potiuk commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1445406396

   While working on the next step of the POC, I also added a `task_instance` field to BaseJobPydantic to show how nicely (and automatically) Pydantic takes care of relations - when you use `BaseJobPydantic.from_orm(local_task_job)`, it automatically retrieves and converts to Pydantic the linked TaskInstance object as well, and when we serialize/deserialize the Pydantic object, all the information kept in the TaskInstance related to the LocalTaskJob is preserved in the serialized form.
   
   This is exactly what Pydantic's `from_orm` magic does.
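   
   A minimal sketch of that behaviour (the class and field names below are illustrative stand-ins, not the actual Airflow Pydantic models):
   
       from typing import Optional
   
       from pydantic import BaseModel
   
       class TaskInstanceLite(BaseModel):
           task_id: str
           dag_id: str
   
           class Config:
               orm_mode = True
   
       class JobLite(BaseModel):
           id: int
           # A relationship field: from_orm() converts the linked ORM
           # object into a TaskInstanceLite recursively.
           task_instance: Optional[TaskInstanceLite]
   
           class Config:
               orm_mode = True
   
       # JobLite.from_orm(local_task_job) would also pull in and convert the
       # related TaskInstance, and .json() keeps it in the serialized form.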


[GitHub] [airflow] vincbeck commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1447140031

   I see, thanks!




[GitHub] [airflow] potiuk commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1458907186

   > Thank you! This looks really cool.
   > 
   > Just to confirm I understand it correctly: After this PR is merged, we need to change methods using a TaskInstance parameter to use `TaskInstance | TaskInstancePydantic`, e.g. in run_raw_task
   > 
   > https://github.com/apache/airflow/blob/f1b565f1175121451a5bdfaaa19831f0ed8b4d54/airflow/cli/commands/task_command.py#L277
   > 
   > 
   > I think `ti._run_raw_task` also needs to be updated (to a static method?) to work with both TaskInstance and TaskInstancePydantic.
   
   Yeah, roughly speaking. It might be a bit more involved in a few places (for example, we will have to split some methods into two, etc.). But yeah, roughly speaking, that's what it is. I have an (unfinished) example of the local job refactor that uses that approach: https://github.com/apache/airflow/commit/f11f5afbddfe39a9f0e31bc1fc1ba3cc1dfa5394


[GitHub] [airflow] potiuk commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1445444591

   And now I have also added all the "references" in DatasetModel, with back-references, to show that Pydantic handles those well too.


[GitHub] [airflow] potiuk commented on pull request #29776: Add Pydantic-powered ORM models serialization for internal API.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29776:
URL: https://github.com/apache/airflow/pull/29776#issuecomment-1458910076

   If there are no more comments, I would love to merge this one (I need a committer's approval), and then I could follow up with the example - and then a similar approach could be used by the other AIP-44 related PRs where we need to serialize DB models for the client API.

