You are viewing a plain-text version of this content. The link to the canonical version (originally the word "here") is not preserved in this plain-text copy.
Posted to commits@liminal.apache.org by av...@apache.org on 2021/11/21 07:17:49 UTC

[incubator-liminal] branch master updated: [LIMINAL-82] change variable resolution to be in real time

This is an automated email from the ASF dual-hosted git repository.

aviemzur pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git


The following commit(s) were added to refs/heads/master by this push:
     new da1d4c6  [LIMINAL-82] change variable resolution to be in real time
da1d4c6 is described below

commit da1d4c6377f09aba5a22be7c7d024354d2da418a
Author: Aviem Zur <av...@gmail.com>
AuthorDate: Sun Nov 21 09:17:45 2021 +0200

    [LIMINAL-82] change variable resolution to be in real time
---
 .../helloworld/hello_world.py                      |   3 +
 examples/liminal-getting-started/liminal.yml       |  19 +-
 .../runners/airflow/dag/liminal_register_dags.py   |   6 +-
 liminal/runners/airflow/executors/emr.py           |  38 ++--
 liminal/runners/airflow/executors/kubernetes.py    |  10 +-
 liminal/runners/airflow/model/executor.py          |  21 +++
 liminal/runners/airflow/model/task.py              |  35 +++-
 .../operators/operator_with_variable_resolving.py  | 201 +++++++++++++++++++++
 liminal/runners/airflow/tasks/containerable.py     |  22 +--
 .../airflow/tasks/create_cloudformation_stack.py   |  44 +++--
 .../airflow/tasks/delete_cloudformation_stack.py   |  23 ++-
 liminal/runners/airflow/tasks/job_end.py           |   4 +-
 liminal/runners/airflow/tasks/job_start.py         |   4 +-
 liminal/runners/airflow/tasks/python.py            |   6 +-
 liminal/runners/airflow/tasks/spark.py             |   8 +-
 requirements.txt                                   |   5 +-
 tests/runners/airflow/executors/test_emr.py        |  16 +-
 .../test_operator_with_variable_resolving.py       | 174 ++++++++++++++++++
 .../tasks/test_create_cloudformation_stack.py      |  16 +-
 .../tasks/test_delete_cloudformation_stack.py      |   7 +-
 tests/test_licenses.py                             |   6 +-
 21 files changed, 555 insertions(+), 113 deletions(-)

diff --git a/examples/liminal-getting-started/helloworld/hello_world.py b/examples/liminal-getting-started/helloworld/hello_world.py
index f9cd850..f3b75fb 100644
--- a/examples/liminal-getting-started/helloworld/hello_world.py
+++ b/examples/liminal-getting-started/helloworld/hello_world.py
@@ -17,8 +17,11 @@
 # under the License.
 
 import json
+import os
 
 print('Hello world!\n')
+print('Environment:')
+print(os.environ)
 
 with open('/mnt/gettingstartedvol/hello_world_output.json', 'w') as file:
     file.write(json.dumps({'hello': 1, 'world': 2}))
diff --git a/examples/liminal-getting-started/liminal.yml b/examples/liminal-getting-started/liminal.yml
index bcbfd7d..5521841 100644
--- a/examples/liminal-getting-started/liminal.yml
+++ b/examples/liminal-getting-started/liminal.yml
@@ -22,6 +22,8 @@ volumes:
     claim_name: gettingstartedvol-pvc
     local:
       path: .
+variables:
+  myvar: myval
 images:
   - image: python_hello_world_example_image
     type: python
@@ -50,28 +52,15 @@ pipelines:
     tasks:
       - task: python_hello_world_example
         type: python
-        description: static input task
         image: python_hello_world_example_image
         env_vars:
-          env1: "a"
-          env2: "b"
+          env1: "{{myvar}}"
+          env2: "foo"
         mounts:
           - mount: mymount
             volume: gettingstartedvol
             path: /mnt/gettingstartedvol
         cmd: python -u hello_world.py
-      - task: parallelized_hello_world_example_task
-        type: python
-        description: parallelized parallelized hello world example
-        image: python_hello_world_example_image
-        env_vars:
-          env1: "a"
-          env2: "b"
-        mounts:
-          - mount: mymount
-            volume: gettingstartedvol
-            path: /mnt/gettingstartedvol
-        split_input: True
         executors: 2
         cmd: python -u hello_world.py
       - task: python_hello_world_output_task
