Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/28 13:42:38 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request, #26750: Remove DAG parsing from StandardTaskRunner

ephraimbuddy opened a new pull request, #26750:
URL: https://github.com/apache/airflow/pull/26750

   This makes StandardTaskRunner start faster, as DAG parsing will now be done in `task_run`.
   It also removes the parsing of example DAGs when running a task.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham merged pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
jedcunningham merged PR #26750:
URL: https://github.com/apache/airflow/pull/26750




[GitHub] [airflow] tirkarthi commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r996901947


##########
airflow/cli/commands/task_command.py:
##########
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:
-            try:
-                dag = get_dag_by_deserialization(args.dag_id)
-            except AirflowException:
-                print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
-        else:
-            dag = get_dag(args.subdir, args.dag_id)
+        dag = get_dag(args.subdir, args.dag_id, include_examples=False)

Review Comment:
   I was trying out a patch for issue https://github.com/apache/airflow/issues/26555 and had just added an example dynamic task mapping DAG as a fixture in https://github.com/apache/airflow/pull/26678 for it. My test case runs `airflow tasks render` against this DAG; it passed before, but it fails after rebasing. I haven't created the PR yet.





[GitHub] [airflow] pingzh commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
pingzh commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r993758769


##########
airflow/cli/commands/task_command.py:
##########
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:

Review Comment:
   Hi @ephraimbuddy, could you please give more context about why this should be removed?

   It looks like your PR only optimizes the case where a task is executed via fork. There is also an important part here: https://github.com/apache/airflow/pull/26750/files#diff-8b336b0706abbada03e8dce519e122384da2c913cc79ee4d638d0427a1342d41L47-L48

   I think we still need `dag = get_dag_by_deserialization(args.dag_id)`. Please let me know your thoughts.





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r983771966


##########
airflow/cli/commands/task_command.py:
##########
@@ -369,9 +369,9 @@ def task_run(args, dag=None):
                 dag = get_dag_by_deserialization(args.dag_id)
             except AirflowException:
                 print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
+                dag = get_dag(args.subdir, args.dag_id, include_examples=False)
         else:
-            dag = get_dag(args.subdir, args.dag_id)
+            dag = get_dag(args.subdir, args.dag_id, include_examples=False)  # --raw

Review Comment:
   We don't need it anymore.





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r993777794


##########
airflow/cli/commands/task_command.py:
##########
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:
-            try:
-                dag = get_dag_by_deserialization(args.dag_id)
-            except AirflowException:
-                print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
-        else:
-            dag = get_dag(args.subdir, args.dag_id)
+        dag = get_dag(args.subdir, args.dag_id, include_examples=False)

Review Comment:
   Sorry for the late reply. Can you share the test case?





[GitHub] [airflow] kaxil commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r982664204


##########
airflow/cli/commands/task_command.py:
##########
@@ -369,9 +369,9 @@ def task_run(args, dag=None):
                 dag = get_dag_by_deserialization(args.dag_id)
             except AirflowException:
                 print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
+                dag = get_dag(args.subdir, args.dag_id, include_examples=False)
         else:
-            dag = get_dag(args.subdir, args.dag_id)
+            dag = get_dag(args.subdir, args.dag_id, include_examples=False)  # --raw

Review Comment:
   If we do need it, can we move it above this line and add more detail?





[GitHub] [airflow] kaxil commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r982663899


##########
airflow/cli/commands/task_command.py:
##########
@@ -369,9 +369,9 @@ def task_run(args, dag=None):
                 dag = get_dag_by_deserialization(args.dag_id)
             except AirflowException:
                 print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
+                dag = get_dag(args.subdir, args.dag_id, include_examples=False)
         else:
-            dag = get_dag(args.subdir, args.dag_id)
+            dag = get_dag(args.subdir, args.dag_id, include_examples=False)  # --raw

Review Comment:
   Do we need this comment?
   ```suggestion
               dag = get_dag(args.subdir, args.dag_id, include_examples=False)
   ```





[GitHub] [airflow] ashb commented on pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #26750:
URL: https://github.com/apache/airflow/pull/26750#issuecomment-1260948676

   Fixes the slowdown part of https://github.com/apache/airflow/issues/26573




[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r993777794


##########
airflow/cli/commands/task_command.py:
##########
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:
-            try:
-                dag = get_dag_by_deserialization(args.dag_id)
-            except AirflowException:
-                print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
-        else:
-            dag = get_dag(args.subdir, args.dag_id)
+        dag = get_dag(args.subdir, args.dag_id, include_examples=False)

Review Comment:
   Sorry for the late reply; I was on vacation. Can you share the test case?





[GitHub] [airflow] kaxil commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r982664741


##########
airflow/utils/cli.py:
##########
@@ -181,11 +181,11 @@ def get_dag_by_file_location(dag_id: str):
     return dagbag.dags[dag_id]
 
 
-def get_dag(subdir: str | None, dag_id: str) -> DAG:
+def get_dag(subdir: str | None, dag_id: str, include_examples=True) -> DAG:

Review Comment:
   Should the default for this come from airflow.conf?





[GitHub] [airflow] tirkarthi commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r989733072


##########
airflow/cli/commands/task_command.py:
##########
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:
-            try:
-                dag = get_dag_by_deserialization(args.dag_id)
-            except AirflowException:
-                print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
-        else:
-            dag = get_dag(args.subdir, args.dag_id)
+        dag = get_dag(args.subdir, args.dag_id, include_examples=False)

Review Comment:
   In `get_dag`, `include_examples` defaults to the value from the configuration. Could the change that hard-codes it to `False` be reverted, so that users who don't want the examples loaded can turn them off through configuration? I had a test case in my development PR that ran an example DAG successfully, and it now fails after rebasing. This will also affect users who want to explore the `tasks` CLI commands with the example DAGs, which worked before 2.4.0.
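   A minimal sketch of the config-driven default discussed here, assuming a plain dict stands in for Airflow's configuration (in Airflow the relevant option is `[core] load_examples`). The names `CONFIG`, `LoadRequest` and `load_dag` are hypothetical, not Airflow APIs: `None` means "the caller did not choose", so the configured value is used, while an explicit `True` or `False` still wins.

```python
from __future__ import annotations

from dataclasses import dataclass

# Hypothetical stand-in for Airflow's configuration; in Airflow the value
# would come from the [core] load_examples option.
CONFIG: dict[str, dict[str, bool]] = {"core": {"load_examples": True}}


@dataclass
class LoadRequest:
    """Hypothetical record of what a get_dag-like helper would hand to the DAG loader."""

    subdir: str | None
    dag_id: str
    include_examples: bool


def load_dag(subdir: str | None, dag_id: str, include_examples: bool | None = None) -> LoadRequest:
    """Hypothetical get_dag-like helper.

    None means the caller did not choose: fall back to the configured value,
    which is read on every call rather than once at import time. An explicit
    True or False from the caller always wins.
    """
    if include_examples is None:
        include_examples = CONFIG["core"]["load_examples"]
    return LoadRequest(subdir=subdir, dag_id=dag_id, include_examples=include_examples)


# A caller that omits the argument follows the configuration ...
print(load_dag("/dags", "example_dag").include_examples)  # True
CONFIG["core"]["load_examples"] = False
print(load_dag("/dags", "example_dag").include_examples)  # False
# ... while a hard-coded value ignores it, which is what the comment objects to.
print(load_dag("/dags", "example_dag", include_examples=True).include_examples)  # True
```

   Writing the config lookup directly as the default value in the signature would evaluate it once, when the module is imported; the `None` sentinel defers the lookup to each call.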





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r993821390


##########
airflow/cli/commands/task_command.py:
##########
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:

Review Comment:
   > Hi @ephraimbuddy, could you please give more context about why this should be removed?
   > 
   > It looks like your PR only optimizes the case where a task is executed via fork. There is also an important part here: https://github.com/apache/airflow/pull/26750/files#diff-8b336b0706abbada03e8dce519e122384da2c913cc79ee4d638d0427a1342d41L47-L48
   > 
   > I think we still need `dag = get_dag_by_deserialization(args.dag_id)`. Please let me know your thoughts.
   
   We noticed a significant increase in task run duration with the previous change. Using `get_dag_by_deserialization` only delays the DAG parsing (for `_start_by_fork`). Looking at `exec`, I think we should devise another way to get the serialized DAG, because with the previous change we had to parse the DAG in the TaskRunner process for `fork`, which causes delays.





[GitHub] [airflow] tirkarthi commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r1012570908


##########
airflow/cli/commands/task_command.py:
##########
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:
-            try:
-                dag = get_dag_by_deserialization(args.dag_id)
-            except AirflowException:
-                print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
-        else:
-            dag = get_dag(args.subdir, args.dag_id)
+        dag = get_dag(args.subdir, args.dag_id, include_examples=False)

Review Comment:
   Thanks for the patch, @ephraimbuddy. I will use it and add you as a co-author.





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26750: Remove DAG parsing from StandardTaskRunner

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26750:
URL: https://github.com/apache/airflow/pull/26750#discussion_r1011799699


##########
airflow/cli/commands/task_command.py:
##########
@@ -364,14 +363,7 @@ def task_run(args, dag=None):
         print(f'Loading pickle id: {args.pickle}')
         dag = get_dag_by_pickle(args.pickle)
     elif not dag:
-        if args.local:
-            try:
-                dag = get_dag_by_deserialization(args.dag_id)
-            except AirflowException:
-                print(f'DAG {args.dag_id} does not exist in the database, trying to parse the dag_file')
-                dag = get_dag(args.subdir, args.dag_id)
-        else:
-            dag = get_dag(args.subdir, args.dag_id)
+        dag = get_dag(args.subdir, args.dag_id, include_examples=False)

Review Comment:
   ```diff
   
   diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
   index 982aa31fd5..ab05bb56a9 100644
   --- a/airflow/cli/commands/task_command.py
   +++ b/airflow/cli/commands/task_command.py
   @@ -578,13 +578,13 @@ def task_render(args):
            task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="memory"
        )
        ti.render_templates()
   -    for attr in task.__class__.template_fields:
   +    for attr in task.template_fields:
            print(
                textwrap.dedent(
                    f"""        # ----------------------------------------------------------
            # property: {attr}
            # ----------------------------------------------------------
   -        {getattr(task, attr)}
   +        {getattr(ti.task, attr)}
            """
                )
            )
   diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
   index c565f601d7..f3673fb2dd 100644
   --- a/tests/cli/commands/test_task_command.py
   +++ b/tests/cli/commands/test_task_command.py
   @@ -388,6 +388,54 @@ class TestCliTasks:
            assert 'echo "2016-01-01"' in output
            assert 'echo "2016-01-08"' in output
    
   +    def test_mapped_task_render(self):
   +        """
   +        tasks render should render and display templated fields for a given mapped task
   +        """
   +        with redirect_stdout(io.StringIO()) as stdout:
   +            task_command.task_render(
   +                self.parser.parse_args(
   +                    [
   +                        "tasks",
   +                        "render",
   +                        "test_mapped_classic",
   +                        "consumer_literal",
   +                        "2022-01-01",
   +                        "--map-index",
   +                        "0",
   +                    ]
   +                )
   +            )
   +        # the dag test_mapped_classic has op_args=[[1], [2], [3]], so the first mapped task should have op_args=[1]
   +        output = stdout.getvalue()
   +        assert "[1]" in output
   +        assert "[2]" not in output
   +        assert "[3]" not in output
   +        assert "property: op_args" in output
   +
   +    def test_mapped_task_render_with_template(self):
   +        """
   +        tasks render should render and display templated fields for a given mapped task
   +        """
   +        with redirect_stdout(io.StringIO()) as stdout:
   +            task_command.task_render(
   +                self.parser.parse_args(
   +                    [
   +                        "tasks",
   +                        "render",
   +                        "test_mapped_task_with_template",
   +                        "some_command",
   +                        "2022-01-01",
   +                        "--map-index",
   +                        "0",
   +                    ]
   +                )
   +            )
   +        # the dag test_mapped_task_with_template maps bash_command over [templated_command, "echo 1"], so map index 0 renders templated_command
   +        output = stdout.getvalue()
   +        assert 'echo "2022-01-01"' in output
   +        assert 'echo "2022-01-08"' in output
   +
        def test_cli_run_when_pickle_and_dag_cli_method_selected(self):
            """
            tasks run should return an AirflowException when invalid pickle_id is passed
   diff --git a/tests/dags/test_mapped_task_with_templates.py b/tests/dags/test_mapped_task_with_templates.py
   new file mode 100644
   index 0000000000..fe0b8d0c49
   --- /dev/null
   +++ b/tests/dags/test_mapped_task_with_templates.py
   @@ -0,0 +1,36 @@
   +# Licensed to the Apache Software Foundation (ASF) under one
   +# or more contributor license agreements.  See the NOTICE file
   +# distributed with this work for additional information
   +# regarding copyright ownership.  The ASF licenses this file
   +# to you under the Apache License, Version 2.0 (the
   +# "License"); you may not use this file except in compliance
   +# with the License.  You may obtain a copy of the License at
   +#
   +#   http://www.apache.org/licenses/LICENSE-2.0
   +#
   +# Unless required by applicable law or agreed to in writing,
   +# software distributed under the License is distributed on an
   +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   +# KIND, either express or implied.  See the License for the
   +# specific language governing permissions and limitations
   +# under the License.
   +from __future__ import annotations
   +
   +import datetime
   +from textwrap import dedent
   +
   +from airflow import DAG
   +from airflow.operators.bash import BashOperator
   +
   +with DAG(dag_id="test_mapped_task_with_template", start_date=datetime.datetime(2022, 1, 1)) as dag:
   +    templated_command = dedent(
   +        """
   +    {% for i in range(5) %}
   +        echo "{{ ds }}"
   +        echo "{{ macros.ds_add(ds, 7)}}"
   +    {% endfor %}
   +    """
   +    )
   +    commands = [templated_command, "echo 1"]
   +
   +    BashOperator.partial(task_id="some_command").expand(bash_command=commands)
   ```
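   Why the patch reads `task.template_fields` and `getattr(ti.task, attr)` instead of `task.__class__.template_fields` and `getattr(task, attr)`: a minimal sketch with hypothetical stand-in classes (`EchoOperator`, `MappedEcho` and `TaskInstanceStub` are not Airflow APIs, and `string.Template` stands in for Jinja). It assumes, as the change suggests, that a mapped task keeps its field list on the instance rather than on its class, and that rendering swaps `ti.task` for an unmapped copy holding the rendered values.

```python
from __future__ import annotations

from string import Template


class EchoOperator:
    """Hypothetical regular operator: template_fields is a class attribute."""

    template_fields = ("command",)

    def __init__(self, command: str) -> None:
        self.command = command


class MappedEcho:
    """Hypothetical mapped wrapper: the field list lives on the instance,
    so the wrapper class itself does not carry the operator's fields."""

    template_fields: tuple[str, ...] = ()  # class-level value is empty

    def __init__(self, commands: list[str]) -> None:
        self.commands = commands
        self.template_fields = EchoOperator.template_fields  # instance attribute

    def unmap(self, map_index: int) -> EchoOperator:
        return EchoOperator(self.commands[map_index])


class TaskInstanceStub:
    """Hypothetical task instance whose rendering replaces self.task."""

    def __init__(self, task, map_index: int) -> None:
        self.task = task
        self.map_index = map_index

    def render_templates(self, context: dict[str, str]) -> None:
        task = self.task
        if isinstance(task, MappedEcho):
            task = task.unmap(self.map_index)
        for attr in task.template_fields:
            setattr(task, attr, Template(getattr(task, attr)).substitute(context))
        # ti.task now points at the rendered, unmapped copy; the original
        # mapped task object is left unrendered.
        self.task = task


mapped = MappedEcho(["echo $ds", "echo 1"])
ti = TaskInstanceStub(mapped, map_index=0)
ti.render_templates({"ds": "2022-01-01"})

print(type(mapped).template_fields)  # () : the class-level lookup misses the fields
print(mapped.template_fields)  # ('command',)
print([getattr(ti.task, a) for a in mapped.template_fields])  # ['echo 2022-01-01']
```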

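   The expected strings in `test_mapped_task_render_with_template` follow from the DAG's template: for map index 0, `{{ ds }}` renders to the logical date and `{{ macros.ds_add(ds, 7) }}` to the date seven days later, repeated by the `range(5)` loop (the test only checks that each line appears). Airflow ships a macro named `ds_add` in `airflow.macros`; the sketch below is a local pure-Python stand-in for that date arithmetic, and `ds_add_sketch` and `render_expected_lines` are hypothetical names.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def ds_add_sketch(ds: str, days: int) -> str:
    """Shift a YYYY-MM-DD date string by `days` (negative values go backwards).

    Local stand-in for the ds_add macro used in the test DAG's template.
    """
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def render_expected_lines(ds: str) -> list[str]:
    """Build the two echo lines that one loop iteration of the template produces."""
    return [f'echo "{ds}"', f'echo "{ds_add_sketch(ds, 7)}"']


print(render_expected_lines("2022-01-01"))  # ['echo "2022-01-01"', 'echo "2022-01-08"']
```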

