You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/19 11:22:49 UTC
[airflow] branch main updated: trino: Support CertificateAuthentication in the trino hook (#26246)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b2766d0515 trino: Support CertificateAuthentication in the trino hook (#26246)
b2766d0515 is described below
commit b2766d0515a33222f2c2c2281ef062ffbca36475
Author: c2zwdjnlcg <11...@users.noreply.github.com>
AuthorDate: Mon Sep 19 04:22:39 2022 -0700
trino: Support CertificateAuthentication in the trino hook (#26246)
Allows authenticating to trino clusters with mtls
---
airflow/providers/trino/hooks/trino.py | 9 +++++++--
docs/apache-airflow-providers-trino/connections.rst | 3 ++-
tests/providers/trino/hooks/test_trino.py | 20 +++++++++++++++++++-
3 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/trino/hooks/trino.py b/airflow/providers/trino/hooks/trino.py
index d5a3530703..da79664aa4 100644
--- a/airflow/providers/trino/hooks/trino.py
+++ b/airflow/providers/trino/hooks/trino.py
@@ -101,12 +101,17 @@ class TrinoHook(DbApiHook):
extra = db.extra_dejson
auth = None
user = db.login
- if db.password and extra.get('auth') == 'kerberos':
- raise AirflowException("Kerberos authorization doesn't support password.")
+ if db.password and extra.get('auth') in ('kerberos', 'certs'):
+ raise AirflowException(f"The {extra.get('auth')!r} authorization type doesn't support password.")
elif db.password:
auth = trino.auth.BasicAuthentication(db.login, db.password) # type: ignore[attr-defined]
elif extra.get('auth') == 'jwt':
auth = trino.auth.JWTAuthentication(token=extra.get('jwt__token'))
+ elif extra.get('auth') == 'certs':
+ auth = trino.auth.CertificateAuthentication(
+ extra.get('certs__client_cert_path'),
+ extra.get('certs__client_key_path'),
+ )
elif extra.get('auth') == 'kerberos':
auth = trino.auth.KerberosAuthentication( # type: ignore[attr-defined]
config=extra.get('kerberos__config', os.environ.get('KRB5_CONFIG')),
diff --git a/docs/apache-airflow-providers-trino/connections.rst b/docs/apache-airflow-providers-trino/connections.rst
index 111205b8a9..9e7bb7301e 100644
--- a/docs/apache-airflow-providers-trino/connections.rst
+++ b/docs/apache-airflow-providers-trino/connections.rst
@@ -43,10 +43,11 @@ Password
Extra (optional, connection parameters)
Specify the extra parameters (as json dictionary) that can be used in Trino connection. The following parameters out of the standard python parameters are supported:
- * ``auth`` - Specifies which type of authentication needs to be enabled. The value can be ``kerberos``, ``jwt``.
+ * ``auth`` - Specifies which type of authentication needs to be enabled. The value can be ``certs``, ``kerberos``, or ``jwt``
* ``impersonate_as_owner`` - Boolean that allows to set ``AIRFLOW_CTX_DAG_OWNER`` as a user of the connection.
The following extra parameters can be used to configure authentication:
* ``jwt__token`` - If jwt authentication should be used, the value of token is given via this parameter.
+ * ``certs__client_cert_path``, ``certs__client_key_path``- If certificate authentication should be used, the path to the client certificate and key is given via these parameters.
* ``kerberos__service_name``, ``kerberos__config``, ``kerberos__mutual_authentication``, ``kerberos__force_preemptive``, ``kerberos__hostname_override``, ``kerberos__sanitize_mutual_error_response``, ``kerberos__principal``,``kerberos__delegate``, ``kerberos__ca_bundle`` - These parameters can be set when enabling ``kerberos`` authentication.
diff --git a/tests/providers/trino/hooks/test_trino.py b/tests/providers/trino/hooks/test_trino.py
index b077aa52e3..11d95c2bcc 100644
--- a/tests/providers/trino/hooks/test_trino.py
+++ b/tests/providers/trino/hooks/test_trino.py
@@ -36,6 +36,7 @@ BASIC_AUTHENTICATION = 'airflow.providers.trino.hooks.trino.trino.auth.BasicAuth
KERBEROS_AUTHENTICATION = 'airflow.providers.trino.hooks.trino.trino.auth.KerberosAuthentication'
TRINO_DBAPI_CONNECT = 'airflow.providers.trino.hooks.trino.trino.dbapi.connect'
JWT_AUTHENTICATION = 'airflow.providers.trino.hooks.trino.trino.auth.JWTAuthentication'
+CERT_AUTHENTICATION = 'airflow.providers.trino.hooks.trino.trino.auth.CertificateAuthentication'
class TestTrinoHookConn:
@@ -92,7 +93,7 @@ class TestTrinoHookConn:
extra=json.dumps(extras),
)
with pytest.raises(
- AirflowException, match=re.escape("Kerberos authorization doesn't support password.")
+ AirflowException, match=re.escape("The 'kerberos' authorization type doesn't support password.")
):
TrinoHook().get_conn()
@@ -111,6 +112,23 @@ class TestTrinoHookConn:
TrinoHook().get_conn()
self.assert_connection_called_with(mock_connect, auth=mock_jwt_auth)
+ @patch(CERT_AUTHENTICATION)
+ @patch(TRINO_DBAPI_CONNECT)
+ @patch(HOOK_GET_CONNECTION)
+ def test_get_conn_cert_auth(self, mock_get_connection, mock_connect, mock_cert_auth):
+ extras = {
+ 'auth': 'certs',
+ 'certs__client_cert_path': '/path/to/client.pem',
+ 'certs__client_key_path': '/path/to/client.key',
+ }
+ self.set_get_connection_return_value(
+ mock_get_connection,
+ extra=json.dumps(extras),
+ )
+ TrinoHook().get_conn()
+ self.assert_connection_called_with(mock_connect, auth=mock_cert_auth)
+ mock_cert_auth.assert_called_once_with('/path/to/client.pem', '/path/to/client.key')
+
@patch(KERBEROS_AUTHENTICATION)
@patch(TRINO_DBAPI_CONNECT)
@patch(HOOK_GET_CONNECTION)