You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/08/10 21:32:54 UTC

[airflow] branch main updated: Fix AutoML system tests (#33197)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ef6851ab9 Fix AutoML system tests (#33197)
7ef6851ab9 is described below

commit 7ef6851ab94266720727a7cf4c6c7d7c6d6f9f93
Author: VladaZakharova <80...@users.noreply.github.com>
AuthorDate: Thu Aug 10 23:32:46 2023 +0200

    Fix AutoML system tests (#33197)
---
 .../google/cloud/automl/example_automl_dataset.py  |  47 ++++-----
 .../google/cloud/automl/example_automl_model.py    |  28 ++---
 .../example_automl_nl_text_classification.py       | 102 +++++++++++++-----
 .../automl/example_automl_nl_text_extraction.py    |  34 +++---
 .../automl/example_automl_nl_text_sentiment.py     |  93 ++++++++++------
 .../cloud/automl/example_automl_translation.py     | 117 +++++++++++++++------
 ...ple_automl_video_intelligence_classification.py |  94 +++++++++++------
 .../example_automl_video_intelligence_tracking.py  |  94 +++++++++++------
 .../automl/example_automl_vision_classification.py |  88 +++++++++++-----
 .../example_automl_vision_object_detection.py      | 101 ++++++++++++------
 10 files changed, 530 insertions(+), 268 deletions(-)

diff --git a/tests/system/providers/google/cloud/automl/example_automl_dataset.py b/tests/system/providers/google/cloud/automl/example_automl_dataset.py
index e3c5abf671..1d7da91c8f 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_dataset.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_dataset.py
@@ -43,21 +43,20 @@ from airflow.providers.google.cloud.operators.gcs import (
 )
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "automl_dataset"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_dataset"
 GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
 GCP_AUTOML_LOCATION = "us-central1"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-RESOURCE_DATA_BUCKET = "system-tests-resources"
-
-DATASET_NAME = "test_dataset_tabular"
+DATASET_NAME = f"ds_tabular_{ENV_ID}".replace("-", "_")
 DATASET = {
     "display_name": DATASET_NAME,
     "tables_dataset_metadata": {"target_column_spec_id": ""},
 }
-AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/bank-marketing.csv"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/tabular-classification.csv"
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
 
 extract_object_id = CloudAutoMLHook.extract_object_id
@@ -78,7 +77,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "automl"],
+    tags=["example", "automl", "dataset"],
     user_defined_macros={
         "get_target_column_spec": get_target_column_spec,
         "target": "Class",
@@ -95,7 +94,7 @@ with models.DAG(
     move_dataset_file = GCSSynchronizeBucketsOperator(
         task_id="move_dataset_to_bucket",
         source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="automl",
+        source_object="automl/datasets/tabular",
         destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
         destination_object="automl",
         recursive=True,
@@ -112,8 +111,8 @@ with models.DAG(
     # [END howto_operator_automl_create_dataset]
 
     # [START howto_operator_automl_import_data]
-    import_dataset_task = AutoMLImportDataOperator(
-        task_id="import_dataset_task",
+    import_dataset = AutoMLImportDataOperator(
+        task_id="import_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         input_config=IMPORT_INPUT_CONFIG,
@@ -121,8 +120,8 @@ with models.DAG(
     # [END howto_operator_automl_import_data]
 
     # [START howto_operator_automl_specs]
-    list_tables_spec_task = AutoMLTablesListTableSpecsOperator(
-        task_id="list_tables_spec_task",
+    list_tables_spec = AutoMLTablesListTableSpecsOperator(
+        task_id="list_tables_spec",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
@@ -130,8 +129,8 @@ with models.DAG(
     # [END howto_operator_automl_specs]
 
     # [START howto_operator_automl_column_specs]
-    list_columns_spec_task = AutoMLTablesListColumnSpecsOperator(
-        task_id="list_columns_spec_task",
+    list_columns_spec = AutoMLTablesListColumnSpecsOperator(
+        task_id="list_columns_spec",
         dataset_id=dataset_id,
         table_spec_id="{{ extract_object_id(task_instance.xcom_pull('list_tables_spec_task')[0]) }}",
         location=GCP_AUTOML_LOCATION,
@@ -146,16 +145,16 @@ with models.DAG(
         "target_column_spec_id"
     ] = "{{ get_target_column_spec(task_instance.xcom_pull('list_columns_spec_task'), target) }}"
 
-    update_dataset_task = AutoMLTablesUpdateDatasetOperator(
-        task_id="update_dataset_task",
+    update_dataset = AutoMLTablesUpdateDatasetOperator(
+        task_id="update_dataset",
         dataset=update,
         location=GCP_AUTOML_LOCATION,
     )
     # [END howto_operator_automl_update_dataset]
 
     # [START howto_operator_list_dataset]
-    list_datasets_task = AutoMLListDatasetOperator(
-        task_id="list_datasets_task",
+    list_datasets = AutoMLListDatasetOperator(
+        task_id="list_datasets",
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
@@ -178,11 +177,11 @@ with models.DAG(
         # TEST SETUP
         [create_bucket >> move_dataset_file, create_dataset]
         # TEST BODY
-        >> import_dataset_task
-        >> list_tables_spec_task
-        >> list_columns_spec_task
-        >> update_dataset_task
-        >> list_datasets_task
+        >> import_dataset
+        >> list_tables_spec
+        >> list_columns_spec
+        >> update_dataset
+        >> list_datasets
         # TEST TEARDOWN
         >> delete_dataset
         >> delete_bucket
diff --git a/tests/system/providers/google/cloud/automl/example_automl_model.py b/tests/system/providers/google/cloud/automl/example_automl_model.py
index 5e7560a022..274dd83e95 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_model.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_model.py
@@ -50,33 +50,35 @@ from airflow.providers.google.cloud.operators.gcs import (
 )
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 DAG_ID = "example_automl_model"
 GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
 GCP_AUTOML_LOCATION = "us-central1"
 
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-RESOURCE_DATA_BUCKET = "system-tests-resources"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-DATASET_NAME = "test_dataset_model"
+DATASET_NAME = f"md_tabular_{ENV_ID}".replace("-", "_")
 DATASET = {
     "display_name": DATASET_NAME,
     "tables_dataset_metadata": {"target_column_spec_id": ""},
 }
-AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl-model/bank-marketing.csv"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/bank-marketing-split.csv"
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
 IMPORT_OUTPUT_CONFIG = {
-    "gcs_destination": {"output_uri_prefix": f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl-model"}
+    "gcs_destination": {"output_uri_prefix": f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl"}
 }
 
-MODEL_NAME = "test_model"
+# Model display name, suffixed with ENV_ID (dashes replaced by underscores)
+MODEL_NAME = f"md_tabular_{ENV_ID}".replace("-", "_")
 MODEL = {
     "display_name": MODEL_NAME,
     "tables_model_metadata": {"train_budget_milli_node_hours": 1000},
 }
 
 PREDICT_VALUES = [
+    Value(string_value="TRAINING"),
     Value(string_value="51"),
     Value(string_value="blue-collar"),
     Value(string_value="married"),
@@ -115,10 +117,10 @@ with models.DAG(
     catchup=False,
     user_defined_macros={
         "get_target_column_spec": get_target_column_spec,
-        "target": "Class",
+        "target": "Deposit",
         "extract_object_id": extract_object_id,
     },
-    tags=["example", "automl"],
+    tags=["example", "automl", "model"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket",
@@ -130,9 +132,9 @@ with models.DAG(
     move_dataset_file = GCSSynchronizeBucketsOperator(
         task_id="move_data_to_bucket",
         source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="automl-model",
+        source_object="automl/datasets/model",
         destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
-        destination_object="automl-model",
+        destination_object="automl",
         recursive=True,
     )
 
@@ -255,9 +257,7 @@ with models.DAG(
 
     (
         # TEST SETUP
-        create_bucket
-        >> move_dataset_file
-        >> create_dataset
+        [create_bucket >> move_dataset_file, create_dataset]
         >> import_dataset
         >> list_tables_spec
         >> list_columns_spec
diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py
index 3912916209..0a04b3b361 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py
@@ -31,47 +31,77 @@ from airflow.providers.google.cloud.operators.automl import (
     AutoMLCreateDatasetOperator,
     AutoMLDeleteDatasetOperator,
     AutoMLDeleteModelOperator,
+    AutoMLDeployModelOperator,
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+    GCSSynchronizeBucketsOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_text_cls"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_TEXT_CLS_BUCKET = os.environ.get("GCP_AUTOML_TEXT_CLS_BUCKET", "gs://INVALID BUCKET NAME")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-# Example model
+MODEL_NAME = "text_clss_test_model"
 MODEL = {
-    "display_name": "auto_model_1",
+    "display_name": MODEL_NAME,
     "text_classification_model_metadata": {},
 }
 
-# Example dataset
+DATASET_NAME = f"ds_clss_{ENV_ID}".replace("-", "_")
 DATASET = {
-    "display_name": "test_text_cls_dataset",
+    "display_name": DATASET_NAME,
     "text_classification_dataset_metadata": {"classification_type": "MULTICLASS"},
 }
 
-
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TEXT_CLS_BUCKET]}}
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/text_classification.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
 
 extract_object_id = CloudAutoMLHook.extract_object_id
 
 # Example DAG for AutoML Natural Language Text Classification
 with models.DAG(
-    "example_automl_text_cls",
+    DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "automl", "text-classification"],
 ) as dag:
-    create_dataset_task = AutoMLCreateDatasetOperator(
-        task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=GCP_AUTOML_LOCATION,
     )
 
-    dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
-    MODEL["dataset_id"] = dataset_id
+    move_dataset_file = GCSSynchronizeBucketsOperator(
+        task_id="move_dataset_to_bucket",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object="automl/datasets/text",
+        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+        destination_object="automl",
+        recursive=True,
+    )
+
+    create_dataset = AutoMLCreateDatasetOperator(
+        task_id="create_dataset",
+        dataset=DATASET,
+        location=GCP_AUTOML_LOCATION,
+        project_id=GCP_PROJECT_ID,
+    )
 
-    import_dataset_task = AutoMLImportDataOperator(
-        task_id="import_dataset_task",
+    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
+    MODEL["dataset_id"] = dataset_id
+    import_dataset = AutoMLImportDataOperator(
+        task_id="import_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         input_config=IMPORT_INPUT_CONFIG,
@@ -81,29 +111,43 @@ with models.DAG(
     create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
     model_id = cast(str, XComArg(create_model, key="model_id"))
 
-    delete_model_task = AutoMLDeleteModelOperator(
-        task_id="delete_model_task",
+    deploy_model = AutoMLDeployModelOperator(
+        task_id="deploy_model",
         model_id=model_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    delete_datasets_task = AutoMLDeleteDatasetOperator(
-        task_id="delete_datasets_task",
+    delete_model = AutoMLDeleteModelOperator(
+        task_id="delete_model",
+        model_id=model_id,
+        location=GCP_AUTOML_LOCATION,
+        project_id=GCP_PROJECT_ID,
+    )
+
+    delete_dataset = AutoMLDeleteDatasetOperator(
+        task_id="delete_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    # TEST BODY
-    import_dataset_task >> create_model
-    # TEST TEARDOWN
-    delete_model_task >> delete_datasets_task
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
 
-    # Task dependencies created via `XComArgs`:
-    #   create_dataset_task >> import_dataset_task
-    #   create_dataset_task >> create_model
-    #   create_dataset_task >> delete_datasets_task
+    (
+        # TEST SETUP
+        [create_bucket >> move_dataset_file, create_dataset]
+        # TEST BODY
+        >> import_dataset
+        >> create_model
+        >> deploy_model
+        # TEST TEARDOWN
+        >> delete_model
+        >> delete_dataset
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py
index fd6a4fd6a7..260a7d84c6 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py
@@ -41,18 +41,18 @@ from airflow.providers.google.cloud.operators.gcs import (
 )
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_automl_text"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_text_extr"
 GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
 GCP_AUTOML_LOCATION = "us-central1"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-RESOURCE_DATA_BUCKET = "system-tests-resources"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
-DATASET_NAME = "test_entity_extr"
+DATASET_NAME = f"ds_extr_{ENV_ID}".replace("-", "_")
 DATASET = {"display_name": DATASET_NAME, "text_extraction_dataset_metadata": {}}
-AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl-text/dataset.csv"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/text_extraction.csv"
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
 
 MODEL_NAME = "entity_extr_test_model"
@@ -70,7 +70,7 @@ with models.DAG(
     start_date=datetime(2021, 1, 1),
     catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
-    tags=["example", "automl"],
+    tags=["example", "automl", "text-extraction"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket",
@@ -82,23 +82,23 @@ with models.DAG(
     move_dataset_file = GCSSynchronizeBucketsOperator(
         task_id="move_data_to_bucket",
         source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="automl-text",
+        source_object="automl/datasets/text",
         destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
-        destination_object="automl-text",
+        destination_object="automl",
         recursive=True,
     )
 
-    create_dataset_task = AutoMLCreateDatasetOperator(
-        task_id="create_dataset_task",
+    create_dataset = AutoMLCreateDatasetOperator(
+        task_id="create_dataset",
         dataset=DATASET,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
     MODEL["dataset_id"] = dataset_id
-    import_dataset_task = AutoMLImportDataOperator(
-        task_id="import_dataset_task",
+    import_dataset = AutoMLImportDataOperator(
+        task_id="import_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         input_config=IMPORT_INPUT_CONFIG,
@@ -137,11 +137,9 @@ with models.DAG(
 
     (
         # TEST SETUP
-        create_bucket
-        >> move_dataset_file
-        >> create_dataset_task
-        >> import_dataset_task
+        [create_bucket >> move_dataset_file, create_dataset]
         # TEST BODY
+        >> import_dataset
         >> create_model
         # TEST TEARDOWN
         >> delete_model_task
diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py
index 6bbe0d2bfb..3559339755 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py
@@ -34,44 +34,71 @@ from airflow.providers.google.cloud.operators.automl import (
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+    GCSSynchronizeBucketsOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
 
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_SENTIMENT_BUCKET = os.environ.get("GCP_AUTOML_SENTIMENT_BUCKET", "gs://INVALID BUCKET NAME")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_text_sent"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-# Example model
+MODEL_NAME = "text_sent_test_model"
 MODEL = {
-    "display_name": "auto_model_1",
+    "display_name": MODEL_NAME,
     "text_sentiment_model_metadata": {},
 }
 
-# Example dataset
+DATASET_NAME = f"ds_sent_{ENV_ID}".replace("-", "_")
 DATASET = {
-    "display_name": "test_text_sentiment_dataset",
-    "text_sentiment_dataset_metadata": {"sentiment_max": 10},
+    "display_name": DATASET_NAME,
+    "text_sentiment_dataset_metadata": {"sentiment_max": 5},
 }
 
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_SENTIMENT_BUCKET]}}
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/text_sentiment.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
 
 extract_object_id = CloudAutoMLHook.extract_object_id
 
 # Example DAG for AutoML Natural Language Text Sentiment
 with models.DAG(
-    "example_automl_text_sentiment",
+    DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
-    tags=["example"],
+    tags=["example", "automl", "text-sentiment"],
 ) as dag:
-    create_dataset_task = AutoMLCreateDatasetOperator(
-        task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=GCP_AUTOML_LOCATION,
     )
 
-    dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+    move_dataset_file = GCSSynchronizeBucketsOperator(
+        task_id="move_dataset_to_bucket",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object="automl/datasets/text",
+        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+        destination_object="automl",
+        recursive=True,
+    )
+
+    create_dataset = AutoMLCreateDatasetOperator(
+        task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    )
+
+    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
     MODEL["dataset_id"] = dataset_id
 
-    import_dataset_task = AutoMLImportDataOperator(
-        task_id="import_dataset_task",
+    import_dataset = AutoMLImportDataOperator(
+        task_id="import_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         input_config=IMPORT_INPUT_CONFIG,
@@ -80,33 +107,39 @@ with models.DAG(
     MODEL["dataset_id"] = dataset_id
 
     create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
     model_id = cast(str, XComArg(create_model, key="model_id"))
 
-    delete_model_task = AutoMLDeleteModelOperator(
-        task_id="delete_model_task",
+    delete_model = AutoMLDeleteModelOperator(
+        task_id="delete_model",
         model_id=model_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    delete_datasets_task = AutoMLDeleteDatasetOperator(
-        task_id="delete_datasets_task",
+    delete_dataset = AutoMLDeleteDatasetOperator(
+        task_id="delete_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    # TEST BODY
-    import_dataset_task >> create_model
-    # TEST TEARDOWN
-    delete_model_task >> delete_datasets_task
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
 
-    # Task dependencies created via `XComArgs`:
-    #   create_dataset_task >> import_dataset_task
-    #   create_dataset_task >> create_model
-    #   create_model >> delete_model_task
-    #   create_dataset_task >> delete_datasets_task
+    (
+        # TEST SETUP
+        [create_bucket >> move_dataset_file, create_dataset]
+        # TEST BODY
+        >> import_dataset
+        >> create_model
+        # TEST TEARDOWN
+        >> delete_model
+        >> delete_dataset
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/automl/example_automl_translation.py b/tests/system/providers/google/cloud/automl/example_automl_translation.py
index 87bf7166dc..e03453f03a 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_translation.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_translation.py
@@ -24,7 +24,10 @@ import os
 from datetime import datetime
 from typing import cast
 
+from google.cloud import storage
+
 from airflow import models
+from airflow.decorators import task
 from airflow.models.xcom_arg import XComArg
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
 from airflow.providers.google.cloud.operators.automl import (
@@ -34,51 +37,92 @@ from airflow.providers.google.cloud.operators.automl import (
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_TRANSLATION_BUCKET = os.environ.get(
-    "GCP_AUTOML_TRANSLATION_BUCKET", "gs://INVALID BUCKET NAME/file"
-)
+DAG_ID = "example_automl_translate"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-# Example model
+
+MODEL_NAME = "translate_test_model"
 MODEL = {
-    "display_name": "auto_model_1",
+    "display_name": MODEL_NAME,
     "translation_model_metadata": {},
 }
 
-# Example dataset
+DATASET_NAME = f"ds_translate_{ENV_ID}".replace("-", "_")
 DATASET = {
-    "display_name": "test_translation_dataset",
+    "display_name": DATASET_NAME,
     "translation_dataset_metadata": {
         "source_language_code": "en",
         "target_language_code": "es",
     },
 }
 
-
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TRANSLATION_BUCKET]}}
+CSV_FILE_NAME = "en-es.csv"
+TSV_FILE_NAME = "en-es.tsv"
+GCS_FILE_PATH = f"automl/datasets/translate/{CSV_FILE_NAME}"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/{CSV_FILE_NAME}"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
 
 extract_object_id = CloudAutoMLHook.extract_object_id
 
 
 # Example DAG for AutoML Translation
 with models.DAG(
-    "example_automl_translation",
-    start_date=datetime(2021, 1, 1),
+    DAG_ID,
     schedule="@once",
+    start_date=datetime(2021, 1, 1),
     catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
-    tags=["example"],
+    tags=["example", "automl", "translate"],
 ) as dag:
-    create_dataset_task = AutoMLCreateDatasetOperator(
-        task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=GCP_AUTOML_LOCATION,
     )
 
-    dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+    @task
+    def upload_csv_file_to_gcs():
+        # download file into memory
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(RESOURCE_DATA_BUCKET)
+        blob = bucket.blob(GCS_FILE_PATH)
+        contents = blob.download_as_string().decode()
+
+        # update memory content
+        updated_contents = contents.replace("template-bucket", DATA_SAMPLE_GCS_BUCKET_NAME)
+
+        # upload updated content to bucket
+        destination_bucket = storage_client.bucket(DATA_SAMPLE_GCS_BUCKET_NAME)
+        destination_blob = destination_bucket.blob(f"automl/{CSV_FILE_NAME}")
+        destination_blob.upload_from_string(updated_contents)
+
+    upload_csv_file_to_gcs_task = upload_csv_file_to_gcs()
+
+    copy_dataset_file = GCSToGCSOperator(
+        task_id="copy_dataset_file",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object=f"automl/datasets/translate/{TSV_FILE_NAME}",
+        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+        destination_object=f"automl/{TSV_FILE_NAME}",
+    )
+
+    create_dataset = AutoMLCreateDatasetOperator(
+        task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    )
+
+    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
 
-    import_dataset_task = AutoMLImportDataOperator(
-        task_id="import_dataset_task",
+    import_dataset = AutoMLImportDataOperator(
+        task_id="import_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         input_config=IMPORT_INPUT_CONFIG,
@@ -87,33 +131,40 @@ with models.DAG(
     MODEL["dataset_id"] = dataset_id
 
     create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
     model_id = cast(str, XComArg(create_model, key="model_id"))
 
-    delete_model_task = AutoMLDeleteModelOperator(
-        task_id="delete_model_task",
+    delete_model = AutoMLDeleteModelOperator(
+        task_id="delete_model",
         model_id=model_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    delete_datasets_task = AutoMLDeleteDatasetOperator(
-        task_id="delete_datasets_task",
+    delete_dataset = AutoMLDeleteDatasetOperator(
+        task_id="delete_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    # TEST BODY
-    import_dataset_task >> create_model
-    # TEST TEARDOWN
-    delete_model_task >> delete_datasets_task
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
 
-    # Task dependencies created via `XComArgs`:
-    #   create_dataset_task >> import_dataset_task
-    #   create_dataset_task >> create_model
-    #   create_model >> delete_model_task
-    #   create_dataset_task >> delete_datasets_task
+    (
+        # TEST SETUP
+        [create_bucket >> upload_csv_file_to_gcs_task >> copy_dataset_file]
+        # TEST BODY
+        >> create_dataset
+        >> import_dataset
+        >> create_model
+        # TEST TEARDOWN
+        >> delete_dataset
+        >> delete_model
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py
index 63447a1230..7993494fff 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py
@@ -34,47 +34,73 @@ from airflow.providers.google.cloud.operators.automl import (
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_VIDEO_BUCKET = os.environ.get(
-    "GCP_AUTOML_VIDEO_BUCKET", "gs://INVALID BUCKET NAME/hmdb_split1.csv"
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+    GCSSynchronizeBucketsOperator,
 )
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_video_clss"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-# Example model
+MODEL_NAME = "video_clss_test_model"
 MODEL = {
-    "display_name": "auto_model_1",
+    "display_name": MODEL_NAME,
     "video_classification_model_metadata": {},
 }
 
-# Example dataset
+DATASET_NAME = f"ds_video_clss_{ENV_ID}".replace("-", "_")
 DATASET = {
-    "display_name": "test_video_dataset",
+    "display_name": DATASET_NAME,
     "video_classification_dataset_metadata": {},
 }
 
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_VIDEO_BUCKET]}}
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/video_classification.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
+
 
 extract_object_id = CloudAutoMLHook.extract_object_id
 
 
 # Example DAG for AutoML Video Intelligence Classification
 with models.DAG(
-    "example_automl_video",
+    DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
-    tags=["example"],
+    tags=["example", "automl", "video-clss"],
 ) as dag:
-    create_dataset_task = AutoMLCreateDatasetOperator(
-        task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=GCP_AUTOML_LOCATION,
+    )
+
+    move_dataset_file = GCSSynchronizeBucketsOperator(
+        task_id="move_dataset_to_bucket",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object="automl/datasets/video",
+        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+        destination_object="automl",
+        recursive=True,
     )
 
-    dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+    create_dataset = AutoMLCreateDatasetOperator(
+        task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    )
+
+    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
     MODEL["dataset_id"] = dataset_id
 
-    import_dataset_task = AutoMLImportDataOperator(
-        task_id="import_dataset_task",
+    import_dataset = AutoMLImportDataOperator(
+        task_id="import_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         input_config=IMPORT_INPUT_CONFIG,
@@ -83,33 +109,39 @@ with models.DAG(
     MODEL["dataset_id"] = dataset_id
 
     create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
     model_id = cast(str, XComArg(create_model, key="model_id"))
 
-    delete_model_task = AutoMLDeleteModelOperator(
-        task_id="delete_model_task",
+    delete_model = AutoMLDeleteModelOperator(
+        task_id="delete_model",
         model_id=model_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    delete_datasets_task = AutoMLDeleteDatasetOperator(
-        task_id="delete_datasets_task",
+    delete_dataset = AutoMLDeleteDatasetOperator(
+        task_id="delete_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    # TEST BODY
-    import_dataset_task >> create_model
-    # TEST TEARDOWN
-    delete_model_task >> delete_datasets_task
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
 
-    # Task dependencies created via `XComArgs`:
-    #   create_dataset_task >> import_dataset_task
-    #   create_dataset_task >> create_model
-    #   create_model >> delete_model_task
-    #   create_dataset_task >> delete_datasets_task
+    (
+        # TEST SETUP
+        [create_bucket >> move_dataset_file, create_dataset]
+        # TEST BODY
+        >> import_dataset
+        >> create_model
+        # TEST TEARDOWN
+        >> delete_model
+        >> delete_dataset
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py
index 5ac8fa7457..61802077b3 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py
@@ -34,49 +34,73 @@ from airflow.providers.google.cloud.operators.automl import (
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_TRACKING_BUCKET = os.environ.get(
-    "GCP_AUTOML_TRACKING_BUCKET",
-    "gs://INVALID BUCKET NAME/youtube_8m_videos_animal_tiny.csv",
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+    GCSSynchronizeBucketsOperator,
 )
+from airflow.utils.trigger_rule import TriggerRule
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_video_track"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-# Example model
+MODEL_NAME = "video_track_test_model"
 MODEL = {
-    "display_name": "auto_model_1",
+    "display_name": MODEL_NAME,
     "video_object_tracking_model_metadata": {},
 }
 
-# Example dataset
+DATASET_NAME = f"ds_video_track_{ENV_ID}".replace("-", "_")
 DATASET = {
-    "display_name": "test_video_tracking_dataset",
+    "display_name": DATASET_NAME,
     "video_object_tracking_dataset_metadata": {},
 }
 
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TRACKING_BUCKET]}}
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/video_tracking.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
+
 
 extract_object_id = CloudAutoMLHook.extract_object_id
 
 
 # Example DAG for AutoML Video Intelligence Object Tracking
 with models.DAG(
-    "example_automl_video_tracking",
+    DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
-    tags=["example"],
+    tags=["example", "automl", "video-tracking"],
 ) as dag:
-    create_dataset_task = AutoMLCreateDatasetOperator(
-        task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=GCP_AUTOML_LOCATION,
+    )
+
+    move_dataset_file = GCSSynchronizeBucketsOperator(
+        task_id="move_dataset_to_bucket",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object="automl/datasets/video",
+        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+        destination_object="automl",
+        recursive=True,
     )
 
-    dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+    create_dataset = AutoMLCreateDatasetOperator(
+        task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
+    )
+
+    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
     MODEL["dataset_id"] = dataset_id
 
-    import_dataset_task = AutoMLImportDataOperator(
-        task_id="import_dataset_task",
+    import_dataset = AutoMLImportDataOperator(
+        task_id="import_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         input_config=IMPORT_INPUT_CONFIG,
@@ -85,33 +109,39 @@ with models.DAG(
     MODEL["dataset_id"] = dataset_id
 
     create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
     model_id = cast(str, XComArg(create_model, key="model_id"))
 
-    delete_model_task = AutoMLDeleteModelOperator(
-        task_id="delete_model_task",
+    delete_model = AutoMLDeleteModelOperator(
+        task_id="delete_model",
         model_id=model_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    delete_datasets_task = AutoMLDeleteDatasetOperator(
-        task_id="delete_datasets_task",
+    delete_dataset = AutoMLDeleteDatasetOperator(
+        task_id="delete_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    # TEST BODY
-    import_dataset_task >> create_model
-    # TEST TEARDOWN
-    delete_model_task >> delete_datasets_task
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
 
-    # Task dependencies created via `XComArgs`:
-    #   create_dataset_task >> import_dataset_task
-    #   create_dataset_task >> create_model
-    #   create_model >> delete_model_task
-    #   create_dataset_task >> delete_datasets_task
+    (
+        # TEST SETUP
+        [create_bucket >> move_dataset_file, create_dataset]
+        # TEST BODY
+        >> import_dataset
+        >> create_model
+        # TEST TEARDOWN
+        >> delete_model
+        >> delete_dataset
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py b/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py
index 5d6ea7b932..a85078779b 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py
@@ -24,7 +24,10 @@ import os
 from datetime import datetime
 from typing import cast
 
+from google.cloud import storage
+
 from airflow import models
+from airflow.decorators import task
 from airflow.models.xcom_arg import XComArg
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
 from airflow.providers.google.cloud.operators.automl import (
@@ -37,33 +40,36 @@ from airflow.providers.google.cloud.operators.automl import (
 from airflow.providers.google.cloud.operators.gcs import (
     GCSCreateBucketOperator,
     GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
 )
+from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_automl_vision"
+DAG_ID = "example_automl_vision_clss"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-
 GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-RESOURCE_DATA_BUCKET = "system-tests-resources"
 
-DATASET_NAME = "test_dataset_vision"
+DATASET_NAME = f"ds_vision_clss_{ENV_ID}".replace("-", "_")
 DATASET = {
     "display_name": DATASET_NAME,
     "image_classification_dataset_metadata": {"classification_type": "MULTILABEL"},
 }
-AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl-vision/data.csv"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/vision_classification.csv"
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
 
-MODEL_NAME = "test_model"
+MODEL_NAME = "vision_clss_test_model"
 MODEL = {
     "display_name": MODEL_NAME,
     "image_classification_model_metadata": {"train_budget": 1},
 }
 
+CSV_FILE_NAME = "vision_classification.csv"
+GCS_FILE_PATH = f"automl/datasets/vision/{CSV_FILE_NAME}"
+DESTINATION_FILE_PATH = f"/tmp/{CSV_FILE_NAME}"
+
 extract_object_id = CloudAutoMLHook.extract_object_id
 
 # Example DAG for AutoML Vision Classification
@@ -73,7 +79,7 @@ with models.DAG(
     start_date=datetime(2021, 1, 1),
     catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
-    tags=["example", "automl"],
+    tags=["example", "automl", "vision-clss"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket",
@@ -82,14 +88,43 @@ with models.DAG(
         location=GCP_AUTOML_LOCATION,
     )
 
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_data_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="automl-vision",
-        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
-        destination_object="automl-vision",
-        recursive=True,
-    )
+    @task
+    def upload_csv_file_to_gcs():
+        # download file to local storage
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(RESOURCE_DATA_BUCKET)
+        blob = bucket.blob(GCS_FILE_PATH)
+        blob.download_to_filename(DESTINATION_FILE_PATH)
+
+        # update file content
+        with open(DESTINATION_FILE_PATH) as file:
+            lines = file.readlines()
+
+        updated_lines = [line.replace("template-bucket", DATA_SAMPLE_GCS_BUCKET_NAME) for line in lines]
+
+        with open(DESTINATION_FILE_PATH, "w") as file:
+            file.writelines(updated_lines)
+
+        # upload updated file to bucket storage
+        destination_bucket = storage_client.bucket(DATA_SAMPLE_GCS_BUCKET_NAME)
+        destination_blob = destination_bucket.blob(f"automl/{CSV_FILE_NAME}")
+        generation_match_precondition = 0
+        destination_blob.upload_from_filename(
+            DESTINATION_FILE_PATH, if_generation_match=generation_match_precondition
+        )
+
+    upload_csv_file_to_gcs_task = upload_csv_file_to_gcs()
+
+    copy_folder_tasks = [
+        GCSToGCSOperator(
+            task_id=f"copy_dataset_folder_{folder}",
+            source_bucket=RESOURCE_DATA_BUCKET,
+            source_object=f"automl/datasets/vision/{folder}",
+            destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+            destination_object=f"automl/{folder}",
+        )
+        for folder in ("cirrus", "cumulonimbus", "cumulus")
+    ]
 
     create_dataset_task = AutoMLCreateDatasetOperator(
         task_id="create_dataset_task",
@@ -110,16 +145,16 @@ with models.DAG(
     create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
     model_id = cast(str, XComArg(create_model, key="model_id"))
 
-    delete_model_task = AutoMLDeleteModelOperator(
-        task_id="delete_model_task",
+    delete_model = AutoMLDeleteModelOperator(
+        task_id="delete_model",
         model_id=model_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_datasets_task = AutoMLDeleteDatasetOperator(
-        task_id="delete_datasets_task",
+    delete_dataset = AutoMLDeleteDatasetOperator(
+        task_id="delete_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
@@ -135,14 +170,15 @@ with models.DAG(
     (
         # TEST SETUP
         create_bucket
-        >> move_dataset_file
+        >> upload_csv_file_to_gcs_task
+        >> copy_folder_tasks
+        # TEST BODY
         >> create_dataset_task
         >> import_dataset_task
-        # TEST BODY
         >> create_model
         # TEST TEARDOWN
-        >> delete_model_task
-        >> delete_datasets_task
+        >> delete_model
+        >> delete_dataset
         >> delete_bucket
     )
 
diff --git a/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py b/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py
index 933c2760cb..894b467dd3 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py
@@ -31,50 +31,75 @@ from airflow.providers.google.cloud.operators.automl import (
     AutoMLCreateDatasetOperator,
     AutoMLDeleteDatasetOperator,
     AutoMLDeleteModelOperator,
+    AutoMLDeployModelOperator,
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_DETECTION_BUCKET = os.environ.get(
-    "GCP_AUTOML_DETECTION_BUCKET", "gs://INVALID BUCKET NAME/img/openimage/csv/salads_ml_use.csv"
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+    GCSSynchronizeBucketsOperator,
 )
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_vision_obj_detect"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
-# Example model
+MODEL_NAME = "vision_detect_test_model"
 MODEL = {
-    "display_name": "auto_model",
+    "display_name": MODEL_NAME,
     "image_object_detection_model_metadata": {},
 }
 
-# Example dataset
+DATASET_NAME = f"ds_vision_detect_{ENV_ID}".replace("-", "_")
 DATASET = {
-    "display_name": "test_detection_dataset",
+    "display_name": DATASET_NAME,
     "image_object_detection_dataset_metadata": {},
 }
 
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_DETECTION_BUCKET]}}
-
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/object_detection.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
 extract_object_id = CloudAutoMLHook.extract_object_id
 
 
 # Example DAG for AutoML Vision Object Detection
 with models.DAG(
-    "example_automl_vision_detection",
+    DAG_ID,
+    schedule="@once",  # Override to match your needs
     start_date=datetime(2021, 1, 1),
     catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
-    tags=["example"],
+    tags=["example", "automl", "object-detection"],
 ) as dag:
-    create_dataset_task = AutoMLCreateDatasetOperator(
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=GCP_AUTOML_LOCATION,
+    )
+
+    move_dataset_file = GCSSynchronizeBucketsOperator(
+        task_id="move_data_to_bucket",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object="automl/datasets/vision",
+        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+        destination_object="automl",
+        recursive=True,
+    )
+
+    create_dataset = AutoMLCreateDatasetOperator(
         task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
     )
 
-    dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
     MODEL["dataset_id"] = dataset_id
 
-    import_dataset_task = AutoMLImportDataOperator(
-        task_id="import_dataset_task",
+    import_dataset = AutoMLImportDataOperator(
+        task_id="import_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         input_config=IMPORT_INPUT_CONFIG,
@@ -83,33 +108,47 @@ with models.DAG(
     MODEL["dataset_id"] = dataset_id
 
     create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
     model_id = cast(str, XComArg(create_model, key="model_id"))
 
-    delete_model_task = AutoMLDeleteModelOperator(
-        task_id="delete_model_task",
+    deploy_model = AutoMLDeployModelOperator(
+        task_id="deploy_model",
+        model_id=model_id,
+        location=GCP_AUTOML_LOCATION,
+        project_id=GCP_PROJECT_ID,
+    )
+
+    delete_model = AutoMLDeleteModelOperator(
+        task_id="delete_model",
         model_id=model_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    delete_datasets_task = AutoMLDeleteDatasetOperator(
-        task_id="delete_datasets_task",
+    delete_dataset = AutoMLDeleteDatasetOperator(
+        task_id="delete_dataset",
         dataset_id=dataset_id,
         location=GCP_AUTOML_LOCATION,
         project_id=GCP_PROJECT_ID,
     )
 
-    # TEST BODY
-    import_dataset_task >> create_model
-    # TEST TEARDOWN
-    delete_model_task >> delete_datasets_task
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
 
-    # Task dependencies created via `XComArgs`:
-    #   create_dataset_task >> import_dataset_task
-    #   create_dataset_task >> create_model
-    #   create_model >> delete_model_task
-    #   create_dataset_task >> delete_datasets_task
+    (
+        # TEST SETUP
+        [create_bucket >> move_dataset_file, create_dataset]
+        # TEST BODY
+        >> import_dataset
+        >> create_model
+        >> deploy_model
+        # TEST TEARDOWN
+        >> delete_model
+        >> delete_dataset
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher