You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/06 19:06:16 UTC

[GitHub] [airflow] ashb opened a new pull request #12858: Store per-task TIDeps in serialized blob

ashb opened a new pull request #12858:
URL: https://github.com/apache/airflow/pull/12858


   Without this change sensors in "reschedule" mode were being instantly
   rescheduled because they didn't have the extra dep that
   BaseSensorOperator added.
   
   To fix that we need to include deps in the serialization format (but to
   save space only when they are not the default list). As of this PR right
   now, only built-in deps are allowed -- a custom dep will result in a DAG
   parse error.
   
   We can fix that for 2.0.x, as I think it is a _very_ uncommon thing to
   do.
   
   Fixes #12783
   
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#issuecomment-739570801


   exit 137 in backfill still :(


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#discussion_r537111191



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -375,6 +379,21 @@ def serialize_operator(cls, op: BaseOperator) -> dict:
                 op.operator_extra_links
             )
 
+        if op.deps is not BaseOperator.deps:
+            # Are the deps different to BaseOperator, if so serialize the class names!
+            # For Airflow 2.0 expediency we _only_ allow built in Dep classes.
+            # Fix this for 2.0.x or 2.1
+            deps = []
+            for dep in op.deps:
+                klass = type(dep)
+                module_name = klass.__module__
+                if not module_name.startswith("airflow.ti_deps.deps."):
+                    # Throw? Log?

Review comment:
       ```suggestion
   
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] houqp edited a comment on pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
houqp edited a comment on pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#issuecomment-750744813


   @ashb I tried to serialize with builtin deps disabled for all of our DAGs, which only seems to only increase the serialized json size by 1-2%, including our biggest DAG that contains 1000+ tasks (total json size 3mb). All of our tasks use custom deps to enable special scheduling logic, so they all contributed to the size increase. Given that, the size increase seems minor?
   
   In your comment, you mentioned only built-in deps are allowed for now to reduce space, i am curious what edge-case would have caused significant size increase with custom deps. I would like to help lessen this restriction to unblock our production deployment of 2.0.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb merged pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #12858:
URL: https://github.com/apache/airflow/pull/12858


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#discussion_r537117781



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -375,6 +379,21 @@ def serialize_operator(cls, op: BaseOperator) -> dict:
                 op.operator_extra_links
             )
 
+        if op.deps is not BaseOperator.deps:
+            # Are the deps different to BaseOperator, if so serialize the class names!
+            # For Airflow 2.0 expediency we _only_ allow built in Dep classes.
+            # Fix this for 2.0.x or 2.1
+            deps = []
+            for dep in op.deps:
+                klass = type(dep)
+                module_name = klass.__module__
+                if not module_name.startswith("airflow.ti_deps.deps."):
+                    # Throw? Log?

Review comment:
       :D 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] houqp edited a comment on pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
houqp edited a comment on pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#issuecomment-750744813


   @ashb I tried to serialize with builtin deps disabled for all of our DAGs, which only seems to only increase the serialized json size by 1-2%, including our biggest DAG that contains 1000+ tasks (total json size 3mb). All of our tasks use custom deps to enable special scheduling logic, so they all contributed the size increase. Given that, the size increase seems minor?
   
   In your comment, you mentioned only built-in deps are allowed for now to reduce space, i am curious what edge-case would have caused significant size increase with custom deps. I would like to help lessen this restriction to unblock our production deployment of 2.0.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#issuecomment-750785321


   The space saving is mainly for extreme cases (such as 22k tasks in one dag)
   
   The main requirement here is to not blindly inflate a class name from the JSON as that poses a security risk (long term, if not now) - instead we should lookup the class in a pre-registered list like we do for operator links.
   
   I had hoped to get this done properly for 2.0.0 but ran out of time, and decided that it was better to release than delay for this improvement


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#issuecomment-739551611


   [The Workflow run](https://github.com/apache/airflow/actions/runs/404398120) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb edited a comment on pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#issuecomment-750785321


   The space saving is mainly for extreme cases (such as 22k tasks in one dag) but size also affects deserialization time - when I want milliseconds to matter :)
   
   The main requirement here is to not blindly inflate a class name from the JSON as that poses a security risk (long term, if not now) - instead we should lookup the class in a pre-registered list like we do for operator links.
   
   I had hoped to get this done properly for 2.0.0 but ran out of time, and decided that it was better to release than delay for this improvement


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] houqp commented on pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#issuecomment-750744813


   @ashb I tried to serialize with builtin deps disabled for all of our DAGs, which only seems to only increase the serialized json size by 1-2%, including our biggest DAG that contains 1000+ tasks (total size 3mb). All of our tasks use custom deps to enable special scheduling logic, so they all contributed the size increase. Given that, the size increase seems minor?
   
   In your comment, you mentioned only built-in deps are allowed for now to reduce space, i am curious what edge-case would have caused significant size increase with custom deps. I would like to help lessen this restriction to unblock our production deployment of 2.0.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] houqp commented on pull request #12858: Store per-task TIDeps in serialized blob

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #12858:
URL: https://github.com/apache/airflow/pull/12858#issuecomment-750814058


   Got it, so what's your plan for implementing custom dep support? I can help draft a PR if you are busy with other things.
   
   I am thinking of using dictionary encoding to reduce the size, i.e. deps encoded as `[1, 2, 5, 7]`. However, I am not sure whether this will actually help reduce deserialization time since we now need to do extra lookups to reconstruct `deps`. Do you have the 22k tasks dag available somewhere for me to do some benchmarking?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org