Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/19 15:14:58 UTC

[GitHub] [airflow] potiuk opened a new pull request, #25161: Robust context of dag parsing

potiuk opened a new pull request, #25161:
URL: https://github.com/apache/airflow/pull/25161

   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   




[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936616784


##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"

Review Comment:
   The decision is already based on that indeed (look at the example). So this is just "extra" metadata on the context. I think there is no harm to keep it?





[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r927905128


##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"
+
+
+class AirflowParsingContext(NamedTuple):
+    """Context of parsing for the DAG."""
+
+    context_type: Optional[AirflowParsingContextType]
+    dag_id: Optional[str]
+    task_id: Optional[str]
+
+
+_AIRFLOW_PARSING_CONTEXT = "_AIRFLOW_PARSING_CONTEXT"
+_AIRFLOW_PARSING_CONTEXT_DAG_ID = "_AIRFLOW_PARSING_CONTEXT_DAG_ID"
+_AIRFLOW_PARSING_CONTEXT_TASK_ID = "_AIRFLOW_PARSING_CONTEXT_TASK_ID"
+
+
+@contextmanager
+def _airflow_parsing_context_manager(
+    context: AirflowParsingContextType, dag_id: Optional[str] = None, task_id: Optional[str] = None
+):
+    os.environ[_AIRFLOW_PARSING_CONTEXT] = context.value
+    if context == AirflowParsingContextType.TASK_EXECUTION:
+        os.environ[_AIRFLOW_PARSING_CONTEXT_DAG_ID] = dag_id if dag_id is not None else ""
+        os.environ[_AIRFLOW_PARSING_CONTEXT_TASK_ID] = task_id if task_id is not None else ""

Review Comment:
   Actually MyPy complains if you do that. But indeed we should rather DELETE the values (and we should actually restore the previous values, not replace them with None).
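   
   For illustration, a minimal save-and-restore sketch of that idea (the helper name `_set_env_vars_restoring` is hypothetical, not the PR's actual code):
   
   ```python
   import os
   from contextlib import contextmanager
   from typing import Dict, Optional
   
   
   @contextmanager
   def _set_env_vars_restoring(new_values: Dict[str, Optional[str]]):
       # Remember what was there before entering the context.
       previous = {key: os.environ.get(key) for key in new_values}
       try:
           for key, value in new_values.items():
               if value is None:
                   os.environ.pop(key, None)  # DELETE rather than storing a fake "None"
               else:
                   os.environ[key] = value
           yield
       finally:
           # Restore the previous values, deleting vars that did not exist before.
           for key, old_value in previous.items():
               if old_value is None:
                   os.environ.pop(key, None)
               else:
                   os.environ[key] = old_value
   ```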





[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936016581


##########
airflow/executors/local_executor.py:
##########
@@ -78,14 +79,17 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
 
         self.log.info("%s running %s", self.__class__.__name__, command)
         setproctitle(f"airflow worker -- LocalExecutor: {command}")
-        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-            state = self._execute_work_in_subprocess(command)
-        else:
-            state = self._execute_work_in_fork(command)
-
-        self.result_queue.put((key, state))
-        # Remove the command since the worker is done executing the task
-        setproctitle("airflow worker -- LocalExecutor")
+        with _airflow_parsing_context_manager(
+            AirflowParsingContextType.TASK_EXECUTION, dag_id=key[0], task_id=key[1]
+        ):

Review Comment:
   This really comes from the inconsistent way our executors/runners are implemented (or maybe I do not understand it fully) - so this is more of a "safety net" - maybe not needed, but not harmful either.
   
   The problem is that the way the celery/local executors work is a bit inconsistent. While we have the context manager at the executor layer, you say that we should implement it in the Local Executor at the "runner" layer.
   
   That seems rather inconsistent if we are aiming for a consistent "executor" interface level. But I believe (see the comment above by @pingzh https://github.com/apache/airflow/pull/25161#pullrequestreview-1044262872) that we can get rid of this double "context" when we also get rid of the double "parsing" with AIP-45.
   
   Should we leave it in place for now, then, and remove it when we get rid of the double parsing as well?





[GitHub] [airflow] potiuk merged pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #25161:
URL: https://github.com/apache/airflow/pull/25161




[GitHub] [airflow] potiuk commented on pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1203866214

   Now I think it should all be fine.




[GitHub] [airflow] potiuk commented on pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1198448143

   > General point: let's mark anything we do here as experimental, as Dag Fetcher (AIP-5) may give us a nicer way of doing this
   
   We did already:
   
   > There is an experimental approach that you can take to optimize this behaviour. Note that it is not always
   possible to use (for example when generation of subsequent DAGs depends on the previous DAGs) or when
   there are some side-effects of your DAGs generation. Also the code snippet below is pretty complex and while
   we tested it and it works in most circumstances, there might be cases where detection of the currently
   parsed DAG will fail and it will revert to creating all the DAGs or fail. Use this solution with care and
   test it thoroughly.




[GitHub] [airflow] ashb commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r937580391


##########
airflow/executors/base_executor.py:
##########
@@ -332,10 +332,26 @@ def slots_available(self):
             return sys.maxsize
 
     @staticmethod
-    def validate_command(command: List[str]) -> None:
-        """Check if the command to execute is airflow command"""
+    def validate_airflow_tasks_run_command(command: List[str]) -> Tuple[Optional[str], Optional[str]]:

Review Comment:
   Oh. This counts as a breaking change: if anyone has written a custom executor and was calling `validate_command` (which, ideally, they should be), it would now blow up.
   
   Given that, maybe two separate functions might be the way to go?





[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r927902796


##########
docs/apache-airflow/howto/dynamic-dag-generation.rst:
##########
@@ -140,3 +140,120 @@ Each of them can run separately with related configuration
 
 .. warning::
   Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
+
+
+Optimizing DAG parsing delays during execution
+----------------------------------------------
+
+Sometimes when you generate a lot of Dynamic DAGs from a single DAG file, it might cause unnecessary delays
+when the DAG file is parsed during task execution. The impact is a delay before a task starts.
+
+Why is this happening? You might not be aware but just before your task is executed,
+Airflow parses the Python file the DAG comes from.
+
+The Airflow Scheduler (or DAG Processor) requires loading of a complete DAG file to process all metadata.
+However, task execution requires only a single DAG object to execute a task. Knowing this, we can
+skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
+This optimization is most effective when the number of generated DAGs is high.
+
+There is an experimental approach that you can take to optimize this behaviour. Note that it is not always
+possible to use (for example when generation of subsequent DAGs depends on the previous DAGs) or when
+there are some side-effects of your DAGs generation. Also the code snippet below is pretty complex and while
+we tested it and it works in most circumstances, there might be cases where detection of the currently
+parsed DAG will fail and it will revert to creating all the DAGs or fail. Use this solution with care and
+test it thoroughly.
+
+A nice example of performance improvements you can gain is shown in the
+`Airflow's Magic Loop <https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629>`_ blog post
+that describes how parsing during task execution was reduced from 120 seconds to 200 ms.
+
+Airflow 2.4+
+............
+
+In Airflow 2.4 and later, you can use the
+``airflow.utils.dag_parsing_context.get_parsing_context`` method to retrieve the current parsing context.
+
+Upon iterating over the collection of things to generate DAGs for, you can use the context to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 6,7,8,12,13
+
+  import os
+  from airflow.models.dag import DAG
+  from airflow.utils.dag_parsing_context import get_parsing_context, AirflowParsingContextType
+
+  current_dag = None
+  parsing_context = get_parsing_context()
+  if parsing_context and parsing_context.context_type == AirflowParsingContextType.TASK_EXECUTION:
+      current_dag = parsing_context.dag_id

Review Comment:
   Indeed :) 





[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r926396366


##########
airflow/executors/base_executor.py:
##########
@@ -332,10 +333,17 @@ def slots_available(self):
             return sys.maxsize
 
     @staticmethod
-    def validate_command(command: List[str]) -> None:
-        """Check if the command to execute is airflow command"""
+    def validate_command(command: List[str]) -> Tuple[Optional[str], Optional[str]]:

Review Comment:
   Crossed my mind too :)





[GitHub] [airflow] potiuk commented on pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1203195318

   > > We did already:
   > 
   > Ah, but it's not marked with the "experimental" sphinx tag https://github.com/apache/airflow/blame/main/docs/apache-airflow/listeners.rst#L31
   
   Added.




[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936548785


##########
airflow/executors/local_executor.py:
##########
@@ -78,14 +79,17 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
 
         self.log.info("%s running %s", self.__class__.__name__, command)
         setproctitle(f"airflow worker -- LocalExecutor: {command}")
-        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-            state = self._execute_work_in_subprocess(command)
-        else:
-            state = self._execute_work_in_fork(command)
-
-        self.result_queue.put((key, state))
-        # Remove the command since the worker is done executing the task
-        setproctitle("airflow worker -- LocalExecutor")
+        with _airflow_parsing_context_manager(
+            AirflowParsingContextType.TASK_EXECUTION, dag_id=key[0], task_id=key[1]
+        ):

Review Comment:
   Ok, let me then trace it step by step to see exactly what's going on there. It will be a good learning exercise to see where we are now.





[GitHub] [airflow] potiuk commented on pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1203205163

   The one remaining question is about where to inject the parsing_context - @pingzh - any comments here?




[GitHub] [airflow] potiuk commented on pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1202409324

   I think it should get green this time




[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r937601155


##########
airflow/executors/base_executor.py:
##########
@@ -332,10 +332,26 @@ def slots_available(self):
             return sys.maxsize
 
     @staticmethod
-    def validate_command(command: List[str]) -> None:
-        """Check if the command to execute is airflow command"""
+    def validate_airflow_tasks_run_command(command: List[str]) -> Tuple[Optional[str], Optional[str]]:

Review Comment:
   I was basically waiting to see if someone would raise it (as I thought I was overthinking it :)).





[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r937600188


##########
airflow/executors/base_executor.py:
##########
@@ -332,10 +332,26 @@ def slots_available(self):
             return sys.maxsize
 
     @staticmethod
-    def validate_command(command: List[str]) -> None:
-        """Check if the command to execute is airflow command"""
+    def validate_airflow_tasks_run_command(command: List[str]) -> Tuple[Optional[str], Optional[str]]:

Review Comment:
   Yeah, I thought it might be an issue. I will just add a back-compat method.
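   
   A sketch of what such a back-compat shim could look like (the validation logic here is a simplified stand-in, not the PR's actual implementation):
   
   ```python
   import warnings
   from typing import List, Optional, Tuple
   
   
   class BaseExecutor:
       @staticmethod
       def validate_airflow_tasks_run_command(command: List[str]) -> Tuple[Optional[str], Optional[str]]:
           # Simplified stand-in: check the command shape and extract dag_id/task_id.
           if command[0:3] != ["airflow", "tasks", "run"]:
               raise ValueError(f'The command must start with "airflow tasks run": {command}')
           dag_id = command[3] if len(command) > 3 else None
           task_id = command[4] if len(command) > 4 else None
           return dag_id, task_id
   
       @staticmethod
       def validate_command(command: List[str]) -> None:
           # Old name and old (None) return type kept so existing custom executors keep working.
           warnings.warn(
               "validate_command is deprecated; use validate_airflow_tasks_run_command instead.",
               DeprecationWarning,
               stacklevel=2,
           )
           BaseExecutor.validate_airflow_tasks_run_command(command)
   ```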





[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r927901709


##########
docs/apache-airflow/howto/dynamic-dag-generation.rst:
##########
@@ -140,3 +140,120 @@ Each of them can run separately with related configuration
 
 .. warning::
   Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
+
+
+Optimizing DAG parsing delays during execution
+----------------------------------------------
+
+Sometimes when you generate a lot of Dynamic DAGs from a single DAG file, it might cause unnecessary delays
+when the DAG file is parsed during task execution. The impact is a delay before a task starts.
+
+Why is this happening? You might not be aware but just before your task is executed,
+Airflow parses the Python file the DAG comes from.
+
+The Airflow Scheduler (or DAG Processor) requires loading of a complete DAG file to process all metadata.
+However, task execution requires only a single DAG object to execute a task. Knowing this, we can
+skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
+This optimization is most effective when the number of generated DAGs is high.
+
+There is an experimental approach that you can take to optimize this behaviour. Note that it is not always
+possible to use (for example when generation of subsequent DAGs depends on the previous DAGs) or when
+there are some side-effects of your DAGs generation. Also the code snippet below is pretty complex and while
+we tested it and it works in most circumstances, there might be cases where detection of the currently
+parsed DAG will fail and it will revert to creating all the DAGs or fail. Use this solution with care and
+test it thoroughly.
+
+A nice example of performance improvements you can gain is shown in the
+`Airflow's Magic Loop <https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629>`_ blog post
+that describes how parsing during task execution was reduced from 120 seconds to 200 ms.
+
+Airflow 2.4+
+............
+
+In Airflow 2.4 and later, you can use the
+``airflow.utils.dag_parsing_context.get_parsing_context`` method to retrieve the current parsing context.
+
+Upon iterating over the collection of things to generate DAGs for, you can use the context to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 6,7,8,12,13
+
+  import os
+  from airflow.models.dag import DAG
+  from airflow.utils.dag_parsing_context import get_parsing_context, AirflowParsingContextType
+
+  current_dag = None
+  parsing_context = get_parsing_context()
+  if parsing_context and parsing_context.context_type == AirflowParsingContextType.TASK_EXECUTION:
+      current_dag = parsing_context.dag_id
+
+  for thing in list_of_things:
+      dag_id = f"generated_dag_{thing}"
+      if current_dag is not None and current_dag != dag_id:
+          continue  # skip generation of non-selected DAG
+
+      dag = DAG(dag_id=dag_id, ...)
+      globals()[dag_id] = dag
+
+
+
+Airflow 2.3 and below
+......................
+
+In Airflow 2.3 and below, upon evaluation of a DAG file, command line arguments are supplied
+which we can use to determine which Airflow component performs parsing:
+
+* Scheduler/DAG Processor args: ``["airflow", "scheduler"]`` or ``["airflow", "dag-processor"]``
+* Task execution args: ``["airflow", "tasks", "run", "dag_id", "task_id", ...]``
+
+However, depending on the executor used and the forking model, those args might be available via ``sys.argv``
+or via the name of the running process. Airflow either executes tasks by running a new Python interpreter or
+sets the name of the process to "airflow task supervisor: {ARGS}" in the case of a celery forked process, or
+"airflow task runner: dag_id task_id" in the case of a local executor forked process.
+
+Upon iterating over the collection of things to generate DAGs for, you can use these arguments to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 7,8,9,19,20,24,25,31,32
+
+  import sys
+  import ast
+  import setproctitle
+  from airflow.models.dag import DAG
+
+  current_dag = None
+  if len(sys.argv) > 3 and sys.argv[1] == "tasks":
+      # task executed by starting a new Python interpreter
+      current_dag = sys.argv[3]
+  else:
+      try:
+          PROCTITLE_SUPERVISOR_PREFIX = "airflow task supervisor: "
+          PROCTITLE_TASK_RUNNER_PREFIX = "airflow task runner: "
+          proctitle = str(setproctitle.getproctitle())
+          if proctitle.startswith(PROCTITLE_SUPERVISOR_PREFIX):
+              # task executed via forked process in celery
+              args_string = proctitle[len(PROCTITLE_SUPERVISOR_PREFIX) :]
+              args = ast.literal_eval(args_string)
+              if len(args) > 3 and args[1] == "tasks":
+                  current_dag = args[3]
+          elif proctitle.startswith(PROCTITLE_TASK_RUNNER_PREFIX):
+              # task executed via forked process in standard_task_runner
+              args = proctitle[len(PROCTITLE_TASK_RUNNER_PREFIX) :].split(" ")
+              if len(args) > 0:
+                  current_dag = args[0]
+      except Exception:
+          pass

Review Comment:
   Yeah. The blog post is almost ready :)





[GitHub] [airflow] potiuk commented on pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1192848443

   Addressed all comments and got rid of the "non-robust" solution.




[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936004226


##########
docs/apache-airflow/howto/dynamic-dag-generation.rst:
##########
@@ -140,3 +140,55 @@ Each of them can run separately with related configuration
 
 .. warning::
   Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
+
+
+Optimizing DAG parsing delays during execution
+----------------------------------------------
+
+Sometimes when you generate a lot of Dynamic DAGs from a single DAG file, it might cause unnecessary delays
+when the DAG file is parsed during task execution. The impact is a delay before a task starts.
+
+Why is this happening? You might not be aware but just before your task is executed,
+Airflow parses the Python file the DAG comes from.
+
+The Airflow Scheduler (or DAG Processor) requires loading of a complete DAG file to process all metadata.
+However, task execution requires only a single DAG object to execute a task. Knowing this, we can
+skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
+This optimization is most effective when the number of generated DAGs is high.
+
+There is an experimental approach that you can take to optimize this behaviour. Note that it is not always
+possible to use (for example when generation of subsequent DAGs depends on the previous DAGs) or when
+there are some side-effects of your DAGs generation. Also the code snippet below is pretty complex and while
+we tested it and it works in most circumstances, there might be cases where detection of the currently
+parsed DAG will fail and it will revert to creating all the DAGs or fail. Use this solution with care and
+test it thoroughly.
+
+A nice example of performance improvements you can gain is shown in the
+`Airflow's Magic Loop <https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629>`_ blog post
+that describes how parsing during task execution was reduced from 120 seconds to 200 ms.
+
+In Airflow 2.4 and later, you can use the
+``airflow.utils.dag_parsing_context.get_parsing_context`` method to retrieve the current parsing context.
+
+Upon iterating over the collection of things to generate DAGs for, you can use the context to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 6,7,8,12,13
+
+  from airflow.models.dag import DAG
+  from airflow.utils.dag_parsing_context import get_parsing_context, AirflowParsingContextType
+
+  current_dag = None
+  parsing_context = get_parsing_context()
+  if parsing_context and parsing_context.context_type == AirflowParsingContextType.TASK_EXECUTION:
+      current_dag = parsing_context.dag_id

Review Comment:
   Ah yeah. Actually there is an even nicer simplification we can do which will have a little less boilerplate (see the change).





[GitHub] [airflow] ashb commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r935682849


##########
airflow/executors/local_executor.py:
##########
@@ -78,14 +79,17 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
 
         self.log.info("%s running %s", self.__class__.__name__, command)
         setproctitle(f"airflow worker -- LocalExecutor: {command}")
-        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-            state = self._execute_work_in_subprocess(command)
-        else:
-            state = self._execute_work_in_fork(command)
-
-        self.result_queue.put((key, state))
-        # Remove the command since the worker is done executing the task
-        setproctitle("airflow worker -- LocalExecutor")
+        with _airflow_parsing_context_manager(
+            AirflowParsingContextType.TASK_EXECUTION, dag_id=key[0], task_id=key[1]
+        ):

Review Comment:
   Nit(?): I would have expected this parsing context to be set in the "child" process only, not the parent. Is there a reason it was done this way?



##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"

Review Comment:
   I'm somewhat wary of this -- why do we want a DAG to ever know what mode it is in? Shouldn't the mode be one of
   
   a) Give me all the dags in the file (in which case `context.dag_id` is None); or
   b) Give me a specific DAG only?



##########
airflow/task/task_runner/standard_task_runner.py:
##########
@@ -84,12 +85,16 @@ def _start_by_fork(self):
             if job_id is not None:
                 proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
-
             return_code = 0
             try:
                 # parse dag file since `airflow tasks run --local` does not parse dag file
                 dag = get_dag(args.subdir, args.dag_id)
-                args.func(args, dag=dag)
+                with _airflow_parsing_context_manager(

Review Comment:
   Shouldn't the `dag = get_dag()` call be inside the context too?
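   
   A sketch of the reordering being suggested (the wrapper function is hypothetical; the real `_start_by_fork` does considerably more):
   
   ```python
   from airflow.utils.cli import get_dag
   from airflow.utils.dag_parsing_context import (
       AirflowParsingContextType,
       _airflow_parsing_context_manager,
   )
   
   
   def _run_task_in_fork(args):
       # Enter the parsing context *before* parsing the DAG file, so that the
       # top-level DAG-file code executed by get_dag() also sees TASK_EXECUTION.
       with _airflow_parsing_context_manager(
           AirflowParsingContextType.TASK_EXECUTION, dag_id=args.dag_id, task_id=args.task_id
       ):
           dag = get_dag(args.subdir, args.dag_id)
           args.func(args, dag=dag)
   ```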



##########
airflow/executors/local_executor.py:
##########
@@ -78,14 +79,17 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
 
         self.log.info("%s running %s", self.__class__.__name__, command)
         setproctitle(f"airflow worker -- LocalExecutor: {command}")
-        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-            state = self._execute_work_in_subprocess(command)
-        else:
-            state = self._execute_work_in_fork(command)
-
-        self.result_queue.put((key, state))
-        # Remove the command since the worker is done executing the task
-        setproctitle("airflow worker -- LocalExecutor")
+        with _airflow_parsing_context_manager(
+            AirflowParsingContextType.TASK_EXECUTION, dag_id=key[0], task_id=key[1]
+        ):

Review Comment:
   It looks like it _is_ done there too. In which case we shouldn't need it here too?



##########
docs/apache-airflow/howto/dynamic-dag-generation.rst:
##########
@@ -140,3 +140,55 @@ Each of them can run separately with related configuration
 
 .. warning::
   Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
+
+
+Optimizing DAG parsing delays during execution
+----------------------------------------------
+
+Sometimes when you generate a lot of Dynamic DAGs from a single DAG file, it might cause unnecessary delays
+when the DAG file is parsed during task execution. The impact is a delay before a task starts.
+
+Why is this happening? You might not be aware but just before your task is executed,
+Airflow parses the Python file the DAG comes from.
+
+The Airflow Scheduler (or DAG Processor) requires loading of a complete DAG file to process all metadata.
+However, task execution requires only a single DAG object to execute a task. Knowing this, we can
+skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
+This optimization is most effective when the number of generated DAGs is high.
+
+There is an experimental approach that you can take to optimize this behaviour. Note that it is not always
+possible to use (for example when generation of subsequent DAGs depends on the previous DAGs) or when
+there are some side-effects of your DAGs generation. Also the code snippet below is pretty complex and while
+we tested it and it works in most circumstances, there might be cases where detection of the currently
+parsed DAG will fail and it will revert to creating all the DAGs or fail. Use this solution with care and
+test it thoroughly.
+
+A nice example of performance improvements you can gain is shown in the
+`Airflow's Magic Loop <https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629>`_ blog post
+that describes how parsing during task execution was reduced from 120 seconds to 200 ms.
+
+In Airflow 2.4 and later, you can use the
+``airflow.utils.dag_parsing_context.get_parsing_context`` method to retrieve the current parsing context.
+
+Upon iterating over the collection of things to generate DAGs for, you can use the context to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 6,7,8,12,13
+
+  from airflow.models.dag import DAG
+  from airflow.utils.dag_parsing_context import get_parsing_context, AirflowParsingContextType
+
+  current_dag = None
+  parsing_context = get_parsing_context()
+  if parsing_context and parsing_context.context_type == AirflowParsingContextType.TASK_EXECUTION:
+      current_dag = parsing_context.dag_id

Review Comment:
   Further to my previous comment, I'd be happier if this was just:
   
   ```suggestion
       current_dag = parsing_context.dag_id
   ```
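   
   For context, the whole example would then read roughly like this (a sketch assuming `get_parsing_context()` always returns a context whose `dag_id` is `None` outside task execution; `list_of_things` is a placeholder):
   
   ```python
   from airflow.models.dag import DAG
   from airflow.utils.dag_parsing_context import get_parsing_context
   
   list_of_things = ["a", "b", "c"]  # placeholder for whatever drives DAG generation
   
   current_dag = get_parsing_context().dag_id  # None unless a single task is being executed
   
   for thing in list_of_things:
       dag_id = f"generated_dag_{thing}"
       if current_dag is not None and current_dag != dag_id:
           continue  # skip generation of non-selected DAGs
       globals()[dag_id] = DAG(dag_id=dag_id)
   ```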





[GitHub] [airflow] potiuk commented on pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1189232045

   A bit cleaner interface (also appropriate to be implemented as a "future").




[GitHub] [airflow] potiuk commented on pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1189185156

   This is the first attempt to implement the "robust execution context" as a follow-up after #25121. I am not 100% sure I got everything, but I think it might be quite close. @pingzh, I know you have a ton of experience with the various runners, and the way they are implemented is a bit "convoluted", so I'd appreciate a thorough review (note that it is based on #25121, which was documentation-only, so only the last commit matters).




[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r927901357


##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"
+
+
+class AirflowParsingContext(NamedTuple):
+    """Context of parsing for the DAG."""
+
+    context_type: Optional[AirflowParsingContextType]
+    dag_id: Optional[str]
+    task_id: Optional[str]
+
+
+_AIRFLOW_PARSING_CONTEXT = "_AIRFLOW_PARSING_CONTEXT"
+_AIRFLOW_PARSING_CONTEXT_DAG_ID = "_AIRFLOW_PARSING_CONTEXT_DAG_ID"
+_AIRFLOW_PARSING_CONTEXT_TASK_ID = "_AIRFLOW_PARSING_CONTEXT_TASK_ID"
+
+
+@contextmanager
+def _airflow_parsing_context_manager(
+    context: AirflowParsingContextType, dag_id: Optional[str] = None, task_id: Optional[str] = None
+):
+    os.environ[_AIRFLOW_PARSING_CONTEXT] = context.value
+    if context == AirflowParsingContextType.TASK_EXECUTION:
+        os.environ[_AIRFLOW_PARSING_CONTEXT_DAG_ID] = dag_id if dag_id is not None else ""
+        os.environ[_AIRFLOW_PARSING_CONTEXT_TASK_ID] = task_id if task_id is not None else ""

Review Comment:
   Yeah.





[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936652930


##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"

Review Comment:
   Right. Slightly smaller without the Enum type.





[GitHub] [airflow] ashb commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936381251


##########
airflow/executors/local_executor.py:
##########
@@ -78,14 +79,17 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
 
         self.log.info("%s running %s", self.__class__.__name__, command)
         setproctitle(f"airflow worker -- LocalExecutor: {command}")
-        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-            state = self._execute_work_in_subprocess(command)
-        else:
-            state = self._execute_work_in_fork(command)
-
-        self.result_queue.put((key, state))
-        # Remove the command since the worker is done executing the task
-        setproctitle("airflow worker -- LocalExecutor")
+        with _airflow_parsing_context_manager(
+            AirflowParsingContextType.TASK_EXECUTION, dag_id=key[0], task_id=key[1]
+        ):

Review Comment:
   AIP-45 is already merged and in main, so we don't need this here at all then.





[GitHub] [airflow] ashb commented on pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1202724743

   > We did already:
   
   Ah, but it's not marked with the "experimental" sphinx tag https://github.com/apache/airflow/blame/main/docs/apache-airflow/listeners.rst#L31




[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936621879


##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"

Review Comment:
   But yeah, you are right. YAGNI.





[GitHub] [airflow] feluelle commented on a diff in pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
feluelle commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r927523656


##########
docs/apache-airflow/howto/dynamic-dag-generation.rst:
##########
@@ -140,3 +140,120 @@ Each of them can run separately with related configuration
 
 .. warning::
   Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
+
+
+Optimizing DAG parsing delays during execution
+----------------------------------------------
+
+Sometimes when you generate a lot of Dynamic DAGs from a single DAG file, it might cause unnecessary delays
+when the DAG file is parsed during task execution. The impact is a delay before a task starts.
+
+Why is this happening? You might not be aware but just before your task is executed,
+Airflow parses the Python file the DAG comes from.
+
+The Airflow Scheduler (or DAG Processor) requires loading of a complete DAG file to process all metadata.
+However, task execution requires only a single DAG object to execute a task. Knowing this, we can
+skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
+This optimization is most effective when the number of generated DAGs is high.
+
+There is an experimental approach that you can take to optimize this behaviour. Note that it is not always
+possible to use (for example when generation of subsequent DAGs depends on the previous DAGs) or when
+there are some side-effects of your DAGs generation. Also the code snippet below is pretty complex and while
+we tested it and it works in most circumstances, there might be cases where detection of the currently
+parsed DAG will fail and it will revert to creating all the DAGs or fail. Use this solution with care and
+test it thoroughly.
+
+A nice example of performance improvements you can gain is shown in the
+`Airflow's Magic Loop <https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629>`_ blog post
+that describes how parsing during task execution was reduced from 120 seconds to 200 ms.
+
+Airflow 2.4+
+............
+
+In Airflow 2.4 and later, you can use the
+``airflow.utils.dag_parsing_context.get_parsing_context`` method to retrieve the current parsing context.
+
+Upon iterating over the collection of things to generate DAGs for, you can use the context to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 6,7,8,12,13
+
+  import os
+  from airflow.models.dag import DAG
+  from airflow.utils.dag_parsing_context import get_parsing_context, AirflowParsingContextType
+
+  current_dag = None
+  parsing_context = get_parsing_context()
+  if parsing_context and parsing_context.context_type == AirflowParsingContextType.TASK_EXECUTION:
+      current_dag = parsing_context.dag_id
+
+  for thing in list_of_things:
+      dag_id = f"generated_dag_{thing}"
+      if current_dag is not None and current_dag != dag_id:
+          continue  # skip generation of non-selected DAG
+
+      dag = DAG(dag_id=dag_id, ...)
+      globals()[dag_id] = dag
+
+
+
+Airflow 2.3 and below
+......................
+
+In Airflow 2.3 and below, upon evaluation of a DAG file, command line arguments are supplied
+which we can use to determine which Airflow component performs parsing:
+
+* Scheduler/DAG Processor args: ``["airflow", "scheduler"]`` or ``["airflow", "dag-processor"]``
+* Task execution args: ``["airflow", "tasks", "run", "dag_id", "task_id", ...]``
+
+However, depending on the executor used and the forking model, those args might be available via ``sys.argv``
+or via the name of the running process. Airflow either executes tasks by running a new Python interpreter, or
+sets the name of the process to "airflow task supervisor: {ARGS}" in the case of a Celery forked process, or
+"airflow task runner: dag_id task_id" in the case of a Local Executor forked process.
+
+Upon iterating over the collection of things to generate DAGs for, you can use these arguments to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 7,8,9,19,20,24,25,31,32
+
+  import sys
+  import ast
+  import setproctitle
+  from airflow.models.dag import DAG
+
+  current_dag = None
+  if len(sys.argv) > 3 and sys.argv[1] == "tasks":
+      # task executed by starting a new Python interpreter
+      current_dag = sys.argv[3]
+  else:
+      try:
+          PROCTITLE_SUPERVISOR_PREFIX = "airflow task supervisor: "
+          PROCTITLE_TASK_RUNNER_PREFIX = "airflow task runner: "
+          proctitle = str(setproctitle.getproctitle())
+          if proctitle.startswith(PROCTITLE_SUPERVISOR_PREFIX):
+              # task executed via forked process in celery
+              args_string = proctitle[len(PROCTITLE_SUPERVISOR_PREFIX) :]
+              args = ast.literal_eval(args_string)
+              if len(args) > 3 and args[1] == "tasks":
+                  current_dag = args[3]
+          elif proctitle.startswith(PROCTITLE_TASK_RUNNER_PREFIX):
+              # task executed via forked process in standard_task_runner
+              args = proctitle[len(PROCTITLE_TASK_RUNNER_PREFIX) :].split(" ")
+              if len(args) > 0:
+                  current_dag = args[0]
+      except Exception:
+          pass

Review Comment:
   This to me looks a bit like "_Don't try this at home._" 😅 I think I do agree with @jedcunningham that this should not be in official docs.
   
   Your "robust" approach is much better :)



##########
docs/apache-airflow/howto/dynamic-dag-generation.rst:
##########
@@ -140,3 +140,120 @@ Each of them can run separately with related configuration
 
 .. warning::
   Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
+
+
+Optimizing DAG parsing delays during execution
+----------------------------------------------
+
+Generating a lot of Dynamic DAGs from a single DAG file can cause unnecessary delays
+when the DAG file is parsed during task execution. The impact is a delay before a task starts.
+
+Why is this happening? You might not be aware but just before your task is executed,
+Airflow parses the Python file the DAG comes from.
+
+The Airflow Scheduler (or DAG Processor) requires loading of a complete DAG file to process all metadata.
+However, task execution requires only a single DAG object to execute a task. Knowing this, we can
+skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
+This optimization is most effective when the number of generated DAGs is high.
+
+There is an experimental approach that you can take to optimize this behaviour. Note that it is not
+always possible to use (for example, when generation of subsequent DAGs depends on the previous DAGs,
+or when your DAG generation has side-effects). Also, the code snippet below is pretty complex and, while
+we tested it and it works in most circumstances, there might be cases where detection of the currently
+parsed DAG fails and the snippet will either revert to creating all the DAGs or fail outright. Use this
+solution with care and test it thoroughly.
+
+A nice example of performance improvements you can gain is shown in the
+`Airflow's Magic Loop <https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629>`_ blog post
+that describes how parsing during task execution was reduced from 120 seconds to 200 ms.
+
+Airflow 2.4+
+............
+
+In Airflow 2.4 you can use the
+``airflow.utils.dag_parsing_context.get_parsing_context`` method to retrieve the current parsing context.
+
+Upon iterating over the collection of things to generate DAGs for, you can use the context to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 6,7,8,12,13
+
+  import os

Review Comment:
   ```suggestion
   ```



##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"
+
+
+class AirflowParsingContext(NamedTuple):
+    """Context of parsing for the DAG."""
+
+    context_type: Optional[AirflowParsingContextType]
+    dag_id: Optional[str]
+    task_id: Optional[str]
+
+
+_AIRFLOW_PARSING_CONTEXT = "_AIRFLOW_PARSING_CONTEXT"
+_AIRFLOW_PARSING_CONTEXT_DAG_ID = "_AIRFLOW_PARSING_CONTEXT_DAG_ID"
+_AIRFLOW_PARSING_CONTEXT_TASK_ID = "_AIRFLOW_PARSING_CONTEXT_TASK_ID"
+
+
+@contextmanager
+def _airflow_parsing_context_manager(
+    context: AirflowParsingContextType, dag_id: Optional[str] = None, task_id: Optional[str] = None
+):
+    os.environ[_AIRFLOW_PARSING_CONTEXT] = context.value
+    if context == AirflowParsingContextType.TASK_EXECUTION:
+        os.environ[_AIRFLOW_PARSING_CONTEXT_DAG_ID] = dag_id if dag_id is not None else ""
+        os.environ[_AIRFLOW_PARSING_CONTEXT_TASK_ID] = task_id if task_id is not None else ""

Review Comment:
   ```suggestion
           os.environ[_AIRFLOW_PARSING_CONTEXT_DAG_ID] = dag_id
           os.environ[_AIRFLOW_PARSING_CONTEXT_TASK_ID] = task_id
   ```
   Why do you need to check this? Wouldn't `None` work as well? An empty `""` id does not make sense in my opinion, especially because you are using `Optional[str] = None` everywhere anyway. In the `get_parsing_context()` you are using `os.environ.get()` which means it will either be `None` or the value.
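
   A side note on the trade-off here: `os.environ` only accepts string values, so a literal `None`
   cannot be stored in an environment variable; "unset" can only be expressed by deleting the key or by
   a sentinel such as `""`. A minimal sketch of that constraint (plain CPython behaviour, nothing
   Airflow-specific):

   ```python
   import os

   # Environment variable values must be strings; assigning None raises TypeError.
   try:
       os.environ["_AIRFLOW_PARSING_CONTEXT_DAG_ID"] = None  # type: ignore[assignment]
   except TypeError as err:
       print(f"Cannot store None in the environment: {err}")

   # "Not set" is instead expressed by removing the key, so that a later
   # os.environ.get() returns None on read.
   os.environ.pop("_AIRFLOW_PARSING_CONTEXT_DAG_ID", None)
   assert os.environ.get("_AIRFLOW_PARSING_CONTEXT_DAG_ID") is None
   ```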



##########
docs/apache-airflow/howto/dynamic-dag-generation.rst:
##########
@@ -140,3 +140,120 @@ Each of them can run separately with related configuration
 
 .. warning::
   Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
+
+
+Optimizing DAG parsing delays during execution
+----------------------------------------------
+
+Generating a lot of Dynamic DAGs from a single DAG file can cause unnecessary delays
+when the DAG file is parsed during task execution. The impact is a delay before a task starts.
+
+Why is this happening? You might not be aware but just before your task is executed,
+Airflow parses the Python file the DAG comes from.
+
+The Airflow Scheduler (or DAG Processor) requires loading of a complete DAG file to process all metadata.
+However, task execution requires only a single DAG object to execute a task. Knowing this, we can
+skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
+This optimization is most effective when the number of generated DAGs is high.
+
+There is an experimental approach that you can take to optimize this behaviour. Note that it is not
+always possible to use (for example, when generation of subsequent DAGs depends on the previous DAGs,
+or when your DAG generation has side-effects). Also, the code snippet below is pretty complex and, while
+we tested it and it works in most circumstances, there might be cases where detection of the currently
+parsed DAG fails and the snippet will either revert to creating all the DAGs or fail outright. Use this
+solution with care and test it thoroughly.
+
+A nice example of performance improvements you can gain is shown in the
+`Airflow's Magic Loop <https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629>`_ blog post
+that describes how parsing during task execution was reduced from 120 seconds to 200 ms.
+
+Airflow 2.4+
+............
+
+In Airflow 2.4 you can use the
+``airflow.utils.dag_parsing_context.get_parsing_context`` method to retrieve the current parsing context.
+
+Upon iterating over the collection of things to generate DAGs for, you can use the context to determine
+whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only
+a single DAG object (when executing the task):
+
+.. code-block:: python
+  :emphasize-lines: 6,7,8,12,13
+
+  import os
+  from airflow.models.dag import DAG
+  from airflow.utils.dag_parsing_context import get_parsing_context, AirflowParsingContextType
+
+  current_dag = None
+  parsing_context = get_parsing_context()
+  if parsing_context and parsing_context.context_type == AirflowParsingContextType.TASK_EXECUTION:
+      current_dag = parsing_context.dag_id

Review Comment:
   This looks so much cleaner. I like it a lot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pingzh commented on a diff in pull request #25161: Robust context of dag parsing

Posted by GitBox <gi...@apache.org>.
pingzh commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r925076077


##########
airflow/executors/base_executor.py:
##########
@@ -332,10 +333,17 @@ def slots_available(self):
             return sys.maxsize
 
     @staticmethod
-    def validate_command(command: List[str]) -> None:
-        """Check if the command to execute is airflow command"""
+    def validate_command(command: List[str]) -> Tuple[Optional[str], Optional[str]]:

Review Comment:
   nit: it might be more intuitive to call it `validate_airflow_tasks_run_command` as it is tied to that command.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936000630


##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"

Review Comment:
   Sure. Good idea. I was not sure how to name it, but this seems a better "split" than the "execution" context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936017428


##########
airflow/task/task_runner/standard_task_runner.py:
##########
@@ -84,12 +85,16 @@ def _start_by_fork(self):
             if job_id is not None:
                 proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
-
             return_code = 0
             try:
                 # parse dag file since `airflow tasks run --local` does not parse dag file
                 dag = get_dag(args.subdir, args.dag_id)
-                args.func(args, dag=dag)
+                with _airflow_parsing_context_manager(

Review Comment:
   That depends on whether we can somewhat standardize the level at which we inject the parsing context (see above).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936585249


##########
airflow/task/task_runner/standard_task_runner.py:
##########
@@ -84,12 +85,16 @@ def _start_by_fork(self):
             if job_id is not None:
                 proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
-
             return_code = 0
             try:
                 # parse dag file since `airflow tasks run --local` does not parse dag file
                 dag = get_dag(args.subdir, args.dag_id)
-                args.func(args, dag=dag)
+                with _airflow_parsing_context_manager(

Review Comment:
   Right, yes. I see we now parse it here.



##########
airflow/task/task_runner/standard_task_runner.py:
##########
@@ -84,12 +85,16 @@ def _start_by_fork(self):
             if job_id is not None:
                 proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
-
             return_code = 0
             try:
                 # parse dag file since `airflow tasks run --local` does not parse dag file
                 dag = get_dag(args.subdir, args.dag_id)
-                args.func(args, dag=dag)
+                with _airflow_parsing_context_manager(

Review Comment:
   Stupid me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936584448


##########
airflow/executors/local_executor.py:
##########
@@ -78,14 +79,17 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
 
         self.log.info("%s running %s", self.__class__.__name__, command)
         setproctitle(f"airflow worker -- LocalExecutor: {command}")
-        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-            state = self._execute_work_in_subprocess(command)
-        else:
-            state = self._execute_work_in_fork(command)
-
-        self.result_queue.put((key, state))
-        # Remove the command since the worker is done executing the task
-        setproctitle("airflow worker -- LocalExecutor")
+        with _airflow_parsing_context_manager(
+            AirflowParsingContextType.TASK_EXECUTION, dag_id=key[0], task_id=key[1]
+        ):

Review Comment:
   OK. I see where I made the mistake
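
   For context, `TaskInstanceKey` is a `NamedTuple`, so the positional `key[0]`/`key[1]` lookups above
   can also be written by field name, which makes the intent harder to get wrong. A minimal sketch
   (field layout as in recent Airflow 2.x):

   ```python
   from airflow.models.taskinstance import TaskInstanceKey

   key = TaskInstanceKey(dag_id="my_dag", task_id="my_task", run_id="manual__2022-07-19", try_number=1)

   # Equivalent to key[0] and key[1], but self-documenting:
   dag_id, task_id = key.dag_id, key.task_id
   ```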



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25161:
URL: https://github.com/apache/airflow/pull/25161#issuecomment-1204348098

   Right. Finally Green.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r936606096


##########
airflow/utils/dag_parsing_context.py:
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from contextlib import contextmanager
+from enum import Enum
+from typing import NamedTuple, Optional
+
+
+class AirflowParsingContextType(Enum):
+    """Type of context of parsing the DAG."""
+
+    DAG_PROCESSOR = "DAG_PROCESSOR"
+    TASK_EXECUTION = "TASK_EXECUTION"

Review Comment:
   Ah sorry, I wasn't clear.
   
   My suggestion is to remove the type entirely, and have the decision in dag files be `if context.dag_id is not None:`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #25161: Add parsing context to DAG Parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25161:
URL: https://github.com/apache/airflow/pull/25161#discussion_r937737816


##########
airflow/executors/base_executor.py:
##########
@@ -333,9 +334,41 @@ def slots_available(self):
 
     @staticmethod
     def validate_command(command: List[str]) -> None:
-        """Check if the command to execute is airflow command"""
+        """
+        Back-compat method to check if the command to execute is an Airflow command

Review Comment:
   ```suggestion
           Back-compat method to check if the command to execute is an Airflow command
           
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org