Posted to commits@liminal.apache.org by av...@apache.org on 2021/07/28 07:53:09 UTC
[incubator-liminal] branch master updated: [LIMINAL-56] add default executor (k8s) for spark
This is an automated email from the ASF dual-hosted git repository.
aviemzur pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
The following commit(s) were added to refs/heads/master by this push:
new e8c18dc [LIMINAL-56] add default executor (k8s) for spark
e8c18dc is described below
commit e8c18dc2827141dc6c2e45366d3f5e281a37dd50
Author: Zion Rubin <zi...@naturalint.com>
AuthorDate: Wed Jul 28 10:53:03 2021 +0300
[LIMINAL-56] add default executor (k8s) for spark
---
.gitignore | 1 +
docs/getting-started/spark_app_demo.md | 344 +++++++++++++++++++++
docs/nstatic/spark-demo-app/k8s_dag.png | Bin 0 -> 42631 bytes
.../aws-ml-app-demo/manifests/aws-ml-app-demo.yaml | 3 +-
.../spark-app-demo/k8s/__init__.py | 15 -
.../spark-app-demo/k8s/archetype}/liminal.yml | 36 ++-
examples/spark-app-demo/k8s/data/iris.csv | 168 ++++++++++
.../spark-app-demo/k8s/data_cleanup.py | 34 +-
examples/spark-app-demo/k8s/liminal.yml | 89 ++++++
examples/spark-app-demo/k8s/model_store.py | 59 ++++
.../spark-app-demo/k8s/requirements.txt | 17 +-
examples/spark-app-demo/k8s/serving.py | 53 ++++
examples/spark-app-demo/k8s/training.py | 99 ++++++
.gitignore => liminal/build/image/spark/Dockerfile | 22 +-
.../build/image/spark/__init__.py | 15 -
.../tasks/hadoop.py => build/image/spark/spark.py} | 21 +-
liminal/core/config/config.py | 14 +
liminal/core/config/defaults/base/liminal.yml | 8 +
.../runners/airflow/dag/liminal_register_dags.py | 23 +-
.../{tasks/hadoop.py => executors/airflow.py} | 17 +-
liminal/runners/airflow/executors/emr.py | 4 +-
liminal/runners/airflow/executors/kubernetes.py | 5 +-
liminal/runners/airflow/model/executor.py | 9 +-
liminal/runners/airflow/model/task.py | 7 +-
.../airflow/tasks/{hadoop.py => airflow.py} | 9 +-
liminal/runners/airflow/tasks/containerable.py | 7 +-
.../airflow/tasks/create_cloudformation_stack.py | 8 +-
.../airflow/tasks/delete_cloudformation_stack.py | 8 +-
liminal/runners/airflow/tasks/hadoop.py | 3 -
liminal/runners/airflow/tasks/job_end.py | 8 +-
liminal/runners/airflow/tasks/job_start.py | 8 +-
liminal/runners/airflow/tasks/spark.py | 18 +-
liminal/runners/airflow/tasks/sql.py | 10 +-
scripts/liminal | 21 +-
tests/liminal/core/config/test_config.py | 200 +++++++-----
.../runners/airflow/build/spark/__init__.py | 15 -
.../build/spark/test_spark_image_builder.py | 110 +++++++
.../airflow/executors/test_airflow_executor.py | 22 +-
.../tasks/test_create_cloudformation_stack.py | 4 +-
tests/runners/airflow/tasks/test_job_end.py | 9 +-
tests/runners/airflow/tasks/test_job_start.py | 11 +-
tests/runners/airflow/tasks/test_python.py | 26 +-
tests/runners/airflow/tasks/test_spark_task.py | 144 +++++++--
tests/runners/apps/test_app/extra/liminal.yml | 1 +
.../runners/apps/test_spark_app/__init__.py | 15 -
.../runners/apps/test_spark_app}/liminal.yml | 40 +--
.../apps/test_spark_app/wordcount/__init__.py | 15 -
.../apps/test_spark_app/wordcount/requirements.txt | 15 +-
.../apps/test_spark_app/wordcount/wordcount.py | 47 +++
.../apps/test_spark_app/wordcount/words.txt | 16 +-
tests/test_licenses.py | 4 +-
51 files changed, 1455 insertions(+), 402 deletions(-)
diff --git a/.gitignore b/.gitignore
index fcf8bcd..dfaa502 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,7 @@ bin
include
lib
venv
+/build
.Python
*.pyc
pip-selfcheck.json
diff --git a/docs/getting-started/spark_app_demo.md b/docs/getting-started/spark_app_demo.md
new file mode 100644
index 0000000..8755651
--- /dev/null
+++ b/docs/getting-started/spark_app_demo.md
@@ -0,0 +1,344 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Getting started / ***Spark application***
+
+* [Setup your local environment](#Setup-your-local-environment)
+* [Spark on K8S](#Spark-On-K8S)
+ * [Setup liminal](#setup-liminal)
+ * [Liminal YAML walkthrough](#Liminal-YAML-walkthrough)
+ * [Evaluate the Iris Classification model](#Evaluate-the-iris-classification-model)
+ * [Debugging Kubernetes Deployments](#Debugging-Kubernetes-Deployments)
+
+* [Closing up](#Closing-up)
+
+In this tutorial, we will guide you through setting up Apache Liminal on your local machine and
+running a simple machine-learning workflow based on the classic Iris dataset classification
+example, including feature engineering with the Spark engine. \
+More details in
+this [link](https://scikit-learn.org/stable/auto_examples/datasets/plot_iris_dataset.html).
+
+#### Prerequisites
+
+* [Python 3 (3.6 and up)](https://www.python.org/downloads)
+* [Python Virtual Environments](https://pypi.org/project/virtualenv)
+* [Docker Desktop](https://www.docker.com/products/docker-desktop)
+* [Kubernetes CLI (kubectl)](https://kubernetes.io/docs/tasks/tools/install-kubectl-macos)
+
+*Note: Make sure the Kubernetes cluster is running in Docker Desktop.*
+
+## Setup your local environment
+
+In your dev folder, clone the example code from the Liminal repository:
+
+```BASH
+git clone https://github.com/apache/incubator-liminal
+```
+
+***Note:*** *You just cloned the entire Liminal project; you actually only need the examples folder.*
+
+Create a Python virtual environment to isolate your runs:
+
+```BASH
+cd incubator-liminal/examples/spark-app-demo/
+python3 -m venv env
+```
+
+Activate your virtual environment:
+
+```BASH
+source env/bin/activate
+```
+
+Now we are ready to install Liminal:
+
+```BASH
+pip install apache-liminal
+```
+
+## Spark On K8S
+
+We will define the following steps and services to implement the Iris classification example,
+including simple feature engineering with Apache Spark: \
+Clean data, train, validate & deploy - cleaning the input dataset and preparing it for training and
+validation is managed by the Liminal Airflow extension. The training task trains a regression
+model using a public dataset. \
+We then validate the model and deploy it to a model store on a mounted volume. \
+Inference - online inference is done using a Python Flask service running on the local Kubernetes
+in Docker Desktop. The service exposes the `/predict` endpoint. It reads the model stored on the
+mounted drive and uses it to evaluate the request.
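+
+To make the flow concrete, here is a minimal sketch of the serving logic, condensed from the
+`serving.py` included in this example: it loads the latest pickled model from the mounted volume
+and scores a request. The wiring of these functions into Flask endpoints is handled by Liminal's
+`python_server` image and is not shown here.
+
+```PYTHON
+import json
+
+import model_store
+from model_store import ModelStore
+
+_MODEL_STORE = ModelStore(model_store.PRODUCTION)
+
+
+def predict(input_json):
+    # Load (and cache) the latest model deployed to the mounted volume,
+    # then return the probability that the flower is iris virginica.
+    input_dict = json.loads(input_json)
+    model, version = _MODEL_STORE.load_latest_model()
+    result = str(model.predict_proba([[float(input_dict['petal_width'])]])[0][1])
+    return json.dumps({"result": result, "version": version})
+```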
+
+### Setup liminal
+
+```BASH
+cd incubator-liminal/examples/spark-app-demo/k8s
+```
+
+#### Liminal build
+
+The build command creates Docker images based on the `images` section of the liminal.yml file.
+
+```BASH
+liminal build
+```
+
+#### Liminal create
+
+All tasks use a mounted volume as defined in the pipeline YAML. \
+In our case the mounted volume points to the Liminal Iris classification example. The training
+task trains a regression model using a public dataset. We then validate the model and deploy it to
+a model store on the mounted volume.
+
+Create a local Kubernetes volume:
+
+```BASH
+liminal create
+```
+
+#### Liminal deploy
+
+The deploy command deploys a Liminal server and copies any liminal.yml files in your working
+directory, or any of its subdirectories, to your Liminal home directory.
+
+```BASH
+liminal deploy --clean
+```
+
+*Note: the Liminal home directory is located at the path defined by the LIMINAL_HOME environment
+variable. If the LIMINAL_HOME environment variable is not defined, the home directory defaults to
+the ~/liminal_home directory.*
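+
+For illustration, a minimal sketch of what deploy does with each liminal.yml it finds, assuming
+the LIMINAL_HOME resolution described in the note (the actual logic lives in
+`liminal.core.environment` and `scripts/liminal`; the helper name below is hypothetical):
+
+```PYTHON
+import os
+import pathlib
+import shutil
+
+LIMINAL_HOME = os.environ.get('LIMINAL_HOME', os.path.expanduser('~/liminal_home'))
+
+
+def deploy_config_file(config_file, relative_name):
+    # Copy a liminal.yml into the Liminal home directory, creating parent dirs as needed.
+    target = os.path.join(LIMINAL_HOME, relative_name)
+    pathlib.Path(target).parent.mkdir(parents=True, exist_ok=True)
+    shutil.copyfile(config_file, target)
+```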
+
+#### Liminal start
+
+The start command spins up 3 containers that load the Apache Airflow stack. Liminal's Airflow
+extension is responsible for executing the workflows defined in the liminal.yml file as standard
+Airflow DAGs.
+
+```BASH
+liminal start
+```
+
+You can go to the graph view to see all the tasks configured in the liminal.yml file:
+[http://localhost:8080/admin/airflow/graph?dag_id=my_first_spark_pipeline](
+http://localhost:8080/admin/airflow/graph?dag_id=my_first_spark_pipeline
+)
+
+You should see the following DAG:
+
+![my_first_spark_pipeline DAG](../nstatic/spark-demo-app/k8s_dag.png)
+
+### Liminal YAML walkthrough
+
+* [Local archetype](#Local-archetype)
+* [Pipeline flow](#Pipeline-flow)
+
+#### Local archetype
+
+A superliminal for easy local development:
+
+```YAML
+name: InfraSpark
+owner: Bosco Albert Baracus
+type: super
+executors:
+ - executor: k8s
+ type: kubernetes
+variables:
+ output_root_dir: /mnt/gettingstartedvol
+ input_root_dir: ''
+images:
+ - image: my_spark_image
+ source: .
+ no_cache: True
+task_defaults:
+ spark:
+ executor: k8s
+ executors: 2
+ application_source: '{{application}}'
+ mounts:
+ - mount: mymount
+ volume: gettingstartedvol
+ path: /mnt/gettingstartedvol
+```
+
+We specify the `MOUNT_PATH` in which we store the trained model.
+
+You can read more about `superliminal` `variables` and `defaults`
+in [advanced.liminal.yml](../liminal/advanced.liminal.yml.md)
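+
+As a rough sketch of how the trained model ends up under `MOUNT_PATH` (mirroring `model_store.py`
+from this example, slightly simplified):
+
+```PYTHON
+import os
+import pickle
+
+MOUNT_PATH = os.environ.get('MOUNT_PATH', '/mnt/gettingstartedvol')
+
+
+def save_model(model, env, version):
+    # Models are stored as <mount>/<env>/<version>/model.p,
+    # e.g. /mnt/gettingstartedvol/production/1627465200/model.p
+    path = f'{MOUNT_PATH}/{env}/{version}'
+    os.makedirs(path, exist_ok=True)
+    pickle.dump(model, open(f'{path}/model.p', 'wb'))
+```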
+
+#### Pipeline flow
+
+Declaration of the pipeline tasks flow in your liminal YAML:
+
+```YAML
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+ output_path: '{{output_root_dir}}/my_first_liminal_spark_app_outputs/'
+ application: data_cleanup.py
+task_defaults:
+ python:
+ mounts:
+ - mount: mymount
+ volume: gettingstartedvol
+ path: /mnt/gettingstartedvol
+pipelines:
+ - pipeline: my_first_spark_pipeline
+ start_date: 1970-01-01
+ timeout_minutes: 45
+ schedule: 0 * 1 * *
+ tasks:
+ - task: data_preprocessing
+ type: spark
+ description: prepare the data for training
+ application_arguments:
+ - '{{input_root_dir}}data/iris.csv'
+ - '{{output_path}}'
+ - task: train
+ type: python
+ description: train model
+ image: myorg/mydatascienceapp
+ cmd: python -u training.py train '{{output_path}}'
+ env:
+ MOUNT_PATH: /mnt/gettingstartedvol
+ ...
+```
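+
+Under the hood, the spark task type translates this configuration into a spark-submit invocation;
+`application_arguments` given as a list (as above) are appended in order, while a mapping would be
+interleaved key by key. A rough sketch of that behaviour, following
+`liminal/runners/airflow/tasks/spark.py`:
+
+```PYTHON
+from itertools import chain
+
+
+def additional_arguments(application_arguments):
+    # List form: pass the arguments through in order.
+    if isinstance(application_arguments, list):
+        return application_arguments
+    # Mapping form: interleave keys and values, e.g. {'--x': '1'} -> ['--x', '1'].
+    return list(chain.from_iterable(zip(application_arguments.keys(),
+                                        application_arguments.values())))
+```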
+
+### Evaluate the iris classification model
+
+Once the Iris classification model training is complete and the model is deployed (to the mounted
+volume), you can launch a pod of the pre-built image, which contains a Flask server, by applying
+the following Kubernetes manifest configuration:
+
+```BASH
+kubectl apply -f manifests/spark-app-demo.yaml
+```
+
+Alternatively, create a Kubernetes pod from stdin:
+
+```BASH
+cat <<EOF | kubectl apply -f -
+---
+apiVersion: v1
+kind: Pod
+metadata:
+ name: spark-app-demo
+spec:
+ volumes:
+ - name: task-pv-storage
+ persistentVolumeClaim:
+ claimName: gettingstartedvol-pvc
+ containers:
+ - name: task-pv-container
+ imagePullPolicy: Never
+ image: myorg/mydatascienceapp
+ lifecycle:
+ postStart:
+ exec:
+ command: [ "/bin/bash", "-c", "apt update && apt install curl -y" ]
+ ports:
+ - containerPort: 80
+ name: "http-server"
+ volumeMounts:
+ - mountPath: "/mnt/gettingstartedvol"
+ name: task-pv-storage
+EOF
+```
+
+Check that the service is running:
+
+```BASH
+kubectl get pods --namespace=default
+```
+
+Check that the service is up:
+
+```BASH
+kubectl exec -it --namespace=default spark-app-demo -- /bin/bash -c "curl localhost/healthcheck"
+```
+
+Check the prediction:
+
+```BASH
+kubectl exec -it --namespace=default spark-app-demo -- /bin/bash -c "curl -X POST -d '{\"petal_width\": \"2.1\"}' localhost/predict"
+```
+
+## Debugging Kubernetes Deployments
+
+`kubectl get pods` will help you check your pod status:
+
+```BASH
+kubectl get pods --namespace=default
+```
+
+`kubectl logs` will help you check your pod's logs:
+
+```BASH
+kubectl logs --namespace=default spark-app-demo
+```
+
+Use `kubectl exec` to get a shell into a running container:
+
+```BASH
+kubectl exec -it --namespace=default spark-app-demo -- bash
+```
+
+Then you can check the mounted volume with `df -h` and verify the model's output.
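+
+If you prefer to verify from Python which model versions were deployed, here is a small sketch
+over the directory layout used by `model_store.py` (run it inside the pod, where the volume is
+mounted):
+
+```PYTHON
+import glob
+
+# model_store.py writes models to <mount>/<env>/<version>/model.p
+for path in sorted(glob.glob('/mnt/gettingstartedvol/production/*/model.p')):
+    print(path.rsplit('/', 2)[-2], path)  # version directory, then full path
+```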
+
+You can go to the graph view to see all the tasks configured in the liminal.yml file:
+[http://localhost:8080/admin/airflow/graph?dag_id=my_first_spark_pipeline](
+http://localhost:8080/admin/airflow/graph?dag_id=my_first_spark_pipeline
+)
+
+## Here is the entire list of commands, if you want to start from scratch:
+
+```BASH
+git clone https://github.com/apache/incubator-liminal
+cd incubator-liminal/examples/spark-app-demo/k8s
+# cd incubator-liminal/examples/spark-app-demo/emr
+python3 -m venv env
+source env/bin/activate
+rm -rf ~/liminal_home
+pip uninstall apache-liminal
+pip install apache-liminal
+liminal build
+liminal create
+liminal deploy --clean
+liminal start
+```
+
+## Closing up
+
+To make sure the Liminal containers are stopped, use:
+
+```BASH
+liminal stop
+```
+
+To deactivate the Python virtual environment, use:
+
+```BASH
+deactivate
+```
+
+To terminate the Kubernetes pod:
+
+```BASH
+kubectl delete pod --namespace=default spark-app-demo
+```
diff --git a/docs/nstatic/spark-demo-app/k8s_dag.png b/docs/nstatic/spark-demo-app/k8s_dag.png
new file mode 100644
index 0000000..b0e9a6f
Binary files /dev/null and b/docs/nstatic/spark-demo-app/k8s_dag.png differ
diff --git a/examples/aws-ml-app-demo/manifests/aws-ml-app-demo.yaml b/examples/aws-ml-app-demo/manifests/aws-ml-app-demo.yaml
index 2361ee5..180df8a 100644
--- a/examples/aws-ml-app-demo/manifests/aws-ml-app-demo.yaml
+++ b/examples/aws-ml-app-demo/manifests/aws-ml-app-demo.yaml
@@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
---
+
apiVersion: v1
kind: Pod
metadata:
@@ -32,7 +33,7 @@ spec:
lifecycle:
postStart:
exec:
- command: ["/bin/bash", "-c", "apt update && apt install curl -y"]
+ command: [ "/bin/bash", "-c", "apt update && apt install curl -y" ]
ports:
- containerPort: 80
name: "http-server"
diff --git a/.gitignore b/examples/spark-app-demo/k8s/__init__.py
similarity index 85%
copy from .gitignore
copy to examples/spark-app-demo/k8s/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/examples/spark-app-demo/k8s/__init__.py
@@ -15,18 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/liminal/core/config/defaults/base/liminal.yml b/examples/spark-app-demo/k8s/archetype/liminal.yml
similarity index 64%
copy from liminal/core/config/defaults/base/liminal.yml
copy to examples/spark-app-demo/k8s/archetype/liminal.yml
index 729ba22..751043a 100644
--- a/liminal/core/config/defaults/base/liminal.yml
+++ b/examples/spark-app-demo/k8s/archetype/liminal.yml
@@ -16,22 +16,28 @@
# specific language governing permissions and limitations
# under the License.
---
-name: base
+# superliminal for local development
+name: InfraSpark
+owner: Bosco Albert Baracus
type: super
executors:
- - executor: default_k8s
+ - executor: k8s
type: kubernetes
-service_defaults:
- description: add defaults parameters for all services
+variables:
+ output_root_dir: /mnt/gettingstartedvol
+ input_root_dir: ''
+images:
+ - image: my_spark_image
+ source: .
+ no_cache: True
task_defaults:
- description: add defaults parameters for all tasks separate by task type
- python:
- executor: default_k8s
-pipeline_defaults:
- description: add defaults parameters for all pipelines
- before_tasks:
- - task: start
- type: job_start
- after_tasks:
- - task: end
- type: job_end
\ No newline at end of file
+ spark:
+ executor: k8s
+ image: my_spark_image
+ executors: 2
+ application_source: '{{application}}'
+ mounts:
+ - mount: mymount
+ volume: gettingstartedvol
+ path: /mnt/gettingstartedvol
+
diff --git a/examples/spark-app-demo/k8s/data/iris.csv b/examples/spark-app-demo/k8s/data/iris.csv
new file mode 100644
index 0000000..339109c
--- /dev/null
+++ b/examples/spark-app-demo/k8s/data/iris.csv
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ignore: 150,4,setosa,versicolor,virginica,garbage
+5.1,3.5,1.4,0.2,0,21231
+4.9,3.0,1.4,0.2,0,sda
+4.7,3.2,1.3,0.2,0,2321
+4.6,3.1,1.5,0.2,0,
+5.0,3.6,1.4,0.2,0,
+5.4,3.9,1.7,0.4,0,
+4.6,3.4,1.4,0.3,0,
+5.0,3.4,1.5,0.2,0,
+4.4,2.9,1.4,0.2,0,
+4.9,3.1,1.5,0.1,0,
+5.4,3.7,1.5,0.2,0,
+4.8,3.4,1.6,0.2,0,
+4.8,3.0,1.4,0.1,0,
+4.3,3.0,1.1,0.1,0,
+5.8,4.0,1.2,0.2,0,
+5.7,4.4,1.5,0.4,0,
+5.4,3.9,1.3,0.4,0,
+5.1,3.5,1.4,0.3,0,
+5.7,3.8,1.7,0.3,0,
+5.1,3.8,1.5,0.3,0,
+5.4,3.4,1.7,0.2,0,
+5.1,3.7,1.5,0.4,0,
+4.6,3.6,1.0,0.2,0,
+5.1,3.3,1.7,0.5,0,
+4.8,3.4,1.9,0.2,0,
+5.0,3.0,1.6,0.2,0,
+5.0,3.4,1.6,0.4,0,
+5.2,3.5,1.5,0.2,0,
+5.2,3.4,1.4,0.2,0,
+4.7,3.2,1.6,0.2,0,
+4.8,3.1,1.6,0.2,0,
+5.4,3.4,1.5,0.4,0,
+5.2,4.1,1.5,0.1,0,
+5.5,4.2,1.4,0.2,0,
+4.9,3.1,1.5,0.2,0,
+5.0,3.2,1.2,0.2,0,
+5.5,3.5,1.3,0.2,0,
+4.9,3.6,1.4,0.1,0,
+4.4,3.0,1.3,0.2,0,
+5.1,3.4,1.5,0.2,0,
+5.0,3.5,1.3,0.3,0,
+4.5,2.3,1.3,0.3,0,
+4.4,3.2,1.3,0.2,0,
+5.0,3.5,1.6,0.6,0,
+5.1,3.8,1.9,0.4,0,
+4.8,3.0,1.4,0.3,0,
+5.1,3.8,1.6,0.2,0,
+4.6,3.2,1.4,0.2,0,
+5.3,3.7,1.5,0.2,0,
+5.0,3.3,1.4,0.2,0,
+7.0,3.2,4.7,1.4,1,
+6.4,3.2,4.5,1.5,1,
+6.9,3.1,4.9,1.5,1,
+5.5,2.3,4.0,1.3,1,
+6.5,2.8,4.6,1.5,1,
+5.7,2.8,4.5,1.3,1,
+6.3,3.3,4.7,1.6,1,
+4.9,2.4,3.3,1.0,1,
+6.6,2.9,4.6,1.3,1,
+5.2,2.7,3.9,1.4,1,
+5.0,2.0,3.5,1.0,1,
+5.9,3.0,4.2,1.5,1,
+6.0,2.2,4.0,1.0,1,
+6.1,2.9,4.7,1.4,1,
+5.6,2.9,3.6,1.3,1,
+6.7,3.1,4.4,1.4,1,
+5.6,3.0,4.5,1.5,1,
+5.8,2.7,4.1,1.0,1,
+6.2,2.2,4.5,1.5,1,
+5.6,2.5,3.9,1.1,1,
+5.9,3.2,4.8,1.8,1,
+6.1,2.8,4.0,1.3,1,
+6.3,2.5,4.9,1.5,1,
+6.1,2.8,4.7,1.2,1,
+6.4,2.9,4.3,1.3,1,
+6.6,3.0,4.4,1.4,1,
+6.8,2.8,4.8,1.4,1,
+6.7,3.0,5.0,1.7,1,
+6.0,2.9,4.5,1.5,1,
+5.7,2.6,3.5,1.0,1,
+5.5,2.4,3.8,1.1,1,
+5.5,2.4,3.7,1.0,1,
+5.8,2.7,3.9,1.2,1,
+6.0,2.7,5.1,1.6,1,
+5.4,3.0,4.5,1.5,1,
+6.0,3.4,4.5,1.6,1,
+6.7,3.1,4.7,1.5,1,
+6.3,2.3,4.4,1.3,1,
+5.6,3.0,4.1,1.3,1,
+5.5,2.5,4.0,1.3,1,
+5.5,2.6,4.4,1.2,1,
+6.1,3.0,4.6,1.4,1,
+5.8,2.6,4.0,1.2,1,
+5.0,2.3,3.3,1.0,1,
+5.6,2.7,4.2,1.3,1,
+5.7,3.0,4.2,1.2,1,
+5.7,2.9,4.2,1.3,1,
+6.2,2.9,4.3,1.3,1,
+5.1,2.5,3.0,1.1,1,
+5.7,2.8,4.1,1.3,1,
+6.3,3.3,6.0,2.5,2,
+5.8,2.7,5.1,1.9,2,
+7.1,3.0,5.9,2.1,2,
+6.3,2.9,5.6,1.8,2,
+6.5,3.0,5.8,2.2,2,
+7.6,3.0,6.6,2.1,2,
+4.9,2.5,4.5,1.7,2,
+7.3,2.9,6.3,1.8,2,
+6.7,2.5,5.8,1.8,2,
+7.2,3.6,6.1,2.5,2,
+6.5,3.2,5.1,2.0,2,
+6.4,2.7,5.3,1.9,2,
+6.8,3.0,5.5,2.1,2,
+5.7,2.5,5.0,2.0,2,
+5.8,2.8,5.1,2.4,2,
+6.4,3.2,5.3,2.3,2,
+6.5,3.0,5.5,1.8,2,
+7.7,3.8,6.7,2.2,2,
+7.7,2.6,6.9,2.3,2,
+6.0,2.2,5.0,1.5,2,
+6.9,3.2,5.7,2.3,2,
+5.6,2.8,4.9,2.0,2,
+7.7,2.8,6.7,2.0,2,
+6.3,2.7,4.9,1.8,2,
+6.7,3.3,5.7,2.1,2,
+7.2,3.2,6.0,1.8,2,
+6.2,2.8,4.8,1.8,2,
+6.1,3.0,4.9,1.8,2,
+6.4,2.8,5.6,2.1,2,
+7.2,3.0,5.8,1.6,2,
+7.4,2.8,6.1,1.9,2,
+7.9,3.8,6.4,2.0,2,
+6.4,2.8,5.6,2.2,2,
+6.3,2.8,5.1,1.5,2,
+6.1,2.6,5.6,1.4,2,
+7.7,3.0,6.1,2.3,2,
+6.3,3.4,5.6,2.4,2,
+6.4,3.1,5.5,1.8,2,
+6.0,3.0,4.8,1.8,2,
+6.9,3.1,5.4,2.1,2,
+6.7,3.1,5.6,2.4,2,
+6.9,3.1,5.1,2.3,2,
+5.8,2.7,5.1,1.9,2,
+6.8,3.2,5.9,2.3,2,
+6.7,3.3,5.7,2.5,2,
+6.7,3.0,5.2,2.3,2,
+6.3,2.5,5.0,1.9,2,
+6.5,3.0,5.2,2.0,2,
+6.2,3.4,5.4,2.3,2,
+5.9,3.0,5.1,1.8,2,
\ No newline at end of file
diff --git a/liminal/runners/airflow/model/executor.py b/examples/spark-app-demo/k8s/data_cleanup.py
similarity index 52%
copy from liminal/runners/airflow/model/executor.py
copy to examples/spark-app-demo/k8s/data_cleanup.py
index 4cb41d9..a9446ae 100644
--- a/liminal/runners/airflow/model/executor.py
+++ b/examples/spark-app-demo/k8s/data_cleanup.py
@@ -16,25 +16,25 @@
# specific language governing permissions and limitations
# under the License.
-from abc import ABC, abstractmethod
+import sys
+from pyspark.sql import SparkSession
-class Executor(ABC):
- """
- Executor Task.
- """
- # list of task types supported by the executor
- supported_task_types = []
+if __name__ == "__main__":
+ if len(sys.argv) != 3:
+ print("Usage: source <file> destination <dest>", file=sys.stderr)
+ sys.exit(-1)
- def __init__(self, executor_id, liminal_config, executor_config):
- self.liminal_config = liminal_config
- self.executor_id = executor_id
- self.executor_config = executor_config
+ spark = SparkSession \
+ .builder \
+ .appName("CleanData") \
+ .getOrCreate()
- @abstractmethod
- def apply_task_to_dag(self, **kwargs):
- pass
+ spark.read.text(sys.argv[1]).rdd.filter(lambda x: not x[0].startswith('#')) \
+ .filter(lambda r: not r[0].startswith('ignore')) \
+ .map(lambda r: r[0]).map(
+ lambda r: (
+ r.split(',')[0], r.split(',')[1], r.split(',')[2], r.split(',')[3], r.split(',')[4])) \
+ .toDF().coalesce(1).write.mode("overwrite").option("header", "false").csv(sys.argv[2])
- def _validate_task_type(self, task):
- assert any([isinstance(task, tYp) for tYp in self.supported_task_types]), \
- f'supported task types: {self.supported_task_types}'
+ spark.stop()
diff --git a/examples/spark-app-demo/k8s/liminal.yml b/examples/spark-app-demo/k8s/liminal.yml
new file mode 100644
index 0000000..93e8679
--- /dev/null
+++ b/examples/spark-app-demo/k8s/liminal.yml
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+ output_path: '{{output_root_dir}}/my_first_liminal_spark_app_outputs/'
+ application: data_cleanup.py
+images:
+ - image: myorg/mydatascienceapp
+ type: python_server
+ source: .
+ endpoints:
+ - endpoint: /predict
+ module: serving
+ function: predict
+ - endpoint: /healthcheck
+ module: serving
+ function: healthcheck
+ - endpoint: /version
+ module: serving
+ function: version
+task_defaults:
+ python:
+ mounts:
+ - mount: mymount
+ volume: gettingstartedvol
+ path: /mnt/gettingstartedvol
+pipelines:
+ - pipeline: my_first_spark_pipeline
+ start_date: 1970-01-01
+ timeout_minutes: 45
+ schedule: 0 * 1 * *
+ tasks:
+ - task: data_preprocessing
+ type: spark
+ description: prepare the data for training
+ application_arguments:
+ - '{{input_root_dir}}data/iris.csv'
+ - '{{output_path}}'
+ - task: train
+ type: python
+ description: train model
+ image: myorg/mydatascienceapp
+ cmd: python -u training.py train '{{output_path}}'
+ env:
+ MOUNT_PATH: /mnt/gettingstartedvol
+ - task: validate
+ type: python
+ description: validate model and deploy
+ image: myorg/mydatascienceapp
+ cmd: python -u training.py validate
+ env:
+ MOUNT_PATH: /mnt/gettingstartedvol
+volumes:
+ - volume: gettingstartedvol
+ claim_name: gettingstartedvol-pvc
+ local:
+ path: .
+services:
+ - service:
+ name: my_datascience_server
+ type: python_server
+ description: my ds server
+ image: myorg/mydatascienceapp
+ source: .
+ endpoints:
+ - endpoint: /predict
+ module: serving
+ function: predict
+ - endpoint: /healthcheck
+ module: serving
+ function: healthcheck
\ No newline at end of file
diff --git a/examples/spark-app-demo/k8s/model_store.py b/examples/spark-app-demo/k8s/model_store.py
new file mode 100644
index 0000000..f2c8509
--- /dev/null
+++ b/examples/spark-app-demo/k8s/model_store.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pickle
+import time
+import glob
+
+import os
+
+MOUNT_PATH = os.environ.get('MOUNT_PATH', '/mnt/gettingstartedvol')
+PRODUCTION = 'production'
+CANDIDATE = 'candidate'
+
+_ONE_HOUR = 60 * 60
+
+
+class ModelStore:
+
+ def __init__(self, env):
+ self.env = env
+ self._latest_model = None
+ self._latest_version = None
+ self._last_check = time.time()
+
+ def load_latest_model(self, force=False):
+ if not self._latest_model or time.time() - self._last_check > _ONE_HOUR or force:
+ self._latest_model, self._latest_version = self._download_latest_model()
+
+ return self._latest_model, self._latest_version
+
+ def save_model(self, model, version):
+ key = 'model.p'
+ path = f'{MOUNT_PATH}/{self.env}/{version}'
+
+ os.makedirs(f'{path}', exist_ok=True)
+ pickle.dump(model, open(f'{path}/{key}', "wb"))
+
+ def _download_latest_model(self):
+ objects = (glob.glob(f'{MOUNT_PATH}/{self.env}/**/*'))
+ models = list(reversed(sorted([obj for obj in objects if obj.endswith('.p')])))
+ latest_key = models[0]
+ version = latest_key.rsplit('/')[-2]
+ print(f'Loading model version {version}')
+ return pickle.load(open(latest_key, 'rb')), version
diff --git a/.gitignore b/examples/spark-app-demo/k8s/requirements.txt
similarity index 85%
copy from .gitignore
copy to examples/spark-app-demo/k8s/requirements.txt
index fcf8bcd..b6b3b87 100644
--- a/.gitignore
+++ b/examples/spark-app-demo/k8s/requirements.txt
@@ -16,17 +16,6 @@
# specific language governing permissions and limitations
# under the License.
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
+scikit-learn==0.23.2
+apache-liminal==0.0.2
+pyspark==3.0.0
diff --git a/examples/spark-app-demo/k8s/serving.py b/examples/spark-app-demo/k8s/serving.py
new file mode 100644
index 0000000..02ca58c
--- /dev/null
+++ b/examples/spark-app-demo/k8s/serving.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import model_store
+from model_store import ModelStore
+
+_MODEL_STORE = ModelStore(model_store.PRODUCTION)
+_PETAL_WIDTH = 'petal_width'
+
+
+def predict(input_json):
+ try:
+ input_dict = json.loads(input_json)
+ model, version = _MODEL_STORE.load_latest_model()
+ result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
+ return json.dumps({"result": result, "version": version})
+
+ except IndexError:
+ return 'Failure: the model is not ready yet'
+
+ except Exception as e:
+ print(e)
+ return 'Failure'
+
+
+def healthcheck(self):
+ return 'Server is up!'
+
+
+def version(self):
+ try:
+ model, version = _MODEL_STORE.load_latest_model()
+ print(f'version={version}')
+ return version
+ except Exception as e:
+ return e
diff --git a/examples/spark-app-demo/k8s/training.py b/examples/spark-app-demo/k8s/training.py
new file mode 100644
index 0000000..caf05bf
--- /dev/null
+++ b/examples/spark-app-demo/k8s/training.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import sys
+import time
+
+import model_store
+import numpy as np
+from model_store import ModelStore
+from sklearn import datasets
+from sklearn.linear_model import LogisticRegression
+import os
+
+_CANDIDATE_MODEL_STORE = ModelStore(model_store.CANDIDATE)
+_PRODUCTION_MODEL_STORE = ModelStore(model_store.PRODUCTION)
+
+import numpy as np
+import csv
+from sklearn.datasets.base import Bunch
+
+
+def load_iris_from_csv_file(f):
+ with open(f) as csv_file:
+ data_file = csv.reader(csv_file)
+ temp = next(data_file)
+ n_samples = 150 # number of data rows, don't count header
+ n_features = 4 # number of columns for features, don't count target column
+ feature_names = ['setosa', 'versicolor', 'virginica']
+ target_names = ['f4'] # adjust accordingly
+ data = np.empty((n_samples, n_features))
+ target = np.empty((n_samples,), dtype=np.int)
+
+ for i, sample in enumerate(data_file):
+ data[i] = np.asarray(sample[:-1], dtype=np.float64)
+ target[i] = np.asarray(sample[-1], dtype=np.int)
+
+ return Bunch(data=data, target=target, feature_names=feature_names, target_names=target_names)
+
+
+def get_dataset(d):
+ print("searching for csv files in {}".format(d))
+ for root, dirs, files in os.walk(d):
+
+ for file in files:
+ if file.endswith(".csv"):
+ return os.path.join(d, file)
+ return None
+
+
+def train_model(f):
+ csv_file = get_dataset(f)
+ if csv_file:
+ print("found {} dataset".format(csv_file))
+
+ iris = load_iris_from_csv_file(csv_file)
+
+ X = iris["data"][:, 3:] # petal width
+ y = (iris["target"] == 2).astype(np.int)
+
+ model = LogisticRegression()
+ model.fit(X, y)
+
+ version = round(time.time())
+
+ print(f'Saving model with version {version} to candidate model store.')
+ _CANDIDATE_MODEL_STORE.save_model(model, version)
+
+
+def validate_model():
+ model, version = _CANDIDATE_MODEL_STORE.load_latest_model()
+ print(f'Validating model with version {version} to candidate model store.')
+ if not isinstance(model.predict([[1]]), np.ndarray):
+ raise ValueError('Invalid model')
+ print(f'Deploying model with version {version} to production model store.')
+ _PRODUCTION_MODEL_STORE.save_model(model, version)
+
+
+if __name__ == '__main__':
+ cmd = sys.argv[1]
+ if cmd == 'train':
+ train_model(sys.argv[2])
+ elif cmd == 'validate':
+ validate_model()
+ else:
+ raise ValueError(f"Unknown command {cmd}")
diff --git a/.gitignore b/liminal/build/image/spark/Dockerfile
similarity index 85%
copy from .gitignore
copy to liminal/build/image/spark/Dockerfile
index fcf8bcd..db93f5b 100644
--- a/.gitignore
+++ b/liminal/build/image/spark/Dockerfile
@@ -15,18 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+FROM bitnami/spark:3.1.2-debian-10-r23
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
+USER root
+
+WORKDIR /app
+
+COPY . /app/
+
+RUN {{mount}} pip install -r requirements.txt
\ No newline at end of file
diff --git a/.gitignore b/liminal/build/image/spark/__init__.py
similarity index 85%
copy from .gitignore
copy to liminal/build/image/spark/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/liminal/build/image/spark/__init__.py
@@ -15,18 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/liminal/build/image/spark/spark.py
similarity index 68%
copy from liminal/runners/airflow/tasks/hadoop.py
copy to liminal/build/image/spark/spark.py
index 6003ce7..dbc9e8d 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/liminal/build/image/spark/spark.py
@@ -15,20 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import os
-from abc import ABC, abstractmethod
+from liminal.build.python import BasePythonImageBuilder
-from liminal.runners.airflow.model import task
+class SparkImageBuilder(BasePythonImageBuilder):
+ def __init__(self, config, base_path, relative_source_path, tag):
+ super().__init__(config, base_path, relative_source_path, tag)
-class HadoopTask(task.Task, ABC):
- """
- Hadoop task
- """
-
- def apply_task_to_dag(self):
- return self.executor.apply_task_to_dag(task=self, parent=self.parent)
-
- @abstractmethod
- def get_runnable_command(self):
- pass
+ @staticmethod
+ def _dockerfile_path():
+ return os.path.join(os.path.dirname(__file__), 'Dockerfile')
diff --git a/liminal/core/config/config.py b/liminal/core/config/config.py
index 51c7198..7af3633 100644
--- a/liminal/core/config/config.py
+++ b/liminal/core/config/config.py
@@ -43,6 +43,18 @@ class ConfigUtil:
__AFTER_TASKS = 'after_tasks'
__EXECUTORS = 'executors'
__IMAGES = 'images'
+ __BASE = "base"
+ __PIPELINES = "pipelines"
+ __SUPER = "super"
+ __TYPE = "type"
+ __SUB = "sub"
+ __SERVICES = "services"
+ __TASKS = "tasks"
+ __PIPELINE_DEFAULTS = "pipeline_defaults"
+ __TASK_DEFAULTS = "task_defaults"
+ __BEFORE_TASKS = "before_tasks"
+ __AFTER_TASKS = "after_tasks"
+ __EXECUTORS = "executors"
def __init__(self, configs_path):
self.configs_path = configs_path
@@ -148,10 +160,12 @@ class ConfigUtil:
super1_pipeline_defaults = super1.get(self.__PIPELINE_DEFAULTS, {}).copy()
super2_pipeline_defaults = super2.get(self.__PIPELINE_DEFAULTS, {}).copy()
+ super1[self.__PIPELINE_DEFAULTS] = super1_pipeline_defaults
super1[self.__PIPELINE_DEFAULTS][self.__BEFORE_TASKS] = \
super2_pipeline_defaults.pop(self.__BEFORE_TASKS, []) + super1_pipeline_defaults.pop(
self.__BEFORE_TASKS, [])
+ super2[self.__PIPELINE_DEFAULTS] = super2_pipeline_defaults
super1[self.__PIPELINE_DEFAULTS][self.__AFTER_TASKS] = \
super1_pipeline_defaults.pop(self.__AFTER_TASKS, []) + super2_pipeline_defaults.pop(
self.__AFTER_TASKS, [])
diff --git a/liminal/core/config/defaults/base/liminal.yml b/liminal/core/config/defaults/base/liminal.yml
index 729ba22..41682a2 100644
--- a/liminal/core/config/defaults/base/liminal.yml
+++ b/liminal/core/config/defaults/base/liminal.yml
@@ -21,12 +21,20 @@ type: super
executors:
- executor: default_k8s
type: kubernetes
+ - executor: airflow_executor
+ type: airflow
service_defaults:
description: add defaults parameters for all services
task_defaults:
description: add defaults parameters for all tasks separate by task type
python:
executor: default_k8s
+ spark:
+ executor: default_k8s
+ job_end:
+ executor: airflow_executor
+ job_start:
+ executor: airflow_executor
pipeline_defaults:
description: add defaults parameters for all pipelines
before_tasks:
diff --git a/liminal/runners/airflow/dag/liminal_register_dags.py b/liminal/runners/airflow/dag/liminal_register_dags.py
index a08f1c1..46ff6e4 100644
--- a/liminal/runners/airflow/dag/liminal_register_dags.py
+++ b/liminal/runners/airflow/dag/liminal_register_dags.py
@@ -22,10 +22,10 @@ from datetime import datetime, timedelta
from airflow import DAG
-from liminal.core import environment as env
from liminal.core.config.config import ConfigUtil
from liminal.core.util import class_util
-from liminal.runners.airflow.model import executor
+from liminal.runners.airflow.executors import airflow
+from liminal.runners.airflow.model import executor as liminal_executor
from liminal.runners.airflow.model.task import Task
__DEPENDS_ON_PAST = 'depends_on_past'
@@ -62,6 +62,9 @@ def register_dags(configs_path):
executors = __initialize_executors(config)
+ default_executor = airflow.AirflowExecutor("default_executor", liminal_config=config,
+ executor_config={})
+
for pipeline in config['pipelines']:
default_args = __default_args(pipeline)
dag = __initialize_dag(default_args, pipeline, owner)
@@ -77,11 +80,19 @@ def register_dags(configs_path):
trigger_rule=trigger_rule,
liminal_config=config,
pipeline_config=pipeline,
- task_config=task,
- executor=executors.get(task.get('executor'))
+ task_config=task
)
- parent = task_instance.apply_task_to_dag()
+ executor_id = task.get('executor')
+ if executor_id:
+ executor = executors[executor_id]
+ else:
+ logging.info(f"Did not find `executor` in ${task['task']} config."
+ f" Using the default executor (${type(default_executor)})"
+ f" instead.")
+ executor = default_executor
+
+ parent = executor.apply_task_to_dag(task=task_instance)
logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')
@@ -160,7 +171,7 @@ logging.info(f'Finished loading task implementations: {tasks_by_liminal_name.key
executors_by_liminal_name = class_util.find_subclasses_in_packages(
['liminal.runners.airflow.executors'],
- executor.Executor)
+ liminal_executor.Executor)
logging.info(f'Finished loading executor implementations: {executors_by_liminal_name.keys()}')
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/liminal/runners/airflow/executors/airflow.py
similarity index 71%
copy from liminal/runners/airflow/tasks/hadoop.py
copy to liminal/runners/airflow/executors/airflow.py
index 6003ce7..98ff0a5 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/liminal/runners/airflow/executors/airflow.py
@@ -16,19 +16,16 @@
# specific language governing permissions and limitations
# under the License.
-from abc import ABC, abstractmethod
+from liminal.runners.airflow.model import executor
+from liminal.runners.airflow.tasks import airflow
-from liminal.runners.airflow.model import task
-
-class HadoopTask(task.Task, ABC):
+class AirflowExecutor(executor.Executor):
"""
- Hadoop task
+ Execute the task steps
"""
- def apply_task_to_dag(self):
- return self.executor.apply_task_to_dag(task=self, parent=self.parent)
+ supported_task_types = [airflow.AirflowTask]
- @abstractmethod
- def get_runnable_command(self):
- pass
+ def _apply_executor_task_to_dag(self, **kwargs):
+ return kwargs['task'].apply_task_to_dag()
diff --git a/liminal/runners/airflow/executors/emr.py b/liminal/runners/airflow/executors/emr.py
index fd9bf34..eab7e87 100644
--- a/liminal/runners/airflow/executors/emr.py
+++ b/liminal/runners/airflow/executors/emr.py
@@ -37,9 +37,9 @@ class EMRExecutor(executor.Executor):
self.job_flow_id = self.executor_config.get('cluster_id', None)
self.job_flow_name = self.executor_config.get('cluster_name', None)
- def apply_task_to_dag(self, **kwargs):
+ def _apply_executor_task_to_dag(self, **kwargs):
task = kwargs['task']
- parent = kwargs.get('parent', task.parent)
+ parent = task.parent
self._validate_task_type(task)
diff --git a/liminal/runners/airflow/executors/kubernetes.py b/liminal/runners/airflow/executors/kubernetes.py
index c28d5dc..eb1c34c 100644
--- a/liminal/runners/airflow/executors/kubernetes.py
+++ b/liminal/runners/airflow/executors/kubernetes.py
@@ -49,13 +49,12 @@ class KubernetesPodExecutor(executor.Executor):
self.task_name = self.executor_config['executor']
self.volumes = self._volumes()
- def apply_task_to_dag(self, **kwargs):
+ def _apply_executor_task_to_dag(self, **kwargs):
task = kwargs['task']
+ parent = task.parent
self._validate_task_type(task)
- parent = kwargs.get('parent', task.parent)
-
pod_task = KubernetesPodOperator(
dag=task.dag,
trigger_rule=task.trigger_rule,
diff --git a/liminal/runners/airflow/model/executor.py b/liminal/runners/airflow/model/executor.py
index 4cb41d9..918ed21 100644
--- a/liminal/runners/airflow/model/executor.py
+++ b/liminal/runners/airflow/model/executor.py
@@ -31,8 +31,15 @@ class Executor(ABC):
self.executor_id = executor_id
self.executor_config = executor_config
- @abstractmethod
def apply_task_to_dag(self, **kwargs):
+ task = kwargs['task']
+
+ self._validate_task_type(task)
+
+ return self._apply_executor_task_to_dag(task=task)
+
+ @abstractmethod
+ def _apply_executor_task_to_dag(self, **kwargs):
pass
def _validate_task_type(self, task):
diff --git a/liminal/runners/airflow/model/task.py b/liminal/runners/airflow/model/task.py
index 47be264..e311b59 100644
--- a/liminal/runners/airflow/model/task.py
+++ b/liminal/runners/airflow/model/task.py
@@ -28,7 +28,7 @@ class Task(ABC):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor=None):
+ task_config):
self.liminal_config = liminal_config
self.dag = dag
self.pipeline_config = pipeline_config
@@ -36,8 +36,3 @@ class Task(ABC):
self.parent = parent
self.trigger_rule = trigger_rule
self.task_config = task_config
- self.executor = executor
-
- @abstractmethod
- def apply_task_to_dag(self, **kwargs):
- pass
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/liminal/runners/airflow/tasks/airflow.py
similarity index 85%
copy from liminal/runners/airflow/tasks/hadoop.py
copy to liminal/runners/airflow/tasks/airflow.py
index 6003ce7..e577c66 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/liminal/runners/airflow/tasks/airflow.py
@@ -21,14 +21,11 @@ from abc import ABC, abstractmethod
from liminal.runners.airflow.model import task
-class HadoopTask(task.Task, ABC):
+class AirflowTask(task.Task, ABC):
"""
- Hadoop task
+ Airflow task
"""
- def apply_task_to_dag(self):
- return self.executor.apply_task_to_dag(task=self, parent=self.parent)
-
@abstractmethod
- def get_runnable_command(self):
+ def apply_task_to_dag(self):
pass
diff --git a/liminal/runners/airflow/tasks/containerable.py b/liminal/runners/airflow/tasks/containerable.py
index fa3af71..9f731e3 100644
--- a/liminal/runners/airflow/tasks/containerable.py
+++ b/liminal/runners/airflow/tasks/containerable.py
@@ -39,9 +39,9 @@ class ContainerTask(task.Task, ABC):
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor=None):
+ task_config):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
- pipeline_config, task_config, executor)
+ pipeline_config, task_config)
env = standalone_variable_backend.get_variable(ENV, DEFAULT)
self.env_vars = self.__env_vars(env)
self.image = self.task_config['image']
@@ -51,9 +51,6 @@ class ContainerTask(task.Task, ABC):
self.env_vars.get(OUTPUT_DESTINATION_PATH)
)
- def apply_task_to_dag(self):
- return self.executor.apply_task_to_dag(task=self, parent=self.parent)
-
def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
cmds = ['/bin/sh', '-c']
diff --git a/liminal/runners/airflow/tasks/create_cloudformation_stack.py b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
index 7bbe2b9..9a7aef9 100644
--- a/liminal/runners/airflow/tasks/create_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
@@ -20,20 +20,20 @@ from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from flatdict import FlatDict
-from liminal.runners.airflow.model import task
from liminal.runners.airflow.operators.cloudformation import CloudFormationCreateStackOperator, \
CloudFormationCreateStackSensor, CloudFormationHook
+from liminal.runners.airflow.tasks import airflow
-class CreateCloudFormationStackTask(task.Task):
+class CreateCloudFormationStackTask(airflow.AirflowTask):
"""
Creates cloud_formation stack.
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor=None):
+ task_config):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
- pipeline_config, task_config, executor)
+ pipeline_config, task_config)
self.stack_name = task_config['stack_name']
def apply_task_to_dag(self):
diff --git a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
index 500fef1..324c961 100644
--- a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -19,20 +19,20 @@ from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
-from liminal.runners.airflow.model import task
from liminal.runners.airflow.operators.cloudformation import CloudFormationDeleteStackOperator, \
CloudFormationDeleteStackSensor
+from liminal.runners.airflow.tasks import airflow
-class DeleteCloudFormationStackTask(task.Task):
+class DeleteCloudFormationStackTask(airflow.AirflowTask):
"""
Deletes cloud_formation stack.
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor=None):
+ task_config):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
- pipeline_config, task_config, executor)
+ pipeline_config, task_config)
self.stack_name = task_config['stack_name']
def apply_task_to_dag(self):
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/liminal/runners/airflow/tasks/hadoop.py
index 6003ce7..7a79460 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/liminal/runners/airflow/tasks/hadoop.py
@@ -26,9 +26,6 @@ class HadoopTask(task.Task, ABC):
Hadoop task
"""
- def apply_task_to_dag(self):
- return self.executor.apply_task_to_dag(task=self, parent=self.parent)
-
@abstractmethod
def get_runnable_command(self):
pass
diff --git a/liminal/runners/airflow/tasks/job_end.py b/liminal/runners/airflow/tasks/job_end.py
index c71f0ff..ff5cffa 100644
--- a/liminal/runners/airflow/tasks/job_end.py
+++ b/liminal/runners/airflow/tasks/job_end.py
@@ -16,19 +16,19 @@
# specific language governing permissions and limitations
# under the License.
-from liminal.runners.airflow.model import task
from liminal.runners.airflow.operators.job_status_operator import JobEndOperator
+from liminal.runners.airflow.tasks import airflow
-class JobEndTask(task.Task):
+class JobEndTask(airflow.AirflowTask):
"""
Job end task. Reports job end metrics.
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor=None):
+ task_config):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor)
+ task_config)
metrics = self.liminal_config.get('metrics', {})
self.metrics_namespace = metrics.get('namespace', '')
self.metrics_backends = metrics.get('backends', [])
diff --git a/liminal/runners/airflow/tasks/job_start.py b/liminal/runners/airflow/tasks/job_start.py
index c5c6234..b6bef32 100644
--- a/liminal/runners/airflow/tasks/job_start.py
+++ b/liminal/runners/airflow/tasks/job_start.py
@@ -16,19 +16,19 @@
# specific language governing permissions and limitations
# under the License.
-from liminal.runners.airflow.model import task
from liminal.runners.airflow.operators.job_status_operator import JobStartOperator
+from liminal.runners.airflow.tasks import airflow
-class JobStartTask(task.Task):
+class JobStartTask(airflow.AirflowTask):
"""
Job start task. Reports job start metrics.
"""
def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor=None):
+ task_config):
super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor)
+ task_config)
metrics = self.liminal_config.get('metrics', {})
self.metrics_namespace = metrics.get('namespace', '')
self.metrics_backends = metrics.get('backends', [])
diff --git a/liminal/runners/airflow/tasks/spark.py b/liminal/runners/airflow/tasks/spark.py
index c29c4d3..247df2c 100644
--- a/liminal/runners/airflow/tasks/spark.py
+++ b/liminal/runners/airflow/tasks/spark.py
@@ -16,15 +16,24 @@
# specific language governing permissions and limitations
# under the License.
from itertools import chain
-from liminal.runners.airflow.tasks import hadoop
+
from flatdict import FlatDict
+from liminal.runners.airflow.tasks import hadoop, containerable
+
-class SparkTask(hadoop.HadoopTask):
+class SparkTask(hadoop.HadoopTask, containerable.ContainerTask):
"""
Executes a Spark application.
"""
+ def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+ task_config):
+ task_config['image'] = task_config.get('image', '')
+ task_config['cmd'] = task_config.get('cmd', [])
+ super().__init__(task_id, dag, parent, trigger_rule, liminal_config,
+ pipeline_config, task_config)
+
def get_runnable_command(self):
"""
Return spark-submit runnable command
@@ -65,6 +74,8 @@ class SparkTask(hadoop.HadoopTask):
def __additional_arguments(self):
application_arguments = self.task_config.get('application_arguments', {})
+ if type(application_arguments) == list:
+ return application_arguments
return self.__interleaving(application_arguments.keys(), application_arguments.values())
def __parse_spark_arguments(self, spark_arguments):
@@ -76,3 +87,6 @@ class SparkTask(hadoop.HadoopTask):
@staticmethod
def __interleaving(keys, values):
return list(chain.from_iterable(zip(keys, values)))
+
+ def _kubernetes_cmds_and_arguments(self, output_path, output_destination_path):
+ return self.__generate_spark_submit(), []
diff --git a/liminal/runners/airflow/tasks/sql.py b/liminal/runners/airflow/tasks/sql.py
index 7224852..e17964f 100644
--- a/liminal/runners/airflow/tasks/sql.py
+++ b/liminal/runners/airflow/tasks/sql.py
@@ -19,15 +19,7 @@
from liminal.runners.airflow.model import task
-class SparkTask(task.Task):
+class SqlTask(task.Task):
"""
Executes an SQL application.
"""
-
- def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor):
- super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
- task_config, executor)
-
- def apply_task_to_dag(self):
- pass
diff --git a/scripts/liminal b/scripts/liminal
index 48282cf..c886592 100755
--- a/scripts/liminal
+++ b/scripts/liminal
@@ -31,6 +31,7 @@ import scripts
from liminal.build import liminal_apps_builder
from liminal.core import environment
from liminal.core.util import files_util
+from liminal.core.config.config import ConfigUtil
from liminal.kubernetes import volume_util
from liminal.logging.logging_setup import logging_initialization
from liminal.runners.airflow import dag
@@ -116,6 +117,7 @@ def docker_compose_command(command_name, args):
subprocess.call(run_command, env=os.environ, shell=True)
return '', ''
+
@cli.command("deploy", short_help="deploys your liminal.yaml files to $LIMINAL_HOME folder")
@click.option('--path', default=os.getcwd(), help="folder containing liminal.yaml files")
@click.option('--clean', is_flag=True, default=False,
@@ -134,6 +136,7 @@ def deploy_liminal_apps(path, clean):
pathlib.Path(target_yml_name).parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(config_file, target_yml_name)
+
def liminal_is_running():
stdout, stderr = docker_compose_command('ps', [])
return "liminal" in stdout
@@ -161,20 +164,24 @@ def logs(follow, tail):
stdout, stderr = docker_compose_command('logs', [f'--tail={tail}'])
logging.info(stdout)
+
@cli.command("create", short_help="create a kubernetes local volume")
-@click.option('--local-volume-path', default=os.getcwd(), help="folder containing liminal.yaml files")
+@click.option('--local-volume-path', default=os.getcwd(),
+ help="folder containing liminal.yaml files")
def create(local_volume_path):
click.echo("creating a kubernetes local volume")
- config_files = files_util.find_config_files(local_volume_path)
- for config_file in config_files:
- with open(config_file) as stream:
- config = yaml.safe_load(stream)
- volume_util.create_local_volumes(config, os.path.dirname(config_file))
+ configs = ConfigUtil(local_volume_path).safe_load(is_render_variables=True,
+ soft_merge=True)
+ for config in configs:
+ volume_util.create_local_volumes(config, os.path.dirname(
+ files_util.resolve_pipeline_source_file(config['name'])))
+
@cli.command("start",
short_help="starts a local airflow in docker compose. should be run after deploy. " +
"Make sure docker is running on your machine")
-@click.option('--detached-mode', '-d', is_flag=True, default=False, help="Start liminal in detached mode.")
+@click.option('--detached-mode', '-d', is_flag=True, default=False,
+ help="Start liminal in detached mode.")
def start(detached_mode):
liminal_version = environment.get_liminal_version()
logging.info(f'starting liminal version: {liminal_version}')
diff --git a/tests/liminal/core/config/test_config.py b/tests/liminal/core/config/test_config.py
index b4baf9b..57201be 100644
--- a/tests/liminal/core/config/test_config.py
+++ b/tests/liminal/core/config/test_config.py
@@ -117,6 +117,50 @@ class TestConfigUtil(TestCase):
'type': 'sub'
}]
+ expected = [{'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'},
+ {'executor': 'airflow_executor', 'type': 'airflow'}],
+ 'images': [{'image': 'my_image', 'source': '.'}],
+ 'name': 'my_subliminal_test',
+ 'pipeline_defaults': {'param1': 'param1_value'},
+ 'pipelines': [{'description': 'add defaults parameters for all pipelines',
+ 'name': 'mypipe1',
+ 'param': 'constant',
+ 'param1': 'param1_value',
+ 'param2': 'param2super_value',
+ 'param3': 'param3super_value',
+ 'param4': 'param4hyper_value',
+ 'tasks': [{'executor': 'airflow_executor',
+ 'task': 'start',
+ 'task_def1': 'task_def1_value',
+ 'task_def2': {'task_def2_1': 'task_def2_1_value'},
+ 'task_sub_def': 'task_sub_def_value',
+ 'type': 'job_start'},
+ {'executor': 'airflow_executor',
+ 'task': 'end',
+ 'type': 'job_end'}]},
+ {'description': 'add defaults parameters for all pipelines',
+ 'name': 'mypipe2',
+ 'param': 'constant',
+ 'param1': 'param1_value',
+ 'param2': 'param2super_value',
+ 'param3': 'param3super_value',
+ 'param4': 'param4hyper_value',
+ 'tasks': [{'executor': 'airflow_executor',
+ 'task': 'start',
+ 'task_def1': 'task_def1_value',
+ 'task_def2': {'task_def2_1': 'task_def2_1_value'},
+ 'task_sub_def': 'task_sub_def_value',
+ 'type': 'job_start'},
+ {'executor': 'airflow_executor',
+ 'task': 'end',
+ 'type': 'job_end'}]}],
+ 'service_defaults': {'description': 'add defaults parameters for all '
+ 'services'},
+ 'services': [],
+ 'super': 'my_superliminal_test',
+ 'task_defaults': {'job_start': {'task_sub_def': 'task_sub_def_value'}},
+ 'type': 'sub'}]
+
find_config_files_mock.return_value = {
'my_subliminal_test': subliminal,
'my_superliminal_test': superliminal,
@@ -151,7 +195,8 @@ class TestConfigUtil(TestCase):
@mock.patch('liminal.core.util.files_util.load')
def test_get_superliminal(self, find_config_files_mock):
- base = {'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}],
+ base = {'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'},
+ {'executor': 'airflow_executor', 'type': 'airflow'}],
'name': 'base',
'pipeline_defaults': {'after_tasks': [{'task': 'end', 'type': 'job_end'}],
'before_tasks': [{'task': 'start', 'type': 'job_start'}],
@@ -161,7 +206,10 @@ class TestConfigUtil(TestCase):
'services'},
'task_defaults': {'description': 'add defaults parameters for all tasks '
'separate by task type',
- 'python': {'executor': 'default_k8s'}},
+ 'job_end': {'executor': 'airflow_executor'},
+ 'job_start': {'executor': 'airflow_executor'},
+ 'python': {'executor': 'default_k8s'},
+ 'spark': {'executor': 'default_k8s'}},
'type': 'super'}
subliminal = {
'name': 'subliminal_test',
@@ -335,62 +383,67 @@ class TestConfigUtil(TestCase):
}
}
- expected = [{
- 'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}],
- 'name': 'my_subliminal_test',
- 'pipeline_defaults': {'param1': '-case'},
- 'pipelines': [{'description': 'add defaults parameters for all pipelines',
- 'global_conf': 'super_var',
- 'name': 'mypipe1',
- 'param': 'simple case',
- 'param1': '-case',
- 'param2': '{{pipe-var}}',
- 'param3': 'param3super_value',
- 'param4': 'param4hyper_value',
- 'tasks': [{'task': 'start',
- 'task_def1:': 'task_sub_def_value',
- 'type': 'job_start'},
- {'task': 'second_task', 'type': 'dummy'},
- {'task': 'sub_tasks', 'type': 'dummy'},
- {'task': 'before_last_task', 'type': 'dummy'},
- {'task': 'end', 'type': 'job_end'}]},
- {'description': 'add defaults parameters for all pipelines',
- 'global_conf': 'super_var',
- 'name': 'mypipe2',
- 'param': '-case',
- 'param1': '-case',
- 'param2': '{{pipe-var}}',
- 'param3': 'param3super_value',
- 'param4': 'param4hyper_value',
- 'tasks': [{'task': 'start',
- 'task_def1:': 'task_sub_def_value',
- 'type': 'job_start'},
- {'task': 'second_task', 'type': 'dummy'},
- {'task': 'sub_tasks', 'type': 'dummy'},
- {'task': 'before_last_task', 'type': 'dummy'},
- {'task': 'end', 'type': 'job_end'}]}],
- 'service_defaults': {'description': 'add defaults parameters for all '
- 'services'},
- 'images': [],
- 'services': [{'description': 'add defaults parameters for all services',
- 'image': 'prod image',
- 'name': 'my_python_server',
- 'type': 'python_server'},
- {'description': 'add defaults parameters for all services',
- 'image': 'default_image_value',
- 'name': 'my_python_server_for_stg',
- 'type': 'python_server'}],
- 'super': 'my_superliminal_test',
- 'task_defaults': {'job_start': {'task_def1:': 'task_sub_def_value'}},
- 'type': 'sub',
- 'variables': {'a': 'myenv1',
- 'b': 'myenv12',
- 'c': 'myenv1myenv122',
- 'image': 'prod image',
- 'var': 'simple case',
- 'var-2': '-case',
- 'var_2': '_case'}
- }]
+ expected = [{'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'},
+ {'executor': 'airflow_executor', 'type': 'airflow'}],
+ 'name': 'my_subliminal_test',
+ 'pipeline_defaults': {'param1': '-case'},
+ 'pipelines': [{'description': 'add defaults parameters for all pipelines',
+ 'global_conf': 'super_var',
+ 'name': 'mypipe1',
+ 'param': 'simple case',
+ 'param1': '-case',
+ 'param2': '{{pipe-var}}',
+ 'param3': 'param3super_value',
+ 'param4': 'param4hyper_value',
+ 'tasks': [{'executor': 'airflow_executor',
+ 'task': 'start',
+ 'task_def1:': 'task_sub_def_value',
+ 'type': 'job_start'},
+ {'task': 'second_task', 'type': 'dummy'},
+ {'task': 'sub_tasks', 'type': 'dummy'},
+ {'task': 'before_last_task', 'type': 'dummy'},
+ {'executor': 'airflow_executor',
+ 'task': 'end',
+ 'type': 'job_end'}]},
+ {'description': 'add defaults parameters for all pipelines',
+ 'global_conf': 'super_var',
+ 'name': 'mypipe2',
+ 'param': '-case',
+ 'param1': '-case',
+ 'param2': '{{pipe-var}}',
+ 'param3': 'param3super_value',
+ 'param4': 'param4hyper_value',
+ 'tasks': [{'executor': 'airflow_executor',
+ 'task': 'start',
+ 'task_def1:': 'task_sub_def_value',
+ 'type': 'job_start'},
+ {'task': 'second_task', 'type': 'dummy'},
+ {'task': 'sub_tasks', 'type': 'dummy'},
+ {'task': 'before_last_task', 'type': 'dummy'},
+ {'executor': 'airflow_executor',
+ 'task': 'end',
+ 'type': 'job_end'}]}],
+ 'service_defaults': {'description': 'add defaults parameters for all '
+ 'services'},
+ 'images': [],
+ 'services': [{'description': 'add defaults parameters for all services',
+ 'image': 'prod image',
+ 'name': 'my_python_server',
+ 'type': 'python_server'},
+ {'description': 'add defaults parameters for all services',
+ 'image': 'default_image_value',
+ 'name': 'my_python_server_for_stg',
+ 'type': 'python_server'}],
+ 'super': 'my_superliminal_test',
+ 'task_defaults': {'job_start': {'task_def1:': 'task_sub_def_value'}},
+ 'type': 'sub',
+ 'variables': {'a': 'myenv1',
+ 'b': 'myenv12',
+ 'c': 'myenv1myenv122',
+ 'image': 'prod image',
+ 'var': 'simple case',
+ 'var-2': '-case',
+ 'var_2': '_case'}}]
find_config_files_mock.return_value = {
'my_subliminal_test': subliminal,
@@ -424,28 +477,29 @@ class TestConfigUtil(TestCase):
]
}
- expected = {
- 'name': 'my_subliminal_test', 'type': 'sub',
- 'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}],
- 'service_defaults': {'description': 'add defaults parameters for all services'},
- 'task_defaults': {
- 'description': 'add defaults parameters for all tasks separate by task type',
- 'python': {'executor': 'default_k8s'}}, 'pipeline_defaults': {
+ expected = {'name': 'my_subliminal_test', 'type': 'sub',
+ 'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'},
+ {'executor': 'airflow_executor', 'type': 'airflow'}],
+ 'service_defaults': {'description': 'add defaults parameters for all services'},
+ 'task_defaults': {
+ 'description': 'add defaults parameters for all tasks separate by task type',
+ 'python': {'executor': 'default_k8s'}, 'spark': {'executor': 'default_k8s'},
+ 'job_end': {'executor': 'airflow_executor'},
+ 'job_start': {'executor': 'airflow_executor'}}, 'pipeline_defaults': {
'description': 'add defaults parameters for all pipelines',
'before_tasks': [{'task': 'start', 'type': 'job_start'}],
'after_tasks': [{'task': 'end', 'type': 'job_end'}]},
- 'variables': {'var': 1, 'var-2': True}, 'pipelines': [
+ 'variables': {'var': 1, 'var-2': True}, 'pipelines': [
{'name': 'mypipe1', 'param': '1',
'description': 'add defaults parameters for all pipelines',
- 'tasks': [{'task': 'start', 'type': 'job_start'},
- {'task': 'end', 'type': 'job_end'}]},
+ 'tasks': [{'task': 'start', 'type': 'job_start', 'executor': 'airflow_executor'},
+ {'task': 'end', 'type': 'job_end', 'executor': 'airflow_executor'}]},
{'name': 'mypipe2', 'param': 'True',
'description': 'add defaults parameters for all pipelines',
- 'tasks': [{'task': 'start', 'type': 'job_start'},
- {'task': 'end', 'type': 'job_end'}]}],
- 'images': [],
- 'services': []
- }
+ 'tasks': [{'task': 'start', 'type': 'job_start', 'executor': 'airflow_executor'},
+ {'task': 'end', 'type': 'job_end', 'executor': 'airflow_executor'}]}],
+ 'images': [],
+ 'services': []}
find_config_files_mock.return_value = {
'my_subliminal_test': subliminal
diff --git a/.gitignore b/tests/runners/airflow/build/spark/__init__.py
similarity index 85%
copy from .gitignore
copy to tests/runners/airflow/build/spark/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/tests/runners/airflow/build/spark/__init__.py
@@ -15,18 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/tests/runners/airflow/build/spark/test_spark_image_builder.py b/tests/runners/airflow/build/spark/test_spark_image_builder.py
new file mode 100644
index 0000000..70148b4
--- /dev/null
+++ b/tests/runners/airflow/build/spark/test_spark_image_builder.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import shutil
+import tempfile
+from unittest import TestCase
+
+import docker
+
+from liminal.build.image.spark.spark import SparkImageBuilder
+
+
+class TestSparkImageBuilder(TestCase):
+ __IMAGE_NAME = 'my_spark_image'
+
+ def setUp(self) -> None:
+ super().setUp()
+ os.environ['TMPDIR'] = '/tmp'
+ self.temp_dir = self.__temp_dir()
+ self.temp_airflow_dir = self.__temp_dir()
+
+ def tearDown(self) -> None:
+ super().tearDown()
+ self.__remove_dir(self.temp_dir)
+ self.__remove_dir(self.temp_airflow_dir)
+
+ def test_build(self):
+ build_out = self.__test_build()
+ self.assertTrue('RUN pip install -r requirements.txt' in build_out,
+ 'Incorrect pip command')
+
+ self.__test_image()
+
+ def __test_build(self):
+ config = self.__create_conf('my_task')
+
+ base_path = os.path.join(os.path.dirname(__file__), '../../../apps/test_spark_app')
+
+ builder = SparkImageBuilder(config=config,
+ base_path=base_path,
+ relative_source_path='wordcount',
+ tag=self.__IMAGE_NAME)
+
+ build_out = str(builder.build())
+
+ return build_out
+
+ def __test_image(self):
+ docker_client = docker.from_env()
+ docker_client.images.get(self.__IMAGE_NAME)
+
+ cmds = ['spark-submit', 'wordcount.py', 'words.txt', '/mnt/vol1/outputs']
+
+ container_log = docker_client.containers.run(self.__IMAGE_NAME,
+ cmds,
+ volumes={
+ self.temp_dir: {
+ 'bind': '/mnt/vol1',
+ 'mode': 'rw'
+ }
+ })
+
+ docker_client.close()
+
+ logging.info(container_log)
+
+ self.assertEqual(
+ "b'\\n"
+ "my: 1\\n"
+ "first: 1\\n"
+ "liminal: 1\\n"
+ "spark: 1\\n"
+ "task: 1\\n"
+ "writing the results to /mnt/vol1/outputs\\n'",
+ str(container_log))
+
+ def __create_conf(self, task_id):
+ return {
+ 'task': task_id,
+ 'cmd': 'foo bar',
+ 'image': self.__IMAGE_NAME,
+ 'source': 'baz',
+ 'no_cache': True,
+ }
+
+ @staticmethod
+ def __temp_dir():
+ temp_dir = tempfile.mkdtemp()
+ return temp_dir
+
+ @staticmethod
+ def __remove_dir(temp_dir):
+ shutil.rmtree(temp_dir, ignore_errors=True)
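
For comparison, the same image can be built outside the test harness through the apps
builder, as the spark task test later in this diff does. A small sketch, assuming it is
run from the repository root:

    import os

    from liminal.build import liminal_apps_builder

    # scans the app folder for liminal.yml image definitions (including the new
    # `spark` image type) and builds a docker image for each entry
    liminal_apps_builder.build_liminal_apps(
        os.path.join(os.getcwd(), 'tests/runners/apps/test_spark_app'))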
diff --git a/liminal/runners/airflow/tasks/hadoop.py b/tests/runners/airflow/executors/test_airflow_executor.py
similarity index 60%
copy from liminal/runners/airflow/tasks/hadoop.py
copy to tests/runners/airflow/executors/test_airflow_executor.py
index 6003ce7..6f5fd8b 100644
--- a/liminal/runners/airflow/tasks/hadoop.py
+++ b/tests/runners/airflow/executors/test_airflow_executor.py
@@ -16,19 +16,23 @@
# specific language governing permissions and limitations
# under the License.
-from abc import ABC, abstractmethod
+from unittest import TestCase
+from unittest.mock import MagicMock
-from liminal.runners.airflow.model import task
+from liminal.runners.airflow.executors import airflow
+from liminal.runners.airflow.tasks.airflow import AirflowTask
-class HadoopTask(task.Task, ABC):
+class TestAirflowExecutorTask(TestCase):
"""
- Hadoop task
+ Test AirflowExecutor task
"""
- def apply_task_to_dag(self):
- return self.executor.apply_task_to_dag(task=self, parent=self.parent)
+ def test_apply_task_to_dag(self):
+ task0 = MagicMock(spec=AirflowTask)
- @abstractmethod
- def get_runnable_command(self):
- pass
+ task0.apply_task_to_dag = MagicMock()
+
+ airflow.AirflowExecutor("executor-task", {}, {}).apply_task_to_dag(task=task0)
+
+ task0.apply_task_to_dag.assert_called_once()
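
The test above pins down the contract this commit introduces: a task no longer attaches
itself to the DAG; its executor does. A minimal sketch of that pattern, reusing the
constructor arguments seen in the job_end test below (values illustrative):

    from liminal.runners.airflow.executors import airflow
    from liminal.runners.airflow.tasks import job_end
    from tests.util import dag_test_utils

    dag = dag_test_utils.create_dag()
    task0 = job_end.JobEndTask(task_id='end', dag=dag, liminal_config={},
                               pipeline_config={'pipeline': 'my_pipeline'},
                               task_config={}, parent=None, trigger_rule='all_done')

    # the executor, not the task, applies the underlying operator to the DAG
    airflow.AirflowExecutor('airflow-executor', {}, {}).apply_task_to_dag(task=task0)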
diff --git a/tests/runners/airflow/tasks/test_create_cloudformation_stack.py b/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
index dc8e5bc..58315e1 100644
--- a/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
+++ b/tests/runners/airflow/tasks/test_create_cloudformation_stack.py
@@ -23,6 +23,7 @@ from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from moto import mock_cloudformation
+from liminal.runners.airflow.executors import airflow
from liminal.runners.airflow.operators.cloudformation import CloudFormationCreateStackOperator, \
CloudFormationCreateStackSensor, CloudFormationHook
from liminal.runners.airflow.tasks.create_cloudformation_stack import CreateCloudFormationStackTask
@@ -80,7 +81,8 @@ class TestCreateCloudFormationStackTask(TestCase):
task_config=self.config
)
- self.create_cloudformation_task.apply_task_to_dag()
+ airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(
+ task=self.create_cloudformation_task)
def test_apply_task_to_dag(self):
self.assertEqual(len(self.dag.tasks), 4)
diff --git a/tests/runners/airflow/tasks/test_job_end.py b/tests/runners/airflow/tasks/test_job_end.py
index d3e1444..4e1148c 100644
--- a/tests/runners/airflow/tasks/test_job_end.py
+++ b/tests/runners/airflow/tasks/test_job_end.py
@@ -19,6 +19,7 @@
import unittest
from unittest import TestCase
+from liminal.runners.airflow.executors import airflow
from liminal.runners.airflow.tasks import job_end
from tests.util import dag_test_utils
@@ -38,7 +39,7 @@ class TestJobEndTask(TestCase):
trigger_rule='all_done',
liminal_config={'metrics': {'namespace': 'EndJobNameSpace', 'backends': ['cloudwatch']}}
)
- task0.apply_task_to_dag()
+ airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(task=task0)
self.assertEqual(len(dag.tasks), 1)
dag_task0 = dag.tasks[0]
@@ -53,9 +54,11 @@ class TestJobEndTask(TestCase):
dag = dag_test_utils.create_dag()
task0 = job_end.JobEndTask(task_id="job_end", dag=dag,
- pipeline_config={'pipeline': 'my_end_pipeline'}, liminal_config=conf, parent=None,
+ pipeline_config={'pipeline': 'my_end_pipeline'},
+ liminal_config=conf, parent=None,
trigger_rule='all_done', task_config={})
- task0.apply_task_to_dag()
+
+ airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(task=task0)
self.assertEqual(len(dag.tasks), 1)
dag_task0 = dag.tasks[0]
diff --git a/tests/runners/airflow/tasks/test_job_start.py b/tests/runners/airflow/tasks/test_job_start.py
index b799544..97faaa0 100644
--- a/tests/runners/airflow/tasks/test_job_start.py
+++ b/tests/runners/airflow/tasks/test_job_start.py
@@ -19,6 +19,7 @@
import unittest
from unittest import TestCase
+from liminal.runners.airflow.executors import airflow
from liminal.runners.airflow.tasks import job_start
from tests.util import dag_test_utils
@@ -32,7 +33,8 @@ class TestJobStartTask(TestCase):
task0 = job_start.JobStartTask(
task_id="start_task",
dag=dag,
- liminal_config={'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']}},
+ liminal_config={
+ 'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']}},
pipeline_config={'pipeline': 'my_start_pipeline'},
task_config={},
parent=None,
@@ -61,7 +63,7 @@ class TestJobStartTask(TestCase):
pipeline_config={'pipeline': 'my_end_pipeline'},
parent=None,
trigger_rule='all_success')
- task0.apply_task_to_dag()
+ airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(task=task0)
self.assertEqual(len(dag.tasks), 1)
dag_task0 = dag.tasks[0]
@@ -75,12 +77,13 @@ class TestJobStartTask(TestCase):
task0 = job_start.JobStartTask(task_id="start_task",
dag=dag,
- liminal_config={'metrics': {'namespace': 'StartJobNameSpace'}},
+ liminal_config={
+ 'metrics': {'namespace': 'StartJobNameSpace'}},
pipeline_config={'pipeline': 'my_start_pipeline'},
task_config={},
parent=None,
trigger_rule='all_success', )
- task0.apply_task_to_dag()
+ airflow.AirflowExecutor("airflow-executor", {}, {}).apply_task_to_dag(task=task0)
self.assertEqual(len(dag.tasks), 1)
dag_task0 = dag.tasks[0]
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
index 10dab4d..41f8379 100644
--- a/tests/runners/airflow/tasks/test_python.py
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
-import logging
import os
import tempfile
import unittest
@@ -67,7 +66,16 @@ class TestPythonTask(TestCase):
'NUM_FILES': 10,
'NUM_SPLITS': 3
})
- task0.apply_task_to_dag()
+ executor = KubernetesPodExecutor(
+ task_id='k8s',
+ liminal_config=self.liminal_config,
+ executor_config={
+ 'executor': 'k8s',
+ 'name': 'mypod'
+ }
+ )
+
+ executor.apply_task_to_dag(task=task0)
task1 = self.__create_python_task(dag,
'my_output_task',
@@ -75,7 +83,7 @@ class TestPythonTask(TestCase):
'my_parallelized_python_task_img',
'python -u write_outputs.py',
executors=3)
- task1.apply_task_to_dag()
+ executor.apply_task_to_dag(task=task1)
for task in dag.tasks:
print(f'Executing task {task.task_id}')
@@ -119,7 +127,7 @@ class TestPythonTask(TestCase):
cmd,
env_vars=None,
executors=None):
-
+
self.liminal_config['volumes'] = [
{
'volume': self._VOLUME_NAME,
@@ -164,15 +172,7 @@ class TestPythonTask(TestCase):
},
task_config=task_config,
parent=parent,
- trigger_rule='all_success',
- executor=KubernetesPodExecutor(
- task_id='k8s',
- liminal_config=self.liminal_config,
- executor_config={
- 'executor': 'k8s',
- 'name': 'mypod'
- }
- ))
+ trigger_rule='all_success')
if __name__ == '__main__':
diff --git a/tests/runners/airflow/tasks/test_spark_task.py b/tests/runners/airflow/tasks/test_spark_task.py
index 5223ba7..b0ae799 100644
--- a/tests/runners/airflow/tasks/test_spark_task.py
+++ b/tests/runners/airflow/tasks/test_spark_task.py
@@ -15,10 +15,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+import os
+import tempfile
from unittest import TestCase
+from liminal.build import liminal_apps_builder
+from liminal.kubernetes import volume_util
from liminal.runners.airflow import DummyDag
+from liminal.runners.airflow.executors.kubernetes import KubernetesPodExecutor
from liminal.runners.airflow.tasks.spark import SparkTask
+from tests.util import dag_test_utils
class TestSparkTask(TestCase):
@@ -26,6 +33,89 @@ class TestSparkTask(TestCase):
Test Spark Task
"""
+ _VOLUME_NAME = 'myvol1'
+
+ def test_spark_on_k8s(self):
+ volume_util.delete_local_volume(self._VOLUME_NAME)
+ os.environ['TMPDIR'] = '/tmp'
+ self.temp_dir = tempfile.mkdtemp()
+ self.liminal_config = {
+ 'volumes': [
+ {
+ 'volume': self._VOLUME_NAME,
+ 'local': {
+ 'path': self.temp_dir.replace(
+ "/var/folders",
+ "/private/var/folders"
+ )
+ }
+ }
+ ]
+ }
+ volume_util.create_local_volumes(self.liminal_config, None)
+
+ # build spark image
+ liminal_apps_builder.build_liminal_apps(
+ os.path.join(os.path.dirname(__file__), '../../apps/test_spark_app'))
+
+ outputs_dir = os.path.join(self.temp_dir, 'outputs')
+
+ task_config = {
+ 'task': "my_spark_task",
+ 'image': "my_spark_image",
+ 'application_source': 'wordcount.py',
+ 'application_arguments': ['words.txt', '/mnt/vol1/outputs/'],
+ 'env_vars': {},
+ 'mounts': [
+ {
+ 'mount': 'mymount',
+ 'volume': self._VOLUME_NAME,
+ 'path': '/mnt/vol1'
+ }
+ ]
+ }
+
+ dag = dag_test_utils.create_dag()
+
+ task1 = SparkTask(
+ task_id="my_spark_task",
+ dag=dag,
+ liminal_config=self.liminal_config,
+ pipeline_config={
+ 'pipeline': 'my_pipeline'
+ },
+ task_config=task_config,
+ parent=None,
+ trigger_rule='all_success')
+
+ executor = KubernetesPodExecutor(
+ task_id='k8s',
+ liminal_config=self.liminal_config,
+ executor_config={
+ 'executor': 'k8s',
+ 'name': 'mypod'
+ }
+ )
+ executor.apply_task_to_dag(task=task1)
+
+ for task in dag.tasks:
+ print(f'Executing task {task.task_id}')
+ task.execute(DummyDag('my_dag', task.task_id).context)
+
+ expected_output = '{"word":"my","count":1}\n' \
+ '{"word":"first","count":1}\n' \
+ '{"word":"liminal","count":1}\n' \
+ '{"word":"spark","count":1}\n' \
+ '{"word":"task","count":1}\n'.split("\n")
+
+ actual = ''
+ for filename in os.listdir(outputs_dir):
+ if filename.endswith(".json"):
+ with open(os.path.join(outputs_dir, filename)) as f:
+ actual = f.read()
+
+ self.assertEqual(actual.split("\n"), expected_output)
+
def test_get_runnable_command(self):
task_config = {
'application_source': 'my_app.py',
@@ -37,18 +127,24 @@ class TestSparkTask(TestCase):
'spark.yarn.executor.memoryOverhead': '500M'
},
'application_arguments': {
- '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
+ '--query': "select * from "
+ "my_table where date_prt >= "
"'{{yesterday_ds}}'",
'--output': 'mytable'
}
}
- expected = ['spark-submit', '--master', 'yarn', '--class', 'org.apache.liminal.MySparkApp', '--conf',
- 'spark.driver.memory=1g', '--conf', 'spark.driver.maxResultSize=1g', '--conf',
+ expected = ['spark-submit',
+ '--master',
+ 'yarn',
+ '--class',
+ 'org.apache.liminal.MySparkApp',
+ '--conf',
+ 'spark.driver.maxResultSize=1g', '--conf', 'spark.driver.memory=1g', '--conf',
'spark.yarn.executor.memoryOverhead=500M', 'my_app.py',
'--query',
- "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
- " '{{yesterday_ds}}'",
+ "select * from my_table where "
+ "date_prt >= '{{yesterday_ds}}'",
'--output', 'mytable']
actual = SparkTask(
@@ -57,7 +153,7 @@ class TestSparkTask(TestCase):
[],
trigger_rule='all_success',
liminal_config={},
- pipeline_config={},
+ pipeline_config={'pipeline': 'pipeline'},
task_config=task_config
).get_runnable_command()
@@ -67,17 +163,17 @@ class TestSparkTask(TestCase):
task_config = {
'application_source': 'my_app.py',
'application_arguments': {
- '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
- "'{{yesterday_ds}}'",
- '--output': 'mytable'
+ '--query': "select * from my_table where"
+ " date_prt >= '{{yesterday_ds}}'",
+ '--output': 'myoutputtable'
}
}
expected = ['spark-submit', 'my_app.py',
'--query',
- "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
- " '{{yesterday_ds}}'",
- '--output', 'mytable']
+ "select * from my_table where "
+ "date_prt >= '{{yesterday_ds}}'",
+ '--output', 'myoutputtable']
actual = SparkTask(
'my_spark_task',
@@ -85,11 +181,11 @@ class TestSparkTask(TestCase):
[],
trigger_rule='all_success',
liminal_config={},
- pipeline_config={},
+ pipeline_config={'pipeline': 'pipeline'},
task_config=task_config
).get_runnable_command()
- self.assertEqual(actual, expected)
+ self.assertEqual(sorted(actual), sorted(expected))
def test_partially_missing_spark_arguments(self):
task_config = {
@@ -101,9 +197,9 @@ class TestSparkTask(TestCase):
'spark.yarn.executor.memoryOverhead': '500M'
},
'application_arguments': {
- '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
- "'{{yesterday_ds}}'",
- '--output': 'mytable'
+ '--query': "select * from my_table where "
+ "date_prt >= '{{yesterday_ds}}'",
+ '--output': 'myoutputtable'
}
}
@@ -111,17 +207,17 @@ class TestSparkTask(TestCase):
'--class',
'org.apache.liminal.MySparkApp',
'--conf',
- 'spark.driver.memory=1g',
- '--conf',
'spark.driver.maxResultSize=1g',
'--conf',
+ 'spark.driver.memory=1g',
+ '--conf',
'spark.yarn.executor.memoryOverhead=500M',
'my_app.py',
'--query',
- 'select * from dlk_visitor_funnel_dwh_staging.fact_events where '
- "unified_Date_prt >= '{{yesterday_ds}}'",
+ 'select * from my_table where '
+ "date_prt >= '{{yesterday_ds}}'",
'--output',
- 'mytable'].sort()
+ 'myoutputtable']
actual = SparkTask(
'my_spark_task',
@@ -129,8 +225,8 @@ class TestSparkTask(TestCase):
[],
trigger_rule='all_success',
liminal_config={},
- pipeline_config={},
+ pipeline_config={'pipeline': 'pipeline'},
task_config=task_config
).get_runnable_command()
- self.assertEqual(actual.sort(), expected)
+ self.assertEqual(sorted(actual), sorted(expected))
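
Taken together, these cases describe how a spark task config flattens into a
spark-submit invocation: master and class first (when present), each conf entry as a
--conf key=value pair in sorted key order, then the application source followed by its
arguments. A hypothetical helper capturing that shape (an editorial sketch, not the
code in liminal/runners/airflow/tasks/spark.py):

    def to_spark_submit(task_config):
        # mirrors the expectations asserted above; illustrative only
        command = ['spark-submit']
        if 'master' in task_config:
            command += ['--master', task_config['master']]
        if 'class' in task_config:
            command += ['--class', task_config['class']]
        for key, value in sorted(task_config.get('conf', {}).items()):
            command += ['--conf', f'{key}={value}']
        command.append(task_config['application_source'])
        for name, value in task_config.get('application_arguments', {}).items():
            command += [name, value]
        return command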
diff --git a/tests/runners/apps/test_app/extra/liminal.yml b/tests/runners/apps/test_app/extra/liminal.yml
index 546d16a..3bf8489 100644
--- a/tests/runners/apps/test_app/extra/liminal.yml
+++ b/tests/runners/apps/test_app/extra/liminal.yml
@@ -47,6 +47,7 @@ pipeline_defaults:
description: optional sub tasks
- task: my_task_output_input_task
type: python
+ no_cache: True
description: task with input from other task's output
image: my_task_output_input_task_image
env_vars:
diff --git a/.gitignore b/tests/runners/apps/test_spark_app/__init__.py
similarity index 85%
copy from .gitignore
copy to tests/runners/apps/test_spark_app/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/tests/runners/apps/test_spark_app/__init__.py
@@ -15,18 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/liminal/core/config/defaults/base/liminal.yml b/tests/runners/apps/test_spark_app/liminal.yml
similarity index 61%
copy from liminal/core/config/defaults/base/liminal.yml
copy to tests/runners/apps/test_spark_app/liminal.yml
index 729ba22..ff3071a 100644
--- a/liminal/core/config/defaults/base/liminal.yml
+++ b/tests/runners/apps/test_spark_app/liminal.yml
@@ -16,22 +16,24 @@
# specific language governing permissions and limitations
# under the License.
---
-name: base
-type: super
-executors:
- - executor: default_k8s
- type: kubernetes
-service_defaults:
- description: add defaults parameters for all services
-task_defaults:
- description: add defaults parameters for all tasks separate by task type
- python:
- executor: default_k8s
-pipeline_defaults:
- description: add defaults parameters for all pipelines
- before_tasks:
- - task: start
- type: job_start
- after_tasks:
- - task: end
- type: job_end
\ No newline at end of file
+name: MyPipeline
+images:
+ - image: my_spark_image
+ type: spark
+ source: wordcount
+ no_cache: True
+pipelines:
+ - pipeline: my_pipeline
+ owner: Bosco Albert Baracus
+ start_date: 1970-01-01
+ timeout_minutes: 45
+ schedule: 0 * 1 * *
+ tasks:
+ - task: my_test_spark_task
+ type: spark
+ description: spark task on k8s
+ image: my_spark_image
+ executors: 2
+ application_source: wordcount.py
+ application_arguments:
+ - "words.txt"
diff --git a/.gitignore b/tests/runners/apps/test_spark_app/wordcount/__init__.py
similarity index 85%
copy from .gitignore
copy to tests/runners/apps/test_spark_app/wordcount/__init__.py
index fcf8bcd..217e5db 100644
--- a/.gitignore
+++ b/tests/runners/apps/test_spark_app/wordcount/__init__.py
@@ -15,18 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
diff --git a/.gitignore b/tests/runners/apps/test_spark_app/wordcount/requirements.txt
similarity index 85%
copy from .gitignore
copy to tests/runners/apps/test_spark_app/wordcount/requirements.txt
index fcf8bcd..b73ac5f 100644
--- a/.gitignore
+++ b/tests/runners/apps/test_spark_app/wordcount/requirements.txt
@@ -16,17 +16,4 @@
# specific language governing permissions and limitations
# under the License.
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
+pyyaml==5.3.1
diff --git a/tests/runners/apps/test_spark_app/wordcount/wordcount.py b/tests/runners/apps/test_spark_app/wordcount/wordcount.py
new file mode 100644
index 0000000..b7537a3
--- /dev/null
+++ b/tests/runners/apps/test_spark_app/wordcount/wordcount.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+from operator import add
+
+from pyspark.sql import SparkSession
+import yaml  # noqa: F401 - exercises pyyaml from requirements.txt installed into the image
+
+if __name__ == "__main__":
+ if len(sys.argv) != 3:
+ print("Usage: wordcount <file> output <outputlocation>", file=sys.stderr)
+ sys.exit(-1)
+
+ spark = SparkSession \
+ .builder \
+ .appName("PythonWordCount") \
+ .getOrCreate()
+
+ lines = spark.read.text(sys.argv[1]).rdd.filter(lambda x: not x[0].startswith('#')).map(
+ lambda r: r[0])  # skip '#'-prefixed lines (the license header kept in words.txt)
+ counts = lines.flatMap(lambda x: x.split(' ')) \
+ .map(lambda x: (x, 1)) \
+ .reduceByKey(add)
+ output = counts.collect()
+ for (word, count) in output:
+ print("%s: %i" % (word, count))
+
+ print("writing the results to {}".format(sys.argv[2]))
+ counts.toDF(["word", "count"]).coalesce(1).write.mode("overwrite").json(sys.argv[2])
+
+ spark.stop()
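
As the image-builder test earlier in this diff shows, the script takes exactly two
arguments: an input file and an output directory. A quick local run, assuming
spark-submit is on the PATH and pyspark plus pyyaml are installed (paths illustrative):

    import subprocess

    # equivalent to the command the image test runs inside the container
    subprocess.run(['spark-submit', 'wordcount.py', 'words.txt', '/tmp/wordcount-out'],
                   check=True)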
diff --git a/.gitignore b/tests/runners/apps/test_spark_app/wordcount/words.txt
similarity index 85%
copy from .gitignore
copy to tests/runners/apps/test_spark_app/wordcount/words.txt
index fcf8bcd..0a4b90b 100644
--- a/.gitignore
+++ b/tests/runners/apps/test_spark_app/wordcount/words.txt
@@ -15,18 +15,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-.idea
-bin
-include
-lib
-venv
-.Python
-*.pyc
-pip-selfcheck.json
-.DS_Store
-/build
-apache_liminal.egg-info
-scripts/*.tar.gz
-scripts/*.whl
-dist
+my first liminal spark task
\ No newline at end of file
diff --git a/tests/test_licenses.py b/tests/test_licenses.py
index 76fef79..b48d731 100644
--- a/tests/test_licenses.py
+++ b/tests/test_licenses.py
@@ -22,8 +22,8 @@ from unittest import TestCase
from termcolor import colored
-EXCLUDED_EXTENSIONS = ['.gif', '.png', '.pyc', 'LICENSE', 'DISCLAIMER', 'NOTICE']
-EXCLUDED_DIRS = ['docs/build', '.git', '.idea']
+EXCLUDED_EXTENSIONS = ['.gif', '.png', '.pyc', 'LICENSE', 'DISCLAIMER', 'NOTICE', '.whl']
+EXCLUDED_DIRS = ['docs/build', '.git', '.idea', 'venv', 'apache_liminal.egg-info']
PYTHON_LICENSE_HEADER = """
#