Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/19 07:04:07 UTC

[airflow] branch main updated: AIP-47 - Migrate redshift DAGs to new design #22438 (#24239)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c6d9bccdb6 AIP-47 - Migrate redshift DAGs to new design #22438 (#24239)
c6d9bccdb6 is described below

commit c6d9bccdb6c5af644c5570f0524f2207832383b6
Author: chethanuk-plutoflume <ch...@tessian.com>
AuthorDate: Tue Jul 19 08:03:59 2022 +0100

    AIP-47 - Migrate redshift DAGs to new design #22438 (#24239)
---
 .../operators/redshift_cluster.rst                 | 10 +++++-----
 .../operators/redshift_data.rst                    |  2 +-
 .../operators/redshift_sql.rst                     |  4 ++--
 .../operators/transfer/redshift_to_s3.rst          |  2 +-
 tests/system/providers/amazon/__init__.py          |  1 +
 tests/system/providers/amazon/aws/__init__.py      |  1 +
 .../amazon/aws}/example_redshift_cluster.py        | 19 +++++++++++++++++-
 .../aws}/example_redshift_data_execute_sql.py      | 23 +++++++++++++++++-----
 .../providers/amazon/aws}/example_redshift_sql.py  | 20 ++++++++++++++++++-
 .../amazon/aws}/example_redshift_to_s3.py          | 11 ++++++++++-
 10 files changed, 76 insertions(+), 17 deletions(-)
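
In short, each example DAG picks up the standard AIP-47 system-test scaffolding:
an ENV_ID/DAG_ID pair from the shared test utils, a teardown task that runs under
TriggerRule.ALL_DONE, a watcher task so a failure anywhere in the DAG still fails
the test despite that trigger rule, and a get_test_run() hook that makes the file
runnable under pytest. A minimal sketch of the pattern (meaningful only inside the
Airflow source tree, since the helpers live under tests/system/; EmptyOperator
stands in here for the real Redshift tasks):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.trigger_rule import TriggerRule

    with DAG(
        dag_id='example_pattern',
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        setup = EmptyOperator(task_id='setup')
        # Teardown must run even if upstream tasks failed, hence ALL_DONE.
        teardown = EmptyOperator(task_id='teardown', trigger_rule=TriggerRule.ALL_DONE)
        setup >> teardown

        from tests.system.utils.watcher import watcher

        # Route every task through the watcher so any task failure marks the
        # whole test run as failed, even though the ALL_DONE teardown succeeds.
        list(dag.tasks) >> watcher()

    from tests.system.utils import get_test_run  # noqa: E402

    # Exposes the DAG as a pytest-runnable test (see tests/system/README.md).
    test_run = get_test_run(dag)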

diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
index a0875c3d77..b1c708850e 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
@@ -40,7 +40,7 @@ Create an Amazon Redshift cluster
 To create an Amazon Redshift Cluster with the specified parameters you can use
 :class:`~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftCreateClusterOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_redshift_cluster]
@@ -54,7 +54,7 @@ Resume an Amazon Redshift cluster
 To resume a 'paused' Amazon Redshift cluster you can use
 :class:`RedshiftResumeClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_redshift_resume_cluster]
@@ -68,7 +68,7 @@ Pause an Amazon Redshift cluster
 To pause an 'available' Amazon Redshift cluster you can use
 :class:`RedshiftPauseClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_redshift_pause_cluster]
@@ -82,7 +82,7 @@ Delete an Amazon Redshift cluster
 To delete an Amazon Redshift cluster you can use
 :class:`RedshiftDeleteClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_redshift_delete_cluster]
@@ -99,7 +99,7 @@ Wait on an Amazon Redshift cluster state
 To check the state of an Amazon Redshift Cluster until it reaches the target state or another terminal
 state you can use :class:`~airflow.providers.amazon.aws.sensors.redshift_cluster.RedshiftClusterSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_redshift_cluster]
diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_data.rst b/docs/apache-airflow-providers-amazon/operators/redshift_data.rst
index eb9dc51129..dea4472f00 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift_data.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_data.rst
@@ -43,7 +43,7 @@ statements against an Amazon Redshift cluster.
 This differs from ``RedshiftSQLOperator`` in that it allows users to query and retrieve data via the AWS API and avoid
 the necessity of a Postgres connection.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_redshift_data]
diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
index 92ff0e1c4b..fc9809ac46 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
@@ -42,7 +42,7 @@ Execute a SQL query
 To execute a SQL query against an Amazon Redshift cluster without using a Postgres connection,
 please check ``RedshiftDataOperator``.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_sql.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_sql.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_redshift_sql]
@@ -51,7 +51,7 @@ please check ``RedshiftDataOperator``.
 ``RedshiftSQLOperator`` supports the ``parameters`` attribute which allows us to dynamically pass
 parameters into SQL statements.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_sql.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_sql.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_redshift_sql_with_params]
diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst b/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
index 2235bf8bb3..3707cf2b4a 100644
--- a/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
@@ -42,7 +42,7 @@ To get more information about this operator visit:
 
 Example usage:
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_to_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_transfer_redshift_to_s3]
diff --git a/tests/system/providers/amazon/__init__.py b/tests/system/providers/amazon/__init__.py
index 13a83393a9..217e5db960 100644
--- a/tests/system/providers/amazon/__init__.py
+++ b/tests/system/providers/amazon/__init__.py
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
diff --git a/tests/system/providers/amazon/aws/__init__.py b/tests/system/providers/amazon/aws/__init__.py
index 13a83393a9..217e5db960 100644
--- a/tests/system/providers/amazon/aws/__init__.py
+++ b/tests/system/providers/amazon/aws/__init__.py
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py b/tests/system/providers/amazon/aws/example_redshift_cluster.py
similarity index 84%
rename from airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
rename to tests/system/providers/amazon/aws/example_redshift_cluster.py
index dc6deead3c..ed0e2219af 100644
--- a/airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+++ b/tests/system/providers/amazon/aws/example_redshift_cluster.py
@@ -28,11 +28,15 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftResumeClusterOperator,
 )
 from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import set_env_id
 
+ENV_ID = set_env_id()
+DAG_ID = 'example_redshift_cluster'
 REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "redshift-cluster-1")
 
 with DAG(
-    dag_id="example_redshift_cluster",
+    dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule_interval=None,
     catchup=False,
@@ -85,6 +89,7 @@ with DAG(
     task_delete_cluster = RedshiftDeleteClusterOperator(
         task_id="delete_cluster",
         cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_operator_redshift_delete_cluster]
 
@@ -96,3 +101,15 @@ with DAG(
         task_resume_cluster,
         task_delete_cluster,
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py b/tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
similarity index 71%
rename from airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py
rename to tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
index cfa3e4cefc..d5ddb6e213 100644
--- a/airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py
+++ b/tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
@@ -16,16 +16,18 @@
 # under the License.
 
 from datetime import datetime
-from os import getenv
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
 from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
+from tests.system.providers.amazon.aws.utils import fetch_variable, set_env_id
 
-REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "redshift_cluster_identifier")
-REDSHIFT_DATABASE = getenv("REDSHIFT_DATABASE", "redshift_database")
-REDSHIFT_DATABASE_USER = getenv("REDSHIFT_DATABASE_USER", "awsuser")
+ENV_ID = set_env_id()
+DAG_ID = 'example_redshift_data_execute_sql'
+REDSHIFT_CLUSTER_IDENTIFIER = fetch_variable("REDSHIFT_CLUSTER_IDENTIFIER", "redshift_cluster_identifier")
+REDSHIFT_DATABASE = fetch_variable("REDSHIFT_DATABASE", "redshift_database")
+REDSHIFT_DATABASE_USER = fetch_variable("REDSHIFT_DATABASE_USER", "awsuser")
 
 REDSHIFT_QUERY = """
 SELECT table_schema,
@@ -51,7 +53,7 @@ def output_query_results(statement_id):
 
 
 with DAG(
-    dag_id="example_redshift_data_execute_sql",
+    dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule_interval=None,
     catchup=False,
@@ -70,3 +72,14 @@ with DAG(
     # [END howto_operator_redshift_data]
 
     task_output = output_query_results(task_query.output)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
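
Note that this example also swaps os.getenv for the shared fetch_variable helper
from tests/system/providers/amazon/aws/utils. Judging only from the call sites
above, it takes a key plus a fallback default, like the getenv calls it replaces;
a hypothetical stand-in with that shape:

    import os

    def fetch_variable(key: str, default_value: str) -> str:
        # Hypothetical stand-in inferred from the call sites: resolve a test
        # setting from the environment, falling back to the given default.
        # The real helper in tests/system/providers/amazon/aws/utils may pull
        # values from other stores as well.
        return os.getenv(key, default_value)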
diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_sql.py b/tests/system/providers/amazon/aws/example_redshift_sql.py
similarity index 81%
rename from airflow/providers/amazon/aws/example_dags/example_redshift_sql.py
rename to tests/system/providers/amazon/aws/example_redshift_sql.py
index a71ef71934..17e386c7de 100644
--- a/airflow/providers/amazon/aws/example_dags/example_redshift_sql.py
+++ b/tests/system/providers/amazon/aws/example_redshift_sql.py
@@ -21,9 +21,14 @@ from datetime import datetime
 from airflow import DAG
 from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import set_env_id
+
+ENV_ID = set_env_id()
+DAG_ID = 'example_redshift_sql'
 
 with DAG(
-    dag_id="example_redshift_sql",
+    dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule_interval=None,
     catchup=False,
@@ -69,6 +74,7 @@ with DAG(
     teardown__task_drop_table = RedshiftSQLOperator(
         task_id='teardown__drop_table',
         sql='DROP TABLE IF EXISTS fruit',
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
     chain(
@@ -77,3 +83,15 @@ with DAG(
         [task_select_data, task_select_filtered_data],
         teardown__task_drop_table,
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py b/tests/system/providers/amazon/aws/example_redshift_to_s3.py
similarity index 82%
rename from airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py
rename to tests/system/providers/amazon/aws/example_redshift_to_s3.py
index 8116e02dc1..a7f50e5771 100644
--- a/airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_redshift_to_s3.py
@@ -20,13 +20,16 @@ from os import getenv
 
 from airflow import DAG
 from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
+from tests.system.providers.amazon.aws.utils import set_env_id
 
+ENV_ID = set_env_id()
+DAG_ID = 'example_redshift_to_s3'
 S3_BUCKET_NAME = getenv("S3_BUCKET_NAME", "s3_bucket_name")
 S3_KEY = getenv("S3_KEY", "s3_key")
 REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "redshift_table")
 
 with DAG(
-    dag_id="example_redshift_to_s3",
+    dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule_interval=None,
     catchup=False,
@@ -41,3 +44,9 @@ with DAG(
         table=REDSHIFT_TABLE,
     )
     # [END howto_transfer_redshift_to_s3]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
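
With test_run = get_test_run(dag) in place, each migrated example can be exercised
directly from an Airflow source checkout, for instance (exact invocation per
tests/system/README.md#run_via_pytest):

    pytest tests/system/providers/amazon/aws/example_redshift_cluster.py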