You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/22 11:51:56 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #22305: Add dag-processor cli command

potiuk commented on a change in pull request #22305:
URL: https://github.com/apache/airflow/pull/22305#discussion_r832071565



##########
File path: airflow/dag_processing/manager.py
##########
@@ -383,14 +383,15 @@ def __init__(
         self._dag_ids = dag_ids
         self._async_mode = async_mode
         self._parsing_start_time: Optional[int] = None
+        self._standalone_mode = conf.getboolean("scheduler", "standalone_dag_processor")
 
         # Set the signal conn in to non-blocking mode, so that attempting to
         # send when the buffer is full errors, rather than hangs for-ever
         # attempting to send (this is to avoid deadlocks!)
         #
         # Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
         # continue the scheduler
-        if self._async_mode:
+        if self._async_mode and not self._standalone_mode and self._signal_conn is not None:

Review comment:
       Do we ever get a situation where `self._signal_conn` and `self._standalone_mode` are both set? I do not think so - after this change we can simplify that check to just `and not self._standalone_mode`.
   
   I believe after this change we only have two cases:
   
   a) Standalone mode -> no `_signal_conn` set
   b) Non-standalone mode -> `_signal_conn` set
   
   

##########
File path: airflow/cli/commands/dag_processor_command.py
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DagProcessor command"""
+import logging
+from datetime import timedelta
+
+import daemon
+from daemon.pidfile import TimeoutPIDLockFile
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.utils import cli as cli_utils
+from airflow.utils.cli import setup_locations, setup_logging
+
+log = logging.getLogger(__name__)
+
+
+def _create_dag_processor_manager(args) -> DagFileProcessorManager:
+    """Creates DagFileProcessorProcess instance."""
+    processor_timeout_seconds: int = conf.getint('core', 'dag_file_processor_timeout')
+    processor_timeout = timedelta(seconds=processor_timeout_seconds)
+    return DagFileProcessorManager(
+        dag_directory=settings.DAGS_FOLDER,
+        max_runs=args.num_runs,
+        processor_timeout=processor_timeout,
+        dag_ids=[],
+        pickle_dags=args.do_pickle,
+    )
+
+
+@cli_utils.action_cli
+def dag_processor(args):
+    """Starts Airflow Dag Processor Job"""
+    if not conf.getboolean("scheduler", "standalone_dag_processor"):
+        raise SystemExit('The option [scheduler/standalone_dag_processor] must be True.')
+
+    sql_conn: str = conf.get('core', 'sql_alchemy_conn').lower()
+    if sql_conn.startswith('sqlite'):
+        raise SystemExit('Standalone DagProcessor is not supported when using sqlite.')

Review comment:
       Same here :)

##########
File path: airflow/cli/commands/dag_processor_command.py
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DagProcessor command"""
+import logging
+from datetime import timedelta
+
+import daemon
+from daemon.pidfile import TimeoutPIDLockFile
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.utils import cli as cli_utils
+from airflow.utils.cli import setup_locations, setup_logging
+
+log = logging.getLogger(__name__)
+
+
+def _create_dag_processor_manager(args) -> DagFileProcessorManager:
+    """Creates DagFileProcessorProcess instance."""
+    processor_timeout_seconds: int = conf.getint('core', 'dag_file_processor_timeout')
+    processor_timeout = timedelta(seconds=processor_timeout_seconds)
+    return DagFileProcessorManager(
+        dag_directory=settings.DAGS_FOLDER,
+        max_runs=args.num_runs,
+        processor_timeout=processor_timeout,
+        dag_ids=[],
+        pickle_dags=args.do_pickle,
+    )
+
+
+@cli_utils.action_cli
+def dag_processor(args):
+    """Starts Airflow Dag Processor Job"""
+    if not conf.getboolean("scheduler", "standalone_dag_processor"):

Review comment:
       Nice check

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -804,7 +808,7 @@ def _run_scheduler_loop(self) -> None:
                     loop_count,
                 )
                 break
