Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 10:48:20 UTC

[airflow] branch v1-10-test updated (38dcbdc -> 13e5349)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    omit 38dcbdc  Add docs about reload_on_plugin_change option (#9575)
    omit db9f047  [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)
    omit 386498b  Fix failing tests from #8997 (#9576)
    omit 09ba9c4  Reload gunicorn when plugins have been changed (#8997)
     new 0ee254c  Add copy button to Code Blocks in Airflow Docs (#9450)
     new 36f79a9  Reload gunicorn when plugins have been changed (#8997)
     new b6307ef  Fix failing tests from #8997 (#9576)
     new 38e2794  [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)
     new 13e5349  Add docs about reload_on_plugin_change option (#9575)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (38dcbdc)
            \
             N -- N -- N   refs/heads/v1-10-test (13e5349)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.
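
For example, to check whether an omitted revision such as 38dcbdc is still
reachable from another reference, you can ask git directly from a clone of the
repository (standard git commands):

    git branch -a --contains 38dcbdc   # local/remote branches containing it
    git tag --contains 38dcbdc         # tags containing it
    git show 38dcbdc                   # inspect the commit while it is reachable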

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/conf.py                            |  1 +
 requirements/requirements-python2.7.txt | 44 ++++++++++++------------
 requirements/requirements-python3.5.txt | 54 +++++++++++++++---------------
 requirements/requirements-python3.6.txt | 59 +++++++++++++++++----------------
 requirements/requirements-python3.7.txt | 59 +++++++++++++++++----------------
 requirements/setup-2.7.md5              |  2 +-
 requirements/setup-3.5.md5              |  2 +-
 requirements/setup-3.6.md5              |  2 +-
 requirements/setup-3.7.md5              |  2 +-
 setup.py                                |  4 ++-
 10 files changed, 117 insertions(+), 112 deletions(-)


[airflow] 04/05: [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 38e27948289860e53e51b60b74bd2d0310a4076a
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Mon Feb 24 16:15:38 2020 +0100

    [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)
    
    (cherry picked from commit 83d826b9925ce0eb2bd1fe403f5151fbef310b63)
---
 airflow/utils/dag_processing.py | 129 ++++++++++++++++++++--------------------
 1 file changed, 63 insertions(+), 66 deletions(-)

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 3aac8fd..e4ecc29 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -763,7 +763,7 @@ class DagFileProcessorManager(LoggingMixin):
         # Map from file path to the processor
         self._processors = {}
 
-        self._heartbeat_count = 0
+        self._num_run = 0
 
         # Map from file path to stats about the file
         self._file_stats = {}  # type: dict(str, DagFileStat)
@@ -843,11 +843,24 @@ class DagFileProcessorManager(LoggingMixin):
                 # are told to (as that would open another connection to the
                 # SQLite DB which isn't a good practice
                 continue
-
+            # pylint: enable=no-else-break
             self._refresh_dag_dir()
-            self._find_zombies()
+            self._find_zombies()  # pylint: disable=no-value-for-parameter
+
+            self._kill_timed_out_processors()
+            simple_dags = self.collect_results()
+
+            # Generate more file paths to process if we processed all the files
+            # already.
+            if not self._file_path_queue:
+                self.emit_metrics()
+                self.prepare_file_path_queue()
+
+            self.start_new_processes()
+
+            # Update number of loop iteration.
+            self._num_run += 1
 
-            simple_dags = self.heartbeat()
             for simple_dag in simple_dags:
                 self._signal_conn.send(simple_dag)
 
@@ -1197,65 +1210,11 @@ class DagFileProcessorManager(LoggingMixin):
 
         return simple_dags
 
-    def heartbeat(self):
-        """
-        This should be periodically called by the manager loop. This method will
-        kick off new processes to process DAG definition files and read the
-        results from the finished processors.
-
-        :return: a list of SimpleDags that were produced by processors that
-            have finished since the last time this was called
-        :rtype: list[airflow.utils.dag_processing.SimpleDag]
+    def start_new_processes(self):
+        """"
+        Start more processors if we have enough slots and files to process
         """
-        simple_dags = self.collect_results()
-
-        # Generate more file paths to process if we processed all the files
-        # already.
-        if len(self._file_path_queue) == 0:
-            self.emit_metrics()
-
-            self._parsing_start_time = timezone.utcnow()
-            # If the file path is already being processed, or if a file was
-            # processed recently, wait until the next batch
-            file_paths_in_progress = self._processors.keys()
-            now = timezone.utcnow()
-            file_paths_recently_processed = []
-            for file_path in self._file_paths:
-                last_finish_time = self.get_last_finish_time(file_path)
-                if (last_finish_time is not None and
-                    (now - last_finish_time).total_seconds() <
-                        self._file_process_interval):
-                    file_paths_recently_processed.append(file_path)
-
-            files_paths_at_run_limit = [file_path
-                                        for file_path, stat in self._file_stats.items()
-                                        if stat.run_count == self._max_runs]
-
-            files_paths_to_queue = list(set(self._file_paths) -
-                                        set(file_paths_in_progress) -
-                                        set(file_paths_recently_processed) -
-                                        set(files_paths_at_run_limit))
-
-            for file_path, processor in self._processors.items():
-                self.log.debug(
-                    "File path %s is still being processed (started: %s)",
-                    processor.file_path, processor.start_time.isoformat()
-                )
-
-            self.log.debug(
-                "Queuing the following files for processing:\n\t%s",
-                "\n\t".join(files_paths_to_queue)
-            )
-
-            for file_path in files_paths_to_queue:
-                if file_path not in self._file_stats:
-                    self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
-
-            self._file_path_queue.extend(files_paths_to_queue)
-
-        # Start more processors if we have enough slots and files to process
-        while (self._parallelism - len(self._processors) > 0 and
-               len(self._file_path_queue) > 0):
+        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
             processor = self._processor_factory(file_path, self._zombies)
             Stats.incr('dag_processing.processes')
@@ -1267,10 +1226,48 @@ class DagFileProcessorManager(LoggingMixin):
             )
             self._processors[file_path] = processor
 
-        # Update heartbeat count.
-        self._heartbeat_count += 1
+    def prepare_file_path_queue(self):
+        """
+        Generate more file paths to process. Results are saved in _file_path_queue.
+        """
+        self._parsing_start_time = timezone.utcnow()
+        # If the file path is already being processed, or if a file was
+        # processed recently, wait until the next batch
+        file_paths_in_progress = self._processors.keys()
+        now = timezone.utcnow()
+        file_paths_recently_processed = []
+        for file_path in self._file_paths:
+            last_finish_time = self.get_last_finish_time(file_path)
+            if (last_finish_time is not None and
+                (now - last_finish_time).total_seconds() <
+                    self._file_process_interval):
+                file_paths_recently_processed.append(file_path)
+
+        files_paths_at_run_limit = [file_path
+                                    for file_path, stat in self._file_stats.items()
+                                    if stat.run_count == self._max_runs]
+
+        files_paths_to_queue = list(set(self._file_paths) -
+                                    set(file_paths_in_progress) -
+                                    set(file_paths_recently_processed) -
+                                    set(files_paths_at_run_limit))
 
-        return simple_dags
+        for file_path, processor in self._processors.items():
+            self.log.debug(
+                "File path %s is still being processed (started: %s)",
+                processor.file_path, processor.start_time.isoformat()
+            )
+
+        self.log.debug(
+            "Queuing the following files for processing:\n\t%s",
+            "\n\t".join(files_paths_to_queue)
+        )
+
+        for file_path in files_paths_to_queue:
+            if file_path not in self._file_stats:
+                self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
+
+        self._file_path_queue.extend(files_paths_to_queue)
 
     @provide_session
     def _find_zombies(self, session):
@@ -1338,7 +1335,7 @@ class DagFileProcessorManager(LoggingMixin):
         for stat in self._file_stats.values():
             if stat.run_count < self._max_runs:
                 return False
-        if self._heartbeat_count < self._max_runs:
+        if self._num_run < self._max_runs:
             return False
         return True
 
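In short, this refactor splits the old heartbeat() into steps the manager loop
drives directly: collect finished results, refill the file-path queue only when
it is empty, start new processors, and count iterations in _num_run. A runnable
toy sketch of that control flow follows; every name in it is an illustrative
stand-in, not the real Airflow code:

    class ToyFileProcessorManager(object):
        def __init__(self, file_paths, max_runs):
            self._file_paths = list(file_paths)
            self._file_path_queue = []
            self._results = []
            self._num_run = 0
            self._max_runs = max_runs

        def prepare_file_path_queue(self):
            # Refill the queue with every known file path.
            self._file_path_queue.extend(self._file_paths)

        def start_new_processes(self):
            # Stand-in for launching DagFileProcessor subprocesses.
            while self._file_path_queue:
                self._results.append(self._file_path_queue.pop(0))

        def collect_results(self):
            results, self._results = self._results, []
            return results

        def loop(self):
            while self._num_run < self._max_runs:
                simple_dags = self.collect_results()
                if not self._file_path_queue:
                    self.prepare_file_path_queue()
                self.start_new_processes()
                self._num_run += 1  # loop-iteration counter, as in the new code
                yield simple_dags

    for batch in ToyFileProcessorManager(["a.py", "b.py"], max_runs=3).loop():
        print(batch)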


[airflow] 03/05: Fix failing tests from #8997 (#9576)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b6307eff8fc38c72714fa1d5784bbdc3640641ea
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Mon Jun 29 21:02:44 2020 +0200

    Fix failing tests from #8997 (#9576)
    
    (cherry picked from commit 9858294f7252e626bec7b701280e81d4f27df452)
---
 airflow/bin/cli.py                           | 4 +++-
 airflow/config_templates/config.yml          | 4 ++--
 airflow/config_templates/default_airflow.cfg | 4 ++--
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index b81c7b1..ba19561 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1135,7 +1135,9 @@ def webserver(args):
                 master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
                 worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=10),
                 worker_refresh_batch_size=conf.getint('webserver', 'worker_refresh_batch_size', fallback=1),
-                reload_on_plugin_change=conf.getint('webserver', 'reload_on_plugin_change', fallback=1),
+                reload_on_plugin_change=conf.getboolean(
+                    'webserver', 'reload_on_plugin_change', fallback=False
+                ),
             ).start()
 
         if args.daemon:
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1f697b1..f632cd5 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -701,12 +701,12 @@
       default: "30"
     - name: reload_on_plugin_change
       description: |
-        If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+        If set to True, Airflow will track files in plugins_folder directory. When it detects changes,
         then reload the gunicorn.
       version_added: ~
       type: boolean
       example: ~
-      default: False
+      default: "False"
     - name: secret_key
       description: |
         Secret key used to run your flask app
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index cfc92ad..a061d46 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -341,9 +341,9 @@ worker_refresh_batch_size = 1
 # Number of seconds to wait before refreshing a batch of workers.
 worker_refresh_interval = 30
 
-# If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+# If set to True, Airflow will track files in plugins_folder directory. When it detects changes,
 # then reload the gunicorn.
-reload_on_plugin_change =
+reload_on_plugin_change = False
 
 # Secret key used to run your flask app
 # It should be as random as possible
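
The getint -> getboolean change matters because the option's value is the
string "False", which int() cannot parse but boolean parsing handles. A minimal
sketch of the difference, using the stdlib ConfigParser that Airflow's conf
object wraps (illustrative only, not Airflow's own code):

    try:
        from configparser import ConfigParser   # Python 3
    except ImportError:
        from ConfigParser import ConfigParser   # Python 2, as on v1-10

    parser = ConfigParser()
    parser.add_section('webserver')
    parser.set('webserver', 'reload_on_plugin_change', 'False')

    print(parser.getboolean('webserver', 'reload_on_plugin_change'))  # False
    try:
        parser.getint('webserver', 'reload_on_plugin_change')
    except ValueError as err:
        print(err)  # invalid literal for int() with base 10: 'False'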


[airflow] 02/05: Reload gunicorn when plugins have been changed (#8997)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 36f79a9b227992f2efd6dca0ae7d4ad15dc8d2e6
Author: Kamil Breguła <ka...@polidea.com>
AuthorDate: Mon Jun 29 20:38:26 2020 +0200

    Reload gunicorn when plugins have been changed (#8997)
    
    (cherry picked from commit 1c48ffbe25c3e304660b7e75a49e88bd114dde46)
---
 airflow/bin/cli.py                           | 350 +++++++++++++++++++--------
 airflow/config_templates/config.yml          |   8 +
 airflow/config_templates/default_airflow.cfg |   4 +
 tests/cli/test_cli.py                        | 233 +++++++++++++++---
 4 files changed, 452 insertions(+), 143 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8b5522c..b81c7b1 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -20,6 +20,7 @@
 
 from __future__ import print_function
 import errno
+import hashlib
 import importlib
 import locale
 import logging
@@ -777,31 +778,11 @@ def clear(args):
     )
 
 
-def get_num_ready_workers_running(gunicorn_master_proc):
-    workers = psutil.Process(gunicorn_master_proc.pid).children()
-
-    def ready_prefix_on_cmdline(proc):
-        try:
-            cmdline = proc.cmdline()
-            if len(cmdline) > 0:
-                return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
-        except psutil.NoSuchProcess:
-            pass
-        return False
-
-    ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
-    return len(ready_workers)
-
-
-def get_num_workers_running(gunicorn_master_proc):
-    workers = psutil.Process(gunicorn_master_proc.pid).children()
-    return len(workers)
-
-
-def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
+class GunicornMonitor(LoggingMixin):
     """
     Runs forever, monitoring the child processes of @gunicorn_master_proc and
-    restarting workers occasionally.
+    restarting workers occasionally or when files in the plug-in directory
+    have been modified.
     Each iteration of the loop traverses one edge of this state transition
     diagram, where each state (node) represents
     [ num_ready_workers_running / num_workers_running ]. We expect most time to
@@ -818,92 +799,245 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
     master process, which increases and decreases the number of child workers
     respectively. Gunicorn guarantees that on TTOU workers are terminated
     gracefully and that the oldest worker is terminated.
+
+    :param gunicorn_master_proc:  handle for the main Gunicorn process
+    :param num_workers_expected:  Number of workers to run the Gunicorn web server
+    :param master_timeout: Number of seconds the webserver waits before killing gunicorn master that
+        doesn't respond
+    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
+    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
+        refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
+        bringing up new ones and killing old ones.
+    :param reload_on_plugin_change: If set to True, Airflow will track files in the plugins_folder directory.
+        When it detects changes, it reloads gunicorn.
     """
+    def __init__(
+        self,
+        gunicorn_master_proc,
+        num_workers_expected,
+        master_timeout,
+        worker_refresh_interval,
+        worker_refresh_batch_size,
+        reload_on_plugin_change
+    ):
+        super(GunicornMonitor, self).__init__()
+        self.gunicorn_master_proc = gunicorn_master_proc
+        self.num_workers_expected = num_workers_expected
+        self.master_timeout = master_timeout
+        self.worker_refresh_interval = worker_refresh_interval
+        self.worker_refresh_batch_size = worker_refresh_batch_size
+        self.reload_on_plugin_change = reload_on_plugin_change
+
+        self._num_workers_running = 0
+        self._num_ready_workers_running = 0
+        self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
+        self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
+        self._restart_on_next_plugin_check = False
+
+    def _generate_plugin_state(self):
+        """
+        Generate dict of filenames and last modification time of all files in settings.PLUGINS_FOLDER
+        directory.
+        """
+        if not settings.PLUGINS_FOLDER:
+            return {}
+
+        all_filenames = []
+        for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
+            all_filenames.extend(os.path.join(root, f) for f in filenames)
+        plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)}
+        return plugin_state
+
+    @staticmethod
+    def _get_file_hash(fname):
+        """Calculate MD5 hash for file"""
+        hash_md5 = hashlib.md5()
+        with open(fname, "rb") as f:
+            for chunk in iter(lambda: f.read(4096), b""):
+                hash_md5.update(chunk)
+        return hash_md5.hexdigest()
+
+    def _get_num_ready_workers_running(self):
+        """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+
+        def ready_prefix_on_cmdline(proc):
+            try:
+                cmdline = proc.cmdline()
+                if len(cmdline) > 0:  # pylint: disable=len-as-condition
+                    return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
+            except psutil.NoSuchProcess:
+                pass
+            return False
+
+        ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
+        return len(ready_workers)
 
-    def wait_until_true(fn, timeout=0):
+    def _get_num_workers_running(self):
+        """Returns number of running Gunicorn workers processes"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+        return len(workers)
+
+    def _wait_until_true(self, fn, timeout=0):
         """
         Sleeps until fn is true
         """
-        t = time.time()
+        start_time = time.time()
         while not fn():
-            if 0 < timeout and timeout <= time.time() - t:
+            if 0 < timeout <= time.time() - start_time:
                 raise AirflowWebServerTimeout(
-                    "No response from gunicorn master within {0} seconds"
-                    .format(timeout))
+                    "No response from gunicorn master within {0} seconds".format(timeout)
+                )
             time.sleep(0.1)
 
-    def start_refresh(gunicorn_master_proc):
-        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
-        log.debug('%s doing a refresh of %s workers', state, batch_size)
-        sys.stdout.flush()
-        sys.stderr.flush()
-
+    def _spawn_new_workers(self, count):
+        """
+        Send signals to spawn new workers.
+        :param count: The number of workers to spawn
+        """
         excess = 0
-        for _ in range(batch_size):
-            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+        for _ in range(count):
+            # TTIN: Increment the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
             excess += 1
-            wait_until_true(lambda: num_workers_expected + excess ==
-                            get_num_workers_running(gunicorn_master_proc),
-                            master_timeout)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
 
-    try:
-        wait_until_true(lambda: num_workers_expected ==
-                        get_num_workers_running(gunicorn_master_proc),
-                        master_timeout)
-        while True:
-            num_workers_running = get_num_workers_running(gunicorn_master_proc)
-            num_ready_workers_running = \
-                get_num_ready_workers_running(gunicorn_master_proc)
-
-            state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
-            # Whenever some workers are not ready, wait until all workers are ready
-            if num_ready_workers_running < num_workers_running:
-                log.debug('%s some workers are starting up, waiting...', state)
-                sys.stdout.flush()
+    def _kill_old_workers(self, count):
+        """
+        Send signals to kill old workers.
+        :param count: The number of workers to kill
+        """
+        for _ in range(count):
+            count -= 1
+            # TTOU: Decrement the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + count == self._get_num_workers_running(),
+                timeout=self.master_timeout)
+
+    def _reload_gunicorn(self):
+        """
+        Send signal to reload the gunicorn configuration. When gunicorn receives the signal, it reloads
+        the configuration, starts new worker processes with the new configuration and gracefully
+        shuts down the old workers.
+        """
+        # HUP: Reload the configuration.
+        self.gunicorn_master_proc.send_signal(signal.SIGHUP)
+        time.sleep(1)
+        self._wait_until_true(
+            lambda: self.num_workers_expected == self._get_num_workers_running(),
+            timeout=self.master_timeout
+        )
+
+    def start(self):
+        """
+        Starts monitoring the webserver.
+        """
+        try:  # pylint: disable=too-many-nested-blocks
+            self._wait_until_true(
+                lambda: self.num_workers_expected == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
+            while True:
+                if self.gunicorn_master_proc.poll() is not None:
+                    sys.exit(self.gunicorn_master_proc.returncode)
+                self._check_workers()
+                # Throttle loop
                 time.sleep(1)
 
-            # Kill a worker gracefully by asking gunicorn to reduce number of workers
-            elif num_workers_running > num_workers_expected:
-                excess = num_workers_running - num_workers_expected
-                log.debug('%s killing %s workers', state, excess)
-
-                for _ in range(excess):
-                    gunicorn_master_proc.send_signal(signal.SIGTTOU)
-                    excess -= 1
-                    wait_until_true(lambda: num_workers_expected + excess ==
-                                    get_num_workers_running(gunicorn_master_proc),
-                                    master_timeout)
-
-            # Start a new worker by asking gunicorn to increase number of workers
-            elif num_workers_running == num_workers_expected:
-                refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
-                log.debug(
-                    '%s sleeping for %ss starting doing a refresh...',
-                    state, refresh_interval
+        except (AirflowWebServerTimeout, OSError) as err:
+            self.log.error(err)
+            self.log.error("Shutting down webserver")
+            try:
+                self.gunicorn_master_proc.terminate()
+                self.gunicorn_master_proc.wait()
+            finally:
+                sys.exit(1)
+
+    def _check_workers(self):
+        num_workers_running = self._get_num_workers_running()
+        num_ready_workers_running = self._get_num_ready_workers_running()
+
+        # Whenever some workers are not ready, wait until all workers are ready
+        if num_ready_workers_running < num_workers_running:
+            self.log.debug(
+                '[%d / %d] Some workers are starting up, waiting...',
+                num_ready_workers_running, num_workers_running
+            )
+            time.sleep(1)
+            return
+
+        # If there are too many workers, then kill a worker gracefully by asking gunicorn to reduce
+        # number of workers
+        if num_workers_running > self.num_workers_expected:
+            excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
+            self.log.debug(
+                '[%d / %d] Killing %s workers', num_ready_workers_running, num_workers_running, excess
+            )
+            self._kill_old_workers(excess)
+            return
+
+        # If there are too few workers, start a new worker by asking gunicorn
+        # to increase number of workers
+        if num_workers_running < self.num_workers_expected:
+            self.log.error(
+                "[%d / %d] Some workers seem to have died and gunicorn did not restart "
+                "them as expected",
+                num_ready_workers_running, num_workers_running
+            )
+            time.sleep(10)
+            num_workers_running = self._get_num_workers_running()
+            if num_workers_running < self.num_workers_expected:
+                new_worker_count = min(
+                    self.num_workers_expected - num_workers_running, self.worker_refresh_batch_size
+                )
+                self.log.debug(
+                    '[%d / %d] Spawning %d workers',
+                    num_ready_workers_running, num_workers_running, new_worker_count
+                )
+                self._spawn_new_workers(new_worker_count)
+            return
+
+        # Now the number of running and expected workers should be equal
+
+        # If workers should be restarted periodically.
+        if self.worker_refresh_interval > 0 and self._last_refresh_time:
+            # and we refreshed the workers a long time ago, refresh the workers
+            last_refresh_diff = (time.time() - self._last_refresh_time)
+            if self.worker_refresh_interval < last_refresh_diff:
+                num_new_workers = self.worker_refresh_batch_size
+                self.log.debug(
+                    '[%d / %d] Starting a refresh: spawning %d workers.',
+                    num_ready_workers_running, num_workers_running, num_new_workers
                 )
-                time.sleep(refresh_interval)
-                start_refresh(gunicorn_master_proc)
+                self._spawn_new_workers(num_new_workers)
+                self._last_refresh_time = time.time()
+                return
 
-            else:
-                # num_ready_workers_running == num_workers_running < num_workers_expected
-                log.error((
-                    "%s some workers seem to have died and gunicorn"
-                    "did not restart them as expected"
-                ), state)
-                time.sleep(10)
-                if len(
-                    psutil.Process(gunicorn_master_proc.pid).children()
-                ) < num_workers_expected:
-                    start_refresh(gunicorn_master_proc)
-    except (AirflowWebServerTimeout, OSError) as err:
-        log.error(err)
-        log.error("Shutting down webserver")
-        try:
-            gunicorn_master_proc.terminate()
-            gunicorn_master_proc.wait()
-        finally:
-            sys.exit(1)
+        # If we should check the plugins directory,
+        if self.reload_on_plugin_change:
+            # compare the previous and current contents of the directory
+            new_state = self._generate_plugin_state()
+            # If changed, wait until its content is fully saved.
+            if new_state != self._last_plugin_state:
+                self.log.debug(
+                    '[%d / %d] Plugins folder changed. Gunicorn will be restarted the next time the '
+                    'plugins directory is checked, if there are no further changes.',
+                    num_ready_workers_running, num_workers_running
+                )
+                self._restart_on_next_plugin_check = True
+                self._last_plugin_state = new_state
+            elif self._restart_on_next_plugin_check:
+                self.log.debug(
+                    '[%d / %d] Starting to reload the gunicorn configuration.',
+                    num_ready_workers_running, num_workers_running
+                )
+                self._restart_on_next_plugin_check = False
+                self._last_refresh_time = time.time()
+                self._reload_gunicorn()
 
 
 @cli_utils.action_logging
@@ -962,13 +1096,13 @@ def webserver(args):
 
         run_args = [
             'gunicorn',
-            '-w', str(num_workers),
-            '-k', str(args.workerclass),
-            '-t', str(worker_timeout),
-            '-b', args.hostname + ':' + str(args.port),
-            '-n', 'airflow-webserver',
-            '-p', str(pid),
-            '-c', 'python:airflow.www.gunicorn_config',
+            '--workers', str(num_workers),
+            '--worker-class', str(args.workerclass),
+            '--timeout', str(worker_timeout),
+            '--bind', args.hostname + ':' + str(args.port),
+            '--name', 'airflow-webserver',
+            '--pid', str(pid),
+            '--config', 'python:airflow.www.gunicorn_config',
         ]
 
         if args.access_logfile:
@@ -978,7 +1112,7 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]
 
         if args.daemon:
-            run_args += ['-D']
+            run_args += ['--daemon']
 
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
@@ -995,12 +1129,14 @@ def webserver(args):
 
         def monitor_gunicorn(gunicorn_master_proc):
             # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-            if conf.getint('webserver', 'worker_refresh_interval') > 0:
-                master_timeout = conf.getint('webserver', 'web_server_master_timeout')
-                restart_workers(gunicorn_master_proc, num_workers, master_timeout)
-            else:
-                while True:
-                    time.sleep(1)
+            GunicornMonitor(
+                gunicorn_master_proc=gunicorn_master_proc,
+                num_workers_expected=num_workers,
+                master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
+                worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=10),
+                worker_refresh_batch_size=conf.getint('webserver', 'worker_refresh_batch_size', fallback=1),
+                reload_on_plugin_change=conf.getint('webserver', 'reload_on_plugin_change', fallback=1),
+            ).start()
 
         if args.daemon:
             base, ext = os.path.splitext(pid)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e32b8cc..1f697b1 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -699,6 +699,14 @@
       type: string
       example: ~
       default: "30"
+    - name: reload_on_plugin_change
+      description: |
+        If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+        then reload the gunicorn.
+      version_added: ~
+      type: boolean
+      example: ~
+      default: False
     - name: secret_key
       description: |
         Secret key used to run your flask app
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c75d3ae..cfc92ad 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -341,6 +341,10 @@ worker_refresh_batch_size = 1
 # Number of seconds to wait before refreshing a batch of workers.
 worker_refresh_interval = 30
 
+# If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+# then reload the gunicorn.
+reload_on_plugin_change =
+
 # Secret key used to run your flask app
 # It should be as random as possible
 secret_key = temporary_key
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index a2a81ac..5748057 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import contextlib
+import errno
 import io
 
 import logging
@@ -28,8 +29,8 @@ from six import StringIO, PY2
 import sys
 
 from datetime import datetime, timedelta, time
-from mock import patch, Mock, MagicMock
-from time import sleep
+from mock import patch, MagicMock
+from time import sleep, time as timetime
 import psutil
 import pytz
 import subprocess
@@ -37,9 +38,10 @@ import pytest
 from argparse import Namespace
 from airflow import settings
 import airflow.bin.cli as cli
-from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.bin.cli import run, get_dag
 from airflow.models import TaskInstance
 from airflow.utils import timezone
+from airflow.utils.file import TemporaryDirectory
 from airflow.utils.state import State
 from airflow.settings import Session
 from airflow import models
@@ -154,39 +156,6 @@ class TestCLI(unittest.TestCase):
         cls.dagbag = models.DagBag(include_examples=True)
         cls.parser = cli.CLIFactory.get_parser()
 
-    def setUp(self):
-        self.gunicorn_master_proc = Mock(pid=None)
-        self.children = MagicMock()
-        self.child = MagicMock()
-        self.process = MagicMock()
-
-    def test_ready_prefix_on_cmdline(self):
-        self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 1)
-
-    def test_ready_prefix_on_cmdline_no_children(self):
-        self.process.children.return_value = []
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
-    def test_ready_prefix_on_cmdline_zombie(self):
-        self.child.cmdline.return_value = []
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
-    def test_ready_prefix_on_cmdline_dead_process(self):
-        self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
     def test_cli_webserver_debug(self):
         env = os.environ.copy()
         p = psutil.Popen(["airflow", "webserver", "-d"], env=env)
@@ -847,3 +816,195 @@ class TestShowInfo(unittest.TestCase):
         self.assertIn("https://file.io/TEST", temp_stdout.getvalue())
         content = mock_requests.post.call_args[1]["files"]["file"][1]
         self.assertIn("postgresql+psycopg2://p...s:PASSWORD@postgres/airflow", content)
+
+
+class TestGunicornMonitor(unittest.TestCase):
+
+    def setUp(self):
+        self.gunicorn_master_proc = mock.Mock(pid=2137)
+        self.monitor = cli.GunicornMonitor(
+            gunicorn_master_proc=self.gunicorn_master_proc,
+            num_workers_expected=4,
+            master_timeout=60,
+            worker_refresh_interval=60,
+            worker_refresh_batch_size=2,
+            reload_on_plugin_change=True,
+        )
+        mock.patch.object(self.monitor, '_generate_plugin_state', return_value={}).start()
+        mock.patch.object(self.monitor, '_get_num_ready_workers_running', return_value=4).start()
+        mock.patch.object(self.monitor, '_get_num_workers_running', return_value=4).start()
+        mock.patch.object(self.monitor, '_spawn_new_workers', return_value=None).start()
+        mock.patch.object(self.monitor, '_kill_old_workers', return_value=None).start()
+        mock.patch.object(self.monitor, '_reload_gunicorn', return_value=None).start()
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_wait_for_workers_to_start(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 0
+        self.monitor._get_num_workers_running.return_value = 4
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_kill_excess_workers(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 10
+        self.monitor._get_num_workers_running.return_value = 10
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_start_new_workers_when_missing(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 2
+        self.monitor._get_num_workers_running.return_value = 2
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_start_new_workers_when_refresh_interval_has_passed(self, mock_sleep):
+        self.monitor._last_refresh_time -= 200
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+        self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_reload_when_plugin_has_been_changed(self, mock_sleep):
+        self.monitor._generate_plugin_state.return_value = {'AA': 12}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+        self.monitor._generate_plugin_state.return_value = {'AA': 32}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+        self.monitor._generate_plugin_state.return_value = {'AA': 32}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_called_once_with()  # pylint: disable=no-member
+        self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+
+class TestGunicornMonitorGeneratePluginState(unittest.TestCase):
+    @staticmethod
+    def _prepare_test_file(filepath, size):
+        try:
+            os.makedirs(os.path.dirname(filepath))
+        except OSError as e:
+            # be happy if someone already created the path
+            if e.errno != errno.EEXIST:
+                raise
+        with open(filepath, "w") as file:
+            file.write("A" * size)
+            file.flush()
+
+    def test_should_detect_changes_in_directory(self):
+        with TemporaryDirectory(prefix="tmp") as tempdir, \
+                mock.patch("airflow.bin.cli.settings.PLUGINS_FOLDER", tempdir):
+            self._prepare_test_file("{}/file1.txt".format(tempdir), 100)
+            self._prepare_test_file("{}/nested/nested/nested/nested/file2.txt".format(tempdir), 200)
+            self._prepare_test_file("{}/file3.txt".format(tempdir), 300)
+
+            monitor = cli.GunicornMonitor(
+                gunicorn_master_proc=mock.MagicMock(),
+                num_workers_expected=4,
+                master_timeout=60,
+                worker_refresh_interval=60,
+                worker_refresh_batch_size=2,
+                reload_on_plugin_change=True,
+            )
+
+            # When the files have not changed, the result should be constant
+            state_a = monitor._generate_plugin_state()
+            state_b = monitor._generate_plugin_state()
+
+            self.assertEqual(state_a, state_b)
+            self.assertEqual(3, len(state_a))
+
+            # Should detect new file
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 400)
+
+            state_c = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_b, state_c)
+            self.assertEqual(4, len(state_c))
+
+            # Should detect changes in files
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 450)
+
+            state_d = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_c, state_d)
+            self.assertEqual(4, len(state_d))
+
+            # Should support large files
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 4000000)
+
+            state_d = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_c, state_d)
+            self.assertEqual(4, len(state_d))
+
+
+class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.parser = cli.get_parser()
+
+    def setUp(self):
+        self.gunicorn_master_proc = mock.Mock(pid=2137)
+        self.children = mock.MagicMock()
+        self.child = mock.MagicMock()
+        self.process = mock.MagicMock()
+        self.monitor = cli.GunicornMonitor(
+            gunicorn_master_proc=self.gunicorn_master_proc,
+            num_workers_expected=4,
+            master_timeout=60,
+            worker_refresh_interval=60,
+            worker_refresh_batch_size=2,
+            reload_on_plugin_change=True,
+        )
+
+    def test_ready_prefix_on_cmdline(self):
+        self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 1)
+
+    def test_ready_prefix_on_cmdline_no_children(self):
+        self.process.children.return_value = []
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+    def test_ready_prefix_on_cmdline_zombie(self):
+        self.child.cmdline.return_value = []
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+    def test_ready_prefix_on_cmdline_dead_process(self):
+        self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)

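The core of the new monitor is the plugin-state fingerprint: hash every file
under the plugins directory, treat a changed fingerprint as "wait one more
cycle", and reload only once two consecutive scans agree. A self-contained
sketch of that scheme (function and parameter names here are illustrative, not
the Airflow API; the real monitor reloads by sending SIGHUP to the gunicorn
master):

    import hashlib
    import os

    def file_md5(path, chunk_size=4096):
        # Stream the file so large plugins are not loaded fully into memory.
        digest = hashlib.md5()
        with open(path, "rb") as handle:
            for chunk in iter(lambda: handle.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def plugin_state(plugins_folder):
        # Map every file path under plugins_folder to its content hash.
        state = {}
        for root, _, filenames in os.walk(plugins_folder):
            for name in filenames:
                path = os.path.join(root, name)
                state[path] = file_md5(path)
        return state

    def check_once(plugins_folder, last_state, restart_pending, reload_fn):
        # One monitoring cycle. A detected change only arms the reload; the
        # reload fires on the next cycle in which nothing changed, so that
        # half-written files have time to settle.
        new_state = plugin_state(plugins_folder)
        if new_state != last_state:
            return new_state, True
        if restart_pending:
            reload_fn()  # e.g. gunicorn_master_proc.send_signal(signal.SIGHUP)
            return new_state, False
        return new_state, False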

[airflow] 05/05: Add docs about reload_on_plugin_change option (#9575)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 13e53491f6ed034563a5e7e1174194842f00d2a8
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Mon Jun 29 23:24:24 2020 +0200

    Add docs about reload_on_plugin_change option (#9575)
    
    
    (cherry picked from commit 656c48da9c1d2ea13b928bd6b968ccd86bc7a95d)
---
 docs/plugins.rst | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/docs/plugins.rst b/docs/plugins.rst
index bfe08aa..88559e0 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -318,3 +318,12 @@ This will create a hook, and an operator accessible at:
 
 - ``airflow.hooks.my_namespace.MyHook``
 - ``airflow.operators.my_namespace.MyOperator``
+
+Automatic webserver reloading
+-----------------------------
+
+To enable automatic reloading of the webserver when changes in the plugins directory are detected,
+set the ``reload_on_plugin_change`` option in the ``[webserver]`` section to ``True``.
+
+.. note::
+    For more information on setting the configuration, see :doc:`/howto/set-config`
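
Concretely, that corresponds to the following entry in airflow.cfg (or,
equivalently, the AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE environment
variable, following Airflow's AIRFLOW__{SECTION}__{KEY} convention):

    [webserver]
    reload_on_plugin_change = True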


[airflow] 01/05: Add copy button to Code Blocks in Airflow Docs (#9450)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0ee254c0a6202eee198bbf804139e9232737bf07
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Jun 20 21:03:49 2020 +0100

    Add copy button to Code Blocks in Airflow Docs (#9450)
    
    (cherry picked from commit 5b820a76e448aa4ec8fcc14015d23bf6bdb3ee9e)
---
 docs/conf.py                            |  1 +
 requirements/requirements-python2.7.txt | 44 ++++++++++++------------
 requirements/requirements-python3.5.txt | 54 +++++++++++++++---------------
 requirements/requirements-python3.6.txt | 59 +++++++++++++++++----------------
 requirements/requirements-python3.7.txt | 59 +++++++++++++++++----------------
 requirements/setup-2.7.md5              |  2 +-
 requirements/setup-3.5.md5              |  2 +-
 requirements/setup-3.6.md5              |  2 +-
 requirements/setup-3.7.md5              |  2 +-
 setup.py                                |  4 ++-
 10 files changed, 117 insertions(+), 112 deletions(-)

diff --git a/docs/conf.py b/docs/conf.py
index 8eeb3b5..6df66f8 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -132,6 +132,7 @@ extensions = [
     'exampleinclude',
     'docroles',
     'removemarktransform',
+    'sphinx_copybutton',
 ]
 
 autodoc_default_options = {
diff --git a/requirements/requirements-python2.7.txt b/requirements/requirements-python2.7.txt
index 5d8c1df..314c660 100644
--- a/requirements/requirements-python2.7.txt
+++ b/requirements/requirements-python2.7.txt
@@ -24,7 +24,7 @@ PySmbClient==0.1.5
 PyYAML==5.3.1
 Pygments==2.5.2
 SQLAlchemy-JSONField==0.8.0
-SQLAlchemy==1.3.17
+SQLAlchemy==1.3.18
 Sphinx==1.8.5
 Unidecode==1.1.1
 WTForms==2.3.1
@@ -69,18 +69,18 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 bleach==3.1.5
 blinker==1.4
-boto3==1.14.8
+boto3==1.14.13
 boto==2.49.0
-botocore==1.17.8
+botocore==1.17.13
 cached-property==1.5.1
 cachetools==3.1.1
 cassandra-driver==3.20.2
 cattrs==1.0.0
-celery==4.4.5
+celery==4.4.6
 certifi==2020.6.20
 cffi==1.14.0
 cfgv==2.0.1
-cfn-lint==0.33.1
+cfn-lint==0.33.2
 cgroupspy==0.1.6
 chardet==3.0.4
 click==6.7
@@ -95,11 +95,11 @@ coverage==5.1
 croniter==0.3.34
 cryptography==2.9.2
 cx-Oracle==7.3.0
-datadog==0.37.0
+datadog==0.37.1
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
 dnspython==1.16.0
 docker-pycreds==0.4.0
 docker==3.7.3
@@ -138,7 +138,7 @@ google-cloud-core==1.3.0
 google-cloud-dlp==1.0.0
 google-cloud-language==1.3.0
 google-cloud-secret-manager==1.0.0
-google-cloud-spanner==1.17.0
+google-cloud-spanner==1.17.1
 google-cloud-speech==1.3.2
 google-cloud-storage==1.29.0
 google-cloud-texttospeech==1.0.1
@@ -150,7 +150,7 @@ googleapis-common-protos==1.52.0
 graphviz==0.14
 grpc-google-iam-v1==0.12.3
 grpcio-gcp==0.2.2
-grpcio==1.29.0
+grpcio==1.30.0
 gunicorn==19.10.0
 hdfs==2.5.8
 hmsclient==0.1.1
@@ -158,14 +158,14 @@ httplib2==0.18.1
 humanize==0.5.1
 hvac==0.10.4
 identify==1.4.20
-idna==2.9
+idna==2.10
 ijson==2.6.1
 imagesize==1.2.0
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
 importlib-resources==2.0.1
 inflection==0.3.1
 ipaddress==1.0.23
-ipdb==0.13.2
+ipdb==0.13.3
 ipython-genutils==0.2.0
 ipython==5.10.0
 iso8601==0.1.12
@@ -180,7 +180,7 @@ jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
 junit-xml==1.9
-jupyter-client==5.3.4
+jupyter-client==5.3.5
 jupyter-core==4.6.3
 kombu==4.6.3
 kubernetes==11.0.0
@@ -198,8 +198,8 @@ mongomock==3.19.0
 monotonic==1.5
 more-itertools==5.0.0
 moto==1.3.14
-msrest==0.6.16
-msrestazure==0.6.3
+msrest==0.6.17
+msrestazure==0.6.4
 multi-key-dict==2.0.3
 mysqlclient==1.3.14
 natsort==6.2.1
@@ -235,13 +235,13 @@ protobuf==3.12.2
 psutil==5.7.0
 psycopg2-binary==2.8.5
 ptyprocess==0.6.0
-py==1.8.2
+py==1.9.0
 pyOpenSSL==19.1.0
 pyasn1-modules==0.2.8
 pyasn1==0.4.8
 pycodestyle==2.6.0
 pycparser==2.20
-pycryptodomex==3.9.7
+pycryptodomex==3.9.8
 pydata-google-auth==1.1.0
 pydruid==0.5.8
 pyflakes==2.2.0
@@ -252,7 +252,7 @@ pyparsing==2.4.7
 pyrsistent==0.16.0
 pysftp==0.2.9
 pytest-cov==2.10.0
-pytest-forked==1.1.3
+pytest-forked==1.2.0
 pytest-instafail==0.4.2
 pytest-rerunfailures==9.0
 pytest-timeout==1.4.1
@@ -266,7 +266,7 @@ python-jenkins==1.7.0
 python-jose==3.1.0
 python-nvd3==0.15.0
 python-openid==2.2.5
-python-slugify==4.0.0
+python-slugify==4.0.1
 pytz==2020.1
 pytzdata==2019.3
 pywinrm==0.4.1
@@ -319,7 +319,7 @@ thrift==0.13.0
 tokenize-rt==3.2.0
 toml==0.10.1
 tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
 traceback2==1.4.0
 traitlets==4.3.3
 typing-extensions==3.7.4.2
@@ -331,8 +331,8 @@ uritemplate==3.0.1
 urllib3==1.25.9
 vertica-python==0.10.4
 vine==1.3.0
-virtualenv==20.0.24
-wcwidth==0.2.4
+virtualenv==20.0.25
+wcwidth==0.2.5
 webencodings==0.5.1
 websocket-client==0.57.0
 wrapt==1.12.1
diff --git a/requirements/requirements-python3.5.txt b/requirements/requirements-python3.5.txt
index 4f12b75..9cc708b 100644
--- a/requirements/requirements-python3.5.txt
+++ b/requirements/requirements-python3.5.txt
@@ -24,7 +24,7 @@ PySmbClient==0.1.5
 PyYAML==5.3.1
 Pygments==2.6.1
 SQLAlchemy-JSONField==0.9.0
-SQLAlchemy==1.3.17
+SQLAlchemy==1.3.18
 Sphinx==3.1.1
 Unidecode==1.1.1
 WTForms==2.3.1
@@ -60,18 +60,18 @@ bcrypt==3.1.7
 beautifulsoup4==4.7.1
 billiard==3.6.3.0
 blinker==1.4
-boto3==1.14.8
+boto3==1.14.13
 boto==2.49.0
-botocore==1.17.8
+botocore==1.17.13
 cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
 cassandra-driver==3.20.2
 cattrs==1.0.0
-celery==4.4.5
+celery==4.4.6
 certifi==2020.6.20
 cffi==1.14.0
 cfgv==2.0.1
-cfn-lint==0.33.1
+cfn-lint==0.33.2
 cgroupspy==0.1.6
 chardet==3.0.4
 click==6.7
@@ -82,12 +82,12 @@ configparser==3.5.3
 coverage==5.1
 croniter==0.3.34
 cryptography==2.9.2
-cx-Oracle==7.3.0
-datadog==0.37.0
+cx-Oracle==8.0.0
+datadog==0.37.1
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
 dnspython==1.16.0
 docker-pycreds==0.4.0
 docker==3.7.3
@@ -124,7 +124,7 @@ google-cloud-core==1.3.0
 google-cloud-dlp==1.0.0
 google-cloud-language==1.3.0
 google-cloud-secret-manager==1.0.0
-google-cloud-spanner==1.17.0
+google-cloud-spanner==1.17.1
 google-cloud-speech==1.3.2
 google-cloud-storage==1.29.0
 google-cloud-texttospeech==1.0.1
@@ -136,7 +136,7 @@ googleapis-common-protos==1.52.0
 graphviz==0.14
 grpc-google-iam-v1==0.12.3
 grpcio-gcp==0.2.2
-grpcio==1.29.0
+grpcio==1.30.0
 gunicorn==19.10.0
 hdfs==2.5.8
 hmsclient==0.1.1
@@ -144,13 +144,13 @@ httplib2==0.18.1
 humanize==0.5.1
 hvac==0.10.4
 identify==1.4.20
-idna==2.9
+idna==2.10
 ijson==2.6.1
 imagesize==1.2.0
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
 importlib-resources==2.0.1
 inflection==0.5.0
-ipdb==0.13.2
+ipdb==0.13.3
 ipython-genutils==0.2.0
 ipython==7.9.0
 iso8601==0.1.12
@@ -166,9 +166,9 @@ jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
 junit-xml==1.9
-jupyter-client==6.1.3
+jupyter-client==6.1.5
 jupyter-core==4.6.3
-kombu==4.6.10
+kombu==4.6.11
 kubernetes==11.0.0
 lazy-object-proxy==1.5.0
 ldap3==2.7
@@ -181,8 +181,8 @@ mock==3.0.5
 mongomock==3.19.0
 more-itertools==8.4.0
 moto==1.3.14
-msrest==0.6.16
-msrestazure==0.6.3
+msrest==0.6.17
+msrestazure==0.6.4
 multi-key-dict==2.0.3
 mypy-extensions==0.4.3
 mypy==0.720
@@ -220,14 +220,14 @@ protobuf==3.12.2
 psutil==5.7.0
 psycopg2-binary==2.8.5
 ptyprocess==0.6.0
-py==1.8.2
+py==1.9.0
 pyOpenSSL==19.1.0
 pyarrow==0.17.1
 pyasn1-modules==0.2.8
 pyasn1==0.4.8
 pycodestyle==2.6.0
 pycparser==2.20
-pycryptodomex==3.9.7
+pycryptodomex==3.9.8
 pydata-google-auth==1.1.0
 pydruid==0.5.8
 pyflakes==2.2.0
@@ -238,7 +238,7 @@ pyparsing==2.4.7
 pyrsistent==0.16.0
 pysftp==0.2.9
 pytest-cov==2.10.0
-pytest-forked==1.1.3
+pytest-forked==1.2.0
 pytest-instafail==0.4.2
 pytest-rerunfailures==9.0
 pytest-timeout==1.4.1
@@ -251,8 +251,8 @@ python-http-client==3.2.7
 python-jenkins==1.7.0
 python-jose==3.1.0
 python-nvd3==0.15.0
-python-slugify==4.0.0
-python3-openid==3.1.0
+python-slugify==4.0.1
+python3-openid==3.2.0
 pytz==2020.1
 pytzdata==2019.3
 pywinrm==0.4.1
@@ -277,7 +277,7 @@ setproctitle==1.1.10
 six==1.15.0
 slackclient==1.3.2
 snowballstemmer==2.0.0
-snowflake-connector-python==2.2.7
+snowflake-connector-python==2.2.8
 snowflake-sqlalchemy==1.2.3
 soupsieve==2.0.1
 sphinx-argparse==0.2.5
@@ -305,7 +305,7 @@ thrift==0.13.0
 tokenize-rt==3.2.0
 toml==0.10.1
 tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
 traitlets==4.3.3
 typed-ast==1.4.1
 typing-extensions==3.7.4.2
@@ -316,8 +316,8 @@ uritemplate==3.0.1
 urllib3==1.25.9
 vertica-python==0.10.4
 vine==1.3.0
-virtualenv==20.0.24
-wcwidth==0.2.4
+virtualenv==20.0.25
+wcwidth==0.2.5
 websocket-client==0.57.0
 wrapt==1.12.1
 xmltodict==0.12.0
diff --git a/requirements/requirements-python3.6.txt b/requirements/requirements-python3.6.txt
index 85fb5f4..275cd7c 100644
--- a/requirements/requirements-python3.6.txt
+++ b/requirements/requirements-python3.6.txt
@@ -25,7 +25,7 @@ PyYAML==5.3.1
 Pygments==2.6.1
 SQLAlchemy-JSONField==0.9.0
 SQLAlchemy-Utils==0.36.6
-SQLAlchemy==1.3.17
+SQLAlchemy==1.3.18
 Sphinx==3.1.1
 Unidecode==1.1.1
 WTForms==2.3.1
@@ -62,18 +62,18 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 black==19.10b0
 blinker==1.4
-boto3==1.14.8
+boto3==1.14.13
 boto==2.49.0
-botocore==1.17.8
+botocore==1.17.13
 cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
 cassandra-driver==3.20.2
 cattrs==1.0.0
-celery==4.4.5
+celery==4.4.6
 certifi==2020.6.20
 cffi==1.14.0
 cfgv==3.1.0
-cfn-lint==0.33.1
+cfn-lint==0.33.2
 cgroupspy==0.1.6
 chardet==3.0.4
 click==6.7
@@ -84,12 +84,12 @@ configparser==3.5.3
 coverage==5.1
 croniter==0.3.34
 cryptography==2.9.2
-cx-Oracle==7.3.0
-datadog==0.37.0
+cx-Oracle==8.0.0
+datadog==0.37.1
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
 dnspython==1.16.0
 docker-pycreds==0.4.0
 docker==3.7.3
@@ -126,7 +126,7 @@ google-cloud-core==1.3.0
 google-cloud-dlp==1.0.0
 google-cloud-language==1.3.0
 google-cloud-secret-manager==1.0.0
-google-cloud-spanner==1.17.0
+google-cloud-spanner==1.17.1
 google-cloud-speech==1.3.2
 google-cloud-storage==1.29.0
 google-cloud-texttospeech==1.0.1
@@ -138,7 +138,7 @@ googleapis-common-protos==1.52.0
 graphviz==0.14
 grpc-google-iam-v1==0.12.3
 grpcio-gcp==0.2.2
-grpcio==1.29.0
+grpcio==1.30.0
 gunicorn==19.10.0
 hdfs==2.5.8
 hmsclient==0.1.1
@@ -146,15 +146,15 @@ httplib2==0.18.1
 humanize==0.5.1
 hvac==0.10.4
 identify==1.4.20
-idna==2.9
+idna==2.10
 ijson==2.6.1
 imagesize==1.2.0
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
 importlib-resources==2.0.1
 inflection==0.5.0
-ipdb==0.13.2
+ipdb==0.13.3
 ipython-genutils==0.2.0
-ipython==7.15.0
+ipython==7.16.1
 iso8601==0.1.12
 isodate==0.6.0
 itsdangerous==1.1.0
@@ -168,9 +168,9 @@ jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
 junit-xml==1.9
-jupyter-client==6.1.3
+jupyter-client==6.1.5
 jupyter-core==4.6.3
-kombu==4.6.10
+kombu==4.6.11
 kubernetes==11.0.0
 lazy-object-proxy==1.5.0
 ldap3==2.7
@@ -183,8 +183,8 @@ mock==4.0.2
 mongomock==3.19.0
 more-itertools==8.4.0
 moto==1.3.14
-msrest==0.6.16
-msrestazure==0.6.3
+msrest==0.6.17
+msrestazure==0.6.4
 multi-key-dict==2.0.3
 mypy-extensions==0.4.3
 mypy==0.720
@@ -203,7 +203,7 @@ oscrypto==1.2.0
 packaging==20.4
 pandas-gbq==0.13.2
 pandas==1.0.5
-papermill==2.1.1
+papermill==2.1.2
 parameterized==0.7.4
 paramiko==2.7.1
 parso==0.7.0
@@ -222,14 +222,14 @@ protobuf==3.12.2
 psutil==5.7.0
 psycopg2-binary==2.8.5
 ptyprocess==0.6.0
-py==1.8.2
+py==1.9.0
 pyOpenSSL==19.1.0
 pyarrow==0.17.1
 pyasn1-modules==0.2.8
 pyasn1==0.4.8
 pycodestyle==2.6.0
 pycparser==2.20
-pycryptodomex==3.9.7
+pycryptodomex==3.9.8
 pydata-google-auth==1.1.0
 pydruid==0.5.8
 pyflakes==2.2.0
@@ -240,7 +240,7 @@ pyparsing==2.4.7
 pyrsistent==0.16.0
 pysftp==0.2.9
 pytest-cov==2.10.0
-pytest-forked==1.1.3
+pytest-forked==1.2.0
 pytest-instafail==0.4.2
 pytest-rerunfailures==9.0
 pytest-timeout==1.4.1
@@ -253,8 +253,8 @@ python-http-client==3.2.7
 python-jenkins==1.7.0
 python-jose==3.1.0
 python-nvd3==0.15.0
-python-slugify==4.0.0
-python3-openid==3.1.0
+python-slugify==4.0.1
+python3-openid==3.2.0
 pytz==2020.1
 pytzdata==2019.3
 pywinrm==0.4.1
@@ -280,11 +280,12 @@ setproctitle==1.1.10
 six==1.15.0
 slackclient==1.3.2
 snowballstemmer==2.0.0
-snowflake-connector-python==2.2.7
+snowflake-connector-python==2.2.8
 snowflake-sqlalchemy==1.2.3
 soupsieve==2.0.1
 sphinx-argparse==0.2.5
 sphinx-autoapi==1.0.0
+sphinx-copybutton==0.2.12
 sphinx-jinja==1.1.1
 sphinx-rtd-theme==0.5.0
 sphinxcontrib-applehelp==1.0.2
@@ -307,7 +308,7 @@ thrift-sasl==0.4.2
 thrift==0.13.0
 toml==0.10.1
 tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
 traitlets==4.3.3
 typed-ast==1.4.1
 typing-extensions==3.7.4.2
@@ -318,8 +319,8 @@ uritemplate==3.0.1
 urllib3==1.25.9
 vertica-python==0.10.4
 vine==1.3.0
-virtualenv==20.0.24
-wcwidth==0.2.4
+virtualenv==20.0.25
+wcwidth==0.2.5
 websocket-client==0.57.0
 wrapt==1.12.1
 xmltodict==0.12.0
diff --git a/requirements/requirements-python3.7.txt b/requirements/requirements-python3.7.txt
index 1483b27..91eb574 100644
--- a/requirements/requirements-python3.7.txt
+++ b/requirements/requirements-python3.7.txt
@@ -25,7 +25,7 @@ PyYAML==5.3.1
 Pygments==2.6.1
 SQLAlchemy-JSONField==0.9.0
 SQLAlchemy-Utils==0.36.6
-SQLAlchemy==1.3.17
+SQLAlchemy==1.3.18
 Sphinx==3.1.1
 Unidecode==1.1.1
 WTForms==2.3.1
@@ -62,18 +62,18 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 black==19.10b0
 blinker==1.4
-boto3==1.14.8
+boto3==1.14.13
 boto==2.49.0
-botocore==1.17.8
+botocore==1.17.13
 cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
 cassandra-driver==3.20.2
 cattrs==1.0.0
-celery==4.4.5
+celery==4.4.6
 certifi==2020.6.20
 cffi==1.14.0
 cfgv==3.1.0
-cfn-lint==0.33.1
+cfn-lint==0.33.2
 cgroupspy==0.1.6
 chardet==3.0.4
 click==6.7
@@ -84,12 +84,12 @@ configparser==3.5.3
 coverage==5.1
 croniter==0.3.34
 cryptography==2.9.2
-cx-Oracle==7.3.0
-datadog==0.37.0
+cx-Oracle==8.0.0
+datadog==0.37.1
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
 dnspython==1.16.0
 docker-pycreds==0.4.0
 docker==3.7.3
@@ -126,7 +126,7 @@ google-cloud-core==1.3.0
 google-cloud-dlp==1.0.0
 google-cloud-language==1.3.0
 google-cloud-secret-manager==1.0.0
-google-cloud-spanner==1.17.0
+google-cloud-spanner==1.17.1
 google-cloud-speech==1.3.2
 google-cloud-storage==1.29.0
 google-cloud-texttospeech==1.0.1
@@ -138,7 +138,7 @@ googleapis-common-protos==1.52.0
 graphviz==0.14
 grpc-google-iam-v1==0.12.3
 grpcio-gcp==0.2.2
-grpcio==1.29.0
+grpcio==1.30.0
 gunicorn==19.10.0
 hdfs==2.5.8
 hmsclient==0.1.1
@@ -146,14 +146,14 @@ httplib2==0.18.1
 humanize==0.5.1
 hvac==0.10.4
 identify==1.4.20
-idna==2.9
+idna==2.10
 ijson==2.6.1
 imagesize==1.2.0
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
 inflection==0.5.0
-ipdb==0.13.2
+ipdb==0.13.3
 ipython-genutils==0.2.0
-ipython==7.15.0
+ipython==7.16.1
 iso8601==0.1.12
 isodate==0.6.0
 itsdangerous==1.1.0
@@ -167,9 +167,9 @@ jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
 junit-xml==1.9
-jupyter-client==6.1.3
+jupyter-client==6.1.5
 jupyter-core==4.6.3
-kombu==4.6.10
+kombu==4.6.11
 kubernetes==11.0.0
 lazy-object-proxy==1.5.0
 ldap3==2.7
@@ -182,8 +182,8 @@ mock==4.0.2
 mongomock==3.19.0
 more-itertools==8.4.0
 moto==1.3.14
-msrest==0.6.16
-msrestazure==0.6.3
+msrest==0.6.17
+msrestazure==0.6.4
 multi-key-dict==2.0.3
 mypy-extensions==0.4.3
 mypy==0.720
@@ -202,7 +202,7 @@ oscrypto==1.2.0
 packaging==20.4
 pandas-gbq==0.13.2
 pandas==1.0.5
-papermill==2.1.1
+papermill==2.1.2
 parameterized==0.7.4
 paramiko==2.7.1
 parso==0.7.0
@@ -221,14 +221,14 @@ protobuf==3.12.2
 psutil==5.7.0
 psycopg2-binary==2.8.5
 ptyprocess==0.6.0
-py==1.8.2
+py==1.9.0
 pyOpenSSL==19.1.0
 pyarrow==0.17.1
 pyasn1-modules==0.2.8
 pyasn1==0.4.8
 pycodestyle==2.6.0
 pycparser==2.20
-pycryptodomex==3.9.7
+pycryptodomex==3.9.8
 pydata-google-auth==1.1.0
 pydruid==0.5.8
 pyflakes==2.2.0
@@ -239,7 +239,7 @@ pyparsing==2.4.7
 pyrsistent==0.16.0
 pysftp==0.2.9
 pytest-cov==2.10.0
-pytest-forked==1.1.3
+pytest-forked==1.2.0
 pytest-instafail==0.4.2
 pytest-rerunfailures==9.0
 pytest-timeout==1.4.1
@@ -252,8 +252,8 @@ python-http-client==3.2.7
 python-jenkins==1.7.0
 python-jose==3.1.0
 python-nvd3==0.15.0
-python-slugify==4.0.0
-python3-openid==3.1.0
+python-slugify==4.0.1
+python3-openid==3.2.0
 pytz==2020.1
 pytzdata==2019.3
 pywinrm==0.4.1
@@ -279,11 +279,12 @@ setproctitle==1.1.10
 six==1.15.0
 slackclient==1.3.2
 snowballstemmer==2.0.0
-snowflake-connector-python==2.2.7
+snowflake-connector-python==2.2.8
 snowflake-sqlalchemy==1.2.3
 soupsieve==2.0.1
 sphinx-argparse==0.2.5
 sphinx-autoapi==1.0.0
+sphinx-copybutton==0.2.12
 sphinx-jinja==1.1.1
 sphinx-rtd-theme==0.5.0
 sphinxcontrib-applehelp==1.0.2
@@ -306,7 +307,7 @@ thrift-sasl==0.4.2
 thrift==0.13.0
 toml==0.10.1
 tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
 traitlets==4.3.3
 typed-ast==1.4.1
 typing-extensions==3.7.4.2
@@ -316,8 +317,8 @@ uritemplate==3.0.1
 urllib3==1.25.9
 vertica-python==0.10.4
 vine==1.3.0
-virtualenv==20.0.24
-wcwidth==0.2.4
+virtualenv==20.0.25
+wcwidth==0.2.5
 websocket-client==0.57.0
 wrapt==1.12.1
 xmltodict==0.12.0
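
    # Note on the requirement files above: each requirements-pythonX.Y.txt is a
    # fully pinned snapshot (every transitive dependency at an exact == version),
    # one file per supported Python minor version, which is why the same upgrade
    # (e.g. grpcio 1.29.0 -> 1.30.0) repeats in each file. The generation script
    # is not part of this diff; the sketch below only illustrates, under that
    # assumption, a pip-freeze-style capture of a resolved environment.
    import subprocess
    import sys

    def write_pins(path):
        # Record the exact versions installed in the current interpreter,
        # mirroring the requirements-pythonX.Y.txt format shown above.
        frozen = subprocess.run(
            [sys.executable, "-m", "pip", "freeze"],
            check=True, capture_output=True, text=True,
        ).stdout
        with open(path, "w") as f:
            f.write(frozen)

    write_pins("requirements/requirements-python%d.%d.txt" % sys.version_info[:2])
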
diff --git a/requirements/setup-2.7.md5 b/requirements/setup-2.7.md5
index 0cea857..02fe09a 100644
--- a/requirements/setup-2.7.md5
+++ b/requirements/setup-2.7.md5
@@ -1 +1 @@
-77db06fc6e178c2ddc7e84f3c63d4c63  /opt/airflow/setup.py
+f86f24b093d55ecc490bb77e74b332eb  /opt/airflow/setup.py
diff --git a/requirements/setup-3.5.md5 b/requirements/setup-3.5.md5
index 0cea857..02fe09a 100644
--- a/requirements/setup-3.5.md5
+++ b/requirements/setup-3.5.md5
@@ -1 +1 @@
-77db06fc6e178c2ddc7e84f3c63d4c63  /opt/airflow/setup.py
+f86f24b093d55ecc490bb77e74b332eb  /opt/airflow/setup.py
diff --git a/requirements/setup-3.6.md5 b/requirements/setup-3.6.md5
index 0cea857..02fe09a 100644
--- a/requirements/setup-3.6.md5
+++ b/requirements/setup-3.6.md5
@@ -1 +1 @@
-77db06fc6e178c2ddc7e84f3c63d4c63  /opt/airflow/setup.py
+f86f24b093d55ecc490bb77e74b332eb  /opt/airflow/setup.py
diff --git a/requirements/setup-3.7.md5 b/requirements/setup-3.7.md5
index 0cea857..02fe09a 100644
--- a/requirements/setup-3.7.md5
+++ b/requirements/setup-3.7.md5
@@ -1 +1 @@
-77db06fc6e178c2ddc7e84f3c63d4c63  /opt/airflow/setup.py
+f86f24b093d55ecc490bb77e74b332eb  /opt/airflow/setup.py
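
    # Note on the setup-*.md5 files above: each holds a single md5sum of
    # /opt/airflow/setup.py. They all change here because setup.py itself changes
    # below, and they presumably let the build tooling detect that the pinned
    # requirement files have gone stale. A minimal sketch (path assumed) of that
    # staleness check:
    import hashlib

    def setup_py_md5(path="/opt/airflow/setup.py"):
        # md5 hex digest of setup.py, as stored in requirements/setup-*.md5
        with open(path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()

    def is_stale(md5_file, path="/opt/airflow/setup.py"):
        with open(md5_file) as f:
            recorded = f.read().split()[0]
        return recorded != setup_py_md5(path)

    # The setup.py hunks below gate dependencies with PEP 508 environment
    # markers: sphinx-copybutton is installed only on Python 3.6+, and the
    # cx_Oracle pin is split so Python 2 stays below 8.0 (cx_Oracle 8.0 dropped
    # Python 2 support) while Python 3 may resolve to newer releases. A hedged
    # sketch of the same marker technique in a setuptools extras_require block
    # (project and package names here are illustrative, not from this diff):
    from setuptools import setup

    setup(
        name="example-project",  # hypothetical project
        version="0.1.0",
        extras_require={
            "oracle": [
                # Python 2 installs are capped at the last 2.x-compatible line;
                # Python 3 installs may take newer versions.
                'somedriver>=5.1.2, <8.0;python_version<"3.0"',
                'somedriver>=5.1.2;python_version>="3.0"',
            ],
            "doc": [
                # Extension only supports Python 3.6+, so gate it with a marker.
                'some-doc-extension;python_version>="3.6"',
            ],
        },
    )
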
diff --git a/setup.py b/setup.py
index 443025c..3c6defc 100644
--- a/setup.py
+++ b/setup.py
@@ -228,6 +228,7 @@ doc = [
     'sphinx==1.8.5;python_version<"3.0"',
     'sphinx-argparse>=0.1.13',
     'sphinx-autoapi==1.0.0',
+    'sphinx-copybutton;python_version>="3.6"',
     'sphinx-jinja~=1.1',
     'sphinx-rtd-theme>=0.1.6',
     'sphinxcontrib-httpdomain>=1.7.0',
@@ -314,7 +315,8 @@ mysql = [
     'mysqlclient>=1.3.6,<1.4',
 ]
 oracle = [
-    'cx_Oracle>=5.1.2',
+    'cx_Oracle>=5.1.2, <8.0;python_version<"3.0"',
+    'cx_Oracle>=5.1.2;python_version>="3.0"',
 ]
 pagerduty = [
     'pypd>=1.1.0',