Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/07/23 10:43:49 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #5615: [AIRFLOW-XXX] Use events/messages not multiprocessing.Manager

potiuk commented on a change in pull request #5615: [AIRFLOW-XXX] Use events/messages not multiprocessing.Manager
URL: https://github.com/apache/airflow/pull/5615#discussion_r306249102
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -543,94 +543,86 @@ def wait_until_finished(self):
         Should only be used when the DAG file processor manager is launched in sync mode.
         Wait for done signal from the manager.
         """
-        while True:
-            if self._parent_signal_conn.recv() == DagParsingSignal.MANAGER_DONE:
-                break
+        while not self.done:
+            result = self._parent_signal_conn.recv()
+            if isinstance(result, DagParsingStat):
+                self._sync_metadata(result)
 
     @staticmethod
-    def _launch_process(dag_directory,
-                        file_paths,
-                        max_runs,
-                        processor_factory,
-                        signal_conn,
-                        _stat_queue,
-                        result_queue,
-                        async_mode):
-        def helper():
-            # Reload configurations and settings to avoid collision with parent process.
-            # Because this process may need custom configurations that cannot be shared,
-            # e.g. RotatingFileHandler. And it can cause connection corruption if we
-            # do not recreate the SQLA connection pool.
-            os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
-            # Replicating the behavior of how logging module was loaded
-            # in logging_config.py
-            reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 1)[0]))
-            reload_module(airflow.settings)
-            airflow.settings.initialize()
-            del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
-            processor_manager = DagFileProcessorManager(dag_directory,
-                                                        file_paths,
-                                                        max_runs,
-                                                        processor_factory,
-                                                        signal_conn,
-                                                        _stat_queue,
-                                                        result_queue,
-                                                        async_mode)
-
-            processor_manager.start()
-
-        p = multiprocessing.Process(target=helper,
-                                    args=(),
-                                    name="DagFileProcessorManager")
-        p.start()
-        return p
+    def _run_processor_manager(dag_directory,
+                               file_paths,
+                               max_runs,
+                               processor_factory,
+                               signal_conn,
+                               async_mode):
+        from setproctitle import setproctitle
+        setproctitle("airflow scheduler -- DagFileProcessorManager")
+        # Reload configurations and settings to avoid collisions with the parent
+        # process, because this process may need custom configurations that cannot
+        # be shared (e.g. RotatingFileHandler), and connections may be corrupted if
+        # we do not recreate the SQLA connection pool.
+        os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
+        # Replicating the behavior of how logging module was loaded
+        # in logging_config.py
+        reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 1)[0]))
+        reload_module(airflow.settings)
+        airflow.settings.initialize()
+        del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
+        processor_manager = DagFileProcessorManager(dag_directory,
+                                                    file_paths,
+                                                    max_runs,
+                                                    processor_factory,
+                                                    signal_conn,
+                                                    async_mode)
+
+        processor_manager.start()
 
     def harvest_simple_dags(self):
         """
         Harvest DAG parsing results from result queue and sync metadata from stat queue.
+
         :return: List of parsing results in SimpleDag format.
         """
-        # Metadata and results to be harvested can be inconsistent,
-        # but it should not be a big problem.
-        self._sync_metadata()
-        # Heartbeating after syncing metadata so we do not restart manager
-        # if it processed all files for max_run times and exit normally.
-        self._heartbeat_manager()
+
         simple_dags = []
-        while True:
-            try:
-                result = self._result_queue.get_nowait()
-                try:
-                    simple_dags.append(result)
-                finally:
-                    self._result_queue.task_done()
-            except Empty:
-                break
+        while self._parent_signal_conn.poll():
+            result = self._parent_signal_conn.recv()
+
+            self.log.debug("Received message of type %s", type(result).__name__)
+            if isinstance(result, DagParsingStat):
+                self._sync_metadata(result)
+            else:
+                simple_dags.append(result)
+
+        # Receive any pending messages before checking if the process has exited.
+        self._heartbeat_manager()
 
         self._result_count = 0
 
         return simple_dags
 
     def _heartbeat_manager(self):
         """
-        Heartbeat DAG file processor and start it if it is not alive.
-        :return:
+        Heartbeat DAG file processor and retstart it if we are not done.
 
 Review comment:
   NIT: Retstart -> restart
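
For context, a minimal, self-contained sketch of the pattern this change adopts: a single duplex multiprocessing.Pipe replaces the Manager-backed stat and result queues, and the parent drains it with a poll()/recv() loop that dispatches on message type, as harvest_simple_dags does above. ParsingStat, ParsingResult and _child below are hypothetical stand-ins for DagParsingStat, SimpleDag and the manager process; this is not code from the PR.

    import multiprocessing
    from typing import List, NamedTuple

    class ParsingStat(NamedTuple):
        # Hypothetical stand-in for DagParsingStat: metadata about a parse pass.
        file_paths: List[str]
        done: bool

    class ParsingResult(NamedTuple):
        # Hypothetical stand-in for a SimpleDag parsing result.
        dag_id: str

    def _child(conn):
        # Hypothetical stand-in for the manager process: send results and a
        # final stat over one end of the duplex pipe, then close it.
        conn.send(ParsingResult(dag_id="example_dag"))
        conn.send(ParsingStat(file_paths=["/dags/example.py"], done=True))
        conn.close()

    if __name__ == "__main__":
        # One duplex Pipe replaces the two Manager-backed queues: a plain
        # connection object per process, with no proxy server in between.
        parent_conn, child_conn = multiprocessing.Pipe()
        p = multiprocessing.Process(target=_child, args=(child_conn,))
        p.start()
        p.join()

        results = []
        # Drain whatever is buffered, dispatching on message type, mirroring
        # the poll()/recv() loop in harvest_simple_dags.
        while parent_conn.poll():
            msg = parent_conn.recv()
            if isinstance(msg, ParsingStat):
                print("stat received, done=%s" % msg.done)
            else:
                results.append(msg)
        print("harvested %d result(s)" % len(results))

Compared with multiprocessing.Manager, a plain Pipe needs no extra server process and no proxy objects; typed messages over one connection let the parent interleave stats and results in a single drain loop, which is what makes the MANAGER_DONE sentinel and the separate queues unnecessary.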

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services