-            if self.processor_agent.done:
+            if not self._standalone_dag_processor and self.processor_agent and self.processor_agent.done:

Review comment:
       ```suggestion
               if self.processor_agent and self.processor_agent.done:
   ```

##########
File path: airflow/dag_processing/manager.py
##########
@@ -383,14 +383,15 @@ def __init__(
         self._dag_ids = dag_ids
         self._async_mode = async_mode
         self._parsing_start_time: Optional[int] = None
+        self._standalone_mode = conf.getboolean("scheduler", "standalone_dag_processor")

Review comment:
       I think we do not need it here (see below). Using `signal_conn is not None` is more than enough, and it will simplify quite a few ifs below (and remove the need to mentally correlate `standalone_mode` with `signal_conn`).

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -779,8 +783,8 @@ def _run_scheduler_loop(self) -> None:
                     self.executor.heartbeat()
                     session.expunge_all()
                     num_finished_events = self._process_executor_events(session=session)
-
-                self.processor_agent.heartbeat()
+                if not self._standalone_dag_processor and self.processor_agent:

Review comment:
       ```suggestion
                   if self.processor_agent:
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -734,7 +738,7 @@ def _run_scheduler_loop(self) -> None:
 
         :rtype: None
         """
-        if not self.processor_agent:
+        if not self._standalone_dag_processor and not self.processor_agent:

Review comment:
       ```suggestion
           if self.processor_agent:
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -684,23 +685,25 @@ def _execute(self) -> None:
 
             self.register_signals()
 
-            self.processor_agent.start()
+            if not self._standalone_dag_processor and self.processor_agent:
+                self.processor_agent.start()
 
             execute_start_time = timezone.utcnow()
 
             self._run_scheduler_loop()
 
-            # Stop any processors
-            self.processor_agent.terminate()
+            if not self._standalone_dag_processor and self.processor_agent:

Review comment:
       ```suggestion
               if self.processor_agent:
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -766,7 +770,7 @@ def _run_scheduler_loop(self) -> None:
         for loop_count in itertools.count(start=1):
             with Stats.timer() as timer:
 
-                if self.using_sqlite:
+                if not self._standalone_dag_processor and self.using_sqlite and self.processor_agent:

Review comment:
       ```suggestion
                   if self.processor_agent:
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -711,10 +714,11 @@ def _execute(self) -> None:
                 self.executor.end()
             except Exception:
                 self.log.exception("Exception when executing Executor.end")
-            try:
-                self.processor_agent.end()
-            except Exception:
-                self.log.exception("Exception when executing DagFileProcessorAgent.end")
+            if not self._standalone_dag_processor and self.processor_agent:

Review comment:
       ```suggestion
               if self.processor_agent:
   ```

##########
File path: airflow/dag_processing/manager.py
##########
@@ -530,10 +535,7 @@ def _run_parsing_loop(self):
         while True:
             loop_start_time = time.monotonic()
             ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
-            if self._signal_conn in ready:
-                if conf.getboolean("scheduler", "standalone_dag_processor"):
-                    # Nothing to do if callbacks are not stored in the database
-                    self._fetch_callbacks(maxCallbacksPerLoop)
+            if self._signal_conn in ready and not self._standalone_mode:

Review comment:
       Same as above. This if can be simplified I think:
   
   ```
               if self._standalone_mode:
                   self._fetch_callbacks(max_callbacks_per_loop)
               else:
                   ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
                   if self._signal_conn in ready:
   ```
   
   I think there is no point in checking `_signal_conn` and `ready` in standalone mode.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -684,23 +685,25 @@ def _execute(self) -> None:
 
             self.register_signals()
 
-            self.processor_agent.start()
+            if not self._standalone_dag_processor and self.processor_agent:

Review comment:
       ```suggestion
               if self.processor_agent:
   ```
   
   Same reason as above: `_standalone_dag_processor` and `self.processor_agent` are correlated. There is no need to check the "standalone" flag - it will never be True when `self.processor_agent` is set.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org