You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2021/04/14 05:05:38 UTC

[airflow] branch master updated: Add support for arbitrary json in conn uri format (#15100)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new a4c4e61  Add support for arbitrary json in conn uri format (#15100)
a4c4e61 is described below

commit a4c4e616e7684b708f6704a27ba9f7d2c9cb1022
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Apr 13 22:05:12 2021 -0700

    Add support for arbitrary json in conn uri format (#15100)
    
    Currently in airflow web UI and the CLI you can store arbitrary (e.g. nested) json in the `extra` field.  But the URI format can only handle primitive key-value pairs.  This PR provides support for arbitrary json in the URI format.
    
    Co-authored-by: Daniel Standish <ds...@users.noreply.github.com>
    Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
---
 airflow/models/connection.py             | 19 +++++++--
 docs/apache-airflow/howto/connection.rst | 68 ++++++++++++++++++++++++--------
 tests/models/test_connection.py          | 61 ++++++++++++++++++++++++++--
 tests/secrets/test_local_filesystem.py   | 41 ++++++++++++-------
 4 files changed, 151 insertions(+), 38 deletions(-)

diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 67a3b4d..78c7629 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -89,6 +89,8 @@ class Connection(Base, LoggingMixin):  # pylint: disable=too-many-instance-attri
     :type uri: str
     """
 
+    EXTRA_KEY = '__extra__'
+
     __tablename__ = "connection"
 
     id = Column(Integer(), primary_key=True)
@@ -161,7 +163,11 @@ class Connection(Base, LoggingMixin):  # pylint: disable=too-many-instance-attri
         self.password = unquote(uri_parts.password) if uri_parts.password else uri_parts.password
         self.port = uri_parts.port
         if uri_parts.query:
-            self.extra = json.dumps(dict(parse_qsl(uri_parts.query, keep_blank_values=True)))
+            query = dict(parse_qsl(uri_parts.query, keep_blank_values=True))
+            if self.EXTRA_KEY in query:
+                self.extra = query[self.EXTRA_KEY]
+            else:
+                self.extra = json.dumps(query)
 
     def get_uri(self) -> str:
         """Return connection in URI format"""
@@ -194,8 +200,15 @@ class Connection(Base, LoggingMixin):  # pylint: disable=too-many-instance-attri
 
         uri += host_block
 
-        if self.extra_dejson:
-            uri += f'?{urlencode(self.extra_dejson)}'
+        if self.extra:
+            try:
+                query = urlencode(self.extra_dejson)
+            except TypeError:
+                query = None
+            if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
+                uri += '?' + query
+            else:
+                uri += '?' + urlencode({self.EXTRA_KEY: self.extra})
 
         return uri
 
diff --git a/docs/apache-airflow/howto/connection.rst b/docs/apache-airflow/howto/connection.rst
index 2d15324..cbaecf2 100644
--- a/docs/apache-airflow/howto/connection.rst
+++ b/docs/apache-airflow/howto/connection.rst
@@ -212,10 +212,6 @@ In general, Airflow's URI format is like so:
 
     my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2
 
-.. note::
-
-    The params ``param1`` and ``param2`` are just examples; you may supply arbitrary urlencoded json-serializable data there.
-
 The above URI would produce a ``Connection`` object equivalent to the following:
 
 .. code-block:: python
@@ -232,17 +228,6 @@ The above URI would produce a ``Connection`` object equivalent to the following:
         extra=json.dumps(dict(param1='val1', param2='val2'))
     )
 
-You can verify a URI is parsed correctly like so:
-
-.. code-block:: pycon
-
-    >>> from airflow.models.connection import Connection
-
-    >>> c = Connection(uri='my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2')
-    >>> print(c.login)
-    my-login
-    >>> print(c.password)
-    my-password
 
 .. _generating_connection_uri:
 
@@ -289,12 +274,63 @@ Additionally, if you have created a connection, you can use ``airflow connection
 
 .. _manage-connections-connection-types:
 
+Encoding arbitrary JSON
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Some JSON structures cannot be urlencoded without loss.  For such JSON, ``get_uri``
+will store the entire string under the url query param ``__extra__``.
+
+For example:
+
+.. code-block:: pycon
+
+    >>> extra_dict = {'my_val': ['list', 'of', 'values'], 'extra': {'nested': {'json': 'val'}}}
+    >>> c = Connection(
+    ...     conn_type='scheme',
+    ...     host='host/location',
+    ...     schema='schema',
+    ...     login='user',
+    ...     password='password',
+    ...     port=1234,
+    ...     extra=json.dumps(extra_dict),
+    ... )
+    >>> uri = c.get_uri()
+    >>> uri
+    'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'
+
+
+And we can verify that it returns the same dictionary:
+
+.. code-block:: pycon
+
+    >>> new_c = Connection(uri=uri)
+    >>> new_c.extra_dejson == extra_dict
+    True
+
+
+But for the most common case of storing only key-value pairs, plain url encoding is used.
+
+You can verify a URI is parsed correctly like so:
+
+.. code-block:: pycon
+
+    >>> from airflow.models.connection import Connection
+
+    >>> c = Connection(uri='my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2')
+    >>> print(c.login)
+    my-login
+    >>> print(c.password)
+    my-password
+
+
 Handling of special characters in connection params
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. note::
 
-    This process is automated as described in section :ref:`Generating a Connection URI <generating_connection_uri>`.
+    Use the convenience method ``Connection.get_uri`` when generating a connection
+    as described in section :ref:`Generating a Connection URI <generating_connection_uri>`.
+    This section is for informational purposes only.
 
 Special handling is required for certain characters when building a URI manually.
 
diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py
index 526d029..21fb7aa 100644
--- a/tests/models/test_connection.py
+++ b/tests/models/test_connection.py
@@ -138,6 +138,61 @@ class TestConnection(unittest.TestCase):
             description='with extras',
         ),
         UriTestCaseConfig(
+            test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?' '__extra__=single+value',
+            test_conn_attributes=dict(
+                conn_type='scheme',
+                host='host/location',
+                schema='schema',
+                login='user',
+                password='password',
+                port=1234,
+                extra='single value',
+            ),
+            description='with extras single value',
+        ),
+        UriTestCaseConfig(
+            test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
+            '__extra__=arbitrary+string+%2A%29%2A%24',
+            test_conn_attributes=dict(
+                conn_type='scheme',
+                host='host/location',
+                schema='schema',
+                login='user',
+                password='password',
+                port=1234,
+                extra='arbitrary string *)*$',
+            ),
+            description='with extra non-json',
+        ),
+        UriTestCaseConfig(
+            test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
+            '__extra__=%5B%22list%22%2C+%22of%22%2C+%22values%22%5D',
+            test_conn_attributes=dict(
+                conn_type='scheme',
+                host='host/location',
+                schema='schema',
+                login='user',
+                password='password',
+                port=1234,
+                extra_dejson=['list', 'of', 'values'],
+            ),
+            description='with extras list',
+        ),
+        UriTestCaseConfig(
+            test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
+            '__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D',  # noqa: E501 # pylint: disable=C0301
+            test_conn_attributes=dict(
+                conn_type='scheme',
+                host='host/location',
+                schema='schema',
+                login='user',
+                password='password',
+                port=1234,
+                extra_dejson={'my_val': ['list', 'of', 'values'], 'extra': {'nested': {'json': 'val'}}},
+            ),
+            description='with nested json',
+        ),
+        UriTestCaseConfig(
             test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?extra1=a%20value&extra2=',
             test_conn_attributes=dict(
                 conn_type='scheme',
@@ -351,11 +406,9 @@ class TestConnection(unittest.TestCase):
         for conn_attr, expected_val in test_config.test_conn_attributes.items():
             actual_val = getattr(new_conn, conn_attr)
             if expected_val is None:
-                assert expected_val is None
-            if isinstance(expected_val, dict):
-                assert expected_val == actual_val
+                assert actual_val is None
             else:
-                assert expected_val == actual_val
+                assert actual_val == expected_val
 
     @parameterized.expand(
         [
diff --git a/tests/secrets/test_local_filesystem.py b/tests/secrets/test_local_filesystem.py
index 93e83ab..1c6c5ae 100644
--- a/tests/secrets/test_local_filesystem.py
+++ b/tests/secrets/test_local_filesystem.py
@@ -204,7 +204,10 @@ class TestLoadConnection(unittest.TestCase):
 
     @parameterized.expand(
         (
-            ("""CONN_A: 'mysql://host_a'""", {"CONN_A": "mysql://host_a"}),
+            (
+                """CONN_A: 'mysql://host_a'""",
+                {"CONN_A": {'conn_type': 'mysql', 'host': 'host_a'}},
+            ),
             (
                 """
             conn_a: mysql://hosta
@@ -216,28 +219,36 @@ class TestLoadConnection(unittest.TestCase):
                password: None
                port: 1234
                extra_dejson:
-                 extra__google_cloud_platform__keyfile_dict:
-                   a: b
+                 arbitrary_dict:
+                    a: b
+                 extra__google_cloud_platform__keyfile_dict: '{"a": "b"}'
                  extra__google_cloud_platform__keyfile_path: asaa""",
                 {
-                    "conn_a": "mysql://hosta",
-                    "conn_b": ''.join(
-                        """scheme://Login:None@host:1234/lschema?
-                        extra__google_cloud_platform__keyfile_dict=%7B%27a%27%3A+%27b%27%7D
-                        &extra__google_cloud_platform__keyfile_path=asaa""".split()
-                    ),
+                    "conn_a": {'conn_type': 'mysql', 'host': 'hosta'},
+                    "conn_b": {
+                        'conn_type': 'scheme',
+                        'host': 'host',
+                        'schema': 'lschema',
+                        'login': 'Login',
+                        'password': 'None',
+                        'port': 1234,
+                        'extra_dejson': {
+                            'arbitrary_dict': {"a": "b"},
+                            'extra__google_cloud_platform__keyfile_dict': '{"a": "b"}',
+                            'extra__google_cloud_platform__keyfile_path': 'asaa',
+                        },
+                    },
                 },
             ),
         )
     )
-    def test_yaml_file_should_load_connection(self, file_content, expected_connection_uris):
+    def test_yaml_file_should_load_connection(self, file_content, expected_attrs_dict):
         with mock_local_file(file_content):
             connections_by_conn_id = local_filesystem.load_connections_dict("a.yaml")
-            connection_uris_by_conn_id = {
-                conn_id: connection.get_uri() for conn_id, connection in connections_by_conn_id.items()
-            }
-
-            assert expected_connection_uris == connection_uris_by_conn_id
+            for conn_id, connection in connections_by_conn_id.items():
+                expected_attrs = expected_attrs_dict[conn_id]
+                actual_attrs = {k: getattr(connection, k) for k in expected_attrs.keys()}
+                assert actual_attrs == expected_attrs
 
     @parameterized.expand(
         (