Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/08 18:03:38 UTC

[GitHub] [airflow] drivard commented on issue #9713: Looping issue using Hashicorp Vault

drivard commented on issue #9713:
URL: https://github.com/apache/airflow/issues/9713#issuecomment-1179241434

   Hello,
   
   Last week I experienced exactly the same behaviour after configuring HashiCorp Vault as the secrets backend. It ended up taking down our Vault cluster.
   We use Datadog for monitoring and saw a drastic number of requests, ~10-15k/hour, trying to authenticate against Vault from the Airflow auth_role configured for the secrets backend.
   
   We host Airflow on Kubernetes and deploy it with the official Helm chart and a custom Docker image. We host multiple instances of Airflow for different environments.
   
   Reading this issue, I thought I would try to implement an LRU cache in the `VaultBackend` class to try to reduce the number of requests sent to Vault, starting with the retrieval of the secrets themselves, using something like the `@lru_cache` decorator on top of the `get_connection` or `get_variable` [function](https://github.com/apache/airflow/blob/main/airflow/providers/hashicorp/secrets/vault.py#L194-L216).
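
   For reference, that attempt looked roughly like the sketch below (the subclass is hypothetical and only illustrates the idea; `VaultBackend.get_variable` is the real provider method):

   ```
   # Hypothetical sketch of the lru_cache attempt, not the code I ended up keeping.
   from functools import lru_cache
   from typing import Optional

   from airflow.providers.hashicorp.secrets.vault import VaultBackend


   class LruCachedVaultBackend(VaultBackend):
       @lru_cache(maxsize=128)
       def get_variable(self, key: str) -> Optional[str]:
           # A cache hit avoids the round trip to Vault, but the cache only
           # lives inside the current Python process.
           return super().get_variable(key)
   ```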
   
   It didn't do any good, and I believe it is because there were too many threads executing the same code. The cache didn't seem to be shared across the threads within a pod, and on top of that there are too many pods needing to fetch the secrets, which defeated the whole lru_cache idea.
   
   That brought me to think about centralizing the cache. Since I use the chart to deploy Airflow, I also use the embedded Redis as the Celery broker, and in the past I have used Redis as a database query cache. So I thought I could re-use the same approach and centralize the secrets cache inside a new Redis db; that way the pods and threads would share access to the same cache.
   
   Here are the steps and ideas I followed:
   
   1. The cached item is a connection or variable, which is JSON-formatted and can contain passwords or other sensitive data.
    1.1 So I need to encrypt the data before sending it to the cache.
   2. Caching the Vault client greatly reduces the calls triggered by the `is_authenticated` check, which generates 3 calls to Vault according to our monitoring.
   3. Create a small custom wheel package to inject my custom secrets backend.
   4. Install my custom secrets backend inside my custom Docker image and redeploy the chart.
   5. Re-use the internal Redis by utilizing the `AIRFLOW__CELERY__BROKER_URL` environment variable, but don't mix the data: switch the Redis db to 1 instead of 0.
   6. I must be able to set a TTL on the cache --> `cache_ttl` in the `backend_kwargs` configuration (an example configuration follows this list).
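
   For illustration, a backend configuration with the new `cache_ttl` option would look something like this (the class path depends on where the custom wheel installs the backend, and the other values just mirror the docstring example below):

   ```
   [secrets]
   backend = custom_secrets.secrets.vault.CachedVaultBackend
   backend_kwargs = {
       "connections_path": "connections",
       "variables_path": "variables",
       "url": "http://127.0.0.1:8200",
       "mount_point": "airflow",
       "cache_ttl": 3600
       }
   ```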
   
   The flow when trying to fetch a variable, a config option or a connection would be the following (a minimal sketch of this flow comes right after the list):
   
   1. Is the connection (or variable/config) in the cache?
       1.1 YES: retrieve the cached entry from Redis --> un-pickle the data --> decrypt the data using Fernet --> continue the normal flow
       1.2 NO: retrieve the secret from Vault --> encrypt the data using Fernet --> pickle --> set the data in the cache --> continue the normal flow
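
   In pseudocode, the read path looks roughly like this (assuming a plain `redis` client, the provider's `_VaultClient`, and Airflow's `get_fernet()`; the helper name and key layout are only for illustration, the real class follows below):

   ```
   import json
   import pickle

   from airflow.models.crypto import get_fernet


   def fetch_secret(cache_key, secret_path, redis_client, vault_client, ttl=600):
       fernet = get_fernet()
       raw = redis_client.get(cache_key)
       if raw is not None:
           # Cache hit: un-pickle the stored token, then decrypt it back into a dict.
           token = pickle.loads(raw)
           return json.loads(fernet.decrypt(token.encode()).decode())
       # Cache miss: read from Vault, encrypt, pickle, and store with a TTL.
       secret = vault_client.get_secret(secret_path=secret_path)
       token = fernet.encrypt(json.dumps(secret).encode()).decode()
       redis_client.set(cache_key, pickle.dumps(token), ex=ttl)
       return secret
   ```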
   
   The resulting class is below; I copied the [vault.py](https://github.com/apache/airflow/blob/main/airflow/providers/hashicorp/secrets/vault.py) file as a starting point:
   
   ```
   #
   # Licensed to the Apache Software Foundation (ASF) under one
   # or more contributor license agreements.  See the NOTICE file
   # distributed with this work for additional information
   # regarding copyright ownership.  The ASF licenses this file
   # to you under the Apache License, Version 2.0 (the
   # "License"); you may not use this file except in compliance
   # with the License.  You may obtain a copy of the License at
   #
   #   http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   # software distributed under the License is distributed on an
   # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   # KIND, either express or implied.  See the License for the
   # specific language governing permissions and limitations
   # under the License.
   """Objects relating to sourcing connections & variables from Hashicorp Vault"""
   import warnings
   from typing import TYPE_CHECKING, Optional
   
   from airflow.providers.hashicorp._internal_client.vault_client import _VaultClient
   from airflow.secrets import BaseSecretsBackend
   from airflow.utils.log.logging_mixin import LoggingMixin
   from custom_secrets.secrets.redis import RedisCache
   
   
   class CachedVaultBackend(BaseSecretsBackend, LoggingMixin):
       """
       Retrieves Connections and Variables from Hashicorp Vault.
   
       Configurable via ``airflow.cfg`` as follows:
   
       .. code-block:: ini
   
           [secrets]
           backend = airflow.providers.hashicorp.secrets.vault.CachedVaultBackend
           backend_kwargs = {
               "connections_path": "connections",
               "url": "http://127.0.0.1:8200",
               "mount_point": "airflow"
               }
   
       For example, if your keys are under ``connections`` path in ``airflow`` mount_point, this
       would be accessible if you provide ``{"connections_path": "connections"}`` and request
       conn_id ``smtp_default``.
   
       :param connections_path: Specifies the path of the secret to read to get Connections.
           (default: 'connections'). If set to None (null), requests for connections will not be sent to Vault.
       :param variables_path: Specifies the path of the secret to read to get Variable.
           (default: 'variables'). If set to None (null), requests for variables will not be sent to Vault.
       :param config_path: Specifies the path of the secret to read Airflow Configurations
           (default: 'config'). If set to None (null), requests for configurations will not be sent to Vault.
       :param url: Base URL for the Vault instance being addressed.
       :param auth_type: Authentication Type for Vault. Default is ``token``. Available values are:
           ('approle', 'aws_iam', 'azure', 'github', 'gcp', 'kubernetes', 'ldap', 'radius', 'token', 'userpass')
       :param auth_mount_point: It can be used to define mount_point for authentication chosen
             Default depends on the authentication method used.
       :param mount_point: The "path" the secret engine was mounted on. Default is "secret". Note that
            this mount_point is not used for authentication if authentication is done via a
            different engine. For authentication mount_points see, auth_mount_point.
       :param kv_engine_version: Select the version of the engine to run (``1`` or ``2``, default: ``2``).
       :param token: Authentication token to include in requests sent to Vault.
           (for ``token`` and ``github`` auth_type)
       :param token_path: path to file containing authentication token to include in requests sent to Vault
           (for ``token`` and ``github`` auth_type).
       :param username: Username for Authentication (for ``ldap`` and ``userpass`` auth_type).
       :param password: Password for Authentication (for ``ldap`` and ``userpass`` auth_type).
       :param key_id: Key ID for Authentication (for ``aws_iam`` and ``azure`` auth_type).
       :param secret_id: Secret ID for Authentication (for ``approle``, ``aws_iam`` and ``azure`` auth_types).
       :param role_id: Role ID for Authentication (for ``approle``, ``aws_iam`` auth_types).
       :param kubernetes_role: Role for Authentication (for ``kubernetes`` auth_type).
       :param kubernetes_jwt_path: Path for kubernetes jwt token (for ``kubernetes`` auth_type, default:
           ``/var/run/secrets/kubernetes.io/serviceaccount/token``).
       :param gcp_key_path: Path to Google Cloud Service Account key file (JSON) (for ``gcp`` auth_type).
              Mutually exclusive with gcp_keyfile_dict.
       :param gcp_keyfile_dict: Dictionary of keyfile parameters. (for ``gcp`` auth_type).
              Mutually exclusive with gcp_key_path.
       :param gcp_scopes: Comma-separated string containing OAuth2 scopes (for ``gcp`` auth_type).
       :param azure_tenant_id: The tenant id for the Azure Active Directory (for ``azure`` auth_type).
       :param azure_resource: The configured URL for the application registered in Azure Active Directory
              (for ``azure`` auth_type).
       :param radius_host: Host for radius (for ``radius`` auth_type).
       :param radius_secret: Secret for radius (for ``radius`` auth_type).
       :param radius_port: Port for radius (for ``radius`` auth_type).
       """
   
       def __init__(
           self,
           connections_path: str = "connections",
           variables_path: str = "variables",
           config_path: str = "config",
           url: Optional[str] = None,
           auth_type: str = "token",
           auth_mount_point: Optional[str] = None,
           mount_point: str = "secret",
           kv_engine_version: int = 2,
           token: Optional[str] = None,
           token_path: Optional[str] = None,
           username: Optional[str] = None,
           password: Optional[str] = None,
           key_id: Optional[str] = None,
           secret_id: Optional[str] = None,
           role_id: Optional[str] = None,
           kubernetes_role: Optional[str] = None,
           kubernetes_jwt_path: str = "/var/run/secrets/kubernetes.io/serviceaccount/token",
           gcp_key_path: Optional[str] = None,
           gcp_keyfile_dict: Optional[dict] = None,
           gcp_scopes: Optional[str] = None,
           azure_tenant_id: Optional[str] = None,
           azure_resource: Optional[str] = None,
           radius_host: Optional[str] = None,
           radius_secret: Optional[str] = None,
           radius_port: Optional[int] = None,
           cache_ttl: Optional[int] = 600,
           **kwargs,
       ):
           super().__init__()
           if connections_path is not None:
               self.connections_path = connections_path.rstrip("/")
           else:
               self.connections_path = connections_path
           if variables_path is not None:
               self.variables_path = variables_path.rstrip("/")
           else:
               self.variables_path = variables_path
           if config_path is not None:
               self.config_path = config_path.rstrip("/")
           else:
               self.config_path = config_path
           self.mount_point = mount_point
           self.kv_engine_version = kv_engine_version
           # Redis Cache Backend
           self.cache_ttl = cache_ttl
           self.redis_client = RedisCache()
           # To prevent many callbacks to Vault on the token lookup endpoint,
           # cache the vault_client under the `VAULT_CLIENT` key in Redis.
           self.vault_client = self.redis_client.get("VAULT_CLIENT")
           if not self.vault_client:
               self.vault_client = _VaultClient(
                   url=url,
                   auth_type=auth_type,
                   auth_mount_point=auth_mount_point,
                   mount_point=mount_point,
                   kv_engine_version=kv_engine_version,
                   token=token,
                   token_path=token_path,
                   username=username,
                   password=password,
                   key_id=key_id,
                   secret_id=secret_id,
                   role_id=role_id,
                   kubernetes_role=kubernetes_role,
                   kubernetes_jwt_path=kubernetes_jwt_path,
                   gcp_key_path=gcp_key_path,
                   gcp_keyfile_dict=gcp_keyfile_dict,
                   gcp_scopes=gcp_scopes,
                   azure_tenant_id=azure_tenant_id,
                   azure_resource=azure_resource,
                   radius_host=radius_host,
                   radius_secret=radius_secret,
                   radius_port=radius_port,
                   **kwargs,
               )
               self.redis_client.set("VAULT_CLIENT", self.vault_client, ttl=self.cache_ttl)
   
       def get_response(self, conn_id: str) -> Optional[dict]:
           """
           Get data from Vault
   
           :rtype: dict
           :return: The data from the Vault path if exists
           """
           if self.connections_path is None:
               return None
   
           secret_path = self.build_path(self.connections_path, conn_id)
   
           cached = self.redis_client.get(f"conn_id_{conn_id}")
           if not cached:
               cached = self.vault_client.get_secret(secret_path=secret_path)
               self.redis_client.set(f"conn_id_{conn_id}", cached, ttl=self.cache_ttl)
           return cached
   
       def get_conn_uri(self, conn_id: str) -> Optional[str]:
           """
           Get serialized representation of connection
   
           :param conn_id: The connection id
           :rtype: str
           :return: The connection uri retrieved from the secret
           """
           # Since VaultBackend implements `get_connection`, `get_conn_uri` is not used. So we
           # don't need to implement (or direct users to use) method `get_conn_value` instead
           warnings.warn(
               f"Method `{self.__class__.__name__}.get_conn_uri` is deprecated and will be removed "
               "in a future release.",
               PendingDeprecationWarning,
               stacklevel=2,
           )
           response = self.get_response(conn_id)
           return response.get("conn_uri") if response else None
   
       # Make sure connection is imported this way for type checking, otherwise when importing
       # the backend it will get a circular dependency and fail
       if TYPE_CHECKING:
           from airflow.models.connection import Connection
   
       def get_connection(self, conn_id: str) -> "Optional[Connection]":
           """
           Get connection from Vault as secret. Prioritize conn_uri if exists,
           if not fall back to normal Connection creation.
   
           :rtype: Connection
           :return: A Connection object constructed from Vault data
           """
           # The Connection needs to be locally imported because otherwise we get into cyclic import
           # problems when instantiating the backend during configuration
           from airflow.models.connection import Connection
   
           response = self.get_response(conn_id)
           if response is None:
               return None
   
           uri = response.get("conn_uri")
           if uri:
               return Connection(conn_id, uri=uri)
   
           return Connection(conn_id, **response)
   
       def get_variable(self, key: str) -> Optional[str]:
           """
           Get Airflow Variable
   
           :param key: Variable Key
           :rtype: str
           :return: Variable Value retrieved from the vault
           """
           if self.variables_path is None:
               return None
           else:
               secret_path = self.build_path(self.variables_path, key)
               cached = self.redis_client.get(f"variables_{key}")
               if not cached:
                   cached = self.vault_client.get_secret(secret_path=secret_path)
                   self.redis_client.set(f"variables_{key}", cached, ttl=self.cache_ttl)
               return cached.get("value") if cached else None
   
       def get_config(self, key: str) -> Optional[str]:
           """
           Get Airflow Configuration
   
           :param key: Configuration Option Key
           :rtype: str
           :return: Configuration Option Value retrieved from the vault
           """
           if self.config_path is None:
               return None
           else:
               secret_path = self.build_path(self.config_path, key)
               cached = self.redis_client.get(f"config_{key}")
               if not cached:
                   cached = self.vault_client.get_secret(secret_path=secret_path)
                   self.redis_client.set(f"config_{key}", cached, ttl=self.cache_ttl)
               return cached.get("value") if cached else None
   
   ```
   
   I also had to write a redis cache client:
   
   ```
   #
   # Licensed to the Apache Software Foundation (ASF) under one
   # or more contributor license agreements.  See the NOTICE file
   # distributed with this work for additional information
   # regarding copyright ownership.  The ASF licenses this file
   # to you under the Apache License, Version 2.0 (the
   # "License"); you may not use this file except in compliance
   # with the License.  You may obtain a copy of the License at
   #
   #   http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   # software distributed under the License is distributed on an
   # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   # KIND, either express or implied.  See the License for the
   # specific language governing permissions and limitations
   # under the License.
   
   import json
   import os
   import pickle
   from typing import Optional
   
   import redis
   from airflow.exceptions import AirflowException
   from airflow.utils.log.logging_mixin import LoggingMixin
   
   ENV_VAR_PREFIX = "AIRFLOW__"
   
   
   class RedisCache(LoggingMixin):
       """
       Store secrets in a Redis cache.
   
       Uses the same Redis server as the CELERY_BROKER_URL.
       """
   
       def __init__(self, db=1):
           try:
               # Guard against Redis being unavailable (in my case istio was blocking
               # the database-migration init containers from reaching it).
               redis_url = os.environ.get(f"{ENV_VAR_PREFIX}CELERY__BROKER_URL", f"redis://redis:6379/{db}")
               # Re-use the broker URL but swap the trailing db index (0) for the cache db.
               self.redis_url = redis_url[:-1] + f"{db}"
               self.client = redis.from_url(self.redis_url)
           except Exception as e:
               self.log.error(
                   f"Unable to connect to Redis server. Secrets will not be cached.", extra={"exception": e}
               )
               self.redis_url = None
               self.client = None
           return
   
       def set(self, key, data, ttl=300) -> bool:
           try:
               if key == "VAULT_CLIENT":
                   encoded_data = pickle.dumps(data)
               else:
                   encoded_data = pickle.dumps(self.encrypt(data))
               self.client.set(key, encoded_data, ex=ttl)
               result = True
           except Exception as e:
               self.log.error(f"Unable to set secret to cache: {key}", extra={"exception": e})
               result = False
           return result
   
       def get(self, key) -> Optional[str]:
           try:
               encoded_data = self.client.get(key)
               if key == "VAULT_CLIENT":
                   data = pickle.loads(encoded_data) if encoded_data else None
               else:
                   data = self.decrypt(pickle.loads(encoded_data)) if encoded_data else None
           except Exception as e:
               self.log.error(f"Unable to get secret from cache: {key}", extra={"exception": e})
               data = None
           return data
   
       def decrypt(self, data) -> Optional[str]:
           """Return encrypted password."""
           if data:
               # preventing circular import
               from airflow.models.crypto import get_fernet
   
               fernet = get_fernet()
               if not fernet.is_encrypted:
                   raise AirflowException(
                       f"Can't decrypt encrypted secrets from cache. \nFERNET_KEY configuration is missing"
                   )
               return json.loads(fernet.decrypt(bytes(data, "utf-8")).decode())
           else:
               return data
   
       def encrypt(self, value: Optional[str]) -> Optional[str]:
           """Encrypt password and set in object attribute."""
           # preventing circular imported
           from airflow.models.crypto import get_fernet
   
           fernet = get_fernet()
           if not fernet.is_encrypted:
               raise AirflowException(
                   "Can't encrypt secrets for the cache. \nFERNET_KEY configuration is missing"
               )
           return fernet.encrypt(bytes(json.dumps(value), "utf-8")).decode() if value else None
   
       def flushdb(self) -> bool:
           result = False
           if self.client:
               self.client.flushdb()
               self.log.error("Cache Database has been invalidated.")
               result = True
           else:
               self.log.error("Unable to invalidate the cache database.")
           return result
   ```
   
   For operational concerns I also wrote a `flushdb` method, which allows me to invalidate the cache if I make a mistake while setting up and testing a new connection.
   I also wrote a DAG using the PythonOperator to call the `flushdb` method, so that my team can invalidate the cache on their own.
   
   ```
   import pendulum

   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.python import PythonOperator
   from custom_secrets.secrets.redis import RedisCache

   # Placeholder default_args so the snippet is self-contained.
   default_args = {"owner": "airflow"}


   def invalidate_redis_cache():
       redis = RedisCache()
       redis.flushdb()
       return
   
   with DAG(
       dag_id="airflow_flushdb_cache",
       default_args=default_args,
       schedule_interval="@daily",
       start_date=pendulum.today().add(days=-1),
       catchup=False,
       max_active_runs=1,
   ) as dag:
       start = EmptyOperator(task_id="start")
       end = EmptyOperator(task_id="end")
       flush_cache = PythonOperator(task_id="invalidate_redis_cache", python_callable=invalidate_redis_cache)
   
       start >> flush_cache >> end
   ```
   
   We have been running the custom code for a week now; Airflow has been really stable and our Vault is as healthy as we like.
   I started with a TTL of 1 hour and over the week I pushed it to 24 hours.

   We saw the Vault requests drop from ~10k/hour to 50-100/hour, and since I pushed the TTL to 24h, to 50-100/day. I also set my flushdb DAG to run daily to clean the Redis db.
   
   The following image shows the calls made to Vault before and after the custom class was set up.
   ![image](https://user-images.githubusercontent.com/643002/178044942-16fab5c6-d31d-4c96-b38e-82e2b8ab93e9.png)
   
   I can also say that Redis didn't suffer from the added centralized caching: the pod instance is using 38m CPU and 151Mi RAM.
   
   Hope this can help someone. If there is interest, I can propose a PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org