diff --git a/liminal/runners/airflow/dag/liminal_register_dags.py b/liminal/runners/airflow/dag/liminal_register_dags.py
index 46ff6e4..77a729d 100644
--- a/liminal/runners/airflow/dag/liminal_register_dags.py
+++ b/liminal/runners/airflow/dag/liminal_register_dags.py
@@ -38,8 +38,7 @@ def register_dags(configs_path):
     """
     logging.info(f'Registering DAGs from path: {configs_path}')
     config_util = ConfigUtil(configs_path)
-    # TODO - change is_render_variable to False when runtime resolving is available
-    configs = config_util.safe_load(is_render_variables=True)
+    configs = config_util.safe_load(is_render_variables=False)
 
     if os.getenv('POD_NAMESPACE') != "jenkins":
         config_util.snapshot_final_liminal_configs()
@@ -80,7 +79,8 @@ def register_dags(configs_path):
                         trigger_rule=trigger_rule,
                         liminal_config=config,
                         pipeline_config=pipeline,
-                        task_config=task
+                        task_config=task,
+                        variables=config.get('variables', {})
                     )
 
                     executor_id = task.get('executor')
diff --git a/liminal/runners/airflow/executors/emr.py b/liminal/runners/airflow/executors/emr.py
index 83354aa..9eb7ba3 100644
--- a/liminal/runners/airflow/executors/emr.py
+++ b/liminal/runners/airflow/executors/emr.py
@@ -44,28 +44,32 @@ class EMRExecutor(executor.Executor):
         self._validate_task_type(task)
 
         # assuming emr already exists
-        add_step = EmrAddStepsOperator(
-            task_id=f'{task.task_id}_add_step',
-            job_flow_id=self.job_flow_id,
-            job_flow_name=self.job_flow_name,
-            aws_conn_id=self.aws_conn_id,
-            steps=self.__generate_emr_step(task.task_id,
-                                           [str(x) for x in task.get_runnable_command()]),
-            cluster_states=self.cluster_states,
-            dag=task.dag
+        add_step = executor.add_variables_to_operator(
+            EmrAddStepsOperator(
+                task_id=f'{task.task_id}_add_step',
+                job_flow_id=self.job_flow_id,
+                job_flow_name=self.job_flow_name,
+                aws_conn_id=self.aws_conn_id,
+                steps=self.__generate_emr_step(task.task_id,
+                                               [str(x) for x in task.get_runnable_command()]),
+                cluster_states=self.cluster_states,
+            ),
+            task
         )
 
         if task.parent:
             parent.set_downstream(add_step)
 
-        emr_sensor_step = EmrStepSensor(
-            task_id=f'{task.task_id}_watch_step',
-            job_flow_id="{{ task_instance.xcom_pull('" + add_step.task_id +
-                        "', key='job_flow_id') }}",
-            step_id="{{ task_instance.xcom_pull('" + add_step.task_id +
-                    "', key='return_value')[0] }}",
-            aws_conn_id=self.aws_conn_id,
-            dag=task.dag
+        emr_sensor_step = executor.add_variables_to_operator(
+            EmrStepSensor(
+                task_id=f'{task.task_id}_watch_step',
+                job_flow_id="{{ task_instance.xcom_pull('" + add_step.task_id +
+                            "', key='job_flow_id') }}",
+                step_id="{{ task_instance.xcom_pull('" + add_step.task_id +
+                        "', key='return_value')[0] }}",
+                aws_conn_id=self.aws_conn_id
+            ),
+            task
         )
 
         add_step.set_downstream(emr_sensor_step)
diff --git a/liminal/runners/airflow/executors/kubernetes.py b/liminal/runners/airflow/executors/kubernetes.py
index be1e25e..7e69761 100644
--- a/liminal/runners/airflow/executors/kubernetes.py
+++ b/liminal/runners/airflow/executors/kubernetes.py
@@ -54,10 +54,12 @@ class KubernetesPodExecutor(executor.Executor):
 
         self._validate_task_type(task)
 
-        pod_task = KubernetesPodOperator(
-            dag=task.dag,
-            trigger_rule=task.trigger_rule,
-            **self.__kubernetes_kwargs(task)
+        pod_task = executor.add_variables_to_operator(
+            KubernetesPodOperator(
+                trigger_rule=task.trigger_rule,
+                **self.__kubernetes_kwargs(task)
+            ),
+            task
         )
 
         if parent:
diff --git a/liminal/runners/airflow/model/executor.py b/liminal/runners/airflow/model/executor.py
index 918ed21..9e30aac 100644
--- a/liminal/runners/airflow/model/executor.py
+++ b/liminal/runners/airflow/model/executor.py
@@ -18,6 +18,27 @@
 
 from abc import ABC, abstractmethod
 
+from airflow.models import BaseOperator
+
+from liminal.runners.airflow.operators.operator_with_variable_resolving import \
+    OperatorWithVariableResolving
+
+
+def add_variables_to_operator(operator, task) -> BaseOperator:
+    """
+    :param operator: Airflow operator
+    :type operator: BaseOperator
+    :param task: Task instance
+    :returns: OperatorWithVariableResolving wrapping given operator
+    """
+    return OperatorWithVariableResolving(
+        dag=task.dag,
+        task_config=task.task_config,
+        variables=task.variables,
+        liminal_task_instance=task,
+        operator=operator
+    )
+
 
 class Executor(ABC):
     """
diff --git a/liminal/runners/airflow/model/task.py b/liminal/runners/airflow/model/task.py
index e311b59..e7a2493 100644
--- a/liminal/runners/airflow/model/task.py
+++ b/liminal/runners/airflow/model/task.py
@@ -19,7 +19,12 @@
 """
 Base task.
 """
-from abc import ABC, abstractmethod
+import json
+from abc import ABC
+
+from airflow.models import BaseOperator
+
+from liminal.runners.airflow.model import executor
 
 
 class Task(ABC):
@@ -28,7 +33,7 @@ class Task(ABC):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config):
+                 task_config, variables=None):
         self.liminal_config = liminal_config
         self.dag = dag
         self.pipeline_config = pipeline_config
@@ -36,3 +41,29 @@ class Task(ABC):
         self.parent = parent
         self.trigger_rule = trigger_rule
         self.task_config = task_config
