Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/02 15:22:59 UTC

[1/5] incubator-airflow git commit: [AIRFLOW-1559] Make database pooling optional

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 406d738b1 -> 1359d8735


[AIRFLOW-1559] Make database pooling optional

In situations where a database is heavily loaded with connections, it
can be beneficial for operators to (temporarily) reduce the connection
footprint of Airflow on the database. This is particularly important
when Airflow or self-made extensions do not dispose of the connection
pool when terminating.

Disabling the connection pool comes with a slowdown, but that may be
acceptable in many deployment scenarios.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3bde95e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3bde95e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3bde95e5

Branch: refs/heads/master
Commit: 3bde95e599b56af9bb98caf66ac8248d6b7b9094
Parents: 0211219
Author: Stephan Erb <st...@blue-yonder.com>
Authored: Fri Nov 24 20:40:10 2017 +0100
Committer: Stephan Erb <st...@blue-yonder.com>
Committed: Wed Nov 29 08:50:34 2017 +0100

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg | 5 ++++-
 airflow/settings.py                          | 4 +++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bde95e5/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a339673..7a8ddf2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -67,8 +67,11 @@ executor = SequentialExecutor
 # their website
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
 
+# If SqlAlchemy should pool database connections.
+sql_alchemy_pool_enabled = True
+
 # The SqlAlchemy pool size is the maximum number of database connections
-# in the pool.
+# in the pool. 0 indicates no limit.
 sql_alchemy_pool_size = 5
 
 # The SqlAlchemy pool recycle is the number of seconds a connection

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bde95e5/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 39342df..5559646 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -139,7 +139,9 @@ def configure_orm(disable_connection_pool=False):
     global engine
     global Session
     engine_args = {}
-    if disable_connection_pool:
+
+    pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
+    if disable_connection_pool or not pool_connections:
         engine_args['poolclass'] = NullPool
     elif 'sqlite' not in SQL_ALCHEMY_CONN:
         # Engine args not supported by sqlite
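
A standalone sketch of what the change amounts to, using the public
SQLAlchemy API (illustrative only; the helper name and parameters mirror the
config options above and are not an actual Airflow function):

    from sqlalchemy import create_engine
    from sqlalchemy.pool import NullPool

    def build_engine(conn_uri, pool_enabled=True, pool_size=5):
        engine_args = {}
        if not pool_enabled:
            # NullPool opens a fresh DB connection per checkout and closes it
            # on release, so Airflow keeps no idle connections on the server.
            engine_args['poolclass'] = NullPool
        elif 'sqlite' not in conn_uri:
            # SQLite does not support these engine args.
            engine_args['pool_size'] = pool_size
        return create_engine(conn_uri, **engine_args)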


[2/5] incubator-airflow git commit: [AIRFLOW-1559] Close file handles in subprocesses

Posted by bo...@apache.org.
[AIRFLOW-1559] Close file handles in subprocesses

All file descriptors except 0, 1 and 2 will be closed before the
child process is executed. This is the default on Python 3.2 and
above. This patch ensures consistent behaviour for older Python
versions.

Resources will be released once the main thread disposes of
them, independently of how long its subprocesses keep running.

Background information:

* https://www.python.org/dev/peps/pep-0446/
* https://bugs.python.org/issue7213
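
As a standalone illustration (not taken from the patch), the pattern applied
throughout the files below is simply:

    import subprocess

    # close_fds=True lets the child inherit only file descriptors 0, 1 and 2,
    # so descriptors held by the parent (e.g. pooled DB connections or log
    # files) can be released as soon as the parent disposes of them, no
    # matter how long the child keeps running.
    proc = subprocess.Popen(["sleep", "60"], close_fds=True)
    proc.wait()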


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5a303ebb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5a303ebb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5a303ebb

Branch: refs/heads/master
Commit: 5a303ebbc572cee7c9c30be84ebf625357360d4b
Parents: 3bde95e
Author: Stephan Erb <st...@blue-yonder.com>
Authored: Sat Nov 25 21:16:36 2017 +0100
Committer: Stephan Erb <st...@blue-yonder.com>
Committed: Wed Nov 29 09:46:42 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                              |  8 ++++----
 airflow/configuration.py                        |  5 ++++-
 airflow/contrib/hooks/gcp_dataflow_hook.py      |  8 ++++++--
 airflow/contrib/hooks/ssh_hook.py               |  3 ++-
 airflow/executors/celery_executor.py            |  2 +-
 airflow/executors/dask_executor.py              |  2 +-
 airflow/executors/local_executor.py             |  2 +-
 airflow/executors/sequential_executor.py        |  2 +-
 airflow/hooks/hive_hooks.py                     |  3 ++-
 airflow/hooks/pig_hook.py                       |  3 ++-
 airflow/minihivecluster.py                      |  3 ++-
 airflow/operators/python_operator.py            |  4 +++-
 airflow/operators/s3_file_transform_operator.py |  2 +-
 airflow/security/kerberos.py                    |  2 +-
 airflow/task_runner/base_task_runner.py         | 11 +++++++----
 airflow/utils/helpers.py                        |  2 +-
 16 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 6d01293..4e56d54 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -771,7 +771,7 @@ def webserver(args):
                 },
             )
             with ctx:
-                subprocess.Popen(run_args)
+                subprocess.Popen(run_args, close_fds=True)
 
                 # Reading pid file directly, since Popen#pid doesn't
                 # seem to return the right value with DaemonContext.
@@ -790,7 +790,7 @@ def webserver(args):
             stdout.close()
             stderr.close()
         else:
-            gunicorn_master_proc = subprocess.Popen(run_args)
+            gunicorn_master_proc = subprocess.Popen(run_args, close_fds=True)
 
             signal.signal(signal.SIGINT, kill_proc)
             signal.signal(signal.SIGTERM, kill_proc)
@@ -881,7 +881,7 @@ def worker(args):
             stderr=stderr,
         )
         with ctx:
-            sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)
+            sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
             worker.run(**options)
             sp.kill()
 
@@ -891,7 +891,7 @@ def worker(args):
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
 
-        sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)
+        sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
 
         worker.run(**options)
         sp.kill()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ff81d98..d61afb7 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -85,7 +85,10 @@ def run_command(command):
     Runs command and returns stdout
     """
     process = subprocess.Popen(
-        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        shlex.split(command),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        close_fds=True)
     output, stderr = [stream.decode(sys.getdefaultencoding(), 'ignore')
                       for stream in process.communicate()]
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index b1e1474..1928c3b 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -91,8 +91,12 @@ class _DataflowJob(LoggingMixin):
 class _Dataflow(LoggingMixin):
     def __init__(self, cmd):
         self.log.info("Running command: %s", ' '.join(cmd))
-        self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
-                                      stderr=subprocess.PIPE)
+        self._proc = subprocess.Popen(
+            cmd,
+            shell=False,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            close_fds=True)
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index a85911b..4b60405 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -202,7 +202,8 @@ class SSHHook(BaseHook, LoggingMixin):
 
         proc = subprocess.Popen(ssh_cmd,
                                 stdin=subprocess.PIPE,
-                                stdout=subprocess.PIPE)
+                                stdout=subprocess.PIPE,
+                                close_fds=True)
         ready = proc.stdout.read(5)
         assert ready == b"ready", \
             "Did not get 'ready' from remote, got '{0}' instead".format(ready)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index d3809b3..4827f03 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -49,7 +49,7 @@ def execute_command(command):
     log = LoggingMixin().log
     log.info("Executing command in Celery: %s", command)
     try:
-        subprocess.check_call(command, shell=True)
+        subprocess.check_call(command, shell=True, close_fds=True)
     except subprocess.CalledProcessError as e:
         log.error(e)
         raise AirflowException('Celery command failed')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 07b8a82..70edbe5 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -44,7 +44,7 @@ class DaskExecutor(BaseExecutor):
             )
 
         def airflow_run():
-            return subprocess.check_call(command, shell=True)
+            return subprocess.check_call(command, shell=True, close_fds=True)
 
         future = self.client.submit(airflow_run, pure=False)
         self.futures[future] = key

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 71bee22..1d18d6d 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -79,7 +79,7 @@ class LocalWorker(multiprocessing.Process, LoggingMixin):
         self.log.info("%s running %s", self.__class__.__name__, command)
         command = "exec bash -c '{0}'".format(command)
         try:
-            subprocess.check_call(command, shell=True)
+            subprocess.check_call(command, shell=True, close_fds=True)
             state = State.SUCCESS
         except subprocess.CalledProcessError as e:
             state = State.FAILED

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/executors/sequential_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index a15450d..ec51694 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -40,7 +40,7 @@ class SequentialExecutor(BaseExecutor):
             self.log.info("Executing command: %s", command)
 
             try:
-                subprocess.check_call(command, shell=True)
+                subprocess.check_call(command, shell=True, close_fds=True)
                 self.change_state(key, State.SUCCESS)
             except subprocess.CalledProcessError as e:
                 self.change_state(key, State.FAILED)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index a8f9c8f..eb39469 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -204,7 +204,8 @@ class HiveCliHook(BaseHook):
                     hive_cmd,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT,
-                    cwd=tmp_dir)
+                    cwd=tmp_dir,
+                    close_fds=True)
                 self.sp = sp
                 stdout = ''
                 while True:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/hooks/pig_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py
index 276b37a..e7e6a68 100644
--- a/airflow/hooks/pig_hook.py
+++ b/airflow/hooks/pig_hook.py
@@ -67,7 +67,8 @@ class PigCliHook(BaseHook):
                     pig_cmd,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT,
-                    cwd=tmp_dir)
+                    cwd=tmp_dir,
+                    close_fds=True)
                 self.sp = sp
                 stdout = ''
                 for line in iter(sp.stdout.readline, ''):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/minihivecluster.py
----------------------------------------------------------------------
diff --git a/airflow/minihivecluster.py b/airflow/minihivecluster.py
index b975f27..c5441c6 100644
--- a/airflow/minihivecluster.py
+++ b/airflow/minihivecluster.py
@@ -30,7 +30,8 @@ class MiniHiveCluster(object):
         cmd = ["java", "-cp", classpath, self._minicluster_class]
 
         self.hive = subprocess.Popen(cmd, bufsize=0, stdout=subprocess.PIPE,
-                                     stderr=subprocess.PIPE, universal_newlines=True)
+                                     stderr=subprocess.PIPE, universal_newlines=True,
+                                     close_fds=True)
 
     def terminate(self):
         self.hive.terminate()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 18e7bce..b20afae 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -275,7 +275,9 @@ class PythonVirtualenvOperator(PythonOperator):
     def _execute_in_subprocess(self, cmd):
         try:
             self.log.info("Executing cmd\n{}".format(cmd))
-            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+            output = subprocess.check_output(cmd,
+                                             stderr=subprocess.STDOUT,
+                                             close_fds=True)
             if output:
                 self.log.info("Got output\n{}".format(output))
         except subprocess.CalledProcessError as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index e105e3d..a27a782 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -88,7 +88,7 @@ class S3FileTransformOperator(BaseOperator):
             source_s3.connection.close()
             transform_script_process = subprocess.Popen(
                 [self.transform_script, f_source.name, f_dest.name],
-                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+                stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
             (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
             self.log.info("Transform script stdout %s", transform_script_stdoutdata)
             if transform_script_process.returncode > 0:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/security/kerberos.py
----------------------------------------------------------------------
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 7a169b2..55566a0 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -73,7 +73,7 @@ def perform_krb181_workaround():
     log.info("Renewing kerberos ticket to work around kerberos 1.8.1: " +
              " ".join(cmdv))
 
-    ret = subprocess.call(cmdv)
+    ret = subprocess.call(cmdv, close_fds=True)
 
     if ret != 0:
         principal = "%s/%s" % (configuration.get('kerberos', 'principal'), socket.getfqdn())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index f4b4f2d..2fa33a9 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -66,10 +66,12 @@ class BaseTaskRunner(LoggingMixin):
 
             # Give ownership of file to user; only they can read and write
             subprocess.call(
-                ['sudo', 'chown', self.run_as_user, cfg_path]
+                ['sudo', 'chown', self.run_as_user, cfg_path],
+                close_fds=True
             )
             subprocess.call(
-                ['sudo', 'chmod', '600', cfg_path]
+                ['sudo', 'chmod', '600', cfg_path],
+                close_fds=True
             )
 
             with os.fdopen(temp_fd, 'w') as temp_file:
@@ -117,7 +119,8 @@ class BaseTaskRunner(LoggingMixin):
             full_cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT,
-            universal_newlines=True
+            universal_newlines=True,
+            close_fds=True,
         )
 
         # Start daemon thread to read subprocess logging output
@@ -154,4 +157,4 @@ class BaseTaskRunner(LoggingMixin):
         A callback that should be called when this is done running.
         """
         if self._cfg_path and os.path.isfile(self._cfg_path):
-            subprocess.call(['sudo', 'rm', self._cfg_path])
+            subprocess.call(['sudo', 'rm', self._cfg_path], close_fds=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 4b8944e..6a70725 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -192,7 +192,7 @@ def kill_using_shell(logger, pid, signal=signal.SIGTERM):
         else:
             args = ["kill", "-{}".format(int(signal)), str(pid)]
         # PID may not exist and return a non-zero error code
-        logger.error(subprocess.check_output(args))
+        logger.error(subprocess.check_output(args, close_fds=True))
         logger.info("Killed process {} with signal {}".format(pid, signal))
         return True
     except psutil.NoSuchProcess as e:


[3/5] incubator-airflow git commit: [AIRFLOW-1559] Dispose SQLAlchemy engines on exit

Posted by bo...@apache.org.
[AIRFLOW-1559] Dispose SQLAlchemy engines on exit

When a forked process or the entire interpreter terminates, we have
to close all pooled database connections. The database can run out
of connections otherwise. At a minimum, it will print errors in its
log file.

By using an atexit handler, we ensure that connections are closed on
each interpreter and Gunicorn worker termination. Only usages of
multiprocessing.Process require special handling, as those terminate
via os._exit(), which does not run finalizers.

This commit is based on a contribution by @dhuang
https://github.com/apache/incubator-airflow/pull/2767
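
The shutdown hook can be sketched in isolation like this (a simplified
illustration of the pattern, not the actual settings.py code; the sqlite URI
is a placeholder):

    import atexit

    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker

    engine = create_engine("sqlite://")  # placeholder URI for illustration
    Session = scoped_session(sessionmaker(bind=engine))

    def dispose_orm():
        # Return all pooled connections to the database before the
        # interpreter (or a gunicorn worker) terminates.
        global engine, Session
        if Session:
            Session.remove()
            Session = None
        if engine:
            engine.dispose()
            engine = None

    # Runs on normal interpreter shutdown. Children started via
    # multiprocessing.Process exit through os._exit() and skip atexit
    # handlers, hence the explicit dispose_orm() call in jobs.py below.
    atexit.register(dispose_orm)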


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6bf1a6ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6bf1a6ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6bf1a6ed

Branch: refs/heads/master
Commit: 6bf1a6edaf13d3e255c47488f2747a2b8ebeff6c
Parents: 5a303eb
Author: Stephan Erb <st...@blue-yonder.com>
Authored: Sat Nov 25 22:21:28 2017 +0100
Committer: Stephan Erb <st...@blue-yonder.com>
Committed: Wed Nov 29 09:49:57 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py     |  3 +++
 airflow/settings.py | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6bf1a6ed/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 868e785..91ab96c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -384,6 +384,9 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             finally:
                 sys.stdout = sys.__stdout__
                 sys.stderr = sys.__stderr__
+                # We re-initialized the ORM within this Process above so we need to
+                # tear it down manually here
+                settings.dispose_orm()
 
         p = multiprocessing.Process(target=helper,
                                     args=(),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6bf1a6ed/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 5559646..04d3548 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -17,6 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+import atexit
 import logging
 import os
 import pendulum
@@ -136,6 +137,7 @@ def configure_vars():
 
 
 def configure_orm(disable_connection_pool=False):
+    log.debug("Setting up DB connection pool (PID %s)" % os.getpid())
     global engine
     global Session
     engine_args = {}
@@ -154,6 +156,20 @@ def configure_orm(disable_connection_pool=False):
         sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
 
+def dispose_orm():
+    """ Properly close pooled database connections """
+    log.debug("Disposing DB connection pool (PID %s)", os.getpid())
+    global engine
+    global Session
+
+    if Session:
+        Session.remove()
+        Session = None
+    if engine:
+        engine.dispose()
+        engine = None
+
+
 def configure_adapters():
     from pendulum import Pendulum
     try:
@@ -180,6 +196,9 @@ configure_vars()
 configure_adapters()
 configure_orm()
 
+# Ensure we close DB connections at scheduler and gunicorn worker terminations
+atexit.register(dispose_orm)
+
 # Const stuff
 
 KILOBYTE = 1024


[5/5] incubator-airflow git commit: Merge pull request #2822 from StephanErb/db_robustness

Posted by bo...@apache.org.
Merge pull request #2822 from StephanErb/db_robustness


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1359d873
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1359d873
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1359d873

Branch: refs/heads/master
Commit: 1359d87352bda220f5d88613fd81904378624c7b
Parents: 406d738 94deac3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Dec 2 16:22:50 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Dec 2 16:22:50 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                              |   9 +-
 airflow/config_templates/default_airflow.cfg    |   9 +-
 airflow/configuration.py                        |   5 +-
 airflow/contrib/hooks/gcp_dataflow_hook.py      |   8 +-
 airflow/contrib/hooks/ssh_hook.py               |   3 +-
 airflow/executors/celery_executor.py            |   2 +-
 airflow/executors/dask_executor.py              |   2 +-
 airflow/executors/local_executor.py             |   2 +-
 airflow/executors/sequential_executor.py        |   2 +-
 airflow/hooks/hive_hooks.py                     |   3 +-
 airflow/hooks/pig_hook.py                       |   3 +-
 airflow/jobs.py                                 |   7 +-
 airflow/minihivecluster.py                      |   3 +-
 airflow/operators/python_operator.py            |   4 +-
 airflow/operators/s3_file_transform_operator.py |   2 +-
 airflow/security/kerberos.py                    |   2 +-
 airflow/settings.py                             |  27 ++++-
 airflow/task_runner/base_task_runner.py         |  11 +-
 airflow/utils/db.py                             |  34 ------
 airflow/utils/helpers.py                        |   2 +-
 airflow/utils/sqlalchemy.py                     | 108 +++++++++++++++++++
 21 files changed, 185 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1359d873/airflow/bin/cli.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1359d873/airflow/jobs.py
----------------------------------------------------------------------
diff --cc airflow/jobs.py
index 0ff7819,ed6da60..28c5980
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@@ -54,10 -54,9 +54,9 @@@ from airflow.utils.dag_processing impor
                                            SimpleDag,
                                            SimpleDagBag,
                                            list_py_file_paths)
- from airflow.utils.db import (
-     create_session, provide_session, pessimistic_connection_handling)
+ from airflow.utils.db import create_session, provide_session
  from airflow.utils.email import send_email
 -from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter
 +from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter
  from airflow.utils.state import State
  
  Base = models.Base

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1359d873/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------


[4/5] incubator-airflow git commit: [AIRFLOW-1665] Reconnect on database errors

Posted by bo...@apache.org.
[AIRFLOW-1665] Reconnect on database errors

This change enables the scheduler to recover from temporary database
errors and downtimes. The same holds true for the webserver if run
without its regular worker refresh.

The reconnect logic is based on a truncated binary exponential backoff
to ensure reconnect attempts don't overload the database.

Included changes:

* Switch to recommended pessimistic disconnect handling for engines
  http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
* Remove legacy pool-based disconnect handling.
* Ensure event handlers are registered for each newly created engine.
  Engines are re-initialized in child processes so this is crucial for
  correctness.

This commit is based on a contribution by @vklogin
https://github.com/apache/incubator-airflow/pull/2744
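
The retry schedule can be sketched independently of SQLAlchemy (a simplified
version of the loop added in airflow/utils/sqlalchemy.py below; the function
name and default values are illustrative):

    import random
    import time

    def retry_with_backoff(ping, timeout=300.0, initial=0.2, cap=120.0):
        """Call `ping` until it succeeds or `timeout` seconds have elapsed,
        sleeping with truncated binary exponential backoff plus jitter."""
        start = time.time()
        backoff = initial
        while True:
            try:
                return ping()
            except Exception:
                if time.time() - start >= timeout:
                    raise
                # Jitter keeps many clients from reconnecting in lock-step;
                # the cap bounds any single sleep at `cap` seconds.
                backoff += backoff * random.random()
                time.sleep(min(backoff, cap))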


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/94deac34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/94deac34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/94deac34

Branch: refs/heads/master
Commit: 94deac34eca869a0accbc6affe7640b09dab1530
Parents: 6bf1a6e
Author: Stephan Erb <st...@blue-yonder.com>
Authored: Sun Nov 26 21:28:12 2017 +0100
Committer: Stephan Erb <st...@blue-yonder.com>
Committed: Wed Nov 29 12:29:47 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                           |   1 -
 airflow/config_templates/default_airflow.cfg |   4 +
 airflow/jobs.py                              |   4 +-
 airflow/settings.py                          |   4 +
 airflow/utils/db.py                          |  34 -------
 airflow/utils/sqlalchemy.py                  | 108 ++++++++++++++++++++++
 6 files changed, 117 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 4e56d54..c0ab5e3 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -328,7 +328,6 @@ def run(args, dag=None):
     # while it's waiting for the task to finish.
     settings.configure_orm(disable_connection_pool=True)
 
-    db_utils.pessimistic_connection_handling()
     if dag:
         args.dag_id = dag.dag_id
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7a8ddf2..32af0a3 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -79,6 +79,10 @@ sql_alchemy_pool_size = 5
 # not apply to sqlite.
 sql_alchemy_pool_recycle = 3600
 
+# How many seconds to retry re-establishing a DB connection after
+# disconnects. Setting this to 0 disables retries.
+sql_alchemy_reconnect_timeout = 300
+
 # The amount of parallelism as a setting to the executor. This defines
 # the max number of task instances that should run simultaneously
 # on this airflow installation

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 91ab96c..ed6da60 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -54,8 +54,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           SimpleDag,
                                           SimpleDagBag,
                                           list_py_file_paths)
-from airflow.utils.db import (
-    create_session, provide_session, pessimistic_connection_handling)
+from airflow.utils.db import create_session, provide_session
 from airflow.utils.email import send_email
 from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter
 from airflow.utils.state import State
@@ -1508,7 +1507,6 @@ class SchedulerJob(BaseJob):
 
     def _execute(self):
         self.log.info("Starting the scheduler")
-        pessimistic_connection_handling()
 
         # DAGs can be pickled for easier remote execution by some executors
         pickle_dags = False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 04d3548..929663b 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -28,6 +28,7 @@ from sqlalchemy.pool import NullPool
 
 from airflow import configuration as conf
 from airflow.logging_config import configure_logging
+from airflow.utils.sqlalchemy import setup_event_handlers
 
 log = logging.getLogger(__name__)
 
@@ -152,6 +153,9 @@ def configure_orm(disable_connection_pool=False):
                                                   'SQL_ALCHEMY_POOL_RECYCLE')
 
     engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
+    reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT')
+    setup_event_handlers(engine, reconnect_timeout)
+
     Session = scoped_session(
         sessionmaker(autocommit=False, autoflush=False, bind=engine))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 9c924d1..64ce220 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -22,9 +22,6 @@ from functools import wraps
 import os
 import contextlib
 
-from sqlalchemy import event, exc
-from sqlalchemy.pool import Pool
-
 from airflow import settings
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -74,21 +71,6 @@ def provide_session(func):
     return wrapper
 
 
-def pessimistic_connection_handling():
-    @event.listens_for(Pool, "checkout")
-    def ping_connection(dbapi_connection, connection_record, connection_proxy):
-        '''
-        Disconnect Handling - Pessimistic, taken from:
-        http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
-        '''
-        cursor = dbapi_connection.cursor()
-        try:
-            cursor.execute("SELECT 1")
-        except:
-            raise exc.DisconnectionError()
-        cursor.close()
-
-
 @provide_session
 def merge_conn(conn, session=None):
     from airflow import models
@@ -98,22 +80,6 @@ def merge_conn(conn, session=None):
         session.commit()
 
 
-@event.listens_for(settings.engine, "connect")
-def connect(dbapi_connection, connection_record):
-    connection_record.info['pid'] = os.getpid()
-
-
-@event.listens_for(settings.engine, "checkout")
-def checkout(dbapi_connection, connection_record, connection_proxy):
-    pid = os.getpid()
-    if connection_record.info['pid'] != pid:
-        connection_record.connection = connection_proxy.connection = None
-        raise exc.DisconnectionError(
-            "Connection record belongs to pid {}, "
-            "attempting to check out in pid {}".format(connection_record.info['pid'], pid)
-        )
-
-
 def initdb():
     session = settings.Session()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/utils/sqlalchemy.py
----------------------------------------------------------------------
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
new file mode 100644
index 0000000..fa07437
--- /dev/null
+++ b/airflow/utils/sqlalchemy.py
@@ -0,0 +1,108 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import os
+import time
+import random
+
+from sqlalchemy import event, exc, select
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = LoggingMixin().log
+
+
+def setup_event_handlers(
+        engine,
+        reconnect_timeout_seconds,
+        initial_backoff_seconds=0.2,
+        max_backoff_seconds=120):
+
+    @event.listens_for(engine, "engine_connect")
+    def ping_connection(connection, branch):
+        """
+        Pessimistic SQLAlchemy disconnect handling. Ensures that each
+        connection returned from the pool is properly connected to the database.
+
+        http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
+        """
+        if branch:
+            # "branch" refers to a sub-connection of a connection,
+            # we don't want to bother pinging on these.
+            return
+
+        start = time.time()
+        backoff = initial_backoff_seconds
+
+        # turn off "close with result".  This flag is only used with
+        # "connectionless" execution, otherwise will be False in any case
+        save_should_close_with_result = connection.should_close_with_result
+
+        while True:
+            connection.should_close_with_result = False
+
+            try:
+                connection.scalar(select([1]))
+                # If we made it here then the connection appears to be healthy
+                break
+            except exc.DBAPIError as err:
+                if time.time() - start >= reconnect_timeout_seconds:
+                    log.error(
+                        "Failed to re-establish DB connection within %s secs: %s",
+                        reconnect_timeout_seconds,
+                        err)
+                    raise
+                if err.connection_invalidated:
+                    log.warning("DB connection invalidated. Reconnecting...")
+
+                    # Use a truncated binary exponential backoff. Also includes
+                    # a jitter to prevent the thundering herd problem of
+                    # simultaneous client reconnects
+                    backoff += backoff * random.random()
+                    time.sleep(min(backoff, max_backoff_seconds))
+
+                    # run the same SELECT again - the connection will re-validate
+                    # itself and establish a new connection.  The disconnect detection
+                    # here also causes the whole connection pool to be invalidated
+                    # so that all stale connections are discarded.
+                    continue
+                else:
+                    log.error(
+                        "Unknown database connection error. Not retrying: %s",
+                        err)
+                    raise
+            finally:
+                # restore "close with result"
+                connection.should_close_with_result = save_should_close_with_result
+
+
+    @event.listens_for(engine, "connect")
+    def connect(dbapi_connection, connection_record):
+        connection_record.info['pid'] = os.getpid()
+
+
+    @event.listens_for(engine, "checkout")
+    def checkout(dbapi_connection, connection_record, connection_proxy):
+        pid = os.getpid()
+        if connection_record.info['pid'] != pid:
+            connection_record.connection = connection_proxy.connection = None
+            raise exc.DisconnectionError(
+                "Connection record belongs to pid {}, "
+                "attempting to check out in pid {}".format(connection_record.info['pid'], pid)
+            )