Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/12 17:59:23 UTC
[GitHub] [airflow] raphaelauv commented on issue #19251: Add a way to skip the secret_backend
raphaelauv commented on issue #19251:
URL: https://github.com/apache/airflow/issues/19251#issuecomment-1346962712
@rodrigocollavo
Here is the full solution:
```python
from typing import Optional

from airflow.providers.google.cloud.secrets.secret_manager import CloudSecretManagerBackend


class CustomCloudSecretManagerBackend(CloudSecretManagerBackend):
    """
    https://github.com/apache/airflow/issues/19251

    Add the option secret_lookup_prefix to the GCP CloudSecretManagerBackend.
    When this option is set, only variables or connections whose name starts
    with that prefix are looked up in GCP Secret Manager.

    Example:

    with secret_lookup_prefix=None
        Variable.get("TOTO") will call the GCP secret provider
        Connection.get("TOTO") will call the GCP secret provider

    with secret_lookup_prefix="secret_"
        Variable.get("TOTO") will NOT call the GCP secret provider
        Variable.get("secret_TOTO") will call the GCP secret provider with TOTO (without the prefix secret_)
        Connection.get("TOTO") will NOT call the GCP secret provider
        Connection.get("secret_TOTO") will call the GCP secret provider with TOTO (without the prefix secret_)
    """

    def __init__(
        self,
        secret_lookup_prefix: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.secret_lookup_prefix = secret_lookup_prefix

    def get_variable(self, key: str) -> Optional[str]:
        if self.variables_prefix is None:
            return None
        if self.secret_lookup_prefix is not None:
            # Skip the backend entirely for keys without the lookup prefix.
            if not key.startswith(self.secret_lookup_prefix):
                return None
            # Strip the lookup prefix before querying Secret Manager.
            key = key[len(self.secret_lookup_prefix):]
        return self._get_secret(self.variables_prefix, key)

    def get_conn_uri(self, conn_id: str) -> Optional[str]:
        if self.connections_prefix is None:
            return None
        if self.secret_lookup_prefix is not None:
            # Skip the backend entirely for connection ids without the lookup prefix.
            if not conn_id.startswith(self.secret_lookup_prefix):
                return None
            # Strip the lookup prefix before querying Secret Manager.
            conn_id = conn_id[len(self.secret_lookup_prefix):]
        return self._get_secret(self.connections_prefix, conn_id)
```
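
The prefix rule used by both methods above can be tried without Airflow or GCP installed. This is only a minimal sketch of the routing logic: `resolve_secret_key` is a hypothetical helper name used for illustration, not part of Airflow or of the class above.

```python
from typing import Optional


def resolve_secret_key(key: str, secret_lookup_prefix: Optional[str]) -> Optional[str]:
    """Hypothetical helper: return the key to query in the backend,
    or None when the backend should be skipped for this key."""
    if secret_lookup_prefix is None:
        # No lookup prefix configured: every key is sent to the backend unchanged.
        return key
    if not key.startswith(secret_lookup_prefix):
        # Key lacks the lookup prefix: skip the backend.
        return None
    # Strip the lookup prefix before querying the backend.
    return key[len(secret_lookup_prefix):]


print(resolve_secret_key("TOTO", None))
print(resolve_secret_key("TOTO", "secret_"))
print(resolve_secret_key("secret_TOTO", "secret_"))
```

With the class itself, `secret_lookup_prefix` would presumably be passed like any other constructor argument of the backend, i.e. through `backend_kwargs` in the `[secrets]` section of the Airflow configuration.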