+        self.variables = variables
+
+    def serialize(self) -> str:
+        """
+        :returns: JSON string representation of this task
+        """
+        data = {
+            'task_id': self.task_id,
+            'dag': None,
+            'parent': self.parent,
+            'trigger_rule': self.trigger_rule,
+            'liminal_config': self.liminal_config,
+            'pipeline_config': self.pipeline_config,
+            'task_config': self.task_config,
+            'variables': self.variables,
+        }
+
+        return json.dumps(data, default=str)
+
+    def _add_variables_to_operator(self, operator) -> BaseOperator:
+        """
+        :param operator: Airflow operator
+        :type operator: BaseOperator
+        :returns: OperatorWithVariableResolving wrapping given operator
+        """
+        return executor.add_variables_to_operator(operator, self)
diff --git a/liminal/runners/airflow/operators/operator_with_variable_resolving.py b/liminal/runners/airflow/operators/operator_with_variable_resolving.py
new file mode 100644
index 0000000..d363ba1
--- /dev/null
+++ b/liminal/runners/airflow/operators/operator_with_variable_resolving.py
@@ -0,0 +1,201 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+import logging
+import re
+from datetime import datetime
+from typing import Any, Dict, Optional, Set
+
+import jinja2
+from airflow.models import BaseOperator
+from airflow.settings import Session
+from jinja2 import Environment
+
+from liminal.runners.airflow.config import standalone_variable_backend
+
+_VAR_REGEX = '(.*){{([^}]*)}}(.*)'
+
+_BASE_OPERATOR_ATTRIBUTES = list(inspect.signature(BaseOperator.__init__).parameters.keys())
+
+
+class OperatorWithVariableResolving(BaseOperator):
+    """
+    Operator delegator that handles liminal variable substitution at run time
+    """
+
+    def __init__(self,
+                 dag,
+                 task_config: dict,
+                 variables: dict = None,
+                 liminal_task_instance=None,
+                 **kwargs):
+        self.operator_delegate: BaseOperator = kwargs.pop('operator')
+        self.liminal_task_instance = \
+            liminal_task_instance.serialize() if liminal_task_instance else None
+        if variables:
+            self.variables = variables.copy()
+        else:
+            self.variables = {}
+        self.task_config = task_config
+        super().__init__(
+            task_id=self.operator_delegate.task_id,
+            dag=dag
+        )
+        self._LOG = logging.getLogger(self.__class__.__name__)
+
+    def execute(self, context):
+        attributes = self._get_operator_delegate_attributes()
+        self._LOG.info(f'task_config: {self.task_config}')
+        self._LOG.info(f'variables: {self.variables}')
+        self.operator_delegate.template_fields = set(list(self.operator_delegate.template_fields) +
+                                                     attributes)
+        self.operator_delegate.render_template_fields(context,
+                                                      LiminalEnvironment(self.variables,
+                                                                         self.task_config))
+        self.operator_delegate.render_template_fields(context)
+
+        if 'ti' in context:
+            context['ti'].xcom_push(key="liminal_task_instance", value=self.liminal_task_instance)
+
+        return self.operator_delegate.execute(context)
+
+    def post_execute(self, context, result=None):
+        self.operator_delegate.post_execute(context, result)
+
+    def _get_operator_delegate_attributes(self):
+        return [
+            attr for attr in dir(self.operator_delegate) if
+            attr not in _BASE_OPERATOR_ATTRIBUTES and attr not in dir(BaseOperator)
+            and not attr.startswith('_')
+            and attr not in ('args', 'kwargs', 'lineage_data', 'subdag', 'template_fields')
+        ]
+
+    def pre_execute(self, context: Any):
+        return self.operator_delegate.pre_execute(context)
+
+    def on_kill(self) -> None:
+        self.operator_delegate.on_kill()
+
+    def render_template_fields(self, context: Dict,
+                               jinja_env: Optional[jinja2.Environment] = None) -> None:
+        pass
+
+    def render_template(self, content: Any, context: Dict,
+                        jinja_env: Optional[jinja2.Environment] = None,
+                        seen_oids: Optional[Set] = None) -> Any:
+        value = self.operator_delegate.render_template(content, context,
+                                                       LiminalEnvironment(self.variables,
+                                                                          self.task_config))
+        return self.operator_delegate.render_template(value, context, jinja_env, seen_oids)
+
+    def get_template_env(self) -> jinja2.Environment:
+        return self.operator_delegate.get_template_env()
+
+    def prepare_template(self) -> None:
+        self.operator_delegate.prepare_template()
+
+    def resolve_template_files(self) -> None:
+        self.operator_delegate.resolve_template_files()
+
+    def clear(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None,
+              upstream: bool = False, downstream: bool = False, session: Session = None):
+        return self.operator_delegate.clear(start_date, end_date, upstream, downstream, session)
+
+    def run(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None,
+            ignore_first_depends_on_past: bool = True, ignore_ti_state: bool = False,
+            mark_success: bool = False) -> None:
+        self.operator_delegate.run(start_date, end_date, ignore_first_depends_on_past,
+                                   ignore_ti_state,
+                                   mark_success)
+
+
+class LiminalEnvironment(Environment):
+    def __init__(self, variables, task_config=None):
+        super().__init__()
+        self.val = None
+        self.variables = variables.copy()
+        logging.info(f'variables: {variables}')
+        if task_config and 'variables' in task_config:
+            task_variables = task_config['variables']
+            if isinstance(task_variables, dict):
+                self.variables.update(task_variables)
+            elif isinstance(task_variables, str):
+                variables_key = self.from_string(task_variables).render()
+                if variables_key in variables:
+                    self.variables.update(variables[variables_key])
+
+    def from_string(self, val, **kwargs):
+        self.val = val
+        return self
+
+    def render(self, *_, **kwargs):
+        """
+        Implements jinja2.environment.Template.render
+        """
+        conf = kwargs['dag_run'].conf if 'dag_run' in kwargs else {}
+        return self.__render(self.val, conf, set())
+
+    def __render(self, val: str, dag_run_conf: dict, unresolved_tags: set):
+        token = re.match(_VAR_REGEX, val)
+        if token and token[2].strip() not in unresolved_tags:
+            tag_name = token[2].strip()
+            prefix = self.__render(token[1], dag_run_conf, unresolved_tags)
+            suffix = self.__render(token[3], dag_run_conf, unresolved_tags)
+            if dag_run_conf and tag_name in dag_run_conf:
+                return self.__render(prefix + str(dag_run_conf[tag_name]) + suffix,
+                                     dag_run_conf,
+                                     unresolved_tags)
+            elif tag_name in self.variables:
+                return self.__render(prefix + str(self.variables[tag_name]) + suffix,
+                                     dag_run_conf,
+                                     unresolved_tags)
+            else:
+                backend_value = standalone_variable_backend.get_variable(tag_name, None)
+                if backend_value:
+                    return self.__render(
+                        prefix + backend_value + suffix,
+                        dag_run_conf,
+                        unresolved_tags
+                    )
+                else:
+                    unresolved_tags.add(tag_name)
+                    return self.__render(
+                        prefix + '{{' + token[2] + '}}' + suffix,
+                        dag_run_conf,
+                        unresolved_tags
+                    )
+        else:
+            return val
+
+
+def add_variables_to_operator(operator, task) -> BaseOperator:
+    """
+    :param operator: Airflow operator
+    :type operator: BaseOperator
+    :param task: Task instance
+    :type task: Task
+    :returns: OperatorWithVariableResolving wrapping given operator
+    """
+    return OperatorWithVariableResolving(
+        dag=task.dag,
+        task_config=task.task_config,
+        variables=task.variables,
+        liminal_task_instance=task,
+        operator=operator
+    )
diff --git a/liminal/runners/airflow/tasks/containerable.py b/liminal/runners/airflow/tasks/containerable.py
index 9f731e3..08e1139 100644
--- a/liminal/runners/airflow/tasks/containerable.py
+++ b/liminal/runners/airflow/tasks/containerable.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 import json
 import logging
 import os
