You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/10 06:58:31 UTC

[GitHub] feng-tao closed pull request #4309: [AIRFLOW-3504] Extend/refine the functionality of "/health" endpoint

feng-tao closed pull request #4309: [AIRFLOW-3504] Extend/refine the functionality of "/health" endpoint
URL: https://github.com/apache/airflow/pull/4309
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 99c08908fc..34b5c89cec 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -465,6 +465,11 @@ dag_dir_list_interval = 300
 # How often should stats be printed to the logs
 print_stats_interval = 30
 
+# If the latest scheduler heartbeat is older than scheduler_health_check_threshold (in seconds),
+# the scheduler is considered unhealthy.
+# This is used by the health check in the "/health" endpoint
+scheduler_health_check_threshold = 30
+
 child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
 
 # Local task jobs periodically heartbeat to the DB. If the job has
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index f0a467894d..cf7dff8762 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -112,6 +112,7 @@ docker_image_slave = test/docker-airflow
 [scheduler]
 job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
+scheduler_health_check_threshold = 30
 authenticate = true
 max_threads = 2
 catchup_by_default = True
diff --git a/airflow/www/blueprints.py b/airflow/www/blueprints.py
index eb92837fe3..e964081f8a 100644
--- a/airflow/www/blueprints.py
+++ b/airflow/www/blueprints.py
@@ -17,10 +17,16 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+from datetime import timedelta
 from flask import (
-    url_for, Markup, Blueprint, redirect,
+    url_for, Blueprint, redirect,
 )
-import markdown
+from sqlalchemy import func
+
+from airflow import configuration as conf
+from airflow import jobs, settings
+from airflow.utils import timezone
+from airflow.www import utils as wwwutils
 
 routes = Blueprint('routes', __name__)
 
@@ -32,6 +38,35 @@ def index():
 
 @routes.route('/health')
 def health():
-    """ We can add an array of tests here to check the server's health """
-    content = Markup(markdown.markdown("The server is healthy!"))
-    return content
+    """
+    An endpoint helping check the health status of the Airflow instance,
+    including metadatabase and scheduler.
+    """
+    BJ = jobs.BaseJob
+    payload = {}
+    scheduler_health_check_threshold = timedelta(
+        seconds=conf.getint('scheduler', 'scheduler_health_check_threshold'))
+
+    latest_scheduler_heartbeat = None
+    payload['metadatabase'] = {'status': 'healthy'}
+    session = settings.Session()
+    try:
+        latest_scheduler_heartbeat = session.query(func.max(BJ.latest_heartbeat)). \
+            filter(BJ.state == 'running', BJ.job_type == 'SchedulerJob'). \
+            scalar()
+    except Exception:  # report DB failures in the payload instead of raising
+        payload['metadatabase']['status'] = 'unhealthy'
+    finally:
+        session.close()  # the session is opened manually above, so release it here
+
+    if not latest_scheduler_heartbeat:  # no running SchedulerJob, or the query failed
+        scheduler_status = 'unhealthy'
+    elif timezone.utcnow() - latest_scheduler_heartbeat <= scheduler_health_check_threshold:
+        scheduler_status = 'healthy'
+    else:
+        scheduler_status = 'unhealthy'
+
+    payload['scheduler'] = {'status': scheduler_status,
+                            'latest_scheduler_heartbeat': str(latest_scheduler_heartbeat)}
+
+    return wwwutils.json_response(payload)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 29b5931bce..4ed0580a01 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -381,7 +381,6 @@ def index(self):
     @expose('/chart_data')
     @data_profiling_required
     @wwwutils.gzipped
-    # @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key)
     def chart_data(self):
         from airflow import macros
         import pandas as pd
