Posted to commits@liminal.apache.org by av...@apache.org on 2021/07/28 07:53:09 UTC

[incubator-liminal] branch master updated: [LIMINAL-56] add default executor (k8s) for spark

This is an automated email from the ASF dual-hosted git repository.

aviemzur pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git


The following commit(s) were added to refs/heads/master by this push:
     new e8c18dc  [LIMINAL-56] add default executor (k8s) for spark
e8c18dc is described below

commit e8c18dc2827141dc6c2e45366d3f5e281a37dd50
Author: Zion Rubin <zi...@naturalint.com>
AuthorDate: Wed Jul 28 10:53:03 2021 +0300

    [LIMINAL-56] add default executor (k8s) for spark
---
 .gitignore                                         |   1 +
 docs/getting-started/spark_app_demo.md             | 344 +++++++++++++++++++++
 docs/nstatic/spark-demo-app/k8s_dag.png            | Bin 0 -> 42631 bytes
 .../aws-ml-app-demo/manifests/aws-ml-app-demo.yaml |   3 +-
 .../spark-app-demo/k8s/__init__.py                 |  15 -
 .../spark-app-demo/k8s/archetype}/liminal.yml      |  36 ++-
 examples/spark-app-demo/k8s/data/iris.csv          | 168 ++++++++++
 .../spark-app-demo/k8s/data_cleanup.py             |  34 +-
 examples/spark-app-demo/k8s/liminal.yml            |  89 ++++++
 examples/spark-app-demo/k8s/model_store.py         |  59 ++++
 .../spark-app-demo/k8s/requirements.txt            |  17 +-
 examples/spark-app-demo/k8s/serving.py             |  53 ++++
 examples/spark-app-demo/k8s/training.py            |  99 ++++++
 .gitignore => liminal/build/image/spark/Dockerfile |  22 +-
 .../build/image/spark/__init__.py                  |  15 -
 .../tasks/hadoop.py => build/image/spark/spark.py} |  21 +-
 liminal/core/config/config.py                      |  14 +
 liminal/core/config/defaults/base/liminal.yml      |   8 +
 .../runners/airflow/dag/liminal_register_dags.py   |  23 +-
 .../{tasks/hadoop.py => executors/airflow.py}      |  17 +-
 liminal/runners/airflow/executors/emr.py           |   4 +-
 liminal/runners/airflow/executors/kubernetes.py    |   5 +-
 liminal/runners/airflow/model/executor.py          |   9 +-
 liminal/runners/airflow/model/task.py              |   7 +-
 .../airflow/tasks/{hadoop.py => airflow.py}        |   9 +-
 liminal/runners/airflow/tasks/containerable.py     |   7 +-
 .../airflow/tasks/create_cloudformation_stack.py   |   8 +-
 .../airflow/tasks/delete_cloudformation_stack.py   |   8 +-
 liminal/runners/airflow/tasks/hadoop.py            |   3 -
 liminal/runners/airflow/tasks/job_end.py           |   8 +-
 liminal/runners/airflow/tasks/job_start.py         |   8 +-
 liminal/runners/airflow/tasks/spark.py             |  18 +-
 liminal/runners/airflow/tasks/sql.py               |  10 +-
 scripts/liminal                                    |  21 +-
 tests/liminal/core/config/test_config.py           | 200 +++++++-----
 .../runners/airflow/build/spark/__init__.py        |  15 -
 .../build/spark/test_spark_image_builder.py        | 110 +++++++
 .../airflow/executors/test_airflow_executor.py     |  22 +-
 .../tasks/test_create_cloudformation_stack.py      |   4 +-
 tests/runners/airflow/tasks/test_job_end.py        |   9 +-
 tests/runners/airflow/tasks/test_job_start.py      |  11 +-
 tests/runners/airflow/tasks/test_python.py         |  26 +-
 tests/runners/airflow/tasks/test_spark_task.py     | 144 +++++++--
 tests/runners/apps/test_app/extra/liminal.yml      |   1 +
 .../runners/apps/test_spark_app/__init__.py        |  15 -
 .../runners/apps/test_spark_app}/liminal.yml       |  40 +--
 .../apps/test_spark_app/wordcount/__init__.py      |  15 -
 .../apps/test_spark_app/wordcount/requirements.txt |  15 +-
 .../apps/test_spark_app/wordcount/wordcount.py     |  47 +++
 .../apps/test_spark_app/wordcount/words.txt        |  16 +-
 tests/test_licenses.py                             |   4 +-
 51 files changed, 1455 insertions(+), 402 deletions(-)

diff --git a/.gitignore b/.gitignore
index fcf8bcd..dfaa502 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,7 @@ bin
 include
 lib
 venv
+/build
 .Python
 *.pyc
 pip-selfcheck.json
diff --git a/docs/getting-started/spark_app_demo.md b/docs/getting-started/spark_app_demo.md
new file mode 100644
index 0000000..8755651
--- /dev/null
+++ b/docs/getting-started/spark_app_demo.md
@@ -0,0 +1,344 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Getting started / ***Spark application***
+
+* [Setup your local environment](#Setup-your-local-environment)
+* [Spark on K8S](#Spark-On-K8S)
+    * [Setup liminal](#setup-liminal)
+    * [Liminal YAML walkthrough](#Liminal-YAML-walkthrough)
+    * [Evaluate the Iris Classification model](#Evaluate-the-iris-classification-model)
+    * [Debugging Kubernetes Deployments](#Debugging-Kubernetes-Deployments)
+
+* [Closing up](#Closing-up)
+
+In this tutorial, we will guide you through setting up Apache Liminal on your local machine and
+running a simple machine-learning workflow based on the classic Iris dataset classification
+example, including feature engineering with the Spark engine. \
+More details can be found in
+this [link](https://scikit-learn.org/stable/auto_examples/datasets/plot_iris_dataset.html).
+
+#### Prerequisites
+
+* [Python 3 (3.6 and up)](https://www.python.org/downloads)
+* [Python Virtual Environments](https://pypi.org/project/virtualenv)
+* [Docker Desktop](https://www.docker.com/products/docker-desktop)
+* [Kubernetes CLI (kubectl)](https://kubernetes.io/docs/tasks/tools/install-kubectl-macos)
+
+*Note: Make sure the Kubernetes cluster is running in Docker Desktop.*
+
+## Setup your local environment
+
+In your dev folder, clone the example code from Liminal:
+
+```BASH
+git clone https://github.com/apache/incubator-liminal
+```
+
+***Note:*** *You just cloned the entire Liminal project; you only need the examples folder.*
+
+Create a python virtual environment to isolate your runs:
+
+```BASH
+cd incubator-liminal/examples/spark-app-demo/
+python3 -m venv env
+```
+
+Activate your virtual environment:
+
+```BASH
+source env/bin/activate
+```
+
+Now we are ready to install liminal:
+
+```BASH
+pip install apache-liminal
+```
+
+## Spark On K8S
+
+We will define the following steps and services to implement the Iris classification example,
+including simple feature engineering with Apache Spark: \
+Clean data, Train, Validate & Deploy - we clean the input data set and prepare it for training and
+validation; execution is managed by the Liminal Airflow extension. The training task trains a
+regression model using a public dataset. \
+We then validate the model and deploy it to a model store on the mounted volume. \
+Inference - online inference is done using a Python Flask service running on the local Kubernetes in
+Docker Desktop. The service exposes the `/predict` endpoint. It reads the model stored on the
+mounted volume and uses it to evaluate the request.
+
+### Setup liminal
+
+```BASH
+cd incubator-liminal/examples/spark-app-demo/k8s
+```
+
+#### Liminal build
+
+The build command creates Docker images based on the `images` section of the liminal.yml file.
+
+```BASH
+liminal build
+```
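+
+When the build finishes, the images defined for this demo (`my_spark_image` and
+`myorg/mydatascienceapp`) should be listed locally, for example:
+
+```BASH
+docker images | grep -E 'my_spark_image|mydatascienceapp'
+```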
+
+#### Liminal create
+
+All tasks use a mounted volume, as defined in the pipeline YAML. \
+In our case the mounted volume points to the Liminal Iris Classification example. The training
+task trains a regression model using a public dataset. We then validate the model and deploy it to a
+model store on the mounted volume.
+
+Create a kubernetes local volume:
+
+```BASH
+liminal create
+```
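+
+You can confirm that the volume claim referenced by the pipeline (`gettingstartedvol-pvc` in this
+demo's liminal.yml) exists:
+
+```BASH
+kubectl get pvc --namespace=default
+```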
+
+#### Liminal deploy
+
+The deploy command deploys a Liminal server and copies any liminal.yml files found in your working
+directory, or any of its subdirectories, to your Liminal home directory.
+
+```BASH
+liminal deploy --clean  
+```
+
+*Note: the Liminal home directory is located at the path defined in the LIMINAL_HOME environment
+variable. If LIMINAL_HOME is not defined, the home directory defaults to
+~/liminal_home.*
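+
+For example, to keep this demo isolated you could point LIMINAL_HOME at a dedicated folder before
+deploying (the path below is only an illustration):
+
+```BASH
+export LIMINAL_HOME=~/liminal_spark_demo_home
+```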
+
+#### Liminal start
+
+The start command spins up 3 containers that load the Apache Airflow stack. Liminal's Airflow
+extension is responsible for executing the workflows defined in the liminal.yml file as standard
+Airflow DAGs.
+
+```BASH
+liminal start
+```
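+
+Since the stack runs in docker compose, you can verify that the containers came up using standard
+Docker tooling, for example:
+
+```BASH
+docker ps
+```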
+
+You can go to graph view to see all the tasks configured in the liminal.yml file:
+[http://localhost:8080/admin/airflow/graph?dag_id=my_first_spark_pipeline](
+http://localhost:8080/admin/airflow/graph?dag_id=my_first_spark_pipeline
+)
+
+You should see the following DAG:
+
+![](../nstatic/spark-demo-app/k8s_dag.png)
+
+### Liminal YAML walkthrough
+
+* [Local archetype](#Local-archetype)
+* [Pipeline flow](#Pipeline-flow)
+
+#### Local archetype
+
+A superliminal for easy local development:
+
+```YAML
+name: InfraSpark
+owner: Bosco Albert Baracus
+type: super
+executors:
+  - executor: k8s
+    type: kubernetes
+variables:
+  output_root_dir: /mnt/gettingstartedvol
+  input_root_dir: ''
+images:
+  - image: my_spark_image
+    source: .
+    no_cache: True
+task_defaults:
+  spark:
+    executor: k8s
+    executors: 2
+    application_source: '{{application}}'
+    mounts:
+      - mount: mymount
+        volume: gettingstartedvol
+        path: /mnt/gettingstartedvol
+```
+
+###### We specify the `MOUNT_PATH` in which we store the trained model.
+
+You can read more about `superliminal` `variables` and `defaults`
+in [advanced.liminal.yml](../liminal/advanced.liminal.yml.md)
+
+#### Pipeline flow
+
+Declaration of the pipeline's task flow in your liminal YAML:
+
+```YAML
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+  output_path: '{{output_root_dir}}/my_first_liminal_spark_app_outputs/'
+  application: data_cleanup.py
+task_defaults:
+  python:
+    mounts:
+      - mount: mymount
+        volume: gettingstartedvol
+        path: /mnt/gettingstartedvol
+pipelines:
+  - pipeline: my_first_spark_pipeline
+    start_date: 1970-01-01
+    timeout_minutes: 45
+    schedule: 0 * 1 * *
+    tasks:
+      - task: data_preprocessing
+        type: spark
+        description: prepare the data for training
+        application_arguments:
+          - '{{input_root_dir}}data/iris.csv'
+          - '{{output_path}}'
+      - task: train
+        type: python
+        description: train model
+        image: myorg/mydatascienceapp
+        cmd: python -u training.py train '{{output_path}}'
+        env:
+          MOUNT_PATH: /mnt/gettingstartedvol
+        ...
+```
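+
+To make the variable substitution concrete, here is how the spark task's templated values resolve
+in this demo (shown for illustration only, based on the superliminal and pipeline variables above):
+
+```YAML
+# resolved values at runtime (illustration)
+application_source: data_cleanup.py                               # '{{application}}'
+application_arguments:
+  - 'data/iris.csv'                                               # '{{input_root_dir}}' is ''
+  - '/mnt/gettingstartedvol/my_first_liminal_spark_app_outputs/'  # '{{output_path}}'
+```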
+
+### Evaluate the iris classification model
+
+Once the Iris Classification model training is completed and the model is deployed (to the mounted
+volume), you can launch a pod of the pre-built image, which contains a Flask server, by applying the
+following Kubernetes manifest configuration:
+
+```BASH
+kubectl apply -f manifests/spark-app-demo.yaml
+```
+
+Alternatively, create a Kubernetes pod from stdin:
+
+```BASH
+cat <<EOF | kubectl apply -f -
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: spark-app-demo
+spec:
+  volumes:
+    - name: task-pv-storage
+      persistentVolumeClaim:
+        claimName: gettingstartedvol-pvc
+  containers:
+    - name: task-pv-container
+      imagePullPolicy: Never
+      image: myorg/mydatascienceapp
+      lifecycle:
+        postStart:
+          exec:
+            command: [ "/bin/bash", "-c", "apt update && apt install curl -y" ]
+      ports:
+        - containerPort: 80
+          name: "http-server"
+      volumeMounts:
+        - mountPath: "/mnt/gettingstartedvol"
+          name: task-pv-storage
+EOF
+```
+
+Check that the pod is running:
+
+```BASH
+kubectl get pods --namespace=default
+```
+
+Check that the service is up:
+
+```BASH
+kubectl exec -it --namespace=default spark-app-demo -- /bin/bash -c "curl localhost/healthcheck"
+```
+
+Check the prediction:
+
+```BASH
+kubectl exec -it --namespace=default spark-app-demo -- /bin/bash -c "curl -X POST -d '{\"petal_width\": \"2.1\"}' localhost/predict"
+```
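+
+The endpoint replies with a small JSON document built by `serving.py`; the exact values depend on
+your training run, but the response should resemble (illustrative values):
+
+```
+{"result": "0.97", "version": "1627459983"}
+```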
+
+### Debugging Kubernetes Deployments
+
+`kubectl get pods` will help you check your pod status:
+
+```BASH
+kubectl get pods --namespace=default
+```
+
+`kubectl logs` will help you check your pod's logs:
+
+```BASH
+kubectl logs --namespace=default spark-app-demo
+```
+
+Use `kubectl exec` to get a shell into a running container:
+
+```BASH
+kubectl exec -it --namespace=default spark-app-demo -- bash
+```
+
+Then you can check the mounted volume with `df -h` and verify the model's output.
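+
+For example, assuming the default mount path used throughout this demo, the deployed model versions
+should appear under the production folder of the model store:
+
+```BASH
+df -h | grep gettingstartedvol
+ls /mnt/gettingstartedvol/production
+```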
+
+You can go to graph view to see all the tasks configured in the liminal.yml file:
+[http://localhost:8080/admin/airflow/graph?dag_id=my_first_spark_pipeline](
+http://localhost:8080/admin/airflow/graph?dag_id=my_first_spark_pipeline
+)
+
+## Here is the entire list of commands, if you want to start from scratch:
+
+```
+git clone https://github.com/apache/incubator-liminal
+cd examples/spark-app-demo/k8s
+# cd examples/spark-app-demo/emr
+python3 -m venv env
+source env/bin/activate
+rm -rf ~/liminal_home
+pip uninstall apache-liminal
+pip install apache-liminal
+liminal build
+liminal create
+liminal deploy --clean
+liminal start
+```
+
+## Closing up
+
+To make sure the Liminal containers are stopped, use:
+
+```
+liminal stop
+```
+
+To deactivate the Python virtual environment, use:
+
+```
+deactivate
+```
+
+To terminate the Kubernetes pod:
+
+```
+kubectl delete pod --namespace=default spark-app-demo
+```
diff --git a/docs/nstatic/spark-demo-app/k8s_dag.png b/docs/nstatic/spark-demo-app/k8s_dag.png
new file mode 100644
index 0000000..b0e9a6f
Binary files /dev/null and b/docs/nstatic/spark-demo-app/k8s_dag.png differ
diff --git a/examples/aws-ml-app-demo/manifests/aws-ml-app-demo.yaml b/examples/aws-ml-app-demo/manifests/aws-ml-app-demo.yaml
index 2361ee5..180df8a 100644
--- a/examples/aws-ml-app-demo/manifests/aws-ml-app-demo.yaml
+++ b/examples/aws-ml-app-demo/manifests/aws-ml-app-demo.yaml
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 ---
+
 apiVersion: v1
 kind: Pod
 metadata:
@@ -32,7 +33,7 @@ spec:
       lifecycle:
         postStart:
           exec:
-            command: ["/bin/bash", "-c", "apt update && apt install curl -y"]
+            command: [ "/bin/bash", "-c", "apt update && apt install curl -y" ]
       ports:
         - containerPort: 80
           name: "http-server"
diff --git a/.gitignore b/examples/spark-app-demo/k8s/__init__.py
similarity index 85%
copy from .gitignore
copy to examples/spark-app-demo/k8s/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/examples/spark-app-demo/k8s/__init__.py
@@ -15,18 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/liminal/core/config/defaults/base/liminal.yml b/examples/spark-app-demo/k8s/archetype/liminal.yml
similarity index 64%
copy from liminal/core/config/defaults/base/liminal.yml
copy to examples/spark-app-demo/k8s/archetype/liminal.yml
index 729ba22..751043a 100644
--- a/liminal/core/config/defaults/base/liminal.yml
+++ b/examples/spark-app-demo/k8s/archetype/liminal.yml
@@ -16,22 +16,28 @@
 # specific language governing permissions and limitations
 # under the License.
 ---
-name: base
+# superliminal for local development
+name: InfraSpark
+owner: Bosco Albert Baracus
 type: super
 executors:
-  - executor: default_k8s
+  - executor: k8s
     type: kubernetes
-service_defaults:
-  description: add defaults parameters for all services
+variables:
+  output_root_dir: /mnt/gettingstartedvol
+  input_root_dir: ''
+images:
+  - image: my_spark_image
+    source: .
+    no_cache: True
 task_defaults:
-  description: add defaults parameters for all tasks separate by task type
-  python:
-    executor: default_k8s
-pipeline_defaults:
-  description: add defaults parameters for all pipelines
-  before_tasks:
-    - task: start
-      type: job_start
-  after_tasks:
-    - task: end
-      type: job_end
\ No newline at end of file
+  spark:
+    executor: k8s
+    image: my_spark_image
+    executors: 2
+    application_source: '{{application}}'
+    mounts:
+      - mount: mymount
+        volume: gettingstartedvol
+        path: /mnt/gettingstartedvol
+
diff --git a/examples/spark-app-demo/k8s/data/iris.csv b/examples/spark-app-demo/k8s/data/iris.csv
new file mode 100644
index 0000000..339109c
--- /dev/null
+++ b/examples/spark-app-demo/k8s/data/iris.csv
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ignore: 150,4,setosa,versicolor,virginica,garbage
+5.1,3.5,1.4,0.2,0,21231
+4.9,3.0,1.4,0.2,0,sda
+4.7,3.2,1.3,0.2,0,2321
+4.6,3.1,1.5,0.2,0,
+5.0,3.6,1.4,0.2,0,
+5.4,3.9,1.7,0.4,0,
+4.6,3.4,1.4,0.3,0,
+5.0,3.4,1.5,0.2,0,
+4.4,2.9,1.4,0.2,0,
+4.9,3.1,1.5,0.1,0,
+5.4,3.7,1.5,0.2,0,
+4.8,3.4,1.6,0.2,0,
+4.8,3.0,1.4,0.1,0,
+4.3,3.0,1.1,0.1,0,
+5.8,4.0,1.2,0.2,0,
+5.7,4.4,1.5,0.4,0,
+5.4,3.9,1.3,0.4,0,
+5.1,3.5,1.4,0.3,0,
+5.7,3.8,1.7,0.3,0,
+5.1,3.8,1.5,0.3,0,
+5.4,3.4,1.7,0.2,0,
+5.1,3.7,1.5,0.4,0,
+4.6,3.6,1.0,0.2,0,
+5.1,3.3,1.7,0.5,0,
+4.8,3.4,1.9,0.2,0,
+5.0,3.0,1.6,0.2,0,
+5.0,3.4,1.6,0.4,0,
+5.2,3.5,1.5,0.2,0,
+5.2,3.4,1.4,0.2,0,
+4.7,3.2,1.6,0.2,0,
+4.8,3.1,1.6,0.2,0,
+5.4,3.4,1.5,0.4,0,
+5.2,4.1,1.5,0.1,0,
+5.5,4.2,1.4,0.2,0,
+4.9,3.1,1.5,0.2,0,
+5.0,3.2,1.2,0.2,0,
+5.5,3.5,1.3,0.2,0,
+4.9,3.6,1.4,0.1,0,
+4.4,3.0,1.3,0.2,0,
+5.1,3.4,1.5,0.2,0,
+5.0,3.5,1.3,0.3,0,
+4.5,2.3,1.3,0.3,0,
+4.4,3.2,1.3,0.2,0,
+5.0,3.5,1.6,0.6,0,
+5.1,3.8,1.9,0.4,0,
+4.8,3.0,1.4,0.3,0,
+5.1,3.8,1.6,0.2,0,
+4.6,3.2,1.4,0.2,0,
+5.3,3.7,1.5,0.2,0,
+5.0,3.3,1.4,0.2,0,
+7.0,3.2,4.7,1.4,1,
+6.4,3.2,4.5,1.5,1,
+6.9,3.1,4.9,1.5,1,
+5.5,2.3,4.0,1.3,1,
+6.5,2.8,4.6,1.5,1,
+5.7,2.8,4.5,1.3,1,
+6.3,3.3,4.7,1.6,1,
+4.9,2.4,3.3,1.0,1,
+6.6,2.9,4.6,1.3,1,
+5.2,2.7,3.9,1.4,1,
+5.0,2.0,3.5,1.0,1,
+5.9,3.0,4.2,1.5,1,
+6.0,2.2,4.0,1.0,1,
+6.1,2.9,4.7,1.4,1,
+5.6,2.9,3.6,1.3,1,
+6.7,3.1,4.4,1.4,1,
+5.6,3.0,4.5,1.5,1,
+5.8,2.7,4.1,1.0,1,
+6.2,2.2,4.5,1.5,1,
+5.6,2.5,3.9,1.1,1,
+5.9,3.2,4.8,1.8,1,
+6.1,2.8,4.0,1.3,1,
+6.3,2.5,4.9,1.5,1,
+6.1,2.8,4.7,1.2,1,
+6.4,2.9,4.3,1.3,1,
+6.6,3.0,4.4,1.4,1,
+6.8,2.8,4.8,1.4,1,
+6.7,3.0,5.0,1.7,1,
+6.0,2.9,4.5,1.5,1,
+5.7,2.6,3.5,1.0,1,
+5.5,2.4,3.8,1.1,1,
+5.5,2.4,3.7,1.0,1,
+5.8,2.7,3.9,1.2,1,
+6.0,2.7,5.1,1.6,1,
+5.4,3.0,4.5,1.5,1,
+6.0,3.4,4.5,1.6,1,
+6.7,3.1,4.7,1.5,1,
+6.3,2.3,4.4,1.3,1,
+5.6,3.0,4.1,1.3,1,
+5.5,2.5,4.0,1.3,1,
+5.5,2.6,4.4,1.2,1,
+6.1,3.0,4.6,1.4,1,
+5.8,2.6,4.0,1.2,1,
+5.0,2.3,3.3,1.0,1,
+5.6,2.7,4.2,1.3,1,
+5.7,3.0,4.2,1.2,1,
+5.7,2.9,4.2,1.3,1,
+6.2,2.9,4.3,1.3,1,
+5.1,2.5,3.0,1.1,1,
+5.7,2.8,4.1,1.3,1,
+6.3,3.3,6.0,2.5,2,
+5.8,2.7,5.1,1.9,2,
+7.1,3.0,5.9,2.1,2,
+6.3,2.9,5.6,1.8,2,
+6.5,3.0,5.8,2.2,2,
+7.6,3.0,6.6,2.1,2,
+4.9,2.5,4.5,1.7,2,
+7.3,2.9,6.3,1.8,2,
+6.7,2.5,5.8,1.8,2,
+7.2,3.6,6.1,2.5,2,
+6.5,3.2,5.1,2.0,2,
+6.4,2.7,5.3,1.9,2,
+6.8,3.0,5.5,2.1,2,
+5.7,2.5,5.0,2.0,2,
+5.8,2.8,5.1,2.4,2,
+6.4,3.2,5.3,2.3,2,
+6.5,3.0,5.5,1.8,2,
+7.7,3.8,6.7,2.2,2,
+7.7,2.6,6.9,2.3,2,
+6.0,2.2,5.0,1.5,2,
+6.9,3.2,5.7,2.3,2,
+5.6,2.8,4.9,2.0,2,
+7.7,2.8,6.7,2.0,2,
+6.3,2.7,4.9,1.8,2,
+6.7,3.3,5.7,2.1,2,
+7.2,3.2,6.0,1.8,2,
+6.2,2.8,4.8,1.8,2,
+6.1,3.0,4.9,1.8,2,
+6.4,2.8,5.6,2.1,2,
+7.2,3.0,5.8,1.6,2,
+7.4,2.8,6.1,1.9,2,
+7.9,3.8,6.4,2.0,2,
+6.4,2.8,5.6,2.2,2,
+6.3,2.8,5.1,1.5,2,
+6.1,2.6,5.6,1.4,2,
+7.7,3.0,6.1,2.3,2,
+6.3,3.4,5.6,2.4,2,
+6.4,3.1,5.5,1.8,2,
+6.0,3.0,4.8,1.8,2,
+6.9,3.1,5.4,2.1,2,
+6.7,3.1,5.6,2.4,2,
+6.9,3.1,5.1,2.3,2,
+5.8,2.7,5.1,1.9,2,
+6.8,3.2,5.9,2.3,2,
+6.7,3.3,5.7,2.5,2,
+6.7,3.0,5.2,2.3,2,
+6.3,2.5,5.0,1.9,2,
+6.5,3.0,5.2,2.0,2,
+6.2,3.4,5.4,2.3,2,
+5.9,3.0,5.1,1.8,2,
\ No newline at end of file
diff --git a/liminal/runners/airflow/model/executor.py b/examples/spark-app-demo/k8s/data_cleanup.py
similarity index 52%
copy from liminal/runners/airflow/model/executor.py
copy to examples/spark-app-demo/k8s/data_cleanup.py
index 4cb41d9..a9446ae 100644
--- a/liminal/runners/airflow/model/executor.py
+++ b/examples/spark-app-demo/k8s/data_cleanup.py
@@ -16,25 +16,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from abc import ABC, abstractmethod
+import sys
 
+from pyspark.sql import SparkSession
 
-class Executor(ABC):
-    """
-    Executor Task.
-    """
-    # list of task types supported by the executor
-    supported_task_types = []
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print("Usage: source <file> destination <dest>", file=sys.stderr)
+        sys.exit(-1)
 
-    def __init__(self, executor_id, liminal_config, executor_config):
-        self.liminal_config = liminal_config
-        self.executor_id = executor_id
-        self.executor_config = executor_config
+    spark = SparkSession \
+        .builder \
+        .appName("CleanData") \
+        .getOrCreate()
 
-    @abstractmethod
-    def apply_task_to_dag(self, **kwargs):
-        pass
+    spark.read.text(sys.argv[1]).rdd.filter(lambda x: not x[0].startswith('#')) \
+        .filter(lambda r: not r[0].startswith('ignore')) \
+        .map(lambda r: r[0]).map(
+        lambda r: (
+            r.split(',')[0], r.split(',')[1], r.split(',')[2], r.split(',')[3], r.split(',')[4])) \
+        .toDF().coalesce(1).write.mode("overwrite").option("header", "false").csv(sys.argv[2])
 
-    def _validate_task_type(self, task):
-        assert any([isinstance(task, tYp) for tYp in self.supported_task_types]), \
-            f'supported task types: {self.supported_task_types}'
+    spark.stop()
diff --git a/examples/spark-app-demo/k8s/liminal.yml b/examples/spark-app-demo/k8s/liminal.yml
new file mode 100644
index 0000000..93e8679
--- /dev/null
+++ b/examples/spark-app-demo/k8s/liminal.yml
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+  output_path: '{{output_root_dir}}/my_first_liminal_spark_app_outputs/'
+  application: data_cleanup.py
+images:
+  - image: myorg/mydatascienceapp
+    type: python_server
+    source: .
+    endpoints:
+      - endpoint: /predict
+        module: serving
+        function: predict
+      - endpoint: /healthcheck
+        module: serving
+        function: healthcheck
+      - endpoint: /version
+        module: serving
+        function: version
+task_defaults:
+  python:
+    mounts:
+      - mount: mymount
+        volume: gettingstartedvol
+        path: /mnt/gettingstartedvol
+pipelines:
+  - pipeline: my_first_spark_pipeline
+    start_date: 1970-01-01
+    timeout_minutes: 45
+    schedule: 0 * 1 * *
+    tasks:
+      - task: data_preprocessing
+        type: spark
+        description: prepare the data for training
+        application_arguments:
+          - '{{input_root_dir}}data/iris.csv'
+          - '{{output_path}}'
+      - task: train
+        type: python
+        description: train model
+        image: myorg/mydatascienceapp
+        cmd: python -u training.py train '{{output_path}}'
+        env:
+          MOUNT_PATH: /mnt/gettingstartedvol
+      - task: validate
+        type: python
+        description: validate model and deploy
+        image: myorg/mydatascienceapp
+        cmd: python -u training.py validate
+        env:
+          MOUNT_PATH: /mnt/gettingstartedvol
+volumes:
+  - volume: gettingstartedvol
+    claim_name: gettingstartedvol-pvc
+    local:
+      path: .
+services:
+  - service:
+    name: my_datascience_server
+    type: python_server
+    description: my ds server
+    image: myorg/mydatascienceapp
+    source: .
+    endpoints:
+      - endpoint: /predict
+        module: serving
+        function: predict
+      - endpoint: /healthcheck
+        module: serving
+        function: healthcheck
\ No newline at end of file
diff --git a/examples/spark-app-demo/k8s/model_store.py b/examples/spark-app-demo/k8s/model_store.py
new file mode 100644
index 0000000..f2c8509
--- /dev/null
+++ b/examples/spark-app-demo/k8s/model_store.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pickle
+import time
+import glob
+
+import os
+
+MOUNT_PATH = os.environ.get('MOUNT_PATH', '/mnt/gettingstartedvol')
+PRODUCTION = 'production'
+CANDIDATE = 'candidate'
+
+_ONE_HOUR = 60 * 60
+
+
+class ModelStore:
+
+    def __init__(self, env):
+        self.env = env
+        self._latest_model = None
+        self._latest_version = None
+        self._last_check = time.time()
+
+    def load_latest_model(self, force=False):
+        if not self._latest_model or time.time() - self._last_check > _ONE_HOUR or force:
+            self._latest_model, self._latest_version = self._download_latest_model()
+
+        return self._latest_model, self._latest_version
+
+    def save_model(self, model, version):
+        key = 'model.p'
+        path = f'{MOUNT_PATH}/{self.env}/{version}'
+
+        os.makedirs(f'{path}', exist_ok=True)
+        pickle.dump(model, open(f'{path}/{key}', "wb"))
+
+    def _download_latest_model(self):
+        objects = (glob.glob(f'{MOUNT_PATH}/{self.env}/**/*'))
+        models = list(reversed(sorted([obj for obj in objects if obj.endswith('.p')])))
+        latest_key = models[0]
+        version = latest_key.rsplit('/')[-2]
+        print(f'Loading model version {version}')
+        return pickle.load(open(latest_key, 'rb')), version
diff --git a/.gitignore b/examples/spark-app-demo/k8s/requirements.txt
similarity index 85%
copy from .gitignore
copy to examples/spark-app-demo/k8s/requirements.txt
index fcf8bcd..b6b3b87 100644
--- a/.gitignore
+++ b/examples/spark-app-demo/k8s/requirements.txt
@@ -16,17 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
+scikit-learn==0.23.2
+apache-liminal==0.0.2
+pyspark==3.0.0
diff --git a/examples/spark-app-demo/k8s/serving.py b/examples/spark-app-demo/k8s/serving.py
new file mode 100644
index 0000000..02ca58c
--- /dev/null
+++ b/examples/spark-app-demo/k8s/serving.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import model_store
+from model_store import ModelStore
+
+_MODEL_STORE = ModelStore(model_store.PRODUCTION)
+_PETAL_WIDTH = 'petal_width'
+
+
+def predict(input_json):
+    try:
+        input_dict = json.loads(input_json)
+        model, version = _MODEL_STORE.load_latest_model()
+        result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
+        return json.dumps({"result": result, "version": version})
+
+    except IndexError:
+        return 'Failure: the model is not ready yet'
+
+    except Exception as e:
+        print(e)
+        return 'Failure'
+
+
+def healthcheck(self):
+    return 'Server is up!'
+
+
+def version(self):
+    try:
+        model, version = _MODEL_STORE.load_latest_model()
+        print(f'version={version}')
+        return version
+    except Exception as e:
+        return e
diff --git a/examples/spark-app-demo/k8s/training.py b/examples/spark-app-demo/k8s/training.py
new file mode 100644
index 0000000..caf05bf
--- /dev/null
+++ b/examples/spark-app-demo/k8s/training.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import sys
+import time
+
+import model_store
+import numpy as np
+from model_store import ModelStore
+from sklearn import datasets
+from sklearn.linear_model import LogisticRegression
+import os
+
+_CANDIDATE_MODEL_STORE = ModelStore(model_store.CANDIDATE)
+_PRODUCTION_MODEL_STORE = ModelStore(model_store.PRODUCTION)
+
+import numpy as np
+import csv
+from sklearn.datasets.base import Bunch
+
+
+def load_iris_from_csv_file(f):
+    with open(f) as csv_file:
+        data_file = csv.reader(csv_file)
+        temp = next(data_file)
+        n_samples = 150  # number of data rows, don't count header
+        n_features = 4  # number of columns for features, don't count target column
+        feature_names = ['setosa', 'versicolor', 'virginica']
+        target_names = ['f4']  # adjust accordingly
+        data = np.empty((n_samples, n_features))
+        target = np.empty((n_samples,), dtype=np.int)
+
+        for i, sample in enumerate(data_file):
+            data[i] = np.asarray(sample[:-1], dtype=np.float64)
+            target[i] = np.asarray(sample[-1], dtype=np.int)
+
+    return Bunch(data=data, target=target, feature_names=feature_names, target_names=target_names)
+
+
+def get_dataset(d):
+    print("searching for csv files in {}".format(d))
+    for root, dirs, files in os.walk(d):
+
+        for file in files:
+            if file.endswith(".csv"):
+                return os.path.join(d, file)
+    return None
+
+
+def train_model(f):
+    csv_file = get_dataset(f)
+    if csv_file:
+        print("found {} dataset".format(csv_file))
+
+    iris = load_iris_from_csv_file(csv_file)
+
+    X = iris["data"][:, 3:]  # petal width
+    y = (iris["target"] == 2).astype(np.int)
+
+    model = LogisticRegression()
+    model.fit(X, y)
+
+    version = round(time.time())
+
+    print(f'Saving model with version {version} to candidate model store.')
+    _CANDIDATE_MODEL_STORE.save_model(model, version)
+
+
+def validate_model():
+    model, version = _CANDIDATE_MODEL_STORE.load_latest_model()
+    print(f'Validating model with version {version} to candidate model store.')
+    if not isinstance(model.predict([[1]]), np.ndarray):
+        raise ValueError('Invalid model')
+    print(f'Deploying model with version {version} to production model store.')
+    _PRODUCTION_MODEL_STORE.save_model(model, version)
+
+
+if __name__ == '__main__':
+    cmd = sys.argv[1]
+    if cmd == 'train':
+        train_model(sys.argv[2])
+    elif cmd == 'validate':
+        validate_model()
+    else:
+        raise ValueError(f"Unknown command {cmd}")
diff --git a/.gitignore b/liminal/build/image/spark/Dockerfile
similarity index 85%
copy from .gitignore
copy to liminal/build/image/spark/Dockerfile
index fcf8bcd..db93f5b 100644
--- a/.gitignore
+++ b/liminal/build/image/spark/Dockerfile
@@ -15,18 +15,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+FROM bitnami/spark:3.1.2-debian-10-r23
 
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
+USER root
+
+WORKDIR /app
+
+COPY . /app/
+
+RUN {{mount}} pip install -r requirements.txt
\ No newline at end of file
diff --git a/.gitignore b/liminal/build/image/spark/__init__.py
similarity index 85%
copy from .gitignore
copy to liminal/build/image/spark/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/liminal/build/image/spark/__init__.py
@@ -15,18 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/liminal/build/image/spark/spark.py
similarity index 68%
copy from liminal/runners/airflow/tasks/hadoop.py
copy to liminal/build/image/spark/spark.py
index 6003ce7..dbc9e8d 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/liminal/build/image/spark/spark.py
@@ -15,20 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os
 
-from abc import ABC, abstractmethod
+from liminal.build.python import BasePythonImageBuilder
 
-from liminal.runners.airflow.model import task
 
+class SparkImageBuilder(BasePythonImageBuilder):
+    def __init__(self, config, base_path, relative_source_path, tag):
+        super().__init__(config, base_path, relative_source_path, tag)
 
-class HadoopTask(task.Task, ABC):
-    """
-    Hadoop task
-    """
-
-    def apply_task_to_dag(self):
-        return self.executor.apply_task_to_dag(task=self, parent=self.parent)
-
-    @abstractmethod
-    def get_runnable_command(self):
-        pass
+    @staticmethod
+    def _dockerfile_path():
+        return os.path.join(os.path.dirname(__file__), 'Dockerfile')
diff --git a/liminal/core/config/config.py b/liminal/core/config/config.py
index 51c7198..7af3633 100644
--- a/liminal/core/config/config.py
+++ b/liminal/core/config/config.py
@@ -43,6 +43,18 @@ class ConfigUtil:
     __AFTER_TASKS = 'after_tasks'
     __EXECUTORS = 'executors'
     __IMAGES = 'images'
+    __BASE = "base"
+    __PIPELINES = "pipelines"
+    __SUPER = "super"
+    __TYPE = "type"
+    __SUB = "sub"
+    __SERVICES = "services"
+    __TASKS = "tasks"
+    __PIPELINE_DEFAULTS = "pipeline_defaults"
+    __TASK_DEFAULTS = "task_defaults"
+    __BEFORE_TASKS = "before_tasks"
+    __AFTER_TASKS = "after_tasks"
+    __EXECUTORS = "executors"
 
     def __init__(self, configs_path):
         self.configs_path = configs_path
@@ -148,10 +160,12 @@ class ConfigUtil:
         super1_pipeline_defaults = super1.get(self.__PIPELINE_DEFAULTS, {}).copy()
         super2_pipeline_defaults = super2.get(self.__PIPELINE_DEFAULTS, {}).copy()
 
+        super1[self.__PIPELINE_DEFAULTS] = super1_pipeline_defaults
         super1[self.__PIPELINE_DEFAULTS][self.__BEFORE_TASKS] = \
             super2_pipeline_defaults.pop(self.__BEFORE_TASKS, []) + super1_pipeline_defaults.pop(
                 self.__BEFORE_TASKS, [])
 
+        super2[self.__PIPELINE_DEFAULTS] = super2_pipeline_defaults
         super1[self.__PIPELINE_DEFAULTS][self.__AFTER_TASKS] = \
             super1_pipeline_defaults.pop(self.__AFTER_TASKS, []) + super2_pipeline_defaults.pop(
                 self.__AFTER_TASKS, [])
diff --git a/liminal/core/config/defaults/base/liminal.yml b/liminal/core/config/defaults/base/liminal.yml
index 729ba22..41682a2 100644
--- a/liminal/core/config/defaults/base/liminal.yml
+++ b/liminal/core/config/defaults/base/liminal.yml
@@ -21,12 +21,20 @@ type: super
 executors:
   - executor: default_k8s
     type: kubernetes
+  - executor: airflow_executor
+    type: airflow
 service_defaults:
   description: add defaults parameters for all services
 task_defaults:
   description: add defaults parameters for all tasks separate by task type
   python:
     executor: default_k8s
+  spark:
+    executor: default_k8s
+  job_end:
+    executor: airflow_executor
+  job_start:
+    executor: airflow_executor
 pipeline_defaults:
   description: add defaults parameters for all pipelines
   before_tasks:
diff --git a/liminal/runners/airflow/dag/liminal_register_dags.py b/liminal/runners/airflow/dag/liminal_register_dags.py
index a08f1c1..46ff6e4 100644
--- a/liminal/runners/airflow/dag/liminal_register_dags.py
+++ b/liminal/runners/airflow/dag/liminal_register_dags.py
@@ -22,10 +22,10 @@ from datetime import datetime, timedelta
 
 from airflow import DAG
 
-from liminal.core import environment as env
 from liminal.core.config.config import ConfigUtil
 from liminal.core.util import class_util
-from liminal.runners.airflow.model import executor
+from liminal.runners.airflow.executors import airflow
+from liminal.runners.airflow.model import executor as liminal_executor
 from liminal.runners.airflow.model.task import Task
 
 __DEPENDS_ON_PAST = 'depends_on_past'
@@ -62,6 +62,9 @@ def register_dags(configs_path):
 
             executors = __initialize_executors(config)
 
+            default_executor = airflow.AirflowExecutor("default_executor", liminal_config=config,
+                                                       executor_config={})
+
             for pipeline in config['pipelines']:
                 default_args = __default_args(pipeline)
                 dag = __initialize_dag(default_args, pipeline, owner)
@@ -77,11 +80,19 @@ def register_dags(configs_path):
                         trigger_rule=trigger_rule,
                         liminal_config=config,
                         pipeline_config=pipeline,
-                        task_config=task,
-                        executor=executors.get(task.get('executor'))
+                        task_config=task
                     )
 
-                    parent = task_instance.apply_task_to_dag()
+                    executor_id = task.get('executor')
+                    if executor_id:
+                        executor = executors[executor_id]
+                    else:
+                        logging.info(f"Did not find `executor` in ${task['task']} config."
+                                     f" Using the default executor (${type(default_executor)})"
+                                     f" instead.")
+                        executor = default_executor
+
+                    parent = executor.apply_task_to_dag(task=task_instance)
 
                 logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')
 
@@ -160,7 +171,7 @@ logging.info(f'Finished loading task implementations: {tasks_by_liminal_name.key
 
 executors_by_liminal_name = class_util.find_subclasses_in_packages(
     ['liminal.runners.airflow.executors'],
-    executor.Executor)
+    liminal_executor.Executor)
 
 logging.info(f'Finished loading executor implementations: {executors_by_liminal_name.keys()}')
 
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/liminal/runners/airflow/executors/airflow.py
similarity index 71%
copy from liminal/runners/airflow/tasks/hadoop.py
copy to liminal/runners/airflow/executors/airflow.py
index 6003ce7..98ff0a5 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/liminal/runners/airflow/executors/airflow.py
@@ -16,19 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from abc import ABC, abstractmethod
+from liminal.runners.airflow.model import executor
+from liminal.runners.airflow.tasks import airflow
 
-from liminal.runners.airflow.model import task
 
-
-class HadoopTask(task.Task, ABC):
+class AirflowExecutor(executor.Executor):
     """
-    Hadoop task
+    Execute the task steps
     """
 
-    def apply_task_to_dag(self):
-        return self.executor.apply_task_to_dag(task=self, parent=self.parent)
+    supported_task_types = [airflow.AirflowTask]
 
-    @abstractmethod
-    def get_runnable_command(self):
-        pass
+    def _apply_executor_task_to_dag(self, **kwargs):
+        return kwargs['task'].apply_task_to_dag()
diff --git a/liminal/runners/airflow/executors/emr.py b/liminal/runners/airflow/executors/emr.py
index fd9bf34..eab7e87 100644
--- a/liminal/runners/airflow/executors/emr.py
+++ b/liminal/runners/airflow/executors/emr.py
@@ -37,9 +37,9 @@ class EMRExecutor(executor.Executor):
         self.job_flow_id = self.executor_config.get('cluster_id', None)
         self.job_flow_name = self.executor_config.get('cluster_name', None)
 
-    def apply_task_to_dag(self, **kwargs):
+    def _apply_executor_task_to_dag(self, **kwargs):
         task = kwargs['task']
-        parent = kwargs.get('parent', task.parent)
+        parent = task.parent
 
         self._validate_task_type(task)
 
diff --git a/liminal/runners/airflow/executors/kubernetes.py b/liminal/runners/airflow/executors/kubernetes.py
index c28d5dc..eb1c34c 100644
--- a/liminal/runners/airflow/executors/kubernetes.py
+++ b/liminal/runners/airflow/executors/kubernetes.py
@@ -49,13 +49,12 @@ class KubernetesPodExecutor(executor.Executor):
         self.task_name = self.executor_config['executor']
         self.volumes = self._volumes()
 
-    def apply_task_to_dag(self, **kwargs):
+    def _apply_executor_task_to_dag(self, **kwargs):
         task = kwargs['task']
+        parent = task.parent
 
         self._validate_task_type(task)
 
-        parent = kwargs.get('parent', task.parent)
-
         pod_task = KubernetesPodOperator(
             dag=task.dag,
             trigger_rule=task.trigger_rule,
diff --git a/liminal/runners/airflow/model/executor.py b/liminal/runners/airflow/model/executor.py
index 4cb41d9..918ed21 100644
--- a/liminal/runners/airflow/model/executor.py
+++ b/liminal/runners/airflow/model/executor.py
@@ -31,8 +31,15 @@ class Executor(ABC):
         self.executor_id = executor_id
         self.executor_config = executor_config
 
-    @abstractmethod
     def apply_task_to_dag(self, **kwargs):
+        task = kwargs['task']
+
+        self._validate_task_type(task)
+
+        return self._apply_executor_task_to_dag(task=task)
+
+    @abstractmethod
+    def _apply_executor_task_to_dag(self, **kwargs):
         pass
 
     def _validate_task_type(self, task):
diff --git a/liminal/runners/airflow/model/task.py b/liminal/runners/airflow/model/task.py
index 47be264..e311b59 100644
--- a/liminal/runners/airflow/model/task.py
+++ b/liminal/runners/airflow/model/task.py
@@ -28,7 +28,7 @@ class Task(ABC):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config, executor=None):
+                 task_config):
         self.liminal_config = liminal_config
         self.dag = dag
         self.pipeline_config = pipeline_config
@@ -36,8 +36,3 @@ class Task(ABC):
         self.parent = parent
         self.trigger_rule = trigger_rule
         self.task_config = task_config
-        self.executor = executor
-
-    @abstractmethod
-    def apply_task_to_dag(self, **kwargs):
-        pass
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/liminal/runners/airflow/tasks/airflow.py
similarity index 85%
copy from liminal/runners/airflow/tasks/hadoop.py
copy to liminal/runners/airflow/tasks/airflow.py
index 6003ce7..e577c66 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/liminal/runners/airflow/tasks/airflow.py
@@ -21,14 +21,11 @@ from abc import ABC, abstractmethod
 from liminal.runners.airflow.model import task
 
 
-class HadoopTask(task.Task, ABC):
+class AirflowTask(task.Task, ABC):
     """
-    Hadoop task
+    Airflow task
     """
 
-    def apply_task_to_dag(self):
-        return self.executor.apply_task_to_dag(task=self, parent=self.parent)
-
     @abstractmethod
-    def get_runnable_command(self):
+    def apply_task_to_dag(self):
         pass
diff --git a/liminal/runners/airflow/tasks/containerable.py b/liminal/runners/airflow/tasks/containerable.py
index fa3af71..9f731e3 100644
--- a/liminal/runners/airflow/tasks/containerable.py
+++ b/liminal/runners/airflow/tasks/containerable.py
@@ -39,9 +39,9 @@ class ContainerTask(task.Task, ABC):
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config, executor=None):
+                 task_config):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
-                         pipeline_config, task_config, executor)
+                         pipeline_config, task_config)
         env = standalone_variable_backend.get_variable(ENV, DEFAULT)
         self.env_vars = self.__env_vars(env)
         self.image = self.task_config['image']
@@ -51,9 +51,6 @@ class ContainerTask(task.Task, ABC):
             self.env_vars.get(OUTPUT_DESTINATION_PATH)
         )
 
-    def apply_task_to_dag(self):
-        return self.executor.apply_task_to_dag(task=self, parent=self.parent)
-
     def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
         cmds = ['/bin/sh', '-c']
 
diff --git a/liminal/runners/airflow/tasks/create_cloudformation_stack.py b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
index 7bbe2b9..9a7aef9 100644
--- a/liminal/runners/airflow/tasks/create_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
@@ -20,20 +20,20 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import BranchPythonOperator
 from flatdict import FlatDict
 
-from liminal.runners.airflow.model import task
 from liminal.runners.airflow.operators.cloudformation import CloudFormationCreateStackOperator, \
     CloudFormationCreateStackSensor, CloudFormationHook
+from liminal.runners.airflow.tasks import airflow
 
 
-class CreateCloudFormationStackTask(task.Task):
+class CreateCloudFormationStackTask(airflow.AirflowTask):
     """
     Creates cloud_formation stack.
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config, executor=None):
+                 task_config):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
-                         pipeline_config, task_config, executor)
+                         pipeline_config, task_config)
         self.stack_name = task_config['stack_name']
 
     def apply_task_to_dag(self):
diff --git a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
index 500fef1..324c961 100644
--- a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -19,20 +19,20 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import BranchPythonOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-from liminal.runners.airflow.model import task
 from liminal.runners.airflow.operators.cloudformation import CloudFormationDeleteStackOperator, \
     CloudFormationDeleteStackSensor
+from liminal.runners.airflow.tasks import airflow
 
 
-class DeleteCloudFormationStackTask(task.Task):
+class DeleteCloudFormationStackTask(airflow.AirflowTask):
     """
     Deletes cloud_formation stack.
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config, executor=None):
+                 task_config):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
-                         pipeline_config, task_config, executor)
+                         pipeline_config, task_config)
         self.stack_name = task_config['stack_name']
 
     def apply_task_to_dag(self):
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/liminal/runners/airflow/tasks/hadoop.py
index 6003ce7..7a79460 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/liminal/runners/airflow/tasks/hadoop.py
@@ -26,9 +26,6 @@ class HadoopTask(task.Task, ABC):
     Hadoop task
     """
 
-    def apply_task_to_dag(self):
-        return self.executor.apply_task_to_dag(task=self, parent=self.parent)
-
     @abstractmethod
     def get_runnable_command(self):
         pass
diff --git a/liminal/runners/airflow/tasks/job_end.py b/liminal/runners/airflow/tasks/job_end.py
index c71f0ff..ff5cffa 100644
--- a/liminal/runners/airflow/tasks/job_end.py
+++ b/liminal/runners/airflow/tasks/job_end.py
@@ -16,19 +16,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from liminal.runners.airflow.model import task
 from liminal.runners.airflow.operators.job_status_operator import JobEndOperator
+from liminal.runners.airflow.tasks import airflow
 
 
-class JobEndTask(task.Task):
+class JobEndTask(airflow.AirflowTask):
     """
       Job end task. Reports job end metrics.
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config, executor=None):
+                 task_config):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                         task_config, executor)
+                         task_config)
         metrics = self.liminal_config.get('metrics', {})
         self.metrics_namespace = metrics.get('namespace', '')
         self.metrics_backends = metrics.get('backends', [])
diff --git a/liminal/runners/airflow/tasks/job_start.py b/liminal/runners/airflow/tasks/job_start.py
index c5c6234..b6bef32 100644
--- a/liminal/runners/airflow/tasks/job_start.py
+++ b/liminal/runners/airflow/tasks/job_start.py
@@ -16,19 +16,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from liminal.runners.airflow.model import task
 from liminal.runners.airflow.operators.job_status_operator import JobStartOperator
+from liminal.runners.airflow.tasks import airflow
 
 
-class JobStartTask(task.Task):
+class JobStartTask(airflow.AirflowTask):
     """
     Job start task. Reports job start metrics.
     """
 
     def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config, executor=None):
+                 task_config):
         super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                         task_config, executor)
+                         task_config)
         metrics = self.liminal_config.get('metrics', {})
         self.metrics_namespace = metrics.get('namespace', '')
         self.metrics_backends = metrics.get('backends', [])
diff --git a/liminal/runners/airflow/tasks/spark.py b/liminal/runners/airflow/tasks/spark.py
index c29c4d3..247df2c 100644
--- a/liminal/runners/airflow/tasks/spark.py
+++ b/liminal/runners/airflow/tasks/spark.py
@@ -16,15 +16,24 @@
 # specific language governing permissions and limitations
 # under the License.
 from itertools import chain
-from liminal.runners.airflow.tasks import hadoop
+
 from flatdict import FlatDict
 
+from liminal.runners.airflow.tasks import hadoop, containerable
+
 
-class SparkTask(hadoop.HadoopTask):
+class SparkTask(hadoop.HadoopTask, containerable.ContainerTask):
     """
     Executes a Spark application.
     """
 
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        task_config['image'] = task_config.get('image', '')
+        task_config['cmd'] = task_config.get('cmd', [])
+        super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
+                         pipeline_config, task_config)
+
     def get_runnable_command(self):
         """
         Return spark-submit runnable command
@@ -65,6 +74,8 @@ class SparkTask(hadoop.HadoopTask):
 
     def __additional_arguments(self):
         application_arguments = self.task_config.get('application_arguments', {})
+        if type(application_arguments) == list:
+            return application_arguments
         return self.__interleaving(application_arguments.keys(), application_arguments.values())
 
     def __parse_spark_arguments(self, spark_arguments):
@@ -76,3 +87,6 @@ class SparkTask(hadoop.HadoopTask):
     @staticmethod
     def __interleaving(keys, values):
         return list(chain.from_iterable(zip(keys, values)))
+
+    def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
+        return self.__generate_spark_submit(), []
diff --git a/liminal/runners/airflow/tasks/sql.py b/liminal/runners/airflow/tasks/sql.py
index 7224852..e17964f 100644
--- a/liminal/runners/airflow/tasks/sql.py
+++ b/liminal/runners/airflow/tasks/sql.py
@@ -19,15 +19,7 @@
 from liminal.runners.airflow.model import task
 
 
-class SparkTask(task.Task):
+class SqlTask(task.Task):
     """
     Executes an SQL application.
     """
-
-    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                 task_config, executor):
-        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
-                         task_config, executor)
-
-    def apply_task_to_dag(self):
-        pass
diff --git a/scripts/liminal b/scripts/liminal
index 48282cf..c886592 100755
--- a/scripts/liminal
+++ b/scripts/liminal
@@ -31,6 +31,7 @@ import scripts
 from liminal.build import liminal_apps_builder
 from liminal.core import environment
 from liminal.core.util import files_util
+from liminal.core.config.config import ConfigUtil
 from liminal.kubernetes import volume_util
 from liminal.logging.logging_setup import logging_initialization
 from liminal.runners.airflow import dag
@@ -116,6 +117,7 @@ def docker_compose_command(command_name, args):
         subprocess.call(run_command, env=os.environ, shell=True)
         return '', ''
 
+
 @cli.command("deploy", short_help="deploys your liminal.yaml files to $LIMINAL_HOME folder")
 @click.option('--path', default=os.getcwd(), help="folder containing liminal.yaml files")
 @click.option('--clean', is_flag=True, default=False,
@@ -134,6 +136,7 @@ def deploy_liminal_apps(path, clean):
         pathlib.Path(target_yml_name).parent.mkdir(parents=True, exist_ok=True)
         shutil.copyfile(config_file, target_yml_name)
 
+
 def liminal_is_running():
     stdout, stderr = docker_compose_command('ps', [])
     return "liminal" in stdout
@@ -161,20 +164,24 @@ def logs(follow, tail):
             stdout, stderr = docker_compose_command('logs', [f'--tail={tail}'])
             logging.info(stdout)
 
+
 @cli.command("create", short_help="create a kubernetes local volume")
-@click.option('--local-volume-path', default=os.getcwd(), help="folder containing liminal.yaml files")
+@click.option('--local-volume-path', default=os.getcwd(),
+              help="folder containing liminal.yaml files")
 def create(local_volume_path):
     click.echo("creating a kubernetes local volume")
-    config_files = files_util.find_config_files(local_volume_path)
-    for config_file in config_files:
-        with open(config_file) as stream:
-            config = yaml.safe_load(stream)
-            volume_util.create_local_volumes(config, os.path.dirname(config_file))
+    configs = ConfigUtil(local_volume_path).safe_load(is_render_variables=True,
+                                                      soft_merge=True)
+    for config in configs:
+        volume_util.create_local_volumes(config, os.path.dirname(
+            files_util.resolve_pipeline_source_file(config['name'])))
+
 
 @cli.command("start",
              short_help="starts a local airflow in docker compose. should be run after deploy. " +
                         "Make sure docker is running on your machine")
-@click.option('--detached-mode', '-d', is_flag=True, default=False, help="Start liminal in detached mode.")
+@click.option('--detached-mode', '-d', is_flag=True, default=False,
+              help="Start liminal in detached mode.")
 def start(detached_mode):
     liminal_version = environment.get_liminal_version()
     logging.info(f'starting liminal version: {liminal_version}')
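
For reference, a minimal sketch of the merged config that the updated `create` command hands to
volume_util.create_local_volumes, mirroring the volumes section used by the k8s tests further
down; the volume name and path here are illustrative:

    # Illustrative only: the parts of a merged liminal config that `liminal create` consumes.
    config = {
        'name': 'my_app',
        'volumes': [
            {
                'volume': 'myvol1',
                'local': {'path': '/tmp/myvol1'}  # host path backing the local k8s volume
            }
        ]
    }
    # volume_util.create_local_volumes(config, base_dir) creates the 'myvol1' volume
    # from that path, as exercised by the tests below.
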
diff --git a/tests/liminal/core/config/test_config.py b/tests/liminal/core/config/test_config.py
index b4baf9b..57201be 100644
--- a/tests/liminal/core/config/test_config.py
+++ b/tests/liminal/core/config/test_config.py
@@ -117,6 +117,50 @@ class TestConfigUtil(TestCase):
             'type': 'sub'
         }]
 
+        expected = [{'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'},
+                                   {'executor': 'airflow_executor', 'type': 'airflow'}],
+                     'images': [{'image': 'my_image', 'source': '.'}],
+                     'name': 'my_subliminal_test',
+                     'pipeline_defaults': {'param1': 'param1_value'},
+                     'pipelines': [{'description': 'add defaults parameters for all pipelines',
+                                    'name': 'mypipe1',
+                                    'param': 'constant',
+                                    'param1': 'param1_value',
+                                    'param2': 'param2super_value',
+                                    'param3': 'param3super_value',
+                                    'param4': 'param4hyper_value',
+                                    'tasks': [{'executor': 'airflow_executor',
+                                               'task': 'start',
+                                               'task_def1': 'task_def1_value',
+                                               'task_def2': {'task_def2_1': 'task_def2_1_value'},
+                                               'task_sub_def': 'task_sub_def_value',
+                                               'type': 'job_start'},
+                                              {'executor': 'airflow_executor',
+                                               'task': 'end',
+                                               'type': 'job_end'}]},
+                                   {'description': 'add defaults parameters for all pipelines',
+                                    'name': 'mypipe2',
+                                    'param': 'constant',
+                                    'param1': 'param1_value',
+                                    'param2': 'param2super_value',
+                                    'param3': 'param3super_value',
+                                    'param4': 'param4hyper_value',
+                                    'tasks': [{'executor': 'airflow_executor',
+                                               'task': 'start',
+                                               'task_def1': 'task_def1_value',
+                                               'task_def2': {'task_def2_1': 'task_def2_1_value'},
+                                               'task_sub_def': 'task_sub_def_value',
+                                               'type': 'job_start'},
+                                              {'executor': 'airflow_executor',
+                                               'task': 'end',
+                                               'type': 'job_end'}]}],
+                     'service_defaults': {'description': 'add defaults parameters for all '
+                                                         'services'},
+                     'services': [],
+                     'super': 'my_superliminal_test',
+                     'task_defaults': {'job_start': {'task_sub_def': 'task_sub_def_value'}},
+                     'type': 'sub'}]
+
         find_config_files_mock.return_value = {
             'my_subliminal_test': subliminal,
             'my_superliminal_test': superliminal,
@@ -151,7 +195,8 @@ class TestConfigUtil(TestCase):
 
     @mock.patch('liminal.core.util.files_util.load')
     def test_get_superliminal(self, find_config_files_mock):
-        base = {'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}],
+        base = {'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'},
+                              {'executor': 'airflow_executor', 'type': 'airflow'}],
                 'name': 'base',
                 'pipeline_defaults': {'after_tasks': [{'task': 'end', 'type': 'job_end'}],
                                       'before_tasks': [{'task': 'start', 'type': 'job_start'}],
@@ -161,7 +206,10 @@ class TestConfigUtil(TestCase):
                                                     'services'},
                 'task_defaults': {'description': 'add defaults parameters for all tasks '
                                                  'separate by task type',
-                                  'python': {'executor': 'default_k8s'}},
+                                  'job_end': {'executor': 'airflow_executor'},
+                                  'job_start': {'executor': 'airflow_executor'},
+                                  'python': {'executor': 'default_k8s'},
+                                  'spark': {'executor': 'default_k8s'}},
                 'type': 'super'}
         subliminal = {
             'name': 'subliminal_test',
@@ -335,62 +383,67 @@ class TestConfigUtil(TestCase):
             }
         }
 
-        expected = [{
-            'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}],
-            'name': 'my_subliminal_test',
-            'pipeline_defaults': {'param1': '-case'},
-            'pipelines': [{'description': 'add defaults parameters for all pipelines',
-                           'global_conf': 'super_var',
-                           'name': 'mypipe1',
-                           'param': 'simple case',
-                           'param1': '-case',
-                           'param2': '{{pipe-var}}',
-                           'param3': 'param3super_value',
-                           'param4': 'param4hyper_value',
-                           'tasks': [{'task': 'start',
-                                      'task_def1:': 'task_sub_def_value',
-                                      'type': 'job_start'},
-                                     {'task': 'second_task', 'type': 'dummy'},
-                                     {'task': 'sub_tasks', 'type': 'dummy'},
-                                     {'task': 'before_last_task', 'type': 'dummy'},
-                                     {'task': 'end', 'type': 'job_end'}]},
-                          {'description': 'add defaults parameters for all pipelines',
-                           'global_conf': 'super_var',
-                           'name': 'mypipe2',
-                           'param': '-case',
-                           'param1': '-case',
-                           'param2': '{{pipe-var}}',
-                           'param3': 'param3super_value',
-                           'param4': 'param4hyper_value',
-                           'tasks': [{'task': 'start',
-                                      'task_def1:': 'task_sub_def_value',
-                                      'type': 'job_start'},
-                                     {'task': 'second_task', 'type': 'dummy'},
-                                     {'task': 'sub_tasks', 'type': 'dummy'},
-                                     {'task': 'before_last_task', 'type': 'dummy'},
-                                     {'task': 'end', 'type': 'job_end'}]}],
-            'service_defaults': {'description': 'add defaults parameters for all '
-                                                'services'},
-            'images': [],
-            'services': [{'description': 'add defaults parameters for all services',
-                          'image': 'prod image',
-                          'name': 'my_python_server',
-                          'type': 'python_server'},
-                         {'description': 'add defaults parameters for all services',
-                          'image': 'default_image_value',
-                          'name': 'my_python_server_for_stg',
-                          'type': 'python_server'}],
-            'super': 'my_superliminal_test',
-            'task_defaults': {'job_start': {'task_def1:': 'task_sub_def_value'}},
-            'type': 'sub',
-            'variables': {'a': 'myenv1',
-                          'b': 'myenv12',
-                          'c': 'myenv1myenv122',
-                          'image': 'prod image',
-                          'var': 'simple case',
-                          'var-2': '-case',
-                          'var_2': '_case'}
-        }]
+        expected = [{'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'},
+                                   {'executor': 'airflow_executor', 'type': 'airflow'}],
+                     'name': 'my_subliminal_test',
+                     'pipeline_defaults': {'param1': '-case'},
+                     'pipelines': [{'description': 'add defaults parameters for all pipelines',
+                                    'global_conf': 'super_var',
+                                    'name': 'mypipe1',
+                                    'param': 'simple case',
+                                    'param1': '-case',
+                                    'param2': '{{pipe-var}}',
+                                    'param3': 'param3super_value',
+                                    'param4': 'param4hyper_value',
+                                    'tasks': [{'executor': 'airflow_executor',
+                                               'task': 'start',
+                                               'task_def1:': 'task_sub_def_value',
+                                               'type': 'job_start'},
+                                              {'task': 'second_task', 'type': 'dummy'},
+                                              {'task': 'sub_tasks', 'type': 'dummy'},
+                                              {'task': 'before_last_task', 'type': 'dummy'},
+                                              {'executor': 'airflow_executor',
+                                               'task': 'end',
+                                               'type': 'job_end'}]},
+                                   {'description': 'add defaults parameters for all pipelines',
+                                    'global_conf': 'super_var',
+                                    'name': 'mypipe2',
+                                    'param': '-case',
+                                    'param1': '-case',
+                                    'param2': '{{pipe-var}}',
+                                    'param3': 'param3super_value',
+                                    'param4': 'param4hyper_value',
+                                    'tasks': [{'executor': 'airflow_executor',
+                                               'task': 'start',
+                                               'task_def1:': 'task_sub_def_value',
+                                               'type': 'job_start'},
+                                              {'task': 'second_task', 'type': 'dummy'},
+                                              {'task': 'sub_tasks', 'type': 'dummy'},
+                                              {'task': 'before_last_task', 'type': 'dummy'},
+                                              {'executor': 'airflow_executor',
+                                               'task': 'end',
+                                               'type': 'job_end'}]}],
+                     'service_defaults': {'description': 'add defaults parameters for all '
+                                                         'services'},
+                     'images': [],
+                     'services': [{'description': 'add defaults parameters for all services',
+                                   'image': 'prod image',
+                                   'name': 'my_python_server',
+                                   'type': 'python_server'},
+                                  {'description': 'add defaults parameters for all services',
+                                   'image': 'default_image_value',
+                                   'name': 'my_python_server_for_stg',
+                                   'type': 'python_server'}],
+                     'super': 'my_superliminal_test',
+                     'task_defaults': {'job_start': {'task_def1:': 'task_sub_def_value'}},
+                     'type': 'sub',
+                     'variables': {'a': 'myenv1',
+                                   'b': 'myenv12',
+                                   'c': 'myenv1myenv122',
+                                   'image': 'prod image',
+                                   'var': 'simple case',
+                                   'var-2': '-case',
+                                   'var_2': '_case'}}]
 
         find_config_files_mock.return_value = {
             'my_subliminal_test': subliminal,
@@ -424,28 +477,29 @@ class TestConfigUtil(TestCase):
             ]
         }
 
-        expected = {
-            'name': 'my_subliminal_test', 'type': 'sub',
-            'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}],
-            'service_defaults': {'description': 'add defaults parameters for all services'},
-            'task_defaults': {
-                'description': 'add defaults parameters for all tasks separate by task type',
-                'python': {'executor': 'default_k8s'}}, 'pipeline_defaults': {
+        expected = {'name': 'my_subliminal_test', 'type': 'sub',
+                    'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'},
+                                  {'executor': 'airflow_executor', 'type': 'airflow'}],
+                    'service_defaults': {'description': 'add defaults parameters for all services'},
+                    'task_defaults': {
+                        'description': 'add defaults parameters for all tasks separate by task type',
+                        'python': {'executor': 'default_k8s'}, 'spark': {'executor': 'default_k8s'},
+                        'job_end': {'executor': 'airflow_executor'},
+                        'job_start': {'executor': 'airflow_executor'}}, 'pipeline_defaults': {
                 'description': 'add defaults parameters for all pipelines',
                 'before_tasks': [{'task': 'start', 'type': 'job_start'}],
                 'after_tasks': [{'task': 'end', 'type': 'job_end'}]},
-            'variables': {'var': 1, 'var-2': True}, 'pipelines': [
+                    'variables': {'var': 1, 'var-2': True}, 'pipelines': [
                 {'name': 'mypipe1', 'param': '1',
                  'description': 'add defaults parameters for all pipelines',
-                 'tasks': [{'task': 'start', 'type': 'job_start'},
-                           {'task': 'end', 'type': 'job_end'}]},
+                 'tasks': [{'task': 'start', 'type': 'job_start', 'executor': 'airflow_executor'},
+                           {'task': 'end', 'type': 'job_end', 'executor': 'airflow_executor'}]},
                 {'name': 'mypipe2', 'param': 'True',
                  'description': 'add defaults parameters for all pipelines',
-                 'tasks': [{'task': 'start', 'type': 'job_start'},
-                           {'task': 'end', 'type': 'job_end'}]}],
-            'images': [],
-            'services': []
-        }
+                 'tasks': [{'task': 'start', 'type': 'job_start', 'executor': 'airflow_executor'},
+                           {'task': 'end', 'type': 'job_end', 'executor': 'airflow_executor'}]}],
+                    'images': [],
+                    'services': []}
 
         find_config_files_mock.return_value = {
             'my_subliminal_test': subliminal
diff --git a/.gitignore b/tests/runners/airflow/build/spark/__init__.py
similarity index 85%
copy from .gitignore
copy to tests/runners/airflow/build/spark/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/tests/runners/airflow/build/spark/__init__.py
@@ -15,18 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/tests/runners/airflow/build/spark/test_spark_image_builder.py b/tests/runners/airflow/build/spark/test_spark_image_builder.py
new file mode 100644
index 0000000..70148b4
--- /dev/null
+++ b/tests/runners/airflow/build/spark/test_spark_image_builder.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import shutil
+import tempfile
+from unittest import TestCase
+
+import docker
+
+from liminal.build.image.spark.spark import SparkImageBuilder
+
+
+class TestSparkImageBuilder(TestCase):
+    __IMAGE_NAME = 'my_spark_image'
+
+    def setUp(self) -> None:
+        super().setUp()
+        os.environ['TMPDIR'] = '/tmp'
+        self.temp_dir = self.__temp_dir()
+        self.temp_airflow_dir = self.__temp_dir()
+
+    def tearDown(self) -> None:
+        super().tearDown()
+        self.__remove_dir(self.temp_dir)
+        self.__remove_dir(self.temp_airflow_dir)
+
+    def test_build(self):
+        build_out = self.__test_build()
+        self.assertTrue('RUN pip install -r requirements.txt' in build_out,
+                        'Incorrect pip command')
+
+        self.__test_image()
+
+    def __test_build(self):
+        config = self.__create_conf('my_task')
+
+        base_path = os.path.join(os.path.dirname(__file__), '../../../apps/test_spark_app')
+
+        builder = SparkImageBuilder(config=config,
+                                    base_path=base_path,
+                                    relative_source_path='wordcount',
+                                    tag=self.__IMAGE_NAME)
+
+        build_out = str(builder.build())
+
+        return build_out
+
+    def __test_image(self):
+        docker_client = docker.from_env()
+        docker_client.images.get(self.__IMAGE_NAME)
+
+        cmds = ['spark-submit', 'wordcount.py', 'words.txt', '/mnt/vol1/outputs']
+
+        container_log = docker_client.containers.run(self.__IMAGE_NAME,
+                                                     cmds,
+                                                     volumes={
+                                                         self.temp_dir: {
+                                                             'bind': '/mnt/vol1',
+                                                             'mode': 'rw'
+                                                         }
+                                                     })
+
+        docker_client.close()
+
+        logging.info(container_log)
+
+        self.assertEqual(
+            "b'\\n"
+            "my: 1\\n"
+            "first: 1\\n"
+            "liminal: 1\\n"
+            "spark: 1\\n"
+            "task: 1\\n"
+            "writing the results to /mnt/vol1/outputs\\n'",
+            str(container_log))
+
+    def __create_conf(self, task_id):
+        return {
+            'task': task_id,
+            'cmd': 'foo bar',
+            'image': self.__IMAGE_NAME,
+            'source': 'baz',
+            'no_cache': True,
+        }
+
+    @staticmethod
+    def __temp_dir():
+        temp_dir = tempfile.mkdtemp()
+        return temp_dir
+
+    @staticmethod
+    def __remove_dir(temp_dir):
+        shutil.rmtree(temp_dir, ignore_errors=True)
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/tests/runners/airflow/executors/test_airflow_executor.py
similarity index 60%
copy from liminal/runners/airflow/tasks/hadoop.py
copy to tests/runners/airflow/executors/test_airflow_executor.py
index 6003ce7..6f5fd8b 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/tests/runners/airflow/executors/test_airflow_executor.py
@@ -16,19 +16,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from abc import ABC, abstractmethod
+from unittest import TestCase
+from unittest.mock import MagicMock
 
-from liminal.runners.airflow.model import task
+from liminal.runners.airflow.executors import airflow
+from liminal.runners.airflow.tasks.airflow import AirflowTask
 
 
-class HadoopTask(task.Task, ABC):
+class TestAirflowExecutorTask(TestCase):
     """
-    Hadoop task
+    Test AirflowExecutor task
     """
 
-    def apply_task_to_dag(self):
-        return self.executor.apply_task_to_dag(task=self, parent=self.parent)
+    def test_apply_task_to_dag(self):
+        task0 = MagicMock(spec=AirflowTask)
 
-    @abstractmethod
-    def get_runnable_command(self):
-        pass
+        task0.apply_task_to_dag = MagicMock()
+
+        airflow.AirflowExecutor("executor-task", {}, {}).apply_task_to_dag(task=task0)
+
+        task0.apply_task_to_dag.assert_called_once()
diff --git a/tests/runners/airflow/tasks/test_create_cloudformation_stack.py b/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
index dc8e5bc..58315e1 100644
--- a/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
+++ b/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
@@ -23,6 +23,7 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import BranchPythonOperator
 from moto import mock_cloudformation
 
+from liminal.runners.airflow.executors import airflow
 from liminal.runners.airflow.operators.cloudformation import CloudFormationCreateStackOperator, \
     CloudFormationCreateStackSensor, CloudFormationHook
 from liminal.runners.airflow.tasks.create_cloudformation_stack import CreateCloudFormationStackTask
@@ -80,7 +81,8 @@ class TestCreateCloudFormationStackTask(TestCase):
                 task_config=self.config
             )
 
-        self.create_cloudformation_task.apply_task_to_dag()
+        airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(
+            task=self.create_cloudformation_task)
 
     def test_apply_task_to_dag(self):
         self.assertEqual(len(self.dag.tasks), 4)
diff --git a/tests/runners/airflow/tasks/test_job_end.py b/tests/runners/airflow/tasks/test_job_end.py
index d3e1444..4e1148c 100644
--- a/tests/runners/airflow/tasks/test_job_end.py
+++ b/tests/runners/airflow/tasks/test_job_end.py
@@ -19,6 +19,7 @@
 import unittest
 from unittest import TestCase
 
+from liminal.runners.airflow.executors import airflow
 from liminal.runners.airflow.tasks import job_end
 from tests.util import dag_test_utils
 
@@ -38,7 +39,7 @@ class TestJobEndTask(TestCase):
             trigger_rule='all_done',
             liminal_config={'metrics': {'namespace': 'EndJobNameSpace', 'backends': ['cloudwatch']}}
         )
-        task0.apply_task_to_dag()
+        airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(task=task0)
 
         self.assertEqual(len(dag.tasks), 1)
         dag_task0 = dag.tasks[0]
@@ -53,9 +54,11 @@ class TestJobEndTask(TestCase):
         dag = dag_test_utils.create_dag()
 
         task0 = job_end.JobEndTask(task_id="job_end", dag=dag,
-                                   pipeline_config={'pipeline': 'my_end_pipeline'}, liminal_config=conf, parent=None,
+                                   pipeline_config={'pipeline': 'my_end_pipeline'},
+                                   liminal_config=conf, parent=None,
                                    trigger_rule='all_done', task_config={})
-        task0.apply_task_to_dag()
+
+        airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(task=task0)
 
         self.assertEqual(len(dag.tasks), 1)
         dag_task0 = dag.tasks[0]
diff --git a/tests/runners/airflow/tasks/test_job_start.py b/tests/runners/airflow/tasks/test_job_start.py
index b799544..97faaa0 100644
--- a/tests/runners/airflow/tasks/test_job_start.py
+++ b/tests/runners/airflow/tasks/test_job_start.py
@@ -19,6 +19,7 @@
 import unittest
 from unittest import TestCase
 
+from liminal.runners.airflow.executors import airflow
 from liminal.runners.airflow.tasks import job_start
 from tests.util import dag_test_utils
 
@@ -32,7 +33,8 @@ class TestJobStartTask(TestCase):
         task0 = job_start.JobStartTask(
             task_id="start_task",
             dag=dag,
-            liminal_config={'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']}},
+            liminal_config={
+                'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']}},
             pipeline_config={'pipeline': 'my_start_pipeline'},
             task_config={},
             parent=None,
@@ -61,7 +63,7 @@ class TestJobStartTask(TestCase):
             pipeline_config={'pipeline': 'my_end_pipeline'},
             parent=None,
             trigger_rule='all_success')
-        task0.apply_task_to_dag()
+        airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(task=task0)
 
         self.assertEqual(len(dag.tasks), 1)
         dag_task0 = dag.tasks[0]
@@ -75,12 +77,13 @@ class TestJobStartTask(TestCase):
 
         task0 = job_start.JobStartTask(task_id="start_task",
                                        dag=dag,
-                                       liminal_config={'metrics': {'namespace': 'StartJobNameSpace'}},
+                                       liminal_config={
+                                           'metrics': {'namespace': 'StartJobNameSpace'}},
                                        pipeline_config={'pipeline': 'my_start_pipeline'},
                                        task_config={},
                                        parent=None,
                                        trigger_rule='all_success', )
-        task0.apply_task_to_dag()
+        airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(task=task0)
 
         self.assertEqual(len(dag.tasks), 1)
         dag_task0 = dag.tasks[0]
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
index 10dab4d..41f8379 100644
--- a/tests/runners/airflow/tasks/test_python.py
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -16,7 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import logging
 import os
 import tempfile
 import unittest
@@ -67,7 +66,16 @@ class TestPythonTask(TestCase):
                                               'NUM_FILES': 10,
                                               'NUM_SPLITS': 3
                                           })
-        task0.apply_task_to_dag()
+        executor = KubernetesPodExecutor(
+            task_id='k8s',
+            liminal_config=self.liminal_config,
+            executor_config={
+                'executor': 'k8s',
+                'name': 'mypod'
+            }
+        )
+
+        executor.apply_task_to_dag(task=task0)
 
         task1 = self.__create_python_task(dag,
                                           'my_output_task',
@@ -75,7 +83,7 @@ class TestPythonTask(TestCase):
                                           'my_parallelized_python_task_img',
                                           'python -u write_outputs.py',
                                           executors=3)
-        task1.apply_task_to_dag()
+        executor.apply_task_to_dag(task=task1)
 
         for task in dag.tasks:
             print(f'Executing task {task.task_id}')
@@ -119,7 +127,7 @@ class TestPythonTask(TestCase):
                              cmd,
                              env_vars=None,
                              executors=None):
-    
+
         self.liminal_config['volumes'] = [
             {
                 'volume': self._VOLUME_NAME,
@@ -164,15 +172,7 @@ class TestPythonTask(TestCase):
             },
             task_config=task_config,
             parent=parent,
-            trigger_rule='all_success',
-            executor=KubernetesPodExecutor(
-                task_id='k8s',
-                liminal_config=self.liminal_config,
-                executor_config={
-                    'executor': 'k8s',
-                    'name': 'mypod'
-                }
-            ))
+            trigger_rule='all_success')
 
 
 if __name__ == '__main__':
diff --git a/tests/runners/airflow/tasks/test_spark_task.py b/tests/runners/airflow/tasks/test_spark_task.py
index 5223ba7..b0ae799 100644
--- a/tests/runners/airflow/tasks/test_spark_task.py
+++ b/tests/runners/airflow/tasks/test_spark_task.py
@@ -15,10 +15,17 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import os
+import tempfile
 from unittest import TestCase
 
+from liminal.build import liminal_apps_builder
+from liminal.kubernetes import volume_util
 from liminal.runners.airflow import DummyDag
+from liminal.runners.airflow.executors.kubernetes import KubernetesPodExecutor
 from liminal.runners.airflow.tasks.spark import SparkTask
+from tests.util import dag_test_utils
 
 
 class TestSparkTask(TestCase):
@@ -26,6 +33,89 @@ class TestSparkTask(TestCase):
     Test Spark Task
     """
 
+    _VOLUME_NAME = 'myvol1'
+
+    def test_spark_on_k8s(self):
+        volume_util.delete_local_volume(self._VOLUME_NAME)
+        os.environ['TMPDIR'] = '/tmp'
+        self.temp_dir = tempfile.mkdtemp()
+        self.liminal_config = {
+            'volumes': [
+                {
+                    'volume': self._VOLUME_NAME,
+                    'local': {
+                        'path': self.temp_dir.replace(
+                            "/var/folders",
+                            "/private/var/folders"
+                        )
+                    }
+                }
+            ]
+        }
+        volume_util.create_local_volumes(self.liminal_config, None)
+
+        # build spark image
+        liminal_apps_builder.build_liminal_apps(
+            os.path.join(os.path.dirname(__file__), '../../apps/test_spark_app'))
+
+        outputs_dir = os.path.join(self.temp_dir, 'outputs')
+
+        task_config = {
+            'task': "my_spark_task",
+            'image': "my_spark_image",
+            'application_source': 'wordcount.py',
+            'application_arguments': ['words.txt', '/mnt/vol1/outputs/'],
+            'env_vars': {},
+            'mounts': [
+                {
+                    'mount': 'mymount',
+                    'volume': self._VOLUME_NAME,
+                    'path': '/mnt/vol1'
+                }
+            ]
+        }
+
+        dag = dag_test_utils.create_dag()
+
+        task1 = SparkTask(
+            task_id="my_spark_task",
+            dag=dag,
+            liminal_config=self.liminal_config,
+            pipeline_config={
+                'pipeline': 'my_pipeline'
+            },
+            task_config=task_config,
+            parent=None,
+            trigger_rule='all_success')
+
+        executor = KubernetesPodExecutor(
+            task_id='k8s',
+            liminal_config=self.liminal_config,
+            executor_config={
+                'executor': 'k8s',
+                'name': 'mypod'
+            }
+        )
+        executor.apply_task_to_dag(task=task1)
+
+        for task in dag.tasks:
+            print(f'Executing task {task.task_id}')
+            task.execute(DummyDag('my_dag', task.task_id).context)
+
+        expected_output = '{"word":"my","count":1}\n' \
+                          '{"word":"first","count":1}\n' \
+                          '{"word":"liminal","count":1}\n' \
+                          '{"word":"spark","count":1}\n' \
+                          '{"word":"task","count":1}\n'.split("\n")
+
+        actual = ''
+        for filename in os.listdir(outputs_dir):
+            if filename.endswith(".json"):
+                with open(os.path.join(outputs_dir, filename)) as f:
+                    actual = f.read()
+
+        self.assertEqual(actual.split("\n"), expected_output)
+
     def test_get_runnable_command(self):
         task_config = {
             'application_source': 'my_app.py',
@@ -37,18 +127,24 @@ class TestSparkTask(TestCase):
                 'spark.yarn.executor.memoryOverhead': '500M'
             },
             'application_arguments': {
-                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
+                '--query': "select * from "
+                           "my_table where date_prt >= "
                            "'{{yesterday_ds}}'",
                 '--output': 'mytable'
             }
         }
 
-        expected = ['spark-submit', '--master', 'yarn', '--class', 'org.apache.liminal.MySparkApp', '--conf',
-                    'spark.driver.memory=1g', '--conf', 'spark.driver.maxResultSize=1g', '--conf',
+        expected = ['spark-submit',
+                    '--master',
+                    'yarn',
+                    '--class',
+                    'org.apache.liminal.MySparkApp',
+                    '--conf',
+                    'spark.driver.maxResultSize=1g', '--conf', 'spark.driver.memory=1g', '--conf',
                     'spark.yarn.executor.memoryOverhead=500M', 'my_app.py',
                     '--query',
-                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
-                    " '{{yesterday_ds}}'",
+                    "select * from my_table where "
+                    "date_prt >= '{{yesterday_ds}}'",
                     '--output', 'mytable']
 
         actual = SparkTask(
@@ -57,7 +153,7 @@ class TestSparkTask(TestCase):
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
@@ -67,17 +163,17 @@ class TestSparkTask(TestCase):
         task_config = {
             'application_source': 'my_app.py',
             'application_arguments': {
-                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
-                           "'{{yesterday_ds}}'",
-                '--output': 'mytable'
+                '--query': "select * from my_table where"
+                           " date_prt >= '{{yesterday_ds}}'",
+                '--output': 'myoutputtable'
             }
         }
 
         expected = ['spark-submit', 'my_app.py',
                     '--query',
-                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
-                    " '{{yesterday_ds}}'",
-                    '--output', 'mytable']
+                    "select * from my_table where "
+                    "date_prt >= '{{yesterday_ds}}'",
+                    '--output', 'myoutputtable']
 
         actual = SparkTask(
             'my_spark_task',
@@ -85,11 +181,11 @@ class TestSparkTask(TestCase):
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
-        self.assertEqual(actual, expected)
+        self.assertEqual(sorted(actual), sorted(expected))
 
     def test_partially_missing_spark_arguments(self):
         task_config = {
@@ -101,9 +197,9 @@ class TestSparkTask(TestCase):
                 'spark.yarn.executor.memoryOverhead': '500M'
             },
             'application_arguments': {
-                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
-                           "'{{yesterday_ds}}'",
-                '--output': 'mytable'
+                '--query': "select * from my_table where "
+                           "date_prt >= '{{yesterday_ds}}'",
+                '--output': 'myoutputtable'
             }
         }
 
@@ -111,17 +207,17 @@ class TestSparkTask(TestCase):
                     '--class',
                     'org.apache.liminal.MySparkApp',
                     '--conf',
-                    'spark.driver.memory=1g',
-                    '--conf',
                     'spark.driver.maxResultSize=1g',
                     '--conf',
+                    'spark.driver.memory=1g',
+                    '--conf',
                     'spark.yarn.executor.memoryOverhead=500M',
                     'my_app.py',
                     '--query',
-                    'select * from dlk_visitor_funnel_dwh_staging.fact_events where '
-                    "unified_Date_prt >= '{{yesterday_ds}}'",
+                    'select * from my_table where '
+                    "date_prt >= '{{yesterday_ds}}'",
                     '--output',
-                    'mytable'].sort()
+                    'myoutputtable']
 
         actual = SparkTask(
             'my_spark_task',
@@ -129,8 +225,8 @@ class TestSparkTask(TestCase):
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
-        self.assertEqual(actual.sort(), expected)
+        self.assertEqual(sorted(actual), sorted(expected))
diff --git a/tests/runners/apps/test_app/extra/liminal.yml b/tests/runners/apps/test_app/extra/liminal.yml
index 546d16a..3bf8489 100644
--- a/tests/runners/apps/test_app/extra/liminal.yml
+++ b/tests/runners/apps/test_app/extra/liminal.yml
@@ -47,6 +47,7 @@ pipeline_defaults:
       description: optional sub tasks
     - task: my_task_output_input_task
       type: python
+      no_cache: True
       description: task with input from other task's output
       image: my_task_output_input_task_image
       env_vars:
diff --git a/.gitignore b/tests/runners/apps/test_spark_app/__init__.py
similarity index 85%
copy from .gitignore
copy to tests/runners/apps/test_spark_app/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/tests/runners/apps/test_spark_app/__init__.py
@@ -15,18 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/liminal/core/config/defaults/base/liminal.yml b/tests/runners/apps/test_spark_app/liminal.yml
similarity index 61%
copy from liminal/core/config/defaults/base/liminal.yml
copy to tests/runners/apps/test_spark_app/liminal.yml
index 729ba22..ff3071a 100644
--- a/liminal/core/config/defaults/base/liminal.yml
+++ b/tests/runners/apps/test_spark_app/liminal.yml
@@ -16,22 +16,24 @@
 # specific language governing permissions and limitations
 # under the License.
 ---
-name: base
-type: super
-executors:
-  - executor: default_k8s
-    type: kubernetes
-service_defaults:
-  description: add defaults parameters for all services
-task_defaults:
-  description: add defaults parameters for all tasks separate by task type
-  python:
-    executor: default_k8s
-pipeline_defaults:
-  description: add defaults parameters for all pipelines
-  before_tasks:
-    - task: start
-      type: job_start
-  after_tasks:
-    - task: end
-      type: job_end
\ No newline at end of file
+name: MyPipeline
+images:
+  - image: my_spark_image
+    type: spark
+    source: wordcount
+    no_cache: True
+pipelines:
+  - pipeline: my_pipeline
+    owner: Bosco Albert Baracus
+    start_date: 1970-01-01
+    timeout_minutes: 45
+    schedule: 0 * 1 * *
+    tasks:
+      - task: my_test_spark_task
+        type: spark
+        description: spark task on k8s
+        image: my_spark_image
+        executors: 2
+        application_source: wordcount.py
+        application_arguments:
+          - "words.txt"
diff --git a/.gitignore b/tests/runners/apps/test_spark_app/wordcount/__init__.py
similarity index 85%
copy from .gitignore
copy to tests/runners/apps/test_spark_app/wordcount/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/tests/runners/apps/test_spark_app/wordcount/__init__.py
@@ -15,18 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/.gitignore b/tests/runners/apps/test_spark_app/wordcount/requirements.txt
similarity index 85%
copy from .gitignore
copy to tests/runners/apps/test_spark_app/wordcount/requirements.txt
index fcf8bcd..b73ac5f 100644
--- a/.gitignore
+++ b/tests/runners/apps/test_spark_app/wordcount/requirements.txt
@@ -16,17 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
+pyyaml==5.3.1
diff --git a/tests/runners/apps/test_spark_app/wordcount/wordcount.py b/tests/runners/apps/test_spark_app/wordcount/wordcount.py
new file mode 100644
index 0000000..b7537a3
--- /dev/null
+++ b/tests/runners/apps/test_spark_app/wordcount/wordcount.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+from operator import add
+
+from pyspark.sql import SparkSession
+import yaml
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print("Usage: wordcount <file> output <outputlocation>", file=sys.stderr)
+        sys.exit(-1)
+
+    spark = SparkSession \
+        .builder \
+        .appName("PythonWordCount") \
+        .getOrCreate()
+
+    lines = spark.read.text(sys.argv[1]).rdd.filter(lambda x: not x[0].startswith('#')).map(
+        lambda r: r[0])
+    counts = lines.flatMap(lambda x: x.split(' ')) \
+        .map(lambda x: (x, 1)) \
+        .reduceByKey(add)
+    output = counts.collect()
+    for (word, count) in output:
+        print("%s: %i" % (word, count))
+
+    print("writing the results to {}".format(sys.argv[2]))
+    counts.toDF(["word", "count"]).coalesce(1).write.mode("overwrite").json(sys.argv[2])
+
+    spark.stop()
diff --git a/.gitignore b/tests/runners/apps/test_spark_app/wordcount/words.txt
similarity index 85%
copy from .gitignore
copy to tests/runners/apps/test_spark_app/wordcount/words.txt
index fcf8bcd..0a4b90b 100644
--- a/.gitignore
+++ b/tests/runners/apps/test_spark_app/wordcount/words.txt
@@ -15,18 +15,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
+my first liminal spark task
\ No newline at end of file
diff --git a/tests/test_licenses.py b/tests/test_licenses.py
index 76fef79..b48d731 100644
--- a/tests/test_licenses.py
+++ b/tests/test_licenses.py
@@ -22,8 +22,8 @@ from unittest import TestCase
 
 from termcolor import colored
 
-EXCLUDED_EXTENSIONS = ['.gif', '.png', '.pyc', 'LICENSE', 'DISCLAIMER', 'NOTICE']
-EXCLUDED_DIRS = ['docs/build', '.git', '.idea']
+EXCLUDED_EXTENSIONS = ['.gif', '.png', '.pyc', 'LICENSE', 'DISCLAIMER', 'NOTICE', '.whl']
+EXCLUDED_DIRS = ['docs/build', '.git', '.idea', 'venv', 'apache_liminal.egg-info']
 
 PYTHON_LICENSE_HEADER = """
 #