You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@liminal.apache.org by av...@apache.org on 2021/07/19 14:43:24 UTC

[incubator-liminal] branch master updated: Load liminal configuration files using the config utils in liminal_apps_builder

This is an automated email from the ASF dual-hosted git repository.

aviemzur pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git


The following commit(s) were added to refs/heads/master by this push:
     new 0118bb0  Load liminal configuration files using the config utils in liminal_apps_builder
0118bb0 is described below

commit 0118bb0c67c940c3771e13c2cf7ffadba5527bfc
Author: Zion Rubin <zi...@naturalint.com>
AuthorDate: Mon Jul 19 17:43:17 2021 +0300

    Load liminal configuration files using the config utils in liminal_apps_builder
---
 liminal/build/liminal_apps_builder.py          | 63 ++++++++++++--------------
 liminal/core/config/config.py                  | 23 ++++++----
 tests/liminal/core/config/test_config.py       | 34 ++++++++++++--
 tests/runners/airflow/tasks/test_spark_task.py |  6 +--
 4 files changed, 75 insertions(+), 51 deletions(-)

diff --git a/liminal/build/liminal_apps_builder.py b/liminal/build/liminal_apps_builder.py
index 5facc7d..d8297af 100644
--- a/liminal/build/liminal_apps_builder.py
+++ b/liminal/build/liminal_apps_builder.py
@@ -19,9 +19,8 @@
 import logging
 import os
 
-import yaml
-
 from liminal.build.image_builder import ImageBuilder, ServiceImageBuilderMixin
+from liminal.core.config.config import ConfigUtil
 from liminal.core.util import files_util, class_util
 
 
@@ -29,40 +28,35 @@ def build_liminal_apps(path):
     """
     Build images for liminal apps in path.
     """
-    config_files = files_util.find_config_files(path)
-
-    for config_file in config_files:
-        logging.info(f'Building artifacts for file: {config_file}')
-
-        base_path = os.path.dirname(config_file)
-
-        with open(config_file) as stream:
-            liminal_config = yaml.safe_load(stream)
-
-            if 'pipelines' in liminal_config:
-                for pipeline in liminal_config['pipelines']:
-                    for task in pipeline['tasks']:
-                        task_name = task['task']
-
-                        if 'source' in task:
-                            task_type = task['type']
-                            builder_class = __get_task_build_class(task_type)
-                            if builder_class:
-                                __build_image(base_path, task, builder_class)
-                            else:
-                                raise ValueError(f'No such task type: {task_type}')
+    config_util = ConfigUtil(path)
+    configs = config_util.safe_load(is_render_variables=True, soft_merge=True)
+
+    for liminal_config in configs:
+        base_path = os.path.dirname(files_util.resolve_pipeline_source_file(liminal_config['name']))
+        if 'pipelines' in liminal_config:
+            for pipeline in liminal_config['pipelines']:
+                for task in pipeline['tasks']:
+                    task_name = task['task']
+
+                    if 'source' in task:
+                        task_type = task['type']
+                        builder_class = __get_task_build_class(task_type)
+                        if builder_class:
+                            __build_image(base_path, task, builder_class)
                         else:
-                            logging.info(
-                                f'No source configured for task {task_name}, skipping build..')
-
-            if 'services' in liminal_config:
-                for service in liminal_config['services']:
-                    service_type = service['type']
-                    builder_class = __get_service_build_class(service_type)
-                    if builder_class:
-                        __build_image(base_path, service, builder_class)
+                            raise ValueError(f'No such task type: {task_type}')
                     else:
-                        raise ValueError(f'No such service type: {service_type}')
+                        logging.info(
+                            f'No source configured for task {task_name}, skipping build..')
+
+        if 'services' in liminal_config:
+            for service in liminal_config['services']:
+                service_type = service['type']
+                builder_class = __get_service_build_class(service_type)
+                if builder_class:
+                    __build_image(base_path, service, builder_class)
+                else:
+                    raise ValueError(f'No such service type: {service_type}')
 
 
 def __build_image(base_path, builder_config, builder):
@@ -95,7 +89,6 @@ task_build_types = class_util.find_subclasses_in_packages(
     [image_builders_package],
     ImageBuilder)
 
-
 logging.info(f'Finished loading image builder implementations: {task_build_types}')
 logging.info(f'Loading service image builder implementations..')
 
diff --git a/liminal/core/config/config.py b/liminal/core/config/config.py
index 28a5ff4..e43a870 100644
--- a/liminal/core/config/config.py
+++ b/liminal/core/config/config.py
@@ -50,7 +50,7 @@ class ConfigUtil:
         self.snapshot_path = os.path.join(environment.get_airflow_home_dir(),
                                           '../liminal_config_files')
 
-    def safe_load(self, is_render_variables):
+    def safe_load(self, is_render_variables, soft_merge=False):
         """
         :returns list of config files after enrich with defaults and supers
         """
@@ -65,9 +65,9 @@ class ConfigUtil:
             logging.info(f'Loading yml {name}')
             # noinspection PyBroadException
             try:
-                superliminal = self.__get_superliminal(subliminal)
+                superliminal = self.__get_superliminal(subliminal, soft_merge)
                 enriched_config = self.__merge_configs(subliminal, superliminal,
-                                                       is_render_variables)
+                                                       is_render_variables, soft_merge)
                 enriched_configs.append(enriched_config)
             except Exception:
                 logging.error(f'Failed to load yml {name}')