diff --git a/airflow/www_rbac/blueprints.py b/airflow/www_rbac/blueprints.py
index 75bbb93aea..dfa29f19ca 100644
--- a/airflow/www_rbac/blueprints.py
+++ b/airflow/www_rbac/blueprints.py
@@ -17,8 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-from flask import Markup, Blueprint, redirect
-import markdown
+from flask import Blueprint, redirect
 
 routes = Blueprint('routes', __name__)
 
@@ -26,10 +25,3 @@
 @routes.route('/')
 def index():
     return redirect('/home')
-
-
-@routes.route('/health')
-def health():
-    """ We can add an array of tests here to check the server's health """
-    content = Markup(markdown.markdown("The server is healthy!"))
-    return content
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 55e160bc55..093693f461 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -45,7 +45,7 @@
 from past.builtins import unicode
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
-from sqlalchemy import or_, desc, and_, union_all
+from sqlalchemy import func, or_, desc, and_, union_all
 from wtforms import SelectField, validators
 
 import airflow
@@ -151,6 +151,42 @@ def render(self, template, **context):
 
 
 class Airflow(AirflowBaseView):
+    @expose('/health')
+    @provide_session
+    def health(self, session=None):
+        """
+        Report the health of the Airflow instance (metadatabase and scheduler)
+        as a JSON payload; every component is either 'healthy' or 'unhealthy'.
+        """
+        threshold = timedelta(
+            seconds=conf.getint('scheduler', 'scheduler_health_check_threshold'))
+        base_job = jobs.BaseJob
+
+        db_status = 'healthy'
+        last_heartbeat = None
+        try:
+            last_heartbeat = (session.query(func.max(base_job.latest_heartbeat))
+                              .filter(base_job.state == 'running',
+                                      base_job.job_type == 'SchedulerJob')
+                              .scalar())
+        except Exception:
+            # A failing query is reported in the payload instead of raised.
+            db_status = 'unhealthy'
+
+        # The scheduler counts as healthy only when a running SchedulerJob
+        # heartbeated no longer than `threshold` ago.
+        if last_heartbeat and timezone.utcnow() - last_heartbeat <= threshold:
+            scheduler_status = 'healthy'
+        else:
+            scheduler_status = 'unhealthy'
+
+        payload = {
+            'metadatabase': {'status': db_status},
+            'scheduler': {'status': scheduler_status,
+                          'latest_scheduler_heartbeat': str(last_heartbeat)},
+        }
+        return wwwutils.json_response(payload)
+
     @expose('/home')
     @has_access
     @provide_session
diff --git a/docs/howto/check-health.rst b/docs/howto/check-health.rst
new file mode 100644
index 0000000000..e18026f1e2
--- /dev/null
+++ b/docs/howto/check-health.rst
@@ -0,0 +1,47 @@
+..  Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+..    http://www.apache.org/licenses/LICENSE-2.0
+
+..  Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Checking Airflow Health Status
+==============================
+
+To check the health status of your Airflow instance, you can simply access the endpoint
+``"/health"``. It returns a JSON object that gives a high-level overview of the instance's health.
+
+.. code-block:: JSON
+
+  {
+    "metadatabase":{
+      "status":"healthy"
+    },
+    "scheduler":{
+      "status":"healthy",
+      "latest_scheduler_heartbeat":"2018-12-26 17:15:11+00:00"
+    }
+  }
+
+* The ``status`` of each component can be either "healthy" or "unhealthy".
+
+    * The status of ``metadatabase`` depends on whether a valid connection can be initiated
+      with the database backend of Airflow.
+    * The status of ``scheduler`` depends on when the latest scheduler heartbeat happened. If the latest
+      scheduler heartbeat happened more than 30 seconds (default value) before the current time, the scheduler
+      component is considered unhealthy. You can change this threshold by setting
+      ``scheduler_health_check_threshold`` in the ``scheduler`` section of the ``airflow.cfg`` file.
+
+* The response code of the ``"/health"`` endpoint does not reflect the health status of the
+  application (it is always 200). Hence, do not rely on the response code for health-check
+  purposes; inspect the ``status`` fields of the JSON payload instead.
diff --git a/docs/howto/index.rst b/docs/howto/index.rst
index a56091ddf9..42f2e680b7 100644
--- a/docs/howto/index.rst
+++ b/docs/howto/index.rst
@@ -40,4 +40,5 @@ configuring an Airflow environment.
     run-with-systemd
     run-with-upstart
     use-test-config
