You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/07 11:27:42 UTC
[airflow] branch master updated: Add airflow connections export
command (#9856) (#10081)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new d2540e6 Add airflow connections export command (#9856) (#10081)
d2540e6 is described below
commit d2540e65923d1a95583687631fee60efa2d0a988
Author: Shekhar Singh <sh...@gmail.com>
AuthorDate: Fri Aug 7 16:57:11 2020 +0530
Add airflow connections export command (#9856) (#10081)
---
airflow/cli/cli_parser.py | 25 +++
airflow/cli/commands/connection_command.py | 67 ++++++
docs/howto/connection/index.rst | 87 ++++++++
tests/cli/commands/test_connection_command.py | 297 ++++++++++++++++++++++++++
4 files changed, 476 insertions(+)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 5c043cc..4ddc27b 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -629,6 +629,15 @@ ARG_INCLUDE_SECRETS = Arg(
),
action="store_true",
default=False)
+ARG_CONN_EXPORT = Arg(
+ ('file',),
+ help='Output file path for exporting the connections',
+ type=argparse.FileType('w', encoding='UTF-8'))
+ARG_CONN_EXPORT_FORMAT = Arg(
+ ('--format',),
+ help='Format of the connections data in file',
+ type=str,
+ choices=['json', 'yaml', 'env'])
# users
ARG_USERNAME = Arg(
('-u', '--username'),
@@ -1107,6 +1116,22 @@ CONNECTIONS_COMMANDS = (
func=lazy_load_command('airflow.cli.commands.connection_command.connections_delete'),
args=(ARG_CONN_ID,),
),
+ ActionCommand(
+ name='export',
+ help='Export all connections',
+ description=("All connections can be exported in STDOUT using the following command:\n"
+ "airflow connections export -\n"
+ "The file format can be determined by the provided file extension. eg, The following "
+ "command will export the connections in JSON format:\n"
+ "airflow connections export /tmp/connections.json\n"
+ "The --format parameter can be used to mention the connections format. eg, "
+ "the default format is JSON in STDOUT mode, which can be overridden using: \n"
+ "airflow connections export - --format yaml\n"
+ "The --format parameter can also be used for the files, for example:\n"
+ "airflow connections export /tmp/connections --format json\n"),
+ func=lazy_load_command('airflow.cli.commands.connection_command.connections_export'),
+ args=(ARG_CONN_EXPORT, ARG_CONN_EXPORT_FORMAT,),
+ ),
)
USERS_COMMANDS = (
ActionCommand(
diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py
index cc23849..39dec97 100644
--- a/airflow/cli/commands/connection_command.py
+++ b/airflow/cli/commands/connection_command.py
@@ -15,10 +15,14 @@
# specific language governing permissions and limitations
# under the License.
"""Connection sub-commands"""
+import io
+import json
+import os
import sys
from typing import List
from urllib.parse import urlunparse
+import yaml
from sqlalchemy.orm import exc
from tabulate import tabulate
@@ -67,6 +71,69 @@ def connections_list(args):
print(msg)
+def _format_connections(conns: List[Connection], fmt: str) -> str:
+ if fmt == '.env':
+ connections_env = ""
+ for conn in conns:
+ connections_env += f"{conn.conn_id}={conn.get_uri()}\n"
+ return connections_env
+
+ connections_dict = {}
+ for conn in conns:
+ connections_dict[conn.conn_id] = {
+ 'conn_type': conn.conn_type,
+ 'host': conn.host,
+ 'login': conn.login,
+ 'password': conn.password,
+ 'schema': conn.schema,
+ 'port': conn.port,
+ 'extra': conn.extra,
+ }
+
+ if fmt == '.yaml':
+ return yaml.dump(connections_dict)
+
+ if fmt == '.json':
+ return json.dumps(connections_dict)
+
+ return json.dumps(connections_dict)
+
+
+def _is_stdout(fileio: io.TextIOWrapper) -> bool:
+ if fileio.name == '<stdout>':
+ return True
+ return False
+
+
+def connections_export(args):
+ """Exports all connections to a file"""
+ allowed_formats = ['.yaml', '.json', '.env']
+ provided_format = None if args.format is None else f".{args.format.lower()}"
+ default_format = provided_format or '.json'
+
+ with create_session() as session:
+ if _is_stdout(args.file):
+ filetype = default_format
+ elif provided_format is not None:
+ filetype = provided_format
+ else:
+ _, filetype = os.path.splitext(args.file.name)
+ filetype = filetype.lower()
+ if filetype not in allowed_formats:
+ msg = f"Unsupported file format. " \
+ f"The file must have the extension {', '.join(allowed_formats)}"
+ raise SystemExit(msg)
+
+ connections = session.query(Connection).all()
+ msg = _format_connections(connections, filetype)
+ args.file.write(msg)
+
+ if _is_stdout(args.file):
+ print("Connections successfully exported.", file=sys.stderr)
+ else:
+ print(f"Connections successfully exported to {args.file.name}")
+
+
alternative_conn_specs = ['conn_type', 'conn_host',
'conn_login', 'conn_password', 'conn_schema', 'conn_port']
diff --git a/docs/howto/connection/index.rst b/docs/howto/connection/index.rst
index 0254d59..c2d9509 100644
--- a/docs/howto/connection/index.rst
+++ b/docs/howto/connection/index.rst
@@ -88,6 +88,93 @@ Alternatively you may specify each parameter individually:
--conn-schema 'schema' \
...
+.. _connection/export:
+
+Exporting Connections from the CLI
+----------------------------------
+
+You may export connections from the database using the CLI. The supported formats are ``json``, ``yaml`` and ``env``.
+
+You may specify the target file as the parameter:
+
+.. code-block:: bash
+
+ airflow connections export connections.json
+
+Alternatively, you may specify the ``--format`` parameter to override the format:
+
+.. code-block:: bash
+
+ airflow connections export /tmp/connections --format yaml
+
+You may also specify ``-`` for STDOUT:
+
+.. code-block:: bash
+
+ airflow connections export -
+
+The JSON format contains an object where the key contains the connection ID and the value contains the definition of the connection. In this format, the connection is defined as a JSON object. The following is a sample JSON file.
+
+.. code-block:: json
+
+ {
+ "CONN_A": {
+ "conn_type": "mysql",
+ "host": "mysql",
+ "login": "root",
+ "password": "plainpassword",
+ "schema": "airflow",
+ "port": null,
+ "extra": null,
+ "is_encrypted": false,
+ "is_extra_encrypted": false
+ },
+ "CONN_B": {
+ "conn_type": "druid",
+ "host": "druid-broker",
+ "login": null,
+ "password": null,
+ "schema": null,
+ "port": 8082,
+ "extra": "{\"endpoint\": \"druid/v2/sql\"}",
+ "is_encrypted": false,
+ "is_extra_encrypted": false
+ }
+ }
+
+The YAML file structure is similar to that of the JSON file: each key is a connection ID and its value is the definition of that connection. In this format, the connection is defined as a YAML object. The following is a sample YAML file.
+
+.. code-block:: yaml
+
+ CONN_A:
+ conn_type: mysql
+ extra:
+ host: mysql
+ is_encrypted: false
+ is_extra_encrypted: false
+ login: root
+ password: plainpassword
+ port:
+ schema: airflow
+
+ CONN_B:
+ conn_type: druid
+ extra: '{"endpoint": "druid/v2/sql"}'
+ host: druid-broker
+ is_encrypted: false
+ is_extra_encrypted: false
+ login:
+ password:
+ port: 8082
+ schema:
+
+You may also export connections in ``.env`` format. The key is the connection ID, and the value describes the connection using the URI. The following is a sample ENV file.
+
+.. code-block:: text
+
+ CONN_A=mysql://root:plainpassword@mysql/airflow
+ CONN_B=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql
+
.. _environment_variables_secrets_backend:
Storing a Connection in Environment Variables
diff --git a/tests/cli/commands/test_connection_command.py b/tests/cli/commands/test_connection_command.py
index 75df6bd..c6dbac8 100644
--- a/tests/cli/commands/test_connection_command.py
+++ b/tests/cli/commands/test_connection_command.py
@@ -16,6 +16,7 @@
# under the License.
import io
+import json
import unittest
from contextlib import redirect_stdout
from unittest import mock
@@ -111,6 +112,302 @@ class TestCliListConnections(unittest.TestCase):
connection_command.connections_list(args)
+class TestCliExportConnections(unittest.TestCase):
+ @provide_session
+ def setUp(self, session=None):
+ clear_db_connections(add_default_connections_back=False)
+ merge_conn(
+ Connection(
+ conn_id="airflow_db",
+ conn_type="mysql",
+ host="mysql",
+ login="root",
+ password="plainpassword",
+ schema="airflow",
+ ),
+ session
+ )
+ merge_conn(
+ Connection(
+ conn_id="druid_broker_default",
+ conn_type="druid",
+ host="druid-broker",
+ port=8082,
+ extra='{"endpoint": "druid/v2/sql"}',
+ ),
+ session
+ )
+
+ self.parser = cli_parser.get_parser()
+
+ def tearDown(self):
+ clear_db_connections()
+
+ def test_cli_connections_export_should_return_error_for_invalid_command(self):
+ with self.assertRaises(SystemExit):
+ self.parser.parse_args([
+ "connections",
+ "export",
+ ])
+
+ def test_cli_connections_export_should_return_error_for_invalid_format(self):
+ with self.assertRaises(SystemExit):
+ self.parser.parse_args([
+ "connections",
+ "export",
+ "--format",
+ "invalid",
+ "/path/to/file"
+ ])
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ def test_cli_connections_export_should_return_error_for_invalid_export_format(self,
+ mock_file_open,
+ mock_splittext):
+ output_filepath = '/tmp/connections.invalid'
+ mock_splittext.return_value = (None, '.invalid')
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ ])
+ with self.assertRaisesRegex(
+ SystemExit, r"Unsupported file format. The file must have the extension .yaml, .json, .env"
+ ):
+ connection_command.connections_export(args)
+
+ mock_splittext.assert_called_once()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_not_called()
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ @mock.patch.object(connection_command, 'create_session')
+ def test_cli_connections_export_should_return_error_if_create_session_fails(self, mock_session,
+ mock_file_open,
+ mock_splittext):
+ output_filepath = '/tmp/connections.json'
+
+ def my_side_effect():
+ raise Exception("dummy exception")
+ mock_session.side_effect = my_side_effect
+ mock_splittext.return_value = (None, '.json')
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ ])
+ with self.assertRaisesRegex(Exception, r"dummy exception"):
+ connection_command.connections_export(args)
+
+ mock_splittext.assert_not_called()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_not_called()
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ @mock.patch.object(connection_command, 'create_session')
+ def test_cli_connections_export_should_return_error_if_fetching_connections_fails(self, mock_session,
+ mock_file_open,
+ mock_splittext):
+ output_filepath = '/tmp/connections.json'
+
+ def my_side_effect():
+ raise Exception("dummy exception")
+ mock_session.return_value.__enter__.return_value.query.return_value.all.side_effect = my_side_effect
+ mock_splittext.return_value = (None, '.json')
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ ])
+ with self.assertRaisesRegex(Exception, r"dummy exception"):
+ connection_command.connections_export(args)
+
+ mock_splittext.assert_called_once()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_not_called()
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ @mock.patch.object(connection_command, 'create_session')
+ def test_cli_connections_export_should_not_return_error_if_connections_is_empty(self, mock_session,
+ mock_file_open,
+ mock_splittext):
+ output_filepath = '/tmp/connections.json'
+
+ mock_session.return_value.__enter__.return_value.query.return_value.all.return_value = []
+ mock_splittext.return_value = (None, '.json')
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ ])
+ connection_command.connections_export(args)
+
+ mock_splittext.assert_called_once()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_called_once_with('{}')
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ def test_cli_connections_export_should_export_as_json(self, mock_file_open, mock_splittext):
+ output_filepath = '/tmp/connections.json'
+ mock_splittext.return_value = (None, '.json')
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ ])
+ connection_command.connections_export(args)
+
+ expected_connections = json.dumps({
+ "airflow_db": {
+ "conn_type": "mysql",
+ "host": "mysql",
+ "login": "root",
+ "password": "plainpassword",
+ "schema": "airflow",
+ "port": None,
+ "extra": None,
+ },
+ "druid_broker_default": {
+ "conn_type": "druid",
+ "host": "druid-broker",
+ "login": None,
+ "password": None,
+ "schema": None,
+ "port": 8082,
+ "extra": "{\"endpoint\": \"druid/v2/sql\"}",
+ }
+ })
+
+ mock_splittext.assert_called_once()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ def test_cli_connections_export_should_export_as_yaml(self, mock_file_open, mock_splittext):
+ output_filepath = '/tmp/connections.yaml'
+ mock_splittext.return_value = (None, '.yaml')
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ ])
+ connection_command.connections_export(args)
+
+ expected_connections = ("airflow_db:\n"
+ " conn_type: mysql\n"
+ " extra: null\n"
+ " host: mysql\n"
+ " login: root\n"
+ " password: plainpassword\n"
+ " port: null\n"
+ " schema: airflow\n"
+ "druid_broker_default:\n"
+ " conn_type: druid\n"
+ " extra: \'{\"endpoint\": \"druid/v2/sql\"}\'\n"
+ " host: druid-broker\n"
+ " login: null\n"
+ " password: null\n"
+ " port: 8082\n"
+ " schema: null\n")
+ mock_splittext.assert_called_once()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ def test_cli_connections_export_should_export_as_env(self, mock_file_open, mock_splittext):
+ output_filepath = '/tmp/connections.env'
+ mock_splittext.return_value = (None, '.env')
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ ])
+ connection_command.connections_export(args)
+
+ expected_connections = (
+ "airflow_db=mysql://root:plainpassword@mysql/airflow\n"
+ "druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql\n")
+
+ mock_splittext.assert_called_once()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ def test_cli_connections_export_should_export_as_env_for_uppercase_file_extension(self, mock_file_open,
+ mock_splittext):
+ output_filepath = '/tmp/connections.ENV'
+ mock_splittext.return_value = (None, '.ENV')
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ ])
+ connection_command.connections_export(args)
+
+ expected_connections = (
+ "airflow_db=mysql://root:plainpassword@mysql/airflow\n"
+ "druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql\n")
+
+ mock_splittext.assert_called_once()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+ @mock.patch('os.path.splitext')
+ @mock.patch('builtins.open', new_callable=mock.mock_open())
+ def test_cli_connections_export_should_force_export_as_specified_format(self, mock_file_open,
+ mock_splittext):
+ output_filepath = '/tmp/connections.yaml'
+
+ args = self.parser.parse_args([
+ "connections",
+ "export",
+ output_filepath,
+ "--format",
+ "json",
+ ])
+ connection_command.connections_export(args)
+
+ expected_connections = json.dumps({
+ "airflow_db": {
+ "conn_type": "mysql",
+ "host": "mysql",
+ "login": "root",
+ "password": "plainpassword",
+ "schema": "airflow",
+ "port": None,
+ "extra": None,
+ },
+ "druid_broker_default": {
+ "conn_type": "druid",
+ "host": "druid-broker",
+ "login": None,
+ "password": None,
+ "schema": None,
+ "port": 8082,
+ "extra": "{\"endpoint\": \"druid/v2/sql\"}",
+ }
+ })
+ mock_splittext.assert_not_called()
+ mock_file_open.assert_called_once_with(output_filepath, 'w', -1, 'UTF-8', None)
+ mock_file_open.return_value.write.assert_called_once_with(expected_connections)
+
+
TEST_URL = "postgresql://airflow:airflow@host:5432/airflow"