Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/16 09:01:34 UTC

[GitHub] [airflow] mhenc opened a new pull request #22305: Add dag-processor cli command

mhenc opened a new pull request #22305:
URL: https://github.com/apache/airflow/pull/22305


   This change introduces a new CLI command, `dag-processor`, which runs the DagFileProcessorManager as a standalone process.
   
   Part of AIP-43
   https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
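
   For orientation, a minimal sketch of how the new command is gated (based only on the diff excerpts quoted in the review comments further down, not the exact merged code):

   ```python
   from airflow.configuration import conf

   # The standalone processor only starts when the scheduler has been told
   # not to parse DAGs itself via the new configuration option.
   if not conf.getboolean("scheduler", "standalone_dag_processor"):
       raise SystemExit('The option [scheduler/standalone_dag_processor] must be True.')
   ```

   A DagFileProcessorManager is then constructed and its parsing loop runs in this process instead of inside the scheduler, as the diffs quoted below show.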





[GitHub] [airflow] potiuk commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1085668689


   > Do we want to have this in `2.3`?
   
   I think so. This is already an improvement in security and isolation. While it is still far from "true" multi-tenancy, it already gives some features of multi-tenancy.
   
   This is still optional, and without the DB isolation it does not give a lot of "security", but it already gives users interesting possibilities - for example, if they have a really large number of DAGs to parse but do not want to run multiple schedulers, they could have one or two schedulers and, say, 10 `dags-processors`.
   
   The additional security you get here is important. By running the DAG processor as a separate process on a separate machine (or even in a separate security zone), you finally address the scenarios where DAG writers could exercise too much power over the scheduler. With DAG processor separation those scenarios become impossible, and a DAG writer is no longer able to execute any code on the scheduler.
   
   Additionally, that will give users an opportunity to isolate code executed by different groups of people. It does not secure database access yet, but if you run separate DAG processors for separate sub-folders with different "write" access, one group will not be able to execute parsing code on the same machine as another group. Surely, you will still be able to do that in tasks, but if you use the Kubernetes Executor, tasks are executed in separate pods, so you can already achieve "complete code execution" isolation for multiple groups of people.
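
   (For illustration only - a rough sketch of that per-group setup, built from the `DagFileProcessorManager` constructor visible in the diffs below; the sub-folder path and the fixed values are hypothetical, not part of this PR:)

   ```python
   from datetime import timedelta

   from airflow.dag_processing.manager import DagFileProcessorManager

   # Hypothetical: each group's DAG sub-folder gets its own standalone
   # processor, typically run as a separate process, machine, or security zone.
   def make_group_processor(group_dag_folder: str) -> DagFileProcessorManager:
       return DagFileProcessorManager(
           dag_directory=group_dag_folder,           # e.g. "/opt/airflow/dags/team_a"
           max_runs=-1,                              # assumed "run until stopped" value
           processor_timeout=timedelta(seconds=50),  # assumed timeout
           dag_ids=[],
           pickle_dags=False,
       )
   ```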
   
   I think Google wants to get it deployed anyway, so they might want to take the alpha/beta/RC candidates of 2.3 and hammer it to make it really robust and well tested, and the sooner this feature gets into the hands of users, the more prepared we will be for the next steps of the multi-tenancy introduction. It will give us a chance to iron out any wrinkles we find before we go deeper into "full" multi-tenancy.
   





[GitHub] [airflow] mhenc commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
mhenc commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1084218214


   Not really; we proposed `dag-processor` as it just uses the DagProcessor* classes, so it shows the connection there.
   E.g. the current documentation refers to the component within the Scheduler as the "Dag File Processor":
   https://airflow.apache.org/docs/apache-airflow/stable/concepts/scheduler.html#dag-file-processing
   
   but of course we can change it if we believe another name fits better.
   
   cc: @potiuk thoughts?
   





[GitHub] [airflow] mhenc commented on a change in pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
mhenc commented on a change in pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#discussion_r832605958



##########
File path: airflow/dag_processing/manager.py
##########
@@ -383,14 +383,15 @@ def __init__(
         self._dag_ids = dag_ids
         self._async_mode = async_mode
         self._parsing_start_time: Optional[int] = None
+        self._standalone_mode = conf.getboolean("scheduler", "standalone_dag_processor")
 
         # Set the signal conn in to non-blocking mode, so that attempting to
         # send when the buffer is full errors, rather than hangs for-ever
         # attempting to send (this is to avoid deadlocks!)
         #
         # Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
         # continue the scheduler
-        if self._async_mode:
+        if self._async_mode and not self._standalone_mode and self._signal_conn is not None:

Review comment:
       Makes sense. Removed `_standalone_mode` and now rely only on `_signal_conn` (actually renamed to `_direct_scheduler_conn`).







[GitHub] [airflow] potiuk commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1085672190


   Of course we need a bit more documentation (but we can very quickly add docs describing the use cases, architecture, and benefits of it). I am happy to prepare some of that, actually, if we need to do it quickly - even for the alpha of 2.3.0.
   
   Also, looking at the "size" and complexity of this overall change, I think the risks connected with it are rather small.





[GitHub] [airflow] mik-laj commented on a change in pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#discussion_r841004472



##########
File path: airflow/dag_processing/manager.py
##########
@@ -576,6 +578,8 @@ def _run_parsing_loop(self):
                 self.waitables.pop(sentinel)
                 self._processors.pop(processor.file_path)
 
+            if conf.getboolean("scheduler", "standalone_dag_processor"):

Review comment:
       We should read configuration option values outside of the loop, because they are not cached.
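
       A minimal sketch of the suggested pattern (variable name assumed, not from the PR): resolve the flag once, outside the loop, then branch on the stored value inside it:

   ```python
   from airflow.configuration import conf

   # conf.getboolean() is not cached, so read the option once up front and
   # reuse the stored value on every iteration of the parsing loop.
   standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")

   for _ in range(3):  # stand-in for the real parsing loop
       if standalone_dag_processor:
           pass  # e.g. fetch callbacks stored in the database
   ```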







[GitHub] [airflow] potiuk commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1077579824


   BTW, it's been a bit easier than I thought it would be, it seems :)





[GitHub] [airflow] potiuk commented on a change in pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#discussion_r832071565



##########
File path: airflow/dag_processing/manager.py
##########
@@ -383,14 +383,15 @@ def __init__(
         self._dag_ids = dag_ids
         self._async_mode = async_mode
         self._parsing_start_time: Optional[int] = None
+        self._standalone_mode = conf.getboolean("scheduler", "standalone_dag_processor")
 
         # Set the signal conn in to non-blocking mode, so that attempting to
         # send when the buffer is full errors, rather than hangs for-ever
         # attempting to send (this is to avoid deadlocks!)
         #
         # Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
         # continue the scheduler
-        if self._async_mode:
+        if self._async_mode and not self._standalone_mode and self._signal_conn is not None:

Review comment:
       Do we ever get a situation where `self._signal_conn` and `self._standalone_mode` are both set? I do not think so - after this change we can simplify that check to just `and not self._standalone_mode`.
   
   I believe after this change we only have two cases:
   
   a) Standalone mode -> no  _signal_conn set
   b) Non-standalone mode -> _signal_conn  set
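
   Spelled out as a sketch (hypothetical helper name; since the two flags are fully correlated, the check can key off just the connection, which is what the thread later settles on):

   ```python
   from multiprocessing.connection import Connection
   from typing import Optional


   def should_set_non_blocking(async_mode: bool, signal_conn: Optional[Connection]) -> bool:
       """Sketch: standalone mode is exactly the 'no signal_conn handed in' case."""
       return async_mode and signal_conn is not None
   ```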
   
   

##########
File path: airflow/cli/commands/dag_processor_command.py
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DagProcessor command"""
+import logging
+from datetime import timedelta
+
+import daemon
+from daemon.pidfile import TimeoutPIDLockFile
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.utils import cli as cli_utils
+from airflow.utils.cli import setup_locations, setup_logging
+
+log = logging.getLogger(__name__)
+
+
+def _create_dag_processor_manager(args) -> DagFileProcessorManager:
+    """Creates DagFileProcessorProcess instance."""
+    processor_timeout_seconds: int = conf.getint('core', 'dag_file_processor_timeout')
+    processor_timeout = timedelta(seconds=processor_timeout_seconds)
+    return DagFileProcessorManager(
+        dag_directory=settings.DAGS_FOLDER,
+        max_runs=args.num_runs,
+        processor_timeout=processor_timeout,
+        dag_ids=[],
+        pickle_dags=args.do_pickle,
+    )
+
+
+@cli_utils.action_cli
+def dag_processor(args):
+    """Starts Airflow Dag Processor Job"""
+    if not conf.getboolean("scheduler", "standalone_dag_processor"):
+        raise SystemExit('The option [scheduler/standalone_dag_processor] must be True.')
+
+    sql_conn: str = conf.get('core', 'sql_alchemy_conn').lower()
+    if sql_conn.startswith('sqlite'):
+        raise SystemExit('Standalone DagProcessor is not supported when using sqlite.')

Review comment:
       Same here :)

##########
File path: airflow/cli/commands/dag_processor_command.py
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DagProcessor command"""
+import logging
+from datetime import timedelta
+
+import daemon
+from daemon.pidfile import TimeoutPIDLockFile
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.utils import cli as cli_utils
+from airflow.utils.cli import setup_locations, setup_logging
+
+log = logging.getLogger(__name__)
+
+
+def _create_dag_processor_manager(args) -> DagFileProcessorManager:
+    """Creates DagFileProcessorProcess instance."""
+    processor_timeout_seconds: int = conf.getint('core', 'dag_file_processor_timeout')
+    processor_timeout = timedelta(seconds=processor_timeout_seconds)
+    return DagFileProcessorManager(
+        dag_directory=settings.DAGS_FOLDER,
+        max_runs=args.num_runs,
+        processor_timeout=processor_timeout,
+        dag_ids=[],
+        pickle_dags=args.do_pickle,
+    )
+
+
+@cli_utils.action_cli
+def dag_processor(args):
+    """Starts Airflow Dag Processor Job"""
+    if not conf.getboolean("scheduler", "standalone_dag_processor"):

Review comment:
       Nice check

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -804,7 +808,7 @@ def _run_scheduler_loop(self) -> None:
                     loop_count,
                 )
                 break
-            if self.processor_agent.done:
+            if not self._standalone_dag_processor and self.processor_agent and self.processor_agent.done:

Review comment:
       ```suggestion
               if self.processor_agent and self.processor_agent.done:
   ```

##########
File path: airflow/dag_processing/manager.py
##########
@@ -383,14 +383,15 @@ def __init__(
         self._dag_ids = dag_ids
         self._async_mode = async_mode
         self._parsing_start_time: Optional[int] = None
+        self._standalone_mode = conf.getboolean("scheduler", "standalone_dag_processor")

Review comment:
       I think we do not need it here (see below). Using `signal_conn is not None` is more than enough, I think, and it will simplify quite a few ifs below (and remove the need for mental correlation between `standalone_mode` and `signal_conn`).

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -779,8 +783,8 @@ def _run_scheduler_loop(self) -> None:
                     self.executor.heartbeat()
                     session.expunge_all()
                     num_finished_events = self._process_executor_events(session=session)
-
-                self.processor_agent.heartbeat()
+                if not self._standalone_dag_processor and self.processor_agent:

Review comment:
       ```suggestion
                   if self.processor_agent:
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -734,7 +738,7 @@ def _run_scheduler_loop(self) -> None:
 
         :rtype: None
         """
-        if not self.processor_agent:
+        if not self._standalone_dag_processor and not self.processor_agent:

Review comment:
       ```suggestion
           if self.processor_agent:
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -684,23 +685,25 @@ def _execute(self) -> None:
 
             self.register_signals()
 
-            self.processor_agent.start()
+            if not self._standalone_dag_processor and self.processor_agent:
+                self.processor_agent.start()
 
             execute_start_time = timezone.utcnow()
 
             self._run_scheduler_loop()
 
-            # Stop any processors
-            self.processor_agent.terminate()
+            if not self._standalone_dag_processor and self.processor_agent:

Review comment:
       ```suggestion
               if self.processor_agent:
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -766,7 +770,7 @@ def _run_scheduler_loop(self) -> None:
         for loop_count in itertools.count(start=1):
             with Stats.timer() as timer:
 
-                if self.using_sqlite:
+                if not self._standalone_dag_processor and self.using_sqlite and self.processor_agent:

Review comment:
       ```suggestion
                   if self.processor_agent:
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -711,10 +714,11 @@ def _execute(self) -> None:
                 self.executor.end()
             except Exception:
                 self.log.exception("Exception when executing Executor.end")
-            try:
-                self.processor_agent.end()
-            except Exception:
-                self.log.exception("Exception when executing DagFileProcessorAgent.end")
+            if not self._standalone_dag_processor and self.processor_agent:

Review comment:
       ```suggestion
               if self.processor_agent:
   ```

##########
File path: airflow/dag_processing/manager.py
##########
@@ -530,10 +535,7 @@ def _run_parsing_loop(self):
         while True:
             loop_start_time = time.monotonic()
             ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
-            if self._signal_conn in ready:
-                if conf.getboolean("scheduler", "standalone_dag_processor"):
-                    # Nothing to do if callbacks are not stored in the database
-                    self._fetch_callbacks(maxCallbacksPerLoop)
+            if self._signal_conn in ready and not self._standalone_mode:

Review comment:
       Same as above. This if can be simplified I think:
   
   ```
               if self._standalone_mode:
                   self._fetch_callbacks(max_callbacks_per_loop)
               else:
                   ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
                   if self._signal_conn in ready:
   ```
   
   I think there is no point in checking `_signal_conn` and `ready` in standalone mode.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -684,23 +685,25 @@ def _execute(self) -> None:
 
             self.register_signals()
 
-            self.processor_agent.start()
+            if not self._standalone_dag_processor and self.processor_agent:

Review comment:
       ```suggestion
               if self.processor_agent:
   ```
   
    Same reason as above: `_standalone_dag_processor` and `self.processor_agent` are correlated. No need to check the "standalone" one - it will never be True when `self.processor_agent` is set.







[GitHub] [airflow] mhenc commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
mhenc commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1068887528


   cc: @potiuk 





[GitHub] [airflow] potiuk commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1084282464


   I think the `dags` command group is conceptually different from `celery`, as `celery` is really there to manage "celery workers". `dags` is there mostly to manage "dags" and perform some actions around them. Somehow `dags parser` or `dags processor` does not feel like it belongs to the same group as `dags list` or `dags test`. Those are kind of a different "lifecycle".

   I think the main reason "worker" is under "celery" is that the whole "celery" group is disabled when the Celery executor is not used. I'd say the "dags processor/parser" is a "core" component - similar to the scheduler, webserver, and triggerer - and especially as we go into more isolation, it deserves a "top-level" entry.

   I also think "dags-parser" is worse than "dags-processor". One, because it introduces a new name: we already have the DagProcessor, and what it does is conceptually the same as the separated command. And two, the DagProcessor does more than parsing - it parses, serializes, and stores the DAGs in the database, but also executes DAG callbacks. So by all means "dags-parser" does not do justice to what this component does. "dags-processor" is much more accurate.
   





[GitHub] [airflow] potiuk merged pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #22305:
URL: https://github.com/apache/airflow/pull/22305


   





[GitHub] [airflow] github-actions[bot] commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1077579351


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] jedcunningham commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1083210925


   Have we considered `dag-parser` instead of `dag-processor`? Or maybe even `dags parser` (like we have `celery worker`)?





[GitHub] [airflow] potiuk edited a comment on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1084282464


   I think the `dags` command group is conceptually different from `celery`, as `celery` is really there to manage "celery workers". `dags` is there mostly to manage "dags" and perform some actions around them. Somehow `dags parser` or `dags processor` does not feel like it belongs to the same group as `dags list` or `dags test`. Those are kind of a different "lifecycle".

   I think the main reason "worker" is under "celery" is that the whole "celery" group is disabled when the Celery executor is not used. I'd say the "dags processor/parser" is a "core" component - similar to the scheduler, webserver, and triggerer - and especially as we go into more isolation, it deserves a "top-level" entry.

   I also think "dags-parser" is worse than "dags-processor". One, because it introduces a new name: we already have the DagProcessor, and what it does is conceptually the same as the separated command. And two, the DagProcessor does more than parsing - it parses, serializes, and stores the DAGs in the database, but also executes DAG callbacks. So by all means "dags-parser" does not do justice to what this component does. "dags-processor" is much more accurate.
   





[GitHub] [airflow] potiuk commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1081932313


   Any more reviews, anyone? :)





[GitHub] [airflow] mik-laj commented on a change in pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#discussion_r841004257



##########
File path: airflow/cli/commands/dag_processor_command.py
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DagProcessor command"""
+import logging
+from datetime import timedelta
+
+import daemon
+from daemon.pidfile import TimeoutPIDLockFile
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.utils import cli as cli_utils
+from airflow.utils.cli import setup_locations, setup_logging
+
+log = logging.getLogger(__name__)
+
+
+def _create_dag_processor_manager(args) -> DagFileProcessorManager:
+    """Creates DagFileProcessorProcess instance."""
+    processor_timeout_seconds: int = conf.getint('core', 'dag_file_processor_timeout')
+    processor_timeout = timedelta(seconds=processor_timeout_seconds)
+    return DagFileProcessorManager(
+        dag_directory=settings.DAGS_FOLDER,
+        max_runs=args.num_runs,
+        processor_timeout=processor_timeout,
+        dag_ids=[],
+        pickle_dags=args.do_pickle,
+    )
+
+
+@cli_utils.action_cli
+def dag_processor(args):
+    """Starts Airflow Dag Processor Job"""
+    if not conf.getboolean("scheduler", "standalone_dag_processor"):
+        raise SystemExit('The option [scheduler/standalone_dag_processor] must be True.')
+
+    sql_conn: str = conf.get('core', 'sql_alchemy_conn').lower()
+    if sql_conn.startswith('sqlite'):
+        raise SystemExit('Standalone DagProcessor is not supported when using sqlite.')
+
+    manager = _create_dag_processor_manager(args)
+
+    if args.daemon:
+        pid, stdout, stderr, log_file = setup_locations(
+            "dag-processor", args.pid, args.stdout, args.stderr, args.log_file
+        )
+        handle = setup_logging(log_file)
+        with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle:
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(pid, -1),
+                files_preserve=[handle],
+                stdout=stdout_handle,
+                stderr=stderr_handle,
+            )
+            with ctx:
+                try:
+                    manager.register_exit_signals()

Review comment:
       Should we also call `manager.start()`?
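
       For reference, the shape being asked about would look roughly like this (a sketch only, assuming `DagFileProcessorManager.start()` is the blocking entrypoint of the parsing loop):

   ```python
   def run_daemonized(manager) -> None:
       # Without an explicit start() the daemonized process would register its
       # exit signals and then simply return, never parsing anything.
       manager.register_exit_signals()
       manager.start()  # blocks in the parsing loop until an exit signal arrives
   ```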







[GitHub] [airflow] jedcunningham commented on a change in pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#discussion_r838050912



##########
File path: airflow/dag_processing/manager.py
##########
@@ -239,13 +239,13 @@ def _run_processor_manager(
         airflow.settings.initialize()
         del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
         processor_manager = DagFileProcessorManager(
-            dag_directory,
-            max_runs,
-            processor_timeout,
-            signal_conn,
-            dag_ids,
-            pickle_dags,
-            async_mode,
+            dag_directory=dag_directory,
+            max_runs=max_runs,
+            processor_timeout=processor_timeout,
+            signal_conn=signal_conn,
+            dag_ids=dag_ids,
+            pickle_dags=pickle_dags,

Review comment:
       ```suggestion
               dag_ids=dag_ids,
               pickle_dags=pickle_dags,
               signal_conn=signal_conn,
   ```
   
   Might as well reorder here as well, no?







[GitHub] [airflow] potiuk edited a comment on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1084282464


   I think the `dags` command group is conceptually different from `celery`, as `celery` is really there to manage "celery workers". `dags` is there mostly to manage "dags" and perform some actions around them. Somehow `dags parser` or `dags processor` does not feel like it belongs to the same group as `dags list` or `dags test`. Those are kind of a different "lifecycle".

   I think the main reason "worker" is under "celery" is that the whole "celery" group is disabled when the Celery executor is not used. I'd say the "dags processor/parser" is a "core" component - similar to the scheduler, webserver, and triggerer - and especially as we go into more isolation, it deserves a "top-level" entry.

   I also think "dags-parser" is worse than "dags-processor". One, because it introduces a new name: we already have the DagProcessor, and what it does is conceptually the same as the separated command. And two, the DagProcessor does more than parsing - it parses, serializes, and stores the DAGs in the database, but also executes DAG callbacks. So by all means "dags-parser" does not do justice to what this component does. "dags-processor" is much more accurate.

   I would also think about which is better, "dags-processor" or "dag-processor" - I am not sure about plural vs. singular, though.
   





[GitHub] [airflow] mhenc commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
mhenc commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1078798879


   I will update the documentation after we run more tests - of course I ran my own tests, but I hope someone else verifies it as well before we add it to the public docs.





[GitHub] [airflow] potiuk commented on pull request #22305: Add dag-processor cli command

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#issuecomment-1077579306


   Since this is a core part of scheduler behaviour, we definitely need another committer's review. Anyone?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org