You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/25 18:28:55 UTC
[airflow] branch main updated: Add doc and example dag for AWS Step Functions Operators
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8cfb2be989 Add doc and example dag for AWS Step Functions Operators
8cfb2be989 is described below
commit 8cfb2be98931e0f0bfb15ca411b36be3d6e66b80
Author: Niko Oliveira <on...@amazon.com>
AuthorDate: Thu Apr 14 13:20:03 2022 -0700
Add doc and example dag for AWS Step Functions Operators
---
.../aws/example_dags/example_step_functions.py | 56 ++++++++++++++++
.../amazon/aws/operators/step_function.py | 10 +--
.../providers/amazon/aws/sensors/step_function.py | 8 ++-
airflow/providers/amazon/provider.yaml | 2 +
.../operators/step_functions.rst | 78 ++++++++++++++++++++++
5 files changed, 148 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/amazon/aws/example_dags/example_step_functions.py b/airflow/providers/amazon/aws/example_dags/example_step_functions.py
new file mode 100644
index 0000000000..9a0ac2474c
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_step_functions.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import datetime
+from os import environ
+
+from airflow import DAG
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.step_function import (
+ StepFunctionGetExecutionOutputOperator,
+ StepFunctionStartExecutionOperator,
+)
+from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor
+
+STEP_FUNCTIONS_STATE_MACHINE_ARN = environ.get('STEP_FUNCTIONS_STATE_MACHINE_ARN', 'state_machine_arn')
+
+with DAG(
+ dag_id='example_step_functions',
+ schedule_interval=None,
+ start_date=datetime(2021, 1, 1),
+ tags=['example'],
+ catchup=False,
+) as dag:
+
+ # [START howto_operator_step_function_start_execution]
+ start_execution = StepFunctionStartExecutionOperator(
+ task_id='start_execution', state_machine_arn=STEP_FUNCTIONS_STATE_MACHINE_ARN
+ )
+ # [END howto_operator_step_function_start_execution]
+
+ # [START howto_operator_step_function_execution_sensor]
+ wait_for_execution = StepFunctionExecutionSensor(
+ task_id='wait_for_execution', execution_arn=start_execution.output
+ )
+ # [END howto_operator_step_function_execution_sensor]
+
+ # [START howto_operator_step_function_get_execution_output]
+ get_execution_output = StepFunctionGetExecutionOutputOperator(
+ task_id='get_execution_output', execution_arn=start_execution.output
+ )
+ # [END howto_operator_step_function_get_execution_output]
+
+ chain(start_execution, wait_for_execution, get_execution_output)
diff --git a/airflow/providers/amazon/aws/operators/step_function.py b/airflow/providers/amazon/aws/operators/step_function.py
index b800ea90d3..7c32b33890 100644
--- a/airflow/providers/amazon/aws/operators/step_function.py
+++ b/airflow/providers/amazon/aws/operators/step_function.py
@@ -29,12 +29,13 @@ if TYPE_CHECKING:
class StepFunctionStartExecutionOperator(BaseOperator):
"""
- An Operator that begins execution of an Step Function State Machine
+ An Operator that begins execution of an AWS Step Function State Machine.
Additional arguments may be specified and are passed down to the underlying BaseOperator.
.. seealso::
- :class:`~airflow.models.BaseOperator`
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:StepFunctionStartExecutionOperator`
:param state_machine_arn: ARN of the Step Function State Machine
:param name: The name of the execution.
@@ -79,12 +80,13 @@ class StepFunctionStartExecutionOperator(BaseOperator):
class StepFunctionGetExecutionOutputOperator(BaseOperator):
"""
- An Operator that begins execution of an Step Function State Machine
+ An Operator that returns the output of an AWS Step Function State Machine execution.
Additional arguments may be specified and are passed down to the underlying BaseOperator.
.. seealso::
- :class:`~airflow.models.BaseOperator`
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:StepFunctionGetExecutionOutputOperator`
:param execution_arn: ARN of the Step Function State Machine Execution
:param aws_conn_id: aws connection to use, defaults to 'aws_default'
diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py
index 3c170c0727..085ca21fc4 100644
--- a/airflow/providers/amazon/aws/sensors/step_function.py
+++ b/airflow/providers/amazon/aws/sensors/step_function.py
@@ -28,13 +28,17 @@ if TYPE_CHECKING:
class StepFunctionExecutionSensor(BaseSensorOperator):
"""
- Asks for the state of the Step Function State Machine Execution until it
+ Asks for the state of the AWS Step Function State Machine Execution until it
reaches a failure state or success state.
- If it fails, failing the task.
+ If the execution fails, the sensor fails the task.
On successful completion of the Execution the Sensor will do an XCom Push
of the State Machine's output to `output`
+ .. seealso::
+ For more information on how to use this sensor, take a look at the guide:
+ :ref:`howto/operator:StepFunctionExecutionSensor`
+
:param execution_arn: execution_arn to check the state of
:param aws_conn_id: aws connection to use, defaults to 'aws_default'
"""
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index c1cfe7abf0..13b84639f5 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -190,6 +190,8 @@ integrations:
- integration-name: AWS Step Functions
external-doc-url: https://aws.amazon.com/step-functions/
logo: /integration-logos/aws/AWS-Step-Functions_light-bg@4x.png
+ how-to-guide:
+ - /docs/apache-airflow-providers-amazon/operators/step_functions.rst
tags: [aws]
- integration-name: AWS Database Migration Service
external-doc-url: https://aws.amazon.com/dms/
diff --git a/docs/apache-airflow-providers-amazon/operators/step_functions.rst b/docs/apache-airflow-providers-amazon/operators/step_functions.rst
new file mode 100644
index 0000000000..8d14af78fc
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/step_functions.rst
@@ -0,0 +1,78 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+AWS Step Functions Operators
+============================
+
+`AWS Step Functions <https://aws.amazon.com/step-functions/>`__ makes it easy to coordinate the components
+of distributed applications as a series of steps in a visual workflow. You can quickly build and run state
+machines to execute the steps of your application in a reliable and scalable fashion.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+.. _howto/operator:StepFunctionStartExecutionOperator:
+
+AWS Step Functions Start Execution Operator
+"""""""""""""""""""""""""""""""""""""""""""
+
+To start a new AWS Step Functions State Machine execution
+use :class:`~airflow.providers.amazon.aws.operators.step_function.StepFunctionStartExecutionOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_step_function_start_execution]
+ :end-before: [END howto_operator_step_function_start_execution]
+
+.. _howto/operator:StepFunctionExecutionSensor:
+
+AWS Step Functions Execution Sensor
+"""""""""""""""""""""""""""""""""""
+
+To wait on the state of an AWS Step Functions State Machine execution until it reaches a terminal state, you can
+use :class:`~airflow.providers.amazon.aws.sensors.step_function.StepFunctionExecutionSensor`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_step_function_execution_sensor]
+ :end-before: [END howto_operator_step_function_execution_sensor]
+
+.. _howto/operator:StepFunctionGetExecutionOutputOperator:
+
+AWS Step Functions Get Execution Output Operator
+""""""""""""""""""""""""""""""""""""""""""""""""
+
+To fetch the output from an AWS Step Functions State Machine execution, you can
+use :class:`~airflow.providers.amazon.aws.operators.step_function.StepFunctionGetExecutionOutputOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_step_function_get_execution_output]
+ :end-before: [END howto_operator_step_function_get_execution_output]
+
+References
+----------
+
+For further information, look at:
+
+* `Boto3 Library Documentation for Step Functions <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html>`__