You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/06 09:24:50 UTC

[GitHub] ashb commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor

ashb commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830#discussion_r215555183
 
 

 ##########
 File path: airflow/executors/celery_executor.py
 ##########
 @@ -85,30 +139,67 @@ def execute_async(self, key, command,
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
+    def _num_tasks_per_process(self):
+        """
+        How many Celery tasks should be sent to each worker process.
+        :return: Number of tasks that should be used per process
+        :rtype: int
+        """
+        return max(1,
+                   int(math.ceil(1.0 * len(self.tasks) / self._sync_parallelism)))
+
     def sync(self):
-        self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
-        for key, task in list(self.tasks.items()):
-            try:
-                state = task.state
-                if self.last_state[key] != state:
-                    if state == celery_states.SUCCESS:
-                        self.success(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.FAILURE:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.REVOKED:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    else:
-                        self.log.info("Unexpected state: %s", state)
-                        self.last_state[key] = state
-            except Exception as e:
-                self.log.error("Error syncing the celery executor, ignoring it:")
-                self.log.exception(e)
+        num_processes = min(len(self.tasks), self._sync_parallelism)
+        if num_processes == 0:
+            self.log.debug("No task to query celery, skipping sync")
+            return
+
+        self.log.debug("Inquiring about %s celery task(s) using %s processes",
+                       len(self.tasks), num_processes)
+
+        # Recreate the process pool each sync in case processes in the pool die
+        self._sync_pool = Pool(processes=num_processes)
 
 Review comment:
  Do we need to do any special handling of the SQLA connection when using multiprocessing? (I guess to make sure we don't end up with 16 extra connections to the DB that we don't need)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services