Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/08 18:03:38 UTC
[GitHub] [airflow] drivard commented on issue #9713: Looping issue using Hashicorp Vault
drivard commented on issue #9713:
URL: https://github.com/apache/airflow/issues/9713#issuecomment-1179241434
Hello,
Last week I experienced exactly the same behaviour after configuring Hashicorp Vault as the secrets backend. It ended up taking down our Vault cluster.
We use Datadog for monitoring, and we saw a drastic number of requests (~10-15k/hour) trying to authenticate against Vault from the Airflow auth_role configured for the secrets backend.
We host Airflow on Kubernetes and deploy it with the official Helm chart and a custom Docker image. We host multiple instances of Airflow for different environments.
Reading this issue, I thought I would try to implement an LRU cache in the `VaultBackend` class to reduce the number of requests sent to Vault, starting with the retrieval of the secrets themselves: something like an `@lru_cache` decorator on top of the `get_connection` or `get_variable` [function](https://github.com/apache/airflow/blob/main/airflow/providers/hashicorp/secrets/vault.py#L194-L216).
It didn't do any good, and I believe that is because there were too many threads executing the same code. The cache didn't seem to be shared across the threads within a pod, and there are so many pods that need to fetch the secrets that it defeated the purpose of the lru_cache.
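One possible contributing factor, offered as an assumption rather than something verified against Airflow's internals: `functools.lru_cache` lives in the memory of a single Python process, and when it decorates a method it includes `self` in the cache key. A minimal sketch with a hypothetical `FakeBackend` class (not Airflow code) shows that a second backend instance never benefits from the first one's cache:

```python
from functools import lru_cache


class FakeBackend:
    """Hypothetical stand-in for a secrets backend; not Airflow code."""

    calls = 0  # counts simulated round trips to Vault across all instances

    @lru_cache(maxsize=None)
    def get_variable(self, key: str) -> str:
        # `self` is part of the cache key, so each instance caches separately.
        FakeBackend.calls += 1
        return f"value-of-{key}"


first = FakeBackend()
first.get_variable("api_key")
first.get_variable("api_key")   # served from the cache, no new "Vault" call
calls_after_first = FakeBackend.calls

second = FakeBackend()          # e.g. a backend created elsewhere
second.get_variable("api_key")  # cache miss: different `self`
calls_after_second = FakeBackend.calls
```

The same reasoning applies even more strongly across processes and pods, since each one has its own memory and therefore its own cache.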
That led me to think about centralizing the cache. Since I use the chart to deploy Airflow, I also use the embedded Redis as the Celery broker. In the past I have used Redis as a database query cache, so I thought I could reuse the same approach and centralize the secrets cache inside a new Redis db. That way all pods and threads would share access to the same cache.
Here are the steps and ideas I followed:
1. it is a connection or variable which is JSON formatted. It can contain passwords or sensitive data.
1.1 I need to encrypt the data before sending it to the cache
1. caching the vault client greatly reduces the validation against the `is_authenticated` method, which generates 3 calls to Vault as per our monitoring
1. create a small custom wheel package to inject my custom secrets backend
1. install my custom secrets backend inside my custom docker image and redeploy the chart
1. re-use the internal redis by utilizing the `AIRFLOW__CELERY__BROKER_URL` environment variable, but don't mix the data, so switch the redis db to 1 instead of 0.
1. I must be able to set a TTL on the cache --> `cache_ttl` in the `backend_kwargs` configuration
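As an illustration of the "switch the redis db to 1" step, here is a minimal sketch that rewrites the database part of a broker URL with the standard library. The helper name `with_redis_db` and the example hostname are hypothetical; this is one way to do it, not necessarily how the backend does it.

```python
from urllib.parse import urlsplit, urlunsplit


def with_redis_db(broker_url: str, db: int) -> str:
    """Return broker_url with its database path replaced by /<db>."""
    parts = urlsplit(broker_url)
    # The Redis db index is the URL path; credentials, host and port are kept.
    return urlunsplit(parts._replace(path=f"/{db}"))


# Example broker URL (made up); the cache would then live in db 1.
cache_url = with_redis_db("redis://:secret@airflow-redis:6379/0", 1)
```

Parsing the URL avoids assumptions about how many digits the db index has or whether the URL carries a db index at all.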
The flow is the following when trying to fetch a variable, a config or a connection:
1. is the connection in the cache?
   1. YES: retrieve the cached entry from redis --> un-pickle the data --> decrypt the data using fernet --> continue the normal flow
   1. NO: retrieve the connection from vault --> encrypt the data using fernet --> pickle --> set the data in the cache --> continue the normal flow
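The YES/NO flow above is a cache-aside lookup. A minimal, self-contained sketch of that flow follows. Everything in it is a stand-in: `InMemoryCache` replaces Redis, `fake_encrypt`/`fake_decrypt` only mark where Fernet would be used (base64 is not encryption), and `fetch` simulates the call to Vault.

```python
import base64
import json
import pickle
import time
from typing import Callable, Optional


class InMemoryCache:
    """Hypothetical stand-in for Redis: a dict with per-key expiry."""

    def __init__(self) -> None:
        self._store: dict = {}

    def set(self, key: str, blob: bytes, ttl: int) -> None:
        self._store[key] = (blob, time.monotonic() + ttl)

    def get(self, key: str) -> Optional[bytes]:
        blob, expires_at = self._store.get(key, (None, 0.0))
        if blob is None or time.monotonic() >= expires_at:
            return None
        return blob


def fake_encrypt(secret: dict) -> str:
    # Placeholder for Fernet; base64 is NOT encryption.
    return base64.b64encode(json.dumps(secret).encode()).decode()


def fake_decrypt(token: str) -> dict:
    return json.loads(base64.b64decode(token))


def get_secret(cache: InMemoryCache, key: str,
               fetch_from_vault: Callable[[str], Optional[dict]],
               ttl: int = 600) -> Optional[dict]:
    blob = cache.get(key)
    if blob is not None:
        # YES branch: un-pickle, then decrypt.
        return fake_decrypt(pickle.loads(blob))
    # NO branch: fetch, encrypt, pickle, store with a TTL.
    secret = fetch_from_vault(key)
    if secret is not None:
        cache.set(key, pickle.dumps(fake_encrypt(secret)), ttl)
    return secret


vault_calls = []


def fetch(key: str) -> dict:
    vault_calls.append(key)  # record each simulated Vault round trip
    return {"conn_uri": "postgres://user:pw@db:5432/app"}


cache = InMemoryCache()
first = get_secret(cache, "conn_id_my_db", fetch)
second = get_secret(cache, "conn_id_my_db", fetch)  # served from the cache
```

In this sketch only the first lookup reaches the simulated Vault; the second is answered from the cache until the TTL expires.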
The resulting class is below; I used the [vault.py](https://github.com/apache/airflow/blob/main/airflow/providers/hashicorp/secrets/vault.py) file as a starting point:
```
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Objects relating to sourcing connections & variables from Hashicorp Vault"""
import warnings
from typing import TYPE_CHECKING, Optional

from airflow.providers.hashicorp._internal_client.vault_client import _VaultClient
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin
from custom_secrets.secrets.redis import RedisCache


class CachedVaultBackend(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connections and Variables from Hashicorp Vault.

    Configurable via ``airflow.cfg`` as follows:

    .. code-block:: ini

        [secrets]
        backend = airflow.providers.hashicorp.secrets.vault.CachedVaultBackend
        backend_kwargs = {
            "connections_path": "connections",
            "url": "http://127.0.0.1:8200",
            "mount_point": "airflow"
            }

    For example, if your keys are under ``connections`` path in ``airflow`` mount_point, this
    would be accessible if you provide ``{"connections_path": "connections"}`` and request
    conn_id ``smtp_default``.

    :param connections_path: Specifies the path of the secret to read to get Connections.
        (default: 'connections'). If set to None (null), requests for connections will not be sent to Vault.
    :param variables_path: Specifies the path of the secret to read to get Variable.
        (default: 'variables'). If set to None (null), requests for variables will not be sent to Vault.
    :param config_path: Specifies the path of the secret to read Airflow Configurations
        (default: 'config'). If set to None (null), requests for configurations will not be sent to Vault.
    :param url: Base URL for the Vault instance being addressed.
    :param auth_type: Authentication Type for Vault. Default is ``token``. Available values are:
        ('approle', 'aws_iam', 'azure', 'github', 'gcp', 'kubernetes', 'ldap', 'radius', 'token', 'userpass')
    :param auth_mount_point: It can be used to define mount_point for authentication chosen
        Default depends on the authentication method used.
    :param mount_point: The "path" the secret engine was mounted on. Default is "secret". Note that
        this mount_point is not used for authentication if authentication is done via a
        different engine. For authentication mount_points see, auth_mount_point.
    :param kv_engine_version: Select the version of the engine to run (``1`` or ``2``, default: ``2``).
    :param token: Authentication token to include in requests sent to Vault.
        (for ``token`` and ``github`` auth_type)
    :param token_path: path to file containing authentication token to include in requests sent to Vault
        (for ``token`` and ``github`` auth_type).
    :param username: Username for Authentication (for ``ldap`` and ``userpass`` auth_type).
    :param password: Password for Authentication (for ``ldap`` and ``userpass`` auth_type).
    :param key_id: Key ID for Authentication (for ``aws_iam`` and ``azure`` auth_type).
    :param secret_id: Secret ID for Authentication (for ``approle``, ``aws_iam`` and ``azure`` auth_types).
    :param role_id: Role ID for Authentication (for ``approle``, ``aws_iam`` auth_types).
    :param kubernetes_role: Role for Authentication (for ``kubernetes`` auth_type).
    :param kubernetes_jwt_path: Path for kubernetes jwt token (for ``kubernetes`` auth_type, default:
        ``/var/run/secrets/kubernetes.io/serviceaccount/token``).
    :param gcp_key_path: Path to Google Cloud Service Account key file (JSON) (for ``gcp`` auth_type).
        Mutually exclusive with gcp_keyfile_dict.
    :param gcp_keyfile_dict: Dictionary of keyfile parameters. (for ``gcp`` auth_type).
        Mutually exclusive with gcp_key_path.
    :param gcp_scopes: Comma-separated string containing OAuth2 scopes (for ``gcp`` auth_type).
    :param azure_tenant_id: The tenant id for the Azure Active Directory (for ``azure`` auth_type).
    :param azure_resource: The configured URL for the application registered in Azure Active Directory
        (for ``azure`` auth_type).
    :param radius_host: Host for radius (for ``radius`` auth_type).
    :param radius_secret: Secret for radius (for ``radius`` auth_type).
    :param radius_port: Port for radius (for ``radius`` auth_type).
    :param cache_ttl: Time to live, in seconds, of the entries cached in Redis (default: 600).
    """

    def __init__(
        self,
        connections_path: str = "connections",
        variables_path: str = "variables",
        config_path: str = "config",
        url: Optional[str] = None,
        auth_type: str = "token",
        auth_mount_point: Optional[str] = None,
        mount_point: str = "secret",
        kv_engine_version: int = 2,
        token: Optional[str] = None,
        token_path: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        key_id: Optional[str] = None,
        secret_id: Optional[str] = None,
        role_id: Optional[str] = None,
        kubernetes_role: Optional[str] = None,
        kubernetes_jwt_path: str = "/var/run/secrets/kubernetes.io/serviceaccount/token",
        gcp_key_path: Optional[str] = None,
        gcp_keyfile_dict: Optional[dict] = None,
        gcp_scopes: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        azure_resource: Optional[str] = None,
        radius_host: Optional[str] = None,
        radius_secret: Optional[str] = None,
        radius_port: Optional[int] = None,
        cache_ttl: Optional[int] = 600,
        **kwargs,
    ):
        super().__init__()
        if connections_path is not None:
            self.connections_path = connections_path.rstrip("/")
        else:
            self.connections_path = connections_path
        if variables_path is not None:
            self.variables_path = variables_path.rstrip("/")
        else:
            self.variables_path = variables_path
        if config_path is not None:
            self.config_path = config_path.rstrip("/")
        else:
            self.config_path = config_path
        self.mount_point = mount_point
        self.kv_engine_version = kv_engine_version

        # Redis Cache Backend
        self.cache_ttl = cache_ttl
        self.redis_client = RedisCache()

        # to prevent many callback to vault on the lookup token endpoint
        # this will cache the vault_client under the `VAULT_CLIENT` key.
        self.vault_client = self.redis_client.get("VAULT_CLIENT")
        if not self.vault_client:
            self.vault_client = _VaultClient(
                url=url,
                auth_type=auth_type,
                auth_mount_point=auth_mount_point,
                mount_point=mount_point,
                kv_engine_version=kv_engine_version,
                token=token,
                token_path=token_path,
                username=username,
                password=password,
                key_id=key_id,
                secret_id=secret_id,
                role_id=role_id,
                kubernetes_role=kubernetes_role,
                kubernetes_jwt_path=kubernetes_jwt_path,
                gcp_key_path=gcp_key_path,
                gcp_keyfile_dict=gcp_keyfile_dict,
                gcp_scopes=gcp_scopes,
                azure_tenant_id=azure_tenant_id,
                azure_resource=azure_resource,
                radius_host=radius_host,
                radius_secret=radius_secret,
                radius_port=radius_port,
                **kwargs,
            )
            self.redis_client.set("VAULT_CLIENT", self.vault_client, ttl=self.cache_ttl)

    def get_response(self, conn_id: str) -> Optional[dict]:
        """
        Get data from Vault

        :rtype: dict
        :return: The data from the Vault path if exists
        """
        if self.connections_path is None:
            return None

        secret_path = self.build_path(self.connections_path, conn_id)
        cached = self.redis_client.get(f"conn_id_{conn_id}")
        if not cached:
            cached = self.vault_client.get_secret(secret_path=secret_path)
            self.redis_client.set(f"conn_id_{conn_id}", cached, ttl=self.cache_ttl)
        return cached

    def get_conn_uri(self, conn_id: str) -> Optional[str]:
        """
        Get serialized representation of connection

        :param conn_id: The connection id
        :rtype: str
        :return: The connection uri retrieved from the secret
        """
        # Since VaultBackend implements `get_connection`, `get_conn_uri` is not used. So we
        # don't need to implement (or direct users to use) method `get_conn_value` instead
        warnings.warn(
            f"Method `{self.__class__.__name__}.get_conn_uri` is deprecated and will be removed "
            "in a future release.",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        response = self.get_response(conn_id)
        return response.get("conn_uri") if response else None

    # Make sure connection is imported this way for type checking, otherwise when importing
    # the backend it will get a circular dependency and fail
    if TYPE_CHECKING:
        from airflow.models.connection import Connection

    def get_connection(self, conn_id: str) -> "Optional[Connection]":
        """
        Get connection from Vault as secret. Prioritize conn_uri if exists,
        if not fall back to normal Connection creation.

        :rtype: Connection
        :return: A Connection object constructed from Vault data
        """
        # The Connection needs to be locally imported because otherwise we get into cyclic import
        # problems when instantiating the backend during configuration
        from airflow.models.connection import Connection

        response = self.get_response(conn_id)
        if response is None:
            return None

        uri = response.get("conn_uri")
        if uri:
            return Connection(conn_id, uri=uri)

        return Connection(conn_id, **response)

    def get_variable(self, key: str) -> Optional[str]:
        """
        Get Airflow Variable

        :param key: Variable Key
        :rtype: str
        :return: Variable Value retrieved from the vault
        """
        if self.variables_path is None:
            return None
        else:
            secret_path = self.build_path(self.variables_path, key)
            cached = self.redis_client.get(f"variables_{key}")
            if not cached:
                cached = self.vault_client.get_secret(secret_path=secret_path)
                self.redis_client.set(f"variables_{key}", cached, ttl=self.cache_ttl)
            return cached.get("value") if cached else None

    def get_config(self, key: str) -> Optional[str]:
        """
        Get Airflow Configuration

        :param key: Configuration Option Key
        :rtype: str
        :return: Configuration Option Value retrieved from the vault
        """
        if self.config_path is None:
            return None
        else:
            secret_path = self.build_path(self.config_path, key)
            cached = self.redis_client.get(f"config_{key}")
            if not cached:
                cached = self.vault_client.get_secret(secret_path=secret_path)
                self.redis_client.set(f"config_{key}", cached, ttl=self.cache_ttl)
            return cached.get("value") if cached else None
```
I also had to write a redis cache client:
```
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import os
import pickle
from typing import Any, Optional

import redis
from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

ENV_VAR_PREFIX = "AIRFLOW__"


class RedisCache(LoggingMixin):
    """
    Set secrets to a redis cache.
    Uses the same Redis server as the CELERY_BROKER_URL.
    """

    def __init__(self, db=1):
        try:
            # If redis is not available or in my case istio is blocking the database migration init containers
            redis_url = os.environ.get(f"{ENV_VAR_PREFIX}CELERY__BROKER_URL", f"redis://redis:6379/{db}")
            # Swap the last character of the URL for the cache db number; this
            # assumes the broker URL ends with a single-digit db index.
            self.redis_url = redis_url[:-1] + f"{db}"
            self.client = redis.from_url(self.redis_url)
        except Exception as e:
            self.log.error(
                "Unable to connect to Redis server. Secrets will not be cached.", extra={"exception": e}
            )
            self.redis_url = None
            self.client = None

    def set(self, key, data, ttl=300) -> bool:
        """Store data under key; everything but the Vault client is encrypted first."""
        try:
            if key == "VAULT_CLIENT":
                encoded_data = pickle.dumps(data)
            else:
                encoded_data = pickle.dumps(self.encrypt(data))
            self.client.set(key, encoded_data, ex=ttl)
            result = True
        except Exception as e:
            self.log.error(f"Unable to set secret to cache: {key}", extra={"exception": e})
            result = False
        return result

    def get(self, key) -> Optional[Any]:
        """Return the cached value for key, or None on a miss or an error."""
        try:
            encoded_data = self.client.get(key)
            if key == "VAULT_CLIENT":
                data = pickle.loads(encoded_data) if encoded_data else None
            else:
                data = self.decrypt(pickle.loads(encoded_data)) if encoded_data else None
        except Exception as e:
            self.log.error(f"Unable to get secret from cache: {key}", extra={"exception": e})
            data = None
        return data

    def decrypt(self, data) -> Optional[Any]:
        """Decrypt a cached value and return the JSON-decoded secret."""
        if data:
            # preventing circular import
            from airflow.models.crypto import get_fernet

            fernet = get_fernet()
            if not fernet.is_encrypted:
                raise AirflowException(
                    "Can't decrypt encrypted secrets from cache. \nFERNET_KEY configuration is missing"
                )
            return json.loads(fernet.decrypt(bytes(data, "utf-8")).decode())
        else:
            return data

    def encrypt(self, value: Optional[Any]) -> Optional[str]:
        """JSON-encode a secret and return it encrypted with Fernet."""
        # preventing circular import
        from airflow.models.crypto import get_fernet

        fernet = get_fernet()
        if not fernet.is_encrypted:
            raise AirflowException(
                "Can't encrypt secrets for the cache. \nFERNET_KEY configuration is missing"
            )
        return fernet.encrypt(bytes(json.dumps(value), "utf-8")).decode() if value else None

    def flushdb(self) -> bool:
        """Invalidate the whole cache database."""
        result = False
        if self.client:
            self.client.flushdb()
            self.log.info("Cache Database has been invalidated.")
            result = True
        else:
            self.log.error("Unable to invalidate the cache database.")
        return result
```
For operational reasons I also wrote a `flushdb` method, which allows me to invalidate the cache if I make a mistake while setting up and testing a new connection.
I also wrote a DAG using the PythonOperator to call the `flushdb` method, so that my team can invalidate the cache without depending on me.
```
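import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

from custom_secrets.secrets.redis import RedisCache

# Placeholder: the original post did not show its default_args.
default_args = {}


def invalidate_redis_cache():
    # Named `cache` so it does not shadow the `redis` package name.
    cache = RedisCache()
    cache.flushdb()


with DAG(
    dag_id="airflow_flushdb_cache",
    default_args=default_args,
    schedule_interval="@daily",
    # pendulum.today is a function and must be called
    start_date=pendulum.today().add(days=-1),
    catchup=False,
    max_active_runs=1,
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")
    flush_cache = PythonOperator(task_id="invalidate_redis_cache", python_callable=invalidate_redis_cache)
    start >> flush_cache >> end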
```
We have been running the custom code for a week now; Airflow has been really stable and our Vault is as healthy as we like.
I started with a TTL of 1 hour and over the week I pushed it to 24 hours.
We saw the Vault requests drop from ~10k/hour to 50-100/hour, and since I pushed the TTL to 24 hours, to 50-100/day. I also set my flushdb DAG to run daily to clean the redis db.
The following image shows the calls made to Vault before and after the custom class was set up.
![image](https://user-images.githubusercontent.com/643002/178044942-16fab5c6-d31d-4c96-b38e-82e2b8ab93e9.png)
I can also say that Redis didn't suffer from the added centralized caching: the pod is using 38m CPU and 151Mi of RAM.
Hope it can help someone. If there is interest, I can propose a PR.