You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2021/04/14 05:05:38 UTC
[airflow] branch master updated: Add support for arbitrary json in
conn uri format (#15100)
This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new a4c4e61 Add support for arbitrary json in conn uri format (#15100)
a4c4e61 is described below
commit a4c4e616e7684b708f6704a27ba9f7d2c9cb1022
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Apr 13 22:05:12 2021 -0700
Add support for arbitrary json in conn uri format (#15100)
Currently, in the Airflow web UI and the CLI you can store arbitrary (e.g. nested) JSON in the `extra` field, but the URI format can only handle primitive key-value pairs. This PR adds support for arbitrary JSON in the URI format.
Co-authored-by: Daniel Standish <ds...@users.noreply.github.com>
Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
---
airflow/models/connection.py | 19 +++++++--
docs/apache-airflow/howto/connection.rst | 68 ++++++++++++++++++++++++--------
tests/models/test_connection.py | 61 ++++++++++++++++++++++++++--
tests/secrets/test_local_filesystem.py | 41 ++++++++++++-------
4 files changed, 151 insertions(+), 38 deletions(-)
diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 67a3b4d..78c7629 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -89,6 +89,8 @@ class Connection(Base, LoggingMixin): # pylint: disable=too-many-instance-attri
:type uri: str
"""
+ EXTRA_KEY = '__extra__'
+
__tablename__ = "connection"
id = Column(Integer(), primary_key=True)
@@ -161,7 +163,11 @@ class Connection(Base, LoggingMixin): # pylint: disable=too-many-instance-attri
self.password = unquote(uri_parts.password) if uri_parts.password else uri_parts.password
self.port = uri_parts.port
if uri_parts.query:
- self.extra = json.dumps(dict(parse_qsl(uri_parts.query, keep_blank_values=True)))
+ query = dict(parse_qsl(uri_parts.query, keep_blank_values=True))
+ if self.EXTRA_KEY in query:
+ self.extra = query[self.EXTRA_KEY]
+ else:
+ self.extra = json.dumps(query)
def get_uri(self) -> str:
"""Return connection in URI format"""
@@ -194,8 +200,15 @@ class Connection(Base, LoggingMixin): # pylint: disable=too-many-instance-attri
uri += host_block
- if self.extra_dejson:
- uri += f'?{urlencode(self.extra_dejson)}'
+ if self.extra:
+ try:
+ query = urlencode(self.extra_dejson)
+ except TypeError:
+ query = None
+ if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
+ uri += '?' + query
+ else:
+ uri += '?' + urlencode({self.EXTRA_KEY: self.extra})
return uri
diff --git a/docs/apache-airflow/howto/connection.rst b/docs/apache-airflow/howto/connection.rst
index 2d15324..cbaecf2 100644
--- a/docs/apache-airflow/howto/connection.rst
+++ b/docs/apache-airflow/howto/connection.rst
@@ -212,10 +212,6 @@ In general, Airflow's URI format is like so:
my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2
-.. note::
-
- The params ``param1`` and ``param2`` are just examples; you may supply arbitrary urlencoded json-serializable data there.
-
The above URI would produce a ``Connection`` object equivalent to the following:
.. code-block:: python
@@ -232,17 +228,6 @@ The above URI would produce a ``Connection`` object equivalent to the following:
extra=json.dumps(dict(param1='val1', param2='val2'))
)
-You can verify a URI is parsed correctly like so:
-
-.. code-block:: pycon
-
- >>> from airflow.models.connection import Connection
-
- >>> c = Connection(uri='my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2')
- >>> print(c.login)
- my-login
- >>> print(c.password)
- my-password
.. _generating_connection_uri:
@@ -289,12 +274,63 @@ Additionally, if you have created a connection, you can use ``airflow connection
.. _manage-connections-connection-types:
+Encoding arbitrary JSON
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Some JSON structures cannot be urlencoded without loss. For such JSON, ``get_uri``
+will store the entire string under the url query param ``__extra__``.
+
+For example:
+
+.. code-block:: pycon
+
+ >>> extra_dict = {'my_val': ['list', 'of', 'values'], 'extra': {'nested': {'json': 'val'}}}
+ >>> c = Connection(
+ ...     conn_type='scheme',
+ ...     host='host/location',
+ ...     schema='schema',
+ ...     login='user',
+ ...     password='password',
+ ...     port=1234,
+ ...     extra=json.dumps(extra_dict),
+ ... )
+ >>> uri = c.get_uri()
+ >>> uri
+ 'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'
+
+
+And we can verify that it returns the same dictionary:
+
+.. code-block:: pycon
+
+ >>> new_c = Connection(uri=uri)
+ >>> new_c.extra_dejson == extra_dict
+ True
+
+
+But for the most common case of storing only key-value pairs, plain url encoding is used.
+
+You can verify a URI is parsed correctly like so:
+
+.. code-block:: pycon
+
+ >>> from airflow.models.connection import Connection
+
+ >>> c = Connection(uri='my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2')
+ >>> print(c.login)
+ my-login
+ >>> print(c.password)
+ my-password
+
+
Handling of special characters in connection params
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note::
- This process is automated as described in section :ref:`Generating a Connection URI <generating_connection_uri>`.
+ Use the convenience method ``Connection.get_uri`` when generating a connection
+ as described in section :ref:`Generating a Connection URI <generating_connection_uri>`.
+ This section is for informational purposes only.
Special handling is required for certain characters when building a URI manually.
diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py
index 526d029..21fb7aa 100644
--- a/tests/models/test_connection.py
+++ b/tests/models/test_connection.py
@@ -138,6 +138,61 @@ class TestConnection(unittest.TestCase):
description='with extras',
),
UriTestCaseConfig(
+ test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?' '__extra__=single+value',
+ test_conn_attributes=dict(
+ conn_type='scheme',
+ host='host/location',
+ schema='schema',
+ login='user',
+ password='password',
+ port=1234,
+ extra='single value',
+ ),
+ description='with extras single value',
+ ),
+ UriTestCaseConfig(
+ test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
+ '__extra__=arbitrary+string+%2A%29%2A%24',
+ test_conn_attributes=dict(
+ conn_type='scheme',
+ host='host/location',
+ schema='schema',
+ login='user',
+ password='password',
+ port=1234,
+ extra='arbitrary string *)*$',
+ ),
+ description='with extra non-json',
+ ),
+ UriTestCaseConfig(
+ test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
+ '__extra__=%5B%22list%22%2C+%22of%22%2C+%22values%22%5D',
+ test_conn_attributes=dict(
+ conn_type='scheme',
+ host='host/location',
+ schema='schema',
+ login='user',
+ password='password',
+ port=1234,
+ extra_dejson=['list', 'of', 'values'],
+ ),
+ description='with extras list',
+ ),
+ UriTestCaseConfig(
+ test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
+ '__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D', # noqa: E501 # pylint: disable=C0301
+ test_conn_attributes=dict(
+ conn_type='scheme',
+ host='host/location',
+ schema='schema',
+ login='user',
+ password='password',
+ port=1234,
+ extra_dejson={'my_val': ['list', 'of', 'values'], 'extra': {'nested': {'json': 'val'}}},
+ ),
+ description='with nested json',
+ ),
+ UriTestCaseConfig(
test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?extra1=a%20value&extra2=',
test_conn_attributes=dict(
conn_type='scheme',
@@ -351,11 +406,9 @@ class TestConnection(unittest.TestCase):
for conn_attr, expected_val in test_config.test_conn_attributes.items():
actual_val = getattr(new_conn, conn_attr)
if expected_val is None:
- assert expected_val is None
- if isinstance(expected_val, dict):
- assert expected_val == actual_val
+ assert actual_val is None
else:
- assert expected_val == actual_val
+ assert actual_val == expected_val
@parameterized.expand(
[
diff --git a/tests/secrets/test_local_filesystem.py b/tests/secrets/test_local_filesystem.py
index 93e83ab..1c6c5ae 100644
--- a/tests/secrets/test_local_filesystem.py
+++ b/tests/secrets/test_local_filesystem.py
@@ -204,7 +204,10 @@ class TestLoadConnection(unittest.TestCase):
@parameterized.expand(
(
- ("""CONN_A: 'mysql://host_a'""", {"CONN_A": "mysql://host_a"}),
+ (
+ """CONN_A: 'mysql://host_a'""",
+ {"CONN_A": {'conn_type': 'mysql', 'host': 'host_a'}},
+ ),
(
"""
conn_a: mysql://hosta
@@ -216,28 +219,36 @@ class TestLoadConnection(unittest.TestCase):
password: None
port: 1234
extra_dejson:
- extra__google_cloud_platform__keyfile_dict:
- a: b
+ arbitrary_dict:
+ a: b
+ extra__google_cloud_platform__keyfile_dict: '{"a": "b"}'
extra__google_cloud_platform__keyfile_path: asaa""",
{
- "conn_a": "mysql://hosta",
- "conn_b": ''.join(
- """scheme://Login:None@host:1234/lschema?
- extra__google_cloud_platform__keyfile_dict=%7B%27a%27%3A+%27b%27%7D
- &extra__google_cloud_platform__keyfile_path=asaa""".split()
- ),
+ "conn_a": {'conn_type': 'mysql', 'host': 'hosta'},
+ "conn_b": {
+ 'conn_type': 'scheme',
+ 'host': 'host',
+ 'schema': 'lschema',
+ 'login': 'Login',
+ 'password': 'None',
+ 'port': 1234,
+ 'extra_dejson': {
+ 'arbitrary_dict': {"a": "b"},
+ 'extra__google_cloud_platform__keyfile_dict': '{"a": "b"}',
+ 'extra__google_cloud_platform__keyfile_path': 'asaa',
+ },
+ },
},
),
)
)
- def test_yaml_file_should_load_connection(self, file_content, expected_connection_uris):
+ def test_yaml_file_should_load_connection(self, file_content, expected_attrs_dict):
with mock_local_file(file_content):
connections_by_conn_id = local_filesystem.load_connections_dict("a.yaml")
- connection_uris_by_conn_id = {
- conn_id: connection.get_uri() for conn_id, connection in connections_by_conn_id.items()
- }
-
- assert expected_connection_uris == connection_uris_by_conn_id
+ for conn_id, connection in connections_by_conn_id.items():
+ expected_attrs = expected_attrs_dict[conn_id]
+ actual_attrs = {k: getattr(connection, k) for k in expected_attrs.keys()}
+ assert actual_attrs == expected_attrs
@parameterized.expand(
(