Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/19 22:23:54 UTC

[GitHub] [airflow] dstandish opened a new pull request, #26509: WIP - Fix airflow tasks run --local when dags_folder differs from that of processor

dstandish opened a new pull request, #26509:
URL: https://github.com/apache/airflow/pull/26509

   Currently, if your dags_folder differs from that used by the dag processor, then `tasks run --local` will fail to find the dag, because it uses the fileloc value stored in the serialized dag. The behavior of this function is somewhat difficult to test, so I wanted to first add a test capturing the bad behavior; next I'll work on a fix and verify that it behaves correctly.
   
   We may want to let fileloc be relative, or we may just want to also store the dags_folder value of the file processor (or just a placeholder?) so that we can replace it.
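   
   To make the failure concrete, here is a rough standalone sketch (not Airflow code; the paths are made up) of what goes wrong when the worker computes the relative fileloc against its own dags_folder:
   
   ```python
   import pathlib
   
   # fileloc recorded by the dag processor and stored in the serialized dag
   serialized_fileloc = pathlib.Path("/opt/processor/dags/example.py")
   
   # dags_folder configured on the worker that runs `tasks run --local`
   worker_dags_folder = pathlib.Path("/home/airflow/dags")
   
   try:
       # relative_fileloc-style computation against the *worker's* dags_folder
       rel = serialized_fileloc.relative_to(worker_dags_folder)
   except ValueError:
       # not relative to the worker's dags_folder, so we fall back to the
       # processor's absolute path, which does not exist on the worker
       rel = serialized_fileloc
   
   print(rel)  # /opt/processor/dags/example.py -> "dag not found" on the worker
   ```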




[GitHub] [airflow] mhenc commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
mhenc commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r977517608


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   nit: How does this relate to the `processor_subdir` field from e.g. `DagModel`?
   Doesn't it have the same meaning?
   Maybe we should keep the name consistent?





[GitHub] [airflow] dstandish commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r977886390


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   Hmm... that's a good question....  I don't know if it has the same meaning.  Is it correct to assume that processor_subdir will always be the dags folder of the processor?  If so, why didn't they call it dags folder in the first place?  What if there is sometimes an optimization to pass the actual dag file path as subdir to the dag processor so that it will just load that specific file instead of all dags in the dags folder?





[GitHub] [airflow] dstandish merged pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish merged PR #26509:
URL: https://github.com/apache/airflow/pull/26509




[GitHub] [airflow] potiuk commented on pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26509:
URL: https://github.com/apache/airflow/pull/26509#issuecomment-1254795797

   Needs rebase :). But looks good. @mhenc? 




[GitHub] [airflow] potiuk commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r980206186


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   I read the thread carefully (sorry it took me some time, I wanted to make sure I am not mixing things up). There is some misunderstanding of what `--subdir` is; I will try to paraphrase my understanding and see if we are on the same page.
   
   A bit of context, as I understand and remember it :).
   
   The idea of the `--subdir` flag, and the way we've implemented it, was not to over-complicate things - i.e. to be able to do two things:
   
   Phase 1) separate out the standalone DAG processor from the scheduler (isolate code execution to a different machine that, for example, might be in a different security zone).
   
   Phase 2) be able to split your DAGS folder into separate subdirs - each isolated from the others.
   
   
   Case 1) you have all DAG file processors running from DAGS_FOLDER (no --subdir flag, equivalent to what we currently have when we have multiple schedulers all parsing DAGs from the same folder - which is a bit redundant but acceptable. Running several of them at the same time was not the main purpose of introducing the standalone DAG processor.)
   
   Case 2) you have one (or more) separate DAG processors, each group of them with its own, separate directory to process. No "top level" dag file processor. This is equivalent to having several independent DAG file processors and no "common" one at all. Eventually this might be connected to teams. The main use-case here was to make it independent from the actual "airflow" CORE_DAGS_FOLDER:
   
   TEAM_A -> DAG_PROCESSOR_A1, DAG_PROCESSOR_A2 (--subdir /FOLDER_OF_TEAM_A)
   TEAM_B -> DAG_PROCESSOR_B1, DAG_PROCESSOR_B2 (--subdir /FOLDER_OF_TEAM_B)
   
   This means that the user might configure it differently depending on the deployment choice they want. And yeah, the `--subdir` name might be misleading in this context; I vaguely recall we had a discussion about it when we chose the parameter name (we can find it I guess to get more context - I can't even remember what my preference was), but `--subdir` in this case is more about the fact that this is not "all of" the DAGs but a subset of them. I think we chose the name because we chose to reuse the existing `--subdir` parameter of the local commands, which already has this "misleading" semantics - so that `airflow dags test --subdir <ABSOLUTE_PATH>` is equivalent to `dag-processor --subdir <ABSOLUTE_PATH>`.
   
   Currently `--subdir` in local tasks has this behaviour:
   
   1) If it is an absolute PATH, use it as the absolute location of the DAG.
   2) If it contains DAGS_FOLDER (anywhere) -> replace DAGS_FOLDER with the current DAGS_FOLDER - this was fixed by @ashb in 2.2.0 to handle different locations on the scheduler and the runners (https://github.com/apache/airflow/pull/16860) - it was broken in 2.0 and 2.1 but worked in 1.10.
   3) It can also expand the "user" path (god knows why, but likely historical too).
   
   This is the current `process_subdir()` method:
   
   ```
   def process_subdir(subdir: str | None):
       """Expands path to absolute by replacing 'DAGS_FOLDER', '~', '.', etc."""
       if subdir:
           if not settings.DAGS_FOLDER:
               raise ValueError("DAGS_FOLDER variable in settings should be filled.")
           subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
           subdir = os.path.abspath(os.path.expanduser(subdir))
       return subdir
   ```
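   
   For example (a standalone illustration only, with an assumed DAGS_FOLDER of `/opt/airflow/dags` - not Airflow's actual code), the replacements above behave roughly like this:
   
   ```python
   import os
   
   DAGS_FOLDER = "/opt/airflow/dags"  # assumed value, for illustration only
   
   def expand_subdir(subdir: str) -> str:
       # mirrors the two replacements done by process_subdir() above
       subdir = subdir.replace("DAGS_FOLDER", DAGS_FOLDER)
       return os.path.abspath(os.path.expanduser(subdir))
   
   print(expand_subdir("DAGS_FOLDER/team_a"))  # /opt/airflow/dags/team_a
   print(expand_subdir("~/dags"))              # e.g. /home/airflow/dags
   print(expand_subdir("/opt/other/dags"))     # absolute paths pass through unchanged
   ```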
   
   Originally the idea was that they might want to have them independent of the current https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder without relative paths. That's fine, nothing prevents us from doing so, and I believe we handle this perfectly well - we already handled the situation where DagBag is passed an absolute path and it stores the absolute path of the DAG it comes from. This is for example used in the case of "example_dags", and this is what we wanted to get.
   
   
   I believe (please correct me if I misunderstood it @dstandish) that this PR here is very similar to what the #16860 change from @ashb was. But while @ashb wanted to resurrect the relative approach for the main DAGS folder, this one brings it to the standalone DAG processor.
   
   This is fine - we did not plan to do it originally, we had not thought this might be useful, but when I think of it, it can have some uses - but only if you create a deployment where all the subdirs served by all file processors are subdirs of the "CORE" dags folder (and there is still no top-level parser/processor).
   
   Then you could likely synchronise all your sub-folders as a single volume or git-sync and scan each sub-folder of it with a different group of dag file processors. Originally the idea was that each DAG file processor (group) could have a separate git-sync or volume. But this is an interesting approach where they are isolated for execution but not for syncing. This makes it a bit more difficult to maintain the isolation and syncing (because you have to have a filesystem access hierarchy parallel to the sub-processor hierarchy, or git submodules to run syncing from multiple repositories), but it's doable and a number of our users already do it - for example Jagex in their Airflow Summit talk from London.
   
   Are we on the same page here :) ? 
   





[GitHub] [airflow] potiuk commented on pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26509:
URL: https://github.com/apache/airflow/pull/26509#issuecomment-1260646034

   Makes sense - one static check to fix but good to go :)


[GitHub] [airflow] dstandish commented on pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #26509:
URL: https://github.com/apache/airflow/pull/26509#issuecomment-1260157556

   OK so... I made it so this is just a private attr on the ser dag object...
   
   So really this just fixes the existing feature without adding any new backcompat promises.
   
   And it is called dags folder, not processor subdir, but, as has been established, the two are not the same thing, and the present attr is always going to tell the truth... so... @potiuk @mhenc does it get your blessing?




[GitHub] [airflow] dstandish commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r977898457


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   From the looks of it, that's why e.g. you have a `--subdir` option on the `airflow dag-processor` command (which can be different from DAGS_FOLDER),
   and then you have this logic:
   https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L508-L509
   This logic appears to say, "if I am a standalone dag processor, and I didn't find the dags in my path, then mark them as stale".
   
   It does seem a bit weird though, because if you change the dag processor subdirs later, perhaps some of those dags would never be returned by this query and therefore would never be deactivated?





[GitHub] [airflow] dstandish commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r978051224


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   > There's nothing that force the processors to look at subfolders of the dags folder
   
   Isn't that what the `--subdir` option does?
   
   > you could have one looking at `/opt/dags` and another at `/home/airflow/airflow/dags` and it would "work"
   
   not seeing the connection?





[GitHub] [airflow] dstandish commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r977891813


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   Actually... I think processor_subdir is something different. I think the idea is that you can have multiple dag processors running, each looking at a different subdir, which is a subdir of dags_folder. Does that sound right to you?



[GitHub] [airflow] ashb commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r978007364


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   There's nothing that forces the processors to look at subfolders of the dags folder -- you could have one looking at `/opt/dags` and another at `/home/airflow/airflow/dags` and it would "work"





[GitHub] [airflow] dstandish commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r977899429


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   Ah, I guess you would know better than most ;)
   <img width="679" alt="image" src="https://user-images.githubusercontent.com/15932138/191809049-5b553a1a-d0b5-41e4-9dcc-7182ee21a127.png">
   let me know your thoughts





[GitHub] [airflow] mhenc commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
mhenc commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r978377313


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   I see that the name may be a little confusing, but `--subdir` just points to the dag folder - the value can be an absolute path, and two different processors may point to completely different locations.
   
   So what you wrote above:
   
   > Actually... i think processor_subdir is something different. I think that the idea is you can have multiple dag processors running, each looking at a different subdir, which is a subdir of dags_folder. that sound right to you?
   
   is actually not true
   
   Also
   
   > it does seem a bit weird though because then maybe if you change the dags processor subdirs later, perhaps some of those dags would never be returned by this query and therefore would never be deactivated?
   
   For that we have a new check in the scheduler:
   https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L1541





[GitHub] [airflow] dstandish commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r980254962


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   > Are we on the same page here :) ?
   
   Maybe sort of? :) I'm not sure what you're proposing though...
   
   Taking it back down to where the rubber meets the road here... what I'm trying to do here is fix a bug. Currently, if the DAGS_FOLDER of the scheduler/processor differs from that of the k8s executor, then all tasks will fail to run. I think this issue is the same one you say worked in 1.10 but was broken in 2.0 and 2.1 -- it's broken again in 2.4, and that's what I'm trying to fix.
   
   This is easy to repro if you run the scheduler in a local virtualenv and run your k8s pods in Docker Desktop.
   
   The cause is the recent optimization to not "process" dags in `airflow tasks run --local` but to read from the ser dag. The pod definition remains like this:
   ```
     - args:
       - airflow
       - tasks
       - run
       - example_bash_operator
       - runme_0
       - manual__2022-09-23T20:04:52.306715+00:00
       - --local
       - --subdir
       - DAGS_FOLDER
   ```
   But when that gets converted to `tasks run --raw`, the subdir becomes the full path (`fileloc`) of the dag _as it is located on the scheduler/processor_. I think it breaks down because our mechanism for handling different dags folders is `relative_fileloc`. But currently the logic to calculate the relative fileloc is based on the _current_ dags folder, which means that if the _current_ process has a different dags folder, it cannot be calculated! So then it defaults to the full filepath as stored in the ser dag, and therefore it fails to find the dag.
   
   So what do we do about it?  The relatively straightforward solution is to use the dags_folder of the serializer instead of the dags_folder of the current process when calculating the relative fileloc (or, equivalently, you could actually store the relative fileloc when serializing).  Then the actual path can always be reassembled, provided that the dag _actually is_ under the dags_folder in all envs (and has the same relative path).
   
   So that's why I added dag_processor_dags_folder.
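   
   Roughly the idea, as a simplified sketch (not the PR's actual code; the folder values here are made up):
   
   ```python
   import pathlib
   
   fileloc = pathlib.Path("/opt/processor/dags/team_a/my_dag.py")  # stored in the ser dag
   processor_dags_folder = pathlib.Path("/opt/processor/dags")     # captured at serialization time
   local_dags_folder = pathlib.Path("/home/airflow/dags")          # this process's DAGS_FOLDER
   
   # compute the relative location against the *serializer's* dags_folder...
   rel = fileloc.relative_to(processor_dags_folder)  # team_a/my_dag.py
   
   # ...so the worker can reassemble a path that exists locally
   print(local_dags_folder / rel)  # /home/airflow/dags/team_a/my_dag.py
   ```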
   
   But @mhenc is suggesting, I think, to just use `processor_subdir` instead. The problem I see with that is that processor_subdir may not always equal dags_folder. And in the case of multiple processors in subdirs of the dags folder, this would not work, I think. The only way it would work is if you had a 1-1 relationship between the dags_folder of the worker and the processor_subdir of the processor. But in that scenario, why not just set the DAGS_FOLDER of your processor to be the subdir?
   
   Fundamentally, I think that currently airflow assumes everything must be in the dags folder, and that dag processors, to the extent they have different paths (subdir), look at _subdirs_ of the dags folder. This alone seems QED for the notion that processor_subdir !== dags_folder. As we get closer to multitenancy, it seems likely we'll need to blow up the dags_folder concept; if it's no longer true that there is _one_ dags folder, then the code and language need to reflect that. There may be many dags folders. "relative_fileloc" won't necessarily make sense as a concept (unless it's redefined to mean relative to the dags folder of this "sub-instance"). But for now, processor_subdir and dags folder seem to be two different things, and DAGS_FOLDER seems to have the best likelihood of giving a reliable answer for `relative_fileloc`, so it seems worth storing separately.
   
   THAT SAID, perhaps we should make it a "private" attribute, so as not to increase the backcompat surface unnecessarily?
   
   WDYT?





[GitHub] [airflow] mhenc commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
mhenc commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r978494229


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   > to make the subdir agree with the configured dags folder. E.g. setting subdir on the dag processor command does not force dags_folder to agree. So fundamentally, they are two different things, right? The dag processor can run without them agreeing, no?
   
   Right, there is no logic/check, we just pass --subdir to the Dag Processor and it parses the dags from this directory. 
   
   >  if you can't use relative loc, then the worker can't really rely on ser dag to know where a dag is located.
   
   Well, if you define different sets of workers on different task queues then it may work, but I agree there is no support from the Airflow perspective for such a setup.
   
   But note that dag processor separation (and multiple dag processors) is just the first step towards Airflow multi-tenancy. At some point worker isolation will also be supported.
   
   cc: @potiuk 





[GitHub] [airflow] dstandish commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r980261032


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   @potiuk 
   
   > I believe (please correct me if I misunderstood it @dstandish) that this PR here is very similar to what the https://github.com/apache/airflow/pull/16860 change from @ashb was. But while @ashb wanted to resurrect the relative approach for the main DAGS folder, this one brings it to the standalone DAG processor.
   
   Not quite... the "change" here is not really a change; it's fixing the 2.4 regression of that same exact feature. It's not specific to the dag processor. This issue appears even when just using the scheduler with a virtualenv and the k8s executor --- no dag processor process required.



[GitHub] [airflow] dstandish commented on a diff in pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26509:
URL: https://github.com/apache/airflow/pull/26509#discussion_r978437183


##########
airflow/models/dag.py:
##########
@@ -1189,11 +1195,25 @@ def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""
         path = pathlib.Path(self.fileloc)
         try:
-            return path.relative_to(settings.DAGS_FOLDER)
+            rel_path = path.relative_to(self.dag_processor_dags_folder or settings.DAGS_FOLDER)
+            if rel_path == pathlib.Path('.'):
+                return path
+            else:
+                return rel_path
         except ValueError:
             # Not relative to DAGS_FOLDER.
             return path
 
+    @property
+    def dag_processor_dags_folder(self):

Review Comment:
   > I see that the name may be a little confusing, but --subdir just points to the dag folder - the value can be an absolute path, and two different processors may point to completely different locations.
   
   What do you mean "just points to the dag folder"? I have looked at the code and it does not seem that there's any logic e.g. to make the subdir agree with the configured dags folder. E.g. setting subdir on the dag processor command does not force dags_folder to agree. So fundamentally, they are two different things, right? The dag processor can run without them agreeing, no?
   
   If they are in completely different locations, i.e. not a "subdir" of the dags folder, then the concept of "relative location" of a dag file becomes kind of meaningless, and if you can't use the relative loc, then the worker can't really rely on the ser dag to know where a dag is located.
   
   





[GitHub] [airflow] mhenc commented on pull request #26509: Fix airflow tasks run --local when dags_folder differs from that of processor

Posted by GitBox <gi...@apache.org>.
mhenc commented on PR #26509:
URL: https://github.com/apache/airflow/pull/26509#issuecomment-1260506102

   Yes, looks good to me. Thank you!

