Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/16 19:56:10 UTC

[airflow] branch main updated: Sagemaker System Tests - Part 2 of 3 - example_sagemaker.py (#25079)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d872edacfe Sagemaker System Tests - Part 2 of 3 - example_sagemaker.py (#25079)
d872edacfe is described below

commit d872edacfe3cec65a9179eff52bf219c12361fef
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Sat Jul 16 19:55:59 2022 +0000

    Sagemaker System Tests - Part 2 of 3 - example_sagemaker.py (#25079)
---
 .../amazon/aws/example_dags/example_sagemaker.py   | 467 ------------------
 .../operators/sagemaker.rst                        |  18 +-
 docs/spelling_wordlist.txt                         |   1 +
 .../providers/amazon/aws/example_sagemaker.py      | 535 +++++++++++++++++++++
 .../system/providers/amazon/aws/utils/__init__.py  |  28 +-
 5 files changed, 572 insertions(+), 477 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_sagemaker.py b/airflow/providers/amazon/aws/example_dags/example_sagemaker.py
deleted file mode 100644
index 41e7666eab..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+++ /dev/null
@@ -1,467 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import base64
-import os
-import subprocess
-from datetime import datetime
-from tempfile import NamedTemporaryFile
-
-import boto3
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.sagemaker import (
-    SageMakerDeleteModelOperator,
-    SageMakerModelOperator,
-    SageMakerProcessingOperator,
-    SageMakerTrainingOperator,
-    SageMakerTransformOperator,
-    SageMakerTuningOperator,
-)
-from airflow.providers.amazon.aws.sensors.sagemaker import (
-    SageMakerTrainingSensor,
-    SageMakerTransformSensor,
-    SageMakerTuningSensor,
-)
-
-# Project name will be used in naming the S3 buckets and various tasks.
-# The dataset used in this example is identifying varieties of the Iris flower.
-PROJECT_NAME = 'iris'
-TIMESTAMP = '{{ ts_nodash }}'
-
-S3_BUCKET = os.getenv('S3_BUCKET', 'S3_bucket')
-RAW_DATA_S3_KEY = f'{PROJECT_NAME}/preprocessing/input.csv'
-INPUT_DATA_S3_KEY = f'{PROJECT_NAME}/processed-input-data'
-TRAINING_OUTPUT_S3_KEY = f'{PROJECT_NAME}/results'
-PREDICTION_OUTPUT_S3_KEY = f'{PROJECT_NAME}/transform'
-
-PROCESSING_LOCAL_INPUT_PATH = '/opt/ml/processing/input'
-PROCESSING_LOCAL_OUTPUT_PATH = '/opt/ml/processing/output'
-
-MODEL_NAME = f'{PROJECT_NAME}-KNN-model'
-# Job names can't be reused, so appending a timestamp ensures it is unique.
-PROCESSING_JOB_NAME = f'{PROJECT_NAME}-processing-{TIMESTAMP}'
-TRAINING_JOB_NAME = f'{PROJECT_NAME}-train-{TIMESTAMP}'
-TRANSFORM_JOB_NAME = f'{PROJECT_NAME}-transform-{TIMESTAMP}'
-TUNING_JOB_NAME = f'{PROJECT_NAME}-tune-{TIMESTAMP}'
-
-ROLE_ARN = os.getenv(
-    'SAGEMAKER_ROLE_ARN',
-    'arn:aws:iam::1234567890:role/service-role/AmazonSageMaker-ExecutionRole',
-)
-ECR_REPOSITORY = os.getenv('ECR_REPOSITORY', '1234567890.dkr.ecr.us-west-2.amazonaws.com/process_data')
-REGION = ECR_REPOSITORY.split('.')[3]
-
-# For this example we are using a subset of Fisher's Iris Data Set.
-# The full dataset can be found at UC Irvine's machine learning repository:
-# https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
-DATASET = """
-        5.1,3.5,1.4,0.2,Iris-setosa
-        4.9,3.0,1.4,0.2,Iris-setosa
-        7.0,3.2,4.7,1.4,Iris-versicolor
-        6.4,3.2,4.5,1.5,Iris-versicolor
-        4.9,2.5,4.5,1.7,Iris-virginica
-        7.3,2.9,6.3,1.8,Iris-virginica
-        """
-SAMPLE_SIZE = DATASET.count('\n') - 1
-
-# The URI of an Amazon-provided docker image for handling KNN model training.  This is a public ECR
-# repo cited in public SageMaker documentation, so the account number does not need to be redacted.
-# For more info see: https://docs.aws.amazon.com/sagemaker/latest/dg/ecr-us-west-2.html#knn-us-west-2.title
-KNN_IMAGE_URI = '174872318107.dkr.ecr.us-west-2.amazonaws.com/knn'
-
-TASK_TIMEOUT = {'MaxRuntimeInSeconds': 6 * 60}
-
-RESOURCE_CONFIG = {
-    'InstanceCount': 1,
-    'InstanceType': 'ml.m5.large',
-    'VolumeSizeInGB': 1,
-}
-
-TRAINING_DATA_SOURCE = {
-    'CompressionType': 'None',
-    'ContentType': 'text/csv',
-    'DataSource': {  # type: ignore
-        'S3DataSource': {
-            'S3DataDistributionType': 'FullyReplicated',
-            'S3DataType': 'S3Prefix',
-            'S3Uri': f's3://{S3_BUCKET}/{INPUT_DATA_S3_KEY}/train.csv',
-        }
-    },
-}
-
-# Define configs for processing, training, model creation, and batch transform jobs
-SAGEMAKER_PROCESSING_JOB_CONFIG = {
-    'ProcessingJobName': PROCESSING_JOB_NAME,
-    'RoleArn': f'{ROLE_ARN}',
-    'ProcessingInputs': [
-        {
-            'InputName': 'input',
-            'AppManaged': False,
-            'S3Input': {
-                'S3Uri': f's3://{S3_BUCKET}/{RAW_DATA_S3_KEY}',
-                'LocalPath': PROCESSING_LOCAL_INPUT_PATH,
-                'S3DataType': 'S3Prefix',
-                'S3InputMode': 'File',
-                'S3DataDistributionType': 'FullyReplicated',
-                'S3CompressionType': 'None',
-            },
-        },
-    ],
-    'ProcessingOutputConfig': {
-        'Outputs': [
-            {
-                'OutputName': 'output',
-                'S3Output': {
-                    'S3Uri': f's3://{S3_BUCKET}/{INPUT_DATA_S3_KEY}',
-                    'LocalPath': PROCESSING_LOCAL_OUTPUT_PATH,
-                    'S3UploadMode': 'EndOfJob',
-                },
-                'AppManaged': False,
-            }
-        ]
-    },
-    'ProcessingResources': {
-        'ClusterConfig': RESOURCE_CONFIG,
-    },
-    'StoppingCondition': TASK_TIMEOUT,
-    'AppSpecification': {
-        'ImageUri': ECR_REPOSITORY,
-    },
-}
-
-TRAINING_CONFIG = {
-    'TrainingJobName': TRAINING_JOB_NAME,
-    'RoleArn': ROLE_ARN,
-    'AlgorithmSpecification': {
-        "TrainingImage": KNN_IMAGE_URI,
-        "TrainingInputMode": "File",
-    },
-    'HyperParameters': {
-        'predictor_type': 'classifier',
-        'feature_dim': '4',
-        'k': '3',
-        'sample_size': str(SAMPLE_SIZE),
-    },
-    'InputDataConfig': [
-        {
-            'ChannelName': 'train',
-            **TRAINING_DATA_SOURCE,  # type: ignore [arg-type]
-        }
-    ],
-    'OutputDataConfig': {'S3OutputPath': f's3://{S3_BUCKET}/{TRAINING_OUTPUT_S3_KEY}/'},
-    'ResourceConfig': RESOURCE_CONFIG,
-    'StoppingCondition': TASK_TIMEOUT,
-}
-
-MODEL_CONFIG = {
-    'ModelName': MODEL_NAME,
-    'ExecutionRoleArn': ROLE_ARN,
-    'PrimaryContainer': {
-        'Mode': 'SingleModel',
-        'Image': KNN_IMAGE_URI,
-        'ModelDataUrl': f's3://{S3_BUCKET}/{TRAINING_OUTPUT_S3_KEY}/{TRAINING_JOB_NAME}/output/model.tar.gz',
-    },
-}
-
-TRANSFORM_CONFIG = {
-    'TransformJobName': TRANSFORM_JOB_NAME,
-    'ModelName': MODEL_NAME,
-    'TransformInput': {
-        'DataSource': {
-            'S3DataSource': {
-                'S3DataType': 'S3Prefix',
-                'S3Uri': f's3://{S3_BUCKET}/{INPUT_DATA_S3_KEY}/test.csv',
-            }
-        },
-        'SplitType': 'Line',
-        'ContentType': 'text/csv',
-    },
-    'TransformOutput': {'S3OutputPath': f's3://{S3_BUCKET}/{PREDICTION_OUTPUT_S3_KEY}'},
-    'TransformResources': {
-        'InstanceCount': 1,
-        'InstanceType': 'ml.m5.large',
-    },
-}
-
-TUNING_CONFIG = {
-    'HyperParameterTuningJobName': TUNING_JOB_NAME,
-    'HyperParameterTuningJobConfig': {
-        'Strategy': 'Bayesian',
-        'HyperParameterTuningJobObjective': {
-            'MetricName': 'test:accuracy',
-            'Type': 'Maximize',
-        },
-        'ResourceLimits': {
-            # You would bump these up in production as appropriate.
-            'MaxNumberOfTrainingJobs': 1,
-            'MaxParallelTrainingJobs': 1,
-        },
-        'ParameterRanges': {
-            'CategoricalParameterRanges': [],
-            'IntegerParameterRanges': [
-                # Set the min and max values of the hyperparameters you want to tune.
-                {
-                    'Name': 'k',
-                    'MinValue': '1',
-                    "MaxValue": str(SAMPLE_SIZE),
-                },
-                {
-                    'Name': 'sample_size',
-                    'MinValue': '1',
-                    'MaxValue': str(SAMPLE_SIZE),
-                },
-            ],
-        },
-    },
-    'TrainingJobDefinition': {
-        'StaticHyperParameters': {
-            'predictor_type': 'classifier',
-            'feature_dim': '4',
-        },
-        'AlgorithmSpecification': {'TrainingImage': KNN_IMAGE_URI, 'TrainingInputMode': 'File'},
-        'InputDataConfig': [
-            {
-                'ChannelName': 'train',
-                **TRAINING_DATA_SOURCE,  # type: ignore [arg-type]
-            },
-            {
-                'ChannelName': 'test',
-                **TRAINING_DATA_SOURCE,  # type: ignore [arg-type]
-            },
-        ],
-        'OutputDataConfig': {'S3OutputPath': f's3://{S3_BUCKET}/{TRAINING_OUTPUT_S3_KEY}'},
-        'ResourceConfig': RESOURCE_CONFIG,
-        'StoppingCondition': TASK_TIMEOUT,
-        'RoleArn': ROLE_ARN,
-    },
-}
-
-
-# This script will be the entrypoint for the docker image which will handle preprocessing the raw data
-# NOTE:  The following string must remain dedented as it is being written to a file.
-PREPROCESS_SCRIPT = (
-    """
-import boto3
-import numpy as np
-import pandas as pd
-
-def main():
-    # Load the Iris dataset from {input_path}/input.csv, split it into train/test
-    # subsets, and write them to {output_path}/ for the Processing Operator.
-
-    columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']
-    iris = pd.read_csv('{input_path}/input.csv', names=columns)
-
-    # Process data
-    iris['species'] = iris['species'].replace({{'Iris-virginica': 0, 'Iris-versicolor': 1, 'Iris-setosa': 2}})
-    iris = iris[['species', 'sepal_length', 'sepal_width', 'petal_length', 'petal_width']]
-
-    # Split into test and train data
-    iris_train, iris_test = np.split(
-        iris.sample(frac=1, random_state=np.random.RandomState()), [int(0.7 * len(iris))]
-    )
-
-    # Remove the "answers" from the test set
-    iris_test.drop(['species'], axis=1, inplace=True)
-
-    # Write the splits to disk
-    iris_train.to_csv('{output_path}/train.csv', index=False, header=False)
-    iris_test.to_csv('{output_path}/test.csv', index=False, header=False)
-
-    print('Preprocessing Done.')
-
-if __name__ == "__main__":
-    main()
-
-    """
-).format(input_path=PROCESSING_LOCAL_INPUT_PATH, output_path=PROCESSING_LOCAL_OUTPUT_PATH)
-
-
-@task
-def upload_dataset_to_s3():
-    """Uploads the provided dataset to a designated Amazon S3 bucket."""
-    S3Hook().load_string(
-        string_data=DATASET,
-        bucket_name=S3_BUCKET,
-        key=RAW_DATA_S3_KEY,
-        replace=True,
-    )
-
-
-@task
-def build_and_upload_docker_image():
-    """
-    We need a Docker image with the following requirements:
-      - Has numpy, pandas, requests, and boto3 installed
-      - Has our data preprocessing script mounted and set as the entry point
-    """
-
-    # Fetch and parse ECR Token to be used for the docker push
-    ecr_client = boto3.client('ecr', region_name=REGION)
-    token = ecr_client.get_authorization_token()
-    credentials = (base64.b64decode(token['authorizationData'][0]['authorizationToken'])).decode('utf-8')
-    username, password = credentials.split(':')
-
-    with NamedTemporaryFile(mode='w+t') as preprocessing_script, NamedTemporaryFile(mode='w+t') as dockerfile:
-
-        preprocessing_script.write(PREPROCESS_SCRIPT)
-        preprocessing_script.flush()
-
-        dockerfile.write(
-            f"""
-            FROM amazonlinux
-            COPY {preprocessing_script.name.split('/')[2]} /preprocessing.py
-            ADD credentials /credentials
-            ENV AWS_SHARED_CREDENTIALS_FILE=/credentials
-            RUN yum install python3 pip -y
-            RUN pip3 install boto3 pandas requests
-            CMD [ "python3", "/preprocessing.py"]
-            """
-        )
-        dockerfile.flush()
-
-        docker_build_and_push_commands = f"""
-            cp /root/.aws/credentials /tmp/credentials &&
-            docker build -f {dockerfile.name} -t {ECR_REPOSITORY} /tmp &&
-            rm /tmp/credentials &&
-            aws ecr get-login-password --region {REGION} |
-            docker login --username {username} --password {password} {ECR_REPOSITORY} &&
-            docker push {ECR_REPOSITORY}
-            """
-        docker_build = subprocess.Popen(
-            docker_build_and_push_commands,
-            shell=True,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-        )
-        _, err = docker_build.communicate()
-
-        if docker_build.returncode != 0:
-            raise RuntimeError(err)
-
-
-@task(trigger_rule='all_done')
-def cleanup():
-    # Delete S3 Artifacts
-    client = boto3.client('s3')
-    object_keys = [
-        key['Key'] for key in client.list_objects_v2(Bucket=S3_BUCKET, Prefix=PROJECT_NAME)['Contents']
-    ]
-    for key in object_keys:
-        client.delete_objects(Bucket=S3_BUCKET, Delete={'Objects': [{'Key': key}]})
-
-
-with DAG(
-    dag_id='example_sagemaker',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-    catchup=False,
-) as dag:
-
-    # [START howto_operator_sagemaker_processing]
-    preprocess_raw_data = SageMakerProcessingOperator(
-        task_id='preprocess_raw_data',
-        config=SAGEMAKER_PROCESSING_JOB_CONFIG,
-        do_xcom_push=False,
-    )
-    # [END howto_operator_sagemaker_processing]
-
-    # [START howto_operator_sagemaker_training]
-    train_model = SageMakerTrainingOperator(
-        task_id='train_model',
-        config=TRAINING_CONFIG,
-        # Waits by default, setting as False to demonstrate the Sensor below.
-        wait_for_completion=False,
-        do_xcom_push=False,
-    )
-    # [END howto_operator_sagemaker_training]
-
-    # [START howto_sensor_sagemaker_training]
-    await_training = SageMakerTrainingSensor(
-        task_id='await_training',
-        job_name=TRAINING_JOB_NAME,
-    )
-    # [END howto_sensor_sagemaker_training]
-
-    # [START howto_operator_sagemaker_model]
-    create_model = SageMakerModelOperator(
-        task_id='create_model',
-        config=MODEL_CONFIG,
-        do_xcom_push=False,
-    )
-    # [END howto_operator_sagemaker_model]
-
-    # [START howto_operator_sagemaker_tuning]
-    tune_model = SageMakerTuningOperator(
-        task_id='tune_model',
-        config=TUNING_CONFIG,
-        # Waits by default, setting as False to demonstrate the Sensor below.
-        wait_for_completion=False,
-        do_xcom_push=False,
-    )
-    # [END howto_operator_sagemaker_tuning]
-
-    # [START howto_sensor_sagemaker_tuning]
-    await_tune = SageMakerTuningSensor(
-        task_id='await_tuning',
-        job_name=TUNING_JOB_NAME,
-    )
-    # [END howto_sensor_sagemaker_tuning]
-
-    # [START howto_operator_sagemaker_transform]
-    test_model = SageMakerTransformOperator(
-        task_id='test_model',
-        config=TRANSFORM_CONFIG,
-        # Waits by default, setting as False to demonstrate the Sensor below.
-        wait_for_completion=False,
-        do_xcom_push=False,
-    )
-    # [END howto_operator_sagemaker_transform]
-
-    # [START howto_sensor_sagemaker_transform]
-    await_transform = SageMakerTransformSensor(
-        task_id='await_transform',
-        job_name=TRANSFORM_JOB_NAME,
-    )
-    # [END howto_sensor_sagemaker_transform]
-
-    # Trigger rule set to "all_done" so clean up will run regardless of success on other tasks.
-    # [START howto_operator_sagemaker_delete_model]
-    delete_model = SageMakerDeleteModelOperator(
-        task_id='delete_model',
-        config={'ModelName': MODEL_NAME},
-        trigger_rule='all_done',
-    )
-    # [END howto_operator_sagemaker_delete_model]
-
-    (
-        upload_dataset_to_s3()
-        >> build_and_upload_docker_image()
-        >> preprocess_raw_data
-        >> train_model
-        >> await_training
-        >> create_model
-        >> tune_model
-        >> await_tune
-        >> test_model
-        >> await_transform
-        >> cleanup()
-        >> delete_model
-    )
diff --git a/docs/apache-airflow-providers-amazon/operators/sagemaker.rst b/docs/apache-airflow-providers-amazon/operators/sagemaker.rst
index 262d7d84f2..38d615cd6f 100644
--- a/docs/apache-airflow-providers-amazon/operators/sagemaker.rst
+++ b/docs/apache-airflow-providers-amazon/operators/sagemaker.rst
@@ -42,7 +42,7 @@ Create an Amazon SageMaker processing job
 To create an Amazon Sagemaker processing job to sanitize your dataset you can use
 :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerProcessingOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_processing]
@@ -56,7 +56,7 @@ Create an Amazon SageMaker training job
 To create an Amazon Sagemaker training job you can use
 :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerTrainingOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_training]
@@ -70,7 +70,7 @@ Create an Amazon SageMaker model
 To create an Amazon Sagemaker model you can use
 :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerModelOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_model]
@@ -84,7 +84,7 @@ Start a hyperparameter tuning job
 To start a hyperparameter tuning job for an Amazon Sagemaker model you can use
 :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerTuningOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_tuning]
@@ -98,7 +98,7 @@ Delete an Amazon SageMaker model
 To delete an Amazon Sagemaker model you can use
 :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerDeleteModelOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_delete_model]
@@ -112,7 +112,7 @@ Create an Amazon SageMaker transform job
 To create an Amazon Sagemaker transform job you can use
 :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerTransformOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_transform]
@@ -157,7 +157,7 @@ Wait on an Amazon SageMaker training job state
 To check the state of an Amazon Sagemaker training job until it reaches a terminal state
 you can use :class:`~airflow.providers.amazon.aws.sensors.sagemaker.SageMakerTrainingSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_sagemaker_training]
@@ -171,7 +171,7 @@ Wait on an Amazon SageMaker transform job state
 To check the state of an Amazon Sagemaker transform job until it reaches a terminal state
 you can use :class:`~airflow.providers.amazon.aws.sensors.sagemaker.SageMakerTransformSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_sagemaker_transform]
@@ -185,7 +185,7 @@ Wait on an Amazon SageMaker tuning job state
 To check the state of an Amazon Sagemaker hyperparameter tuning job until it reaches a terminal state
 you can use :class:`~airflow.providers.amazon.aws.sensors.sagemaker.SageMakerTuningSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sagemaker.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_sagemaker_tuning]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 5dd5d794a0..1d821140c5 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1008,6 +1008,7 @@ keytab
 killMode
 kinit
 kms
+knn
 knownHosts
 krb
 kube
diff --git a/tests/system/providers/amazon/aws/example_sagemaker.py b/tests/system/providers/amazon/aws/example_sagemaker.py
new file mode 100644
index 0000000000..1b729e7a24
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_sagemaker.py
@@ -0,0 +1,535 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import base64
+import subprocess
+from datetime import datetime
+from tempfile import NamedTemporaryFile
+
+import boto3
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.operators.python import get_current_context
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CreateBucketOperator,
+    S3CreateObjectOperator,
+    S3DeleteBucketOperator,
+)
+from airflow.providers.amazon.aws.operators.sagemaker import (
+    SageMakerDeleteModelOperator,
+    SageMakerModelOperator,
+    SageMakerProcessingOperator,
+    SageMakerTrainingOperator,
+    SageMakerTransformOperator,
+    SageMakerTuningOperator,
+)
+from airflow.providers.amazon.aws.sensors.sagemaker import (
+    SageMakerTrainingSensor,
+    SageMakerTransformSensor,
+    SageMakerTuningSensor,
+)
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder, purge_logs
+
+DAG_ID = 'example_sagemaker'
+
+# Externally fetched variables:
+ROLE_ARN_KEY = 'ROLE_ARN'
+# The URI of a Docker image for handling KNN model training.
+# To find the URI of a free Amazon-provided image that can be used, substitute your
+# desired region in the following link and find the URI under "Registry Path".
+# https://docs.aws.amazon.com/sagemaker/latest/dg/ecr-us-east-1.html#knn-us-east-1.title
+# This URI should be in the format of {12-digits}.dkr.ecr.{region}.amazonaws.com/knn
+KNN_IMAGE_URI_KEY = 'KNN_IMAGE_URI'
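+# As an illustrative alternative (not used by this test, and assuming the optional
+# `sagemaker` SDK is installed), the same URI can be resolved programmatically:
+#   from sagemaker import image_uris
+#   knn_image_uri = image_uris.retrieve(framework='knn', region='us-east-1')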
+
+sys_test_context_task = (
+    SystemTestContextBuilder().add_variable(KNN_IMAGE_URI_KEY).add_variable(ROLE_ARN_KEY).build()
+)
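+# The builder resolves each variable named above at runtime (from the test
+# environment or a backing store such as SSM Parameter Store) and exposes the
+# values alongside the generated ENV_ID.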
+
+# For this example we are using a subset of Fisher's Iris Data Set.
+# The full dataset can be found at UC Irvine's machine learning repository:
+# https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
+DATASET = """
+        5.1,3.5,1.4,0.2,Iris-setosa
+        4.9,3.0,1.4,0.2,Iris-setosa
+        7.0,3.2,4.7,1.4,Iris-versicolor
+        6.4,3.2,4.5,1.5,Iris-versicolor
+        4.9,2.5,4.5,1.7,Iris-virginica
+        7.3,2.9,6.3,1.8,Iris-virginica
+        """
+SAMPLE_SIZE = DATASET.count('\n') - 1
+
+# This script will be the entrypoint for the Docker image, which handles preprocessing the raw data.
+# NOTE:  The following string must remain dedented as it is being written to a file.
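+# Literal braces in the template (e.g. the dict passed to .replace() below) are
+# doubled as {{ }} so that str.format() leaves them intact while filling in
+# {input_path} and {output_path}.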
+PREPROCESS_SCRIPT_TEMPLATE = """
+import boto3
+import numpy as np
+import pandas as pd
+
+def main():
+    # Load the Iris dataset from {input_path}/input.csv, split it into train/test
+    # subsets, and write them to {output_path}/ for the Processing Operator.
+
+    columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']
+    iris = pd.read_csv('{input_path}/input.csv', names=columns)
+
+    # Process data
+    iris['species'] = iris['species'].replace({{'Iris-virginica': 0, 'Iris-versicolor': 1, 'Iris-setosa': 2}})
+    iris = iris[['species', 'sepal_length', 'sepal_width', 'petal_length', 'petal_width']]
+
+    # Split into test and train data
+    iris_train, iris_test = np.split(
+        iris.sample(frac=1, random_state=np.random.RandomState()), [int(0.7 * len(iris))]
+    )
+
+    # Remove the "answers" from the test set
+    iris_test.drop(['species'], axis=1, inplace=True)
+
+    # Write the splits to disk
+    iris_train.to_csv('{output_path}/train.csv', index=False, header=False)
+    iris_test.to_csv('{output_path}/test.csv', index=False, header=False)
+
+    print('Preprocessing Done.')
+
+if __name__ == "__main__":
+    main()
+
+    """
+
+
+def _create_ecr_repository(repo_name):
+    return boto3.client('ecr').create_repository(repositoryName=repo_name)['repository']['repositoryUri']
+
+
+def _build_and_upload_docker_image(preprocess_script, repository_uri):
+    """
+    We need a Docker image with the following requirements:
+      - Has numpy, pandas, requests, and boto3 installed
+      - Has our data preprocessing script mounted and set as the entry point
+    """
+    ecr_region = repository_uri.split('.')[3]
+
+    # Fetch and parse ECR Token to be used for the docker push
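+    # The authorization token decodes to 'user:password'; for ECR the user is always 'AWS'.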
+    token = boto3.client('ecr', region_name=ecr_region).get_authorization_token()
+    credentials = (base64.b64decode(token['authorizationData'][0]['authorizationToken'])).decode('utf-8')
+    username, password = credentials.split(':')
+
+    with NamedTemporaryFile(mode='w+t') as preprocessing_script, NamedTemporaryFile(mode='w+t') as dockerfile:
+        preprocessing_script.write(preprocess_script)
+        preprocessing_script.flush()
+
+        dockerfile.write(
+            f"""
+            FROM amazonlinux
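+            # The build context passed to `docker build` below is /tmp, where
+            # NamedTemporaryFile creates the script, so COPY needs only its basename.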
+            COPY {preprocessing_script.name.split('/')[2]} /preprocessing.py
+            ADD credentials /credentials
+            ENV AWS_SHARED_CREDENTIALS_FILE=/credentials
+            RUN yum install python3 pip -y
+            RUN pip3 install boto3 pandas requests
+            CMD [ "python3", "/preprocessing.py"]
+            """
+        )
+        dockerfile.flush()
+
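+        # The Dockerfile's `ADD credentials` step must find the file inside the /tmp
+        # build context, so the pipeline below stages a copy there and removes it
+        # once the image is built.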
+        docker_build_and_push_commands = f"""
+            cp /root/.aws/credentials /tmp/credentials &&
+            docker build -f {dockerfile.name} -t {repository_uri} /tmp &&
+            rm /tmp/credentials &&
+            aws ecr get-login-password --region {ecr_region} |
+            docker login --username {username} --password {password} {repository_uri} &&
+            docker push {repository_uri}
+            """
+        docker_build = subprocess.Popen(
+            docker_build_and_push_commands,
+            shell=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        )
+        _, err = docker_build.communicate()
+
+        if docker_build.returncode != 0:
+            raise RuntimeError(err)
+
+
+@task
+def set_up(env_id, knn_image_uri, role_arn):
+    bucket_name = f'{env_id}-sagemaker-example'
+    ecr_repository_name = f'{env_id}-repo'
+    model_name = f'{env_id}-KNN-model'
+    processing_job_name = f'{env_id}-processing'
+    training_job_name = f'{env_id}-train'
+    transform_job_name = f'{env_id}-transform'
+    tuning_job_name = f'{env_id}-tune'
+
+    input_data_s3_key = f'{env_id}/processed-input-data'
+    prediction_output_s3_key = f'{env_id}/transform'
+    processing_local_input_path = '/opt/ml/processing/input'
+    processing_local_output_path = '/opt/ml/processing/output'
+    raw_data_s3_key = f'{env_id}/preprocessing/input.csv'
+    training_output_s3_key = f'{env_id}/results'
+
+    ecr_repository_uri = _create_ecr_repository(ecr_repository_name)
+    resource_config = {
+        'InstanceCount': 1,
+        'InstanceType': 'ml.m5.large',
+        'VolumeSizeInGB': 1,
+    }
+    processing_config = {
+        "ProcessingJobName": processing_job_name,
+        "ProcessingInputs": [
+            {
+                "InputName": "input",
+                "AppManaged": False,
+                "S3Input": {
+                    "S3Uri": f's3://{bucket_name}/{raw_data_s3_key}',
+                    "LocalPath": processing_local_input_path,
+                    "S3DataType": "S3Prefix",
+                    "S3InputMode": "File",
+                    "S3DataDistributionType": "FullyReplicated",
+                    "S3CompressionType": "None",
+                },
+            },
+        ],
+        "ProcessingOutputConfig": {
+            "Outputs": [
+                {
+                    "OutputName": "output",
+                    "S3Output": {
+                        "S3Uri": f's3://{bucket_name}/{input_data_s3_key}',
+                        "LocalPath": processing_local_output_path,
+                        "S3UploadMode": "EndOfJob",
+                    },
+                    "AppManaged": False,
+                }
+            ]
+        },
+        "ProcessingResources": {
+            "ClusterConfig": resource_config,
+        },
+        "StoppingCondition": {"MaxRuntimeInSeconds": 300},
+        "AppSpecification": {
+            "ImageUri": ecr_repository_uri,
+        },
+        "RoleArn": role_arn,
+    }
+
+    training_data_source = {
+        "CompressionType": "None",
+        "ContentType": "text/csv",
+        "DataSource": {
+            "S3DataSource": {
+                "S3DataDistributionType": "FullyReplicated",
+                "S3DataType": "S3Prefix",
+                "S3Uri": f's3://{bucket_name}/{input_data_s3_key}/train.csv',
+            }
+        },
+    }
+    training_config = {
+        "AlgorithmSpecification": {
+            "TrainingImage": knn_image_uri,
+            "TrainingInputMode": "File",
+        },
+        "HyperParameters": {
+            "predictor_type": "classifier",
+            "feature_dim": "4",
+            "k": "3",
+            "sample_size": str(SAMPLE_SIZE),
+        },
+        "InputDataConfig": [
+            {
+                "ChannelName": "train",
+                **training_data_source,
+            }
+        ],
+        "OutputDataConfig": {"S3OutputPath": f"s3://{bucket_name}/{training_output_s3_key}/"},
+        "ResourceConfig": resource_config,
+        "RoleArn": role_arn,
+        "StoppingCondition": {"MaxRuntimeInSeconds": 6000},
+        "TrainingJobName": training_job_name,
+    }
+    model_config = {
+        "ExecutionRoleArn": role_arn,
+        "ModelName": model_name,
+        "PrimaryContainer": {
+            "Mode": "SingleModel",
+            "Image": knn_image_uri,
+            "ModelDataUrl": f"s3://{bucket_name}/{training_output_s3_key}/{training_job_name}/output/model.tar.gz",  # noqa: E501
+        },
+    }
+    tuning_config = {
+        "HyperParameterTuningJobName": tuning_job_name,
+        "HyperParameterTuningJobConfig": {
+            "Strategy": "Bayesian",
+            "HyperParameterTuningJobObjective": {
+                "MetricName": "test:accuracy",
+                "Type": "Maximize",
+            },
+            "ResourceLimits": {
+                # You would bump these up in production as appropriate.
+                "MaxNumberOfTrainingJobs": 2,
+                "MaxParallelTrainingJobs": 2,
+            },
+            "ParameterRanges": {
+                "CategoricalParameterRanges": [],
+                "IntegerParameterRanges": [
+                    # Set the min and max values of the hyperparameters you want to tune.
+                    {
+                        "Name": "k",
+                        "MinValue": "1",
+                        "MaxValue": str(SAMPLE_SIZE),
+                    },
+                    {
+                        "Name": "sample_size",
+                        "MinValue": "1",
+                        "MaxValue": str(SAMPLE_SIZE),
+                    },
+                ],
+            },
+        },
+        "TrainingJobDefinition": {
+            "StaticHyperParameters": {
+                "predictor_type": "classifier",
+                "feature_dim": "4",
+            },
+            "AlgorithmSpecification": {"TrainingImage": knn_image_uri, "TrainingInputMode": "File"},
+            "InputDataConfig": [
+                {
+                    "ChannelName": "train",
+                    **training_data_source,
+                },
+                {
+                    "ChannelName": "test",
+                    **training_data_source,
+                },
+            ],
+            "OutputDataConfig": {"S3OutputPath": f"s3://{bucket_name}/{training_output_s3_key}"},
+            "ResourceConfig": resource_config,
+            "RoleArn": role_arn,
+            "StoppingCondition": {"MaxRuntimeInSeconds": 60000},
+        },
+    }
+    transform_config = {
+        "TransformJobName": transform_job_name,
+        "TransformInput": {
+            "DataSource": {
+                "S3DataSource": {
+                    "S3DataType": "S3Prefix",
+                    "S3Uri": f"s3://{bucket_name}/{input_data_s3_key}/test.csv",
+                }
+            },
+            "SplitType": "Line",
+            "ContentType": "text/csv",
+        },
+        "TransformOutput": {"S3OutputPath": f"s3://{bucket_name}/{prediction_output_s3_key}"},
+        "TransformResources": {
+            "InstanceCount": 1,
+            "InstanceType": "ml.m5.large",
+        },
+        "ModelName": model_name,
+    }
+
+    preprocess_script = PREPROCESS_SCRIPT_TEMPLATE.format(
+        input_path=processing_local_input_path, output_path=processing_local_output_path
+    )
+    _build_and_upload_docker_image(preprocess_script, ecr_repository_uri)
+
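+    # Share everything the downstream tasks need; each value is pulled below via
+    # XComArg indexing, e.g. test_setup['bucket_name'].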
+    ti = get_current_context()['ti']
+    ti.xcom_push(key='bucket_name', value=bucket_name)
+    ti.xcom_push(key='raw_data_s3_key', value=raw_data_s3_key)
+    ti.xcom_push(key='ecr_repository_name', value=ecr_repository_name)
+    ti.xcom_push(key='processing_config', value=processing_config)
+    ti.xcom_push(key='training_config', value=training_config)
+    ti.xcom_push(key='training_job_name', value=training_job_name)
+    ti.xcom_push(key='model_config', value=model_config)
+    ti.xcom_push(key='model_name', value=model_name)
+    ti.xcom_push(key='tuning_config', value=tuning_config)
+    ti.xcom_push(key='tuning_job_name', value=tuning_job_name)
+    ti.xcom_push(key='transform_config', value=transform_config)
+    ti.xcom_push(key='transform_job_name', value=transform_job_name)
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_ecr_repository(repository_name):
+    client = boto3.client('ecr')
+
+    # All images must be removed from the repo before it can be deleted.
+    image_ids = client.list_images(repositoryName=repository_name)['imageIds']
+    client.batch_delete_image(
+        repositoryName=repository_name,
+        imageIds=[{'imageDigest': image['imageDigest']} for image in image_ids],
+    )
+    client.delete_repository(repositoryName=repository_name)
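+    # Alternatively, delete_repository(repositoryName=..., force=True) removes a
+    # non-empty repository in a single call.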
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_logs(env_id):
+    generated_logs = [
+        # Format: ('log group name', 'log stream prefix')
+        ('/aws/sagemaker/ProcessingJobs', env_id),
+        ('/aws/sagemaker/TrainingJobs', env_id),
+        ('/aws/sagemaker/TransformJobs', env_id),
+    ]
+    purge_logs(generated_logs)
+
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+
+    test_setup = set_up(
+        env_id=test_context[ENV_ID_KEY],
+        knn_image_uri=test_context[KNN_IMAGE_URI_KEY],
+        role_arn=test_context[ROLE_ARN_KEY],
+    )
+
+    create_bucket = S3CreateBucketOperator(
+        task_id='create_bucket',
+        bucket_name=test_setup['bucket_name'],
+    )
+
+    upload_dataset = S3CreateObjectOperator(
+        task_id='upload_dataset',
+        s3_bucket=test_setup['bucket_name'],
+        s3_key=test_setup['raw_data_s3_key'],
+        data=DATASET,
+        replace=True,
+    )
+
+    # [START howto_operator_sagemaker_processing]
+    preprocess_raw_data = SageMakerProcessingOperator(
+        task_id='preprocess_raw_data',
+        config=test_setup['processing_config'],
+        do_xcom_push=False,
+    )
+    # [END howto_operator_sagemaker_processing]
+
+    # [START howto_operator_sagemaker_training]
+    train_model = SageMakerTrainingOperator(
+        task_id='train_model',
+        config=test_setup['training_config'],
+        # Waits by default, setting as False to demonstrate the Sensor below.
+        wait_for_completion=False,
+        do_xcom_push=False,
+    )
+    # [END howto_operator_sagemaker_training]
+
+    # [START howto_sensor_sagemaker_training]
+    await_training = SageMakerTrainingSensor(
+        task_id="await_training",
+        job_name=test_setup['training_job_name'],
+        do_xcom_push=False,
+    )
+    # [END howto_sensor_sagemaker_training]
+
+    # [START howto_operator_sagemaker_model]
+    create_model = SageMakerModelOperator(
+        task_id='create_model',
+        config=test_setup['model_config'],
+        do_xcom_push=False,
+    )
+    # [END howto_operator_sagemaker_model]
+
+    # [START howto_operator_sagemaker_tuning]
+    tune_model = SageMakerTuningOperator(
+        task_id="tune_model",
+        config=test_setup['tuning_config'],
+        # Waits by default, setting as False to demonstrate the Sensor below.
+        wait_for_completion=False,
+        do_xcom_push=False,
+    )
+    # [END howto_operator_sagemaker_tuning]
+
+    # [START howto_sensor_sagemaker_tuning]
+    await_tune = SageMakerTuningSensor(
+        task_id="await_tuning",
+        job_name=test_setup['tuning_job_name'],
+        do_xcom_push=False,
+    )
+    # [END howto_sensor_sagemaker_tuning]
+
+    # [START howto_operator_sagemaker_transform]
+    test_model = SageMakerTransformOperator(
+        task_id='test_model',
+        config=test_setup['transform_config'],
+        # Waits by default, setting as False to demonstrate the Sensor below.
+        wait_for_completion=False,
+        do_xcom_push=False,
+    )
+    # [END howto_operator_sagemaker_transform]
+
+    # [START howto_sensor_sagemaker_transform]
+    await_transform = SageMakerTransformSensor(
+        task_id="await_transform",
+        job_name=test_setup['transform_job_name'],
+        do_xcom_push=False,
+    )
+    # [END howto_sensor_sagemaker_transform]
+
+    # [START howto_operator_sagemaker_delete_model]
+    delete_model = SageMakerDeleteModelOperator(
+        task_id="delete_model",
+        config={'ModelName': test_setup['model_name']},
+        trigger_rule=TriggerRule.ALL_DONE,
+        do_xcom_push=False,
+    )
+    # [END howto_operator_sagemaker_delete_model]
+
+    delete_bucket = S3DeleteBucketOperator(
+        task_id='delete_bucket',
+        trigger_rule=TriggerRule.ALL_DONE,
+        bucket_name=test_setup['bucket_name'],
+        force_delete=True,
+    )
+
+    chain(
+        # TEST SETUP
+        test_context,
+        test_setup,
+        create_bucket,
+        upload_dataset,
+        # TEST BODY
+        preprocess_raw_data,
+        train_model,
+        await_training,
+        create_model,
+        tune_model,
+        await_tune,
+        test_model,
+        await_transform,
+        # TEST TEARDOWN
+        delete_ecr_repository(test_setup['ecr_repository_name']),
+        delete_model,
+        delete_bucket,
+        delete_logs(test_context[ENV_ID_KEY]),
+    )
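+    # chain() wires the tasks into one linear dependency sequence, equivalent to
+    # test_context >> test_setup >> create_bucket >> ... >> delete_logs(...).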
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs the watcher in order to properly mark success/failure
+    # when a "tearDown" task with a trigger rule is part of the DAG.
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py
index 38f606d0fb..4228fa2986 100644
--- a/tests/system/providers/amazon/aws/utils/__init__.py
+++ b/tests/system/providers/amazon/aws/utils/__init__.py
@@ -19,7 +19,7 @@ import inspect
 import json
 import os
 from os.path import basename, splitext
-from typing import Optional
+from typing import List, Optional, Tuple
 from uuid import uuid4
 
 import boto3
@@ -180,3 +180,29 @@ def set_env_id() -> str:
 
     os.environ[ENV_ID_ENVIRON_KEY] = env_id
     return env_id
+
+
+def purge_logs(test_logs: List[Tuple[str, Optional[str]]]) -> None:
+    """
+    Accepts a list of tuples in the format: ('log group name', 'log stream prefix').
+    For each log group, deletes any log streams matching the provided prefix,
+    then deletes the group itself if it is now empty.  If the group is not
+    empty, that indicates there are logs not generated by the test, and those
+    are left intact.
+
+    :param test_logs: A list of log_group/stream_prefix tuples to delete.
+    """
+    client: BaseClient = boto3.client('logs')
+
+    for group, prefix in test_logs:
+        if prefix:
+            log_streams = client.describe_log_streams(
+                logGroupName=group,
+                logStreamNamePrefix=prefix,
+            )['logStreams']
+
+            for stream_name in [stream['logStreamName'] for stream in log_streams]:
+                client.delete_log_stream(logGroupName=group, logStreamName=stream_name)
+
+        if not client.describe_log_streams(logGroupName=group)['logStreams']:
+            client.delete_log_group(logGroupName=group)