Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/18 19:46:44 UTC

[airflow] branch main updated: Convert Glue Sample DAG to System Test (#25136)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c48c185627 Convert Glue Sample DAG to System Test (#25136)
c48c185627 is described below

commit c48c18562713e8682a490c8f3ab51891fa8974ec
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Mon Jul 18 19:46:37 2022 +0000

    Convert Glue Sample DAG to System Test (#25136)
---
 .../amazon/aws/example_dags/example_glue.py        | 123 -----------
 airflow/providers/amazon/aws/operators/glue.py     |   8 +-
 .../providers/amazon/aws/operators/glue_crawler.py |   3 +-
 .../providers/amazon/aws/sensors/glue_crawler.py   |   4 +-
 .../operators/glue.rst                             |   8 +-
 tests/system/providers/amazon/aws/example_glue.py  | 233 +++++++++++++++++++++
 6 files changed, 249 insertions(+), 130 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_glue.py b/airflow/providers/amazon/aws/example_dags/example_glue.py
deleted file mode 100644
index 65417dc24f..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_glue.py
+++ /dev/null
@@ -1,123 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from datetime import datetime
-from os import getenv
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
-from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
-from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
-from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
-
-GLUE_DATABASE_NAME = getenv('GLUE_DATABASE_NAME', 'glue_database_name')
-GLUE_EXAMPLE_S3_BUCKET = getenv('GLUE_EXAMPLE_S3_BUCKET', 'glue_example_s3_bucket')
-
-# Role needs putobject/getobject access to the above bucket as well as the glue
-# service role, see docs here: https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
-GLUE_CRAWLER_ROLE = getenv('GLUE_CRAWLER_ROLE', 'glue_crawler_role')
-GLUE_CRAWLER_NAME = 'example_crawler'
-GLUE_CRAWLER_CONFIG = {
-    'Name': GLUE_CRAWLER_NAME,
-    'Role': GLUE_CRAWLER_ROLE,
-    'DatabaseName': GLUE_DATABASE_NAME,
-    'Targets': {
-        'S3Targets': [
-            {
-                'Path': f'{GLUE_EXAMPLE_S3_BUCKET}/input',
-            }
-        ]
-    },
-}
-
-# Example csv data used as input to the example AWS Glue Job.
-EXAMPLE_CSV = '''
-apple,0.5
-milk,2.5
-bread,4.0
-'''
-
-# Example Spark script to operate on the above sample csv data.
-EXAMPLE_SCRIPT = f'''
-from pyspark.context import SparkContext
-from awsglue.context import GlueContext
-
-glueContext = GlueContext(SparkContext.getOrCreate())
-datasource = glueContext.create_dynamic_frame.from_catalog(
-             database='{GLUE_DATABASE_NAME}', table_name='input')
-print('There are %s items in the table' % datasource.count())
-
-datasource.toDF().write.format('csv').mode("append").save('s3://{GLUE_EXAMPLE_S3_BUCKET}/output')
-'''
-
-
-@task(task_id='setup__upload_artifacts_to_s3')
-def upload_artifacts_to_s3():
-    '''Upload example CSV input data and an example Spark script to be used by the Glue Job'''
-    s3_hook = S3Hook()
-    s3_load_kwargs = {"replace": True, "bucket_name": GLUE_EXAMPLE_S3_BUCKET}
-    s3_hook.load_string(string_data=EXAMPLE_CSV, key='input/input.csv', **s3_load_kwargs)
-    s3_hook.load_string(string_data=EXAMPLE_SCRIPT, key='etl_script.py', **s3_load_kwargs)
-
-
-with DAG(
-    dag_id='example_glue',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-    catchup=False,
-) as glue_dag:
-
-    setup_upload_artifacts_to_s3 = upload_artifacts_to_s3()
-
-    # [START howto_operator_glue_crawler]
-    crawl_s3 = GlueCrawlerOperator(
-        task_id='crawl_s3',
-        config=GLUE_CRAWLER_CONFIG,
-        wait_for_completion=False,
-    )
-    # [END howto_operator_glue_crawler]
-
-    # [START howto_sensor_glue_crawler]
-    wait_for_crawl = GlueCrawlerSensor(task_id='wait_for_crawl', crawler_name=GLUE_CRAWLER_NAME)
-    # [END howto_sensor_glue_crawler]
-
-    # [START howto_operator_glue]
-    job_name = 'example_glue_job'
-    submit_glue_job = GlueJobOperator(
-        task_id='submit_glue_job',
-        job_name=job_name,
-        wait_for_completion=False,
-        script_location=f's3://{GLUE_EXAMPLE_S3_BUCKET}/etl_script.py',
-        s3_bucket=GLUE_EXAMPLE_S3_BUCKET,
-        iam_role_name=GLUE_CRAWLER_ROLE.split('/')[-1],
-        create_job_kwargs={'GlueVersion': '3.0', 'NumberOfWorkers': 2, 'WorkerType': 'G.1X'},
-    )
-    # [END howto_operator_glue]
-
-    # [START howto_sensor_glue]
-    wait_for_job = GlueJobSensor(
-        task_id='wait_for_job',
-        job_name=job_name,
-        # Job ID extracted from previous Glue Job Operator task
-        run_id=submit_glue_job.output,
-    )
-    # [END howto_sensor_glue]
-
-    chain(setup_upload_artifacts_to_s3, crawl_s3, wait_for_crawl, submit_glue_job, wait_for_job)
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index f5cc8c104b..4d42b0b440 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -53,7 +53,13 @@ class GlueJobOperator(BaseOperator):
     :param wait_for_completion: Whether or not to wait for job run completion. (default: True)
     """
 
-    template_fields: Sequence[str] = ('script_args',)
+    template_fields: Sequence[str] = (
+        'job_name',
+        'script_location',
+        'script_args',
+        's3_bucket',
+        'iam_role_name',
+    )
     template_ext: Sequence[str] = ()
     template_fields_renderers = {
         "script_args": "json",
diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py
index f771e1b80c..c75febf267 100644
--- a/airflow/providers/amazon/aws/operators/glue_crawler.py
+++ b/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import warnings
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Sequence
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -42,6 +42,7 @@ class GlueCrawlerOperator(BaseOperator):
     :param wait_for_completion: Whether or not to wait for crawl execution completion. (default: True)
     """
 
+    template_fields: Sequence[str] = ('config',)
     ui_color = '#ededed'
 
     def __init__(
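
Note: marking 'config' as a template field makes Airflow render Jinja inside the nested dict as well, so crawler settings can be pulled from Variables or upstream XComs at runtime. A sketch under hypothetical names (not part of this commit):

    from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator

    crawl_s3 = GlueCrawlerOperator(
        task_id='crawl_s3',
        config={
            'Name': 'crawler_{{ ds_nodash }}',
            'Role': '{{ var.value.glue_role_arn }}',
            'DatabaseName': "{{ ti.xcom_pull(task_ids='set_up', key='glue_db_name') }}",
            'Targets': {'S3Targets': [{'Path': '{{ var.value.glue_bucket }}/input'}]},
        },
    )
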
diff --git a/airflow/providers/amazon/aws/sensors/glue_crawler.py b/airflow/providers/amazon/aws/sensors/glue_crawler.py
index 52944ce2eb..fa2c714ee4 100644
--- a/airflow/providers/amazon/aws/sensors/glue_crawler.py
+++ b/airflow/providers/amazon/aws/sensors/glue_crawler.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import warnings
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Sequence
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
@@ -39,6 +39,8 @@ class GlueCrawlerSensor(BaseSensorOperator):
     :param aws_conn_id: aws connection to use, defaults to 'aws_default'
     """
 
+    template_fields: Sequence[str] = ('crawler_name',)
+
     def __init__(self, *, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs) -> None:
         super().__init__(**kwargs)
         self.crawler_name = crawler_name
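
Note: with 'crawler_name' templated, the sensor can target the same rendered name as the operator above. A short sketch (not part of this commit):

    from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor

    wait_for_crawl = GlueCrawlerSensor(
        task_id='wait_for_crawl',
        crawler_name='crawler_{{ ds_nodash }}',  # rendered by Jinja at runtime
    )
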
diff --git a/docs/apache-airflow-providers-amazon/operators/glue.rst b/docs/apache-airflow-providers-amazon/operators/glue.rst
index 64726ef473..0690ec5b72 100644
--- a/docs/apache-airflow-providers-amazon/operators/glue.rst
+++ b/docs/apache-airflow-providers-amazon/operators/glue.rst
@@ -41,7 +41,7 @@ AWS Glue Crawlers allow you to easily extract data from various data sources.
 To create a new AWS Glue Crawler or run an existing one you can
 use :class:`~airflow.providers.amazon.aws.operators.glue_crawler.GlueCrawlerOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_glue.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_glue_crawler]
@@ -59,7 +59,7 @@ Submit an AWS Glue job
 
 To submit a new AWS Glue job you can use :class:`~airflow.providers.amazon.aws.operators.glue.GlueJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_glue.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_glue]
@@ -80,7 +80,7 @@ Wait on an AWS Glue crawler state
 To wait on the state of an AWS Glue crawler execution until it reaches a terminal state you can
 use :class:`~airflow.providers.amazon.aws.sensors.glue_crawler.GlueCrawlerSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_glue.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_glue_crawler]
@@ -94,7 +94,7 @@ Wait on an AWS Glue job state
 To wait on the state of an AWS Glue Job until it reaches a terminal state you can
 use :class:`~airflow.providers.amazon.aws.sensors.glue.GlueJobSensor`
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_glue.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_glue]
diff --git a/tests/system/providers/amazon/aws/example_glue.py b/tests/system/providers/amazon/aws/example_glue.py
new file mode 100644
index 0000000000..cf6ad57d8c
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_glue.py
@@ -0,0 +1,233 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import datetime
+from typing import List, Optional, Tuple
+
+import boto3
+from botocore.client import BaseClient
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.operators.python import get_current_context
+from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CreateBucketOperator,
+    S3CreateObjectOperator,
+    S3DeleteBucketOperator,
+)
+from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
+from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder, purge_logs
+
+DAG_ID = 'example_glue'
+
+# Externally fetched variables:
+# Role needs S3 putobject/getobject access as well as the glue service role,
+# see docs here: https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
+ROLE_ARN_KEY = 'ROLE_ARN'
+
+sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
+
+# Example csv data used as input to the example AWS Glue Job.
+EXAMPLE_CSV = '''
+apple,0.5
+milk,2.5
+bread,4.0
+'''
+
+# Example Spark script to operate on the above sample csv data.
+EXAMPLE_SCRIPT = '''
+from pyspark.context import SparkContext
+from awsglue.context import GlueContext
+
+glueContext = GlueContext(SparkContext.getOrCreate())
+datasource = glueContext.create_dynamic_frame.from_catalog(
+             database='{db_name}', table_name='input')
+print('There are %s items in the table' % datasource.count())
+
+datasource.toDF().write.format('csv').mode("append").save('s3://{bucket_name}/output')
+'''
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_logs(job_id: str, glue_crawler_name: str) -> None:
+    """
+    Glue generates four CloudWatch log groups and multiple log streams, and leaves them behind.
+    """
+    generated_log_groups: List[Tuple[str, Optional[str]]] = [
+        # Format: ('log group name', 'log stream prefix')
+        ('/aws-glue/crawlers', glue_crawler_name),
+        ('/aws-glue/jobs/logs-v2', job_id),
+        ('/aws-glue/jobs/error', job_id),
+        ('/aws-glue/jobs/output', job_id),
+    ]
+
+    purge_logs(generated_log_groups)
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def glue_cleanup(glue_crawler_name: str, glue_job_name: str, glue_db_name: str) -> None:
+    client: BaseClient = boto3.client('glue')
+
+    client.delete_crawler(Name=glue_crawler_name)
+    client.delete_job(JobName=glue_job_name)
+    client.delete_database(Name=glue_db_name)
+
+
+@task
+def set_up(env_id, role_arn):
+    glue_crawler_name = f'{env_id}_crawler'
+    glue_db_name = f'{env_id}_glue_db'
+    glue_job_name = f'{env_id}_glue_job'
+    bucket_name = f'{env_id}-bucket'
+
+    role_name = role_arn.split('/')[-1]
+
+    glue_crawler_config = {
+        'Name': glue_crawler_name,
+        'Role': role_arn,
+        'DatabaseName': glue_db_name,
+        'Targets': {'S3Targets': [{'Path': f'{bucket_name}/input'}]},
+    }
+
+    ti = get_current_context()['ti']
+    ti.xcom_push(key='bucket_name', value=bucket_name)
+    ti.xcom_push(key='glue_db_name', value=glue_db_name)
+    ti.xcom_push(key='glue_crawler_config', value=glue_crawler_config)
+    ti.xcom_push(key='glue_crawler_name', value=glue_crawler_name)
+    ti.xcom_push(key='glue_job_name', value=glue_job_name)
+    ti.xcom_push(key='role_name', value=role_name)
+
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+
+    test_setup = set_up(
+        env_id=test_context[ENV_ID_KEY],
+        role_arn=test_context[ROLE_ARN_KEY],
+    )
+
+    create_bucket = S3CreateBucketOperator(
+        task_id='create_bucket',
+        bucket_name=test_setup['bucket_name'],
+    )
+
+    upload_csv = S3CreateObjectOperator(
+        task_id='upload_csv',
+        s3_bucket=test_setup['bucket_name'],
+        s3_key='input/input.csv',
+        data=EXAMPLE_CSV,
+        replace=True,
+    )
+
+    upload_script = S3CreateObjectOperator(
+        task_id='upload_script',
+        s3_bucket=test_setup['bucket_name'],
+        s3_key='etl_script.py',
+        data=EXAMPLE_SCRIPT.format(db_name=test_setup['glue_db_name'], bucket_name=test_setup['bucket_name']),
+        replace=True,
+    )
+
+    # [START howto_operator_glue_crawler]
+    crawl_s3 = GlueCrawlerOperator(
+        task_id='crawl_s3',
+        config=test_setup['glue_crawler_config'],
+        # Waits by default; set to False here to test the Sensor below
+        wait_for_completion=False,
+    )
+    # [END howto_operator_glue_crawler]
+
+    # [START howto_sensor_glue_crawler]
+    wait_for_crawl = GlueCrawlerSensor(
+        task_id='wait_for_crawl',
+        crawler_name=test_setup['glue_crawler_name'],
+    )
+    # [END howto_sensor_glue_crawler]
+
+    # [START howto_operator_glue]
+    submit_glue_job = GlueJobOperator(
+        task_id='submit_glue_job',
+        job_name=test_setup['glue_job_name'],
+        script_location=f's3://{test_setup["bucket_name"]}/etl_script.py',
+        s3_bucket=test_setup['bucket_name'],
+        iam_role_name=test_setup['role_name'],
+        create_job_kwargs={'GlueVersion': '3.0', 'NumberOfWorkers': 2, 'WorkerType': 'G.1X'},
+        # Waits by default; set to False here to test the Sensor below
+        wait_for_completion=False,
+    )
+    # [END howto_operator_glue]
+
+    # [START howto_sensor_glue]
+    wait_for_job = GlueJobSensor(
+        task_id='wait_for_job',
+        job_name=test_setup['glue_job_name'],
+        # Job ID extracted from previous Glue Job Operator task
+        run_id=submit_glue_job.output,
+    )
+    # [END howto_sensor_glue]
+
+    delete_bucket = S3DeleteBucketOperator(
+        task_id='delete_bucket',
+        trigger_rule=TriggerRule.ALL_DONE,
+        bucket_name=test_setup['bucket_name'],
+        force_delete=True,
+    )
+
+    clean_up = glue_cleanup(
+        test_setup['glue_crawler_name'],
+        test_setup['glue_job_name'],
+        test_setup['glue_db_name'],
+    )
+
+    chain(
+        # TEST SETUP
+        test_context,
+        test_setup,
+        create_bucket,
+        upload_csv,
+        upload_script,
+        # TEST BODY
+        crawl_s3,
+        wait_for_crawl,
+        submit_glue_job,
+        wait_for_job,
+        # TEST TEARDOWN
+        clean_up,
+        delete_bucket,
+        delete_logs(submit_glue_job.output, test_setup['glue_crawler_name']),
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
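
Note: in the system test above, set_up pushes one XCom per value and downstream operators index the task's output (e.g. test_setup['bucket_name']); indexing an XComArg selects which XCom key is pulled when templates render. A minimal standalone sketch of the pattern (not part of this commit):

    from airflow.decorators import task
    from airflow.operators.python import get_current_context

    @task
    def produce():
        ti = get_current_context()['ti']
        ti.xcom_push(key='bucket_name', value='demo-bucket')

    @task
    def consume(bucket_name: str):
        print(bucket_name)

    values = produce()
    consume(values['bucket_name'])  # pulls the XCom stored under key='bucket_name'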
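
Note: the teardown tasks (glue_cleanup, delete_bucket, delete_logs) run under TriggerRule.ALL_DONE so cleanup executes even if the test body fails, which is also why the watcher is needed to report the DAG's true status. A minimal sketch of the rule in isolation:

    from airflow.decorators import task
    from airflow.utils.trigger_rule import TriggerRule

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def tear_down():
        # Runs once all upstream tasks finish, regardless of success or failure.
        ...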
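
Note: delete_logs hands the group/prefix pairs to the shared purge_logs test utility; a rough boto3 equivalent, purely for illustration (a sketch, not the utility's actual implementation):

    import boto3

    def purge_logs_sketch(log_groups):
        """Delete each generated log group; removing a group also removes its streams."""
        client = boto3.client('logs')
        for group_name, _stream_prefix in log_groups:
            client.delete_log_group(logGroupName=group_name)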