Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 01:44:33 UTC

[airflow] branch main updated: Add system tests for Vertex AI operators in new approach (#27053)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a7c5f8fc4 Add system tests for Vertex AI operators in new approach (#27053)
7a7c5f8fc4 is described below

commit 7a7c5f8fc4284adfedcb0667ec7c935b913660cf
Author: VladaZakharova <80...@users.noreply.github.com>
AuthorDate: Mon Oct 31 02:44:15 2022 +0100

    Add system tests for Vertex AI operators in new approach (#27053)
---
 .../google/cloud/hooks/vertex_ai/auto_ml.py        |  75 +++--
 .../google/cloud/hooks/vertex_ai/custom_job.py     |  30 +-
 .../google/cloud/operators/vertex_ai/auto_ml.py    | 108 +++++---
 .../operators/vertex_ai/batch_prediction_job.py    |   2 +-
 .../google/cloud/operators/vertex_ai/custom_job.py |  37 ++-
 .../cloud/operators/vertex_ai/endpoint_service.py  |   2 +-
 .../cloud/operators/vertex_ai/model_service.py     |   2 +-
 .../operators/cloud/vertex_ai.rst                  |  70 ++---
 .../google/cloud/operators/test_vertex_ai.py       |  23 +-
 .../providers/google/cloud/vertex_ai/__init__.py   |  16 ++
 ...ample_vertex_ai_auto_ml_forecasting_training.py | 195 +++++++++++++
 .../example_vertex_ai_auto_ml_image_training.py    | 185 +++++++++++++
 .../example_vertex_ai_auto_ml_list_training.py     |  59 ++++
 .../example_vertex_ai_auto_ml_tabular_training.py  | 189 +++++++++++++
 .../example_vertex_ai_auto_ml_text_training.py     | 183 +++++++++++++
 .../example_vertex_ai_auto_ml_video_training.py    | 179 ++++++++++++
 .../example_vertex_ai_batch_prediction_job.py      | 234 ++++++++++++++++
 .../example_vertex_ai_custom_container.py          | 183 +++++++++++++
 .../vertex_ai/example_vertex_ai_custom_job.py      | 175 ++++++++++++
 .../example_vertex_ai_custom_job_python_package.py | 191 +++++++++++++
 .../cloud/vertex_ai/example_vertex_ai_dataset.py   | 305 +++++++++++++++++++++
 .../cloud/vertex_ai/example_vertex_ai_endpoint.py  | 256 +++++++++++++++++
 .../example_vertex_ai_hyperparameter_tuning_job.py | 158 +++++++++++
 .../example_vertex_ai_list_custom_jobs.py          |  57 ++++
 .../vertex_ai/example_vertex_ai_model_service.py   | 246 +++++++++++++++++
 .../vertex_ai/resources/California-housing.zip     | Bin 0 -> 366644 bytes
 .../google/cloud/vertex_ai/resources/__init__.py   |  16 ++
 .../cloud/vertex_ai/resources/all-datasets.zip     | Bin 0 -> 84305 bytes
 .../vertex_ai/resources/forecast-dataset.csv.zip   | Bin 0 -> 11538 bytes
 .../vertex_ai/resources/image-dataset.csv.zip      | Bin 0 -> 16626 bytes
 .../vertex_ai/resources/tabular-dataset.csv.zip    | Bin 0 -> 9403 bytes
 .../cloud/vertex_ai/resources/text-dataset.csv.zip | Bin 0 -> 41210 bytes
 .../vertex_ai/resources/video-dataset.csv.zip      | Bin 0 -> 5616 bytes
 33 files changed, 3053 insertions(+), 123 deletions(-)
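
The diffstat above shows that the example DAGs now live under tests/system/providers/google/cloud/vertex_ai/ rather than airflow/providers/google/cloud/example_dags/. In the "new approach" (AIP-47 style system tests) each example DAG doubles as a pytest-runnable system test. The new files are only partially shown below, but they typically follow the standard pattern sketched here; the tests.system.utils helpers are the stock ones from the Airflow repository, and the trivial DAG body is only an illustration, not one of the added DAGs:

    from __future__ import annotations

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id="example_vertex_ai_sketch",  # illustrative id, not one of the added DAGs
        schedule="@once",
        start_date=datetime(2022, 1, 1),
        catchup=False,
        tags=["example", "vertex_ai"],
    ) as dag:
        EmptyOperator(task_id="placeholder")  # the real DAGs chain Vertex AI operators here

    from tests.system.utils.watcher import watcher

    # The watcher task marks the test run as failed if the teardown task fails.
    list(dag.tasks) >> watcher()

    from tests.system.utils import get_test_run  # noqa: E402

    # Needed to run the example DAG with pytest as a system test.
    test_run = get_test_run(dag)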

diff --git a/airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py b/airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py
index 3b76229dbd..1a31e68cc8 100644
--- a/airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py
+++ b/airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py
@@ -259,6 +259,11 @@ class AutoMLHook(GoogleBaseHook):
         """Returns unique id of the Model."""
         return obj["name"].rpartition("/")[-1]
 
+    @staticmethod
+    def extract_training_id(resource_name: str) -> str:
+        """Returns unique id of the Training pipeline."""
+        return resource_name.rpartition("/")[-1]
+
     def wait_for_operation(self, operation: Operation, timeout: float | None = None):
         """Waits for long-lasting operation to complete."""
         try:
@@ -303,7 +308,7 @@ class AutoMLHook(GoogleBaseHook):
         export_evaluated_data_items_bigquery_destination_uri: str | None = None,
         export_evaluated_data_items_override_destination: bool = False,
         sync: bool = True,
-    ) -> models.Model:
+    ) -> tuple[models.Model | None, str]:
         """
         Create an AutoML Tabular Training Job.
 
@@ -488,9 +493,15 @@ class AutoMLHook(GoogleBaseHook):
             export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
             sync=sync,
         )
-        model.wait()
-
-        return model
+        training_id = self.extract_training_id(self._job.resource_name)
+        if model:
+            model.wait()
+        else:
+            self.log.warning(
+                "Training did not produce a Managed Model returning None. Training Pipeline is not "
+                "configured to upload a Model."
+            )
+        return model, training_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def create_auto_ml_forecasting_training_job(
@@ -529,7 +540,7 @@ class AutoMLHook(GoogleBaseHook):
         model_display_name: str | None = None,
         model_labels: dict[str, str] | None = None,
         sync: bool = True,
-    ) -> models.Model:
+    ) -> tuple[models.Model | None, str]:
         """
         Create an AutoML Forecasting Training Job.
 
@@ -715,9 +726,15 @@ class AutoMLHook(GoogleBaseHook):
             model_labels=model_labels,
             sync=sync,
         )
-        model.wait()
-
-        return model
+        training_id = self.extract_training_id(self._job.resource_name)
+        if model:
+            model.wait()
+        else:
+            self.log.warning(
+                "Training did not produce a Managed Model returning None. Training Pipeline is not "
+                "configured to upload a Model."
+            )
+        return model, training_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def create_auto_ml_image_training_job(
@@ -744,7 +761,7 @@ class AutoMLHook(GoogleBaseHook):
         model_labels: dict[str, str] | None = None,
         disable_early_stopping: bool = False,
         sync: bool = True,
-    ) -> models.Model:
+    ) -> tuple[models.Model | None, str]:
         """
         Create an AutoML Image Training Job.
 
@@ -885,9 +902,15 @@ class AutoMLHook(GoogleBaseHook):
             disable_early_stopping=disable_early_stopping,
             sync=sync,
         )
-        model.wait()
-
-        return model
+        training_id = self.extract_training_id(self._job.resource_name)
+        if model:
+            model.wait()
+        else:
+            self.log.warning(
+                "Training did not produce a Managed Model returning None. AutoML Image Training "
+                "Pipeline is not configured to upload a Model."
+            )
+        return model, training_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def create_auto_ml_text_training_job(
@@ -911,7 +934,7 @@ class AutoMLHook(GoogleBaseHook):
         model_display_name: str | None = None,
         model_labels: dict[str, str] | None = None,
         sync: bool = True,
-    ) -> models.Model:
+    ) -> tuple[models.Model | None, str]:
         """
         Create an AutoML Text Training Job.
 
@@ -1016,9 +1039,15 @@ class AutoMLHook(GoogleBaseHook):
             model_labels=model_labels,
             sync=sync,
         )
-        model.wait()
-
-        return model
+        training_id = self.extract_training_id(self._job.resource_name)
+        if model:
+            model.wait()
+        else:
+            self.log.warning(
+                "Training did not produce a Managed Model returning None. AutoML Text Training "
+                "Pipeline is not configured to upload a Model."
+            )
+        return model, training_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def create_auto_ml_video_training_job(
@@ -1039,7 +1068,7 @@ class AutoMLHook(GoogleBaseHook):
         model_display_name: str | None = None,
         model_labels: dict[str, str] | None = None,
         sync: bool = True,
-    ) -> models.Model:
+    ) -> tuple[models.Model | None, str]:
         """
         Create an AutoML Video Training Job.
 
@@ -1141,9 +1170,15 @@ class AutoMLHook(GoogleBaseHook):
             model_labels=model_labels,
             sync=sync,
         )
-        model.wait()
-
-        return model
+        training_id = self.extract_training_id(self._job.resource_name)
+        if model:
+            model.wait()
+        else:
+            self.log.warning(
+                "Training did not produce a Managed Model returning None. AutoML Video Training "
+                "Pipeline is not configured to upload a Model."
+            )
+        return model, training_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def delete_training_pipeline(
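
The new extract_training_id helper above (and the extract_custom_job_id counterpart added to the custom-job hook in the next file) simply keeps the trailing segment of a Vertex AI resource name. A quick illustration with a made-up resource name:

    # Made-up resource name, for illustration only.
    resource_name = "projects/123/locations/us-central1/trainingPipelines/4567890123456789"
    training_id = resource_name.rpartition("/")[-1]
    assert training_id == "4567890123456789"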
diff --git a/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py b/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py
index 9ded2770a2..77fb7d2cc6 100644
--- a/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py
+++ b/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py
@@ -247,6 +247,11 @@ class CustomJobHook(GoogleBaseHook):
         """Returns unique id of the Training pipeline."""
         return resource_name.rpartition("/")[-1]
 
+    @staticmethod
+    def extract_custom_job_id(custom_job_name: str) -> str:
+        """Returns unique id of the Custom Job pipeline."""
+        return custom_job_name.rpartition("/")[-1]
+
     def wait_for_operation(self, operation: Operation, timeout: float | None = None):
         """Waits for long-lasting operation to complete."""
         try:
@@ -292,7 +297,7 @@ class CustomJobHook(GoogleBaseHook):
         timestamp_split_column_name: str | None = None,
         tensorboard: str | None = None,
         sync=True,
-    ) -> tuple[models.Model | None, str]:
+    ) -> tuple[models.Model | None, str, str]:
         """Run Job for training pipeline"""
         model = job.run(
             dataset=dataset,
@@ -323,6 +328,9 @@ class CustomJobHook(GoogleBaseHook):
             sync=sync,
         )
         training_id = self.extract_training_id(job.resource_name)
+        custom_job_id = self.extract_custom_job_id(
+            job.gca_resource.training_task_metadata.get("backingCustomJob")
+        )
         if model:
             model.wait()
         else:
@@ -332,7 +340,7 @@ class CustomJobHook(GoogleBaseHook):
                 "model_serving_container_image_uri and model_display_name passed in. "
                 "Ensure that your training script saves to model to os.environ['AIP_MODEL_DIR']."
             )
-        return model, training_id
+        return model, training_id, custom_job_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def cancel_pipeline_job(
@@ -613,7 +621,7 @@ class CustomJobHook(GoogleBaseHook):
         timestamp_split_column_name: str | None = None,
         tensorboard: str | None = None,
         sync=True,
-    ) -> tuple[models.Model | None, str]:
+    ) -> tuple[models.Model | None, str, str]:
         """
         Create Custom Container Training Job
 
@@ -885,7 +893,7 @@ class CustomJobHook(GoogleBaseHook):
         if not self._job:
             raise AirflowException("CustomJob was not created")
 
-        model, training_id = self._run_job(
+        model, training_id, custom_job_id = self._run_job(
             job=self._job,
             dataset=dataset,
             annotation_schema_uri=annotation_schema_uri,
@@ -915,7 +923,7 @@ class CustomJobHook(GoogleBaseHook):
             sync=sync,
         )
 
-        return model, training_id
+        return model, training_id, custom_job_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def create_custom_python_package_training_job(
@@ -971,7 +979,7 @@ class CustomJobHook(GoogleBaseHook):
         timestamp_split_column_name: str | None = None,
         tensorboard: str | None = None,
         sync=True,
-    ) -> tuple[models.Model | None, str]:
+    ) -> tuple[models.Model | None, str, str]:
         """
         Create Custom Python Package Training Job
 
@@ -1243,7 +1251,7 @@ class CustomJobHook(GoogleBaseHook):
         if not self._job:
             raise AirflowException("CustomJob was not created")
 
-        model, training_id = self._run_job(
+        model, training_id, custom_job_id = self._run_job(
             job=self._job,
             dataset=dataset,
             annotation_schema_uri=annotation_schema_uri,
@@ -1273,7 +1281,7 @@ class CustomJobHook(GoogleBaseHook):
             sync=sync,
         )
 
-        return model, training_id
+        return model, training_id, custom_job_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def create_custom_training_job(
@@ -1329,7 +1337,7 @@ class CustomJobHook(GoogleBaseHook):
         timestamp_split_column_name: str | None = None,
         tensorboard: str | None = None,
         sync=True,
-    ) -> tuple[models.Model | None, str]:
+    ) -> tuple[models.Model | None, str, str]:
         """
         Create Custom Training Job
 
@@ -1601,7 +1609,7 @@ class CustomJobHook(GoogleBaseHook):
         if not self._job:
             raise AirflowException("CustomJob was not created")
 
-        model, training_id = self._run_job(
+        model, training_id, custom_job_id = self._run_job(
             job=self._job,
             dataset=dataset,
             annotation_schema_uri=annotation_schema_uri,
@@ -1631,7 +1639,7 @@ class CustomJobHook(GoogleBaseHook):
             sync=sync,
         )
 
-        return model, training_id
+        return model, training_id, custom_job_id
 
     @GoogleBaseHook.fallback_to_default_project_id
     def delete_pipeline_job(
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py b/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
index 8d1c13a9be..6851d52633 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
@@ -29,7 +29,11 @@ from google.cloud.aiplatform_v1.types.training_pipeline import TrainingPipeline
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.vertex_ai.auto_ml import AutoMLHook
-from airflow.providers.google.cloud.links.vertex_ai import VertexAIModelLink, VertexAITrainingPipelinesLink
+from airflow.providers.google.cloud.links.vertex_ai import (
+    VertexAIModelLink,
+    VertexAITrainingLink,
+    VertexAITrainingPipelinesLink,
+)
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -89,11 +93,12 @@ class AutoMLTrainingJobBaseOperator(BaseOperator):
 class CreateAutoMLForecastingTrainingJobOperator(AutoMLTrainingJobBaseOperator):
     """Create AutoML Forecasting Training job"""
 
-    template_fields = [
+    template_fields = (
+        "dataset_id",
         "region",
         "impersonation_chain",
-    ]
-    operator_extra_links = (VertexAIModelLink(),)
+    )
+    operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
 
     def __init__(
         self,
@@ -158,7 +163,7 @@ class CreateAutoMLForecastingTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        model = self.hook.create_auto_ml_forecasting_training_job(
+        model, training_id = self.hook.create_auto_ml_forecasting_training_job(
             project_id=self.project_id,
             region=self.region,
             display_name=self.display_name,
@@ -199,20 +204,26 @@ class CreateAutoMLForecastingTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             sync=self.sync,
         )
 
-        result = Model.to_dict(model)
-        model_id = self.hook.extract_model_id(result)
-        VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        if model:
+            result = Model.to_dict(model)
+            model_id = self.hook.extract_model_id(result)
+            VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        else:
+            result = model  # type: ignore
+        self.xcom_push(context, key="training_id", value=training_id)
+        VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
         return result
 
 
 class CreateAutoMLImageTrainingJobOperator(AutoMLTrainingJobBaseOperator):
     """Create Auto ML Image Training job"""
 
-    template_fields = [
+    template_fields = (
+        "dataset_id",
         "region",
         "impersonation_chain",
-    ]
-    operator_extra_links = (VertexAIModelLink(),)
+    )
+    operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
 
     def __init__(
         self,
@@ -249,7 +260,7 @@ class CreateAutoMLImageTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        model = self.hook.create_auto_ml_image_training_job(
+        model, training_id = self.hook.create_auto_ml_image_training_job(
             project_id=self.project_id,
             region=self.region,
             display_name=self.display_name,
@@ -274,20 +285,26 @@ class CreateAutoMLImageTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             sync=self.sync,
         )
 
-        result = Model.to_dict(model)
-        model_id = self.hook.extract_model_id(result)
-        VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        if model:
+            result = Model.to_dict(model)
+            model_id = self.hook.extract_model_id(result)
+            VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        else:
+            result = model  # type: ignore
+        self.xcom_push(context, key="training_id", value=training_id)
+        VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
         return result
 
 
 class CreateAutoMLTabularTrainingJobOperator(AutoMLTrainingJobBaseOperator):
     """Create Auto ML Tabular Training job"""
 
-    template_fields = [
+    template_fields = (
+        "dataset_id",
         "region",
         "impersonation_chain",
-    ]
-    operator_extra_links = (VertexAIModelLink(),)
+    )
+    operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
 
     def __init__(
         self,
@@ -340,7 +357,7 @@ class CreateAutoMLTabularTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        model = self.hook.create_auto_ml_tabular_training_job(
+        model, training_id = self.hook.create_auto_ml_tabular_training_job(
             project_id=self.project_id,
             region=self.region,
             display_name=self.display_name,
@@ -375,9 +392,14 @@ class CreateAutoMLTabularTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             sync=self.sync,
         )
 
-        result = Model.to_dict(model)
-        model_id = self.hook.extract_model_id(result)
-        VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        if model:
+            result = Model.to_dict(model)
+            model_id = self.hook.extract_model_id(result)
+            VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        else:
+            result = model  # type: ignore
+        self.xcom_push(context, key="training_id", value=training_id)
+        VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
         return result
 
 
@@ -385,10 +407,11 @@ class CreateAutoMLTextTrainingJobOperator(AutoMLTrainingJobBaseOperator):
     """Create Auto ML Text Training job"""
 
     template_fields = [
+        "dataset_id",
         "region",
         "impersonation_chain",
     ]
-    operator_extra_links = (VertexAIModelLink(),)
+    operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
 
     def __init__(
         self,
@@ -419,7 +442,7 @@ class CreateAutoMLTextTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        model = self.hook.create_auto_ml_text_training_job(
+        model, training_id = self.hook.create_auto_ml_text_training_job(
             project_id=self.project_id,
             region=self.region,
             display_name=self.display_name,
@@ -441,20 +464,26 @@ class CreateAutoMLTextTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             sync=self.sync,
         )
 
-        result = Model.to_dict(model)
-        model_id = self.hook.extract_model_id(result)
-        VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        if model:
+            result = Model.to_dict(model)
+            model_id = self.hook.extract_model_id(result)
+            VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        else:
+            result = model  # type: ignore
+        self.xcom_push(context, key="training_id", value=training_id)
+        VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
         return result
 
 
 class CreateAutoMLVideoTrainingJobOperator(AutoMLTrainingJobBaseOperator):
     """Create Auto ML Video Training job"""
 
-    template_fields = [
+    template_fields = (
+        "dataset_id",
         "region",
         "impersonation_chain",
-    ]
-    operator_extra_links = (VertexAIModelLink(),)
+    )
+    operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
 
     def __init__(
         self,
@@ -479,7 +508,7 @@ class CreateAutoMLVideoTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        model = self.hook.create_auto_ml_video_training_job(
+        model, training_id = self.hook.create_auto_ml_video_training_job(
             project_id=self.project_id,
             region=self.region,
             display_name=self.display_name,
@@ -498,9 +527,14 @@ class CreateAutoMLVideoTrainingJobOperator(AutoMLTrainingJobBaseOperator):
             sync=self.sync,
         )
 
-        result = Model.to_dict(model)
-        model_id = self.hook.extract_model_id(result)
-        VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        if model:
+            result = Model.to_dict(model)
+            model_id = self.hook.extract_model_id(result)
+            VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+        else:
+            result = model  # type: ignore
+        self.xcom_push(context, key="training_id", value=training_id)
+        VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
         return result
 
 
@@ -509,7 +543,7 @@ class DeleteAutoMLTrainingJobOperator(BaseOperator):
     AutoMLTextTrainingJob, or AutoMLVideoTrainingJob.
     """
 
-    template_fields = ("region", "project_id", "impersonation_chain")
+    template_fields = ("training_pipeline", "region", "project_id", "impersonation_chain")
 
     def __init__(
         self,
@@ -563,11 +597,11 @@ class ListAutoMLTrainingJobOperator(BaseOperator):
     AutoMLTextTrainingJob, or AutoMLVideoTrainingJob in a Location.
     """
 
-    template_fields = [
+    template_fields = (
         "region",
         "project_id",
         "impersonation_chain",
-    ]
+    )
     operator_extra_links = [
         VertexAITrainingPipelinesLink(),
     ]
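
Because "dataset_id" is now a templated field on the AutoML create-training operators, the id returned by CreateDatasetOperator (pushed to XCom under the "dataset_id" key, as noted in the docs change below) can be wired in with a Jinja expression. A hedged fragment, assuming an upstream task with id "create_image_dataset" and trimming the call to a few essential parameters:

    # Hedged sketch, not taken from the commit; the upstream task id and values are assumptions.
    from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
        CreateAutoMLImageTrainingJobOperator,
    )

    create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
        task_id="auto_ml_image_task",
        display_name="auto-ml-image-training",  # assumed value
        dataset_id="{{ task_instance.xcom_pull(task_ids='create_image_dataset', key='dataset_id') }}",
        region="us-central1",     # assumed value
        project_id="my-project",  # assumed value
    )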
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job.py b/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job.py
index 3579fcdd4e..448e46611e 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job.py
@@ -156,7 +156,7 @@ class CreateBatchPredictionJobOperator(BaseOperator):
         account from the list granting this role to the originating account (templated).
     """
 
-    template_fields = ("region", "project_id", "impersonation_chain")
+    template_fields = ("region", "project_id", "model_name", "impersonation_chain")
     operator_extra_links = (VertexAIBatchPredictionJobLink(),)
 
     def __init__(
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py b/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
index 0835500a28..b10a0715fc 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
@@ -119,7 +119,7 @@ class CustomTrainingJobBaseOperator(BaseOperator):
         self.staging_bucket = staging_bucket
         # END Custom
         # START Run param
-        self.dataset = Dataset(name=dataset_id) if dataset_id else None
+        self.dataset_id = dataset_id
         self.annotation_schema_uri = annotation_schema_uri
         self.model_display_name = model_display_name
         self.model_labels = model_labels
@@ -410,11 +410,12 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
         account from the list granting this role to the originating account (templated).
     """
 
-    template_fields = [
+    template_fields = (
         "region",
         "command",
+        "dataset_id",
         "impersonation_chain",
-    ]
+    )
     operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
 
     def __init__(
@@ -432,7 +433,7 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        model, training_id = self.hook.create_custom_container_training_job(
+        model, training_id, custom_job_id = self.hook.create_custom_container_training_job(
             project_id=self.project_id,
             region=self.region,
             display_name=self.display_name,
@@ -454,7 +455,7 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
             model_encryption_spec_key_name=self.model_encryption_spec_key_name,
             staging_bucket=self.staging_bucket,
             # RUN
-            dataset=self.dataset,
+            dataset=Dataset(name=self.dataset_id) if self.dataset_id else None,
             annotation_schema_uri=self.annotation_schema_uri,
             model_display_name=self.model_display_name,
             model_labels=self.model_labels,
@@ -488,6 +489,8 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
             VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
         else:
             result = model  # type: ignore
+        self.xcom_push(context, key="training_id", value=training_id)
+        self.xcom_push(context, key="custom_job_id", value=custom_job_id)
         VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
         return result
 
@@ -759,10 +762,11 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
         account from the list granting this role to the originating account (templated).
     """
 
-    template_fields = [
+    template_fields = (
         "region",
+        "dataset_id",
         "impersonation_chain",
-    ]
+    )
     operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
 
     def __init__(
@@ -782,7 +786,7 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        model, training_id = self.hook.create_custom_python_package_training_job(
+        model, training_id, custom_job_id = self.hook.create_custom_python_package_training_job(
             project_id=self.project_id,
             region=self.region,
             display_name=self.display_name,
@@ -805,7 +809,7 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
             model_encryption_spec_key_name=self.model_encryption_spec_key_name,
             staging_bucket=self.staging_bucket,
             # RUN
-            dataset=self.dataset,
+            dataset=Dataset(name=self.dataset_id) if self.dataset_id else None,
             annotation_schema_uri=self.annotation_schema_uri,
             model_display_name=self.model_display_name,
             model_labels=self.model_labels,
@@ -839,6 +843,8 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
             VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
         else:
             result = model  # type: ignore
+        self.xcom_push(context, key="training_id", value=training_id)
+        self.xcom_push(context, key="custom_job_id", value=custom_job_id)
         VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
         return result
 
@@ -1110,12 +1116,13 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
         account from the list granting this role to the originating account (templated).
     """
 
-    template_fields = [
+    template_fields = (
         "region",
         "script_path",
         "requirements",
+        "dataset_id",
         "impersonation_chain",
-    ]
+    )
     operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
 
     def __init__(
@@ -1135,7 +1142,7 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        model, training_id = self.hook.create_custom_training_job(
+        model, training_id, custom_job_id = self.hook.create_custom_training_job(
             project_id=self.project_id,
             region=self.region,
             display_name=self.display_name,
@@ -1158,7 +1165,7 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
             model_encryption_spec_key_name=self.model_encryption_spec_key_name,
             staging_bucket=self.staging_bucket,
             # RUN
-            dataset=self.dataset,
+            dataset=Dataset(name=self.dataset_id) if self.dataset_id else None,
             annotation_schema_uri=self.annotation_schema_uri,
             model_display_name=self.model_display_name,
             model_labels=self.model_labels,
@@ -1192,6 +1199,8 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
             VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
         else:
             result = model  # type: ignore
+        self.xcom_push(context, key="training_id", value=training_id)
+        self.xcom_push(context, key="custom_job_id", value=custom_job_id)
         VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
         return result
 
@@ -1228,7 +1237,7 @@ class DeleteCustomTrainingJobOperator(BaseOperator):
         account from the list granting this role to the originating account (templated).
     """
 
-    template_fields = ("region", "project_id", "impersonation_chain")
+    template_fields = ("training_pipeline", "custom_job", "region", "project_id", "impersonation_chain")
 
     def __init__(
         self,
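
In the same way, the custom-job create operators now push both "training_id" and "custom_job_id" to XCom before persisting the training link, so downstream tasks can reference the training pipeline and its backing custom job. A hedged sketch of pulling those keys in a downstream task; the DAG shell and the upstream task id "custom_container_task" are assumptions:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def _report_ids(ti):
        # Upstream task id is an assumption for illustration only.
        training_id = ti.xcom_pull(task_ids="custom_container_task", key="training_id")
        custom_job_id = ti.xcom_pull(task_ids="custom_container_task", key="custom_job_id")
        print(f"training pipeline: {training_id}, backing custom job: {custom_job_id}")


    with DAG("xcom_keys_sketch", schedule="@once", start_date=datetime(2022, 1, 1), catchup=False):
        PythonOperator(task_id="report_ids", python_callable=_report_ids)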
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/endpoint_service.py b/airflow/providers/google/cloud/operators/vertex_ai/endpoint_service.py
index 72bd47efaa..cfe67cf7aa 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/endpoint_service.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/endpoint_service.py
@@ -246,7 +246,7 @@ class DeployModelOperator(BaseOperator):
         account from the list granting this role to the originating account (templated).
     """
 
-    template_fields = ("region", "endpoint_id", "project_id", "impersonation_chain")
+    template_fields = ("region", "endpoint_id", "project_id", "deployed_model", "impersonation_chain")
     operator_extra_links = (VertexAIModelLink(),)
 
     def __init__(
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/model_service.py b/airflow/providers/google/cloud/operators/vertex_ai/model_service.py
index c2de199032..3b8c1e972f 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/model_service.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/model_service.py
@@ -320,7 +320,7 @@ class UploadModelOperator(BaseOperator):
         account from the list granting this role to the originating account (templated).
     """
 
-    template_fields = ("region", "project_id", "impersonation_chain")
+    template_fields = ("region", "project_id", "model", "impersonation_chain")
     operator_extra_links = (VertexAIModelLink(),)
 
     def __init__(
diff --git a/docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst b/docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst
index f10863c5cd..0ab36f0ab9 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst
@@ -33,7 +33,7 @@ To create a Google VertexAI dataset you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.CreateDatasetOperator`.
 The operator returns dataset id in :ref:`XCom <concepts:xcom>` under ``dataset_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_dataset_operator]
@@ -42,7 +42,7 @@ The operator returns dataset id in :ref:`XCom <concepts:xcom>` under ``dataset_i
 After creating a dataset you can use it to import some data using
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.ImportDataOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_import_data_operator]
@@ -51,7 +51,7 @@ After creating a dataset you can use it to import some data using
 To export dataset you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.ExportDataOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_export_data_operator]
@@ -60,7 +60,7 @@ To export dataset you can use
 To delete dataset you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.DeleteDatasetOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_delete_dataset_operator]
@@ -69,7 +69,7 @@ To delete dataset you can use
 To get dataset you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.GetDatasetOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_get_dataset_operator]
@@ -78,7 +78,7 @@ To get dataset you can use
 To get a dataset list you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.ListDatasetsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_list_dataset_operator]
@@ -87,7 +87,7 @@ To get a dataset list you can use
 To update dataset you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.UpdateDatasetOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_update_dataset_operator]
@@ -115,7 +115,7 @@ create image you can find by this link: https://cloud.google.com/vertex-ai/docs/
 After that you should put link to the image in ``container_uri`` parameter. Also you can type executing command
 for container which will be created from this image in ``command`` parameter.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
@@ -129,7 +129,7 @@ create you can find by this link: https://cloud.google.com/vertex-ai/docs/traini
 Next you should put link to the package in ``python_package_gcs_uri`` parameter, also ``python_module_name``
 parameter should has the name of script which will run your training task.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
@@ -140,7 +140,7 @@ How to run Training Job
 
 For this Job you should put path to your local training script inside ``script_path`` parameter.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
@@ -149,7 +149,7 @@ For this Job you should put path to your local training script inside ``script_p
 You can get a list of Training Jobs using
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.ListCustomTrainingJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_list_custom_training_job_operator]
@@ -158,7 +158,7 @@ You can get a list of Training Jobs using
 If you wish to delete a Custom Training Job you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.DeleteCustomTrainingJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_delete_custom_training_job_operator]
@@ -182,7 +182,7 @@ How to run AutoML Forecasting Training Job
 Before start running this Job you must prepare and create ``TimeSeries`` dataset. After that you should
 put dataset id to ``dataset_id`` parameter in operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]
@@ -194,7 +194,7 @@ How to run AutoML Image Training Job
 Before start running this Job you must prepare and create ``Image`` dataset. After that you should
 put dataset id to ``dataset_id`` parameter in operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]
@@ -206,7 +206,7 @@ How to run AutoML Tabular Training Job
 Before start running this Job you must prepare and create ``Tabular`` dataset. After that you should
 put dataset id to ``dataset_id`` parameter in operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
@@ -218,7 +218,7 @@ How to run AutoML Text Training Job
 Before start running this Job you must prepare and create ``Text`` dataset. After that you should
 put dataset id to ``dataset_id`` parameter in operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]
@@ -230,7 +230,7 @@ How to run AutoML Video Training Job
 Before start running this Job you must prepare and create ``Video`` dataset. After that you should
 put dataset id to ``dataset_id`` parameter in operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
@@ -239,7 +239,7 @@ put dataset id to ``dataset_id`` parameter in operator.
 You can get a list of AutoML Training Jobs using
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.ListAutoMLTrainingJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
@@ -248,7 +248,7 @@ You can get a list of AutoML Training Jobs using
 If you wish to delete a Auto ML Training Job you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.DeleteAutoMLTrainingJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]
@@ -261,7 +261,7 @@ To create a Google VertexAI Batch Prediction Job you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.CreateBatchPredictionJobOperator`.
 The operator returns batch prediction job id in :ref:`XCom <concepts:xcom>` under ``batch_prediction_job_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
@@ -270,7 +270,7 @@ The operator returns batch prediction job id in :ref:`XCom <concepts:xcom>` unde
 To delete batch prediction job you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.DeleteBatchPredictionJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
@@ -279,7 +279,7 @@ To delete batch prediction job you can use
 To get a batch prediction job list you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.ListBatchPredictionJobsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
@@ -292,7 +292,7 @@ To create a Google VertexAI endpoint you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.CreateEndpointOperator`.
 The operator returns endpoint id in :ref:`XCom <concepts:xcom>` under ``endpoint_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_endpoint_operator]
@@ -301,7 +301,7 @@ The operator returns endpoint id in :ref:`XCom <concepts:xcom>` under ``endpoint
 After creating an endpoint you can use it to deploy some model using
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.DeployModelOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_deploy_model_operator]
@@ -310,7 +310,7 @@ After creating an endpoint you can use it to deploy some model using
 To un deploy model you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.UndeployModelOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_undeploy_model_operator]
@@ -319,7 +319,7 @@ To un deploy model you can use
 To delete endpoint you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.DeleteEndpointOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_delete_endpoint_operator]
@@ -328,7 +328,7 @@ To delete endpoint you can use
 To get an endpoint list you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.ListEndpointsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_list_endpoints_operator]
@@ -341,7 +341,7 @@ To create a Google VertexAI hyperparameter tuning job you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job.CreateHyperparameterTuningJobOperator`.
 The operator returns hyperparameter tuning job id in :ref:`XCom <concepts:xcom>` under ``hyperparameter_tuning_job_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
@@ -350,7 +350,7 @@ The operator returns hyperparameter tuning job id in :ref:`XCom <concepts:xcom>`
 To delete hyperparameter tuning job you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job.DeleteHyperparameterTuningJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
@@ -359,7 +359,7 @@ To delete hyperparameter tuning job you can use
 To get hyperparameter tuning job you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job.GetHyperparameterTuningJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
@@ -368,7 +368,7 @@ To get hyperparameter tuning job you can use
 To get a hyperparameter tuning job list you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job.ListHyperparameterTuningJobOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
@@ -381,7 +381,7 @@ To upload a Google VertexAI model you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.model_service.UploadModelOperator`.
 The operator returns model id in :ref:`XCom <concepts:xcom>` under ``model_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_upload_model_operator]
@@ -390,7 +390,7 @@ The operator returns model id in :ref:`XCom <concepts:xcom>` under ``model_id``
 To export model you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.model_service.ExportModelOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_export_model_operator]
@@ -399,7 +399,7 @@ To export model you can use
 To delete model you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.model_service.DeleteModelOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_delete_model_operator]
@@ -408,7 +408,7 @@ To delete model you can use
 To get a model list you can use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.model_service.ListModelsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_list_models_operator]
diff --git a/tests/providers/google/cloud/operators/test_vertex_ai.py b/tests/providers/google/cloud/operators/test_vertex_ai.py
index 73beee1144..2ebc069968 100644
--- a/tests/providers/google/cloud/operators/test_vertex_ai.py
+++ b/tests/providers/google/cloud/operators/test_vertex_ai.py
@@ -170,7 +170,11 @@ TEST_OUTPUT_CONFIG = {
 class TestVertexAICreateCustomContainerTrainingJobOperator:
     @mock.patch(VERTEX_AI_PATH.format("custom_job.CustomJobHook"))
     def test_execute(self, mock_hook):
-        mock_hook.return_value.create_custom_container_training_job.return_value = (None, "training_id")
+        mock_hook.return_value.create_custom_container_training_job.return_value = (
+            None,
+            "training_id",
+            "custom_job_id",
+        )
         op = CreateCustomContainerTrainingJobOperator(
             task_id=TASK_ID,
             gcp_conn_id=GCP_CONN_ID,
@@ -251,7 +255,11 @@ class TestVertexAICreateCustomContainerTrainingJobOperator:
 class TestVertexAICreateCustomPythonPackageTrainingJobOperator:
     @mock.patch(VERTEX_AI_PATH.format("custom_job.CustomJobHook"))
     def test_execute(self, mock_hook):
-        mock_hook.return_value.create_custom_python_package_training_job.return_value = (None, "training_id")
+        mock_hook.return_value.create_custom_python_package_training_job.return_value = (
+            None,
+            "training_id",
+            "custom_job_id",
+        )
         op = CreateCustomPythonPackageTrainingJobOperator(
             task_id=TASK_ID,
             gcp_conn_id=GCP_CONN_ID,
@@ -334,7 +342,11 @@ class TestVertexAICreateCustomPythonPackageTrainingJobOperator:
 class TestVertexAICreateCustomTrainingJobOperator:
     @mock.patch(VERTEX_AI_PATH.format("custom_job.CustomJobHook"))
     def test_execute(self, mock_hook):
-        mock_hook.return_value.create_custom_training_job.return_value = (None, "training_id")
+        mock_hook.return_value.create_custom_training_job.return_value = (
+            None,
+            "training_id",
+            "custom_job_id",
+        )
         op = CreateCustomTrainingJobOperator(
             task_id=TASK_ID,
             gcp_conn_id=GCP_CONN_ID,
@@ -691,6 +703,7 @@ class TestVertexAICreateAutoMLForecastingTrainingJobOperator:
     @mock.patch("google.cloud.aiplatform.datasets.TimeSeriesDataset")
     @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
     def test_execute(self, mock_hook, mock_dataset):
+        mock_hook.return_value.create_auto_ml_forecasting_training_job.return_value = (None, "training_id")
         op = CreateAutoMLForecastingTrainingJobOperator(
             task_id=TASK_ID,
             gcp_conn_id=GCP_CONN_ID,
@@ -757,6 +770,7 @@ class TestVertexAICreateAutoMLImageTrainingJobOperator:
     @mock.patch("google.cloud.aiplatform.datasets.ImageDataset")
     @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
     def test_execute(self, mock_hook, mock_dataset):
+        mock_hook.return_value.create_auto_ml_image_training_job.return_value = (None, "training_id")
         op = CreateAutoMLImageTrainingJobOperator(
             task_id=TASK_ID,
             gcp_conn_id=GCP_CONN_ID,
@@ -806,6 +820,7 @@ class TestVertexAICreateAutoMLTabularTrainingJobOperator:
     @mock.patch("google.cloud.aiplatform.datasets.TabularDataset")
     @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
     def test_execute(self, mock_hook, mock_dataset):
+        mock_hook.return_value.create_auto_ml_tabular_training_job.return_value = (None, "training_id")
         op = CreateAutoMLTabularTrainingJobOperator(
             task_id=TASK_ID,
             gcp_conn_id=GCP_CONN_ID,
@@ -860,6 +875,7 @@ class TestVertexAICreateAutoMLTextTrainingJobOperator:
     @mock.patch("google.cloud.aiplatform.datasets.TextDataset")
     @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
     def test_execute(self, mock_hook, mock_dataset):
+        mock_hook.return_value.create_auto_ml_text_training_job.return_value = (None, "training_id")
         op = CreateAutoMLTextTrainingJobOperator(
             task_id=TASK_ID,
             gcp_conn_id=GCP_CONN_ID,
@@ -906,6 +922,7 @@ class TestVertexAICreateAutoMLVideoTrainingJobOperator:
     @mock.patch("google.cloud.aiplatform.datasets.VideoDataset")
     @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
     def test_execute(self, mock_hook, mock_dataset):
+        mock_hook.return_value.create_auto_ml_video_training_job.return_value = (None, "training_id")
         op = CreateAutoMLVideoTrainingJobOperator(
             task_id=TASK_ID,
             gcp_conn_id=GCP_CONN_ID,
diff --git a/tests/system/providers/google/cloud/vertex_ai/__init__.py b/tests/system/providers/google/cloud/vertex_ai/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
new file mode 100644
index 0000000000..c90f9bfc87
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLForecastingTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+FORECASTING_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-forecasting-model-{ENV_ID}"
+
+FORECAST_GCS_BUCKET_NAME = f"bucket_forecast_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+FORECAST_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "forecast-dataset.csv.zip")
+FORECAST_GCS_OBJECT_NAME = "vertex-ai/forecast-dataset.csv"
+FORECAST_CSV_FILE_LOCAL_PATH = "/forecast/forecast-dataset.csv"
+
+FORECAST_DATASET = {
+    "display_name": f"forecast-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.time_series,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{FORECAST_GCS_BUCKET_NAME}/vertex-ai/forecast-dataset.csv"]}
+            }
+        },
+        Value(),
+    ),
+}
+
+TEST_TIME_COLUMN = "date"
+TEST_TIME_SERIES_IDENTIFIER_COLUMN = "store_name"
+TEST_TARGET_COLUMN = "sale_dollars"
+
+COLUMN_SPECS = {
+    TEST_TIME_COLUMN: "timestamp",
+    TEST_TARGET_COLUMN: "numeric",
+    "city": "categorical",
+    "zip_code": "categorical",
+    "county": "categorical",
+}
+
+
+with models.DAG(
+    f"{DAG_ID}_forecasting_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=FORECAST_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {FORECAST_ZIP_CSV_FILE_LOCAL_PATH} -d /forecast/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=FORECAST_CSV_FILE_LOCAL_PATH,
+        dst=FORECAST_GCS_OBJECT_NAME,
+        bucket=FORECAST_GCS_BUCKET_NAME,
+    )
+
+    create_forecast_dataset = CreateDatasetOperator(
+        task_id="forecast_dataset",
+        dataset=FORECAST_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    forecast_dataset_id = create_forecast_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]
+    create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
+        task_id="auto_ml_forecasting_task",
+        display_name=FORECASTING_DISPLAY_NAME,
+        optimization_objective="minimize-rmse",
+        column_specs=COLUMN_SPECS,
+        # run params
+        dataset_id=forecast_dataset_id,
+        target_column=TEST_TARGET_COLUMN,
+        time_column=TEST_TIME_COLUMN,
+        time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
+        available_at_forecast_columns=[TEST_TIME_COLUMN],
+        unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
+        time_series_attribute_columns=["city", "zip_code", "county"],
+        forecast_horizon=30,
+        context_window=30,
+        data_granularity_unit="day",
+        data_granularity_count=1,
+        weight_column=None,
+        budget_milli_node_hours=1000,
+        model_display_name=MODEL_DISPLAY_NAME,
+        predefined_split_column_name=None,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]
+    delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_forecasting_training_job",
+        training_pipeline_id=create_auto_ml_forecasting_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]
+
+    delete_forecast_dataset = DeleteDatasetOperator(
+        task_id="delete_forecast_dataset",
+        dataset_id=forecast_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=FORECAST_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /forecast/*",
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_forecast_dataset
+        # TEST BODY
+        >> create_auto_ml_forecasting_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_forecasting_training_job
+        >> delete_forecast_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
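
Note (not part of the patch): the XComArg expression create_auto_ml_forecasting_training_job.output["training_id"] used above resolves at run time to the value the training task pushed to XCom under the "training_id" key. A minimal, equivalent wiring with an explicit Jinja template, assuming the names defined in the example above, would look like:

delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_forecasting_training_job",
    training_pipeline_id=(
        "{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', key='training_id') }}"
    ),
    region=REGION,
    project_id=PROJECT_ID,
)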
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
new file mode 100644
index 0000000000..fbc193bb1a
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLImageTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ImportDataOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+IMAGE_DISPLAY_NAME = f"auto-ml-image-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-image-model-{ENV_ID}"
+
+IMAGE_GCS_BUCKET_NAME = f"bucket_image_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+IMAGE_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "image-dataset.csv.zip")
+IMAGE_GCS_OBJECT_NAME = "vertex-ai/image-dataset.csv"
+IMAGE_CSV_FILE_LOCAL_PATH = "/image/image-dataset.csv"
+
+IMAGE_DATASET = {
+    "display_name": f"image-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.image,
+    "metadata": Value(string_value="image-dataset"),
+}
+IMAGE_DATA_CONFIG = [
+    {
+        "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
+        "gcs_source": {"uris": [f"gs://{IMAGE_GCS_BUCKET_NAME}/vertex-ai/image-dataset.csv"]},
+    },
+]
+
+
+with models.DAG(
+    f"{DAG_ID}_image_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=IMAGE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {IMAGE_ZIP_CSV_FILE_LOCAL_PATH} -d /image/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=IMAGE_CSV_FILE_LOCAL_PATH,
+        dst=IMAGE_GCS_OBJECT_NAME,
+        bucket=IMAGE_GCS_BUCKET_NAME,
+    )
+
+    create_image_dataset = CreateDatasetOperator(
+        task_id="image_dataset",
+        dataset=IMAGE_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    image_dataset_id = create_image_dataset.output["dataset_id"]
+
+    import_image_dataset = ImportDataOperator(
+        task_id="import_image_data",
+        dataset_id=image_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=IMAGE_DATA_CONFIG,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]
+    create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
+        task_id="auto_ml_image_task",
+        display_name=IMAGE_DISPLAY_NAME,
+        dataset_id=image_dataset_id,
+        prediction_type="classification",
+        multi_label=False,
+        model_type="CLOUD",
+        training_fraction_split=0.6,
+        validation_fraction_split=0.2,
+        test_fraction_split=0.2,
+        budget_milli_node_hours=8000,
+        model_display_name=MODEL_DISPLAY_NAME,
+        disable_early_stopping=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]
+
+    delete_auto_ml_image_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_training_job",
+        training_pipeline_id=create_auto_ml_image_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_image_dataset = DeleteDatasetOperator(
+        task_id="delete_image_dataset",
+        dataset_id=image_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=IMAGE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /image/*",
+    )
+
+    (
+        # TEST SETUP
+        [
+            create_bucket,
+            create_image_dataset,
+        ]
+        >> unzip_file
+        >> upload_files
+        >> import_image_dataset
+        # TEST BODY
+        >> create_auto_ml_image_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_image_training_job
+        >> delete_image_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
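
Note (not part of the patch): the dependency chain above starts with a list, [create_bucket, create_image_dataset] >> unzip_file, which makes every task in the list an upstream of the right-hand task. A self-contained sketch of that fan-in pattern:

# Standalone sketch of the fan-in dependency used in the example above.
from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator

with models.DAG("fan_in_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False):
    setup_a = EmptyOperator(task_id="setup_a")
    setup_b = EmptyOperator(task_id="setup_b")
    downstream = EmptyOperator(task_id="downstream")

    # Equivalent to: setup_a >> downstream and setup_b >> downstream
    [setup_a, setup_b] >> downstream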
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
new file mode 100644
index 0000000000..b8f5cdae68
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import ListAutoMLTrainingJobOperator
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+
+
+with models.DAG(
+    f"{DAG_ID}_list_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml", "list_operation"],
+) as dag:
+
+    # [START how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
+    list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
+        task_id="list_auto_ml_training_job",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
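
Note (not part of the patch): ListAutoMLTrainingJobOperator pushes the pipelines it lists as its return-value XCom, so a downstream task could consume them. A hypothetical consumer, purely illustrative and assuming the listed pipelines land in the default XCom:

from airflow.decorators import task

@task
def count_training_pipelines(pipelines: list) -> int:
    # Logs how many AutoML training pipelines the list operation returned.
    print(f"Found {len(pipelines)} training pipelines")
    return len(pipelines)

# Inside the DAG above, the wiring would be:
# count_training_pipelines(list_auto_ml_training_job.output)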
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
new file mode 100644
index 0000000000..af8d908b76
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
@@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLTabularTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+TABULAR_DISPLAY_NAME = f"auto-ml-tabular-{ENV_ID}"
+MODEL_DISPLAY_NAME = "adopted-prediction-model"
+
+TABULAR_GCS_BUCKET_NAME = f"bucket_tabular_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+TABULAR_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "tabular-dataset.csv.zip")
+TABULAR_GCS_OBJECT_NAME = "vertex-ai/tabular-dataset.csv"
+TABULAR_CSV_FILE_LOCAL_PATH = "/tabular/tabular-dataset.csv"
+
+TABULAR_DATASET = {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{TABULAR_GCS_BUCKET_NAME}/vertex-ai/tabular-dataset.csv"]}
+            }
+        },
+        Value(),
+    ),
+}
+
+COLUMN_TRANSFORMATIONS = [
+    {"categorical": {"column_name": "Type"}},
+    {"numeric": {"column_name": "Age"}},
+    {"categorical": {"column_name": "Breed1"}},
+    {"categorical": {"column_name": "Color1"}},
+    {"categorical": {"column_name": "Color2"}},
+    {"categorical": {"column_name": "MaturitySize"}},
+    {"categorical": {"column_name": "FurLength"}},
+    {"categorical": {"column_name": "Vaccinated"}},
+    {"categorical": {"column_name": "Sterilized"}},
+    {"categorical": {"column_name": "Health"}},
+    {"numeric": {"column_name": "Fee"}},
+    {"numeric": {"column_name": "PhotoAmt"}},
+]
+
+with models.DAG(
+    f"{DAG_ID}_tabular_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=TABULAR_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {TABULAR_ZIP_CSV_FILE_LOCAL_PATH} -d /tabular/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=TABULAR_CSV_FILE_LOCAL_PATH,
+        dst=TABULAR_GCS_OBJECT_NAME,
+        bucket=TABULAR_GCS_BUCKET_NAME,
+    )
+
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
+    create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
+        task_id="auto_ml_tabular_task",
+        display_name=TABULAR_DISPLAY_NAME,
+        optimization_prediction_type="classification",
+        column_transformations=COLUMN_TRANSFORMATIONS,
+        dataset_id=tabular_dataset_id,
+        target_column="Adopted",
+        training_fraction_split=0.8,
+        validation_fraction_split=0.1,
+        test_fraction_split=0.1,
+        model_display_name=MODEL_DISPLAY_NAME,
+        disable_early_stopping=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
+
+    delete_auto_ml_tabular_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_training_job",
+        training_pipeline_id=create_auto_ml_tabular_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=TABULAR_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /tabular/*",
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        # TEST BODY
+        >> create_auto_ml_tabular_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_tabular_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
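
Note (not part of the patch): the teardown tasks above set trigger_rule=TriggerRule.ALL_DONE so that cleanup still runs when the training task fails; the default ALL_SUCCESS rule would skip them. A self-contained sketch of the mechanism:

from datetime import datetime

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

with models.DAG("teardown_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False):
    flaky_body = BashOperator(task_id="flaky_body", bash_command="exit 1")  # simulated failure
    cleanup = BashOperator(
        task_id="cleanup",
        bash_command="echo 'cleaning up'",
        trigger_rule=TriggerRule.ALL_DONE,  # runs whether flaky_body succeeded or failed
    )
    flaky_body >> cleanup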
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
new file mode 100644
index 0000000000..945e0f2964
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLTextTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ImportDataOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+TEXT_DISPLAY_NAME = f"auto-ml-text-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-text-model-{ENV_ID}"
+
+TEXT_GCS_BUCKET_NAME = f"bucket_text_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+TEXT_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "text-dataset.csv.zip")
+TEXT_GCS_OBJECT_NAME = "vertex-ai/text-dataset.csv"
+TEXT_CSV_FILE_LOCAL_PATH = "/text/text-dataset.csv"
+
+TEXT_DATASET = {
+    "display_name": f"text-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.text,
+    "metadata": Value(string_value="text-dataset"),
+}
+TEXT_DATA_CONFIG = [
+    {
+        "import_schema_uri": schema.dataset.ioformat.text.single_label_classification,
+        "gcs_source": {"uris": [f"gs://{TEXT_GCS_BUCKET_NAME}/vertex-ai/text-dataset.csv"]},
+    },
+]
+
+with models.DAG(
+    f"{DAG_ID}_text_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=TEXT_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {TEXT_ZIP_CSV_FILE_LOCAL_PATH} -d /text/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=TEXT_CSV_FILE_LOCAL_PATH,
+        dst=TEXT_GCS_OBJECT_NAME,
+        bucket=TEXT_GCS_BUCKET_NAME,
+    )
+
+    create_text_dataset = CreateDatasetOperator(
+        task_id="text_dataset",
+        dataset=TEXT_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    text_dataset_id = create_text_dataset.output["dataset_id"]
+
+    import_text_dataset = ImportDataOperator(
+        task_id="import_text_data",
+        dataset_id=text_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=TEXT_DATA_CONFIG,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]
+    create_auto_ml_text_training_job = CreateAutoMLTextTrainingJobOperator(
+        task_id="auto_ml_text_task",
+        display_name=TEXT_DISPLAY_NAME,
+        prediction_type="classification",
+        multi_label=False,
+        dataset_id=text_dataset_id,
+        model_display_name=MODEL_DISPLAY_NAME,
+        training_fraction_split=0.7,
+        validation_fraction_split=0.2,
+        test_fraction_split=0.1,
+        sync=True,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]
+
+    delete_auto_ml_text_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_text_training_job",
+        training_pipeline_id=create_auto_ml_text_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_text_dataset = DeleteDatasetOperator(
+        task_id="delete_text_dataset",
+        dataset_id=text_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=TEXT_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /text/*",
+    )
+
+    (
+        # TEST SETUP
+        [
+            create_bucket,
+            create_text_dataset,
+        ]
+        >> unzip_file
+        >> upload_files
+        >> import_text_dataset
+        # TEST BODY
+        >> create_auto_ml_text_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_text_training_job
+        >> delete_text_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
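
Note (not part of the patch): the image, text and video examples pass a plain string protobuf Value as the dataset metadata, while the forecasting and tabular examples build a structured Value with ParseDict. A short sketch of both constructions, using a placeholder bucket name:

from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Value

# Plain string payload, as used for the image/text/video datasets:
simple_metadata = Value(string_value="text-dataset")

# Structured payload, as used for the forecasting/tabular datasets:
structured_metadata = ParseDict(
    {"input_config": {"gcs_source": {"uri": ["gs://some-bucket/some-object.csv"]}}},
    Value(),
)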
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
new file mode 100644
index 0000000000..522a5c26f3
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
@@ -0,0 +1,179 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLVideoTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ImportDataOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+VIDEO_DISPLAY_NAME = f"auto-ml-video-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-video-model-{ENV_ID}"
+
+VIDEO_GCS_BUCKET_NAME = f"bucket_video_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+VIDEO_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "video-dataset.csv.zip")
+VIDEO_GCS_OBJECT_NAME = "vertex-ai/video-dataset.csv"
+VIDEO_CSV_FILE_LOCAL_PATH = "/video/video-dataset.csv"
+
+VIDEO_DATASET = {
+    "display_name": f"video-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.video,
+    "metadata": Value(string_value="video-dataset"),
+}
+VIDEO_DATA_CONFIG = [
+    {
+        "import_schema_uri": schema.dataset.ioformat.video.classification,
+        "gcs_source": {"uris": [f"gs://{VIDEO_GCS_BUCKET_NAME}/vertex-ai/video-dataset.csv"]},
+    },
+]
+
+with models.DAG(
+    f"{DAG_ID}_video_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=VIDEO_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {VIDEO_ZIP_CSV_FILE_LOCAL_PATH} -d /video/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=VIDEO_CSV_FILE_LOCAL_PATH,
+        dst=VIDEO_GCS_OBJECT_NAME,
+        bucket=VIDEO_GCS_BUCKET_NAME,
+    )
+
+    create_video_dataset = CreateDatasetOperator(
+        task_id="video_dataset",
+        dataset=VIDEO_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    video_dataset_id = create_video_dataset.output["dataset_id"]
+
+    import_video_dataset = ImportDataOperator(
+        task_id="import_video_data",
+        dataset_id=video_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=VIDEO_DATA_CONFIG,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
+    create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
+        task_id="auto_ml_video_task",
+        display_name=VIDEO_DISPLAY_NAME,
+        prediction_type="classification",
+        model_type="CLOUD",
+        dataset_id=video_dataset_id,
+        model_display_name=MODEL_DISPLAY_NAME,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
+
+    delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_video_training_job",
+        training_pipeline_id=create_auto_ml_video_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_video_dataset = DeleteDatasetOperator(
+        task_id="delete_video_dataset",
+        dataset_id=video_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=VIDEO_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /video/*",
+    )
+
+    (
+        # TEST SETUP
+        [
+            create_bucket,
+            create_video_dataset,
+        ]
+        >> unzip_file
+        >> upload_files
+        >> import_video_dataset
+        # TEST BODY
+        >> create_auto_ml_video_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_video_training_job
+        >> delete_video_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
new file mode 100644
index 0000000000..41f6ff7854
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
@@ -0,0 +1,234 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Batch Prediction operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLForecastingTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
+    CreateBatchPredictionJobOperator,
+    DeleteBatchPredictionJobOperator,
+    ListBatchPredictionJobsOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_batch_prediction_job_operations"
+REGION = "us-central1"
+
+FORECAST_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-forecasting-model-{ENV_ID}"
+
+JOB_DISPLAY_NAME = f"batch_prediction_job_test_{ENV_ID}"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/forecast-dataset.csv"
+FORECAST_ZIP_CSV_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "forecast-dataset.csv.zip")
+FORECAST_CSV_FILE_LOCAL_PATH = "/batch-prediction/forecast-dataset.csv"
+
+FORECAST_DATASET = {
+    "display_name": f"forecast-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.time_series,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}
+            }
+        },
+        Value(),
+    ),
+}
+
+TEST_TIME_COLUMN = "date"
+TEST_TIME_SERIES_IDENTIFIER_COLUMN = "store_name"
+TEST_TARGET_COLUMN = "sale_dollars"
+COLUMN_SPECS = {
+    TEST_TIME_COLUMN: "timestamp",
+    TEST_TARGET_COLUMN: "numeric",
+    "city": "categorical",
+    "zip_code": "categorical",
+    "county": "categorical",
+}
+
+BIGQUERY_SOURCE = f"bq://{PROJECT_ID}.test_iowa_liquor_sales_forecasting_us.2021_sales_predict"
+GCS_DESTINATION_PREFIX = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/output"
+MODEL_PARAMETERS = ParseDict({}, Value())
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    render_template_as_native_obj=True,
+    tags=["example", "vertex_ai", "batch_prediction_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /batch-prediction && "
+        f"unzip {FORECAST_ZIP_CSV_FILE_LOCAL_PATH} -d /batch-prediction/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=FORECAST_CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+    )
+    create_forecast_dataset = CreateDatasetOperator(
+        task_id="forecast_dataset",
+        dataset=FORECAST_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
+        task_id="auto_ml_forecasting_task",
+        display_name=FORECAST_DISPLAY_NAME,
+        optimization_objective="minimize-rmse",
+        column_specs=COLUMN_SPECS,
+        # run params
+        dataset_id=create_forecast_dataset.output["dataset_id"],
+        target_column=TEST_TARGET_COLUMN,
+        time_column=TEST_TIME_COLUMN,
+        time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
+        available_at_forecast_columns=[TEST_TIME_COLUMN],
+        unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
+        time_series_attribute_columns=["city", "zip_code", "county"],
+        forecast_horizon=30,
+        context_window=30,
+        data_granularity_unit="day",
+        data_granularity_count=1,
+        weight_column=None,
+        budget_milli_node_hours=1000,
+        model_display_name=MODEL_DISPLAY_NAME,
+        predefined_split_column_name=None,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
+    create_batch_prediction_job = CreateBatchPredictionJobOperator(
+        task_id="create_batch_prediction_job",
+        job_display_name=JOB_DISPLAY_NAME,
+        model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
+        predictions_format="csv",
+        bigquery_source=BIGQUERY_SOURCE,
+        gcs_destination_prefix=GCS_DESTINATION_PREFIX,
+        model_parameters=MODEL_PARAMETERS,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
+
+    # [START how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
+    list_batch_prediction_job = ListBatchPredictionJobsOperator(
+        task_id="list_batch_prediction_jobs",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
+    delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
+        task_id="delete_batch_prediction_job",
+        batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
+
+    delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_forecasting_training_job",
+        training_pipeline_id=create_auto_ml_forecasting_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_forecast_dataset = DeleteDatasetOperator(
+        task_id="delete_forecast_dataset",
+        dataset_id=create_forecast_dataset.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /batch-prediction/*",
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_forecast_dataset
+        >> create_auto_ml_forecasting_training_job
+        # TEST BODY
+        >> create_batch_prediction_job
+        >> list_batch_prediction_job
+        # TEST TEARDOWN
+        >> delete_batch_prediction_job
+        >> delete_auto_ml_forecasting_training_job
+        >> delete_forecast_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
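
Note (not part of the patch): with render_template_as_native_obj=True, the Jinja expression in model_name is evaluated against native Python objects, so ti.xcom_pull('auto_ml_forecasting_task') yields the dict the training task returned and ['name'] picks out the model's resource name. A rough sketch of what that expression resolves to, assuming the training task's return-value XCom is the created Model serialized to a dict (placeholder values only):

pulled_model = {
    "name": "projects/PROJECT_NUMBER/locations/us-central1/models/MODEL_ID",
    "display_name": "auto-ml-forecasting-model-...",
    # ...other Model fields...
}
model_name = pulled_model["name"]  # what "{{ ti.xcom_pull(...)['name'] }}" evaluates to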
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
new file mode 100644
index 0000000000..42b915f20f
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Custom Jobs operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_custom_job_operations"
+REGION = "us-central1"
+CONTAINER_DISPLAY_NAME = f"train-housing-container-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"container-housing-model-{ENV_ID}"
+
+CUSTOM_CONTAINER_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
+CSV_FILE_LOCAL_PATH = "/custom-job/california_housing_train.csv"
+RESOURCES_PATH = Path(__file__).parent / "resources"
+CSV_ZIP_FILE_LOCAL_PATH = str(RESOURCES_PATH / "California-housing.zip")
+
+TABULAR_DATASET = lambda bucket_name: {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {"input_config": {"gcs_source": {"uri": [f"gs://{bucket_name}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}}},
+        Value(),
+    ),
+}
+
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = f"us-central1-docker.pkg.dev/{PROJECT_ID}/system-tests/housing:latest"
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+
+with models.DAG(
+    f"{DAG_ID}_custom_container",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "custom_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /custom-job/ && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /custom-job/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
+    )
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET(CUSTOM_CONTAINER_GCS_BUCKET_NAME),
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+    create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
+        task_id="custom_container_task",
+        staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
+        display_name=CONTAINER_DISPLAY_NAME,
+        container_uri=CUSTOM_CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=tabular_dataset_id,
+        command=["python3", "task.py"],
+        model_display_name=MODEL_DISPLAY_NAME,
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+
+    delete_custom_training_job = DeleteCustomTrainingJobOperator(
+        task_id="delete_custom_training_job",
+        training_pipeline_id=create_custom_container_training_job.output["training_id"],
+        custom_job_id=create_custom_container_training_job.output["custom_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /custom-job/*",
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        # TEST BODY
+        >> create_custom_container_training_job
+        # TEST TEARDOWN
+        >> delete_custom_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
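
Note (not part of the patch): the TABULAR_DATASET lambda above works, but flake8's E731 usually prefers a small function for named callables. A purely stylistic, behaviour-preserving alternative, assuming the module-level names defined in this example:

def tabular_dataset(bucket_name: str) -> dict:
    return {
        "display_name": f"tabular-dataset-{ENV_ID}",
        "metadata_schema_uri": schema.dataset.metadata.tabular,
        "metadata": ParseDict(
            {"input_config": {"gcs_source": {"uri": [f"gs://{bucket_name}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}}},
            Value(),
        ),
    }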
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
new file mode 100644
index 0000000000..aaa289d9a0
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
@@ -0,0 +1,175 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Custom Jobs operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "vertex_ai_custom_job_operations"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+REGION = "us-central1"
+CUSTOM_DISPLAY_NAME = f"train-housing-custom-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"custom-housing-model-{ENV_ID}"
+
+CUSTOM_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
+CSV_ZIP_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "California-housing.zip")
+CSV_FILE_LOCAL_PATH = "/custom-job/california_housing_train.csv"
+
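+# Dataset payload for CreateDatasetOperator: the Vertex AI Dataset expects its metadata as a protobuf
+# Value, hence the ParseDict conversion of the GCS input config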
+TABULAR_DATASET = lambda bucket_name: {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {"input_config": {"gcs_source": {"uri": [f"gs://{bucket_name}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}}},
+        Value(),
+    ),
+}
+
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+
+LOCAL_TRAINING_SCRIPT_PATH = "/custom-job/california_housing_training_script.py"
+
+
+with models.DAG(
+    f"{DAG_ID}_custom",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "custom_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=CUSTOM_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /custom-job && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /custom-job/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=CUSTOM_GCS_BUCKET_NAME,
+    )
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET(CUSTOM_GCS_BUCKET_NAME),
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
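+    # .output["dataset_id"] is an XComArg: the id pushed to XCom by the create task is pulled at runtime,
+    # and tasks that use it get an implicit upstream dependency on create_tabular_dataset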
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
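+    # The local training script is packaged by the Vertex AI SDK and staged in the bucket above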
+    # [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
+    create_custom_training_job = CreateCustomTrainingJobOperator(
+        task_id="custom_task",
+        staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
+        display_name=CUSTOM_DISPLAY_NAME,
+        script_path=LOCAL_TRAINING_SCRIPT_PATH,
+        container_uri=CONTAINER_URI,
+        requirements=["gcsfs==0.7.1"],
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=tabular_dataset_id,
+        replica_count=REPLICA_COUNT,
+        model_display_name=MODEL_DISPLAY_NAME,
+        sync=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_custom_training_job_operator]
+    delete_custom_training_job = DeleteCustomTrainingJobOperator(
+        task_id="delete_custom_training_job",
+        training_pipeline_id=create_custom_training_job.output["training_id"],
+        custom_job_id=create_custom_training_job.output["custom_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_custom_training_job_operator]
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=CUSTOM_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /custom-job/*",
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        # TEST BODY
+        >> create_custom_training_job
+        # TEST TEARDOWN
+        >> delete_custom_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
new file mode 100644
index 0000000000..c3a2a27c73
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Custom Jobs operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomPythonPackageTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_custom_job_operations"
+REGION = "us-central1"
+PACKAGE_DISPLAY_NAME = f"train-housing-py-package-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"py-package-housing-model-{ENV_ID}"
+
+CUSTOM_PYTHON_GCS_BUCKET_NAME = f"bucket_python_{DAG_ID}_{ENV_ID}"
+
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
+RESOURCES_PATH = Path(__file__).parent / "resources"
+CSV_ZIP_FILE_LOCAL_PATH = str(RESOURCES_PATH / "California-housing.zip")
+CSV_FILE_LOCAL_PATH = "/custom-job/california_housing_train.csv"
+TAR_FILE_LOCAL_PATH = "/custom-job/custom_trainer_script-0.1.tar"
+FILES_TO_UPLOAD = [
+    CSV_FILE_LOCAL_PATH,
+    TAR_FILE_LOCAL_PATH,
+]
+
+TABULAR_DATASET = lambda bucket_name: {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {"input_config": {"gcs_source": {"uri": [f"gs://{bucket_name}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}}},
+        Value(),
+    ),
+}
+
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
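+# The unzip_csv_data_file task extracts both the CSV and the training source distribution (the .tar)
+# from California-housing.zip; upload_files stages them under vertex-ai/ in the bucket, which is where
+# PYTHON_PACKAGE_GCS_URI points, and PYTHON_MODULE_NAME is the module Vertex AI runs as the entry point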
+PYTHON_PACKAGE_GCS_URI = f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}/vertex-ai/custom_trainer_script-0.1.tar"
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+
+with models.DAG(
+    f"{DAG_ID}_python_package",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "custom_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=CUSTOM_PYTHON_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /custom-job && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /custom-job/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=FILES_TO_UPLOAD,
+        dst="vertex-ai/",
+        bucket=CUSTOM_PYTHON_GCS_BUCKET_NAME,
+    )
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET(CUSTOM_PYTHON_GCS_BUCKET_NAME),
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+    create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
+        task_id="python_package_task",
+        staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
+        display_name=PACKAGE_DISPLAY_NAME,
+        python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
+        python_module_name=PYTHON_MODULE_NAME,
+        container_uri=CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=tabular_dataset_id,
+        model_display_name=MODEL_DISPLAY_NAME,
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+
+    delete_custom_training_job = DeleteCustomTrainingJobOperator(
+        task_id="delete_custom_training_job",
+        training_pipeline_id=create_custom_python_package_training_job.output["training_id"],
+        custom_job_id=create_custom_python_package_training_job.output["custom_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=CUSTOM_PYTHON_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /custom-job/*",
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        # TEST BODY
+        >> create_custom_python_package_training_job
+        # TEST TEARDOWN
+        >> delete_custom_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
new file mode 100644
index 0000000000..270103b7f1
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
@@ -0,0 +1,305 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Dataset operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_dataset_operations"
+REGION = "us-central1"
+
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+ALL_DATASETS_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "all-datasets.zip")
+
+CSV_FILES_LOCAL_PATH = [
+    "/all-datasets/forecast-dataset.csv",
+    "/all-datasets/image-dataset.csv",
+    "/all-datasets/tabular-dataset.csv",
+    "/all-datasets/text-dataset.csv",
+    "/all-datasets/video-dataset.csv",
+]
+
+TIME_SERIES_DATASET = {
+    "display_name": f"time-series-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.time_series,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/forecast-dataset.csv"]}
+            }
+        },
+        Value(),
+    ),
+}
+IMAGE_DATASET = {
+    "display_name": f"image-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.image,
+    "metadata": Value(string_value="image-dataset"),
+}
+TABULAR_DATASET = {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/tabular-dataset.csv"]}
+            }
+        },
+        Value(),
+    ),
+}
+TEXT_DATASET = {
+    "display_name": f"text-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.text,
+    "metadata": Value(string_value="text-dataset"),
+}
+VIDEO_DATASET = {
+    "display_name": f"video-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.video,
+    "metadata": Value(string_value="video-dataset"),
+}
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/exports"}}
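+# Import config for the image dataset: a public sample JSONL with image bounding-box annotations,
+# imported with the matching ioformat schema and an extra data item label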
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
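+# UpdateDatasetOperator only overwrites the fields listed in the update mask (here just the display name)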
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "dataset"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {ALL_DATASETS_ZIP_CSV_FILE_LOCAL_PATH} -d /all-datasets/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=CSV_FILES_LOCAL_PATH,
+        dst="vertex-ai/",
+        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_dataset_operator]
+    create_image_dataset_job = CreateDatasetOperator(
+        task_id="image_dataset",
+        dataset=IMAGE_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_tabular_dataset_job = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_text_dataset_job = CreateDatasetOperator(
+        task_id="text_dataset",
+        dataset=TEXT_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_video_dataset_job = CreateDatasetOperator(
+        task_id="video_dataset",
+        dataset=VIDEO_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_time_series_dataset_job = CreateDatasetOperator(
+        task_id="time_series_dataset",
+        dataset=TIME_SERIES_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_dataset_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_dataset_operator]
+    delete_dataset_job = DeleteDatasetOperator(
+        task_id="delete_dataset",
+        dataset_id=create_text_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_delete_dataset_operator]
+
+    # [START how_to_cloud_vertex_ai_get_dataset_operator]
+    get_dataset = GetDatasetOperator(
+        task_id="get_dataset",
+        project_id=PROJECT_ID,
+        region=REGION,
+        dataset_id=create_tabular_dataset_job.output["dataset_id"],
+    )
+    # [END how_to_cloud_vertex_ai_get_dataset_operator]
+
+    # [START how_to_cloud_vertex_ai_export_data_operator]
+    export_data_job = ExportDataOperator(
+        task_id="export_data",
+        dataset_id=create_image_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        export_config=TEST_EXPORT_CONFIG,
+    )
+    # [END how_to_cloud_vertex_ai_export_data_operator]
+
+    # [START how_to_cloud_vertex_ai_import_data_operator]
+    import_data_job = ImportDataOperator(
+        task_id="import_data",
+        dataset_id=create_image_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=TEST_IMPORT_CONFIG,
+    )
+    # [END how_to_cloud_vertex_ai_import_data_operator]
+
+    # [START how_to_cloud_vertex_ai_list_dataset_operator]
+    list_dataset_job = ListDatasetsOperator(
+        task_id="list_dataset",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_dataset_operator]
+
+    # [START how_to_cloud_vertex_ai_update_dataset_operator]
+    update_dataset_job = UpdateDatasetOperator(
+        task_id="update_dataset",
+        project_id=PROJECT_ID,
+        region=REGION,
+        dataset_id=create_video_dataset_job.output["dataset_id"],
+        dataset=DATASET_TO_UPDATE,
+        update_mask=TEST_UPDATE_MASK,
+    )
+    # [END how_to_cloud_vertex_ai_update_dataset_operator]
+
+    delete_time_series_dataset_job = DeleteDatasetOperator(
+        task_id="delete_time_series_dataset",
+        dataset_id=create_time_series_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset_job = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=create_tabular_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_image_dataset_job = DeleteDatasetOperator(
+        task_id="delete_image_dataset",
+        dataset_id=create_image_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_video_dataset_job = DeleteDatasetOperator(
+        task_id="delete_video_dataset",
+        dataset_id=create_video_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /all-datasets/*",
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        # TEST BODY
+        >> [
+            create_time_series_dataset_job,
+            create_text_dataset_job,
+            create_tabular_dataset_job,
+            create_image_dataset_job,
+            create_video_dataset_job,
+            list_dataset_job,
+        ]
+    )
+    create_time_series_dataset_job >> delete_time_series_dataset_job
+    create_text_dataset_job >> delete_dataset_job
+    create_tabular_dataset_job >> get_dataset >> delete_tabular_dataset_job
+    create_image_dataset_job >> import_data_job >> export_data_job >> delete_image_dataset_job
+    create_video_dataset_job >> update_dataset_job >> delete_video_dataset_job
+    (
+        # TEST TEARDOWN
+        [
+            delete_time_series_dataset_job,
+            delete_dataset_job,
+            delete_tabular_dataset_job,
+            delete_image_dataset_job,
+            delete_video_dataset_job,
+            list_dataset_job,
+        ]
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
new file mode 100644
index 0000000000..da18746cf4
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Endpoint Service operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud import aiplatform
+from google.cloud.aiplatform import schema
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLImageTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ImportDataOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.endpoint_service import (
+    CreateEndpointOperator,
+    DeleteEndpointOperator,
+    DeployModelOperator,
+    ListEndpointsOperator,
+    UndeployModelOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_endpoint_service_operations"
+REGION = "us-central1"
+IMAGE_DISPLAY_NAME = f"auto-ml-image-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-image-model-{ENV_ID}"
+
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/image-dataset.csv"
+IMAGE_ZIP_CSV_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "image-dataset.csv.zip")
+IMAGE_CSV_FILE_LOCAL_PATH = "/endpoint/image-dataset.csv"
+
+IMAGE_DATASET = {
+    "display_name": f"image-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.image,
+    "metadata": Value(string_value="image-dataset"),
+}
+IMAGE_DATA_CONFIG = [
+    {
+        "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
+        "gcs_source": {"uris": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/image-dataset.csv"]},
+    },
+]
+
+ENDPOINT_CONF = {
+    "display_name": f"endpoint_test_{ENV_ID}",
+}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    render_template_as_native_obj=True,
+    tags=["example", "vertex_ai", "endpoint_service"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {IMAGE_ZIP_CSV_FILE_LOCAL_PATH} -d /endpoint/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=IMAGE_CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+    )
+    create_image_dataset = CreateDatasetOperator(
+        task_id="image_dataset",
+        dataset=IMAGE_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    image_dataset_id = create_image_dataset.output["dataset_id"]
+
+    import_image_dataset = ImportDataOperator(
+        task_id="import_image_data",
+        dataset_id=image_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=IMAGE_DATA_CONFIG,
+    )
+
+    create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
+        task_id="auto_ml_image_task",
+        display_name=IMAGE_DISPLAY_NAME,
+        dataset_id=image_dataset_id,
+        prediction_type="classification",
+        multi_label=False,
+        model_type="CLOUD",
+        training_fraction_split=0.6,
+        validation_fraction_split=0.2,
+        test_fraction_split=0.2,
+        budget_milli_node_hours=8000,
+        model_display_name=MODEL_DISPLAY_NAME,
+        disable_early_stopping=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
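+    # The "model" field is a Jinja template that pulls the full model resource name pushed to XCom
+    # by the auto_ml_image_task training task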
+    DEPLOYED_MODEL = {
+        # format: 'projects/{project}/locations/{location}/models/{model}'
+        "model": "{{ti.xcom_pull('auto_ml_image_task')['name']}}",
+        "display_name": f"temp_endpoint_test_{ENV_ID}",
+        "dedicated_resources": {
+            "machine_spec": {
+                "machine_type": "n1-standard-2",
+                "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
+                "accelerator_count": 1,
+            },
+            "min_replica_count": 1,
+            "max_replica_count": 1,
+        },
+    }
+
+    # [START how_to_cloud_vertex_ai_create_endpoint_operator]
+    create_endpoint = CreateEndpointOperator(
+        task_id="create_endpoint",
+        endpoint=ENDPOINT_CONF,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_endpoint_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_endpoint_operator]
+    delete_endpoint = DeleteEndpointOperator(
+        task_id="delete_endpoint",
+        endpoint_id=create_endpoint.output["endpoint_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_delete_endpoint_operator]
+
+    # [START how_to_cloud_vertex_ai_list_endpoints_operator]
+    list_endpoints = ListEndpointsOperator(
+        task_id="list_endpoints",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_endpoints_operator]
+
+    # [START how_to_cloud_vertex_ai_deploy_model_operator]
+    deploy_model = DeployModelOperator(
+        task_id="deploy_model",
+        endpoint_id=create_endpoint.output["endpoint_id"],
+        deployed_model=DEPLOYED_MODEL,
+        traffic_split={"0": 100},
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_deploy_model_operator]
+
+    # [START how_to_cloud_vertex_ai_undeploy_model_operator]
+    undeploy_model = UndeployModelOperator(
+        task_id="undeploy_model",
+        endpoint_id=create_endpoint.output["endpoint_id"],
+        deployed_model_id=deploy_model.output["deployed_model_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_undeploy_model_operator]
+
+    delete_auto_ml_image_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_training_job",
+        training_pipeline_id=create_auto_ml_image_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_image_dataset = DeleteDatasetOperator(
+        task_id="delete_image_dataset",
+        dataset_id=image_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /endpoint/*",
+    )
+
+    (
+        # TEST SETUP
+        [
+            create_bucket,
+            create_image_dataset,
+        ]
+        >> unzip_file
+        >> upload_files
+        >> import_image_dataset
+        >> create_auto_ml_image_training_job
+        # TEST BODY
+        >> create_endpoint
+        >> deploy_model
+        >> undeploy_model
+        >> delete_endpoint
+        >> list_endpoints
+        # TEST TEARDOWN
+        >> delete_auto_ml_image_training_job
+        >> delete_image_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
new file mode 100644
index 0000000000..1976a026ec
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Hyperparameter Tuning Job operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from google.cloud import aiplatform
+
+from airflow import models
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job import (
+    CreateHyperparameterTuningJobOperator,
+    DeleteHyperparameterTuningJobOperator,
+    GetHyperparameterTuningJobOperator,
+    ListHyperparameterTuningJobOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_hyperparameter_tuning_job_operations"
+REGION = "us-central1"
+DISPLAY_NAME = f"hyperparameter-tuning-job-{ENV_ID}"
+
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_hyperparameter_tuning_job_{ENV_ID}"
+STAGING_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
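+# A single CPU-only worker pool; the training image is expected to already exist in the project's registry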
+WORKER_POOL_SPECS = [
+    {
+        "machine_spec": {
+            "machine_type": MACHINE_TYPE,
+            "accelerator_type": ACCELERATOR_TYPE,
+            "accelerator_count": ACCELERATOR_COUNT,
+        },
+        "replica_count": REPLICA_COUNT,
+        "container_spec": {
+            "image_uri": f"gcr.io/{PROJECT_ID}/horse-human:hypertune",
+        },
+    }
+]
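+# Search space for the tuning job: two continuous parameters (log / linear scale) and one discrete one;
+# METRIC_SPEC names the metric reported by the training container and the optimization goal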
+PARAM_SPECS = {
+    "learning_rate": aiplatform.hyperparameter_tuning.DoubleParameterSpec(min=0.01, max=1, scale="log"),
+    "momentum": aiplatform.hyperparameter_tuning.DoubleParameterSpec(min=0, max=1, scale="linear"),
+    "num_neurons": aiplatform.hyperparameter_tuning.DiscreteParameterSpec(
+        values=[64, 128, 512], scale="linear"
+    ),
+}
+METRIC_SPEC = {
+    "accuracy": "maximize",
+}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "hyperparameter_tuning_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
+    create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
+        task_id="create_hyperparameter_tuning_job",
+        staging_bucket=STAGING_BUCKET,
+        display_name=DISPLAY_NAME,
+        worker_pool_specs=WORKER_POOL_SPECS,
+        sync=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+        parameter_spec=PARAM_SPECS,
+        metric_spec=METRIC_SPEC,
+        max_trial_count=15,
+        parallel_trial_count=3,
+    )
+    # [END how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
+
+    # [START how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
+    get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
+        task_id="get_hyperparameter_tuning_job",
+        project_id=PROJECT_ID,
+        region=REGION,
+        hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
+    )
+    # [END how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
+    delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
+        task_id="delete_hyperparameter_tuning_job",
+        project_id=PROJECT_ID,
+        region=REGION,
+        hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
+
+    # [START how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
+    list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
+        task_id="list_hyperparameter_tuning_job",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        # TEST BODY
+        >> create_hyperparameter_tuning_job
+        >> get_hyperparameter_tuning_job
+        >> delete_hyperparameter_tuning_job
+        >> list_hyperparameter_tuning_job
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
new file mode 100644
index 0000000000..1876139efc
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Custom Jobs operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import ListCustomTrainingJobOperator
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_custom_job_operations"
+REGION = "us-central1"
+
+with models.DAG(
+    f"{DAG_ID}_list_custom_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "custom_job"],
+) as dag:
+    # [START how_to_cloud_vertex_ai_list_custom_training_job_operator]
+    list_custom_training_job = ListCustomTrainingJobOperator(
+        task_id="list_custom_training_job",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_custom_training_job_operator]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
new file mode 100644
index 0000000000..98e0c8d20d
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
@@ -0,0 +1,246 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Model Service operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.model_service import (
+    DeleteModelOperator,
+    ExportModelOperator,
+    ListModelsOperator,
+    UploadModelOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_model_service_operations"
+REGION = "us-central1"
+TRAIN_DISPLAY_NAME = f"train-housing-custom-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"custom-housing-model-{ENV_ID}"
+
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+STAGING_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}"
+
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
+CSV_FILE_LOCAL_PATH = "/model_service/california_housing_train.csv"
+RESOURCES_PATH = Path(__file__).parent / "resources"
+CSV_ZIP_FILE_LOCAL_PATH = str(RESOURCES_PATH / "California-housing.zip")
+
+TABULAR_DATASET = {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}
+            }
+        },
+        Value(),
+    ),
+}
+
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+
+LOCAL_TRAINING_SCRIPT_PATH = "/model_service/california_housing_training_script.py"
+
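+# Exports the uploaded model's artifacts back to the staging bucket in the "custom-trained" format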
+MODEL_OUTPUT_CONFIG = {
+    "artifact_destination": {
+        "output_uri_prefix": STAGING_BUCKET,
+    },
+    "export_format_id": "custom-trained",
+}
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
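+# Model to upload: artifact_uri is templated from the XCom returned by the custom training task
+# ("custom_task"), which holds the GCS location of the trained model artifacts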
+MODEL_OBJ = {
+    "display_name": f"model-{ENV_ID}",
+    "artifact_uri": "{{ti.xcom_pull('custom_task')['artifactUri']}}",
+    "container_spec": {
+        "image_uri": MODEL_SERVING_CONTAINER_URI,
+        "command": [],
+        "args": [],
+        "env": [],
+        "ports": [],
+        "predict_route": "",
+        "health_route": "",
+    },
+}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    render_template_as_native_obj=True,
+    tags=["example", "vertex_ai", "model_service"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /model_service && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /model_service/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+    )
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    create_custom_training_job = CreateCustomTrainingJobOperator(
+        task_id="custom_task",
+        staging_bucket=f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}",
+        display_name=TRAIN_DISPLAY_NAME,
+        script_path=LOCAL_TRAINING_SCRIPT_PATH,
+        container_uri=CONTAINER_URI,
+        requirements=["gcsfs==0.7.1"],
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=tabular_dataset_id,
+        replica_count=1,
+        model_display_name=MODEL_DISPLAY_NAME,
+        sync=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
+    # [START how_to_cloud_vertex_ai_upload_model_operator]
+    upload_model = UploadModelOperator(
+        task_id="upload_model",
+        region=REGION,
+        project_id=PROJECT_ID,
+        model=MODEL_OBJ,
+    )
+    # [END how_to_cloud_vertex_ai_upload_model_operator]
+
+    # [START how_to_cloud_vertex_ai_export_model_operator]
+    export_model = ExportModelOperator(
+        task_id="export_model",
+        project_id=PROJECT_ID,
+        region=REGION,
+        model_id=upload_model.output["model_id"],
+        output_config=MODEL_OUTPUT_CONFIG,
+    )
+    # [END how_to_cloud_vertex_ai_export_model_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_model_operator]
+    delete_model = DeleteModelOperator(
+        task_id="delete_model",
+        project_id=PROJECT_ID,
+        region=REGION,
+        model_id=upload_model.output["model_id"],
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_model_operator]
+
+    # [START how_to_cloud_vertex_ai_list_models_operator]
+    list_models = ListModelsOperator(
+        task_id="list_models",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_models_operator]
+
+    delete_custom_training_job = DeleteCustomTrainingJobOperator(
+        task_id="delete_custom_training_job",
+        training_pipeline_id=create_custom_training_job.output["training_id"],
+        custom_job_id=create_custom_training_job.output["custom_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /model_service/*",
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        >> create_custom_training_job
+        # TEST BODY
+        >> upload_model
+        >> export_model
+        >> delete_model
+        >> list_models
+        # TEST TEARDOWN
+        >> delete_custom_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/California-housing.zip b/tests/system/providers/google/cloud/vertex_ai/resources/California-housing.zip
new file mode 100644
index 0000000000..1ac6fc83a4
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/California-housing.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/__init__.py b/tests/system/providers/google/cloud/vertex_ai/resources/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/resources/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/all-datasets.zip b/tests/system/providers/google/cloud/vertex_ai/resources/all-datasets.zip
new file mode 100644
index 0000000000..00c7a27029
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/all-datasets.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/forecast-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/forecast-dataset.csv.zip
new file mode 100644
index 0000000000..fbb59761a9
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/forecast-dataset.csv.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/image-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/image-dataset.csv.zip
new file mode 100644
index 0000000000..5dc45deeb7
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/image-dataset.csv.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/tabular-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/tabular-dataset.csv.zip
new file mode 100644
index 0000000000..33f50119ab
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/tabular-dataset.csv.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/text-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/text-dataset.csv.zip
new file mode 100644
index 0000000000..2a38cbb6e7
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/text-dataset.csv.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/video-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/video-dataset.csv.zip
new file mode 100644
index 0000000000..658ce398d9
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/video-dataset.csv.zip differ