Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/10 05:43:43 UTC
[airflow] branch main updated: System test for Dynamo DB (#26729)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e68c8b9d52 System test for Dynamo DB (#26729)
e68c8b9d52 is described below
commit e68c8b9d52399ed1470b2d54e6dd13f3380a7788
Author: syedahsn <10...@users.noreply.github.com>
AuthorDate: Sun Oct 9 22:43:34 2022 -0700
System test for Dynamo DB (#26729)
---
.../aws/example_dags/example_dynamodb_to_s3.py | 73 ----------
.../operators/transfer/dynamodb_to_s3.rst | 4 +-
.../providers/amazon/aws/example_dynamodb_to_s3.py | 162 +++++++++++++++++++++
3 files changed, 164 insertions(+), 75 deletions(-)
diff --git a/airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py b/airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py
deleted file mode 100644
index 844e0dec88..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from datetime import datetime
-from os import environ
-
-from airflow import DAG
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
-
-TABLE_NAME = environ.get('DYNAMO_TABLE_NAME', 'ExistingDynamoDbTableName')
-BUCKET_NAME = environ.get('S3_BUCKET_NAME', 'ExistingS3BucketName')
-
-
-with DAG(
- dag_id='example_dynamodb_to_s3',
- start_date=datetime(2021, 1, 1),
- tags=['example'],
- catchup=False,
-) as dag:
- # [START howto_transfer_dynamodb_to_s3]
- backup_db = DynamoDBToS3Operator(
- task_id='backup_db',
- dynamodb_table_name=TABLE_NAME,
- s3_bucket_name=BUCKET_NAME,
- # Max output file size in bytes. If the Table is too large, multiple files will be created.
- file_size=1000,
- )
- # [END howto_transfer_dynamodb_to_s3]
-
- # [START howto_transfer_dynamodb_to_s3_segmented]
- # Segmenting allows the transfer to be parallelized into {segment} number of parallel tasks.
- backup_db_segment_1 = DynamoDBToS3Operator(
- task_id='backup-1',
- dynamodb_table_name=TABLE_NAME,
- s3_bucket_name=BUCKET_NAME,
- # Max output file size in bytes. If the Table is too large, multiple files will be created.
- file_size=1000,
- dynamodb_scan_kwargs={
- "TotalSegments": 2,
- "Segment": 0,
- },
- )
-
- backup_db_segment_2 = DynamoDBToS3Operator(
- task_id="backup-2",
- dynamodb_table_name=TABLE_NAME,
- s3_bucket_name=BUCKET_NAME,
- # Max output file size in bytes. If the Table is too large, multiple files will be created.
- file_size=1000,
- dynamodb_scan_kwargs={
- "TotalSegments": 2,
- "Segment": 1,
- },
- )
- # [END howto_transfer_dynamodb_to_s3_segmented]
-
- chain(backup_db, [backup_db_segment_1, backup_db_segment_2])
diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst b/docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst
index 61727140a0..508f3d4fbc 100644
--- a/docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst
@@ -48,7 +48,7 @@ To get more information visit:
Example usage:
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
:language: python
:dedent: 4
:start-after: [START howto_transfer_dynamodb_to_s3]
@@ -57,7 +57,7 @@ Example usage:
To parallelize the replication, users can create multiple ``DynamoDBToS3Operator`` tasks using the
``TotalSegments`` parameter. For instance to replicate with parallelism of 2, create two tasks:
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
:language: python
:dedent: 4
:start-after: [START howto_transfer_dynamodb_to_s3_segmented]
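For context, the TotalSegments / Segment values passed via dynamodb_scan_kwargs correspond to DynamoDB's parallel Scan API: each task scans one segment of the table. A minimal boto3 sketch of the equivalent call (illustrative only, with a placeholder table name; this is not the operator's actual implementation):

    import boto3

    # Placeholder table name, for illustration only.
    table = boto3.resource('dynamodb').Table('my-table')

    # With TotalSegments=2, two workers each scan one segment (Segment=0 and Segment=1)
    # and together cover the whole table.
    response = table.scan(TotalSegments=2, Segment=0)
    items = response['Items']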
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
new file mode 100644
index 0000000000..92128bc7fd
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+import boto3
+
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_dynamodb_to_s3'
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+TABLE_ATTRIBUTES = [
+ {'AttributeName': 'ID', 'AttributeType': 'S'},
+ {'AttributeName': 'Value', 'AttributeType': 'S'},
+]
+TABLE_KEY_SCHEMA = [
+ {'AttributeName': 'ID', 'KeyType': 'HASH'},
+ {'AttributeName': 'Value', 'KeyType': 'RANGE'},
+]
+TABLE_THROUGHPUT = {'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1}
+S3_KEY_PREFIX = 'dynamodb-segmented-file'
+
+
+@task
+def set_up_table(table_name: str):
+ dynamo_resource = boto3.resource('dynamodb')
+ table = dynamo_resource.create_table(
+ AttributeDefinitions=TABLE_ATTRIBUTES,
+ TableName=table_name,
+ KeySchema=TABLE_KEY_SCHEMA,
+ ProvisionedThroughput=TABLE_THROUGHPUT,
+ )
+ boto3.client('dynamodb').get_waiter('table_exists').wait(
+ TableName=table_name, WaiterConfig={'Delay': 10, 'MaxAttempts': 10}
+ )
+ table.put_item(Item={'ID': '123', 'Value': 'Testing'})
+
+
+@task
+def wait_for_bucket(s3_bucket_name):
+ waiter = boto3.client('s3').get_waiter('bucket_exists')
+ waiter.wait(Bucket=s3_bucket_name)
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_dynamodb_table(table_name: str):
+ boto3.resource('dynamodb').Table(table_name).delete()
+ boto3.client('dynamodb').get_waiter('table_not_exists').wait(
+ TableName=table_name, WaiterConfig={'Delay': 10, 'MaxAttempts': 10}
+ )
+
+
+with DAG(
+ dag_id=DAG_ID,
+ schedule='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=['example'],
+) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context[ENV_ID_KEY]
+ table_name = f'{env_id}-dynamodb-table'
+ bucket_name = f'{env_id}-dynamodb-bucket'
+
+ create_table = set_up_table(table_name=table_name)
+
+ create_bucket = S3CreateBucketOperator(task_id='create_bucket', bucket_name=bucket_name)
+
+ # [START howto_transfer_dynamodb_to_s3]
+ backup_db = DynamoDBToS3Operator(
+ task_id='backup_db',
+ dynamodb_table_name=table_name,
+ s3_bucket_name=bucket_name,
+ # Max output file size in bytes. If the Table is too large, multiple files will be created.
+ file_size=20,
+ )
+ # [END howto_transfer_dynamodb_to_s3]
+
+ # [START howto_transfer_dynamodb_to_s3_segmented]
+ # Segmenting allows the transfer to be parallelized into {segment} number of parallel tasks.
+ backup_db_segment_1 = DynamoDBToS3Operator(
+ task_id='backup_db_segment_1',
+ dynamodb_table_name=table_name,
+ s3_bucket_name=bucket_name,
+ # Max output file size in bytes. If the Table is too large, multiple files will be created.
+ file_size=1000,
+ s3_key_prefix=f'{S3_KEY_PREFIX}-1-',
+ dynamodb_scan_kwargs={
+ "TotalSegments": 2,
+ "Segment": 0,
+ },
+ )
+
+ backup_db_segment_2 = DynamoDBToS3Operator(
+ task_id="backup_db_segment_2",
+ dynamodb_table_name=table_name,
+ s3_bucket_name=bucket_name,
+ # Max output file size in bytes. If the Table is too large, multiple files will be created.
+ file_size=1000,
+ s3_key_prefix=f'{S3_KEY_PREFIX}-2-',
+ dynamodb_scan_kwargs={
+ "TotalSegments": 2,
+ "Segment": 1,
+ },
+ )
+ # [END howto_transfer_dynamodb_to_s3_segmented]
+ delete_table = delete_dynamodb_table(table_name=table_name)
+
+ delete_bucket = S3DeleteBucketOperator(
+ task_id='delete_bucket',
+ bucket_name=bucket_name,
+ trigger_rule=TriggerRule.ALL_DONE,
+ force_delete=True,
+ )
+
+ chain(
+ # TEST SETUP
+ test_context,
+ create_table,
+ create_bucket,
+ wait_for_bucket(s3_bucket_name=bucket_name),
+ # TEST BODY
+ backup_db,
+ backup_db_segment_1,
+ backup_db_segment_2,
+ # TEST TEARDOWN
+ delete_table,
+ delete_bucket,
+ )
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
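For reference, system test modules like this one are meant to be collected and run directly by pytest, as described in tests/system/README.md#run_via_pytest; assuming that workflow, an invocation such as pytest tests/system/providers/amazon/aws/example_dynamodb_to_s3.py runs the whole DAG as a single test.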