+    check-health
 
diff --git a/tests/core.py b/tests/core.py
index f39daed8f6..4927c623d3 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1868,8 +1868,69 @@ def test_query(self):
         self.assertIn("TEST", response.data.decode('utf-8'))
 
     def test_health(self):
-        response = self.app.get('/health')
-        self.assertIn('The server is healthy!', response.data.decode('utf-8'))
+        BJ = jobs.BaseJob
+        session = Session()
+
+        # case-1: healthy scheduler status (a running SchedulerJob with a fresh heartbeat)
+        last_scheduler_heartbeat_for_testing_1 = timezone.utcnow()
+        session.add(BJ(job_type='SchedulerJob',
+                       state='running',
+                       latest_heartbeat=last_scheduler_heartbeat_for_testing_1))
+        session.commit()
+
+        response_json = json.loads(self.app.get('/health').data.decode('utf-8'))
+
+        self.assertEqual('healthy', response_json['metadatabase']['status'])
+        self.assertEqual('healthy', response_json['scheduler']['status'])
+        self.assertEqual(str(last_scheduler_heartbeat_for_testing_1),
+                         response_json['scheduler']['latest_scheduler_heartbeat'])
+
+        session.query(BJ).\
+            filter(BJ.job_type == 'SchedulerJob',
+                   BJ.state == 'running',
+                   BJ.latest_heartbeat == last_scheduler_heartbeat_for_testing_1).\
+            delete()
+        session.commit()
+
+        # case-2: unhealthy scheduler status - scenario 1 (latest heartbeat older than the threshold)
+        last_scheduler_heartbeat_for_testing_2 = timezone.utcnow() - timedelta(minutes=1)
+        (session.query(BJ)
+                .filter(BJ.job_type == 'SchedulerJob')
+                .update({'latest_heartbeat': last_scheduler_heartbeat_for_testing_2 - timedelta(seconds=1)}))
+        session.add(BJ(job_type='SchedulerJob',
+                       state='running',
+                       latest_heartbeat=last_scheduler_heartbeat_for_testing_2))
+        session.commit()
+
+        response_json = json.loads(self.app.get('/health').data.decode('utf-8'))
+
+        self.assertEqual('healthy', response_json['metadatabase']['status'])
+        self.assertEqual('unhealthy', response_json['scheduler']['status'])
+        self.assertEqual(str(last_scheduler_heartbeat_for_testing_2),
+                         response_json['scheduler']['latest_scheduler_heartbeat'])
+
+        session.query(BJ).\
+            filter(BJ.job_type == 'SchedulerJob',
+                   BJ.state == 'running',
+                   BJ.latest_heartbeat == last_scheduler_heartbeat_for_testing_2).\
+            delete()
+        session.commit()
+
+        # case-3: unhealthy scheduler status - scenario 2 (no running SchedulerJob at all)
+        session.query(BJ).\
+            filter(BJ.job_type == 'SchedulerJob',
+                   BJ.state == 'running').\
+            delete()
+        session.commit()
+
+        response_json = json.loads(self.app.get('/health').data.decode('utf-8'))
+
+        self.assertEqual('healthy', response_json['metadatabase']['status'])
+        self.assertEqual('unhealthy', response_json['scheduler']['status'])
+        self.assertEqual('None',
+                         response_json['scheduler']['latest_scheduler_heartbeat'])
+
+        session.close()
 
     def test_noaccess(self):
         response = self.app.get('/admin/airflow/noaccess')
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index df9b5c1da2..14f1d431f9 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -29,6 +29,7 @@
 import unittest
 import urllib
 
