You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/10 00:44:20 UTC

[GitHub] r39132 closed pull request #3726: [AIRFLOW-2763] Add check to validate worker connectivity to metadata …

r39132 closed pull request #3726: [AIRFLOW-2763] Add check to validate worker connectivity to metadata …
URL: https://github.com/apache/incubator-airflow/pull/3726
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub will not
otherwise show its diff once merged, so the diff is supplied below:

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 45b7903d3e..e22427cf40 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -955,6 +955,11 @@ def worker(args):
     env = os.environ.copy()
     env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME
 
+    if not settings.validate_session():
+        log = LoggingMixin().log
+        log.error("Worker exiting... database connection precheck failed! ")
+        sys.exit(1)
+
     # Celery worker
     from airflow.executors.celery_executor import app as celery_app
     from celery.bin import worker
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 4f1f0df383..9d240b8323 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -173,6 +173,9 @@ killed_task_cleanup_time = 60
 # `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
 dag_run_conf_overrides_params = False
 
+# Worker initialisation check to validate Metadata Database connection
+worker_precheck = False
+
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 01696c6906..06937452b0 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -51,6 +51,7 @@ enable_xcom_pickling = False
 killed_task_cleanup_time = 5
 secure_mode = False
 hostname_callable = socket:getfqdn
+worker_precheck = False
 
 [cli]
 api_client = airflow.api.client.local_client
diff --git a/airflow/settings.py b/airflow/settings.py
index 7d660ab5e6..098164cdc7 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -28,7 +28,7 @@
 import pendulum
 import socket
 
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, exc
 from sqlalchemy.orm import scoped_session, sessionmaker
 from sqlalchemy.pool import NullPool
 
@@ -216,6 +216,26 @@ def configure_adapters():
         pass
 
 
+def validate_session():
+    try:
+        worker_precheck = conf.getboolean('core', 'worker_precheck')
+    except conf.AirflowConfigException:
+        worker_precheck = False
+    if not worker_precheck:
+        return True
+    else:
+        check_session = sessionmaker(bind=engine)
+        session = check_session()
+        try:
+            session.execute("select 1")
+            conn_status = True
+        except exc.DBAPIError as err:
+            log.error(err)
+            conn_status = False
+        session.close()
+        return conn_status
+
+
 def configure_action_logging():
     """
     Any additional configuration (register callback) for airflow.utils.action_loggers
diff --git a/tests/cli/test_worker_initialisation.py b/tests/cli/test_worker_initialisation.py
new file mode 100644
index 0000000000..477221693a
--- /dev/null
+++ b/tests/cli/test_worker_initialisation.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import sqlalchemy
+import airflow
+from argparse import Namespace
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+from mock import patch
+
+patch('airflow.utils.cli.action_logging', lambda x: x).start()
+from airflow.bin import cli # noqa
+mock_args = Namespace(queues=1, concurrency=1)
+
+
+class TestWorkerPrecheck(unittest.TestCase):
+
+    def setUp(self):
+        airflow.configuration.load_test_config()
+
+    @mock.patch('airflow.settings.validate_session')
+    def test_error(self, mock_validate_session):
+        """
+        Test to verify the exit mechanism of airflow-worker cli
+        by mocking validate_session method
+        """
+        mock_validate_session.return_value = False
+        with self.assertRaises(SystemExit) as cm:
+            # airflow.bin.cli.worker(mock_args)
+            cli.worker(mock_args)
+        self.assertEqual(cm.exception.code, 1)
+
+    @mock.patch('airflow.configuration.getboolean')
+    def test_worker_precheck_exception(self, mock_getboolean):
+        """
+        Test to check the behaviour of validate_session method
+        when worker_precheck is absent in airflow configuration
+        """
+        mock_getboolean.side_effect = airflow.configuration.AirflowConfigException
+        self.assertEqual(airflow.settings.validate_session(), True)
+
+    @mock.patch('sqlalchemy.orm.session.Session.execute')
+    @mock.patch('airflow.configuration.getboolean')
+    def test_validate_session_dbapi_exception(self, mock_getboolean, mock_session):
+        """
+        Test to validate connection failure scenario on SELECT 1 query
+        """
+        mock_getboolean.return_value = True
+        mock_session.side_effect = sqlalchemy.exc.OperationalError("m1", "m2", "m3", "m4")
+        self.assertEquals(airflow.settings.validate_session(), False)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services