Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/02 14:23:21 UTC

[GitHub] [airflow] ashb opened a new pull request, #26134: No cattrs in lineage

ashb opened a new pull request, #26134:
URL: https://github.com/apache/airflow/pull/26134



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kaxil commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r962163277


##########
airflow/lineage/__init__.py:
##########
@@ -64,33 +54,38 @@ def get_backend() -> Optional[LineageBackend]:
     return None
 
 
-def _get_instance(meta: Metadata):
-    """Instantiate an object from Metadata"""
-    cls = import_string(meta.type_name)
-    return structure(meta.data, cls)
+def _render_object(obj: Any, context: "Context") -> dict:
+    return context['ti'].task.render_template(obj, context)
 
 
-def _render_object(obj: Any, context) -> Any:
-    """Renders a attr annotated object. Will set non serializable attributes to none"""
-    return structure(
-        json.loads(
-            ENV.from_string(json.dumps(unstructure(obj), default=lambda o: None))
-            .render(lazy_mapping_from_context(context))
-            .encode('utf-8')
-        ),
-        type(obj),
-    )
+def _deserialize(serialized: dict):
+    from airflow.serialization.serialized_objects import BaseSerialization
 
+    # This is only use in the worker side, so it is okay to "blindly" import the specified class here.
+    cls = import_string(serialized['__type'])
+    return cls(**BaseSerialization.deserialize(serialized['__var']))
 
-def _to_dataset(obj: Any, source: str) -> Optional[Metadata]:
-    """Create Metadata from attr annotated object"""
-    if not attr.has(obj):
-        return None
 
-    type_name = obj.__module__ + '.' + obj.__class__.__name__
-    data = unstructure(obj)
+def _serialize(objs: List[Any], source: str):
+    """Serialize an attrs-decorated class to JSON"""
+    from airflow.serialization.serialized_objects import BaseSerialization
 
-    return Metadata(type_name, source, data)
+    for obj in objs:
+        if not attr.has(obj):
+            continue

Review Comment:
   Should we remove the following lines?
   
   https://github.com/apache/airflow/blob/6a152773d7a6ab340d87fed81a571670b36bf356/airflow/lineage/__init__.py#L170-L171
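
To make the `_serialize` / `_deserialize` pair in the diff above concrete, here is a minimal sketch of the `__type` / `__var` round trip. It is not Airflow's implementation. It assumes standard-library dataclasses in place of attrs classes, a hypothetical `REGISTRY` lookup in place of `import_string`, and plain field dicts in place of `BaseSerialization`. The names `File`, `serialize` and `deserialize` are illustrative only.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict, List


@dataclass
class File:
    """Hypothetical stand-in for a lineage entity (not Airflow's File)."""

    url: str


# Assumption: a lookup table replaces `import_string` so the sketch runs anywhere.
REGISTRY = {"File": File}


def serialize(objs: List[Any]) -> List[Dict[str, Any]]:
    """Turn supported objects into JSON-safe dicts; silently skip the rest."""
    result = []
    for obj in objs:
        # Plays the role of the `attr.has(obj)` guard in the diff:
        # objects that are not dataclass instances are dropped.
        if isinstance(obj, type) or not is_dataclass(obj):
            continue
        result.append({"__type": type(obj).__name__, "__var": asdict(obj)})
    return result


def deserialize(serialized: Dict[str, Any]) -> Any:
    """Rebuild an object from its `__type` name and `__var` field dict."""
    cls = REGISTRY[serialized["__type"]]
    return cls(**serialized["__var"])


# Round trip through JSON, with one unsupported object mixed in.
payload = json.loads(json.dumps(serialize([object(), File("/tmp/does_not_exist_3")])))
restored = [deserialize(item) for item in payload]
print(restored)
```

The plain `object()` never reaches the payload, which is the same filtering that the new test in this PR exercises for non-attr outlets.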




[GitHub] [airflow] ashb commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r962841379


##########
tests/lineage/test_lineage.py:
##########
@@ -118,6 +120,37 @@ def test_lineage_render(self, dag_maker):
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
 
+    def test_non_attr_outlet(self, dag_maker):
+        class A:
+            pass
+
+        a = A()
+
+        f3s = "/tmp/does_not_exist_3"
+        file3 = File(f3s)
+
+        with dag_maker(dag_id='test_prepare_lineage'):
+            op1 = EmptyOperator(
+                task_id='leave1',
+                outlets=[a, file3],
+            )
+            op2 = EmptyOperator(task_id='leave2', inlets='auto')
+
+            op1 >> op2
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+
+        # prepare with manual inlets and outlets
+        op1.pre_execute(ctx1)
+        op1.post_execute(ctx1)

Review Comment:
   @kaxil WDYT? Leave this "as it was" or fix it now?




[GitHub] [airflow] kaxil commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r962163864


##########
tests/lineage/test_lineage.py:
##########
@@ -118,6 +120,37 @@ def test_lineage_render(self, dag_maker):
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
 
+    def test_non_attr_outlet(self, dag_maker):
+        class A:
+            pass
+
+        a = A()
+
+        f3s = "/tmp/does_not_exist_3"
+        file3 = File(f3s)
+
+        with dag_maker(dag_id='test_prepare_lineage'):
+            op1 = EmptyOperator(
+                task_id='leave1',
+                outlets=[a, file3],
+            )
+            op2 = EmptyOperator(task_id='leave2', inlets='auto')
+
+            op1 >> op2
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+
+        # prepare with manual inlets and outlets
+        op1.pre_execute(ctx1)
+        op1.post_execute(ctx1)

Review Comment:
   Thinking out loud: isn't it inconsistent that non-attr objects are not excluded from being set as "outlets" but are excluded for inlets?
   
   For example, the following currently evaluates to `True` on L149:
   
   ```python
   assert op1.outlets == [a, file3]
   ```
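
The asymmetry described above can be reproduced in a small stand-alone sketch. It only imitates the behaviour that the test asserts and is not Airflow code: the `Task` class, the `store` dict standing in for the lineage hand-off between tasks, and the helpers `publish_outlets` and `pull_auto_inlets` are all hypothetical, and dataclasses are assumed in place of attrs classes.

```python
from dataclasses import dataclass, is_dataclass
from typing import Any, Dict, List, Optional


@dataclass
class File:
    """Hypothetical lineage entity (stands in for an attrs-decorated class)."""

    url: str


class Plain:
    """Not a dataclass, so it plays the role of the non-attr object `a`."""


class Task:
    """Hypothetical minimal operator with outlets and inlets."""

    def __init__(self, task_id: str, outlets: Optional[List[Any]] = None) -> None:
        self.task_id = task_id
        self.outlets: List[Any] = list(outlets or [])
        self.inlets: List[Any] = []


def publish_outlets(task: Task, store: Dict[str, List[Any]]) -> None:
    """Hand outlets to downstream tasks; unsupported objects are filtered here only."""
    store[task.task_id] = [o for o in task.outlets if is_dataclass(o)]


def pull_auto_inlets(task: Task, upstream_ids: List[str], store: Dict[str, List[Any]]) -> None:
    """Collect whatever upstream tasks published, as `inlets='auto'` would."""
    for upstream_id in upstream_ids:
        task.inlets.extend(store.get(upstream_id, []))


a, file3 = Plain(), File("/tmp/does_not_exist_3")
op1, op2 = Task("leave1", outlets=[a, file3]), Task("leave2")
store: Dict[str, List[Any]] = {}

publish_outlets(op1, store)
pull_auto_inlets(op2, ["leave1"], store)

print(op1.outlets == [a, file3])  # the upstream task still lists both objects
print(op2.inlets == [file3])      # the downstream task only sees the supported one
```

Because the filter sits in the hand-off step rather than on the `outlets` attribute itself, the upstream task keeps the non-attr object while the downstream task never receives it.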
   
   




[GitHub] [airflow] kaxil commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r963045023


##########
tests/lineage/test_lineage.py:
##########
@@ -118,6 +120,37 @@ def test_lineage_render(self, dag_maker):
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
 
+    def test_non_attr_outlet(self, dag_maker):
+        class A:
+            pass
+
+        a = A()
+
+        f3s = "/tmp/does_not_exist_3"
+        file3 = File(f3s)
+
+        with dag_maker(dag_id='test_prepare_lineage'):
+            op1 = EmptyOperator(
+                task_id='leave1',
+                outlets=[a, file3],
+            )
+            op2 = EmptyOperator(task_id='leave2', inlets='auto')
+
+            op1 >> op2
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+
+        # prepare with manual inlets and outlets
+        op1.pre_execute(ctx1)
+        op1.post_execute(ctx1)

Review Comment:
   I would lean towards leaving this "as it was" in this PR and fixing it in a separate PR.




[GitHub] [airflow] kaxil commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r962161250


##########
tests/lineage/test_lineage.py:
##########
@@ -118,6 +120,37 @@ def test_lineage_render(self, dag_maker):
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
 
+    def test_non_attr_outlet(self, dag_maker):
+        class A:
+            pass
+
+        a = A()
+
+        f3s = "/tmp/does_not_exist_3"
+        file3 = File(f3s)
+
+        with dag_maker(dag_id='test_prepare_lineage'):
+            op1 = EmptyOperator(
+                task_id='leave1',
+                outlets=[a, file3],
+            )
+            op2 = EmptyOperator(task_id='leave2', inlets='auto')
+
+            op1 >> op2
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+
+        # prepare with manual inlets and outlets
+        op1.pre_execute(ctx1)
+        op1.post_execute(ctx1)

Review Comment:
   ```suggestion
           op1.pre_execute(ctx1)
           assert op1.outlets == [a, file3]
           op1.post_execute(ctx1)
   ```




[GitHub] [airflow] ashb commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r962175455


##########
tests/lineage/test_lineage.py:
##########
@@ -118,6 +120,37 @@ def test_lineage_render(self, dag_maker):
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
 
+    def test_non_attr_outlet(self, dag_maker):
+        class A:
+            pass
+
+        a = A()
+
+        f3s = "/tmp/does_not_exist_3"
+        file3 = File(f3s)
+
+        with dag_maker(dag_id='test_prepare_lineage'):
+            op1 = EmptyOperator(
+                task_id='leave1',
+                outlets=[a, file3],
+            )
+            op2 = EmptyOperator(task_id='leave2', inlets='auto')
+
+            op1 >> op2
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+
+        # prepare with manual inlets and outlets
+        op1.pre_execute(ctx1)
+        op1.post_execute(ctx1)

Review Comment:
   Yeah possibly - I didn't really change this behaviour so I didn't think about it




[GitHub] [airflow] ashb merged pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
ashb merged PR #26134:
URL: https://github.com/apache/airflow/pull/26134



[GitHub] [airflow] kaxil commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r962162405


##########
tests/lineage/test_lineage.py:
##########
@@ -118,6 +120,37 @@ def test_lineage_render(self, dag_maker):
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
 
+    def test_non_attr_outlet(self, dag_maker):
+        class A:
+            pass
+
+        a = A()
+
+        f3s = "/tmp/does_not_exist_3"
+        file3 = File(f3s)

Review Comment:
   ```suggestion
           file3 = File("/tmp/does_not_exist_3")
   ```




[GitHub] [airflow] kaxil commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r962162632


##########
tests/lineage/test_lineage.py:
##########
@@ -118,6 +120,37 @@ def test_lineage_render(self, dag_maker):
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
 
+    def test_non_attr_outlet(self, dag_maker):
+        class A:
+            pass
+
+        a = A()
+
+        f3s = "/tmp/does_not_exist_3"
+        file3 = File(f3s)
+
+        with dag_maker(dag_id='test_prepare_lineage'):
+            op1 = EmptyOperator(
+                task_id='leave1',
+                outlets=[a, file3],
+            )
+            op2 = EmptyOperator(task_id='leave2', inlets='auto')
+
+            op1 >> op2
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+
+        # prepare with manual inlets and outlets
+        op1.pre_execute(ctx1)
+        op1.post_execute(ctx1)
+
+        op2.pre_execute(ctx2)
+        assert op2.inlets == [file3]

Review Comment:
   Shouldn't this be `[a, file3]`?




[GitHub] [airflow] kaxil commented on a diff in pull request #26134: Remove `cattrs` from airflow.lineage

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26134:
URL: https://github.com/apache/airflow/pull/26134#discussion_r962161328


##########
tests/lineage/test_lineage.py:
##########
@@ -118,6 +120,37 @@ def test_lineage_render(self, dag_maker):
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
 
+    def test_non_attr_outlet(self, dag_maker):
+        class A:
+            pass
+
+        a = A()
+
+        f3s = "/tmp/does_not_exist_3"
+        file3 = File(f3s)
+
+        with dag_maker(dag_id='test_prepare_lineage'):
+            op1 = EmptyOperator(
+                task_id='leave1',
+                outlets=[a, file3],
+            )
+            op2 = EmptyOperator(task_id='leave2', inlets='auto')
+
+            op1 >> op2
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
+
+        # prepare with manual inlets and outlets
+        op1.pre_execute(ctx1)
+        op1.post_execute(ctx1)

Review Comment:
   This will verify that the non-attr object was indeed set as an outlet.


