Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/05 16:14:05 UTC

[GitHub] [airflow] ashb commented on a change in pull request #8671: Fix pickling failure when spawning processes

ashb commented on a change in pull request #8671:
URL: https://github.com/apache/airflow/pull/8671#discussion_r420224802



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -951,6 +955,19 @@ def _prepare_simple_dags(self, dags: List[DAG], pickle_dags: bool, session=None)
         return simple_dags
 
 
+def processor_factory(file_path, failure_callback_requests, dag_ids, pickle_dags):
+    """
+    Returns a DagFileProcessorProcess instance. Will get pickled when start_method is
+    'spawn', so this can't be a class method.

Review comment:
       Fwiw I think this could be a _static_ method, just not an instance method. Check out what we do for SchedulerJob:
   
   https://github.com/apache/airflow/blob/8d6f1aa4b5bb8809ffc55dc0c62e6d0e89f331e5/airflow/jobs/scheduler_job.py#L185-L196
   
   and
   
   https://github.com/apache/airflow/blob/8d6f1aa4b5bb8809ffc55dc0c62e6d0e89f331e5/airflow/jobs/scheduler_job.py#L111-L112
   
   Just thinking it would be good to be consistent.
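
    For illustration, a minimal sketch of the staticmethod variant (the
    constructor keyword arguments here are an assumption, mirroring the
    module-level factory in this PR rather than a confirmed signature):

    ```python
    from airflow.jobs.base_job import BaseJob
    from airflow.jobs.scheduler_job import DagFileProcessorProcess


    class SchedulerJob(BaseJob):

        @staticmethod
        def _create_dag_file_processor(file_path, failure_callback_requests,
                                       dag_ids, pickle_dags):
            """
            Creates a DagFileProcessorProcess instance. A staticmethod carries
            no bound self, so multiprocessing can still pickle it by qualified
            name when start_method is 'spawn'.
            """
            return DagFileProcessorProcess(
                file_path=file_path,
                pickle_dags=pickle_dags,
                dag_ids=dag_ids,
                failure_callback_requests=failure_callback_requests,
            )
    ```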

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1525,16 +1542,6 @@ def _execute(self):
         if self.do_pickle and self.executor.__class__ not in (LocalExecutor, SequentialExecutor):
             pickle_dags = True
 
-        self.log.info("Processing each file at most %s times", self.num_runs)

Review comment:
       You removed this log line; did you mean to?

##########
File path: airflow/utils/mixins.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import multiprocessing
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+
+
+class MultiprocessingStartMethodMixin:
+    """
+    Convenience mixin to add support for different multiprocessing start methods.
+    """
+    def _get_multiprocessing_start_method(self):
+        """
+        Determine the method of creating new processes by checking whether
+        mp_start_method is set in the config; otherwise, use the OS default.
+        """
+        if conf.has_option('core', 'mp_start_method'):
+            mp_start_method = conf.get('core', 'mp_start_method')
+        else:
+            mp_start_method = multiprocessing.get_start_method()
+        possible_value_list = multiprocessing.get_all_start_methods()
+
+        if mp_start_method not in possible_value_list:

Review comment:
       This should probably be enforced by `validate()` in airflow.configuration.ConfigParser
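
    For illustration, a hedged sketch of what such a check could look like (the
    function name and where it hooks into config validation are assumptions,
    not the actual airflow.configuration API):

    ```python
    import multiprocessing

    from airflow.exceptions import AirflowConfigException


    def validate_mp_start_method(conf):
        """Reject mp_start_method values the current OS cannot provide."""
        if not conf.has_option('core', 'mp_start_method'):
            return
        start_method = conf.get('core', 'mp_start_method')
        allowed = multiprocessing.get_all_start_methods()
        if start_method not in allowed:
            raise AirflowConfigException(
                "mp_start_method {!r} is not supported here; possible values "
                "are {}".format(start_method, ", ".join(allowed)))
    ```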

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2487,6 +2487,32 @@ def test_scheduler_multiprocessing(self):
         self.assertEqual(
             len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0)
 
+    def test_scheduler_multiprocessing_with_spawn_method(self):
+        """
+        Test that the scheduler can successfully queue multiple dags in parallel
+        """
+        try:
+            conf.set('core', 'mp_start_method', 'spawn')

Review comment:
       Is there a reason we can't use the `conf_vars` decorator here?
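
    For reference, a sketch of the decorator form (assuming the helper meant is
    `conf_vars` from tests.test_utils.config, which takes a dict keyed by
    (section, key) tuples and restores the old values afterwards):

    ```python
    from tests.test_utils.config import conf_vars


    @conf_vars({('core', 'mp_start_method'): 'spawn'})
    def test_scheduler_multiprocessing_with_spawn_method(self):
        # the override is active only for the duration of the test, so the
        # try/finally cleanup in the current version becomes unnecessary
        ...
    ```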

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2487,6 +2487,32 @@ def test_scheduler_multiprocessing(self):
         self.assertEqual(
             len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0)
 
+    def test_scheduler_multiprocessing_with_spawn_method(self):
+        """
+        Test that the scheduler can successfully queue multiple dags in parallel
+        """
+        try:
+            conf.set('core', 'mp_start_method', 'spawn')
+            dag_ids = ['test_start_date_scheduling', 'test_dagrun_states_success']
+            for dag_id in dag_ids:
+                dag = self.dagbag.get_dag(dag_id)
+                dag.clear()
+
+            scheduler = SchedulerJob(dag_ids=dag_ids,
+                                     executor=self.null_exec,
+                                     subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
+                                     num_runs=1)
+
+            scheduler.run()
+
+            # zero tasks ran
+            dag_id = 'test_start_date_scheduling'
+            session = settings.Session()
+            self.assertEqual(
+                len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0)

Review comment:
       This leaks a session (or at least doesn't close/give it back as quickly as it might):
   
   ```suggestion
            with create_session() as session:
                   self.assertEqual(
                       session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).count(), 0)
   ```
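
    For context, the suggested `with create_session() as session:` block is
    roughly equivalent to the following commit-or-rollback pattern (a sketch,
    assuming create_session comes from airflow.utils.session; TaskInstance and
    dag_id are the names already used in this test):

    ```python
    from airflow import settings

    session = settings.Session()
    try:
        ti_count = session.query(TaskInstance).filter(
            TaskInstance.dag_id == dag_id).count()
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # closing promptly returns the connection to the pool instead of
        # leaving it checked out for the rest of the test run
        session.close()
    ```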

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2487,6 +2487,32 @@ def test_scheduler_multiprocessing(self):
         self.assertEqual(
             len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0)
 
+    def test_scheduler_multiprocessing_with_spawn_method(self):
+        """
+        Test that the scheduler can successfully queue multiple dags in parallel

Review comment:
       ```suggestion
           Test that the scheduler can successfully queue multiple dags in parallel
           when using "spawn" mode of multiprocessing. (Fork is default on Linux and older OSX)
   ```
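
    For a quick check of the platform defaults, a small standalone snippet:

    ```python
    import multiprocessing

    # Typically 'fork' on Linux; macOS switched its default to 'spawn' in
    # Python 3.8
    print(multiprocessing.get_start_method())

    # e.g. ['fork', 'spawn', 'forkserver'] on Linux
    print(multiprocessing.get_all_start_methods())
    ```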




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org