@@ -77,15 +77,15 @@ class ConfigUtil:
 
         return self.loaded_subliminals
 
-    def __merge_configs(self, subliminal, superliminal, is_render_variables):
+    def __merge_configs(self, subliminal, superliminal, is_render_variables, soft_merge):
         if not superliminal:
             return subliminal
 
         sub = subliminal.copy()
         supr = superliminal.copy()
 
-        merged_superliminal = self.__merge_configs(supr, self.__get_superliminal(supr),
-                                                   is_render_variables)
+        merged_superliminal = self.__merge_configs(supr, self.__get_superliminal(supr, soft_merge),
+                                                   is_render_variables, soft_merge)
 
         sub[self.__EXECUTORS] = self.__merge_executors(sub, merged_superliminal)
 
@@ -94,7 +94,7 @@ class ConfigUtil:
         else:
             return self.__merge_superliminals(sub, merged_superliminal)
 
-    def __get_superliminal(self, liminal):
+    def __get_superliminal(self, liminal, soft_merge):
         superliminal = {}
         if not self.__is_base_config(liminal):
             superliminal_name = liminal.get(self.__SUPER, '')
@@ -102,10 +102,13 @@ class ConfigUtil:
                 superliminal = self.base
             else:
                 superliminal = self.__get_config(superliminal_name)
-
                 if not superliminal:
-                    raise FileNotFoundError(
-                        f"superliminal '{superliminal_name}' is missing from '{self.configs_path}'")
+                    supr_is_missing_msg = f"superliminal '{superliminal_name}' " + \
+                                          f"is missing from '{self.configs_path}'"
+                    if soft_merge:
+                        logging.warning(supr_is_missing_msg)
+                    else:
+                        raise FileNotFoundError(supr_is_missing_msg)
 
         return superliminal
 
diff --git a/tests/liminal/core/config/test_config.py b/tests/liminal/core/config/test_config.py
index f1ed7a7..e6da54a 100644
--- a/tests/liminal/core/config/test_config.py
+++ b/tests/liminal/core/config/test_config.py
@@ -165,10 +165,10 @@ class TestHierarchicalConfig(TestCase):
         config_util = ConfigUtil("")
 
         self.assertEqual(base,
-                         config_util._ConfigUtil__get_superliminal(subliminal))
+                         config_util._ConfigUtil__get_superliminal(subliminal, False))
 
         self.assertEqual({},
-                         config_util._ConfigUtil__get_superliminal(base))
+                         config_util._ConfigUtil__get_superliminal(base, False))
 
         liminal = {
             "name": "subliminal_test",
@@ -177,7 +177,7 @@ class TestHierarchicalConfig(TestCase):
         }
 
         with self.assertRaises(FileNotFoundError):
-            config_util._ConfigUtil__get_superliminal(liminal)
+            config_util._ConfigUtil__get_superliminal(liminal, False)
 
     @mock.patch("liminal.core.util.files_util.load")
     def test_merge_superliminals(self, find_config_files_mock):
@@ -444,3 +444,31 @@ class TestHierarchicalConfig(TestCase):
                 m.assert_called_once_with(
                     os.path.join('/tmp', '../liminal_config_files/my_subliminal_test.yml'), 'w')
                 ydm.assert_called_once_with(expected, m.return_value, default_flow_style=False)
+
+    @mock.patch("liminal.core.util.files_util.load")
+    def test_soft_merge_load(self, find_config_files_mock):
+        subliminal = {
+            "name": "my_name",
+            "type": "sub",
+            "super": "my_super"
+        }
+        find_config_files_mock.return_value = {"my_subliminal_test": subliminal}
+
+        config_util = ConfigUtil("")
+
+        self.assertEqual([subliminal],
+                         config_util.safe_load(is_render_variables=True, soft_merge=True))
+
+    def test_non_soft_merge_load(self):
+        subliminal = {
+            "name": "my_name",
+            "type": "sub",
+            "super": "my_super"
+        }
+
+        config_util = ConfigUtil("")
+
+        self.assertRaises(FileNotFoundError,
+                          config_util._ConfigUtil__get_superliminal,
+                          subliminal,
+                          False)
diff --git a/tests/runners/airflow/tasks/test_spark_task.py b/tests/runners/airflow/tasks/test_spark_task.py
index 7836f64..5223ba7 100644
--- a/tests/runners/airflow/tasks/test_spark_task.py
+++ b/tests/runners/airflow/tasks/test_spark_task.py
@@ -61,7 +61,7 @@ class TestSparkTask(TestCase):
             task_config=task_config
         ).get_runnable_command()
 
-        self.assertEqual(actual, expected)
+        self.assertEqual(sorted(actual), sorted(expected))
 
     def test_missing_spark_arguments(self):
         task_config = {
@@ -121,7 +121,7 @@ class TestSparkTask(TestCase):
                     'select * from dlk_visitor_funnel_dwh_staging.fact_events where '
                     "unified_Date_prt >= '{{yesterday_ds}}'",
                     '--output',
-                    'mytable']
+                    'mytable']
 
         actual = SparkTask(
             'my_spark_task',
@@ -133,4 +133,4 @@ class TestSparkTask(TestCase):
             task_config=task_config
         ).get_runnable_command()
 
-        self.assertEqual(actual, expected)
+        self.assertEqual(sorted(actual), sorted(expected))