You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/25 18:28:55 UTC

[airflow] branch main updated: Add doc and example dag for AWS Step Functions Operators

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8cfb2be989 Add doc and example dag for AWS Step Functions Operators
8cfb2be989 is described below

commit 8cfb2be98931e0f0bfb15ca411b36be3d6e66b80
Author: Niko Oliveira <on...@amazon.com>
AuthorDate: Thu Apr 14 13:20:03 2022 -0700

    Add doc and example dag for AWS Step Functions Operators
---
 .../aws/example_dags/example_step_functions.py     | 56 ++++++++++++++++
 .../amazon/aws/operators/step_function.py          | 10 +--
 .../providers/amazon/aws/sensors/step_function.py  |  8 ++-
 airflow/providers/amazon/provider.yaml             |  2 +
 .../operators/step_functions.rst                   | 78 ++++++++++++++++++++++
 5 files changed, 148 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_step_functions.py b/airflow/providers/amazon/aws/example_dags/example_step_functions.py
new file mode 100644
index 0000000000..9a0ac2474c
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_step_functions.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import datetime
+from os import environ
+
+from airflow import DAG
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.step_function import (
+    StepFunctionGetExecutionOutputOperator,
+    StepFunctionStartExecutionOperator,
+)
+from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor
+
+STEP_FUNCTIONS_STATE_MACHINE_ARN = environ.get('STEP_FUNCTIONS_STATE_MACHINE_ARN', 'state_machine_arn')
+
+with DAG(
+    dag_id='example_step_functions',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+
+    # [START howto_operator_step_function_start_execution]
+    start_execution = StepFunctionStartExecutionOperator(
+        task_id='start_execution', state_machine_arn=STEP_FUNCTIONS_STATE_MACHINE_ARN
+    )
+    # [END howto_operator_step_function_start_execution]
+
+    # [START howto_operator_step_function_execution_sensor]
+    wait_for_execution = StepFunctionExecutionSensor(
+        task_id='wait_for_execution', execution_arn=start_execution.output
+    )
+    # [END howto_operator_step_function_execution_sensor]
+
+    # [START howto_operator_step_function_get_execution_output]
+    get_execution_output = StepFunctionGetExecutionOutputOperator(
+        task_id='get_execution_output', execution_arn=start_execution.output
+    )
+    # [END howto_operator_step_function_get_execution_output]
+
+    chain(start_execution, wait_for_execution, get_execution_output)
diff --git a/airflow/providers/amazon/aws/operators/step_function.py b/airflow/providers/amazon/aws/operators/step_function.py
index b800ea90d3..7c32b33890 100644
--- a/airflow/providers/amazon/aws/operators/step_function.py
+++ b/airflow/providers/amazon/aws/operators/step_function.py
@@ -29,12 +29,13 @@ if TYPE_CHECKING:
 
 class StepFunctionStartExecutionOperator(BaseOperator):
     """
-    An Operator that begins execution of an Step Function State Machine
+    An Operator that begins execution of an AWS Step Function State Machine.
 
     Additional arguments may be specified and are passed down to the underlying BaseOperator.
 
     .. seealso::
-        :class:`~airflow.models.BaseOperator`
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:StepFunctionStartExecutionOperator`
 
     :param state_machine_arn: ARN of the Step Function State Machine
     :param name: The name of the execution.
@@ -79,12 +80,13 @@ class StepFunctionStartExecutionOperator(BaseOperator):
 
 class StepFunctionGetExecutionOutputOperator(BaseOperator):
     """
-    An Operator that begins execution of an Step Function State Machine
+    An Operator that returns the output of an AWS Step Function State Machine execution.
 
     Additional arguments may be specified and are passed down to the underlying BaseOperator.
 
     .. seealso::
-        :class:`~airflow.models.BaseOperator`
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:StepFunctionGetExecutionOutputOperator`
 
     :param execution_arn: ARN of the Step Function State Machine Execution
     :param aws_conn_id: aws connection to use, defaults to 'aws_default'
diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py
index 3c170c0727..085ca21fc4 100644
--- a/airflow/providers/amazon/aws/sensors/step_function.py
+++ b/airflow/providers/amazon/aws/sensors/step_function.py
@@ -28,13 +28,17 @@ if TYPE_CHECKING:
 
 class StepFunctionExecutionSensor(BaseSensorOperator):
     """
-    Asks for the state of the Step Function State Machine Execution until it
+    Asks for the state of the AWS Step Function State Machine Execution until it
     reaches a failure state or success state.
-    If it fails, failing the task.
+    If the execution reaches a failure state, the sensor fails the task.
 
     On successful completion of the Execution the Sensor will do an XCom Push
     of the State Machine's output to `output`
 
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/operator:StepFunctionExecutionSensor`
+
     :param execution_arn: execution_arn to check the state of
     :param aws_conn_id: aws connection to use, defaults to 'aws_default'
     """
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index c1cfe7abf0..13b84639f5 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -190,6 +190,8 @@ integrations:
   - integration-name: AWS Step Functions
     external-doc-url: https://aws.amazon.com/step-functions/
     logo: /integration-logos/aws/AWS-Step-Functions_light-bg@4x.png
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/step_functions.rst
     tags: [aws]
   - integration-name: AWS Database Migration Service
     external-doc-url: https://aws.amazon.com/dms/
diff --git a/docs/apache-airflow-providers-amazon/operators/step_functions.rst b/docs/apache-airflow-providers-amazon/operators/step_functions.rst
new file mode 100644
index 0000000000..8d14af78fc
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/step_functions.rst
@@ -0,0 +1,78 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+AWS Step Functions Operators
+============================
+
+`AWS Step Functions <https://aws.amazon.com/step-functions/>`__ makes it easy to coordinate the components
+of distributed applications as a series of steps in a visual workflow. You can quickly build and run state
+machines to execute the steps of your application in a reliable and scalable fashion.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+.. _howto/operator:StepFunctionStartExecutionOperator:
+
+AWS Step Functions Start Execution Operator
+"""""""""""""""""""""""""""""""""""""""""""
+
+To start a new AWS Step Functions State Machine execution, you can
+use :class:`~airflow.providers.amazon.aws.operators.step_function.StepFunctionStartExecutionOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_step_function_start_execution]
+    :end-before: [END howto_operator_step_function_start_execution]
+
+.. _howto/operator:StepFunctionExecutionSensor:
+
+AWS Step Functions Execution Sensor
+"""""""""""""""""""""""""""""""""""
+
+To wait for an AWS Step Functions State Machine execution to reach a terminal state, you can
+use :class:`~airflow.providers.amazon.aws.sensors.step_function.StepFunctionExecutionSensor`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_step_function_execution_sensor]
+    :end-before: [END howto_operator_step_function_execution_sensor]
+
+.. _howto/operator:StepFunctionGetExecutionOutputOperator:
+
+AWS Step Functions Get Execution Output Operator
+""""""""""""""""""""""""""""""""""""""""""""""""
+
+To fetch the output from an AWS Step Functions State Machine execution, you can
+use :class:`~airflow.providers.amazon.aws.operators.step_function.StepFunctionGetExecutionOutputOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_step_function_get_execution_output]
+    :end-before: [END howto_operator_step_function_get_execution_output]
+
+References
+----------
+
+For further information, look at:
+
+* `Boto3 Library Documentation for Step Functions <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html>`__