You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/19 11:22:49 UTC

[airflow] branch main updated: trino: Support CertificateAuthentication in the trino hook (#26246)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b2766d0515 trino: Support CertificateAuthentication in the trino hook (#26246)
b2766d0515 is described below

commit b2766d0515a33222f2c2c2281ef062ffbca36475
Author: c2zwdjnlcg <11...@users.noreply.github.com>
AuthorDate: Mon Sep 19 04:22:39 2022 -0700

    trino: Support CertificateAuthentication in the trino hook (#26246)
    
    Allows authenticating to trino clusters with mtls
---
 airflow/providers/trino/hooks/trino.py              |  9 +++++++--
 docs/apache-airflow-providers-trino/connections.rst |  3 ++-
 tests/providers/trino/hooks/test_trino.py           | 20 +++++++++++++++++++-
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/trino/hooks/trino.py b/airflow/providers/trino/hooks/trino.py
index d5a3530703..da79664aa4 100644
--- a/airflow/providers/trino/hooks/trino.py
+++ b/airflow/providers/trino/hooks/trino.py
@@ -101,12 +101,17 @@ class TrinoHook(DbApiHook):
         extra = db.extra_dejson
         auth = None
         user = db.login
-        if db.password and extra.get('auth') == 'kerberos':
-            raise AirflowException("Kerberos authorization doesn't support password.")
+        if db.password and extra.get('auth') in ('kerberos', 'certs'):
+            raise AirflowException(f"The {extra.get('auth')!r} authorization type doesn't support password.")
         elif db.password:
             auth = trino.auth.BasicAuthentication(db.login, db.password)  # type: ignore[attr-defined]
         elif extra.get('auth') == 'jwt':
             auth = trino.auth.JWTAuthentication(token=extra.get('jwt__token'))
+        elif extra.get('auth') == 'certs':
+            auth = trino.auth.CertificateAuthentication(
+                extra.get('certs__client_cert_path'),
+                extra.get('certs__client_key_path'),
+            )
         elif extra.get('auth') == 'kerberos':
             auth = trino.auth.KerberosAuthentication(  # type: ignore[attr-defined]
                 config=extra.get('kerberos__config', os.environ.get('KRB5_CONFIG')),
diff --git a/docs/apache-airflow-providers-trino/connections.rst b/docs/apache-airflow-providers-trino/connections.rst
index 111205b8a9..9e7bb7301e 100644
--- a/docs/apache-airflow-providers-trino/connections.rst
+++ b/docs/apache-airflow-providers-trino/connections.rst
@@ -43,10 +43,11 @@ Password
 Extra (optional, connection parameters)
     Specify the extra parameters (as json dictionary) that can be used in Trino connection. The following parameters out of the standard python parameters are supported:
 
-    * ``auth`` - Specifies which type of authentication needs to be enabled. The value can be ``kerberos``, ``jwt``.
+    * ``auth`` - Specifies which type of authentication needs to be enabled. The value can be ``certs``, ``kerberos``, or ``jwt``
     * ``impersonate_as_owner`` - Boolean that allows to set ``AIRFLOW_CTX_DAG_OWNER`` as a user of the connection.
 
     The following extra parameters can be used to configure authentication:
 
     * ``jwt__token`` - If jwt authentication should be used, the value of token is given via this parameter.
+    * ``certs__client_cert_path``, ``certs__client_key_path``- If certificate authentication should be used, the path to the client certificate and key is given via these parameters.
     * ``kerberos__service_name``, ``kerberos__config``, ``kerberos__mutual_authentication``, ``kerberos__force_preemptive``, ``kerberos__hostname_override``, ``kerberos__sanitize_mutual_error_response``, ``kerberos__principal``,``kerberos__delegate``, ``kerberos__ca_bundle`` - These parameters can be set when enabling ``kerberos`` authentication.
diff --git a/tests/providers/trino/hooks/test_trino.py b/tests/providers/trino/hooks/test_trino.py
index b077aa52e3..11d95c2bcc 100644
--- a/tests/providers/trino/hooks/test_trino.py
+++ b/tests/providers/trino/hooks/test_trino.py
@@ -36,6 +36,7 @@ BASIC_AUTHENTICATION = 'airflow.providers.trino.hooks.trino.trino.auth.BasicAuth
 KERBEROS_AUTHENTICATION = 'airflow.providers.trino.hooks.trino.trino.auth.KerberosAuthentication'
 TRINO_DBAPI_CONNECT = 'airflow.providers.trino.hooks.trino.trino.dbapi.connect'
 JWT_AUTHENTICATION = 'airflow.providers.trino.hooks.trino.trino.auth.JWTAuthentication'
+CERT_AUTHENTICATION = 'airflow.providers.trino.hooks.trino.trino.auth.CertificateAuthentication'
 
 
 class TestTrinoHookConn:
@@ -92,7 +93,7 @@ class TestTrinoHookConn:
             extra=json.dumps(extras),
         )
         with pytest.raises(
-            AirflowException, match=re.escape("Kerberos authorization doesn't support password.")
+            AirflowException, match=re.escape("The 'kerberos' authorization type doesn't support password.")
         ):
             TrinoHook().get_conn()
 
@@ -111,6 +112,23 @@ class TestTrinoHookConn:
         TrinoHook().get_conn()
         self.assert_connection_called_with(mock_connect, auth=mock_jwt_auth)
 
+    @patch(CERT_AUTHENTICATION)
+    @patch(TRINO_DBAPI_CONNECT)
+    @patch(HOOK_GET_CONNECTION)
+    def test_get_conn_cert_auth(self, mock_get_connection, mock_connect, mock_cert_auth):
+        extras = {
+            'auth': 'certs',
+            'certs__client_cert_path': '/path/to/client.pem',
+            'certs__client_key_path': '/path/to/client.key',
+        }
+        self.set_get_connection_return_value(
+            mock_get_connection,
+            extra=json.dumps(extras),
+        )
+        TrinoHook().get_conn()
+        self.assert_connection_called_with(mock_connect, auth=mock_cert_auth)
+        mock_cert_auth.assert_called_once_with('/path/to/client.pem', '/path/to/client.key')
+
     @patch(KERBEROS_AUTHENTICATION)
     @patch(TRINO_DBAPI_CONNECT)
     @patch(HOOK_GET_CONNECTION)