Posted to commits@liminal.apache.org by av...@apache.org on 2021/11/21 07:17:49 UTC
[incubator-liminal] branch master updated: [LIMINAL-82] change variable resolution to be in real time
This is an automated email from the ASF dual-hosted git repository.
aviemzur pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
The following commit(s) were added to refs/heads/master by this push:
new da1d4c6 [LIMINAL-82] change variable resolution to be in real time
da1d4c6 is described below
commit da1d4c6377f09aba5a22be7c7d024354d2da418a
Author: Aviem Zur <av...@gmail.com>
AuthorDate: Sun Nov 21 09:17:45 2021 +0200
[LIMINAL-82] change variable resolution to be in real time
---
.../helloworld/hello_world.py | 3 +
examples/liminal-getting-started/liminal.yml | 19 +-
.../runners/airflow/dag/liminal_register_dags.py | 6 +-
liminal/runners/airflow/executors/emr.py | 38 ++--
liminal/runners/airflow/executors/kubernetes.py | 10 +-
liminal/runners/airflow/model/executor.py | 21 +++
liminal/runners/airflow/model/task.py | 35 +++-
.../operators/operator_with_variable_resolving.py | 201 +++++++++++++++++++++
liminal/runners/airflow/tasks/containerable.py | 22 +--
.../airflow/tasks/create_cloudformation_stack.py | 44 +++--
.../airflow/tasks/delete_cloudformation_stack.py | 23 ++-
liminal/runners/airflow/tasks/job_end.py | 4 +-
liminal/runners/airflow/tasks/job_start.py | 4 +-
liminal/runners/airflow/tasks/python.py | 6 +-
liminal/runners/airflow/tasks/spark.py | 8 +-
requirements.txt | 5 +-
tests/runners/airflow/executors/test_emr.py | 16 +-
.../test_operator_with_variable_resolving.py | 174 ++++++++++++++++++
.../tasks/test_create_cloudformation_stack.py | 16 +-
.../tasks/test_delete_cloudformation_stack.py | 7 +-
tests/test_licenses.py | 6 +-
21 files changed, 555 insertions(+), 113 deletions(-)
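In short: variables are no longer rendered into the configs when DAGs are registered (liminal_register_dags.py now calls safe_load(is_render_variables=False)); instead each generated Airflow operator is wrapped in the new OperatorWithVariableResolving, which resolves {{...}} placeholders against dag_run.conf, the pipeline variables and the variable backend when the task executes. A minimal sketch of that wrapping, mirroring the construction used in the new tests (DummyOperator and the values shown are illustrative, not part of this commit):

    from airflow.operators.dummy_operator import DummyOperator
    from liminal.runners.airflow.operators.operator_with_variable_resolving import \
        OperatorWithVariableResolving

    # Wrap a plain Airflow operator so its template fields are resolved at run time.
    wrapped = OperatorWithVariableResolving(
        dag=None,                                   # the tests below also pass dag=None
        task_id='my_task',
        task_config={},                             # may carry a per-task 'variables' entry
        variables={'myvar': 'myval'},               # pipeline-level variables from liminal.yml
        operator=DummyOperator(task_id='my_task'),  # any BaseOperator built without dag=...
    )
    # wrapped.execute(context) renders the delegate's template fields with
    # LiminalEnvironment and then calls the delegate's own execute().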
diff --git a/examples/liminal-getting-started/helloworld/hello_world.py b/examples/liminal-getting-started/helloworld/hello_world.py
index f9cd850..f3b75fb 100644
--- a/examples/liminal-getting-started/helloworld/hello_world.py
+++ b/examples/liminal-getting-started/helloworld/hello_world.py
@@ -17,8 +17,11 @@
# under the License.
import json
+import os
print('Hello world!\n')
+print('Environment:')
+print(os.environ)
with open('/mnt/gettingstartedvol/hello_world_output.json', 'w') as file:
file.write(json.dumps({'hello': 1, 'world': 2}))
diff --git a/examples/liminal-getting-started/liminal.yml b/examples/liminal-getting-started/liminal.yml
index bcbfd7d..5521841 100644
--- a/examples/liminal-getting-started/liminal.yml
+++ b/examples/liminal-getting-started/liminal.yml
@@ -22,6 +22,8 @@ volumes:
claim_name: gettingstartedvol-pvc
local:
path: .
+variables:
+ myvar: myval
images:
- image: python_hello_world_example_image
type: python
@@ -50,28 +52,15 @@ pipelines:
tasks:
- task: python_hello_world_example
type: python
- description: static input task
image: python_hello_world_example_image
env_vars:
- env1: "a"
- env2: "b"
+ env1: "{{myvar}}"
+ env2: "foo"
mounts:
- mount: mymount
volume: gettingstartedvol
path: /mnt/gettingstartedvol
cmd: python -u hello_world.py
- - task: parallelized_hello_world_example_task
- type: python
- description: parallelized parallelized hello world example
- image: python_hello_world_example_image
- env_vars:
- env1: "a"
- env2: "b"
- mounts:
- - mount: mymount
- volume: gettingstartedvol
- path: /mnt/gettingstartedvol
- split_input: True
executors: 2
cmd: python -u hello_world.py
- task: python_hello_world_output_task
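With the pipeline-level variable added above, env1 stays as the literal string "{{myvar}}" at registration time and is only substituted when the pod runs; the hello_world.py change above (printing os.environ) makes this visible in the task logs. A hedged illustration of what the container is expected to see (not asserted anywhere in this commit):

    import os

    # Inside the running task container:
    print(os.environ.get('env1'))   # expected: 'myval', resolved from {{myvar}} at run time
    print(os.environ.get('env2'))   # expected: 'foo'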
diff --git a/liminal/runners/airflow/dag/liminal_register_dags.py b/liminal/runners/airflow/dag/liminal_register_dags.py
index 46ff6e4..77a729d 100644
--- a/liminal/runners/airflow/dag/liminal_register_dags.py
+++ b/liminal/runners/airflow/dag/liminal_register_dags.py
@@ -38,8 +38,7 @@ def register_dags(configs_path):
"""
logging.info(f'Registering DAGs from path: {configs_path}')
config_util = ConfigUtil(configs_path)
- # TODO - change is_render_variable to False when runtime resolving is available
- configs = config_util.safe_load(is_render_variables=True)
+ configs = config_util.safe_load(is_render_variables=False)
if os.getenv('POD_NAMESPACE') != "jenkins":
config_util.snapshot_final_liminal_configs()
@@ -80,7 +79,8 @@ def register_dags(configs_path):
trigger_rule=trigger_rule,
liminal_config=config,
pipeline_config=pipeline,
- task_config=task
+ task_config=task,
+ variables=config.get('variables', {})
)
executor_id = task.get('executor')
diff --git a/liminal/runners/airflow/executors/emr.py b/liminal/runners/airflow/executors/emr.py
index 83354aa..9eb7ba3 100644
--- a/liminal/runners/airflow/executors/emr.py
+++ b/liminal/runners/airflow/executors/emr.py
@@ -44,28 +44,32 @@ class EMRExecutor(executor.Executor):
self._validate_task_type(task)
# assuming emr already exists
- add_step = EmrAddStepsOperator(
- task_id=f'{task.task_id}_add_step',
- job_flow_id=self.job_flow_id,
- job_flow_name=self.job_flow_name,
- aws_conn_id=self.aws_conn_id,
- steps=self.__generate_emr_step(task.task_id,
- [str(x) for x in task.get_runnable_command()]),
- cluster_states=self.cluster_states,
- dag=task.dag
+ add_step = executor.add_variables_to_operator(
+ EmrAddStepsOperator(
+ task_id=f'{task.task_id}_add_step',
+ job_flow_id=self.job_flow_id,
+ job_flow_name=self.job_flow_name,
+ aws_conn_id=self.aws_conn_id,
+ steps=self.__generate_emr_step(task.task_id,
+ [str(x) for x in task.get_runnable_command()]),
+ cluster_states=self.cluster_states,
+ ),
+ task
)
if task.parent:
parent.set_downstream(add_step)
- emr_sensor_step = EmrStepSensor(
- task_id=f'{task.task_id}_watch_step',
- job_flow_id="{{ task_instance.xcom_pull('" + add_step.task_id +
- "', key='job_flow_id') }}",
- step_id="{{ task_instance.xcom_pull('" + add_step.task_id +
- "', key='return_value')[0] }}",
- aws_conn_id=self.aws_conn_id,
- dag=task.dag
+ emr_sensor_step = executor.add_variables_to_operator(
+ EmrStepSensor(
+ task_id=f'{task.task_id}_watch_step',
+ job_flow_id="{{ task_instance.xcom_pull('" + add_step.task_id +
+ "', key='job_flow_id') }}",
+ step_id="{{ task_instance.xcom_pull('" + add_step.task_id +
+ "', key='return_value')[0] }}",
+ aws_conn_id=self.aws_conn_id
+ ),
+ task
)
add_step.set_downstream(emr_sensor_step)
diff --git a/liminal/runners/airflow/executors/kubernetes.py b/liminal/runners/airflow/executors/kubernetes.py
index be1e25e..7e69761 100644
--- a/liminal/runners/airflow/executors/kubernetes.py
+++ b/liminal/runners/airflow/executors/kubernetes.py
@@ -54,10 +54,12 @@ class KubernetesPodExecutor(executor.Executor):
self._validate_task_type(task)
- pod_task = KubernetesPodOperator(
- dag=task.dag,
- trigger_rule=task.trigger_rule,
- **self.__kubernetes_kwargs(task)
+ pod_task = executor.add_variables_to_operator(
+ KubernetesPodOperator(
+ trigger_rule=task.trigger_rule,
+ **self.__kubernetes_kwargs(task)
+ ),
+ task
)
if parent:
diff --git a/liminal/runners/airflow/model/executor.py b/liminal/runners/airflow/model/executor.py
index 918ed21..9e30aac 100644
--- a/liminal/runners/airflow/model/executor.py
+++ b/liminal/runners/airflow/model/executor.py
@@ -18,6 +18,27 @@
from abc import ABC, abstractmethod
+from airflow.models import BaseOperator
+
+from liminal.runners.airflow.operators.operator_with_variable_resolving import \
+ OperatorWithVariableResolving
+
+
+def add_variables_to_operator(operator, task) -> BaseOperator:
+ """
+ :param operator: Airflow operator
+ :type operator: BaseOperator
+ :param task: Task instance
+ :returns: OperatorWithVariableResolving wrapping given operator
+ """
+ return OperatorWithVariableResolving(
+ dag=task.dag,
+ task_config=task.task_config,
+ variables=task.variables,
+ liminal_task_instance=task,
+ operator=operator
+ )
+
class Executor(ABC):
"""
diff --git a/liminal/runners/airflow/model/task.py b/liminal/runners/airflow/model/task.py
index e311b59..e7a2493 100644
--- a/liminal/runners/airflow/model/task.py
+++ b/liminal/runners/airflow/model/task.py
@@ -19,7 +19,12 @@
"""
Base task.
"""
-from abc import ABC, abstractmethod
+import json
+from abc import ABC
+
+from airflow.models import BaseOperator
+
+from liminal.runners.airflow.model import executor
class Task(ABC):
@@ -28,7 +33,7 @@ class Task(ABC):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config):
+ task_config, variables=None):
self.liminal_config = liminal_config
self.dag = dag
self.pipeline_config = pipeline_config
@@ -36,3 +41,29 @@ class Task(ABC):
self.parent = parent
self.trigger_rule = trigger_rule
self.task_config = task_config
+ self.variables = variables
+
+ def serialize(self) -> str:
+ """
+ :returns: JSON string representation of this task
+ """
+ data = {
+ 'task_id': self.task_id,
+ 'dag': None,
+ 'parent': self.parent,
+ 'trigger_rule': self.trigger_rule,
+ 'liminal_config': self.liminal_config,
+ 'pipeline_config': self.pipeline_config,
+ 'task_config': self.task_config,
+ 'variables': self.variables,
+ }
+
+ return json.dumps(data, default=str)
+
+ def _add_variables_to_operator(self, operator) -> BaseOperator:
+ """
+ :param operator: Airflow operator
+ :type operator: BaseOperator
+ :returns: OperatorWithVariableResolving wrapping given operator
+ """
+ return executor.add_variables_to_operator(operator, self)
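For context, serialize() is what the wrapping operator pushes to XCom under the key 'liminal_task_instance'. A rough sketch of its output, assuming Task remains directly instantiable after this change (argument values are illustrative):

    from liminal.runners.airflow.model.task import Task

    t = Task(task_id='my_task', dag=None, parent=None, trigger_rule='all_success',
             liminal_config={}, pipeline_config={}, task_config={},
             variables={'myvar': 'myval'})
    print(t.serialize())
    # -> '{"task_id": "my_task", "dag": null, "parent": null,
    #      "trigger_rule": "all_success", "liminal_config": {}, "pipeline_config": {},
    #      "task_config": {}, "variables": {"myvar": "myval"}}'  (single line in practice;
    #      the dag reference is always nulled out because it is not JSON-serializable)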
diff --git a/liminal/runners/airflow/operators/operator_with_variable_resolving.py b/liminal/runners/airflow/operators/operator_with_variable_resolving.py
new file mode 100644
index 0000000..d363ba1
--- /dev/null
+++ b/liminal/runners/airflow/operators/operator_with_variable_resolving.py
@@ -0,0 +1,201 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+import logging
+import re
+from datetime import datetime
+from typing import Any, Dict, Optional, Set
+
+import jinja2
+from airflow.models import BaseOperator
+from airflow.settings import Session
+from jinja2 import Environment
+
+from liminal.runners.airflow.config import standalone_variable_backend
+
+_VAR_REGEX = '(.*){{([^}]*)}}(.*)'
+
+_BASE_OPERATOR_ATTRIBUTES = list(inspect.signature(BaseOperator.__init__).parameters.keys())
+
+
+class OperatorWithVariableResolving(BaseOperator):
+ """
+ Operator delegator that handles liminal variable substitution at run time
+ """
+
+ def __init__(self,
+ dag,
+ task_config: dict,
+ variables: dict = None,
+ liminal_task_instance=None,
+ **kwargs):
+ self.operator_delegate: BaseOperator = kwargs.pop('operator')
+ self.liminal_task_instance = \
+ liminal_task_instance.serialize() if liminal_task_instance else None
+ if variables:
+ self.variables = variables.copy()
+ else:
+ self.variables = {}
+ self.task_config = task_config
+ super().__init__(
+ task_id=self.operator_delegate.task_id,
+ dag=dag
+ )
+ self._LOG = logging.getLogger(self.__class__.__name__)
+
+ def execute(self, context):
+ attributes = self._get_operator_delegate_attributes()
+ self._LOG.info(f'task_config: {self.task_config}')
+ self._LOG.info(f'variables: {self.variables}')
+ self.operator_delegate.template_fields = set(list(self.operator_delegate.template_fields) +
+ attributes)
+ self.operator_delegate.render_template_fields(context,
+ LiminalEnvironment(self.variables,
+ self.task_config))
+ self.operator_delegate.render_template_fields(context)
+
+ if 'ti' in context:
+ context['ti'].xcom_push(key="liminal_task_instance", value=self.liminal_task_instance)
+
+ return self.operator_delegate.execute(context)
+
+ def post_execute(self, context, result=None):
+ self.operator_delegate.post_execute(context, result)
+
+ def _get_operator_delegate_attributes(self):
+ return [
+ attr for attr in dir(self.operator_delegate) if
+ attr not in _BASE_OPERATOR_ATTRIBUTES and attr not in dir(BaseOperator)
+ and not attr.startswith('_')
+ and attr not in ('args', 'kwargs', 'lineage_data', 'subdag', 'template_fields')
+ ]
+
+ def pre_execute(self, context: Any):
+ return self.operator_delegate.pre_execute(context)
+
+ def on_kill(self) -> None:
+ self.operator_delegate.on_kill()
+
+ def render_template_fields(self, context: Dict,
+ jinja_env: Optional[jinja2.Environment] = None) -> None:
+ pass
+
+ def render_template(self, content: Any, context: Dict,
+ jinja_env: Optional[jinja2.Environment] = None,
+ seen_oids: Optional[Set] = None) -> Any:
+ value = self.operator_delegate.render_template(content, context,
+ LiminalEnvironment(self.variables,
+ self.task_config))
+ return self.operator_delegate.render_template(value, context, jinja_env, seen_oids)
+
+ def get_template_env(self) -> jinja2.Environment:
+ return self.operator_delegate.get_template_env()
+
+ def prepare_template(self) -> None:
+ self.operator_delegate.prepare_template()
+
+ def resolve_template_files(self) -> None:
+ self.operator_delegate.resolve_template_files()
+
+ def clear(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None,
+ upstream: bool = False, downstream: bool = False, session: Session = None):
+ return self.operator_delegate.clear(start_date, end_date, upstream, downstream, session)
+
+ def run(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None,
+ ignore_first_depends_on_past: bool = True, ignore_ti_state: bool = False,
+ mark_success: bool = False) -> None:
+ self.operator_delegate.run(start_date, end_date, ignore_first_depends_on_past,
+ ignore_ti_state,
+ mark_success)
+
+
+class LiminalEnvironment(Environment):
+ def __init__(self, variables, task_config=None):
+ super().__init__()
+ self.val = None
+ self.variables = variables.copy()
+ logging.info(f'variables: {variables}')
+ if task_config and 'variables' in task_config:
+ task_variables = task_config['variables']
+ if isinstance(task_variables, dict):
+ self.variables.update(task_variables)
+ elif isinstance(task_variables, str):
+ variables_key = self.from_string(task_variables).render()
+ if variables_key in variables:
+ self.variables.update(variables[variables_key])
+
+ def from_string(self, val, **kwargs):
+ self.val = val
+ return self
+
+ def render(self, *_, **kwargs):
+ """
+ Implements jinja2.environment.Template.render
+ """
+ conf = kwargs['dag_run'].conf if 'dag_run' in kwargs else {}
+ return self.__render(self.val, conf, set())
+
+ def __render(self, val: str, dag_run_conf: dict, unresolved_tags: set):
+ token = re.match(_VAR_REGEX, val)
+ if token and token[2].strip() not in unresolved_tags:
+ tag_name = token[2].strip()
+ prefix = self.__render(token[1], dag_run_conf, unresolved_tags)
+ suffix = self.__render(token[3], dag_run_conf, unresolved_tags)
+ if dag_run_conf and tag_name in dag_run_conf:
+ return self.__render(prefix + str(dag_run_conf[tag_name]) + suffix,
+ dag_run_conf,
+ unresolved_tags)
+ elif tag_name in self.variables:
+ return self.__render(prefix + str(self.variables[tag_name]) + suffix,
+ dag_run_conf,
+ unresolved_tags)
+ else:
+ backend_value = standalone_variable_backend.get_variable(tag_name, None)
+ if backend_value:
+ return self.__render(
+ prefix + backend_value + suffix,
+ dag_run_conf,
+ unresolved_tags
+ )
+ else:
+ unresolved_tags.add(tag_name)
+ return self.__render(
+ prefix + '{{' + token[2] + '}}' + suffix,
+ dag_run_conf,
+ unresolved_tags
+ )
+ else:
+ return val
+
+
+def add_variables_to_operator(operator, task) -> BaseOperator:
+ """
+ :param operator: Airflow operator
+ :type operator: BaseOperator
+ :param task: Task instance
+ :type task: Task
+ :returns: OperatorWithVariableResolving wrapping given operator
+ """
+ return OperatorWithVariableResolving(
+ dag=task.dag,
+ task_config=task.task_config,
+ variables=task.variables,
+ liminal_task_instance=task,
+ operator=operator
+ )
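To make the run-time resolution concrete, here is a small standalone usage sketch of LiminalEnvironment (variable names and values are made up; the nested-placeholder style matches the tests added below). Tags are looked up in dag_run.conf first, then in the pipeline variables, then in the standalone variable backend; anything still unknown is left untouched:

    from liminal.runners.airflow.operators.operator_with_variable_resolving import \
        LiminalEnvironment

    env = LiminalEnvironment(variables={'env': 'staging',
                                        'bucket-staging': 's3://my-staging-bucket'},
                             task_config={})
    # Nested placeholders resolve innermost-first: {{env}} -> staging,
    # then {{bucket-staging}} -> s3://my-staging-bucket.
    print(env.from_string('{{bucket-{{env}}}}/data').render())
    # -> 's3://my-staging-bucket/data'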
diff --git a/liminal/runners/airflow/tasks/containerable.py b/liminal/runners/airflow/tasks/containerable.py
index 9f731e3..08e1139 100644
--- a/liminal/runners/airflow/tasks/containerable.py
+++ b/liminal/runners/airflow/tasks/containerable.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
import json
import logging
import os
@@ -29,8 +30,6 @@ from liminal.runners.airflow.model import task
_LOG = logging.getLogger(__name__)
ENV = 'env'
DEFAULT = 'default'
-OUTPUT_PATH = 'OUTPUT_PATH'
-OUTPUT_DESTINATION_PATH = 'OUTPUT_DESTINATION_PATH'
class ContainerTask(task.Task, ABC):
@@ -39,19 +38,16 @@ class ContainerTask(task.Task, ABC):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config):
+ task_config, variables=None):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
- pipeline_config, task_config)
+ pipeline_config, task_config, variables)
env = standalone_variable_backend.get_variable(ENV, DEFAULT)
self.env_vars = self.__env_vars(env)
self.image = self.task_config['image']
self.mounts = self.task_config.get('mounts', [])
- self.cmds, self.arguments = self._kubernetes_cmds_and_arguments(
- self.env_vars.get(OUTPUT_PATH),
- self.env_vars.get(OUTPUT_DESTINATION_PATH)
- )
+ self.cmds, self.arguments = self._kubernetes_cmds_and_arguments()
- def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
+ def _kubernetes_cmds_and_arguments(self):
cmds = ['/bin/sh', '-c']
arguments = [
@@ -94,12 +90,4 @@ class ContainerTask(task.Task, ABC):
if ENV not in env_vars:
env_vars[ENV] = env
- env_vars[OUTPUT_PATH] = self.task_config[
- OUTPUT_PATH
- ] if OUTPUT_PATH in self.task_config else '/tmp/s3_mount'
-
- if 'output_destination_path' in self.task_config:
- env_vars[OUTPUT_DESTINATION_PATH] = self.task_config[
- 'output_destination_path'
- ]
return dict([(k, str(v)) for k, v in env_vars.items()])
diff --git a/liminal/runners/airflow/tasks/create_cloudformation_stack.py b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
index c94d640..2ac1605 100644
--- a/liminal/runners/airflow/tasks/create_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
@@ -22,6 +22,8 @@ from flatdict import FlatDict
from liminal.runners.airflow.operators.cloudformation import CloudFormationCreateStackOperator, \
CloudFormationCreateStackSensor, CloudFormationHook
+from liminal.runners.airflow.operators.operator_with_variable_resolving import \
+ OperatorWithVariableResolving
from liminal.runners.airflow.tasks import airflow
@@ -31,37 +33,41 @@ class CreateCloudFormationStackTask(airflow.AirflowTask):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config):
+ task_config, variables=None):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
- pipeline_config, task_config)
+ pipeline_config, task_config, variables)
self.stack_name = task_config['stack_name']
def apply_task_to_dag(self):
- check_cloudformation_stack_exists_task = BranchPythonOperator(
- op_kwargs={'stack_name': self.stack_name},
- task_id=f'is-cloudformation-{self.task_id}-running',
- python_callable=self.__cloudformation_stack_running_branch,
- dag=self.dag
+ check_cloudformation_stack_exists_task = self._add_variables_to_operator(
+ BranchPythonOperator(
+ templates_dict={'stack_name': self.stack_name},
+ task_id=f'is-cloudformation-{self.task_id}-running',
+ python_callable=self.__cloudformation_stack_running_branch,
+ provide_context=True,
+ )
)
- create_cloudformation_stack_task = CloudFormationCreateStackOperator(
- task_id=f'create-cloudformation-{self.task_id}',
- params={
- **self.__reformatted_params()
- },
- dag=self.dag
+ create_cloudformation_stack_task = self._add_variables_to_operator(
+ CloudFormationCreateStackOperator(
+ task_id=f'create-cloudformation-{self.task_id}',
+ params={
+ **self.__reformatted_params()
+ }
+ )
)
- create_stack_sensor_task = CloudFormationCreateStackSensor(
- task_id=f'cloudformation-watch-{self.task_id}-create',
- stack_name=self.stack_name,
- dag=self.dag
+ create_stack_sensor_task = self._add_variables_to_operator(
+ CloudFormationCreateStackSensor(
+ task_id=f'cloudformation-watch-{self.task_id}-create',
+ stack_name=self.stack_name,
+ )
)
stack_creation_end_task = DummyOperator(
task_id=f'creation-end-{self.task_id}',
- dag=self.dag,
- trigger_rule='all_done'
+ trigger_rule='all_done',
+ dag=self.dag
)
if self.parent:
diff --git a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
index 324c961..756f645 100644
--- a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
@@ -30,9 +31,9 @@ class DeleteCloudFormationStackTask(airflow.AirflowTask):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config):
+ task_config, variables=None):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
- pipeline_config, task_config)
+ pipeline_config, task_config, variables)
self.stack_name = task_config['stack_name']
def apply_task_to_dag(self):
@@ -44,16 +45,18 @@ class DeleteCloudFormationStackTask(airflow.AirflowTask):
dag=self.dag
)
- delete_stack_task = CloudFormationDeleteStackOperator(
- task_id=f'delete-cloudformation-{self.task_id}',
- params={'StackName': self.stack_name},
- dag=self.dag
+ delete_stack_task = self._add_variables_to_operator(
+ CloudFormationDeleteStackOperator(
+ task_id=f'delete-cloudformation-{self.task_id}',
+ params={'StackName': self.stack_name},
+ )
)
- delete_stack_sensor = CloudFormationDeleteStackSensor(
- task_id=f'cloudformation-watch-{self.task_id}-delete',
- stack_name=self.stack_name,
- dag=self.dag
+ delete_stack_sensor = self._add_variables_to_operator(
+ CloudFormationDeleteStackSensor(
+ task_id=f'cloudformation-watch-{self.task_id}-delete',
+ stack_name=self.stack_name,
+ )
)
stack_delete_end_task = DummyOperator(
diff --git a/liminal/runners/airflow/tasks/job_end.py b/liminal/runners/airflow/tasks/job_end.py
index ff5cffa..4f8c295 100644
--- a/liminal/runners/airflow/tasks/job_end.py
+++ b/liminal/runners/airflow/tasks/job_end.py
@@ -26,9 +26,9 @@ class JobEndTask(airflow.AirflowTask):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config):
+ task_config, variables=None):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config)
+ task_config, variables)
metrics = self.liminal_config.get('metrics', {})
self.metrics_namespace = metrics.get('namespace', '')
self.metrics_backends = metrics.get('backends', [])
diff --git a/liminal/runners/airflow/tasks/job_start.py b/liminal/runners/airflow/tasks/job_start.py
index b6bef32..78b69a2 100644
--- a/liminal/runners/airflow/tasks/job_start.py
+++ b/liminal/runners/airflow/tasks/job_start.py
@@ -26,9 +26,9 @@ class JobStartTask(airflow.AirflowTask):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config):
+ task_config, variables=None):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config)
+ task_config, variables)
metrics = self.liminal_config.get('metrics', {})
self.metrics_namespace = metrics.get('namespace', '')
self.metrics_backends = metrics.get('backends', [])
diff --git a/liminal/runners/airflow/tasks/python.py b/liminal/runners/airflow/tasks/python.py
index 878ce62..81163dd 100644
--- a/liminal/runners/airflow/tasks/python.py
+++ b/liminal/runners/airflow/tasks/python.py
@@ -24,14 +24,14 @@ class PythonTask(containerable.ContainerTask):
Python task.
"""
- def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
+ def _kubernetes_cmds_and_arguments(self):
cmds = ['/bin/bash', '-c']
arguments = [
- self.__cmd(output_path, output_destination_path)
+ self.__cmd()
]
return cmds, arguments
- def __cmd(self, output_path, output_destination_path):
+ def __cmd(self):
return self.task_config['cmd']
diff --git a/liminal/runners/airflow/tasks/spark.py b/liminal/runners/airflow/tasks/spark.py
index 247df2c..8938011 100644
--- a/liminal/runners/airflow/tasks/spark.py
+++ b/liminal/runners/airflow/tasks/spark.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from itertools import chain
from flatdict import FlatDict
@@ -28,11 +29,12 @@ class SparkTask(hadoop.HadoopTask, containerable.ContainerTask):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config):
+ task_config, variables=None):
task_config['image'] = task_config.get('image', '')
task_config['cmd'] = task_config.get('cmd', [])
+ task_config['env_vars'] = {'SPARK_LOCAL_HOSTNAME': 'localhost'}
super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
- pipeline_config, task_config)
+ pipeline_config, task_config, variables)
def get_runnable_command(self):
"""
@@ -88,5 +90,5 @@ class SparkTask(hadoop.HadoopTask, containerable.ContainerTask):
def __interleaving(keys, values):
return list(chain.from_iterable(zip(keys, values)))
- def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
+ def _kubernetes_cmds_and_arguments(self):
return self.__generate_spark_submit(), []
diff --git a/requirements.txt b/requirements.txt
index 1b3b0bf..415ee41 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,7 +20,7 @@ docker==4.2.0
apache-airflow==2.1.2
click==7.1.1
Flask==1.1.1
-pyyaml==5.3.1
+pyyaml==5.4.1
boto3==1.17.112
botocore==1.20.112
wheel==0.36.2
@@ -43,4 +43,5 @@ requests==2.25.0
apache-airflow-providers-amazon==1.4.0
apache-airflow-providers-cncf-kubernetes==1.0.2
#apache-airflow-providers-google==4.0.0
-#apache-airflow[google,amazon,apache.spark]==2.0.0
\ No newline at end of file
+#apache-airflow[google,amazon,apache.spark]==2.0.0
+cfn-lint==0.53.0
diff --git a/tests/runners/airflow/executors/test_emr.py b/tests/runners/airflow/executors/test_emr.py
index b4d107b..19c7b99 100644
--- a/tests/runners/airflow/executors/test_emr.py
+++ b/tests/runners/airflow/executors/test_emr.py
@@ -28,6 +28,8 @@ from moto import mock_emr
from liminal.runners.airflow import DummyDag
from liminal.runners.airflow.executors.emr import EMRExecutor
+from liminal.runners.airflow.operators.operator_with_variable_resolving import \
+ OperatorWithVariableResolving
from liminal.runners.airflow.tasks import hadoop
from tests.util import dag_test_utils
@@ -80,6 +82,8 @@ class TestEMRExecutorTask(TestCase):
self.hadoop_task.dag = self.dag
self.hadoop_task.trigger_rule = 'all_done'
self.hadoop_task.parent = None
+ self.hadoop_task.task_config = {}
+ self.hadoop_task.variables = {}
self.emr = EMRExecutor(
self.executor_name,
@@ -92,8 +96,10 @@ class TestEMRExecutorTask(TestCase):
self.assertEqual(len(self.dag.tasks), 2)
- self.assertIsInstance(self.dag.tasks[0], EmrAddStepsOperator)
- self.assertIsInstance(self.dag.tasks[1], EmrStepSensor)
+ self.assertIsInstance(self.dag.tasks[0], OperatorWithVariableResolving)
+ self.assertIsInstance(self.dag.tasks[0].operator_delegate, EmrAddStepsOperator)
+ self.assertIsInstance(self.dag.tasks[1], OperatorWithVariableResolving)
+ self.assertIsInstance(self.dag.tasks[1].operator_delegate, EmrStepSensor)
@mock.patch.object(EmrHook, 'get_conn')
def test_add_step(self, mock_emr_hook_get_conn):
@@ -107,7 +113,8 @@ class TestEMRExecutorTask(TestCase):
emr_add_step_task = self.dag.tasks[0]
- self.assertIsInstance(emr_add_step_task, EmrAddStepsOperator)
+ self.assertIsInstance(emr_add_step_task, OperatorWithVariableResolving)
+ self.assertIsInstance(emr_add_step_task.operator_delegate, EmrAddStepsOperator)
emr_add_step_task.render_template_fields({})
@@ -133,6 +140,7 @@ class TestEMRExecutorTask(TestCase):
emr_watch_step_task = self.dag.tasks[1]
- self.assertIsInstance(emr_watch_step_task, EmrStepSensor)
+ self.assertIsInstance(emr_watch_step_task, OperatorWithVariableResolving)
+ self.assertIsInstance(emr_watch_step_task.operator_delegate, EmrStepSensor)
# todo - elaborate tests
diff --git a/tests/runners/airflow/operators/test_operator_with_variable_resolving.py b/tests/runners/airflow/operators/test_operator_with_variable_resolving.py
new file mode 100644
index 0000000..53a875d
--- /dev/null
+++ b/tests/runners/airflow/operators/test_operator_with_variable_resolving.py
@@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from unittest import TestCase
+from unittest.mock import patch, MagicMock
+
+from airflow.models import BaseOperator
+
+from liminal.runners.airflow.operators.operator_with_variable_resolving import \
+ OperatorWithVariableResolving
+
+
+class TestKubernetesPodOperatorWithAutoImage(TestCase):
+ @patch.dict(os.environ,
+ {'env': 'myenv', 'LIMINAL_STAND_ALONE_MODE': 'True', 'nested': 'value1'})
+ def test_value_from_environment_and_liminal_config(self):
+ variables = {'my-image': 'my-image', 'something-value1': 'stg'}
+ operator = TestOperator(task_id='abc', field='{{env}}', expected='{{my-image}}')
+ ret_val = OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={},
+ variables=variables).execute({})
+
+ self.assertEqual(operator.field, 'myenv')
+ self.assertEqual(ret_val, 'my-image')
+
+ @patch.dict(os.environ,
+ {'env': 'myenv', 'LIMINAL_STAND_ALONE_MODE': 'True', 'nested': 'value1'})
+ def test_value_from_task_variables(self):
+ variables = {'my-image': 'my-image', 'my_dict_var': {'env': 'myenv2'}}
+ operator = TestOperator(task_id='abc', field='{{env}}', expected='{{my-image}}')
+ ret_val = OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={'variables': 'my_dict_var'},
+ variables=variables).execute({})
+
+ self.assertEqual(operator.field, 'myenv2')
+ self.assertEqual(ret_val, 'my-image')
+
+ @patch.dict(os.environ,
+ {'env': 'myenv', 'LIMINAL_STAND_ALONE_MODE': 'True', 'nested': 'value1'})
+ def test_value_from_inline_task_variables(self):
+ variables = {'my-image': 'my-image'}
+ operator = TestOperator(task_id='abc', field='{{env}}', expected='{{my-image}}')
+ ret_val = OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={'variables': {'env': 'myenv2'}},
+ variables=variables).execute({})
+
+ self.assertEqual(operator.field, 'myenv2')
+ self.assertEqual(ret_val, 'my-image')
+
+ @patch.dict(os.environ, {'env': 'staging', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+ def test_nested_variables(self):
+ variables = {'nested-the-s3-location': 'good', 'my-image': 'my-image',
+ 'something-value1': 'stg', 'env_val-staging': 'the-s3-location',
+ 'nested': 'value1'}
+ operator = TestOperator(task_id='abc', field='something-{{nested-{{env_val-{{env}}}}}}',
+ expected='{{my-image}}')
+
+ OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={},
+ variables=variables).execute({})
+
+ self.assertEqual(operator.field, 'something-good')
+
+ @patch.dict(os.environ, {'env': 'staging', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+ def test_dag_run_conf_parameters(self):
+ dag_run = MagicMock()
+ dag_run.conf = {'nested-the-s3-location': 'good', 'my-image': 'my-image',
+ 'something-value1': 'stg', 'env_val-staging': 'the-s3-location',
+ 'nested': 'value1'}
+ operator = TestOperator(task_id='abc', field='something-{{nested-{{env_val-{{env}}}}}}',
+ expected='{{my-image}}')
+ OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={},
+ variables={}).execute({'dag_run': dag_run})
+
+ self.assertEqual(operator.field, 'something-good')
+
+ @patch.dict(os.environ, {'env': 'staging', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+ def test_duplicated_params(self):
+ dag_run = MagicMock()
+ dag_run.conf = {'nested-the-s3-location': 'good', 'my-image': 'my-image',
+ 'something-value1': 'stg', 'env_val-staging': 'the-s3-location',
+ 'nested': 'value1'}
+ operator = TestOperator(task_id='abc', field='something-{{nested-{{env_val-{{env}}}}}}-'
+ '{{nested-{{env_val-{{env}}}}}}',
+ expected='{{my-image}}')
+ OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={},
+ variables={}).execute({'dag_run': dag_run})
+
+ self.assertEqual(operator.field, 'something-good-good')
+
+ operator = TestOperator(task_id='abc', field='something-{{env}} {{env}}',
+ expected='{{my-image}}')
+ OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={},
+ variables={}).execute({'dag_run': dag_run})
+
+ self.assertEqual(operator.field, 'something-staging staging')
+
+ dag_run.conf = {'var1': '{{var2}}', 'var2': 'bla', 'my-image': 'my-image'}
+ operator = TestOperator(task_id='abc', field='{{ var1 }}',
+ expected='{{my-image}}')
+ OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={},
+ variables={}).execute({'dag_run': dag_run})
+
+ self.assertEqual(operator.field, 'bla')
+
+ @patch.dict(os.environ, {'env': 'staging', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+ def test_non_existing_param(self):
+ dag_run = MagicMock()
+ dag_run.conf = {'var1': '{{var2}}', 'var2': 'bla'}
+ operator = TestOperator(task_id='abc', field='{{myimage}}',
+ expected='{{myimage}}')
+ OperatorWithVariableResolving(dag=None,
+ operator=operator,
+ task_id='my_task',
+ task_config={},
+ variables={}).execute({'dag_run': dag_run})
+
+ self.assertEqual(operator.field, '')
+
+
+class BaseTestOperator(BaseOperator):
+
+ def execute(self, context):
+ raise NotImplementedError()
+
+ def __init__(self, field, expected, *args, **kwargs):
+ self.field = field
+ self.expected = expected
+ super().__init__(*args, **kwargs)
+
+
+class TestOperator(BaseTestOperator):
+
+ def __init__(self, *args, **kwargs):
+ super(TestOperator, self).__init__(*args, **kwargs)
+
+ def execute(self, context):
+ return self.expected
diff --git a/tests/runners/airflow/tasks/test_create_cloudformation_stack.py b/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
index a2317d1..e57dad4 100644
--- a/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
+++ b/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
@@ -26,6 +26,7 @@ from moto import mock_cloudformation
from liminal.runners.airflow.executors import airflow
from liminal.runners.airflow.operators.cloudformation import CloudFormationCreateStackOperator, \
CloudFormationCreateStackSensor, CloudFormationHook
+from liminal.runners.airflow.operators.operator_with_variable_resolving import OperatorWithVariableResolving
from liminal.runners.airflow.tasks.create_cloudformation_stack import CreateCloudFormationStackTask
from tests.util import dag_test_utils
@@ -87,9 +88,12 @@ class TestCreateCloudFormationStackTask(TestCase):
def test_apply_task_to_dag(self):
self.assertEqual(len(self.dag.tasks), 4)
- self.assertIsInstance(self.dag.tasks[0], BranchPythonOperator)
- self.assertIsInstance(self.dag.tasks[1], CloudFormationCreateStackOperator)
- self.assertIsInstance(self.dag.tasks[2], CloudFormationCreateStackSensor)
+ self.assertIsInstance(self.dag.tasks[0], OperatorWithVariableResolving)
+ self.assertIsInstance(self.dag.tasks[0].operator_delegate, BranchPythonOperator)
+ self.assertIsInstance(self.dag.tasks[1], OperatorWithVariableResolving)
+ self.assertIsInstance(self.dag.tasks[1].operator_delegate, CloudFormationCreateStackOperator)
+ self.assertIsInstance(self.dag.tasks[2], OperatorWithVariableResolving)
+ self.assertIsInstance(self.dag.tasks[2].operator_delegate, CloudFormationCreateStackSensor)
self.assertIsInstance(self.dag.tasks[3], DummyOperator)
def test_cloudformation_does_not_exist(self):
@@ -98,7 +102,7 @@ class TestCreateCloudFormationStackTask(TestCase):
mock_cf_conn.describe_stacks.return_value.raiseError.side_effect = Exception()
mock_conn.return_value = mock_cf_conn
- is_cloudformation_exists = self.dag.tasks[0]
+ is_cloudformation_exists = self.dag.tasks[0].operator_delegate
print(is_cloudformation_exists)
self.assertEqual(
@@ -106,7 +110,7 @@ class TestCreateCloudFormationStackTask(TestCase):
'create-cloudformation-create_emr')
def test_cloudformation_exist_and_running(self):
- is_cloudformation_exists = self.dag.tasks[0]
+ is_cloudformation_exists = self.dag.tasks[0].operator_delegate
for status in ['CREATE_COMPLETE', 'DELETE_FAILED']:
with mock.patch.object(CloudFormationHook, 'get_conn') as mock_conn:
@@ -126,7 +130,7 @@ class TestCreateCloudFormationStackTask(TestCase):
'creation-end-create_emr')
def test_cloudformation_exists_and_not_running(self):
- is_cloudformation_exists = self.dag.tasks[0]
+ is_cloudformation_exists = self.dag.tasks[0].operator_delegate
for status in ['DELETED']:
with mock.patch.object(CloudFormationHook, 'get_conn') as mock_conn:
diff --git a/tests/runners/airflow/tasks/test_delete_cloudformation_stack.py b/tests/runners/airflow/tasks/test_delete_cloudformation_stack.py
index fc96114..dbb2656 100644
--- a/tests/runners/airflow/tasks/test_delete_cloudformation_stack.py
+++ b/tests/runners/airflow/tasks/test_delete_cloudformation_stack.py
@@ -27,6 +27,7 @@ from moto import mock_cloudformation
from liminal.runners.airflow.operators.cloudformation import CloudFormationDeleteStackOperator, \
CloudFormationDeleteStackSensor
+from liminal.runners.airflow.operators.operator_with_variable_resolving import OperatorWithVariableResolving
from liminal.runners.airflow.tasks.delete_cloudformation_stack import DeleteCloudFormationStackTask
from tests.util import dag_test_utils
@@ -68,8 +69,10 @@ class TestDeleteCloudFormationStackTask(TestCase):
self.assertEqual(len(self.dag.tasks), 4)
self.assertIsInstance(self.dag.tasks[0], BranchPythonOperator)
- self.assertIsInstance(self.dag.tasks[1], CloudFormationDeleteStackOperator)
- self.assertIsInstance(self.dag.tasks[2], CloudFormationDeleteStackSensor)
+ self.assertIsInstance(self.dag.tasks[1], OperatorWithVariableResolving)
+ self.assertIsInstance(self.dag.tasks[1].operator_delegate, CloudFormationDeleteStackOperator)
+ self.assertIsInstance(self.dag.tasks[2], OperatorWithVariableResolving)
+ self.assertIsInstance(self.dag.tasks[2].operator_delegate, CloudFormationDeleteStackSensor)
self.assertIsInstance(self.dag.tasks[3], DummyOperator)
def test_check_dags_queued_task(self):
diff --git a/tests/test_licenses.py b/tests/test_licenses.py
index e105b3c..6b4a3b4 100644
--- a/tests/test_licenses.py
+++ b/tests/test_licenses.py
@@ -22,8 +22,10 @@ from unittest import TestCase
from termcolor import colored
-EXCLUDED_EXTENSIONS = ['.gif', '.png', '.pyc', 'LICENSE', 'DISCLAIMER', 'NOTICE', '.whl']
-EXCLUDED_DIRS = ['docs/build', '.git', '.idea', 'venv', 'apache_liminal.egg-info']
+EXCLUDED_EXTENSIONS = [
+ '.gif', '.png', '.pyc', 'LICENSE', 'DISCLAIMER', 'DISCLAIMER-WIP', 'NOTICE', '.whl'
+]
+EXCLUDED_DIRS = ['docs/build', 'build', 'dist', '.git', '.idea', 'venv', 'apache_liminal.egg-info']
EXCLUDED_FILES = ['DISCLAIMER-WIP']
PYTHON_LICENSE_HEADER = """