@@ -29,8 +30,6 @@ from liminal.runners.airflow.model import task
 _LOG = logging.getLogger(__name__)
 ENV = 'env'
 DEFAULT = 'default'
-OUTPUT_PATH = 'OUTPUT_PATH'
-OUTPUT_DESTINATION_PATH = 'OUTPUT_DESTINATION_PATH'
 
 
 class ContainerTask(task.Task, ABC):
@@ -39,19 +38,16 @@ class ContainerTask(task.Task, ABC):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config):
+                 task_config, variables=None):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
-                         pipeline_config, task_config)
+                         pipeline_config, task_config, variables)
         env = standalone_variable_backend.get_variable(ENV, DEFAULT)
         self.env_vars = self.__env_vars(env)
         self.image = self.task_config['image']
         self.mounts = self.task_config.get('mounts', [])
-        self.cmds, self.arguments = self._kubernetes_cmds_and_arguments(
-            self.env_vars.get(OUTPUT_PATH),
-            self.env_vars.get(OUTPUT_DESTINATION_PATH)
-        )
+        self.cmds, self.arguments = self._kubernetes_cmds_and_arguments()
 
-    def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
+    def _kubernetes_cmds_and_arguments(self):
         cmds = ['/bin/sh', '-c']
 
         arguments = [
@@ -94,12 +90,4 @@ class ContainerTask(task.Task, ABC):
         if ENV not in env_vars:
             env_vars[ENV] = env
 
-        env_vars[OUTPUT_PATH] = self.task_config[
-            OUTPUT_PATH
-        ] if OUTPUT_PATH in self.task_config else '/tmp/s3_mount'
-
-        if 'output_destination_path' in self.task_config:
-            env_vars[OUTPUT_DESTINATION_PATH] = self.task_config[
-                'output_destination_path'
-            ]
         return dict([(k, str(v)) for k, v in env_vars.items()])
diff --git a/liminal/runners/airflow/tasks/create_cloudformation_stack.py b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
index c94d640..2ac1605 100644
--- a/liminal/runners/airflow/tasks/create_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
@@ -22,6 +22,8 @@ from flatdict import FlatDict
 
 from liminal.runners.airflow.operators.cloudformation import CloudFormationCreateStackOperator, \
     CloudFormationCreateStackSensor, CloudFormationHook
+from liminal.runners.airflow.operators.operator_with_variable_resolving import \
+    OperatorWithVariableResolving
 from liminal.runners.airflow.tasks import airflow
 
 
@@ -31,37 +33,41 @@ class CreateCloudFormationStackTask(airflow.AirflowTask):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config):
+                 task_config, variables=None):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
-                         pipeline_config, task_config)
+                         pipeline_config, task_config, variables)
         self.stack_name = task_config['stack_name']
 
     def apply_task_to_dag(self):
-        check_cloudformation_stack_exists_task = BranchPythonOperator(
-            op_kwargs={'stack_name': self.stack_name},
-            task_id=f'is-cloudformation-{self.task_id}-running',
-            python_callable=self.__cloudformation_stack_running_branch,
-            dag=self.dag
+        check_cloudformation_stack_exists_task = self._add_variables_to_operator(
+            BranchPythonOperator(
+                templates_dict={'stack_name': self.stack_name},
+                task_id=f'is-cloudformation-{self.task_id}-running',
+                python_callable=self.__cloudformation_stack_running_branch,
+                provide_context=True,
+            )
         )
 
-        create_cloudformation_stack_task = CloudFormationCreateStackOperator(
-            task_id=f'create-cloudformation-{self.task_id}',
-            params={
-                **self.__reformatted_params()
-            },
-            dag=self.dag
+        create_cloudformation_stack_task = self._add_variables_to_operator(
+            CloudFormationCreateStackOperator(
+                task_id=f'create-cloudformation-{self.task_id}',
+                params={
+                    **self.__reformatted_params()
+                }
+            )
         )
 
-        create_stack_sensor_task = CloudFormationCreateStackSensor(
-            task_id=f'cloudformation-watch-{self.task_id}-create',
-            stack_name=self.stack_name,
-            dag=self.dag
+        create_stack_sensor_task = self._add_variables_to_operator(
+            CloudFormationCreateStackSensor(
+                task_id=f'cloudformation-watch-{self.task_id}-create',
+                stack_name=self.stack_name,
+            )
         )
 
         stack_creation_end_task = DummyOperator(
             task_id=f'creation-end-{self.task_id}',
-            dag=self.dag,
-            trigger_rule='all_done'
+            trigger_rule='all_done',
+            dag=self.dag
         )
 
         if self.parent:
diff --git a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
index 324c961..756f645 100644
--- a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import BranchPythonOperator
 from airflow.utils.trigger_rule import TriggerRule
@@ -30,9 +31,9 @@ class DeleteCloudFormationStackTask(airflow.AirflowTask):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config):
+                 task_config, variables=None):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
-                         pipeline_config, task_config)
+                         pipeline_config, task_config, variables)
         self.stack_name = task_config['stack_name']
 
     def apply_task_to_dag(self):
