Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/29 15:10:00 UTC

[GitHub] [airflow] jhtimmins commented on a change in pull request #7128: [AIRFLOW-6529] Pickle error occurs when the scheduler tries to run on macOS.

jhtimmins commented on a change in pull request #7128:
URL: https://github.com/apache/airflow/pull/7128#discussion_r417022066



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -131,10 +131,47 @@ def __enter__(self):
     def __exit__(self, *exc_info):
         # shutil.rmtree(self.settings_root)
         # Reset config
+
         conf.set('logging', 'logging_config_class', '')
         sys.path.remove(self.settings_root)
 
 
+class FakeDagFileProcessorRunner(DagFileProcessorProcess):
+    # This fake processor will return the zombies it received in constructor
+    # as its processing result w/o actually parsing anything.
+    def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
+        super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
+        self._result = zombies, 0
+
+    def start(self):
+        pass
+
+    @property
+    def start_time(self):
+        return DEFAULT_DATE
+
+    @property
+    def pid(self):
+        return 1234
+
+    @property
+    def done(self):
+        return True
+
+    @property
+    def result(self):
+        return self._result
+
+
+def fake_dag_file_processor_factory(file_path, zombies, dag_ids, pickle_dags):

Review comment:
    For clarity, it might be worth keeping the argument order consistent with `FakeDagFileProcessorRunner.__init__`, so the preceding line would be:
   `def fake_dag_file_processor_factory(file_path, pickle_dags, dag_ids, zombies):`
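
    For example (just a sketch, assuming the factory does nothing beyond constructing the fake runner):

    ```python
    # Sketch only: the factory mirrors FakeDagFileProcessorRunner.__init__'s
    # argument order and forwards everything to it.
    def fake_dag_file_processor_factory(file_path, pickle_dags, dag_ids, zombies):
        return FakeDagFileProcessorRunner(file_path, pickle_dags, dag_ids, zombies)
    ```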

##########
File path: tests/utils/test_dag_processing.py
##########
@@ -131,10 +131,47 @@ def __enter__(self):
     def __exit__(self, *exc_info):
         # shutil.rmtree(self.settings_root)
         # Reset config
+
         conf.set('logging', 'logging_config_class', '')
         sys.path.remove(self.settings_root)
 
 
+class FakeDagFileProcessorRunner(DagFileProcessorProcess):
+    # This fake processor will return the zombies it received in constructor
+    # as its processing result w/o actually parsing anything.
+    def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
+        super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
+        self._result = zombies, 0

Review comment:
    It might be worth defining the zero as a named constant (or adding a comment) to provide some context for what it represents.
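
    For example (a sketch only; the name below is hypothetical, and the real meaning of the second element should come from what `DagFileProcessorProcess.result` actually returns):

    ```python
    # Hypothetical name for the trailing zero in the fake result tuple.
    FAKE_PROCESSOR_RESULT_COUNT = 0


    class FakeDagFileProcessorRunner(DagFileProcessorProcess):
        def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
            super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
            self._result = zombies, FAKE_PROCESSOR_RESULT_COUNT
    ```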

##########
File path: airflow/utils/dag_processing.py
##########
@@ -332,8 +340,20 @@ def start(self):
         """
         Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
         """
-        self._parent_signal_conn, child_signal_conn = multiprocessing.Pipe()
-        self._process = multiprocessing.Process(
+        if conf.has_option('core', 'mp_start_method'):
+            mp_start_method = conf.get('core', 'mp_start_method')
+        else:
+            mp_start_method = mp.get_start_method()
+
+        possible_value_list = mp.get_all_start_methods()
+        if mp_start_method not in possible_value_list:
+            raise AirflowConfigException(
+                "mp_start_method should not be " + mp_start_method +
+                ". Possible value is one of " + str(possible_value_list))
+        cxt = mp.get_context(mp_start_method)
+
+        self._parent_signal_conn, child_signal_conn = cxt.Pipe()
+        self._process = cxt.Process(

Review comment:
       It looks like this block of code is identical to https://github.com/apache/airflow/pull/7128/files#diff-c35269bcfbbe386e269ffa7487e86192R171-R184. Since the two copies would need to change together if either ever changed, this might be a good case for a utility mixin: lines 343-352 could go into a `MultiProcessingConfigMixin` class or similar.
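
    For example (just a sketch of the mixin idea; `MultiProcessingConfigMixin` and `_get_multiprocessing_context` are suggested names, not existing Airflow API):

    ```python
    import multiprocessing as mp

    from airflow.configuration import conf
    from airflow.exceptions import AirflowConfigException


    class MultiProcessingConfigMixin:
        """Shared helper so both call sites resolve the multiprocessing context the same way."""

        @staticmethod
        def _get_multiprocessing_context():
            # Same logic as the duplicated block in the diff above.
            if conf.has_option('core', 'mp_start_method'):
                mp_start_method = conf.get('core', 'mp_start_method')
            else:
                mp_start_method = mp.get_start_method()

            possible_values = mp.get_all_start_methods()
            if mp_start_method not in possible_values:
                raise AirflowConfigException(
                    "mp_start_method should not be {}. "
                    "Possible values are {}".format(mp_start_method, possible_values))
            return mp.get_context(mp_start_method)
    ```

    Both `start()` implementations could then just call `cxt = self._get_multiprocessing_context()`.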




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org