Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/01 15:58:10 UTC

[GitHub] [airflow] jhtimmins opened a new pull request #8671: WIP - Resolve serialization issue

jhtimmins opened a new pull request #8671:
URL: https://github.com/apache/airflow/pull/8671


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] BasPH commented on pull request #8671: WIP - Resolve serialization issue

Posted by GitBox <gi...@apache.org>.
BasPH commented on pull request #8671:
URL: https://github.com/apache/airflow/pull/8671#issuecomment-622875501


   Could you describe the issue you're fixing here?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #8671: Fix pickling failure when spawning processes

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8671:
URL: https://github.com/apache/airflow/pull/8671#discussion_r420224802



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -951,6 +955,19 @@ def _prepare_simple_dags(self, dags: List[DAG], pickle_dags: bool, session=None)
         return simple_dags
 
 
+def processor_factory(file_path, failure_callback_requests, dag_ids, pickle_dags):
+    """
+    Returns a DagFileProcessorProcess instance. Will get pickled when start_method is
+    'spawn', so this can't be a class method.

Review comment:
       Fwiw I think this could be a _static_ method, just not an instance method. Check out what we do for SchedulerJob:
   
   https://github.com/apache/airflow/blob/8d6f1aa4b5bb8809ffc55dc0c62e6d0e89f331e5/airflow/jobs/scheduler_job.py#L185-L196
   
   and
   
   https://github.com/apache/airflow/blob/8d6f1aa4b5bb8809ffc55dc0c62e6d0e89f331e5/airflow/jobs/scheduler_job.py#L111-L112
   
   Just thinking it would be good to be consistent.
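
    For illustration of why this matters under 'spawn' (a toy with hypothetical names, not code from the PR): pickling a bound method drags `self` along with it, which fails as soon as the instance holds unpicklable state, whereas a staticmethod is pickled by qualified name like a plain function:

    ```python
    import pickle
    import threading


    class Agent:
        def __init__(self):
            # Stand-in for unpicklable instance state (locks, DB sessions, ...)
            self._lock = threading.Lock()

        def instance_factory(self):
            # Bound method: pickling it also pickles self, including self._lock.
            return "processor"

        @staticmethod
        def static_factory():
            # A plain function under the hood: pickled by module + qualified name.
            return "processor"


    agent = Agent()
    pickle.dumps(Agent.static_factory)  # fine on Python 3.8's default protocol
    try:
        pickle.dumps(agent.instance_factory)
    except TypeError as exc:
        print(f"bound method fails: {exc}")  # cannot pickle '_thread.lock' object
    ```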

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1525,16 +1542,6 @@ def _execute(self):
         if self.do_pickle and self.executor.__class__ not in (LocalExecutor, SequentialExecutor):
             pickle_dags = True
 
-        self.log.info("Processing each file at most %s times", self.num_runs)

Review comment:
       You removed this log, did you mean to?

##########
File path: airflow/utils/mixins.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import multiprocessing
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+
+
+class MultiprocessingStartMethodMixin:
+    """
+    Convenience mixin to add support for configurable multiprocessing start methods.
+    """
+    def _get_multiprocessing_start_method(self):
+        """
+        Determine the method of creating new processes by checking whether
+        mp_start_method is set in the config; otherwise, use the OS default.
+        """
+        if conf.has_option('core', 'mp_start_method'):
+            mp_start_method = conf.get('core', 'mp_start_method')
+        else:
+            mp_start_method = multiprocessing.get_start_method()
+        possible_value_list = multiprocessing.get_all_start_methods()
+
+        if mp_start_method not in possible_value_list:

Review comment:
       This should probably be enforced by `validate()` in airflow.configuration.ConfigParser
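
    A sketch of the kind of centralized check being suggested, run once at config-load time rather than in every caller (the function name and message wording are assumptions, not existing Airflow API):

    ```python
    import multiprocessing

    from airflow.configuration import conf
    from airflow.exceptions import AirflowConfigException


    def validate_mp_start_method():
        """Hypothetical fail-fast validation of [core] mp_start_method at startup."""
        if not conf.has_option('core', 'mp_start_method'):
            return  # unset: the OS default applies, nothing to validate
        method = conf.get('core', 'mp_start_method')
        allowed = multiprocessing.get_all_start_methods()  # e.g. ['fork', 'spawn', 'forkserver']
        if method not in allowed:
            raise AirflowConfigException(
                f"[core] mp_start_method={method!r} is not supported on this platform; "
                f"choose one of {allowed}"
            )
    ```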

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2487,6 +2487,32 @@ def test_scheduler_multiprocessing(self):
         self.assertEqual(
             len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0)
 
+    def test_scheduler_multiprocessing_with_spawn_method(self):
+        """
+        Test that the scheduler can successfully queue multiple dags in parallel
+        """
+        try:
+            conf.set('core', 'mp_start_method', 'spawn')

Review comment:
       Is there a reason we can't use the `conf_var` decorator here?
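
    For reference, the helper appears to be `conf_vars` in tests/test_utils/config.py; a sketch of the decorator form being hinted at, assuming its dict-of-(section, key) interface. It applies the override before the test and restores the old value afterwards, replacing the manual conf.set() in the try block below:

    ```python
    from tests.test_utils.config import conf_vars


    @conf_vars({('core', 'mp_start_method'): 'spawn'})
    def test_scheduler_multiprocessing_with_spawn_method(self):
        ...  # test body unchanged; no conf.set()/restore bookkeeping needed
    ```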

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2487,6 +2487,32 @@ def test_scheduler_multiprocessing(self):
         self.assertEqual(
             len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0)
 
+    def test_scheduler_multiprocessing_with_spawn_method(self):
+        """
+        Test that the scheduler can successfully queue multiple dags in parallel
+        """
+        try:
+            conf.set('core', 'mp_start_method', 'spawn')
+            dag_ids = ['test_start_date_scheduling', 'test_dagrun_states_success']
+            for dag_id in dag_ids:
+                dag = self.dagbag.get_dag(dag_id)
+                dag.clear()
+
+            scheduler = SchedulerJob(dag_ids=dag_ids,
+                                     executor=self.null_exec,
+                                     subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
+                                     num_runs=1)
+
+            scheduler.run()
+
+            # zero tasks ran
+            dag_id = 'test_start_date_scheduling'
+            session = settings.Session()
+            self.assertEqual(
+                len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0)

Review comment:
       This leaks a session (or at least doesn't close/give it back as quickly as it might).
   
   ```suggestion
                with create_session() as session:
                   self.assertEqual(
                       session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).count(), 0)
   ```
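
    For context, `create_session` is roughly this shape, simplified from the real helper (in airflow.utils.session on master around this time): commit on success, roll back on error, and always close so the connection goes back to the pool:

    ```python
    from contextlib import contextmanager

    from airflow import settings


    @contextmanager
    def create_session():
        """Simplified sketch of Airflow's create_session context manager."""
        session = settings.Session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
    ```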

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2487,6 +2487,32 @@ def test_scheduler_multiprocessing(self):
         self.assertEqual(
             len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0)
 
+    def test_scheduler_multiprocessing_with_spawn_method(self):
+        """
+        Test that the scheduler can successfully queue multiple dags in parallel

Review comment:
       ```suggestion
           Test that the scheduler can successfully queue multiple dags in parallel
           when using "spawn" mode of multiprocessing. (Fork is default on Linux and older OSX)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jhtimmins commented on pull request #8671: WIP - Resolve serialization issue

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on pull request #8671:
URL: https://github.com/apache/airflow/pull/8671#issuecomment-623533033


   Hi @BasPH @tooptoop4, this PR is a continuation of https://github.com/apache/airflow/pull/7128, which was started by @sarutak. It addresses issues causing Airflow to fail on Python 3.8. I'll add additional info explaining the specific changes and the reason they're needed when I submit the PR.
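
   For background on the Python 3.8 failures: Python 3.8 switched the default multiprocessing start method on macOS from 'fork' to 'spawn' (Windows already used 'spawn'), so everything handed to a child process, including the processor factory, must be picklable there. A quick check of what the current interpreter uses:

   ```python
   import multiprocessing

   # 'fork' on Linux; 'spawn' on macOS since Python 3.8, and on Windows
   print(multiprocessing.get_start_method())
   print(multiprocessing.get_all_start_methods())
   ```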


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jhtimmins commented on a change in pull request #8671: Fix pickling failure when spawning processes

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on a change in pull request #8671:
URL: https://github.com/apache/airflow/pull/8671#discussion_r420367691



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -951,6 +955,19 @@ def _prepare_simple_dags(self, dags: List[DAG], pickle_dags: bool, session=None)
         return simple_dags
 
 
+def processor_factory(file_path, failure_callback_requests, dag_ids, pickle_dags):
+    """
+    Returns a DagFileProcessorProcess instance. Will get pickled when start_method is
+    'spawn', so this can't be a class method.

Review comment:
       +1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


