Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/07 11:27:42 UTC

[airflow] branch master updated: Add airflow connections export command (#9856) (#10081)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new d2540e6  Add airflow connections export command (#9856) (#10081)
d2540e6 is described below

commit d2540e65923d1a95583687631fee60efa2d0a988
Author: Shekhar Singh <sh...@gmail.com>
AuthorDate: Fri Aug 7 16:57:11 2020 +0530

    Add airflow connections export command (#9856) (#10081)
---
 airflow/cli/cli_parser.py                     |  25 +++
 airflow/cli/commands/connection_command.py    |  67 ++++++
 docs/howto/connection/index.rst               |  87 ++++++++
 tests/cli/commands/test_connection_command.py | 297 ++++++++++++++++++++++++++
 4 files changed, 476 insertions(+)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 5c043cc..4ddc27b 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -629,6 +629,15 @@ ARG_INCLUDE_SECRETS = Arg(
     ),
     action="store_true",
     default=False)
+ARG_CONN_EXPORT = Arg(
+    ('file',),
+    help='Output file path for exporting the connections',
+    type=argparse.FileType('w', encoding='UTF-8'))
+ARG_CONN_EXPORT_FORMAT = Arg(
+    ('--format',),
+    help='Format of the connections data in the file',
+    type=str,
+    choices=['json', 'yaml', 'env'])
 # users
 ARG_USERNAME = Arg(
     ('-u', '--username'),
@@ -1107,6 +1116,22 @@ CONNECTIONS_COMMANDS = (
         func=lazy_load_command('airflow.cli.commands.connection_command.connections_delete'),
         args=(ARG_CONN_ID,),
     ),
+    ActionCommand(
+        name='export',
+        help='Export all connections',
+        description=("All connections can be exported in STDOUT using the following command:\n"
+                     "airflow connections export -\n"
+                     "The file format can be determined by the provided file extension. eg, The following "
+                     "command will export the connections in JSON format:\n"
+                     "airflow connections export /tmp/connections.json\n"
+                     "The --format parameter can be used to mention the connections format. eg, "
+                     "the default format is JSON in STDOUT mode, which can be overridden using: \n"
+                     "airflow connections export - --format yaml\n"
+                     "The --format parameter can also be used for the files, for example:\n"
+                     "airflow connections export /tmp/connections --format json\n"),
+        func=lazy_load_command('airflow.cli.commands.connection_command.connections_export'),
+        args=(ARG_CONN_EXPORT, ARG_CONN_EXPORT_FORMAT,),
+    ),
 )
 USERS_COMMANDS = (
     ActionCommand(
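
A minimal stand-alone sketch (not part of the patch; the parser below is illustrative only) of how the ARG_CONN_EXPORT positional behaves: argparse.FileType('w') treats the special value '-' as standard output, which is what connections_export later detects via the '<stdout>' file name.

    import argparse
    import sys

    # Stand-alone parser mirroring the two arguments defined above
    parser = argparse.ArgumentParser(prog='airflow connections export')
    parser.add_argument('file', type=argparse.FileType('w', encoding='UTF-8'))
    parser.add_argument('--format', choices=['json', 'yaml', 'env'])

    args = parser.parse_args(['-', '--format', 'yaml'])
    print(args.file is sys.stdout)   # True: '-' maps to standard output
    print(args.file.name)            # '<stdout>'
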
diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py
index cc23849..39dec97 100644
--- a/airflow/cli/commands/connection_command.py
+++ b/airflow/cli/commands/connection_command.py
@@ -15,10 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 """Connection sub-commands"""
+import io
+import json
+import os
 import sys
 from typing import List
 from urllib.parse import urlunparse
 
+import yaml
 from sqlalchemy.orm import exc
 from tabulate import tabulate
 
@@ -67,6 +71,69 @@ def connections_list(args):
         print(msg)
 
 
+def _format_connections(conns: List[Connection], fmt: str) -> str:
+    if fmt == '.env':
+        connections_env = ""
+        for conn in conns:
+            connections_env += f"{conn.conn_id}={conn.get_uri()}\n"
+        return connections_env
+
+    connections_dict = {}
+    for conn in conns:
+        connections_dict[conn.conn_id] = {
+            'conn_type': conn.conn_type,
+            'host': conn.host,
+            'login': conn.login,
+            'password': conn.password,
+            'schema': conn.schema,
+            'port': conn.port,
+            'extra': conn.extra,
+        }
+
+    if fmt == '.yaml':
+        return yaml.dump(connections_dict)
+
+    # JSON is both the '.json' format and the fallback default
+    return json.dumps(connections_dict)
+
+
+def _is_stdout(fileio: io.TextIOWrapper) -> bool:
+    return fileio.name == '<stdout>'
+
+
+def connections_export(args):
+    """Exports all connections to a file"""
+    allowed_formats = ['.yaml', '.json', '.env']
+    provided_format = None if args.format is None else f".{args.format.lower()}"
+    default_format = provided_format or '.json'
+
+    with create_session() as session:
+        if _is_stdout(args.file):
+            filetype = default_format
+        elif provided_format is not None:
+            filetype = provided_format
+        else:
+            _, filetype = os.path.splitext(args.file.name)
+            filetype = filetype.lower()
+            if filetype not in allowed_formats:
+                msg = f"Unsupported file format. " \
+                      f"The file must have the extension {', '.join(allowed_formats)}"
+                raise SystemExit(msg)
+
+        connections = session.query(Connection).all()
+        msg = _format_connections(connections, filetype)
+        args.file.write(msg)
+
+        if _is_stdout(args.file):
+            print("Connections successfully exported.", file=sys.stderr)
+        else:
+            print(f"Connections successfully exported to {args.file.name}")
+
+
 alternative_conn_specs = ['conn_type', 'conn_host',
                           'conn_login', 'conn_password', 'conn_schema', 'conn_port']
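
A stand-alone sketch (not part of the patch; the helper name is made up) of the format-resolution rule that connections_export implements above: writing to STDOUT defaults to JSON, an explicit --format always wins, and otherwise the file extension decides, with anything outside .yaml/.json/.env rejected.

    import os
    from typing import Optional

    def resolve_filetype(file_name: str, provided_format: Optional[str] = None) -> str:
        """Illustrative mirror of the selection logic in connections_export."""
        allowed_formats = ['.yaml', '.json', '.env']
        provided = f".{provided_format.lower()}" if provided_format else None
        if file_name == '<stdout>':
            return provided or '.json'
        if provided:
            return provided
        _, ext = os.path.splitext(file_name)
        if ext.lower() not in allowed_formats:
            raise SystemExit(
                "Unsupported file format. "
                f"The file must have the extension {', '.join(allowed_formats)}")
        return ext.lower()

    # resolve_filetype('<stdout>')                  -> '.json'
    # resolve_filetype('/tmp/connections.ENV')      -> '.env'
    # resolve_filetype('/tmp/connections', 'yaml')  -> '.yaml'
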
 
diff --git a/docs/howto/connection/index.rst b/docs/howto/connection/index.rst
index 0254d59..c2d9509 100644
--- a/docs/howto/connection/index.rst
+++ b/docs/howto/connection/index.rst
@@ -88,6 +88,93 @@ Alternatively you may specify each parameter individually:
         --conn-schema 'schema' \
         ...
 
+.. _connection/export:
+
+Exporting Connections from the CLI
+----------------------------------
+
+You may export connections from the database using the CLI. The supported formats are ``json``, ``yaml`` and ``env``.
+
+You may specify the target file as a parameter:
+
+.. code-block:: bash
+
+    airflow connections export connections.json
+
+Alternatively you may use the ``--format`` parameter to override the format implied by the file extension:
+
+.. code-block:: bash
+
+    airflow connections export /tmp/connections --format yaml
+
+You may also specify ``-`` to write to STDOUT:
+
+.. code-block:: bash
+
+    airflow connections export -
+
+The JSON format contains an object whose keys are the connection IDs and whose values are the connection definitions, each represented as a JSON object. The following is a sample JSON file.
+
+.. code-block:: json
+
+    {
+        "CONN_A": {
+            "conn_type": "mysql",
+            "host": "mysql",
+            "login": "root",
+            "password": "plainpassword",
+            "schema": "airflow",
+            "port": null,
+            "extra": null,
+            "is_encrypted": false,
+            "is_extra_encrypted": false
+        },
+        "CONN_B": {
+            "conn_type": "druid",
+            "host": "druid-broker",
+            "login": null,
+            "password": null,
+            "schema": null,
+            "port": 8082,
+            "extra": "{\"endpoint\": \"druid/v2/sql\"}",
+            "is_encrypted": false,
+            "is_extra_encrypted": false
+        }
+    }
+
+The YAML file structure is similar to the JSON one: each key is a connection ID and each value is the corresponding connection definition, represented as a YAML mapping. The following is a sample YAML file.
+
+.. code-block:: yaml
+
+    CONN_A:
+      conn_type: mysql
+      extra:
+      host: mysql
+      login: root
+      password: plainpassword
+      port:
+      schema: airflow
+
+    CONN_B:
+      conn_type: druid
+      extra: '{"endpoint": "druid/v2/sql"}'
+      host: druid-broker
+      login:
+      password:
+      port: 8082
+      schema:
+
+You may also export connections in ``.env`` format. The key is the connection ID, and the value is the connection URI. The following is a sample ENV file.
+
+.. code-block:: text
+
+    CONN_A=mysql://root:plainpassword@mysql/airflow
+    CONN_B=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql
+
 .. _environment_variables_secrets_backend:
 
 Storing a Connection in Environment Variables
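
There is no matching "import" subcommand in this commit, but as an assumed, illustrative sketch, an exported JSON file could be read back into Connection objects, since the exported keys line up with Connection's constructor arguments (the file path and round-trip usage below are hypothetical):

    import json

    from airflow.models import Connection

    with open('/tmp/connections.json') as fp:
        exported = json.load(fp)

    # Each value holds conn_type/host/login/password/schema/port/extra,
    # which map directly onto Connection's keyword arguments.
    connections = [
        Connection(conn_id=conn_id, **fields)
        for conn_id, fields in exported.items()
    ]
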
diff --git a/tests/cli/commands/test_connection_command.py b/tests/cli/commands/test_connection_command.py
index 75df6bd..c6dbac8 100644
--- a/tests/cli/commands/test_connection_command.py
+++ b/tests/cli/commands/test_connection_command.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import io
+import json
 import unittest
 from contextlib import redirect_stdout
 from unittest import mock
@@ -111,6 +112,302 @@ class TestCliListConnections(unittest.TestCase):
             connection_command.connections_list(args)
 
 
+class TestCliExportConnections(unittest.TestCase):
+    @provide_session
+    def setUp(self, session=None):
+        clear_db_connections(add_default_connections_back=False)
+        merge_conn(
+            Connection(
+                conn_id="airflow_db",
+                conn_type="mysql",
+                host="mysql",
+                login="root",
+                password="plainpassword",
+                schema="airflow",
+            ),
+            session
+        )
+        merge_conn(
+            Connection(
+                conn_id="druid_broker_default",
+                conn_type="druid",
+                host="druid-broker",
+                port=8082,
+                extra='{"endpoint": "druid/v2/sql"}',
+            ),
+            session
+        )
+
+        self.parser = cli_parser.get_parser()
+
+    def tearDown(self):
+        clear_db_connections()
+
+    def test_cli_connections_export_should_return_error_for_invalid_command(self):
+        with self.assertRaises(SystemExit):
+            self.parser.parse_args([
+                "connections",
+                "export",
+            ])
+
+    def test_cli_connections_export_should_return_error_for_invalid_format(self):
+        with self.assertRaises(SystemExit):
+            self.parser.parse_args([
+                "connections",
+                "export",
+                "--format",
+                "invalid",
+                "/path/to/file"
+            ])
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    def test_cli_connections_export_should_return_error_for_invalid_export_format(self,
+                                                                                  mock_file_open,
+                                                                                  mock_splittext):
+        output_filepath = '/tmp/connections.invalid'
+        mock_splittext.return_value = (None, '.invalid')
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+        ])
+        with self.assertRaisesRegex(
+            SystemExit, r"Unsupported file format. The file must have the extension .yaml, .json, .env"
+        ):
+            connection_command.connections_export(args)
+
+        mock_splittext.assert_called_once()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_not_called()
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    @mock.patch.object(connection_command, 'create_session')
+    def test_cli_connections_export_should_return_error_if_create_session_fails(self, mock_session,
+                                                                                mock_file_open,
+                                                                                mock_splittext):
+        output_filepath = '/tmp/connections.json'
+
+        def my_side_effect():
+            raise Exception("dummy exception")
+        mock_session.side_effect = my_side_effect
+        mock_splittext.return_value = (None, '.json')
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+        ])
+        with self.assertRaisesRegex(Exception, r"dummy exception"):
+            connection_command.connections_export(args)
+
+        mock_splittext.assert_not_called()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_not_called()
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    @mock.patch.object(connection_command, 'create_session')
+    def test_cli_connections_export_should_return_error_if_fetching_connections_fails(self, mock_session,
+                                                                                      mock_file_open,
+                                                                                      mock_splittext):
+        output_filepath = '/tmp/connections.json'
+
+        def my_side_effect():
+            raise Exception("dummy exception")
+        mock_session.return_value.__enter__.return_value.query.return_value.all.side_effect = my_side_effect
+        mock_splittext.return_value = (None, '.json')
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+        ])
+        with self.assertRaisesRegex(Exception, r"dummy exception"):
+            connection_command.connections_export(args)
+
+        mock_splittext.assert_called_once()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_not_called()
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    @mock.patch.object(connection_command, 'create_session')
+    def test_cli_connections_export_should_not_return_error_if_connections_is_empty(self, mock_session,
+                                                                                    mock_file_open,
+                                                                                    mock_splittext):
+        output_filepath = '/tmp/connections.json'
+
+        mock_session.return_value.__enter__.return_value.query.return_value.all.return_value = []
+        mock_splittext.return_value = (None, '.json')
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+        ])
+        connection_command.connections_export(args)
+
+        mock_splittext.assert_called_once()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_called_once_with('{}')
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    def test_cli_connections_export_should_export_as_json(self, mock_file_open, mock_splittext):
+        output_filepath = '/tmp/connections.json'
+        mock_splittext.return_value = (None, '.json')
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+        ])
+        connection_command.connections_export(args)
+
+        expected_connections = json.dumps({
+            "airflow_db": {
+                "conn_type": "mysql",
+                "host": "mysql",
+                "login": "root",
+                "password": "plainpassword",
+                "schema": "airflow",
+                "port": None,
+                "extra": None,
+            },
+            "druid_broker_default": {
+                "conn_type": "druid",
+                "host": "druid-broker",
+                "login": None,
+                "password": None,
+                "schema": None,
+                "port": 8082,
+                "extra": "{\"endpoint\": \"druid/v2/sql\"}",
+            }
+        })
+
+        mock_splittext.assert_called_once()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    def test_cli_connections_export_should_export_as_yaml(self, mock_file_open, mock_splittext):
+        output_filepath = '/tmp/connections.yaml'
+        mock_splittext.return_value = (None, '.yaml')
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+        ])
+        connection_command.connections_export(args)
+
+        expected_connections = ("airflow_db:\n"
+                                "  conn_type: mysql\n"
+                                "  extra: null\n"
+                                "  host: mysql\n"
+                                "  login: root\n"
+                                "  password: plainpassword\n"
+                                "  port: null\n"
+                                "  schema: airflow\n"
+                                "druid_broker_default:\n"
+                                "  conn_type: druid\n"
+                                "  extra: \'{\"endpoint\": \"druid/v2/sql\"}\'\n"
+                                "  host: druid-broker\n"
+                                "  login: null\n"
+                                "  password: null\n"
+                                "  port: 8082\n"
+                                "  schema: null\n")
+        mock_splittext.assert_called_once()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    def test_cli_connections_export_should_export_as_env(self, mock_file_open, mock_splittext):
+        output_filepath = '/tmp/connections.env'
+        mock_splittext.return_value = (None, '.env')
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+        ])
+        connection_command.connections_export(args)
+
+        expected_connections = (
+            "airflow_db=mysql://root:plainpassword@mysql/airflow\n"
+            "druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql\n")
+
+        mock_splittext.assert_called_once()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    def test_cli_connections_export_should_export_as_env_for_uppercase_file_extension(self, mock_file_open,
+                                                                                      mock_splittext):
+        output_filepath = '/tmp/connections.ENV'
+        mock_splittext.return_value = (None, '.ENV')
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+        ])
+        connection_command.connections_export(args)
+
+        expected_connections = (
+            "airflow_db=mysql://root:plainpassword@mysql/airflow\n"
+            "druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql\n")
+
+        mock_splittext.assert_called_once()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+    @mock.patch('os.path.splitext')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    def test_cli_connections_export_should_force_export_as_specified_format(self, mock_file_open,
+                                                                            mock_splittext):
+        output_filepath = '/tmp/connections.yaml'
+
+        args = self.parser.parse_args([
+            "connections",
+            "export",
+            output_filepath,
+            "--format",
+            "json",
+        ])
+        connection_command.connections_export(args)
+
+        expected_connections = json.dumps({
+            "airflow_db": {
+                "conn_type": "mysql",
+                "host": "mysql",
+                "login": "root",
+                "password": "plainpassword",
+                "schema": "airflow",
+                "port": None,
+                "extra": None,
+            },
+            "druid_broker_default": {
+                "conn_type": "druid",
+                "host": "druid-broker",
+                "login": None,
+                "password": None,
+                "schema": None,
+                "port": 8082,
+                "extra": "{\"endpoint\": \"druid/v2/sql\"}",
+            }
+        })
+        mock_splittext.assert_not_called()
+        mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+        mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+
 TEST_URL = "postgresql://airflow:airflow@host:5432/airflow"