You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2017/06/09 00:11:04 UTC
incubator-airflow git commit: [AIRFLOW-1024] Ignore celery executor
errors (#49)
Repository: incubator-airflow
Updated Branches:
refs/heads/master 38cbf132a -> 7af20fe45
[AIRFLOW-1024] Ignore celery executor errors (#49)
Code defensively around the interactions with
celery so that
we just log errors instead of crashing the
scheduler.
It might make sense to move the try/except blocks one
level higher
(to catch errors from all executors), but this
needs some investigation.
Closes #2355 from aoen/ddavydov--
handle_celery_executor_errors_gracefully
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7af20fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7af20fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7af20fe4
Branch: refs/heads/master
Commit: 7af20fe452bad36767c43720168b5a34ce69eb0a
Parents: 38cbf13
Author: Dan Davydov <da...@airbnb.com>
Authored: Thu Jun 8 17:10:35 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Thu Jun 8 17:10:38 2017 -0700
----------------------------------------------------------------------
airflow/executors/celery_executor.py | 39 +++++++++++++++++--------------
1 file changed, 22 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7af20fe4/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 0b6cd59..d7f74c6 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -17,6 +17,7 @@ import logging
import subprocess
import ssl
import time
+import traceback
from celery import Celery
from celery import states as celery_states
@@ -101,23 +102,27 @@ class CeleryExecutor(BaseExecutor):
self.logger.debug(
"Inquiring about {} celery task(s)".format(len(self.tasks)))
for key, async in list(self.tasks.items()):
- state = async.state
- if self.last_state[key] != state:
- if state == celery_states.SUCCESS:
- self.success(key)
- del self.tasks[key]
- del self.last_state[key]
- elif state == celery_states.FAILURE:
- self.fail(key)
- del self.tasks[key]
- del self.last_state[key]
- elif state == celery_states.REVOKED:
- self.fail(key)
- del self.tasks[key]
- del self.last_state[key]
- else:
- self.logger.info("Unexpected state: " + async.state)
- self.last_state[key] = async.state
+ try:
+ state = async.state
+ if self.last_state[key] != state:
+ if state == celery_states.SUCCESS:
+ self.success(key)
+ del self.tasks[key]
+ del self.last_state[key]
+ elif state == celery_states.FAILURE:
+ self.fail(key)
+ del self.tasks[key]
+ del self.last_state[key]
+ elif state == celery_states.REVOKED:
+ self.fail(key)
+ del self.tasks[key]
+ del self.last_state[key]
+ else:
+ self.logger.info("Unexpected state: " + async.state)
+ self.last_state[key] = async.state
+ except Exception as e:
+ logging.error("Error syncing the celery executor, ignoring "
+ "it:\n{}\n".format(e, traceback.format_exc()))
def end(self, synchronous=False):
if synchronous: