You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@liminal.apache.org by av...@apache.org on 2021/07/19 14:43:24 UTC
[incubator-liminal] branch master updated: Load liminal
configuration files using the config utils in liminal_apps_builder
This is an automated email from the ASF dual-hosted git repository.
aviemzur pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
The following commit(s) were added to refs/heads/master by this push:
new 0118bb0 Load liminal configuration files using the config utils in liminal_apps_builder
0118bb0 is described below
commit 0118bb0c67c940c3771e13c2cf7ffadba5527bfc
Author: Zion Rubin <zi...@naturalint.com>
AuthorDate: Mon Jul 19 17:43:17 2021 +0300
Load liminal configuration files using the config utils in liminal_apps_builder
---
liminal/build/liminal_apps_builder.py | 63 ++++++++++++--------------
liminal/core/config/config.py | 23 ++++++----
tests/liminal/core/config/test_config.py | 34 ++++++++++++--
tests/runners/airflow/tasks/test_spark_task.py | 6 +--
4 files changed, 75 insertions(+), 51 deletions(-)
diff --git a/liminal/build/liminal_apps_builder.py b/liminal/build/liminal_apps_builder.py
index 5facc7d..d8297af 100644
--- a/liminal/build/liminal_apps_builder.py
+++ b/liminal/build/liminal_apps_builder.py
@@ -19,9 +19,8 @@
import logging
import os
-import yaml
-
from liminal.build.image_builder import ImageBuilder, ServiceImageBuilderMixin
+from liminal.core.config.config import ConfigUtil
from liminal.core.util import files_util, class_util
@@ -29,40 +28,35 @@ def build_liminal_apps(path):
"""
Build images for liminal apps in path.
"""
- config_files = files_util.find_config_files(path)
-
- for config_file in config_files:
- logging.info(f'Building artifacts for file: {config_file}')
-
- base_path = os.path.dirname(config_file)
-
- with open(config_file) as stream:
- liminal_config = yaml.safe_load(stream)
-
- if 'pipelines' in liminal_config:
- for pipeline in liminal_config['pipelines']:
- for task in pipeline['tasks']:
- task_name = task['task']
-
- if 'source' in task:
- task_type = task['type']
- builder_class = __get_task_build_class(task_type)
- if builder_class:
- __build_image(base_path, task, builder_class)
- else:
- raise ValueError(f'No such task type: {task_type}')
+ config_util = ConfigUtil(path)
+ configs = config_util.safe_load(is_render_variables=True, soft_merge=True)
+
+ for liminal_config in configs:
+ base_path = os.path.dirname(files_util.resolve_pipeline_source_file(liminal_config['name']))
+ if 'pipelines' in liminal_config:
+ for pipeline in liminal_config['pipelines']:
+ for task in pipeline['tasks']:
+ task_name = task['task']
+
+ if 'source' in task:
+ task_type = task['type']
+ builder_class = __get_task_build_class(task_type)
+ if builder_class:
+ __build_image(base_path, task, builder_class)
else:
- logging.info(
- f'No source configured for task {task_name}, skipping build..')
-
- if 'services' in liminal_config:
- for service in liminal_config['services']:
- service_type = service['type']
- builder_class = __get_service_build_class(service_type)
- if builder_class:
- __build_image(base_path, service, builder_class)
+ raise ValueError(f'No such task type: {task_type}')
else:
- raise ValueError(f'No such service type: {service_type}')
+ logging.info(
+ f'No source configured for task {task_name}, skipping build..')
+
+ if 'services' in liminal_config:
+ for service in liminal_config['services']:
+ service_type = service['type']
+ builder_class = __get_service_build_class(service_type)
+ if builder_class:
+ __build_image(base_path, service, builder_class)
+ else:
+ raise ValueError(f'No such service type: {service_type}')
def __build_image(base_path, builder_config, builder):
@@ -95,7 +89,6 @@ task_build_types = class_util.find_subclasses_in_packages(
[image_builders_package],
ImageBuilder)
-
logging.info(f'Finished loading image builder implementations: {task_build_types}')
logging.info(f'Loading service image builder implementations..')
diff --git a/liminal/core/config/config.py b/liminal/core/config/config.py
index 28a5ff4..e43a870 100644
--- a/liminal/core/config/config.py
+++ b/liminal/core/config/config.py
@@ -50,7 +50,7 @@ class ConfigUtil:
self.snapshot_path = os.path.join(environment.get_airflow_home_dir(),
'../liminal_config_files')
- def safe_load(self, is_render_variables):
+ def safe_load(self, is_render_variables, soft_merge=False):
"""
:returns list of config files after enrich with defaults and supers
"""
@@ -65,9 +65,9 @@ class ConfigUtil:
logging.info(f'Loading yml {name}')
# noinspection PyBroadException
try:
- superliminal = self.__get_superliminal(subliminal)
+ superliminal = self.__get_superliminal(subliminal, soft_merge)
enriched_config = self.__merge_configs(subliminal, superliminal,
- is_render_variables)
+ is_render_variables, soft_merge)
enriched_configs.append(enriched_config)
except Exception:
logging.error(f'Failed to load yml {name}')
@@ -77,15 +77,15 @@ class ConfigUtil:
return self.loaded_subliminals
- def __merge_configs(self, subliminal, superliminal, is_render_variables):
+ def __merge_configs(self, subliminal, superliminal, is_render_variables, soft_merge):
if not superliminal:
return subliminal
sub = subliminal.copy()
supr = superliminal.copy()
- merged_superliminal = self.__merge_configs(supr, self.__get_superliminal(supr),
- is_render_variables)
+ merged_superliminal = self.__merge_configs(supr, self.__get_superliminal(supr, soft_merge),
+ is_render_variables, soft_merge)
sub[self.__EXECUTORS] = self.__merge_executors(sub, merged_superliminal)
@@ -94,7 +94,7 @@ class ConfigUtil:
else:
return self.__merge_superliminals(sub, merged_superliminal)
- def __get_superliminal(self, liminal):
+ def __get_superliminal(self, liminal, soft_merge):
superliminal = {}
if not self.__is_base_config(liminal):
superliminal_name = liminal.get(self.__SUPER, '')
@@ -102,10 +102,13 @@ class ConfigUtil:
superliminal = self.base
else:
superliminal = self.__get_config(superliminal_name)
-
if not superliminal:
- raise FileNotFoundError(
- f"superliminal '{superliminal_name}' is missing from '{self.configs_path}'")
+ supr_is_missing_msg = f"superliminal '{superliminal_name}' " + \
+ f"is missing from '{self.configs_path}'"
+ if soft_merge:
+ logging.warning(supr_is_missing_msg)
+ else:
+ raise FileNotFoundError(supr_is_missing_msg)
return superliminal
diff --git a/tests/liminal/core/config/test_config.py b/tests/liminal/core/config/test_config.py
index f1ed7a7..e6da54a 100644
--- a/tests/liminal/core/config/test_config.py
+++ b/tests/liminal/core/config/test_config.py
@@ -165,10 +165,10 @@ class TestHierarchicalConfig(TestCase):
config_util = ConfigUtil("")
self.assertEqual(base,
- config_util._ConfigUtil__get_superliminal(subliminal))
+ config_util._ConfigUtil__get_superliminal(subliminal, False))
self.assertEqual({},
- config_util._ConfigUtil__get_superliminal(base))
+ config_util._ConfigUtil__get_superliminal(base, False))
liminal = {
"name": "subliminal_test",
@@ -177,7 +177,7 @@ class TestHierarchicalConfig(TestCase):
}
with self.assertRaises(FileNotFoundError):
- config_util._ConfigUtil__get_superliminal(liminal)
+ config_util._ConfigUtil__get_superliminal(liminal, False)
@mock.patch("liminal.core.util.files_util.load")
def test_merge_superliminals(self, find_config_files_mock):
@@ -444,3 +444,31 @@ class TestHierarchicalConfig(TestCase):
m.assert_called_once_with(
os.path.join('/tmp', '../liminal_config_files/my_subliminal_test.yml'), 'w')
ydm.assert_called_once_with(expected, m.return_value, default_flow_style=False)
+
+ @mock.patch("liminal.core.util.files_util.load")
+ def test_soft_merge_load(self, find_config_files_mock):
+ subliminal = {
+ "name": "my_name",
+ "type": "sub",
+ "super": "my_super"
+ }
+ find_config_files_mock.return_value = {"my_subliminal_test": subliminal}
+
+ config_util = ConfigUtil("")
+
+ self.assertEqual([subliminal],
+ config_util.safe_load(is_render_variables=True, soft_merge=True))
+
+ def test_non_soft_merge_load(self):
+ subliminal = {
+ "name": "my_name",
+ "type": "sub",
+ "super": "my_super"
+ }
+
+ config_util = ConfigUtil("")
+
+ self.assertRaises(FileNotFoundError,
+ config_util._ConfigUtil__get_superliminal,
+ subliminal,
+ False)
diff --git a/tests/runners/airflow/tasks/test_spark_task.py b/tests/runners/airflow/tasks/test_spark_task.py
index 7836f64..5223ba7 100644
--- a/tests/runners/airflow/tasks/test_spark_task.py
+++ b/tests/runners/airflow/tasks/test_spark_task.py
@@ -61,7 +61,7 @@ class TestSparkTask(TestCase):
task_config=task_config
).get_runnable_command()
- self.assertEqual(actual, expected)
+ self.assertEqual(sorted(actual), sorted(expected))
def test_missing_spark_arguments(self):
task_config = {
@@ -121,7 +121,7 @@ class TestSparkTask(TestCase):
'select * from dlk_visitor_funnel_dwh_staging.fact_events where '
"unified_Date_prt >= '{{yesterday_ds}}'",
'--output',
- 'mytable']
+ 'mytable']
actual = SparkTask(
'my_spark_task',
@@ -133,4 +133,4 @@ class TestSparkTask(TestCase):
task_config=task_config
).get_runnable_command()
- self.assertEqual(actual, expected)
+ self.assertEqual(sorted(actual), sorted(expected))