@@ -44,16 +45,18 @@ class DeleteCloudFormationStackTask(airflow.AirflowTask):
             dag=self.dag
         )
 
-        delete_stack_task = CloudFormationDeleteStackOperator(
-            task_id=f'delete-cloudformation-{self.task_id}',
-            params={'StackName': self.stack_name},
-            dag=self.dag
+        delete_stack_task = self._add_variables_to_operator(
+            CloudFormationDeleteStackOperator(
+                task_id=f'delete-cloudformation-{self.task_id}',
+                params={'StackName': self.stack_name},
+            )
         )
 
-        delete_stack_sensor = CloudFormationDeleteStackSensor(
-            task_id=f'cloudformation-watch-{self.task_id}-delete',
-            stack_name=self.stack_name,
-            dag=self.dag
+        delete_stack_sensor = self._add_variables_to_operator(
+            CloudFormationDeleteStackSensor(
+                task_id=f'cloudformation-watch-{self.task_id}-delete',
+                stack_name=self.stack_name,
+            )
         )
 
         stack_delete_end_task = DummyOperator(
diff --git a/liminal/runners/airflow/tasks/job_end.py b/liminal/runners/airflow/tasks/job_end.py
index ff5cffa..4f8c295 100644
--- a/liminal/runners/airflow/tasks/job_end.py
+++ b/liminal/runners/airflow/tasks/job_end.py
@@ -26,9 +26,9 @@ class JobEndTask(airflow.AirflowTask):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config):
+                 task_config, variables=None):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                         task_config)
+                         task_config, variables)
         metrics = self.liminal_config.get('metrics', {})
         self.metrics_namespace = metrics.get('namespace', '')
         self.metrics_backends = metrics.get('backends', [])
diff --git a/liminal/runners/airflow/tasks/job_start.py b/liminal/runners/airflow/tasks/job_start.py
index b6bef32..78b69a2 100644
--- a/liminal/runners/airflow/tasks/job_start.py
+++ b/liminal/runners/airflow/tasks/job_start.py
@@ -26,9 +26,9 @@ class JobStartTask(airflow.AirflowTask):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config):
+                 task_config, variables=None):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                         task_config)
+                         task_config, variables)
         metrics = self.liminal_config.get('metrics', {})
         self.metrics_namespace = metrics.get('namespace', '')
         self.metrics_backends = metrics.get('backends', [])
diff --git a/liminal/runners/airflow/tasks/python.py b/liminal/runners/airflow/tasks/python.py
index 878ce62..81163dd 100644
--- a/liminal/runners/airflow/tasks/python.py
+++ b/liminal/runners/airflow/tasks/python.py
@@ -24,14 +24,14 @@ class PythonTask(containerable.ContainerTask):
     Python task.
     """
 
-    def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
+    def _kubernetes_cmds_and_arguments(self):
         cmds = ['/bin/bash', '-c']
         arguments = [
 
-            self.__cmd(output_path, output_destination_path)
+            self.__cmd()
         ]
 
         return cmds, arguments
 
-    def __cmd(self, output_path, output_destination_path):
+    def __cmd(self):
         return self.task_config['cmd']
diff --git a/liminal/runners/airflow/tasks/spark.py b/liminal/runners/airflow/tasks/spark.py
index 247df2c..8938011 100644
--- a/liminal/runners/airflow/tasks/spark.py
+++ b/liminal/runners/airflow/tasks/spark.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from itertools import chain
 
 from flatdict import FlatDict
@@ -28,11 +29,12 @@ class SparkTask(hadoop.HadoopTask, containerable.ContainerTask):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config):
+                 task_config, variables=None):
         task_config['image'] = task_config.get('image', '')
         task_config['cmd'] = task_config.get('cmd', [])
+        task_config['env_vars'] = {'SPARK_LOCAL_HOSTNAME': 'localhost'}
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
-                         pipeline_config, task_config)
+                         pipeline_config, task_config, variables)
 
     def get_runnable_command(self):
         """
@@ -88,5 +90,5 @@ class SparkTask(hadoop.HadoopTask, containerable.ContainerTask):
     def __interleaving(keys, values):
         return list(chain.from_iterable(zip(keys, values)))
 
-    def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
+    def _kubernetes_cmds_and_arguments(self):
         return self.__generate_spark_submit(), []
diff --git a/requirements.txt b/requirements.txt
index 1b3b0bf..415ee41 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,7 +20,7 @@ docker==4.2.0
 apache-airflow==2.1.2
 click==7.1.1
 Flask==1.1.1
-pyyaml==5.3.1
+pyyaml==5.4.1
 boto3==1.17.112
 botocore==1.20.112
 wheel==0.36.2
@@ -43,4 +43,5 @@ requests==2.25.0
 apache-airflow-providers-amazon==1.4.0
 apache-airflow-providers-cncf-kubernetes==1.0.2
 #apache-airflow-providers-google==4.0.0