+from datetime import timedelta
 from flask._compat import PY2
 from urllib.parse import quote_plus
 from werkzeug.test import Client
@@ -36,6 +37,7 @@
 from airflow import configuration as conf
 from airflow import models, settings
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.jobs import BaseJob
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.models.connection import Connection
 from airflow.operators.dummy_operator import DummyOperator
@@ -322,8 +324,66 @@ def test_index(self):
         self.check_content_in_response('DAGs', resp)
 
     def test_health(self):
-        resp = self.client.get('health', follow_redirects=True)
-        self.check_content_in_response('The server is healthy!', resp)
+
+        # case-1: healthy scheduler status (a running SchedulerJob with a fresh heartbeat)
+        last_scheduler_heartbeat_for_testing_1 = timezone.utcnow()
+        self.session.add(BaseJob(job_type='SchedulerJob',
+                                 state='running',
+                                 latest_heartbeat=last_scheduler_heartbeat_for_testing_1))
+        self.session.commit()
+
+        resp_json = json.loads(self.client.get('health', follow_redirects=True).data.decode('utf-8'))
+
+        self.assertEqual('healthy', resp_json['metadatabase']['status'])
+        self.assertEqual('healthy', resp_json['scheduler']['status'])
+        self.assertEqual(str(last_scheduler_heartbeat_for_testing_1),
+                         resp_json['scheduler']['latest_scheduler_heartbeat'])
+
+        self.session.query(BaseJob).\
+            filter(BaseJob.job_type == 'SchedulerJob',
+                   BaseJob.state == 'running',
+                   BaseJob.latest_heartbeat == last_scheduler_heartbeat_for_testing_1).\
+            delete()
+        self.session.commit()
+
+        # case-2: unhealthy scheduler status - scenario 1 (latest heartbeat older than the threshold)
+        last_scheduler_heartbeat_for_testing_2 = timezone.utcnow() - timedelta(minutes=1)
+        (self.session
+             .query(BaseJob)
+             .filter(BaseJob.job_type == 'SchedulerJob')
+             .update({'latest_heartbeat': last_scheduler_heartbeat_for_testing_2 - timedelta(seconds=1)}))
+        self.session.add(BaseJob(job_type='SchedulerJob',
+                                 state='running',
+                                 latest_heartbeat=last_scheduler_heartbeat_for_testing_2))
+        self.session.commit()
+
+        resp_json = json.loads(self.client.get('health', follow_redirects=True).data.decode('utf-8'))
+
+        self.assertEqual('healthy', resp_json['metadatabase']['status'])
+        self.assertEqual('unhealthy', resp_json['scheduler']['status'])
+        self.assertEqual(str(last_scheduler_heartbeat_for_testing_2),
+                         resp_json['scheduler']['latest_scheduler_heartbeat'])
+
+        self.session.query(BaseJob).\
+            filter(BaseJob.job_type == 'SchedulerJob',
+                   BaseJob.state == 'running',
+                   BaseJob.latest_heartbeat == last_scheduler_heartbeat_for_testing_2).\
+            delete()
+        self.session.commit()
+
+        # case-3: unhealthy scheduler status - scenario 2 (no running SchedulerJob at all)
+        self.session.query(BaseJob).\
+            filter(BaseJob.job_type == 'SchedulerJob',
+                   BaseJob.state == 'running').\
+            delete()
+        self.session.commit()
+
+        resp_json = json.loads(self.client.get('health', follow_redirects=True).data.decode('utf-8'))
+
+        self.assertEqual('healthy', resp_json['metadatabase']['status'])
+        self.assertEqual('unhealthy', resp_json['scheduler']['status'])
+        self.assertEqual('None',
+                         resp_json['scheduler']['latest_scheduler_heartbeat'])
 
     def test_home(self):
         resp = self.client.get('home', follow_redirects=True)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services