You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@liminal.apache.org by as...@apache.org on 2022/12/08 12:32:04 UTC
[incubator-liminal] branch master updated: Sagemaker example (#90)
This is an automated email from the ASF dual-hosted git repository.
assafpinhasi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
The following commit(s) were added to refs/heads/master by this push:
new 4979c3d Sagemaker example (#90)
4979c3d is described below
commit 4979c3d000dfe5b9e7c60c2cdbe447bcff30b053
Author: Lidor Ettinger <li...@gmail.com>
AuthorDate: Thu Dec 8 14:31:59 2022 +0200
Sagemaker example (#90)
* sagemaker example
---
.github/workflows/pre_commits.yml | 2 +-
.github/workflows/{unittest.yml => unittests.yml} | 23 +++-
docs/liminal/kubernetes/secret_util.md | 11 +-
examples/sagemaker_example/README.md | 51 ++++++++
examples/sagemaker_example/archetype/liminal.yaml | 57 +++++++++
.../sagemaker_example/data_preparation/__init__.py | 36 +-----
.../data_preparation/data_preparation.py | 77 ++++++++++++
.../data_preparation/data_uploader.py | 82 ++++++++++++
.../sagemaker_example/data_train/__init__.py | 36 +-----
examples/sagemaker_example/data_train/train.py | 94 ++++++++++++++
examples/sagemaker_example/inference.py | 76 +++++++++++
examples/sagemaker_example/liminal.yaml | 43 +++++++
examples/sagemaker_example/liminal_sm.py | 81 ++++++++++++
.../sagemaker_example/requirements.txt | 39 +-----
examples/sagemaker_example/sm_ops.py | 140 +++++++++++++++++++++
liminal/build/image/python/Dockerfile | 6 +-
liminal/kubernetes/secret_util.py | 5 +
liminal/runners/airflow/executors/kubernetes.py | 12 +-
scripts/docker-compose.yml | 1 +
scripts/liminal | 4 +-
tests/runners/airflow/liminal/liminal.yml | 2 +-
tests/runners/airflow/tasks/test_python.py | 2 +-
yamllint-config.yml | 2 +-
23 files changed, 754 insertions(+), 128 deletions(-)
diff --git a/.github/workflows/pre_commits.yml b/.github/workflows/pre_commits.yml
index 8a18a15..2049626 100644
--- a/.github/workflows/pre_commits.yml
+++ b/.github/workflows/pre_commits.yml
@@ -13,7 +13,7 @@ on:
jobs:
linting:
name: Run pre-commit hooks on py3.6
- runs-on: ubuntu-latest
+ runs-on: ubuntu-20.04
steps:
#----------------------------------------------
# Checkout, SetUp Python, Load Cache and Run PreCommitChecks
diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittests.yml
similarity index 54%
rename from .github/workflows/unittest.yml
rename to .github/workflows/unittests.yml
index d2f05a2..742e11f 100644
--- a/.github/workflows/unittest.yml
+++ b/.github/workflows/unittests.yml
@@ -15,16 +15,16 @@ on:
jobs:
unittest:
- runs-on: ubuntu-latest
- timeout-minutes: 10
+ runs-on: ubuntu-20.04
+ timeout-minutes: 20
steps:
- name: Checkout
uses: actions/checkout@v1
- name: Setup Minikube
- uses: manusa/actions-setup-minikube@v2.3.0
+ uses: manusa/actions-setup-minikube@v2.7.1
with:
- minikube version: v1.16.0
- kubernetes version: v1.19.2
+ minikube version: 'v1.26.1'
+ kubernetes version: 'v1.25.0'
github token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Python
uses: actions/setup-python@v1
@@ -34,5 +34,16 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- - name: Run unittest
+ - name: Run unittest - runners
run: ./run_tests.sh
+ approve:
+ needs: unittest
+ runs-on: ubuntu-20.04
+
+ steps:
+ - run: | # approve the pull request
+ curl --request POST \
+ --url https://api.github.com/repos/${{github.repository}}/pulls/${{github.event.number}}/reviews \
+ --header 'authorization: Bearer ${{ secrets.GITHUB_TOKEN }}' \
+ --header 'content-type: application/json' \
+ -d '{"event":"APPROVE"}'
diff --git a/docs/liminal/kubernetes/secret_util.md b/docs/liminal/kubernetes/secret_util.md
index 2bb44f3..cf46531 100644
--- a/docs/liminal/kubernetes/secret_util.md
+++ b/docs/liminal/kubernetes/secret_util.md
@@ -27,7 +27,6 @@ liminal.yml file.
name: k8s_secret_example
secrets:
- secret: aws
- remote_path: "/secrets"
local_path_file: "~/.aws/credentials"
executors:
- executor: k8s
@@ -40,14 +39,10 @@ task_defaults:
image: amazon/aws-cli:2.7.23
executors: 2
env_vars:
- AWS_CONFIG_FILE: "/secrets/credentials"
- AWS_PROFILE: "default"
- secrets:
+ AWS_CONFIG_FILE: "/secret/credentials"
+ secret_mount:
- secret: aws
- mounts:
- - mount: myaws-creds
- volume: aws
- path: /mnt/
+ remote_path: "/secret"
pipelines:
- pipeline: k8s_secret_example
owner: Bosco Albert Baracus
diff --git a/examples/sagemaker_example/README.md b/examples/sagemaker_example/README.md
new file mode 100644
index 0000000..72e7a78
--- /dev/null
+++ b/examples/sagemaker_example/README.md
@@ -0,0 +1,51 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# SageMaker
+
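+Allow SageMaker to use your IAM role through AWS STS AssumeRole:
+
+Go to IAM -> Roles, select the role your user assumes, and add the following statements under Trust relationships (the second statement lets the SageMaker service assume the role):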
+```
+{
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Effect": "Allow",
+ "Principal": {
+ "Federated": "arn:aws:iam::<ACCOUNT_ID>:saml-provider/Okta"
+ },
+ "Action": "sts:AssumeRoleWithSAML",
+ "Condition": {
+ "StringEquals": {
+ "SAML:aud": "https://signin.aws.amazon.com/saml"
+ }
+ }
+ },
+ {
+ "Sid": "",
+ "Effect": "Allow",
+ "Principal": {
+ "Service": "sagemaker.amazonaws.com"
+ },
+ "Action": "sts:AssumeRole"
+ }
+ ]
+}
+```
diff --git a/examples/sagemaker_example/archetype/liminal.yaml b/examples/sagemaker_example/archetype/liminal.yaml
new file mode 100644
index 0000000..48e4b42
--- /dev/null
+++ b/examples/sagemaker_example/archetype/liminal.yaml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+
+# superliminal for local development
+name: InfraSageMaker
+owner: Bosco Albert Baracus
+type: super
+secrets:
+ - secret: aws
+ local_path_file: "~/.aws/credentials"
+executors:
+ - executor: k8s
+ type: kubernetes
+variables:
+ output_root_dir: "/mnt/smvolume"
+ input_root_dir: ''
+images:
+ - image: my_sm_image
+ source: .
+ type: python
+ no_cache: false
+task_defaults:
+ python:
+ executor: k8s
+ image: my_sm_image
+ application_source: '{{application}}'
+ env_vars:
+ AWS_CONFIG_FILE: "/secret/credentials"
+ COMM_PATH: "{{output_root_dir}}"
+ secrets:
+ - secret: aws
+ remote_path: "/secret"
+ mounts:
+ - mount: mymount
+ volume: smvolume
+ path: /mnt/smvolume
+volumes:
+ - volume: smvolume
+ claim_name: smvolume-pvc
+ local:
+ path: .
diff --git a/yamllint-config.yml b/examples/sagemaker_example/data_preparation/__init__.py
similarity index 58%
copy from yamllint-config.yml
copy to examples/sagemaker_example/data_preparation/__init__.py
index ea52231..217e5db 100644
--- a/yamllint-config.yml
+++ b/examples/sagemaker_example/data_preparation/__init__.py
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,38 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-# https://yamllint.readthedocs.io/en/stable/
-extends: default
-
-locale: en_US.UTF-8
-
-yaml-files:
- - '*.yaml'
- - '*.yml'
- - .yamllint
-
-rules:
- line-length:
- max: 110
- # level: warning
- brackets: enable
- colons: enable
- commas: enable
- empty-lines: enable
- empty-values: disable
- key-duplicates: disable
- hyphens: enable
- key-ordering: disable
- new-line-at-end-of-file: enable
- new-lines: enable
- # quoted-strings: disable
- truthy:
- level: warning
- indentation:
- spaces: 2
- indent-sequences: true
-ignore: |
- docs/
- .asf.yaml
diff --git a/examples/sagemaker_example/data_preparation/data_preparation.py b/examples/sagemaker_example/data_preparation/data_preparation.py
new file mode 100644
index 0000000..80e1af7
--- /dev/null
+++ b/examples/sagemaker_example/data_preparation/data_preparation.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import os
+from pathlib import Path
+
+import pandas as pd
+from data_preparation.data_uploader import get_uploader
+from sklearn.model_selection import train_test_split
+
# Local file names that load() writes the two splits to before uploading them.
TEST_CSV = "diamonds_test.csv"
TRAIN_CSV = "diamonds_train.csv"

# Target column: split off from the features in transform().
LABEL_COLUMN = 'price'

# Default --input_uri: a public OpenML CSV download.
DATASET_PUBLIC_URL = "https://www.openml.org/data/get_csv/21792853/dataset"
+
+
def transform(data):
    """Split *data* into train and test frames (70% / 30%, random_state=42).

    Both returned frames contain the feature columns followed by
    LABEL_COLUMN, re-attached after the split.

    Returns:
        A ``(train, test)`` tuple of DataFrames.
    """
    features = data.drop(columns=LABEL_COLUMN)
    labels = data[LABEL_COLUMN]

    feat_train, feat_test, lab_train, lab_test = train_test_split(
        features, labels, test_size=0.3, random_state=42
    )

    frames = []
    for feats, labs in ((feat_train, lab_train), (feat_test, lab_test)):
        frame = feats.copy()
        frame[LABEL_COLUMN] = labs
        frames.append(frame)
    return frames[0], frames[1]
+
+
def extract(input_uri):
    """Read the raw CSV dataset at *input_uri* (any source pandas.read_csv accepts)."""
    frame = pd.read_csv(input_uri)
    return frame
+
+
def load(train, test, output_uri_base, data_uploader):
    """Write both splits to CSV in the working directory, then upload them.

    The train CSV is uploaded under ``<output_uri_base>/train`` and the test
    CSV under ``<output_uri_base>/test``.

    Returns:
        ``(train_path, test_path)``: whatever ``data_uploader.upload`` returned
        for each file.
    """
    # Both files are written before either upload starts.
    train.to_csv(TRAIN_CSV)
    test.to_csv(TEST_CSV)

    targets = ((TRAIN_CSV, "train"), (TEST_CSV, "test"))
    train_path, test_path = [
        data_uploader.upload(local_name, os.path.join(output_uri_base, subdir))
        for local_name, subdir in targets
    ]
    return train_path, test_path
+
+
def data_pipeline(input_uri, output_uri_base, data_uploader):
    """Run extract -> transform -> load.

    Returns:
        The ``(train_path, test_path)`` pair produced by load().
    """
    train, test = transform(extract(input_uri))
    return load(train, test, output_uri_base, data_uploader)
+
+
if __name__ == "__main__":
    # CLI entry point: prepare the data locally or upload it, depending on the URI scheme.
    cli = argparse.ArgumentParser()
    cli.add_argument("--input_uri", default=DATASET_PUBLIC_URL)
    default_output_dir = Path(__file__).parent.parent.absolute().joinpath('data')
    cli.add_argument(
        "--output_uri_base",
        default=f"file://{default_output_dir}",
        help="a uri starting with 's3', 'file' or a relative path which will be treated as sagemaker prefix",
    )
    opts = cli.parse_args()
    data_pipeline(opts.input_uri, opts.output_uri_base, get_uploader(opts.output_uri_base))
diff --git a/examples/sagemaker_example/data_preparation/data_uploader.py b/examples/sagemaker_example/data_preparation/data_uploader.py
new file mode 100644
index 0000000..12a50d1
--- /dev/null
+++ b/examples/sagemaker_example/data_preparation/data_uploader.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+from abc import ABC, abstractmethod
+from urllib.parse import urlparse
+
+import boto3
+import sagemaker
+
+
def get_uploader(output_uri_base):
    """Return an uploader instance chosen by the scheme of *output_uri_base*.

    ``s3`` -> S3DataUploader, ``file`` -> LocalDataUploader; any other scheme,
    including none (a bare relative path), -> SagemakerDataUploader.
    """
    scheme = urlparse(output_uri_base).scheme
    uploader_by_scheme = {"s3": S3DataUploader, "file": LocalDataUploader}
    return uploader_by_scheme.get(scheme, SagemakerDataUploader)()
+
+
class DataUploader(ABC):
    """Abstract interface for pushing a local file to a destination URI."""

    def __init__(self):
        # Nothing to set up here; concrete uploaders create their own clients.
        pass

    @abstractmethod
    def upload(self, local_path, output_uri_base):
        """Upload the file at *local_path* under *output_uri_base*.

        Each subclass decides how the URI is interpreted and what it returns.
        """
+
+
class LocalDataUploader(DataUploader):
    """Uploader for ``file://`` URIs: copies files into a local directory."""

    # NOTE(review): never used by this class; kept only so the attribute still exists.
    s3_client = None

    def upload(self, local_path, output_uri_base):
        """Copy *local_path* (with metadata) into the directory named by *output_uri_base*.

        The directory is created if it does not exist.

        Returns:
            The destination directory path (not the path of the copied file).
        """
        target_dir = urlparse(output_uri_base).path
        os.makedirs(target_dir, exist_ok=True)
        shutil.copy2(local_path, target_dir)
        return target_dir
+
+
class S3DataUploader(DataUploader):
    """Uploader for ``s3://bucket/prefix`` URIs, using a boto3 S3 client."""

    s3_client = None

    def __init__(self):
        self.s3_client = boto3.client('s3')

    def upload(self, local_path, output_uri_base):
        """Upload *local_path* into the S3 "directory" named by *output_uri_base*.

        Args:
            local_path: path of the local file to upload.
            output_uri_base: destination prefix such as ``s3://my-bucket/data/train``.

        Returns:
            The ``s3://bucket/key`` URI of the uploaded object, so callers such
            as load() receive a usable location instead of None.
        """
        parsed = urlparse(output_uri_base)
        bucket = parsed.netloc
        # urlparse keeps the leading '/' of the path; S3 keys must not start with it.
        prefix = parsed.path.strip('/')
        # Put the file *inside* the prefix under its own name, as the local and
        # SageMaker uploaders do, instead of overwriting an object named after the prefix.
        file_name = os.path.basename(local_path)
        key = f"{prefix}/{file_name}" if prefix else file_name
        self.s3_client.upload_file(local_path, bucket, key)
        return f"s3://{bucket}/{key}"
+
+
class SagemakerDataUploader(DataUploader):
    """Uploader that goes through a sagemaker.Session.

    Relative paths are used as key prefixes in the session's default bucket.
    An ``s3://bucket/prefix`` URI selects that bucket explicitly.
    """

    sm_client = None
    sm_session = None
    region = None
    default_bucket = None

    def __init__(self):
        self.sm_client = boto3.client("sagemaker")
        self.sm_session = sagemaker.Session()
        self.region = self.sm_session.boto_session.region_name
        self.default_bucket = self.sm_session.default_bucket()

    def upload(self, local_path, output_uri_base):
        """Upload *local_path* with ``Session.upload_data`` and return its result.

        Args:
            local_path: local file to upload.
            output_uri_base: ``bucket``-less relative prefix (default bucket is
                used) or ``s3://bucket/prefix``.
        """
        parsed = urlparse(output_uri_base)
        bucket = parsed.netloc or self.default_bucket
        # For s3:// URIs the parsed path starts with '/'. Strip it (and any
        # trailing '/') so it does not become part of the S3 key.
        key_prefix = parsed.path.strip('/')
        return self.sm_session.upload_data(path=local_path, bucket=bucket, key_prefix=key_prefix)
diff --git a/yamllint-config.yml b/examples/sagemaker_example/data_train/__init__.py
similarity index 58%
copy from yamllint-config.yml
copy to examples/sagemaker_example/data_train/__init__.py
index ea52231..217e5db 100644
--- a/yamllint-config.yml
+++ b/examples/sagemaker_example/data_train/__init__.py
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,38 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-# https://yamllint.readthedocs.io/en/stable/
-extends: default
-
-locale: en_US.UTF-8
-
-yaml-files:
- - '*.yaml'
- - '*.yml'
- - .yamllint
-
-rules:
- line-length:
- max: 110
- # level: warning
- brackets: enable
- colons: enable
- commas: enable
- empty-lines: enable
- empty-values: disable
- key-duplicates: disable
- hyphens: enable
- key-ordering: disable
- new-line-at-end-of-file: enable
- new-lines: enable
- # quoted-strings: disable
- truthy:
- level: warning
- indentation:
- spaces: 2
- indent-sequences: true
-ignore: |
- docs/
- .asf.yaml
diff --git a/examples/sagemaker_example/data_train/train.py b/examples/sagemaker_example/data_train/train.py
new file mode 100644
index 0000000..f665625
--- /dev/null
+++ b/examples/sagemaker_example/data_train/train.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import os
+from pathlib import Path
+
+import joblib
+import numpy as np
+import pandas as pd
+from data_preparation.data_preparation import LABEL_COLUMN, TEST_CSV, TRAIN_CSV
+from sklearn.compose import ColumnTransformer
+from sklearn.linear_model import LinearRegression
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import OneHotEncoder
+
# Columns the model is trained on. sm_ops imports this list; inference.py keeps an identical copy.
feature_column_names = [
    'carat', 'cut', 'color', 'clarity', 'depth', 'table', 'x', 'y', 'z',
]

# Features that are one-hot encoded; every other feature passes through unchanged.
categorical_cols = ['cut', 'clarity', 'color']

# File name of the persisted pipeline inside the model directory.
MODEL_JOBLIB_FILENAME = "model.joblib"

# Preprocessing template for the training pipeline. NOTE(review): this is a
# single module-level estimator object; fitting it in place mutates it.
preprocessor = ColumnTransformer(
    transformers=[('categorical', OneHotEncoder(), categorical_cols)],
    remainder='passthrough',
)
+
+
def train(train_df, test_df, n_jobs, model_dir):
    """Fit the one-hot + linear-regression pipeline and persist it with joblib.

    Args:
        train_df: frame with ``feature_column_names`` and ``LABEL_COLUMN``, used for fitting.
        test_df: frame with the same columns, used only to report the error.
        n_jobs: parallelism for ``LinearRegression``. Numeric strings, as
            delivered by argparse / SageMaker hyperparameters, are accepted.
        model_dir: local directory for ``MODEL_JOBLIB_FILENAME``; created if missing.

    Returns:
        Path of the written model file.
    """
    # Function-scope import keeps the module's import block untouched.
    from sklearn.base import clone

    X_train = train_df[feature_column_names]
    y_train = train_df[LABEL_COLUMN]

    print(X_train.head(3))
    X_test = test_df[feature_column_names]
    y_test = test_df[LABEL_COLUMN]

    if n_jobs is not None:
        # Values coming from the command line are strings.
        n_jobs = int(n_jobs)

    # clone() gives this call a fresh, unfitted copy, so fitting does not
    # mutate the shared module-level `preprocessor` object.
    my_pipeline = Pipeline(
        steps=[('preprocessor', clone(preprocessor)), ('model', LinearRegression(n_jobs=n_jobs))]
    )
    my_pipeline.fit(X_train, y_train)

    train_predictions = my_pipeline.predict(X_train)
    print(train_predictions)

    print("validating model")
    abs_err = np.abs(my_pipeline.predict(X_test) - y_test)
    # Report a single number (mean absolute error), not the whole error Series.
    print(f"Test mean absolute error:{abs_err.mean()}")

    # persist model
    os.makedirs(model_dir, exist_ok=True)
    path = os.path.join(model_dir, MODEL_JOBLIB_FILENAME)
    joblib.dump(my_pipeline, path)
    print("model persisted at " + path)
    return path
+
+
if __name__ == '__main__':
    data_path = Path(__file__).parent.parent.absolute().joinpath('data')
    parser = argparse.ArgumentParser()
    # joblib.dump() needs a plain filesystem path, so the local model-dir
    # default must not be a file:// URL. The CSV inputs below can stay URLs
    # because pandas.read_csv accepts them.
    parser.add_argument('--model-dir', type=str, default=os.getenv('SM_MODEL_DIR', str(data_path)))
    # Hyperparameters arrive as command-line strings; parse to int.
    parser.add_argument('--n-jobs', type=int, default=2)
    parser.add_argument(
        "--train", type=str, default=os.getenv("SM_CHANNEL_TRAIN", f"file://{data_path.joinpath('train')}")
    )
    parser.add_argument(
        "--test", type=str, default=os.getenv("SM_CHANNEL_TEST", f"file://{data_path.joinpath('test')}")
    )
    parser.add_argument("--train-file", type=str, default=TRAIN_CSV)
    parser.add_argument("--test-file", type=str, default=TEST_CSV)

    args = parser.parse_args()

    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))

    train(train_df=train_df, test_df=test_df, n_jobs=args.n_jobs, model_dir=args.model_dir)
diff --git a/examples/sagemaker_example/inference.py b/examples/sagemaker_example/inference.py
new file mode 100644
index 0000000..2244598
--- /dev/null
+++ b/examples/sagemaker_example/inference.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import os
+from io import BytesIO
+
+import joblib
+import numpy as np
+import pandas as pd
+
# Column labels applied to incoming npy rows; identical to the training list in data_train/train.py.
feature_column_names = [
    'carat', 'cut', 'color', 'clarity', 'depth', 'table', 'x', 'y', 'z',
]

# Categorical features (one-hot encoded inside the trained pipeline); not referenced in this module.
categorical_cols = ['cut', 'clarity', 'color']

# File name of the persisted pipeline inside the model directory.
MODEL_JOBLIB_FILENAME = "model.joblib"
+
+
def model_fn(model_dir):
    """Serving hook: load the joblib-persisted pipeline from *model_dir*."""
    model_path = os.path.join(model_dir, MODEL_JOBLIB_FILENAME)
    return joblib.load(model_path)
+
+
def input_fn(input_data, content_type):
    """Serving hook: decode a request body into a feature DataFrame.

    Args:
        input_data: raw request bytes.
        content_type: MIME type of the request; only ``application/x-npy`` is accepted.

    Returns:
        DataFrame whose columns are labelled with ``feature_column_names``.

    Raises:
        ValueError: for any other content type.
    """
    if content_type != "application/x-npy":
        raise ValueError(
            f"content type {content_type} is not supported by this inference endpoint. Please send a legal application/x-npy payload"
        )
    # SECURITY NOTE(review): allow_pickle=True makes np.load unpickle the
    # request body, which can execute arbitrary code if untrusted clients can
    # reach the endpoint. Presumably needed because categorical columns make
    # the array object-dtype; keep the endpoint private or use a non-pickle format.
    decoded = np.load(BytesIO(input_data), allow_pickle=True)
    return pd.DataFrame(data=decoded, columns=feature_column_names)
+
+
def predict_fn(input_data, model):
    """Serving hook: run *model* on the feature columns of *input_data*."""
    features = input_data[feature_column_names]
    return model.predict(features)
+
+
def df_to_inference_input(df=None, n_rows=10):
    """Round-trip the first *n_rows* feature rows through the npy request format.

    The rows are serialised with ``np.save`` into bytes and decoded with
    input_fn(), giving the DataFrame the serving code would build.

    Args:
        df: frame containing ``feature_column_names``. If omitted, a
            module-level ``df`` (as set by the ``__main__`` block) is used, for
            backward compatibility with the original zero-argument call.
        n_rows: number of leading rows to use (default 10).

    Raises:
        ValueError: if no frame is passed and no module-level ``df`` exists.
    """
    if df is None:
        # The original implementation read an undeclared global `df`, which
        # only exists when the module runs as a script.
        df = globals().get("df")
    if df is None:
        raise ValueError("df_to_inference_input needs a DataFrame (pass df=...)")

    rows = df[feature_column_names].head(n_rows)
    np_bytes = BytesIO()
    np.save(np_bytes, rows.to_numpy(), allow_pickle=True)
    return input_fn(np_bytes.getvalue(), "application/x-npy")
+
+
if __name__ == "__main__":
    # Local smoke test: load a persisted model and predict on rows from a CSV.
    cli = argparse.ArgumentParser()
    cli.add_argument("--model-dir", type=str, default=os.getenv("SM_MODEL_DIR", "../data"))
    cli.add_argument("--data-path", type=str, default="../data/test/diamonds_test.csv")
    opts = cli.parse_args()
    model = model_fn(opts.model_dir)
    # Kept as a module-level name: df_to_inference_input() falls back to it.
    df = pd.read_csv(opts.data_path)
    print(predict_fn(df_to_inference_input(), model))
diff --git a/examples/sagemaker_example/liminal.yaml b/examples/sagemaker_example/liminal.yaml
new file mode 100644
index 0000000..9ea8adb
--- /dev/null
+++ b/examples/sagemaker_example/liminal.yaml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+name: SageMakerExample
+super: InfraSageMaker
+owner: Bosco Albert Baracus
+pipelines:
+ - pipeline: sagemaker_diamonds_train_and_inference
+ start_date: 1970-01-01
+ timeout_minutes: 45
+ schedule: 0 * 1 * *
+ tasks:
+ - task: data_preprocessing
+ type: python
+ description: prepare the data for training
+ cmd: python -u liminal_sm.py --action data_prep --output_uri_base liminal-sm-example/data
+ - task: train
+ type: python
+ description: train model
+ cmd: python -u liminal_sm.py --action train --base_job_name liminal-base-training-job --train_instance_type ml.m5.large --n_jobs 5
+ - task: deploy
+ type: python
+ description: Deploy model
+ cmd: python -u liminal_sm.py --action deploy --model_name liminal-sm-diamonds-model --deploy_instance_type ml.m5.large
+ - task: validate
+ type: python
+ description: validate model post deployment
+ cmd: python -u liminal_sm.py --action validate
diff --git a/examples/sagemaker_example/liminal_sm.py b/examples/sagemaker_example/liminal_sm.py
new file mode 100644
index 0000000..a02719f
--- /dev/null
+++ b/examples/sagemaker_example/liminal_sm.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+
+from sagemaker.deserializers import NumpyDeserializer
+from sagemaker.predictor import Predictor
+from sagemaker.serializers import NumpySerializer
+from sm_ops import create_sm_args_parser, sm_data_prep, sm_deploy, sm_train, sm_validate
+
# Directory through which consecutive steps hand results to each other
# (see read_message / forward_message).
COMM_PATH = os.getenv("COMM_PATH")
if COMM_PATH is None:
    # Create a temp dir only when needed. Passing mkdtemp() as the getenv
    # default created (and leaked) one on every run, even with COMM_PATH set.
    # NOTE(review): a private temp dir is only visible to this process, so
    # separately launched steps can exchange messages only via COMM_PATH.
    COMM_PATH = tempfile.mkdtemp()
os.makedirs(COMM_PATH, exist_ok=True)
+
+
def read_message(name):
    """Return the lines of message file *name* in COMM_PATH, each whitespace-stripped."""
    message_path = os.path.join(COMM_PATH, name)
    with open(message_path) as handle:
        return [entry.strip() for entry in handle.readlines()]
+
+
def forward_message(name, lines):
    """Write *lines* as message file *name* in COMM_PATH, overwriting it.

    Args:
        name: message (file) name.
        lines: a single string or a list of strings; each becomes one
            newline-terminated line.
    """
    entries = lines if isinstance(lines, list) else [lines]
    payload = '\n'.join(entries) + '\n'
    with open(os.path.join(COMM_PATH, name), 'w') as handle:
        handle.write(payload)
+
+
if __name__ == '__main__':
    choices = ['data_prep', 'train', 'deploy', 'validate', 'all']
    parser = create_sm_args_parser()
    parser.add_argument("--action", choices=choices, default='all')
    args = parser.parse_args()
    # 'all' expands to every concrete step, in pipeline order.
    steps = choices[:-1] if args.action == 'all' else [args.action]
    for step in steps:
        if step == 'data_prep':
            train, test = sm_data_prep(args.input_uri, args.output_uri_base)
            forward_message('data_prep', [train, test])
        elif step == 'train':
            lines = read_message('data_prep')
            train, test = lines[0], lines[1]
            artifact = sm_train(
                train,
                test,
                base_job_name=args.base_job_name,
                instance_type=args.train_instance_type,
                n_jobs=args.n_jobs,
            )
            forward_message('train', artifact)
        elif step == 'deploy':
            artifact = read_message('train')[0]
            predictor = sm_deploy(
                artifact=artifact, model_name=args.model_name, instance_type=args.deploy_instance_type
            )
            # endpoint_name is the supported attribute; Predictor.endpoint is a
            # deprecated alias in SageMaker Python SDK v2.
            forward_message('deploy', predictor.endpoint_name)
        else:  # 'validate'
            endpoint = read_message('deploy')[0]
            # Only the test split is needed to validate the endpoint.
            test = read_message('data_prep')[1]
            predictor = Predictor(
                endpoint_name=endpoint, serializer=NumpySerializer(), deserializer=NumpyDeserializer()
            )
            result = sm_validate(predictor, test)
            print(f"avg abs error:{result}")
diff --git a/yamllint-config.yml b/examples/sagemaker_example/requirements.txt
similarity index 58%
copy from yamllint-config.yml
copy to examples/sagemaker_example/requirements.txt
index ea52231..6f82102 100644
--- a/yamllint-config.yml
+++ b/examples/sagemaker_example/requirements.txt
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,37 +16,7 @@
# specific language governing permissions and limitations
# under the License.
----
-# https://yamllint.readthedocs.io/en/stable/
-extends: default
-
-locale: en_US.UTF-8
-
-yaml-files:
- - '*.yaml'
- - '*.yml'
- - .yamllint
-
-rules:
- line-length:
- max: 110
- # level: warning
- brackets: enable
- colons: enable
- commas: enable
- empty-lines: enable
- empty-values: disable
- key-duplicates: disable
- hyphens: enable
- key-ordering: disable
- new-line-at-end-of-file: enable
- new-lines: enable
- # quoted-strings: disable
- truthy:
- level: warning
- indentation:
- spaces: 2
- indent-sequences: true
-ignore: |
- docs/
- .asf.yaml
+sagemaker==2.57.0
+scikit-learn==0.23.2
+s3fs==2022.5.0
+protobuf==3.20.*
diff --git a/examples/sagemaker_example/sm_ops.py b/examples/sagemaker_example/sm_ops.py
new file mode 100644
index 0000000..0c9b30c
--- /dev/null
+++ b/examples/sagemaker_example/sm_ops.py
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import os
+import time
+
+import boto3
+import numpy as np
+import pandas as pd
+import sagemaker
+from data_preparation.data_preparation import (
+ DATASET_PUBLIC_URL,
+ LABEL_COLUMN,
+ data_pipeline,
+)
+from data_preparation.data_uploader import SagemakerDataUploader
+from data_train.train import feature_column_names
+from sagemaker.sklearn.estimator import SKLearn
+from sagemaker.sklearn.model import SKLearnModel
+
# SKLearn framework version, passed to both the training estimator and the served model.
FRAMEWORK_VERSION = "0.23-1"

# SageMaker API client, created at import time. sm_train uses it to look up
# the finished training job's model artifact.
sm_boto3 = boto3.client("sagemaker")
+
+
def get_role():
    """Return the IAM role for SageMaker jobs.

    Tries ``sagemaker.get_execution_role()``. If that raises ValueError, the
    ``SM_ROLE`` environment variable is returned instead (None if unset).
    """
    try:
        return sagemaker.get_execution_role()
    except ValueError:
        return os.getenv("SM_ROLE")
+
+
def sm_train(train_path, test_path, base_job_name="Liminal-sm-training-job", instance_type="ml.m5.large", n_jobs=1):
    """Run data_train/train.py as a single-instance SageMaker SKLearn training job.

    Args:
        train_path: location passed as the "train" input channel.
        test_path: location passed as the "test" input channel.
        base_job_name: prefix for the training job name.
        instance_type: SageMaker instance type for training.
        n_jobs: forwarded to the script as the ``n-jobs`` hyperparameter.

    Returns:
        The ``S3ModelArtifacts`` location reported for the finished job.
    """
    estimator = SKLearn(
        entry_point="data_train/train.py",
        source_dir="./",
        role=get_role(),
        instance_count=1,
        instance_type=instance_type,
        framework_version=FRAMEWORK_VERSION,
        base_job_name=base_job_name,
        hyperparameters={"n-jobs": n_jobs},
    )
    estimator.fit({"train": train_path, "test": test_path})

    job_name = estimator.latest_training_job.name
    job_description = sm_boto3.describe_training_job(TrainingJobName=job_name)
    artifact = job_description["ModelArtifacts"]["S3ModelArtifacts"]

    print("Model artifact persisted at " + artifact)
    return artifact
+
+
def sm_deploy(artifact, model_name="Liminal-sm-demo-model", instance_type="ml.m5.large"):
    """Create an SKLearnModel from *artifact* and deploy it on one instance.

    The model is named ``<model_name>-<UTC timestamp>`` so repeated deployments
    get distinct names. Serving code comes from inference.py.

    Args:
        artifact: model artifact location (as returned by sm_train).
        model_name: base name for the SageMaker model.
        instance_type: instance type for the endpoint.

    Returns:
        The predictor returned by ``model.deploy`` (called with ``wait=True``).
    """
    # The separator is added once in the f-string below; the timestamp itself
    # must not start with '-', otherwise the name contains a doubled hyphen.
    timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
    model = SKLearnModel(
        name=f"{model_name}-{timestamp}",
        entry_point="inference.py",
        model_data=artifact,
        role=get_role(),
        framework_version=FRAMEWORK_VERSION,
    )

    predictor = model.deploy(instance_type=instance_type, initial_instance_count=1, wait=True)
    return predictor
+
+
def sm_validate(predictor, test_path):
    """Score the deployed endpoint on the test CSV and return the mean absolute error.

    Args:
        predictor: predictor for the endpoint; expected to accept and return
            numpy arrays (e.g. configured with Numpy (de)serializers).
        test_path: ``s3://bucket/key`` URI, or a bare ``bucket/key`` path, to
            which ``s3://`` is prepended.

    Returns:
        The average absolute difference between predictions and ``LABEL_COLUMN``.
    """
    # Test for the full scheme: a bare path whose bucket name merely starts
    # with "s3" (e.g. "s3-data/...") still needs the prefix.
    if not test_path.startswith("s3://"):
        test_path = f"s3://{test_path}"

    df = pd.read_csv(test_path)
    X_test_inference = df[feature_column_names].to_numpy()
    predictions = predictor.predict(data=X_test_inference)
    print("validating model")
    avg_err = np.average(np.abs(predictions - df[LABEL_COLUMN]))
    print(f"average abs error:{avg_err}")
    print(predictions)
    return avg_err
+
+
def sm_data_prep(input_uri, output_uri_base):
    """Prepare the dataset and upload the splits with a SagemakerDataUploader.

    Falls back to DATASET_PUBLIC_URL when *input_uri* is empty or None.

    Returns:
        The ``(train, test)`` locations produced by data_pipeline().
    """
    source_uri = input_uri if input_uri else DATASET_PUBLIC_URL
    uploader = SagemakerDataUploader()
    return data_pipeline(input_uri=source_uri, output_uri_base=output_uri_base, data_uploader=uploader)
+
+
def sm_main(args):
    """Run data prep, training, deployment and validation in one process.

    Args:
        args: namespace produced by ``create_sm_args_parser().parse_args()``.
    """
    # Removed: a dead hard-coded test_path and commented-out debug lines that
    # embedded a specific AWS account ID and job/endpoint names.
    train_path, test_path = sm_data_prep(args.input_uri, args.output_uri_base)
    artifact = sm_train(
        train_path,
        test_path,
        base_job_name=args.base_job_name,
        instance_type=args.train_instance_type,
        n_jobs=args.n_jobs,
    )
    predictor = sm_deploy(artifact, model_name=args.model_name, instance_type=args.deploy_instance_type)
    sm_validate(predictor, test_path)
+
+
def create_sm_args_parser():
    """Build the CLI parser used by this module and by liminal_sm.py.

    Every option has a default, so the parser works with no arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_uri", default=DATASET_PUBLIC_URL)
    parser.add_argument("--output_uri_base", default="liminal-sm-example")
    parser.add_argument("--base_job_name", default="Liminal-sm-training-job")
    parser.add_argument("--train_instance_type", default="ml.m5.large")
    # Parse as int so "--n_jobs 5" yields 5 (matching sm_train's int default)
    # instead of the string "5".
    parser.add_argument("--n_jobs", type=int, default=1)
    parser.add_argument("--model_name", default="Liminal-sm-demo-model")
    parser.add_argument("--deploy_instance_type", default="ml.m5.large")
    return parser
+
+
if __name__ == '__main__':
    # Script entry point: run the whole SageMaker flow with CLI options.
    sm_main(create_sm_args_parser().parse_args())
diff --git a/liminal/build/image/python/Dockerfile b/liminal/build/image/python/Dockerfile
index 67d8462..9d0be8f 100644
--- a/liminal/build/image/python/Dockerfile
+++ b/liminal/build/image/python/Dockerfile
@@ -20,7 +20,9 @@
FROM {{python}}
# Install aptitude build-essential
-#RUN apt-get install -y --reinstall build-essential
+# Remove the apt package lists in the same layer; otherwise they stay in the image.
+RUN apt-get update \
+    && apt-get install -y --reinstall build-essential \
+        make \
+        gcc \
+    && rm -rf /var/lib/apt/lists/*
# Set the working directory to /app
WORKDIR /app
@@ -32,7 +34,7 @@ WORKDIR /app
COPY ./requirements.txt /app/
# mount the secret in the correct location, then run pip install
-RUN pip install --upgrade pip
+RUN python -m pip install --upgrade pip
RUN {{mount}} pip install -r requirements.txt
# Copy the current directory contents into the container at /app
diff --git a/liminal/kubernetes/secret_util.py b/liminal/kubernetes/secret_util.py
index e5bccc8..f9e4c69 100644
--- a/liminal/kubernetes/secret_util.py
+++ b/liminal/kubernetes/secret_util.py
@@ -63,6 +63,11 @@ def create_secret(conf, namespace='default') -> None:
_LOG.info(f'Requested secret {name}')
if name not in _LOCAL_VOLUMES:
+ matching_secrets = _kubernetes.list_namespaced_secret(
+ namespace, field_selector=f'metadata.name={name}'
+ ).to_dict()['items']
+
+ if len(matching_secrets) == 0:
_create_secret(namespace, conf, name)
sleep(5)
diff --git a/liminal/runners/airflow/executors/kubernetes.py b/liminal/runners/airflow/executors/kubernetes.py
index 409e603..b69babe 100644
--- a/liminal/runners/airflow/executors/kubernetes.py
+++ b/liminal/runners/airflow/executors/kubernetes.py
@@ -85,9 +85,6 @@ class KubernetesPodExecutor(executor.Executor):
def __kubernetes_kwargs(self, task: ContainerTask):
config = copy.deepcopy(self.executor_config)
- for secret in task.secrets:
- result = next(x for x in self.liminal_config['secrets'] if x['secret'] == secret['secret'])
- task.mounts.append({'volume': result['secret'], 'path': result['remote_path']})
kubernetes_kwargs = {
'task_id': task.task_id,
@@ -115,6 +112,15 @@ class KubernetesPodExecutor(executor.Executor):
read_only=mount.get('read_only', False),
)
for mount in task.mounts
+ ]
+ + [
+ V1VolumeMount(
+ name=secret['secret'],
+ mount_path=secret['remote_path'],
+ sub_path=secret.get('sub_path'),
+ read_only=secret.get('read_only', False),
+ )
+ for secret in task.secrets
],
}
diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml
index d86ef2e..e61396a 100644
--- a/scripts/docker-compose.yml
+++ b/scripts/docker-compose.yml
@@ -28,6 +28,7 @@ x-airflow-common: &airflow-common
LOAD_EX: n
AIRFLOW__CORE__EXECUTOR: LocalExecutor
KUBECONFIG: /home/airflow/kube/config
+ AWS_DEFAULT_REGION: "${AWS_DEFAULT_REGION:-us-east-1}"
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__WEBSERVER__WORKERS: '1'
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'True'
diff --git a/scripts/liminal b/scripts/liminal
index 897796c..51354f9 100755
--- a/scripts/liminal
+++ b/scripts/liminal
@@ -159,12 +159,13 @@ def deploy_liminal_apps(path, clean):
pathlib.Path(target_yml_name).parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(config_file, target_yml_name)
configs_path = ConfigUtil(os.path.dirname(config_file)).safe_load(is_render_variables=True,
- soft_merge=True)
+ soft_merge=True)
for config in configs_path:
if config.get('secrets'):
logging.info(f'Secret volume is being created')
secret_util.create_local_secrets(config)
+
def is_file_empty(file):
try:
with open(file, "r") as stream:
@@ -173,6 +174,7 @@ def is_file_empty(file):
print(exc)
return data_loaded is None
+
def liminal_is_running():
stdout, stderr = docker_compose_command('ps', [])
return "liminal" in stdout
diff --git a/tests/runners/airflow/liminal/liminal.yml b/tests/runners/airflow/liminal/liminal.yml
index a54a7bd..170701b 100644
--- a/tests/runners/airflow/liminal/liminal.yml
+++ b/tests/runners/airflow/liminal/liminal.yml
@@ -23,7 +23,6 @@ volumes:
path: /tmp/liminal_tests
secrets:
- secret: aws
- remote_path: "/mnt"
local_path_file: "~/.aws/credentials"
images:
- image: my_python_task_img
@@ -68,6 +67,7 @@ pipelines:
AWS_PROFILE: "dev"
secrets:
- secret: aws
+ remote_path: "/mnt"
mounts:
- mount: mymount
volume: myvol1
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
index bb9dd2a..019b191 100644
--- a/tests/runners/airflow/tasks/test_python.py
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -148,7 +148,7 @@ class TestPythonTask(TestCase):
'image': image,
'env_vars': env_vars if env_vars is not None else {},
'mounts': [{'mount': 'mymount', 'volume': self._VOLUME_NAME, 'path': '/mnt/vol1'}],
- 'secrets': [{'secret': self._SECRET_NAME}],
+ 'secrets': [{'secret': self._SECRET_NAME, 'remote_path': '/mnt'}],
}
if executors:
diff --git a/yamllint-config.yml b/yamllint-config.yml
index ea52231..d93684d 100644
--- a/yamllint-config.yml
+++ b/yamllint-config.yml
@@ -28,7 +28,7 @@ yaml-files:
rules:
line-length:
- max: 110
+ max: 140
# level: warning
brackets: enable
colons: enable