-#apache-airflow[google,amazon,apache.spark]==2.0.0
\ No newline at end of file
+#apache-airflow[google,amazon,apache.spark]==2.0.0
+cfn-lint==0.53.0  # NOTE(review): new in LIMINAL-82; confirm runtime (not test-only) need
diff --git a/tests/runners/airflow/executors/test_emr.py b/tests/runners/airflow/executors/test_emr.py
index b4d107b..19c7b99 100644
--- a/tests/runners/airflow/executors/test_emr.py
+++ b/tests/runners/airflow/executors/test_emr.py
@@ -28,6 +28,8 @@ from moto import mock_emr
 
 from liminal.runners.airflow import DummyDag
 from liminal.runners.airflow.executors.emr import EMRExecutor
+from liminal.runners.airflow.operators.operator_with_variable_resolving import \
+    OperatorWithVariableResolving  # wraps each EMR operator; see tasks[i].operator_delegate
 from liminal.runners.airflow.tasks import hadoop
 from tests.util import dag_test_utils
 
@@ -80,6 +82,8 @@ class TestEMRExecutorTask(TestCase):
         self.hadoop_task.dag = self.dag
         self.hadoop_task.trigger_rule = 'all_done'
         self.hadoop_task.parent = None
+        self.hadoop_task.task_config = {}  # NOTE(review): presumably read by EMRExecutor; confirm
+        self.hadoop_task.variables = {}
 
         self.emr = EMRExecutor(
             self.executor_name,
@@ -92,8 +96,10 @@ class TestEMRExecutorTask(TestCase):
 
         self.assertEqual(len(self.dag.tasks), 2)
 
-        self.assertIsInstance(self.dag.tasks[0], EmrAddStepsOperator)
-        self.assertIsInstance(self.dag.tasks[1], EmrStepSensor)
+        self.assertIsInstance(self.dag.tasks[0], OperatorWithVariableResolving)
+        self.assertIsInstance(self.dag.tasks[0].operator_delegate, EmrAddStepsOperator)
+        self.assertIsInstance(self.dag.tasks[1], OperatorWithVariableResolving)
+        self.assertIsInstance(self.dag.tasks[1].operator_delegate, EmrStepSensor)
 
     @mock.patch.object(EmrHook, 'get_conn')
     def test_add_step(self, mock_emr_hook_get_conn):
@@ -107,7 +113,8 @@ class TestEMRExecutorTask(TestCase):
 
         emr_add_step_task = self.dag.tasks[0]
 
-        self.assertIsInstance(emr_add_step_task, EmrAddStepsOperator)
+        self.assertIsInstance(emr_add_step_task, OperatorWithVariableResolving)
+        self.assertIsInstance(emr_add_step_task.operator_delegate, EmrAddStepsOperator)
 
         emr_add_step_task.render_template_fields({})
 
@@ -133,6 +140,7 @@ class TestEMRExecutorTask(TestCase):
 
         emr_watch_step_task = self.dag.tasks[1]
 
-        self.assertIsInstance(emr_watch_step_task, EmrStepSensor)
+        self.assertIsInstance(emr_watch_step_task, OperatorWithVariableResolving)
+        self.assertIsInstance(emr_watch_step_task.operator_delegate, EmrStepSensor)
 
         # todo - elaborate tests
diff --git a/tests/runners/airflow/operators/test_operator_with_variable_resolving.py b/tests/runners/airflow/operators/test_operator_with_variable_resolving.py
new file mode 100644
index 0000000..53a875d
--- /dev/null
+++ b/tests/runners/airflow/operators/test_operator_with_variable_resolving.py
@@ -0,0 +1,193 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from unittest import TestCase
+from unittest.mock import patch, MagicMock
+
+from airflow.models import BaseOperator
+
+from liminal.runners.airflow.operators.operator_with_variable_resolving import \
+    OperatorWithVariableResolving
+
+
+class TestOperatorWithVariableResolving(TestCase):
+    """
+    Tests for OperatorWithVariableResolving, which resolves {{...}} placeholders in the
+    wrapped operator's attributes when execute() runs, using os.environ, the `variables`
+    mapping, task_config['variables'] and context['dag_run'].conf.
+    """
+
+    @patch.dict(os.environ,
+                {'env': 'myenv', 'LIMINAL_STAND_ALONE_MODE': 'True', 'nested': 'value1'})
+    def test_value_from_environment_and_liminal_config(self):
+        """Placeholders resolve from os.environ and from the `variables` mapping."""
+        variables = {'my-image': 'my-image', 'something-value1': 'stg'}
+        operator = FakeOperator(task_id='abc', field='{{env}}', expected='{{my-image}}')
+        ret_val = OperatorWithVariableResolving(dag=None,
+                                                operator=operator,
+                                                task_id='my_task',
+                                                task_config={},
+                                                variables=variables).execute({})
+
+        self.assertEqual(operator.field, 'myenv')
+        self.assertEqual(ret_val, 'my-image')
+
+    @patch.dict(os.environ,
+                {'env': 'myenv', 'LIMINAL_STAND_ALONE_MODE': 'True', 'nested': 'value1'})
+    def test_value_from_task_variables(self):
+        """task_config['variables'] naming an entry of `variables` overrides os.environ."""
+        variables = {'my-image': 'my-image', 'my_dict_var': {'env': 'myenv2'}}
+        operator = FakeOperator(task_id='abc', field='{{env}}', expected='{{my-image}}')
+        ret_val = OperatorWithVariableResolving(dag=None,
+                                                operator=operator,
+                                                task_id='my_task',
+                                                task_config={'variables': 'my_dict_var'},
+                                                variables=variables).execute({})
+
+        self.assertEqual(operator.field, 'myenv2')
+        self.assertEqual(ret_val, 'my-image')
+
+    @patch.dict(os.environ,
+                {'env': 'myenv', 'LIMINAL_STAND_ALONE_MODE': 'True', 'nested': 'value1'})
+    def test_value_from_inline_task_variables(self):
+        """An inline dict in task_config['variables'] overrides os.environ."""
+        variables = {'my-image': 'my-image'}
+        operator = FakeOperator(task_id='abc', field='{{env}}', expected='{{my-image}}')
+        ret_val = OperatorWithVariableResolving(dag=None,
+                                                operator=operator,
+                                                task_id='my_task',
+                                                task_config={'variables': {'env': 'myenv2'}},
+                                                variables=variables).execute({})
+
+        self.assertEqual(operator.field, 'myenv2')
+        self.assertEqual(ret_val, 'my-image')
+
+    @patch.dict(os.environ, {'env': 'staging', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+    def test_nested_variables(self):
+        """Innermost placeholders resolve first and build the names of the outer ones."""
+        variables = {'nested-the-s3-location': 'good', 'my-image': 'my-image',
+                     'something-value1': 'stg', 'env_val-staging': 'the-s3-location',
+                     'nested': 'value1'}
+        operator = FakeOperator(task_id='abc', field='something-{{nested-{{env_val-{{env}}}}}}',
+                                expected='{{my-image}}')
+
+        OperatorWithVariableResolving(dag=None,
+                                      operator=operator,
+                                      task_id='my_task',
+                                      task_config={},
+                                      variables=variables).execute({})
+
+        self.assertEqual(operator.field, 'something-good')
+
+    @patch.dict(os.environ, {'env': 'staging', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+    def test_dag_run_conf_parameters(self):
+        """Values can come from context['dag_run'].conf."""
+        dag_run = MagicMock()
+        dag_run.conf = {'nested-the-s3-location': 'good', 'my-image': 'my-image',
+                        'something-value1': 'stg', 'env_val-staging': 'the-s3-location',
+                        'nested': 'value1'}
+        operator = FakeOperator(task_id='abc', field='something-{{nested-{{env_val-{{env}}}}}}',
+                                expected='{{my-image}}')
+        OperatorWithVariableResolving(dag=None,
+                                      operator=operator,
+                                      task_id='my_task',
+                                      task_config={},
+                                      variables={}).execute({'dag_run': dag_run})
+
+        self.assertEqual(operator.field, 'something-good')
+
+    @patch.dict(os.environ, {'env': 'staging', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+    def test_duplicated_params(self):
+        """Repeated placeholders, '{{ name }}' with spaces, and values that are placeholders."""
+        dag_run = MagicMock()
+        dag_run.conf = {'nested-the-s3-location': 'good', 'my-image': 'my-image',
+                        'something-value1': 'stg', 'env_val-staging': 'the-s3-location',
+                        'nested': 'value1'}
+        operator = FakeOperator(task_id='abc', field='something-{{nested-{{env_val-{{env}}}}}}-'
+                                                     '{{nested-{{env_val-{{env}}}}}}',
+                                expected='{{my-image}}')
+        OperatorWithVariableResolving(dag=None,
+                                      operator=operator,
+                                      task_id='my_task',
+                                      task_config={},
+                                      variables={}).execute({'dag_run': dag_run})
+
+        self.assertEqual(operator.field, 'something-good-good')
+
+        operator = FakeOperator(task_id='abc', field='something-{{env}} {{env}}',
+                                expected='{{my-image}}')
+        OperatorWithVariableResolving(dag=None,
+                                      operator=operator,
+                                      task_id='my_task',
+                                      task_config={},
+                                      variables={}).execute({'dag_run': dag_run})
+
+        self.assertEqual(operator.field, 'something-staging staging')
+
+        dag_run.conf = {'var1': '{{var2}}', 'var2': 'bla', 'my-image': 'my-image'}
+        operator = FakeOperator(task_id='abc', field='{{ var1 }}',
+                                expected='{{my-image}}')
+        OperatorWithVariableResolving(dag=None,
+                                      operator=operator,
+                                      task_id='my_task',
+                                      task_config={},
+                                      variables={}).execute({'dag_run': dag_run})
+
+        self.assertEqual(operator.field, 'bla')
+
+    @patch.dict(os.environ, {'env': 'staging', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+    def test_non_existing_param(self):
+        """A placeholder with no matching variable resolves to an empty string."""
+        dag_run = MagicMock()
+        dag_run.conf = {'var1': '{{var2}}', 'var2': 'bla'}
+        operator = FakeOperator(task_id='abc', field='{{myimage}}',
+                                expected='{{myimage}}')
+        OperatorWithVariableResolving(dag=None,
+                                      operator=operator,
+                                      task_id='my_task',
+                                      task_config={},
+                                      variables={}).execute({'dag_run': dag_run})
+
+        self.assertEqual(operator.field, '')
+
+
+class BaseFakeOperator(BaseOperator):
+    """Minimal operator with `field` and `expected` attributes that may hold placeholders."""
+
+    def execute(self, context):
+        raise NotImplementedError()
+
+    def __init__(self, field, expected, *args, **kwargs):
+        self.field = field
+        self.expected = expected
+        super().__init__(*args, **kwargs)
+
+
+class FakeOperator(BaseFakeOperator):
+    """
+    Operator whose execute() returns `expected`.
+
+    Not named Test* on purpose: pytest would otherwise try to collect it as a test class.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def execute(self, context):
+        return self.expected
diff --git a/tests/runners/airflow/tasks/test_create_cloudformation_stack.py b/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
index a2317d1..e57dad4 100644
--- a/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
+++ b/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
@@ -26,6 +26,7 @@ from moto import mock_cloudformation
 from liminal.runners.airflow.executors import airflow
 from liminal.runners.airflow.operators.cloudformation import CloudFormationCreateStackOperator, \
     CloudFormationCreateStackSensor, CloudFormationHook
+from liminal.runners.airflow.operators.operator_with_variable_resolving import OperatorWithVariableResolving
 from liminal.runners.airflow.tasks.create_cloudformation_stack import CreateCloudFormationStackTask
 from tests.util import dag_test_utils
 
@@ -87,9 +88,12 @@ class TestCreateCloudFormationStackTask(TestCase):
     def test_apply_task_to_dag(self):
         self.assertEqual(len(self.dag.tasks), 4)
 
-        self.assertIsInstance(self.dag.tasks[0], BranchPythonOperator)
-        self.assertIsInstance(self.dag.tasks[1], CloudFormationCreateStackOperator)
-        self.assertIsInstance(self.dag.tasks[2], CloudFormationCreateStackSensor)
+        self.assertIsInstance(self.dag.tasks[0], OperatorWithVariableResolving)
+        self.assertIsInstance(self.dag.tasks[0].operator_delegate, BranchPythonOperator)
+        self.assertIsInstance(self.dag.tasks[1], OperatorWithVariableResolving)
+        self.assertIsInstance(self.dag.tasks[1].operator_delegate, CloudFormationCreateStackOperator)
+        self.assertIsInstance(self.dag.tasks[2], OperatorWithVariableResolving)
+        self.assertIsInstance(self.dag.tasks[2].operator_delegate, CloudFormationCreateStackSensor)
         self.assertIsInstance(self.dag.tasks[3], DummyOperator)
 
     def test_cloudformation_does_not_exist(self):
@@ -98,7 +102,7 @@ class TestCreateCloudFormationStackTask(TestCase):
             mock_cf_conn.describe_stacks.return_value.raiseError.side_effect = Exception()
 
             mock_conn.return_value = mock_cf_conn
-            is_cloudformation_exists = self.dag.tasks[0]
+            is_cloudformation_exists = self.dag.tasks[0].operator_delegate  # BranchPythonOperator
 
             print(is_cloudformation_exists)
             self.assertEqual(
@@ -106,7 +110,7 @@ class TestCreateCloudFormationStackTask(TestCase):
                 'create-cloudformation-create_emr')
 
     def test_cloudformation_exist_and_running(self):
-        is_cloudformation_exists = self.dag.tasks[0]
+        is_cloudformation_exists = self.dag.tasks[0].operator_delegate  # BranchPythonOperator
 
         for status in ['CREATE_COMPLETE', 'DELETE_FAILED']:
             with mock.patch.object(CloudFormationHook, 'get_conn') as mock_conn:
@@ -126,7 +130,7 @@ class TestCreateCloudFormationStackTask(TestCase):
                     'creation-end-create_emr')
 
     def test_cloudformation_exists_and_not_running(self):
-        is_cloudformation_exists = self.dag.tasks[0]
+        is_cloudformation_exists = self.dag.tasks[0].operator_delegate  # BranchPythonOperator
 
         for status in ['DELETED']:
             with mock.patch.object(CloudFormationHook, 'get_conn') as mock_conn:
diff --git a/tests/runners/airflow/tasks/test_delete_cloudformation_stack.py b/tests/runners/airflow/tasks/test_delete_cloudformation_stack.py
index fc96114..dbb2656 100644
--- a/tests/runners/airflow/tasks/test_delete_cloudformation_stack.py
+++ b/tests/runners/airflow/tasks/test_delete_cloudformation_stack.py
@@ -27,6 +27,7 @@ from moto import mock_cloudformation
 
 from liminal.runners.airflow.operators.cloudformation import CloudFormationDeleteStackOperator, \
     CloudFormationDeleteStackSensor
+from liminal.runners.airflow.operators.operator_with_variable_resolving import OperatorWithVariableResolving
 from liminal.runners.airflow.tasks.delete_cloudformation_stack import DeleteCloudFormationStackTask
 from tests.util import dag_test_utils
 
@@ -68,8 +69,10 @@ class TestDeleteCloudFormationStackTask(TestCase):
         self.assertEqual(len(self.dag.tasks), 4)
 
         self.assertIsInstance(self.dag.tasks[0], BranchPythonOperator)
-        self.assertIsInstance(self.dag.tasks[1], CloudFormationDeleteStackOperator)
-        self.assertIsInstance(self.dag.tasks[2], CloudFormationDeleteStackSensor)
+        self.assertIsInstance(self.dag.tasks[1], OperatorWithVariableResolving)  # tasks[1..2] wrapped
+        self.assertIsInstance(self.dag.tasks[1].operator_delegate, CloudFormationDeleteStackOperator)
+        self.assertIsInstance(self.dag.tasks[2], OperatorWithVariableResolving)
+        self.assertIsInstance(self.dag.tasks[2].operator_delegate, CloudFormationDeleteStackSensor)
         self.assertIsInstance(self.dag.tasks[3], DummyOperator)
 
     def test_check_dags_queued_task(self):
diff --git a/tests/test_licenses.py b/tests/test_licenses.py
index e105b3c..6b4a3b4 100644
--- a/tests/test_licenses.py
+++ b/tests/test_licenses.py
@@ -22,8 +22,10 @@ from unittest import TestCase
 
 from termcolor import colored
 
-EXCLUDED_EXTENSIONS = ['.gif', '.png', '.pyc', 'LICENSE', 'DISCLAIMER', 'NOTICE', '.whl']
-EXCLUDED_DIRS = ['docs/build', '.git', '.idea', 'venv', 'apache_liminal.egg-info']
+EXCLUDED_EXTENSIONS = [  # NOTE(review): 'DISCLAIMER-WIP' is also in EXCLUDED_FILES
+    '.gif', '.png', '.pyc', 'LICENSE', 'DISCLAIMER', 'DISCLAIMER-WIP', 'NOTICE', '.whl'
+]
+EXCLUDED_DIRS = ['docs/build', 'build', 'dist', '.git', '.idea', 'venv', 'apache_liminal.egg-info']
 EXCLUDED_FILES = ['DISCLAIMER-WIP']
 
 PYTHON_LICENSE_HEADER = """