You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/02 15:23:02 UTC

[4/5] incubator-airflow git commit: [AIRFLOW-1665] Reconnect on database errors

[AIRFLOW-1665] Reconnect on database errors

This change enables the scheduler to recover from temporary database
errors and downtimes. The same holds true for the webserver if run
without its regular worker refresh.

The reconnect logic is based on a truncated binary exponential backoff
to ensure reconnect attempts don't overload the database.

Included changes:

* Switch to recommended pessimistic disconnect handling for engines
  http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
* Remove legacy pool-based disconnect handling.
* Ensure event handlers are registered for each newly created engine.
  Engines are re-initialized in child processes so this is crucial for
  correctness.

This commit is based on a contribution by @vklogin
https://github.com/apache/incubator-airflow/pull/2744


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/94deac34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/94deac34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/94deac34

Branch: refs/heads/master
Commit: 94deac34eca869a0accbc6affe7640b09dab1530
Parents: 6bf1a6e
Author: Stephan Erb <st...@blue-yonder.com>
Authored: Sun Nov 26 21:28:12 2017 +0100
Committer: Stephan Erb <st...@blue-yonder.com>
Committed: Wed Nov 29 12:29:47 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                           |   1 -
 airflow/config_templates/default_airflow.cfg |   4 +
 airflow/jobs.py                              |   4 +-
 airflow/settings.py                          |   4 +
 airflow/utils/db.py                          |  34 -------
 airflow/utils/sqlalchemy.py                  | 108 ++++++++++++++++++++++
 6 files changed, 117 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 4e56d54..c0ab5e3 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -328,7 +328,6 @@ def run(args, dag=None):
     # while it's waiting for the task to finish.
     settings.configure_orm(disable_connection_pool=True)
 
-    db_utils.pessimistic_connection_handling()
     if dag:
         args.dag_id = dag.dag_id
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7a8ddf2..32af0a3 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -79,6 +79,10 @@ sql_alchemy_pool_size = 5
 # not apply to sqlite.
 sql_alchemy_pool_recycle = 3600
 
+# How many seconds to retry re-establishing a DB connection after
+# disconnects. Setting this to 0 disables retries.
+sql_alchemy_reconnect_timeout = 300
+
 # The amount of parallelism as a setting to the executor. This defines
 # the max number of task instances that should run simultaneously
 # on this airflow installation

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 91ab96c..ed6da60 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -54,8 +54,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           SimpleDag,
                                           SimpleDagBag,
                                           list_py_file_paths)
-from airflow.utils.db import (
-    create_session, provide_session, pessimistic_connection_handling)
+from airflow.utils.db import create_session, provide_session
 from airflow.utils.email import send_email
 from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter
 from airflow.utils.state import State
@@ -1508,7 +1507,6 @@ class SchedulerJob(BaseJob):
 
     def _execute(self):
         self.log.info("Starting the scheduler")
-        pessimistic_connection_handling()
 
         # DAGs can be pickled for easier remote execution by some executors
         pickle_dags = False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 04d3548..929663b 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -28,6 +28,7 @@ from sqlalchemy.pool import NullPool
 
 from airflow import configuration as conf
 from airflow.logging_config import configure_logging
+from airflow.utils.sqlalchemy import setup_event_handlers
 
 log = logging.getLogger(__name__)
 
@@ -152,6 +153,9 @@ def configure_orm(disable_connection_pool=False):
                                                   'SQL_ALCHEMY_POOL_RECYCLE')
 
     engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
+    reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT')
+    setup_event_handlers(engine, reconnect_timeout)
+
     Session = scoped_session(
         sessionmaker(autocommit=False, autoflush=False, bind=engine))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 9c924d1..64ce220 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -22,9 +22,6 @@ from functools import wraps
 import os
 import contextlib
 
-from sqlalchemy import event, exc
-from sqlalchemy.pool import Pool
-
 from airflow import settings
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -74,21 +71,6 @@ def provide_session(func):
     return wrapper
 
 
