You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/11/05 23:32:25 UTC
[airflow] 01/02: Fix bug when checking for existence of a Variable
(#19395)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bef01d94d46c8beb2cc0f535158c9bb24bf92a3d
Author: raphaelauv <ra...@users.noreply.github.com>
AuthorDate: Sat Nov 6 00:27:36 2021 +0100
Fix bug when checking for existence of a Variable (#19395)
`check_for_write_conflict` was a `staticmethod` but for some reason it was ignored
(cherry picked from commit 93d2a1626d4da4ae372f5c5edb47a12afc388d33)
---
airflow/models/variable.py | 3 ++-
tests/secrets/test_local_filesystem.py | 24 +++++++++++++++++--
tests/secrets/test_secrets.py | 44 ++++++++++++++++++++++++++++++++--
3 files changed, 66 insertions(+), 5 deletions(-)
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index b724686..6da65c5 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -197,7 +197,7 @@ class Variable(Base, LoggingMixin):
"""
cls.check_for_write_conflict(key)
- if cls.get_variable_from_secrets(key) is None:
+ if cls.get_variable_from_secrets(key=key) is None:
raise KeyError(f'Variable {key} does not exist')
obj = session.query(cls).filter(cls.key == key).first()
@@ -223,6 +223,7 @@ class Variable(Base, LoggingMixin):
if self._val and self.is_encrypted:
self._val = fernet.rotate(self._val.encode('utf-8')).decode()
+ @staticmethod
def check_for_write_conflict(key: str) -> None:
"""
Logs a warning if a variable exists outside of the metastore.
diff --git a/tests/secrets/test_local_filesystem.py b/tests/secrets/test_local_filesystem.py
index 1c6c5ae..85f0aaa 100644
--- a/tests/secrets/test_local_filesystem.py
+++ b/tests/secrets/test_local_filesystem.py
@@ -25,9 +25,12 @@ from unittest import mock
import pytest
from parameterized import parameterized
+from airflow.configuration import ensure_secrets_loaded
from airflow.exceptions import AirflowException, AirflowFileParseException, ConnectionNotUnique
+from airflow.models import Variable
from airflow.secrets import local_filesystem
from airflow.secrets.local_filesystem import LocalFilesystemBackend
+from tests.test_utils.config import conf_vars
@contextmanager
@@ -380,15 +383,32 @@ class TestLocalFileBackend(unittest.TestCase):
assert "VAL_A" == backend.get_variable("KEY_A")
assert backend.get_variable("KEY_B") is None
+ @conf_vars(
+ {
+ (
+ "secrets",
+ "backend",
+ ): "airflow.secrets.local_filesystem.LocalFilesystemBackend",
+ ("secrets", "backend_kwargs"): '{"variables_file_path": "var.env"}',
+ }
+ )
+ def test_load_secret_backend_LocalFilesystemBackend(self):
+ with mock_local_file("KEY_A=VAL_A"):
+ backends = ensure_secrets_loaded()
+
+ backend_classes = [backend.__class__.__name__ for backend in backends]
+ assert 'LocalFilesystemBackend' in backend_classes
+ assert Variable.get("KEY_A") == "VAL_A"
+
def test_should_read_connection(self):
with NamedTemporaryFile(suffix=".env") as tmp_file:
tmp_file.write(b"CONN_A=mysql://host_a")
tmp_file.flush()
backend = LocalFilesystemBackend(connections_file_path=tmp_file.name)
- assert ["mysql://host_a"] == [conn.get_uri() for conn in backend.get_connections("CONN_A")]
+ assert "mysql://host_a" == backend.get_connection("CONN_A").get_uri()
assert backend.get_variable("CONN_B") is None
def test_files_are_optional(self):
backend = LocalFilesystemBackend()
- assert [] == backend.get_connections("CONN_A")
+ assert None is backend.get_connection("CONN_A")
assert backend.get_variable("VAR_A") is None
diff --git a/tests/secrets/test_secrets.py b/tests/secrets/test_secrets.py
index 31baf38..83a7fe3 100644
--- a/tests/secrets/test_secrets.py
+++ b/tests/secrets/test_secrets.py
@@ -137,10 +137,10 @@ class TestVariableFromSecrets(unittest.TestCase):
Test if Variable is present in Environment Variable, it does not look for it in
Metastore DB
"""
- mock_env_get.return_value = [["something"]] # returns nonempty list
+ mock_env_get.return_value = "something"
Variable.get_variable_from_secrets("fake_var_key")
mock_env_get.assert_called_once_with(key="fake_var_key")
- mock_meta_get.not_called()
+ mock_meta_get.assert_not_called()
def test_backend_fallback_to_default_var(self):
"""
@@ -149,3 +149,43 @@ class TestVariableFromSecrets(unittest.TestCase):
"""
variable_value = Variable.get(key="test_var", default_var="new")
assert "new" == variable_value
+
+ @conf_vars(
+ {
+ (
+ "secrets",
+ "backend",
+ ): "airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend",
+ ("secrets", "backend_kwargs"): '{"variables_prefix": "/airflow", "profile_name": null}',
+ }
+ )
+ @mock.patch.dict(
+ 'os.environ',
+ {
+ 'AIRFLOW_VAR_MYVAR': 'a_venv_value',
+ },
+ )
+ @mock.patch("airflow.secrets.metastore.MetastoreBackend.get_variable")
+ @mock.patch(
+ "airflow.providers.amazon.aws.secrets.systems_manager."
+ "SystemsManagerParameterStoreBackend.get_variable"
+ )
+ def test_backend_variable_order(self, mock_secret_get, mock_meta_get):
+ backends = ensure_secrets_loaded()
+ backend_classes = [backend.__class__.__name__ for backend in backends]
+ assert 'SystemsManagerParameterStoreBackend' in backend_classes
+
+ mock_secret_get.return_value = None
+ mock_meta_get.return_value = None
+
+ assert "a_venv_value" == Variable.get(key="MYVAR")
+ mock_secret_get.assert_called_with(key="MYVAR")
+ mock_meta_get.assert_not_called()
+
+ mock_secret_get.return_value = None
+ mock_meta_get.return_value = "a_metastore_value"
+ assert "a_metastore_value" == Variable.get(key="not_myvar")
+ mock_meta_get.assert_called_once_with(key="not_myvar")
+
+ mock_secret_get.return_value = "a_secret_value"
+ assert "a_secret_value" == Variable.get(key="not_myvar")