Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/27 22:01:43 UTC

[GitHub] [airflow] ferruzzi opened a new pull request, #26733: RDS Instance System Tests

ferruzzi opened a new pull request, #26733:
URL: https://github.com/apache/airflow/pull/26733

   Follows the template set forth in https://github.com/apache/airflow/pull/24643
   
   Fixed, squashed, and rebased version of https://github.com/apache/airflow/pull/26683 which I can't reopen.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk merged pull request #26733: RDS Instance System Tests

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #26733:
URL: https://github.com/apache/airflow/pull/26733




[GitHub] [airflow] ferruzzi commented on a diff in pull request #26733: RDS Instance System Tests

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #26733:
URL: https://github.com/apache/airflow/pull/26733#discussion_r982628251


##########
tests/system/providers/amazon/aws/example_rds_instance.py:
##########
@@ -26,56 +25,78 @@
     RdsDeleteDbInstanceOperator,
 )
 from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor
-from tests.system.providers.amazon.aws.utils import set_env_id
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+sys_test_context_task = SystemTestContextBuilder().build()
 
-ENV_ID = set_env_id()
-DAG_ID = "example_rds_instance"
+DAG_ID = 'example_rds_instance'
 
-RDS_DB_IDENTIFIER = f'{ENV_ID}-database'
 RDS_USERNAME = 'database_username'
 # NEVER store your production password in plaintext in a DAG like this.
 # Use Airflow Secrets or a secret manager for this in production.
 RDS_PASSWORD = 'database_password'
 
 with DAG(
     dag_id=DAG_ID,
-    schedule=None,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
+    rds_db_identifier = f'{test_context[ENV_ID_KEY]}-database'
+
     # [START howto_operator_rds_create_db_instance]
     create_db_instance = RdsCreateDbInstanceOperator(
         task_id='create_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
-        db_instance_class="db.t4g.micro",
-        engine="postgres",
+        db_instance_identifier=rds_db_identifier,
+        db_instance_class='db.t4g.micro',
+        engine='postgres',
         rds_kwargs={
-            "MasterUsername": RDS_USERNAME,
-            "MasterUserPassword": RDS_PASSWORD,
-            "AllocatedStorage": 20,
+            'MasterUsername': RDS_USERNAME,
+            'MasterUserPassword': RDS_PASSWORD,
+            'AllocatedStorage': 20,
         },
     )
     # [END howto_operator_rds_create_db_instance]
 
+    # RdsCreateDbInstanceOperator waits by default, setting as False to test the Sensor below.
+    create_db_instance.wait_for_completion = False
+
     # [START howto_sensor_rds_instance]
-    db_instance_available = RdsDbSensor(
-        task_id="db_instance_available",
-        db_identifier=RDS_DB_IDENTIFIER,
+    await_db_instance = RdsDbSensor(
+        task_id='await_db_instance',
+        db_identifier=rds_db_identifier,
     )
     # [END howto_sensor_rds_instance]
 
     # [START howto_operator_rds_delete_db_instance]
     delete_db_instance = RdsDeleteDbInstanceOperator(
         task_id='delete_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
+        db_instance_identifier=rds_db_identifier,
         rds_kwargs={
-            "SkipFinalSnapshot": True,
+            'SkipFinalSnapshot': True,
         },
     )
     # [END howto_operator_rds_delete_db_instance]
+    delete_db_instance.trigger_rule = TriggerRule.ALL_DONE
+
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_db_instance,
+        await_db_instance,
+        delete_db_instance,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

Review Comment:
   My apologies, I misunderstood and thought you were saying the whole line was not needed.
   
   Yeah, interesting.  That block is a copy/paste from the [AIP-47](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-47+New+design+of+Airflow+System+Tests#AIP47NewdesignofAirflowSystemTests-Processofmigrationindetails) specification, and all of the previous system tests do it this way.  But it does look like maybe it isn't required.





[GitHub] [airflow] potiuk commented on a diff in pull request #26733: RDS Instance System Tests

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #26733:
URL: https://github.com/apache/airflow/pull/26733#discussion_r990905916


##########
tests/system/providers/amazon/aws/example_rds_instance.py:
##########
@@ -26,56 +25,78 @@
     RdsDeleteDbInstanceOperator,
 )
 from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor
-from tests.system.providers.amazon.aws.utils import set_env_id
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+sys_test_context_task = SystemTestContextBuilder().build()
 
-ENV_ID = set_env_id()
-DAG_ID = "example_rds_instance"
+DAG_ID = 'example_rds_instance'
 
-RDS_DB_IDENTIFIER = f'{ENV_ID}-database'
 RDS_USERNAME = 'database_username'
 # NEVER store your production password in plaintext in a DAG like this.
 # Use Airflow Secrets or a secret manager for this in production.
 RDS_PASSWORD = 'database_password'
 
 with DAG(
     dag_id=DAG_ID,
-    schedule=None,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
+    rds_db_identifier = f'{test_context[ENV_ID_KEY]}-database'
+
     # [START howto_operator_rds_create_db_instance]
     create_db_instance = RdsCreateDbInstanceOperator(
         task_id='create_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
-        db_instance_class="db.t4g.micro",
-        engine="postgres",
+        db_instance_identifier=rds_db_identifier,
+        db_instance_class='db.t4g.micro',
+        engine='postgres',
         rds_kwargs={
-            "MasterUsername": RDS_USERNAME,
-            "MasterUserPassword": RDS_PASSWORD,
-            "AllocatedStorage": 20,
+            'MasterUsername': RDS_USERNAME,
+            'MasterUserPassword': RDS_PASSWORD,
+            'AllocatedStorage': 20,
         },
     )
     # [END howto_operator_rds_create_db_instance]
 
+    # RdsCreateDbInstanceOperator waits by default, setting as False to test the Sensor below.
+    create_db_instance.wait_for_completion = False
+
     # [START howto_sensor_rds_instance]
-    db_instance_available = RdsDbSensor(
-        task_id="db_instance_available",
-        db_identifier=RDS_DB_IDENTIFIER,
+    await_db_instance = RdsDbSensor(
+        task_id='await_db_instance',
+        db_identifier=rds_db_identifier,
     )
     # [END howto_sensor_rds_instance]
 
     # [START howto_operator_rds_delete_db_instance]
     delete_db_instance = RdsDeleteDbInstanceOperator(
         task_id='delete_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
+        db_instance_identifier=rds_db_identifier,
         rds_kwargs={
-            "SkipFinalSnapshot": True,
+            'SkipFinalSnapshot': True,
         },
     )
     # [END howto_operator_rds_delete_db_instance]
+    delete_db_instance.trigger_rule = TriggerRule.ALL_DONE
+
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_db_instance,
+        await_db_instance,
+        delete_db_instance,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

Review Comment:
   There was a good reason we added it in the first place; that reason may have gone away since the `tasks` property was changed to return a list. As @ferruzzi wrote, it is better to keep it consistent for now, and we might want to remove the `list` call as a final refactor.





[GitHub] [airflow] uranusjr commented on a diff in pull request #26733: RDS Instance System Tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26733:
URL: https://github.com/apache/airflow/pull/26733#discussion_r981910056


##########
tests/system/providers/amazon/aws/example_rds_instance.py:
##########
@@ -26,56 +25,78 @@
     RdsDeleteDbInstanceOperator,
 )
 from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor
-from tests.system.providers.amazon.aws.utils import set_env_id
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+sys_test_context_task = SystemTestContextBuilder().build()
 
-ENV_ID = set_env_id()
-DAG_ID = "example_rds_instance"
+DAG_ID = 'example_rds_instance'
 
-RDS_DB_IDENTIFIER = f'{ENV_ID}-database'
 RDS_USERNAME = 'database_username'
 # NEVER store your production password in plaintext in a DAG like this.
 # Use Airflow Secrets or a secret manager for this in production.
 RDS_PASSWORD = 'database_password'
 
 with DAG(
     dag_id=DAG_ID,
-    schedule=None,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
+    rds_db_identifier = f'{test_context[ENV_ID_KEY]}-database'
+
     # [START howto_operator_rds_create_db_instance]
     create_db_instance = RdsCreateDbInstanceOperator(
         task_id='create_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
-        db_instance_class="db.t4g.micro",
-        engine="postgres",
+        db_instance_identifier=rds_db_identifier,
+        db_instance_class='db.t4g.micro',
+        engine='postgres',
         rds_kwargs={
-            "MasterUsername": RDS_USERNAME,
-            "MasterUserPassword": RDS_PASSWORD,
-            "AllocatedStorage": 20,
+            'MasterUsername': RDS_USERNAME,
+            'MasterUserPassword': RDS_PASSWORD,
+            'AllocatedStorage': 20,
         },
     )
     # [END howto_operator_rds_create_db_instance]
 
+    # RdsCreateDbInstanceOperator waits by default, setting as False to test the Sensor below.
+    create_db_instance.wait_for_completion = False
+
     # [START howto_sensor_rds_instance]
-    db_instance_available = RdsDbSensor(
-        task_id="db_instance_available",
-        db_identifier=RDS_DB_IDENTIFIER,
+    await_db_instance = RdsDbSensor(
+        task_id='await_db_instance',
+        db_identifier=rds_db_identifier,
     )
     # [END howto_sensor_rds_instance]
 
     # [START howto_operator_rds_delete_db_instance]
     delete_db_instance = RdsDeleteDbInstanceOperator(
         task_id='delete_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
+        db_instance_identifier=rds_db_identifier,
         rds_kwargs={
-            "SkipFinalSnapshot": True,
+            'SkipFinalSnapshot': True,
         },
     )
     # [END howto_operator_rds_delete_db_instance]
+    delete_db_instance.trigger_rule = TriggerRule.ALL_DONE
+
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_db_instance,
+        await_db_instance,
+        delete_db_instance,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

Review Comment:
   Is this additional `list` call unnecessary?





[GitHub] [airflow] uranusjr commented on a diff in pull request #26733: RDS Instance System Tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26733:
URL: https://github.com/apache/airflow/pull/26733#discussion_r981913360


##########
tests/system/providers/amazon/aws/example_rds_instance.py:
##########
@@ -26,56 +25,78 @@
     RdsDeleteDbInstanceOperator,
 )
 from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor
-from tests.system.providers.amazon.aws.utils import set_env_id
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+sys_test_context_task = SystemTestContextBuilder().build()
 
-ENV_ID = set_env_id()
-DAG_ID = "example_rds_instance"
+DAG_ID = 'example_rds_instance'
 
-RDS_DB_IDENTIFIER = f'{ENV_ID}-database'
 RDS_USERNAME = 'database_username'
 # NEVER store your production password in plaintext in a DAG like this.
 # Use Airflow Secrets or a secret manager for this in production.
 RDS_PASSWORD = 'database_password'
 
 with DAG(
     dag_id=DAG_ID,
-    schedule=None,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
+    rds_db_identifier = f'{test_context[ENV_ID_KEY]}-database'
+
     # [START howto_operator_rds_create_db_instance]
     create_db_instance = RdsCreateDbInstanceOperator(
         task_id='create_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
-        db_instance_class="db.t4g.micro",
-        engine="postgres",
+        db_instance_identifier=rds_db_identifier,
+        db_instance_class='db.t4g.micro',
+        engine='postgres',
         rds_kwargs={
-            "MasterUsername": RDS_USERNAME,
-            "MasterUserPassword": RDS_PASSWORD,
-            "AllocatedStorage": 20,
+            'MasterUsername': RDS_USERNAME,
+            'MasterUserPassword': RDS_PASSWORD,
+            'AllocatedStorage': 20,
         },
     )
     # [END howto_operator_rds_create_db_instance]
 
+    # RdsCreateDbInstanceOperator waits by default, setting as False to test the Sensor below.
+    create_db_instance.wait_for_completion = False
+
     # [START howto_sensor_rds_instance]
-    db_instance_available = RdsDbSensor(
-        task_id="db_instance_available",
-        db_identifier=RDS_DB_IDENTIFIER,
+    await_db_instance = RdsDbSensor(
+        task_id='await_db_instance',
+        db_identifier=rds_db_identifier,
     )
     # [END howto_sensor_rds_instance]
 
     # [START howto_operator_rds_delete_db_instance]
     delete_db_instance = RdsDeleteDbInstanceOperator(
         task_id='delete_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
+        db_instance_identifier=rds_db_identifier,
         rds_kwargs={
-            "SkipFinalSnapshot": True,
+            'SkipFinalSnapshot': True,
         },
     )
     # [END howto_operator_rds_delete_db_instance]
+    delete_db_instance.trigger_rule = TriggerRule.ALL_DONE
+
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_db_instance,
+        await_db_instance,
+        delete_db_instance,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

Review Comment:
   Weird, I thought `dag.tasks` already returns a (new) list
   
   https://github.com/apache/airflow/blob/685f5230440d5d1c538acd80cdb0622f2791fad6/airflow/models/dag.py#L1163-L1165
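A minimal sketch of the point above, in plain Python rather than Airflow itself. `MiniDag` and its `task_dict` attribute are hypothetical stand-ins; the sketch assumes, as the linked lines suggest, that the `tasks` property builds a fresh list on each access. Under that assumption, wrapping the result in `list(...)` only makes a second copy.

```python
from typing import Dict, List


class MiniDag:
    """Hypothetical stand-in for a DAG; this is not Airflow's real class."""

    def __init__(self) -> None:
        self.task_dict: Dict[str, str] = {}

    @property
    def tasks(self) -> List[str]:
        # Assumption: a new list is built on every access.
        return list(self.task_dict.values())


dag = MiniDag()
dag.task_dict = {"create": "create_db_instance", "delete": "delete_db_instance"}

first = dag.tasks          # a fresh list
second = dag.tasks         # another fresh list with the same contents
wrapped = list(dag.tasks)  # copies an object that is already a new list
```

If the property behaves this way, `dag.tasks >> watcher()` and `list(dag.tasks) >> watcher()` would receive equal lists, which is why the extra call looks redundant.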





[GitHub] [airflow] ferruzzi commented on a diff in pull request #26733: RDS Instance System Tests

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #26733:
URL: https://github.com/apache/airflow/pull/26733#discussion_r981912192


##########
tests/system/providers/amazon/aws/example_rds_instance.py:
##########
@@ -26,56 +25,78 @@
     RdsDeleteDbInstanceOperator,
 )
 from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor
-from tests.system.providers.amazon.aws.utils import set_env_id
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+sys_test_context_task = SystemTestContextBuilder().build()
 
-ENV_ID = set_env_id()
-DAG_ID = "example_rds_instance"
+DAG_ID = 'example_rds_instance'
 
-RDS_DB_IDENTIFIER = f'{ENV_ID}-database'
 RDS_USERNAME = 'database_username'
 # NEVER store your production password in plaintext in a DAG like this.
 # Use Airflow Secrets or a secret manager for this in production.
 RDS_PASSWORD = 'database_password'
 
 with DAG(
     dag_id=DAG_ID,
-    schedule=None,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
+    rds_db_identifier = f'{test_context[ENV_ID_KEY]}-database'
+
     # [START howto_operator_rds_create_db_instance]
     create_db_instance = RdsCreateDbInstanceOperator(
         task_id='create_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
-        db_instance_class="db.t4g.micro",
-        engine="postgres",
+        db_instance_identifier=rds_db_identifier,
+        db_instance_class='db.t4g.micro',
+        engine='postgres',
         rds_kwargs={
-            "MasterUsername": RDS_USERNAME,
-            "MasterUserPassword": RDS_PASSWORD,
-            "AllocatedStorage": 20,
+            'MasterUsername': RDS_USERNAME,
+            'MasterUserPassword': RDS_PASSWORD,
+            'AllocatedStorage': 20,
         },
     )
     # [END howto_operator_rds_create_db_instance]
 
+    # RdsCreateDbInstanceOperator waits by default, setting as False to test the Sensor below.
+    create_db_instance.wait_for_completion = False
+
     # [START howto_sensor_rds_instance]
-    db_instance_available = RdsDbSensor(
-        task_id="db_instance_available",
-        db_identifier=RDS_DB_IDENTIFIER,
+    await_db_instance = RdsDbSensor(
+        task_id='await_db_instance',
+        db_identifier=rds_db_identifier,
     )
     # [END howto_sensor_rds_instance]
 
     # [START howto_operator_rds_delete_db_instance]
     delete_db_instance = RdsDeleteDbInstanceOperator(
         task_id='delete_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
+        db_instance_identifier=rds_db_identifier,
         rds_kwargs={
-            "SkipFinalSnapshot": True,
+            'SkipFinalSnapshot': True,
         },
     )
     # [END howto_operator_rds_delete_db_instance]
+    delete_db_instance.trigger_rule = TriggerRule.ALL_DONE
+
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_db_instance,
+        await_db_instance,
+        delete_db_instance,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

Review Comment:
   The watcher is needed for system tests that have `trigger_rule='ALL_DONE'` on one of the tasks.  Even if either of the first two tasks fails, the third (`delete_db_instance`) will still trigger, and without the watcher the test would be marked as passed.
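The behaviour described here can be illustrated with a small simulation in plain Python (no Airflow import). It rests on two assumptions that are not stated in this thread: that a run's final state is derived from its leaf task, and that the watcher fires only when some upstream task failed and then fails on purpose. `run_chain` and the state constants are hypothetical names used only for this sketch.

```python
from typing import Dict

FAILED = "failed"
SUCCESS = "success"
SKIPPED = "skipped"
UPSTREAM_FAILED = "upstream_failed"


def run_chain(outcomes: Dict[str, bool], with_watcher: bool) -> str:
    """Simulate create -> await -> delete (ALL_DONE), optionally with a watcher.

    `outcomes` maps a task name to whether that task would succeed if it ran.
    Returns the simulated run state, derived from the leaf task only.
    """
    states: Dict[str, str] = {}
    previous_ok = True
    for name in ("create_db_instance", "await_db_instance"):
        # Default rule: a task runs only if everything upstream succeeded.
        if not previous_ok:
            states[name] = UPSTREAM_FAILED
        else:
            states[name] = SUCCESS if outcomes[name] else FAILED
        previous_ok = states[name] == SUCCESS

    # ALL_DONE: the teardown runs no matter how the upstream tasks ended.
    states["delete_db_instance"] = SUCCESS if outcomes["delete_db_instance"] else FAILED

    if not with_watcher:
        # The teardown is the only leaf, so its state decides the run.
        leaf = states["delete_db_instance"]
    else:
        # Assumption: the watcher sits downstream of every task, fails when
        # any of them failed, and is skipped otherwise.
        any_failed = any(s in (FAILED, UPSTREAM_FAILED) for s in states.values())
        leaf = FAILED if any_failed else SKIPPED

    return FAILED if leaf in (FAILED, UPSTREAM_FAILED) else SUCCESS


broken_create = {
    "create_db_instance": False,
    "await_db_instance": True,
    "delete_db_instance": True,
}
all_good = {name: True for name in broken_create}

state_without_watcher = run_chain(broken_create, with_watcher=False)
state_with_watcher = run_chain(broken_create, with_watcher=True)
state_healthy = run_chain(all_good, with_watcher=True)
```

In this sketch a failed `create_db_instance` is masked by the successful teardown unless the watcher is attached, while a fully healthy run stays green either way.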







[GitHub] [airflow] ferruzzi commented on a diff in pull request #26733: RDS Instance System Tests

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #26733:
URL: https://github.com/apache/airflow/pull/26733#discussion_r984903010


##########
tests/system/providers/amazon/aws/example_rds_instance.py:
##########
@@ -26,56 +25,78 @@
     RdsDeleteDbInstanceOperator,
 )
 from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor
-from tests.system.providers.amazon.aws.utils import set_env_id
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+sys_test_context_task = SystemTestContextBuilder().build()
 
-ENV_ID = set_env_id()
-DAG_ID = "example_rds_instance"
+DAG_ID = 'example_rds_instance'
 
-RDS_DB_IDENTIFIER = f'{ENV_ID}-database'
 RDS_USERNAME = 'database_username'
 # NEVER store your production password in plaintext in a DAG like this.
 # Use Airflow Secrets or a secret manager for this in production.
 RDS_PASSWORD = 'database_password'
 
 with DAG(
     dag_id=DAG_ID,
-    schedule=None,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
+    rds_db_identifier = f'{test_context[ENV_ID_KEY]}-database'
+
     # [START howto_operator_rds_create_db_instance]
     create_db_instance = RdsCreateDbInstanceOperator(
         task_id='create_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
-        db_instance_class="db.t4g.micro",
-        engine="postgres",
+        db_instance_identifier=rds_db_identifier,
+        db_instance_class='db.t4g.micro',
+        engine='postgres',
         rds_kwargs={
-            "MasterUsername": RDS_USERNAME,
-            "MasterUserPassword": RDS_PASSWORD,
-            "AllocatedStorage": 20,
+            'MasterUsername': RDS_USERNAME,
+            'MasterUserPassword': RDS_PASSWORD,
+            'AllocatedStorage': 20,
         },
     )
     # [END howto_operator_rds_create_db_instance]
 
+    # RdsCreateDbInstanceOperator waits by default, setting as False to test the Sensor below.
+    create_db_instance.wait_for_completion = False
+
     # [START howto_sensor_rds_instance]
-    db_instance_available = RdsDbSensor(
-        task_id="db_instance_available",
-        db_identifier=RDS_DB_IDENTIFIER,
+    await_db_instance = RdsDbSensor(
+        task_id='await_db_instance',
+        db_identifier=rds_db_identifier,
     )
     # [END howto_sensor_rds_instance]
 
     # [START howto_operator_rds_delete_db_instance]
     delete_db_instance = RdsDeleteDbInstanceOperator(
         task_id='delete_db_instance',
-        db_instance_identifier=RDS_DB_IDENTIFIER,
+        db_instance_identifier=rds_db_identifier,
         rds_kwargs={
-            "SkipFinalSnapshot": True,
+            'SkipFinalSnapshot': True,
         },
     )
     # [END howto_operator_rds_delete_db_instance]
+    delete_db_instance.trigger_rule = TriggerRule.ALL_DONE
+
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_db_instance,
+        await_db_instance,
+        delete_db_instance,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

Review Comment:
   I would prefer to keep it standardized like all of the others and fix them all later, unless you feel strongly about changing this one.