-def pessimistic_connection_handling():
-    @event.listens_for(Pool, "checkout")
-    def ping_connection(dbapi_connection, connection_record, connection_proxy):
-        '''
-        Disconnect Handling - Pessimistic, taken from:
-        http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
-        '''
-        cursor = dbapi_connection.cursor()
-        try:
-            cursor.execute("SELECT 1")
-        except:
-            raise exc.DisconnectionError()
-        cursor.close()
-
-
 @provide_session
 def merge_conn(conn, session=None):
     from airflow import models
@@ -98,22 +80,6 @@ def merge_conn(conn, session=None):
         session.commit()
 
 
-@event.listens_for(settings.engine, "connect")
-def connect(dbapi_connection, connection_record):
-    connection_record.info['pid'] = os.getpid()
-
-
-@event.listens_for(settings.engine, "checkout")
-def checkout(dbapi_connection, connection_record, connection_proxy):
-    pid = os.getpid()
-    if connection_record.info['pid'] != pid:
-        connection_record.connection = connection_proxy.connection = None
-        raise exc.DisconnectionError(
-            "Connection record belongs to pid {}, "
-            "attempting to check out in pid {}".format(connection_record.info['pid'], pid)
-        )
-
-
 def initdb():
     session = settings.Session()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/utils/sqlalchemy.py
----------------------------------------------------------------------
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
new file mode 100644
index 0000000..fa07437
--- /dev/null
+++ b/airflow/utils/sqlalchemy.py
@@ -0,0 +1,108 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import os
+import time
+import random
+
+from sqlalchemy import event, exc, select
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = LoggingMixin().log
+
+
+def setup_event_handlers(
+        engine,
+        reconnect_timeout_seconds,
+        initial_backoff_seconds=0.2,
+        max_backoff_seconds=120):
+
+    @event.listens_for(engine, "engine_connect")
+    def ping_connection(connection, branch):
+        """
+        Pessimistic SQLAlchemy disconnect handling. Ensures that each
+        connection returned from the pool is properly connected to the database.
+
+        http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
+        """
+        if branch:
+            # "branch" refers to a sub-connection of a connection,
+            # we don't want to bother pinging on these.
+            return
+
+        start = time.time()
+        backoff = initial_backoff_seconds
+
+        # turn off "close with result".  This flag is only used with
+        # "connectionless" execution, otherwise will be False in any case
+        save_should_close_with_result = connection.should_close_with_result
+
+        while True:
+            connection.should_close_with_result = False
+
+            try:
+                connection.scalar(select([1]))
+                # If we made it here then the connection appears to be healthy
+                break
+            except exc.DBAPIError as err:
+                if time.time() - start >= reconnect_timeout_seconds:
+                    log.error(
+                        "Failed to re-establish DB connection within %s secs: %s",
+                        reconnect_timeout_seconds,
+                        err)
+                    raise
+                if err.connection_invalidated:
+                    log.warning("DB connection invalidated. Reconnecting...")
+
+                    # Use a truncated binary exponential backoff. Also includes
+                    # a jitter to prevent the thundering herd problem of
+                    # simultaneous client reconnects
+                    backoff += backoff * random.random()
+                    time.sleep(min(backoff, max_backoff_seconds))
+
+                    # run the same SELECT again - the connection will re-validate
+                    # itself and establish a new connection.  The disconnect detection
+                    # here also causes the whole connection pool to be invalidated
+                    # so that all stale connections are discarded.
+                    continue
+                else:
+                    log.error(
+                        "Unknown database connection error. Not retrying: %s",
+                        err)
+                    raise
+            finally:
+                # restore "close with result"
+                connection.should_close_with_result = save_should_close_with_result
+
+
+    @event.listens_for(engine, "connect")
+    def connect(dbapi_connection, connection_record):
+        connection_record.info['pid'] = os.getpid()
+
+
+    @event.listens_for(engine, "checkout")
+    def checkout(dbapi_connection, connection_record, connection_proxy):
+        pid = os.getpid()
+        if connection_record.info['pid'] != pid:
+            connection_record.connection = connection_proxy.connection = None
+            raise exc.DisconnectionError(
+                "Connection record belongs to pid {}, "
+                "attempting to check out in pid {}".format(connection_record.info['pid'], pid)
+            )