You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original HTML version) is not preserved in this plain-text rendering.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 01:44:33 UTC
[airflow] branch main updated: Add system tests for Vertex AI operators in new approach (#27053)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7a7c5f8fc4 Add system tests for Vertex AI operators in new approach (#27053)
7a7c5f8fc4 is described below
commit 7a7c5f8fc4284adfedcb0667ec7c935b913660cf
Author: VladaZakharova <80...@users.noreply.github.com>
AuthorDate: Mon Oct 31 02:44:15 2022 +0100
Add system tests for Vertex AI operators in new approach (#27053)
---
.../google/cloud/hooks/vertex_ai/auto_ml.py | 75 +++--
.../google/cloud/hooks/vertex_ai/custom_job.py | 30 +-
.../google/cloud/operators/vertex_ai/auto_ml.py | 108 +++++---
.../operators/vertex_ai/batch_prediction_job.py | 2 +-
.../google/cloud/operators/vertex_ai/custom_job.py | 37 ++-
.../cloud/operators/vertex_ai/endpoint_service.py | 2 +-
.../cloud/operators/vertex_ai/model_service.py | 2 +-
.../operators/cloud/vertex_ai.rst | 70 ++---
.../google/cloud/operators/test_vertex_ai.py | 23 +-
.../providers/google/cloud/vertex_ai/__init__.py | 16 ++
...ample_vertex_ai_auto_ml_forecasting_training.py | 195 +++++++++++++
.../example_vertex_ai_auto_ml_image_training.py | 185 +++++++++++++
.../example_vertex_ai_auto_ml_list_training.py | 59 ++++
.../example_vertex_ai_auto_ml_tabular_training.py | 189 +++++++++++++
.../example_vertex_ai_auto_ml_text_training.py | 183 +++++++++++++
.../example_vertex_ai_auto_ml_video_training.py | 179 ++++++++++++
.../example_vertex_ai_batch_prediction_job.py | 234 ++++++++++++++++
.../example_vertex_ai_custom_container.py | 183 +++++++++++++
.../vertex_ai/example_vertex_ai_custom_job.py | 175 ++++++++++++
.../example_vertex_ai_custom_job_python_package.py | 191 +++++++++++++
.../cloud/vertex_ai/example_vertex_ai_dataset.py | 305 +++++++++++++++++++++
.../cloud/vertex_ai/example_vertex_ai_endpoint.py | 256 +++++++++++++++++
.../example_vertex_ai_hyperparameter_tuning_job.py | 158 +++++++++++
.../example_vertex_ai_list_custom_jobs.py | 57 ++++
.../vertex_ai/example_vertex_ai_model_service.py | 246 +++++++++++++++++
.../vertex_ai/resources/California-housing.zip | Bin 0 -> 366644 bytes
.../google/cloud/vertex_ai/resources/__init__.py | 16 ++
.../cloud/vertex_ai/resources/all-datasets.zip | Bin 0 -> 84305 bytes
.../vertex_ai/resources/forecast-dataset.csv.zip | Bin 0 -> 11538 bytes
.../vertex_ai/resources/image-dataset.csv.zip | Bin 0 -> 16626 bytes
.../vertex_ai/resources/tabular-dataset.csv.zip | Bin 0 -> 9403 bytes
.../cloud/vertex_ai/resources/text-dataset.csv.zip | Bin 0 -> 41210 bytes
.../vertex_ai/resources/video-dataset.csv.zip | Bin 0 -> 5616 bytes
33 files changed, 3053 insertions(+), 123 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py b/airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py
index 3b76229dbd..1a31e68cc8 100644
--- a/airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py
+++ b/airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py
@@ -259,6 +259,11 @@ class AutoMLHook(GoogleBaseHook):
"""Returns unique id of the Model."""
return obj["name"].rpartition("/")[-1]
+ @staticmethod
+ def extract_training_id(resource_name: str) -> str:
+ """Returns unique id of the Training pipeline."""
+ return resource_name.rpartition("/")[-1]
+
def wait_for_operation(self, operation: Operation, timeout: float | None = None):
"""Waits for long-lasting operation to complete."""
try:
@@ -303,7 +308,7 @@ class AutoMLHook(GoogleBaseHook):
export_evaluated_data_items_bigquery_destination_uri: str | None = None,
export_evaluated_data_items_override_destination: bool = False,
sync: bool = True,
- ) -> models.Model:
+ ) -> tuple[models.Model | None, str]:
"""
Create an AutoML Tabular Training Job.
@@ -488,9 +493,15 @@ class AutoMLHook(GoogleBaseHook):
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
sync=sync,
)
- model.wait()
-
- return model
+ training_id = self.extract_training_id(self._job.resource_name)
+ if model:
+ model.wait()
+ else:
+ self.log.warning(
+ "Training did not produce a Managed Model returning None. Training Pipeline is not "
+ "configured to upload a Model."
+ )
+ return model, training_id
@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_forecasting_training_job(
@@ -529,7 +540,7 @@ class AutoMLHook(GoogleBaseHook):
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
sync: bool = True,
- ) -> models.Model:
+ ) -> tuple[models.Model | None, str]:
"""
Create an AutoML Forecasting Training Job.
@@ -715,9 +726,15 @@ class AutoMLHook(GoogleBaseHook):
model_labels=model_labels,
sync=sync,
)
- model.wait()
-
- return model
+ training_id = self.extract_training_id(self._job.resource_name)
+ if model:
+ model.wait()
+ else:
+ self.log.warning(
+ "Training did not produce a Managed Model returning None. Training Pipeline is not "
+ "configured to upload a Model."
+ )
+ return model, training_id
@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_image_training_job(
@@ -744,7 +761,7 @@ class AutoMLHook(GoogleBaseHook):
model_labels: dict[str, str] | None = None,
disable_early_stopping: bool = False,
sync: bool = True,
- ) -> models.Model:
+ ) -> tuple[models.Model | None, str]:
"""
Create an AutoML Image Training Job.
@@ -885,9 +902,15 @@ class AutoMLHook(GoogleBaseHook):
disable_early_stopping=disable_early_stopping,
sync=sync,
)
- model.wait()
-
- return model
+ training_id = self.extract_training_id(self._job.resource_name)
+ if model:
+ model.wait()
+ else:
+ self.log.warning(
+ "Training did not produce a Managed Model returning None. AutoML Image Training "
+ "Pipeline is not configured to upload a Model."
+ )
+ return model, training_id
@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_text_training_job(
@@ -911,7 +934,7 @@ class AutoMLHook(GoogleBaseHook):
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
sync: bool = True,
- ) -> models.Model:
+ ) -> tuple[models.Model | None, str]:
"""
Create an AutoML Text Training Job.
@@ -1016,9 +1039,15 @@ class AutoMLHook(GoogleBaseHook):
model_labels=model_labels,
sync=sync,
)
- model.wait()
-
- return model
+ training_id = self.extract_training_id(self._job.resource_name)
+ if model:
+ model.wait()
+ else:
+ self.log.warning(
+ "Training did not produce a Managed Model returning None. AutoML Text Training "
+ "Pipeline is not configured to upload a Model."
+ )
+ return model, training_id
@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_video_training_job(
@@ -1039,7 +1068,7 @@ class AutoMLHook(GoogleBaseHook):
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
sync: bool = True,
- ) -> models.Model:
+ ) -> tuple[models.Model | None, str]:
"""
Create an AutoML Video Training Job.
@@ -1141,9 +1170,15 @@ class AutoMLHook(GoogleBaseHook):
model_labels=model_labels,
sync=sync,
)
- model.wait()
-
- return model
+ training_id = self.extract_training_id(self._job.resource_name)
+ if model:
+ model.wait()
+ else:
+ self.log.warning(
+ "Training did not produce a Managed Model returning None. AutoML Video Training "
+ "Pipeline is not configured to upload a Model."
+ )
+ return model, training_id
@GoogleBaseHook.fallback_to_default_project_id
def delete_training_pipeline(
diff --git a/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py b/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py
index 9ded2770a2..77fb7d2cc6 100644
--- a/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py
+++ b/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py
@@ -247,6 +247,11 @@ class CustomJobHook(GoogleBaseHook):
"""Returns unique id of the Training pipeline."""
return resource_name.rpartition("/")[-1]
+ @staticmethod
+ def extract_custom_job_id(custom_job_name: str) -> str:
+ """Returns unique id of the Custom Job pipeline."""
+ return custom_job_name.rpartition("/")[-1]
+
def wait_for_operation(self, operation: Operation, timeout: float | None = None):
"""Waits for long-lasting operation to complete."""
try:
@@ -292,7 +297,7 @@ class CustomJobHook(GoogleBaseHook):
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
- ) -> tuple[models.Model | None, str]:
+ ) -> tuple[models.Model | None, str, str]:
"""Run Job for training pipeline"""
model = job.run(
dataset=dataset,
@@ -323,6 +328,9 @@ class CustomJobHook(GoogleBaseHook):
sync=sync,
)
training_id = self.extract_training_id(job.resource_name)
+ custom_job_id = self.extract_custom_job_id(
+ job.gca_resource.training_task_metadata.get("backingCustomJob")
+ )
if model:
model.wait()
else:
@@ -332,7 +340,7 @@ class CustomJobHook(GoogleBaseHook):
"model_serving_container_image_uri and model_display_name passed in. "
"Ensure that your training script saves to model to os.environ['AIP_MODEL_DIR']."
)
- return model, training_id
+ return model, training_id, custom_job_id
@GoogleBaseHook.fallback_to_default_project_id
def cancel_pipeline_job(
@@ -613,7 +621,7 @@ class CustomJobHook(GoogleBaseHook):
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
- ) -> tuple[models.Model | None, str]:
+ ) -> tuple[models.Model | None, str, str]:
"""
Create Custom Container Training Job
@@ -885,7 +893,7 @@ class CustomJobHook(GoogleBaseHook):
if not self._job:
raise AirflowException("CustomJob was not created")
- model, training_id = self._run_job(
+ model, training_id, custom_job_id = self._run_job(
job=self._job,
dataset=dataset,
annotation_schema_uri=annotation_schema_uri,
@@ -915,7 +923,7 @@ class CustomJobHook(GoogleBaseHook):
sync=sync,
)
- return model, training_id
+ return model, training_id, custom_job_id
@GoogleBaseHook.fallback_to_default_project_id
def create_custom_python_package_training_job(
@@ -971,7 +979,7 @@ class CustomJobHook(GoogleBaseHook):
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
- ) -> tuple[models.Model | None, str]:
+ ) -> tuple[models.Model | None, str, str]:
"""
Create Custom Python Package Training Job
@@ -1243,7 +1251,7 @@ class CustomJobHook(GoogleBaseHook):
if not self._job:
raise AirflowException("CustomJob was not created")
- model, training_id = self._run_job(
+ model, training_id, custom_job_id = self._run_job(
job=self._job,
dataset=dataset,
annotation_schema_uri=annotation_schema_uri,
@@ -1273,7 +1281,7 @@ class CustomJobHook(GoogleBaseHook):
sync=sync,
)
- return model, training_id
+ return model, training_id, custom_job_id
@GoogleBaseHook.fallback_to_default_project_id
def create_custom_training_job(
@@ -1329,7 +1337,7 @@ class CustomJobHook(GoogleBaseHook):
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
- ) -> tuple[models.Model | None, str]:
+ ) -> tuple[models.Model | None, str, str]:
"""
Create Custom Training Job
@@ -1601,7 +1609,7 @@ class CustomJobHook(GoogleBaseHook):
if not self._job:
raise AirflowException("CustomJob was not created")
- model, training_id = self._run_job(
+ model, training_id, custom_job_id = self._run_job(
job=self._job,
dataset=dataset,
annotation_schema_uri=annotation_schema_uri,
@@ -1631,7 +1639,7 @@ class CustomJobHook(GoogleBaseHook):
sync=sync,
)
- return model, training_id
+ return model, training_id, custom_job_id
@GoogleBaseHook.fallback_to_default_project_id
def delete_pipeline_job(
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py b/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
index 8d1c13a9be..6851d52633 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
@@ -29,7 +29,11 @@ from google.cloud.aiplatform_v1.types.training_pipeline import TrainingPipeline
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.vertex_ai.auto_ml import AutoMLHook
-from airflow.providers.google.cloud.links.vertex_ai import VertexAIModelLink, VertexAITrainingPipelinesLink
+from airflow.providers.google.cloud.links.vertex_ai import (
+ VertexAIModelLink,
+ VertexAITrainingLink,
+ VertexAITrainingPipelinesLink,
+)
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -89,11 +93,12 @@ class AutoMLTrainingJobBaseOperator(BaseOperator):
class CreateAutoMLForecastingTrainingJobOperator(AutoMLTrainingJobBaseOperator):
"""Create AutoML Forecasting Training job"""
- template_fields = [
+ template_fields = (
+ "dataset_id",
"region",
"impersonation_chain",
- ]
- operator_extra_links = (VertexAIModelLink(),)
+ )
+ operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
def __init__(
self,
@@ -158,7 +163,7 @@ class CreateAutoMLForecastingTrainingJobOperator(AutoMLTrainingJobBaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- model = self.hook.create_auto_ml_forecasting_training_job(
+ model, training_id = self.hook.create_auto_ml_forecasting_training_job(
project_id=self.project_id,
region=self.region,
display_name=self.display_name,
@@ -199,20 +204,26 @@ class CreateAutoMLForecastingTrainingJobOperator(AutoMLTrainingJobBaseOperator):
sync=self.sync,
)
- result = Model.to_dict(model)
- model_id = self.hook.extract_model_id(result)
- VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ if model:
+ result = Model.to_dict(model)
+ model_id = self.hook.extract_model_id(result)
+ VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ else:
+ result = model # type: ignore
+ self.xcom_push(context, key="training_id", value=training_id)
+ VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
return result
class CreateAutoMLImageTrainingJobOperator(AutoMLTrainingJobBaseOperator):
"""Create Auto ML Image Training job"""
- template_fields = [
+ template_fields = (
+ "dataset_id",
"region",
"impersonation_chain",
- ]
- operator_extra_links = (VertexAIModelLink(),)
+ )
+ operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
def __init__(
self,
@@ -249,7 +260,7 @@ class CreateAutoMLImageTrainingJobOperator(AutoMLTrainingJobBaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- model = self.hook.create_auto_ml_image_training_job(
+ model, training_id = self.hook.create_auto_ml_image_training_job(
project_id=self.project_id,
region=self.region,
display_name=self.display_name,
@@ -274,20 +285,26 @@ class CreateAutoMLImageTrainingJobOperator(AutoMLTrainingJobBaseOperator):
sync=self.sync,
)
- result = Model.to_dict(model)
- model_id = self.hook.extract_model_id(result)
- VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ if model:
+ result = Model.to_dict(model)
+ model_id = self.hook.extract_model_id(result)
+ VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ else:
+ result = model # type: ignore
+ self.xcom_push(context, key="training_id", value=training_id)
+ VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
return result
class CreateAutoMLTabularTrainingJobOperator(AutoMLTrainingJobBaseOperator):
"""Create Auto ML Tabular Training job"""
- template_fields = [
+ template_fields = (
+ "dataset_id",
"region",
"impersonation_chain",
- ]
- operator_extra_links = (VertexAIModelLink(),)
+ )
+ operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
def __init__(
self,
@@ -340,7 +357,7 @@ class CreateAutoMLTabularTrainingJobOperator(AutoMLTrainingJobBaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- model = self.hook.create_auto_ml_tabular_training_job(
+ model, training_id = self.hook.create_auto_ml_tabular_training_job(
project_id=self.project_id,
region=self.region,
display_name=self.display_name,
@@ -375,9 +392,14 @@ class CreateAutoMLTabularTrainingJobOperator(AutoMLTrainingJobBaseOperator):
sync=self.sync,
)
- result = Model.to_dict(model)
- model_id = self.hook.extract_model_id(result)
- VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ if model:
+ result = Model.to_dict(model)
+ model_id = self.hook.extract_model_id(result)
+ VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ else:
+ result = model # type: ignore
+ self.xcom_push(context, key="training_id", value=training_id)
+ VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
return result
@@ -385,10 +407,11 @@ class CreateAutoMLTextTrainingJobOperator(AutoMLTrainingJobBaseOperator):
"""Create Auto ML Text Training job"""
template_fields = [
+ "dataset_id",
"region",
"impersonation_chain",
]
- operator_extra_links = (VertexAIModelLink(),)
+ operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
def __init__(
self,
@@ -419,7 +442,7 @@ class CreateAutoMLTextTrainingJobOperator(AutoMLTrainingJobBaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- model = self.hook.create_auto_ml_text_training_job(
+ model, training_id = self.hook.create_auto_ml_text_training_job(
project_id=self.project_id,
region=self.region,
display_name=self.display_name,
@@ -441,20 +464,26 @@ class CreateAutoMLTextTrainingJobOperator(AutoMLTrainingJobBaseOperator):
sync=self.sync,
)
- result = Model.to_dict(model)
- model_id = self.hook.extract_model_id(result)
- VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ if model:
+ result = Model.to_dict(model)
+ model_id = self.hook.extract_model_id(result)
+ VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ else:
+ result = model # type: ignore
+ self.xcom_push(context, key="training_id", value=training_id)
+ VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
return result
class CreateAutoMLVideoTrainingJobOperator(AutoMLTrainingJobBaseOperator):
"""Create Auto ML Video Training job"""
- template_fields = [
+ template_fields = (
+ "dataset_id",
"region",
"impersonation_chain",
- ]
- operator_extra_links = (VertexAIModelLink(),)
+ )
+ operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
def __init__(
self,
@@ -479,7 +508,7 @@ class CreateAutoMLVideoTrainingJobOperator(AutoMLTrainingJobBaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- model = self.hook.create_auto_ml_video_training_job(
+ model, training_id = self.hook.create_auto_ml_video_training_job(
project_id=self.project_id,
region=self.region,
display_name=self.display_name,
@@ -498,9 +527,14 @@ class CreateAutoMLVideoTrainingJobOperator(AutoMLTrainingJobBaseOperator):
sync=self.sync,
)
- result = Model.to_dict(model)
- model_id = self.hook.extract_model_id(result)
- VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ if model:
+ result = Model.to_dict(model)
+ model_id = self.hook.extract_model_id(result)
+ VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ else:
+ result = model # type: ignore
+ self.xcom_push(context, key="training_id", value=training_id)
+ VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
return result
@@ -509,7 +543,7 @@ class DeleteAutoMLTrainingJobOperator(BaseOperator):
AutoMLTextTrainingJob, or AutoMLVideoTrainingJob.
"""
- template_fields = ("region", "project_id", "impersonation_chain")
+ template_fields = ("training_pipeline", "region", "project_id", "impersonation_chain")
def __init__(
self,
@@ -563,11 +597,11 @@ class ListAutoMLTrainingJobOperator(BaseOperator):
AutoMLTextTrainingJob, or AutoMLVideoTrainingJob in a Location.
"""
- template_fields = [
+ template_fields = (
"region",
"project_id",
"impersonation_chain",
- ]
+ )
operator_extra_links = [
VertexAITrainingPipelinesLink(),
]
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job.py b/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job.py
index 3579fcdd4e..448e46611e 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job.py
@@ -156,7 +156,7 @@ class CreateBatchPredictionJobOperator(BaseOperator):
account from the list granting this role to the originating account (templated).
"""
- template_fields = ("region", "project_id", "impersonation_chain")
+ template_fields = ("region", "project_id", "model_name", "impersonation_chain")
operator_extra_links = (VertexAIBatchPredictionJobLink(),)
def __init__(
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py b/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
index 0835500a28..b10a0715fc 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
@@ -119,7 +119,7 @@ class CustomTrainingJobBaseOperator(BaseOperator):
self.staging_bucket = staging_bucket
# END Custom
# START Run param
- self.dataset = Dataset(name=dataset_id) if dataset_id else None
+ self.dataset_id = dataset_id
self.annotation_schema_uri = annotation_schema_uri
self.model_display_name = model_display_name
self.model_labels = model_labels
@@ -410,11 +410,12 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
account from the list granting this role to the originating account (templated).
"""
- template_fields = [
+ template_fields = (
"region",
"command",
+ "dataset_id",
"impersonation_chain",
- ]
+ )
operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
def __init__(
@@ -432,7 +433,7 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- model, training_id = self.hook.create_custom_container_training_job(
+ model, training_id, custom_job_id = self.hook.create_custom_container_training_job(
project_id=self.project_id,
region=self.region,
display_name=self.display_name,
@@ -454,7 +455,7 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
model_encryption_spec_key_name=self.model_encryption_spec_key_name,
staging_bucket=self.staging_bucket,
# RUN
- dataset=self.dataset,
+ dataset=Dataset(name=self.dataset_id) if self.dataset_id else None,
annotation_schema_uri=self.annotation_schema_uri,
model_display_name=self.model_display_name,
model_labels=self.model_labels,
@@ -488,6 +489,8 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
else:
result = model # type: ignore
+ self.xcom_push(context, key="training_id", value=training_id)
+ self.xcom_push(context, key="custom_job_id", value=custom_job_id)
VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
return result
@@ -759,10 +762,11 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
account from the list granting this role to the originating account (templated).
"""
- template_fields = [
+ template_fields = (
"region",
+ "dataset_id",
"impersonation_chain",
- ]
+ )
operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
def __init__(
@@ -782,7 +786,7 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- model, training_id = self.hook.create_custom_python_package_training_job(
+ model, training_id, custom_job_id = self.hook.create_custom_python_package_training_job(
project_id=self.project_id,
region=self.region,
display_name=self.display_name,
@@ -805,7 +809,7 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
model_encryption_spec_key_name=self.model_encryption_spec_key_name,
staging_bucket=self.staging_bucket,
# RUN
- dataset=self.dataset,
+ dataset=Dataset(name=self.dataset_id) if self.dataset_id else None,
annotation_schema_uri=self.annotation_schema_uri,
model_display_name=self.model_display_name,
model_labels=self.model_labels,
@@ -839,6 +843,8 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
else:
result = model # type: ignore
+ self.xcom_push(context, key="training_id", value=training_id)
+ self.xcom_push(context, key="custom_job_id", value=custom_job_id)
VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
return result
@@ -1110,12 +1116,13 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
account from the list granting this role to the originating account (templated).
"""
- template_fields = [
+ template_fields = (
"region",
"script_path",
"requirements",
+ "dataset_id",
"impersonation_chain",
- ]
+ )
operator_extra_links = (VertexAIModelLink(), VertexAITrainingLink())
def __init__(
@@ -1135,7 +1142,7 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- model, training_id = self.hook.create_custom_training_job(
+ model, training_id, custom_job_id = self.hook.create_custom_training_job(
project_id=self.project_id,
region=self.region,
display_name=self.display_name,
@@ -1158,7 +1165,7 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
model_encryption_spec_key_name=self.model_encryption_spec_key_name,
staging_bucket=self.staging_bucket,
# RUN
- dataset=self.dataset,
+ dataset=Dataset(name=self.dataset_id) if self.dataset_id else None,
annotation_schema_uri=self.annotation_schema_uri,
model_display_name=self.model_display_name,
model_labels=self.model_labels,
@@ -1192,6 +1199,8 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
else:
result = model # type: ignore
+ self.xcom_push(context, key="training_id", value=training_id)
+ self.xcom_push(context, key="custom_job_id", value=custom_job_id)
VertexAITrainingLink.persist(context=context, task_instance=self, training_id=training_id)
return result
@@ -1228,7 +1237,7 @@ class DeleteCustomTrainingJobOperator(BaseOperator):
account from the list granting this role to the originating account (templated).
"""
- template_fields = ("region", "project_id", "impersonation_chain")
+ template_fields = ("training_pipeline", "custom_job", "region", "project_id", "impersonation_chain")
def __init__(
self,
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/endpoint_service.py b/airflow/providers/google/cloud/operators/vertex_ai/endpoint_service.py
index 72bd47efaa..cfe67cf7aa 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/endpoint_service.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/endpoint_service.py
@@ -246,7 +246,7 @@ class DeployModelOperator(BaseOperator):
account from the list granting this role to the originating account (templated).
"""
- template_fields = ("region", "endpoint_id", "project_id", "impersonation_chain")
+ template_fields = ("region", "endpoint_id", "project_id", "deployed_model", "impersonation_chain")
operator_extra_links = (VertexAIModelLink(),)
def __init__(
diff --git a/airflow/providers/google/cloud/operators/vertex_ai/model_service.py b/airflow/providers/google/cloud/operators/vertex_ai/model_service.py
index c2de199032..3b8c1e972f 100644
--- a/airflow/providers/google/cloud/operators/vertex_ai/model_service.py
+++ b/airflow/providers/google/cloud/operators/vertex_ai/model_service.py
@@ -320,7 +320,7 @@ class UploadModelOperator(BaseOperator):
account from the list granting this role to the originating account (templated).
"""
- template_fields = ("region", "project_id", "impersonation_chain")
+ template_fields = ("region", "project_id", "model", "impersonation_chain")
operator_extra_links = (VertexAIModelLink(),)
def __init__(
diff --git a/docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst b/docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst
index f10863c5cd..0ab36f0ab9 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst
@@ -33,7 +33,7 @@ To create a Google VertexAI dataset you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.CreateDatasetOperator`.
The operator returns dataset id in :ref:`XCom <concepts:xcom>` under ``dataset_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_dataset_operator]
@@ -42,7 +42,7 @@ The operator returns dataset id in :ref:`XCom <concepts:xcom>` under ``dataset_i
After creating a dataset you can use it to import some data using
:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.ImportDataOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_import_data_operator]
@@ -51,7 +51,7 @@ After creating a dataset you can use it to import some data using
To export dataset you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.ExportDataOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_export_data_operator]
@@ -60,7 +60,7 @@ To export dataset you can use
To delete dataset you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.DeleteDatasetOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_delete_dataset_operator]
@@ -69,7 +69,7 @@ To delete dataset you can use
To get dataset you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.GetDatasetOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_get_dataset_operator]
@@ -78,7 +78,7 @@ To get dataset you can use
To get a dataset list you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.ListDatasetsOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_list_dataset_operator]
@@ -87,7 +87,7 @@ To get a dataset list you can use
To update a dataset you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.UpdateDatasetOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_update_dataset_operator]
@@ -115,7 +115,7 @@ create image you can find by this link: https://cloud.google.com/vertex-ai/docs/
After that, put the link to the image in the ``container_uri`` parameter. You can also specify the command to execute
in the container created from this image in the ``command`` parameter.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
@@ -129,7 +129,7 @@ create you can find by this link: https://cloud.google.com/vertex-ai/docs/traini
Next, put the link to the package in the ``python_package_gcs_uri`` parameter; the ``python_module_name``
parameter should contain the name of the script that runs your training task.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
@@ -140,7 +140,7 @@ How to run Training Job
For this Job, put the path to your local training script in the ``script_path`` parameter.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
@@ -149,7 +149,7 @@ For this Job you should put path to your local training script inside ``script_p
You can get a list of Training Jobs using
:class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.ListCustomTrainingJobOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_list_custom_training_job_operator]
@@ -158,7 +158,7 @@ You can get a list of Training Jobs using
If you wish to delete a Custom Training Job you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.DeleteCustomTrainingJobOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_delete_custom_training_job_operator]
@@ -182,7 +182,7 @@ How to run AutoML Forecasting Training Job
Before running this Job you must prepare and create a ``TimeSeries`` dataset. After that, put
the dataset id into the ``dataset_id`` parameter of the operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]
@@ -194,7 +194,7 @@ How to run AutoML Image Training Job
Before running this Job you must prepare and create an ``Image`` dataset. After that, put
the dataset id into the ``dataset_id`` parameter of the operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]
@@ -206,7 +206,7 @@ How to run AutoML Tabular Training Job
Before running this Job you must prepare and create a ``Tabular`` dataset. After that, put
the dataset id into the ``dataset_id`` parameter of the operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
@@ -218,7 +218,7 @@ How to run AutoML Text Training Job
Before running this Job you must prepare and create a ``Text`` dataset. After that, put
the dataset id into the ``dataset_id`` parameter of the operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]
@@ -230,7 +230,7 @@ How to run AutoML Video Training Job
Before running this Job you must prepare and create a ``Video`` dataset. After that, put
the dataset id into the ``dataset_id`` parameter of the operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
@@ -239,7 +239,7 @@ put dataset id to ``dataset_id`` parameter in operator.
You can get a list of AutoML Training Jobs using
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.ListAutoMLTrainingJobOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
@@ -248,7 +248,7 @@ You can get a list of AutoML Training Jobs using
If you wish to delete an AutoML Training Job you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.DeleteAutoMLTrainingJobOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]
@@ -261,7 +261,7 @@ To create a Google VertexAI Batch Prediction Job you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.CreateBatchPredictionJobOperator`.
The operator returns batch prediction job id in :ref:`XCom <concepts:xcom>` under ``batch_prediction_job_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
@@ -270,7 +270,7 @@ The operator returns batch prediction job id in :ref:`XCom <concepts:xcom>` unde
To delete a batch prediction job you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.DeleteBatchPredictionJobOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
@@ -279,7 +279,7 @@ To delete batch prediction job you can use
To get a batch prediction job list you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.ListBatchPredictionJobsOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
@@ -292,7 +292,7 @@ To create a Google VertexAI endpoint you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.CreateEndpointOperator`.
The operator returns endpoint id in :ref:`XCom <concepts:xcom>` under ``endpoint_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_endpoint_operator]
@@ -301,7 +301,7 @@ The operator returns endpoint id in :ref:`XCom <concepts:xcom>` under ``endpoint
After creating an endpoint you can use it to deploy some model using
:class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.DeployModelOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_deploy_model_operator]
@@ -310,7 +310,7 @@ After creating an endpoint you can use it to deploy some model using
To undeploy a model you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.UndeployModelOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_undeploy_model_operator]
@@ -319,7 +319,7 @@ To un deploy model you can use
To delete an endpoint you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.DeleteEndpointOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_delete_endpoint_operator]
@@ -328,7 +328,7 @@ To delete endpoint you can use
To get an endpoint list you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.ListEndpointsOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_list_endpoints_operator]
@@ -341,7 +341,7 @@ To create a Google VertexAI hyperparameter tuning job you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job.CreateHyperparameterTuningJobOperator`.
The operator returns hyperparameter tuning job id in :ref:`XCom <concepts:xcom>` under ``hyperparameter_tuning_job_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
@@ -350,7 +350,7 @@ The operator returns hyperparameter tuning job id in :ref:`XCom <concepts:xcom>`
To delete a hyperparameter tuning job you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job.DeleteHyperparameterTuningJobOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
@@ -359,7 +359,7 @@ To delete hyperparameter tuning job you can use
To get a hyperparameter tuning job you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job.GetHyperparameterTuningJobOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
@@ -368,7 +368,7 @@ To get hyperparameter tuning job you can use
To get a hyperparameter tuning job list you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job.ListHyperparameterTuningJobOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
@@ -381,7 +381,7 @@ To upload a Google VertexAI model you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.model_service.UploadModelOperator`.
The operator returns model id in :ref:`XCom <concepts:xcom>` under ``model_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_upload_model_operator]
@@ -390,7 +390,7 @@ The operator returns model id in :ref:`XCom <concepts:xcom>` under ``model_id``
To export a model you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.model_service.ExportModelOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_export_model_operator]
@@ -399,7 +399,7 @@ To export model you can use
To delete a model you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.model_service.DeleteModelOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_delete_model_operator]
@@ -408,7 +408,7 @@ To delete model you can use
To get a model list you can use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.model_service.ListModelsOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_vertex_ai.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_vertex_ai_list_models_operator]
diff --git a/tests/providers/google/cloud/operators/test_vertex_ai.py b/tests/providers/google/cloud/operators/test_vertex_ai.py
index 73beee1144..2ebc069968 100644
--- a/tests/providers/google/cloud/operators/test_vertex_ai.py
+++ b/tests/providers/google/cloud/operators/test_vertex_ai.py
@@ -170,7 +170,11 @@ TEST_OUTPUT_CONFIG = {
class TestVertexAICreateCustomContainerTrainingJobOperator:
@mock.patch(VERTEX_AI_PATH.format("custom_job.CustomJobHook"))
def test_execute(self, mock_hook):
- mock_hook.return_value.create_custom_container_training_job.return_value = (None, "training_id")
+ mock_hook.return_value.create_custom_container_training_job.return_value = (
+ None,
+ "training_id",
+ "custom_job_id",
+ )
op = CreateCustomContainerTrainingJobOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
@@ -251,7 +255,11 @@ class TestVertexAICreateCustomContainerTrainingJobOperator:
class TestVertexAICreateCustomPythonPackageTrainingJobOperator:
@mock.patch(VERTEX_AI_PATH.format("custom_job.CustomJobHook"))
def test_execute(self, mock_hook):
- mock_hook.return_value.create_custom_python_package_training_job.return_value = (None, "training_id")
+ mock_hook.return_value.create_custom_python_package_training_job.return_value = (
+ None,
+ "training_id",
+ "custom_job_id",
+ )
op = CreateCustomPythonPackageTrainingJobOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
@@ -334,7 +342,11 @@ class TestVertexAICreateCustomPythonPackageTrainingJobOperator:
class TestVertexAICreateCustomTrainingJobOperator:
@mock.patch(VERTEX_AI_PATH.format("custom_job.CustomJobHook"))
def test_execute(self, mock_hook):
- mock_hook.return_value.create_custom_training_job.return_value = (None, "training_id")
+ mock_hook.return_value.create_custom_training_job.return_value = (
+ None,
+ "training_id",
+ "custom_job_id",
+ )
op = CreateCustomTrainingJobOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
@@ -691,6 +703,7 @@ class TestVertexAICreateAutoMLForecastingTrainingJobOperator:
@mock.patch("google.cloud.aiplatform.datasets.TimeSeriesDataset")
@mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
def test_execute(self, mock_hook, mock_dataset):
+ mock_hook.return_value.create_auto_ml_forecasting_training_job.return_value = (None, "training_id")
op = CreateAutoMLForecastingTrainingJobOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
@@ -757,6 +770,7 @@ class TestVertexAICreateAutoMLImageTrainingJobOperator:
@mock.patch("google.cloud.aiplatform.datasets.ImageDataset")
@mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
def test_execute(self, mock_hook, mock_dataset):
+ mock_hook.return_value.create_auto_ml_image_training_job.return_value = (None, "training_id")
op = CreateAutoMLImageTrainingJobOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
@@ -806,6 +820,7 @@ class TestVertexAICreateAutoMLTabularTrainingJobOperator:
@mock.patch("google.cloud.aiplatform.datasets.TabularDataset")
@mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
def test_execute(self, mock_hook, mock_dataset):
+ mock_hook.return_value.create_auto_ml_tabular_training_job.return_value = (None, "training_id")
op = CreateAutoMLTabularTrainingJobOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
@@ -860,6 +875,7 @@ class TestVertexAICreateAutoMLTextTrainingJobOperator:
@mock.patch("google.cloud.aiplatform.datasets.TextDataset")
@mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
def test_execute(self, mock_hook, mock_dataset):
+ mock_hook.return_value.create_auto_ml_text_training_job.return_value = (None, "training_id")
op = CreateAutoMLTextTrainingJobOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
@@ -906,6 +922,7 @@ class TestVertexAICreateAutoMLVideoTrainingJobOperator:
@mock.patch("google.cloud.aiplatform.datasets.VideoDataset")
@mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
def test_execute(self, mock_hook, mock_dataset):
+ mock_hook.return_value.create_auto_ml_video_training_job.return_value = (None, "training_id")
op = CreateAutoMLVideoTrainingJobOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
diff --git a/tests/system/providers/google/cloud/vertex_ai/__init__.py b/tests/system/providers/google/cloud/vertex_ai/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
new file mode 100644
index 0000000000..c90f9bfc87
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing AutoML forecasting training job operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLForecastingTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+FORECASTING_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-forecasting-model-{ENV_ID}"
+
+FORECAST_GCS_BUCKET_NAME = f"bucket_forecast_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+FORECAST_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "forecast-dataset.csv.zip")
+FORECAST_GCS_OBJECT_NAME = "vertex-ai/forecast-dataset.csv"
+FORECAST_CSV_FILE_LOCAL_PATH = "/forecast/forecast-dataset.csv"
+
+FORECAST_DATASET = {
+    "display_name": f"forecast-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.time_series,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{FORECAST_GCS_BUCKET_NAME}/{FORECAST_GCS_OBJECT_NAME}"]}
+            }
+        },
+        Value(),
+    ),
+}
+
+TEST_TIME_COLUMN = "date"
+TEST_TIME_SERIES_IDENTIFIER_COLUMN = "store_name"
+TEST_TARGET_COLUMN = "sale_dollars"
+
+COLUMN_SPECS = {
+    TEST_TIME_COLUMN: "timestamp",
+    TEST_TARGET_COLUMN: "numeric",
+    "city": "categorical",
+    "zip_code": "categorical",
+    "county": "categorical",
+}
+
+
+with models.DAG(
+    f"{DAG_ID}_forecasting_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=FORECAST_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    # "-o" overwrites files left behind by an earlier run instead of prompting for input.
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip -o {FORECAST_ZIP_CSV_FILE_LOCAL_PATH} -d /forecast/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=FORECAST_CSV_FILE_LOCAL_PATH,
+        dst=FORECAST_GCS_OBJECT_NAME,
+        bucket=FORECAST_GCS_BUCKET_NAME,
+    )
+
+    create_forecast_dataset = CreateDatasetOperator(
+        task_id="forecast_dataset",
+        dataset=FORECAST_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    forecast_dataset_id = create_forecast_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]
+    create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
+        task_id="auto_ml_forecasting_task",
+        display_name=FORECASTING_DISPLAY_NAME,
+        optimization_objective="minimize-rmse",
+        column_specs=COLUMN_SPECS,
+        # run params
+        dataset_id=forecast_dataset_id,
+        target_column=TEST_TARGET_COLUMN,
+        time_column=TEST_TIME_COLUMN,
+        time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
+        available_at_forecast_columns=[TEST_TIME_COLUMN],
+        unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
+        time_series_attribute_columns=["city", "zip_code", "county"],
+        forecast_horizon=30,
+        context_window=30,
+        data_granularity_unit="day",
+        data_granularity_count=1,
+        weight_column=None,
+        budget_milli_node_hours=1000,
+        model_display_name=MODEL_DISPLAY_NAME,
+        predefined_split_column_name=None,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]
+    delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_forecasting_training_job",
+        training_pipeline_id=create_auto_ml_forecasting_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]
+
+    delete_forecast_dataset = DeleteDatasetOperator(
+        task_id="delete_forecast_dataset",
+        dataset_id=forecast_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=FORECAST_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /forecast/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_forecast_dataset
+        # TEST BODY
+        >> create_auto_ml_forecasting_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_forecasting_training_job
+        >> delete_forecast_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
new file mode 100644
index 0000000000..fbc193bb1a
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
@@ -0,0 +1,193 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing AutoML image training job operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLImageTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ImportDataOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+IMAGE_DISPLAY_NAME = f"auto-ml-image-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-image-model-{ENV_ID}"
+
+IMAGE_GCS_BUCKET_NAME = f"bucket_image_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+IMAGE_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "image-dataset.csv.zip")
+IMAGE_GCS_OBJECT_NAME = "vertex-ai/image-dataset.csv"
+IMAGE_CSV_FILE_LOCAL_PATH = "/image/image-dataset.csv"
+
+IMAGE_DATASET = {
+    "display_name": f"image-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.image,
+    "metadata": Value(string_value="image-dataset"),
+}
+IMAGE_DATA_CONFIG = [
+    {
+        "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
+        "gcs_source": {"uris": [f"gs://{IMAGE_GCS_BUCKET_NAME}/{IMAGE_GCS_OBJECT_NAME}"]},
+    },
+]
+
+
+with models.DAG(
+    f"{DAG_ID}_image_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=IMAGE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    # "-o" overwrites files left behind by an earlier run instead of prompting for input.
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip -o {IMAGE_ZIP_CSV_FILE_LOCAL_PATH} -d /image/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=IMAGE_CSV_FILE_LOCAL_PATH,
+        dst=IMAGE_GCS_OBJECT_NAME,
+        bucket=IMAGE_GCS_BUCKET_NAME,
+    )
+
+    create_image_dataset = CreateDatasetOperator(
+        task_id="image_dataset",
+        dataset=IMAGE_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    image_dataset_id = create_image_dataset.output["dataset_id"]
+
+    import_image_dataset = ImportDataOperator(
+        task_id="import_image_data",
+        dataset_id=image_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=IMAGE_DATA_CONFIG,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]
+    create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
+        task_id="auto_ml_image_task",
+        display_name=IMAGE_DISPLAY_NAME,
+        dataset_id=image_dataset_id,
+        prediction_type="classification",
+        multi_label=False,
+        model_type="CLOUD",
+        training_fraction_split=0.6,
+        validation_fraction_split=0.2,
+        test_fraction_split=0.2,
+        budget_milli_node_hours=8000,
+        model_display_name=MODEL_DISPLAY_NAME,
+        disable_early_stopping=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]
+
+    delete_auto_ml_image_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_training_job",
+        training_pipeline_id=create_auto_ml_image_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_image_dataset = DeleteDatasetOperator(
+        task_id="delete_image_dataset",
+        dataset_id=image_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=IMAGE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /image/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        [
+            create_bucket,
+            create_image_dataset,
+        ]
+        >> unzip_file
+        >> upload_files
+        >> import_image_dataset
+        # TEST BODY
+        >> create_auto_ml_image_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_image_training_job
+        >> delete_image_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
new file mode 100644
index 0000000000..b8f5cdae68
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service listing Auto ML training jobs.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import ListAutoMLTrainingJobOperator
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+
+
+with models.DAG(
+    f"{DAG_ID}_list_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml", "list_operation"],
+) as dag:
+
+    # [START how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
+    list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
+        task_id="list_auto_ml_training_job",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
new file mode 100644
index 0000000000..af8d908b76
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
@@ -0,0 +1,197 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML tabular training operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLTabularTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+TABULAR_DISPLAY_NAME = f"auto-ml-tabular-{ENV_ID}"
+MODEL_DISPLAY_NAME = "adopted-prediction-model"
+
+TABULAR_GCS_BUCKET_NAME = f"bucket_tabular_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+TABULAR_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "tabular-dataset.csv.zip")
+TABULAR_GCS_OBJECT_NAME = "vertex-ai/tabular-dataset.csv"
+TABULAR_CSV_FILE_LOCAL_PATH = "/tabular/tabular-dataset.csv"
+
+TABULAR_DATASET = {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{TABULAR_GCS_BUCKET_NAME}/{TABULAR_GCS_OBJECT_NAME}"]}
+            }
+        },
+        Value(),
+    ),
+}
+
+COLUMN_TRANSFORMATIONS = [
+    {"categorical": {"column_name": "Type"}},
+    {"numeric": {"column_name": "Age"}},
+    {"categorical": {"column_name": "Breed1"}},
+    {"categorical": {"column_name": "Color1"}},
+    {"categorical": {"column_name": "Color2"}},
+    {"categorical": {"column_name": "MaturitySize"}},
+    {"categorical": {"column_name": "FurLength"}},
+    {"categorical": {"column_name": "Vaccinated"}},
+    {"categorical": {"column_name": "Sterilized"}},
+    {"categorical": {"column_name": "Health"}},
+    {"numeric": {"column_name": "Fee"}},
+    {"numeric": {"column_name": "PhotoAmt"}},
+]
+
+with models.DAG(
+    f"{DAG_ID}_tabular_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=TABULAR_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {TABULAR_ZIP_CSV_FILE_LOCAL_PATH} -d /tabular/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=TABULAR_CSV_FILE_LOCAL_PATH,
+        dst=TABULAR_GCS_OBJECT_NAME,
+        bucket=TABULAR_GCS_BUCKET_NAME,
+    )
+
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
+    create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
+        task_id="auto_ml_tabular_task",
+        display_name=TABULAR_DISPLAY_NAME,
+        optimization_prediction_type="classification",
+        column_transformations=COLUMN_TRANSFORMATIONS,
+        dataset_id=tabular_dataset_id,
+        target_column="Adopted",
+        training_fraction_split=0.8,
+        validation_fraction_split=0.1,
+        test_fraction_split=0.1,
+        model_display_name=MODEL_DISPLAY_NAME,
+        disable_early_stopping=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
+
+    delete_auto_ml_tabular_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_training_job",
+        training_pipeline_id=create_auto_ml_tabular_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=TABULAR_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # ALL_DONE so the unzipped files are removed even if an earlier teardown task failed.
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /tabular/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        # TEST BODY
+        >> create_auto_ml_tabular_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_tabular_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
new file mode 100644
index 0000000000..945e0f2964
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML text training operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLTextTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ImportDataOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+TEXT_DISPLAY_NAME = f"auto-ml-text-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-text-model-{ENV_ID}"
+
+TEXT_GCS_BUCKET_NAME = f"bucket_text_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+TEXT_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "text-dataset.csv.zip")
+TEXT_GCS_OBJECT_NAME = "vertex-ai/text-dataset.csv"
+TEXT_CSV_FILE_LOCAL_PATH = "/text/text-dataset.csv"
+
+TEXT_DATASET = {
+    "display_name": f"text-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.text,
+    "metadata": Value(string_value="text-dataset"),
+}
+TEXT_DATA_CONFIG = [
+    {
+        "import_schema_uri": schema.dataset.ioformat.text.single_label_classification,
+        "gcs_source": {"uris": [f"gs://{TEXT_GCS_BUCKET_NAME}/{TEXT_GCS_OBJECT_NAME}"]},
+    },
+]
+
+with models.DAG(
+    f"{DAG_ID}_text_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=TEXT_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {TEXT_ZIP_CSV_FILE_LOCAL_PATH} -d /text/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=TEXT_CSV_FILE_LOCAL_PATH,
+        dst=TEXT_GCS_OBJECT_NAME,
+        bucket=TEXT_GCS_BUCKET_NAME,
+    )
+
+    create_text_dataset = CreateDatasetOperator(
+        task_id="text_dataset",
+        dataset=TEXT_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    text_dataset_id = create_text_dataset.output["dataset_id"]
+
+    import_text_dataset = ImportDataOperator(
+        task_id="import_text_data",
+        dataset_id=text_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=TEXT_DATA_CONFIG,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]
+    create_auto_ml_text_training_job = CreateAutoMLTextTrainingJobOperator(
+        task_id="auto_ml_text_task",
+        display_name=TEXT_DISPLAY_NAME,
+        prediction_type="classification",
+        multi_label=False,
+        dataset_id=text_dataset_id,
+        model_display_name=MODEL_DISPLAY_NAME,
+        training_fraction_split=0.7,
+        validation_fraction_split=0.2,
+        test_fraction_split=0.1,
+        sync=True,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]
+
+    delete_auto_ml_text_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_text_training_job",
+        training_pipeline_id=create_auto_ml_text_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_text_dataset = DeleteDatasetOperator(
+        task_id="delete_text_dataset",
+        dataset_id=text_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=TEXT_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # ALL_DONE so the unzipped files are removed even if an earlier teardown task failed.
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /text/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        [
+            create_bucket,
+            create_text_dataset,
+        ]
+        >> unzip_file
+        >> upload_files
+        >> import_text_dataset
+        # TEST BODY
+        >> create_auto_ml_text_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_text_training_job
+        >> delete_text_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
new file mode 100644
index 0000000000..522a5c26f3
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Auto ML video training operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLVideoTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ImportDataOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_auto_ml_operations"
+REGION = "us-central1"
+VIDEO_DISPLAY_NAME = f"auto-ml-video-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-video-model-{ENV_ID}"
+
+VIDEO_GCS_BUCKET_NAME = f"bucket_video_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+VIDEO_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "video-dataset.csv.zip")
+VIDEO_GCS_OBJECT_NAME = "vertex-ai/video-dataset.csv"
+VIDEO_CSV_FILE_LOCAL_PATH = "/video/video-dataset.csv"
+
+VIDEO_DATASET = {
+    "display_name": f"video-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.video,
+    "metadata": Value(string_value="video-dataset"),
+}
+VIDEO_DATA_CONFIG = [
+    {
+        "import_schema_uri": schema.dataset.ioformat.video.classification,
+        "gcs_source": {"uris": [f"gs://{VIDEO_GCS_BUCKET_NAME}/{VIDEO_GCS_OBJECT_NAME}"]},
+    },
+]
+
+with models.DAG(
+    f"{DAG_ID}_video_training_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "auto_ml"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=VIDEO_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {VIDEO_ZIP_CSV_FILE_LOCAL_PATH} -d /video/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=VIDEO_CSV_FILE_LOCAL_PATH,
+        dst=VIDEO_GCS_OBJECT_NAME,
+        bucket=VIDEO_GCS_BUCKET_NAME,
+    )
+
+    create_video_dataset = CreateDatasetOperator(
+        task_id="video_dataset",
+        dataset=VIDEO_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    video_dataset_id = create_video_dataset.output["dataset_id"]
+
+    import_video_dataset = ImportDataOperator(
+        task_id="import_video_data",
+        dataset_id=video_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=VIDEO_DATA_CONFIG,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
+    create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
+        task_id="auto_ml_video_task",
+        display_name=VIDEO_DISPLAY_NAME,
+        prediction_type="classification",
+        model_type="CLOUD",
+        dataset_id=video_dataset_id,
+        model_display_name=MODEL_DISPLAY_NAME,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
+
+    delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_video_training_job",
+        training_pipeline_id=create_auto_ml_video_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_video_dataset = DeleteDatasetOperator(
+        task_id="delete_video_dataset",
+        dataset_id=video_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=VIDEO_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # ALL_DONE so the unzipped files are removed even if an earlier teardown task failed.
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /video/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        [
+            create_bucket,
+            create_video_dataset,
+        ]
+        >> unzip_file
+        >> upload_files
+        >> import_video_dataset
+        # TEST BODY
+        >> create_auto_ml_video_training_job
+        # TEST TEARDOWN
+        >> delete_auto_ml_video_training_job
+        >> delete_video_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
new file mode 100644
index 0000000000..41f6ff7854
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Batch Prediction operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLForecastingTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
+    CreateBatchPredictionJobOperator,
+    DeleteBatchPredictionJobOperator,
+    ListBatchPredictionJobsOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_batch_prediction_job_operations"
+REGION = "us-central1"
+
+FORECAST_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-forecasting-model-{ENV_ID}"
+
+JOB_DISPLAY_NAME = f"batch_prediction_job_test_{ENV_ID}"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/forecast-dataset.csv"
+FORECAST_ZIP_CSV_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "forecast-dataset.csv.zip")
+FORECAST_CSV_FILE_LOCAL_PATH = "/batch-prediction/forecast-dataset.csv"
+
+FORECAST_DATASET = {
+    "display_name": f"forecast-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.time_series,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}
+            }
+        },
+        Value(),
+    ),
+}
+
+TEST_TIME_COLUMN = "date"
+TEST_TIME_SERIES_IDENTIFIER_COLUMN = "store_name"
+TEST_TARGET_COLUMN = "sale_dollars"
+COLUMN_SPECS = {
+    TEST_TIME_COLUMN: "timestamp",
+    TEST_TARGET_COLUMN: "numeric",
+    "city": "categorical",
+    "zip_code": "categorical",
+    "county": "categorical",
+}
+
+BIGQUERY_SOURCE = f"bq://{PROJECT_ID}.test_iowa_liquor_sales_forecasting_us.2021_sales_predict"
+GCS_DESTINATION_PREFIX = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/output"
+MODEL_PARAMETERS = ParseDict({}, Value())
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    render_template_as_native_obj=True,
+    tags=["example", "vertex_ai", "batch_prediction_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /batch-prediction && "
+        f"unzip {FORECAST_ZIP_CSV_FILE_LOCAL_PATH} -d /batch-prediction/",
+    )
+
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=FORECAST_CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+    )
+    create_forecast_dataset = CreateDatasetOperator(
+        task_id="forecast_dataset",
+        dataset=FORECAST_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
+        task_id="auto_ml_forecasting_task",
+        display_name=FORECAST_DISPLAY_NAME,
+        optimization_objective="minimize-rmse",
+        column_specs=COLUMN_SPECS,
+        # run params
+        dataset_id=create_forecast_dataset.output["dataset_id"],
+        target_column=TEST_TARGET_COLUMN,
+        time_column=TEST_TIME_COLUMN,
+        time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
+        available_at_forecast_columns=[TEST_TIME_COLUMN],
+        unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
+        time_series_attribute_columns=["city", "zip_code", "county"],
+        forecast_horizon=30,
+        context_window=30,
+        data_granularity_unit="day",
+        data_granularity_count=1,
+        weight_column=None,
+        budget_milli_node_hours=1000,
+        model_display_name=MODEL_DISPLAY_NAME,
+        predefined_split_column_name=None,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
+    create_batch_prediction_job = CreateBatchPredictionJobOperator(
+        task_id="create_batch_prediction_job",
+        job_display_name=JOB_DISPLAY_NAME,
+        model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
+        predictions_format="csv",
+        bigquery_source=BIGQUERY_SOURCE,
+        gcs_destination_prefix=GCS_DESTINATION_PREFIX,
+        model_parameters=MODEL_PARAMETERS,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
+
+    # [START how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
+    list_batch_prediction_job = ListBatchPredictionJobsOperator(
+        task_id="list_batch_prediction_jobs",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
+    delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
+        task_id="delete_batch_prediction_job",
+        batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
+
+    delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_forecasting_training_job",
+        training_pipeline_id=create_auto_ml_forecasting_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_forecast_dataset = DeleteDatasetOperator(
+        task_id="delete_forecast_dataset",
+        dataset_id=create_forecast_dataset.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # ALL_DONE so the unzipped files are removed even if an earlier teardown task failed.
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /batch-prediction/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_forecast_dataset
+        >> create_auto_ml_forecasting_training_job
+        # TEST BODY
+        >> create_batch_prediction_job
+        >> list_batch_prediction_job
+        # TEST TEARDOWN
+        >> delete_batch_prediction_job
+        >> delete_auto_ml_forecasting_training_job
+        >> delete_forecast_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
new file mode 100644
index 0000000000..42b915f20f
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Custom Container Training Job operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_custom_job_operations"
+REGION = "us-central1"
+CONTAINER_DISPLAY_NAME = f"train-housing-container-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"container-housing-model-{ENV_ID}"
+
+# DAG_ID is shared with the other Vertex AI custom job examples; the "cont" prefix keeps this
+# bucket name distinct from theirs so concurrent runs never create/delete the same bucket.
+CUSTOM_CONTAINER_GCS_BUCKET_NAME = f"bucket_cont_{DAG_ID}_{ENV_ID}"
+
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
+CSV_FILE_LOCAL_PATH = "/custom-job/california_housing_train.csv"
+RESOURCES_PATH = Path(__file__).parent / "resources"
+CSV_ZIP_FILE_LOCAL_PATH = str(RESOURCES_PATH / "California-housing.zip")
+
+TABULAR_DATASET = lambda bucket_name: {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {"input_config": {"gcs_source": {"uri": [f"gs://{bucket_name}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}}},
+        Value(),
+    ),
+}
+
+CUSTOM_CONTAINER_URI = f"us-central1-docker.pkg.dev/{PROJECT_ID}/system-tests/housing:latest"
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+
+with models.DAG(
+    f"{DAG_ID}_custom_container",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "custom_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /custom-job/ && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /custom-job/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
+    )
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET(CUSTOM_CONTAINER_GCS_BUCKET_NAME),
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+    create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
+        task_id="custom_container_task",
+        staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
+        display_name=CONTAINER_DISPLAY_NAME,
+        container_uri=CUSTOM_CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=tabular_dataset_id,
+        command=["python3", "task.py"],
+        model_display_name=MODEL_DISPLAY_NAME,
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+
+    delete_custom_training_job = DeleteCustomTrainingJobOperator(
+        task_id="delete_custom_training_job",
+        training_pipeline_id=create_custom_container_training_job.output["training_id"],
+        custom_job_id=create_custom_container_training_job.output["custom_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # ALL_DONE: remove the unzipped local files even if an earlier teardown task failed.
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -rf /custom-job/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        # TEST BODY
+        >> create_custom_container_training_job
+        # TEST TEARDOWN
+        >> delete_custom_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # Teardown tasks use ALL_DONE, so the leaf task can succeed even when the test body failed;
+    # the watcher task fails whenever any task fails, so the DAG run is reported as failed.
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
new file mode 100644
index 0000000000..aaa289d9a0
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Custom Training Job operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "vertex_ai_custom_job_operations"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+REGION = "us-central1"
+CUSTOM_DISPLAY_NAME = f"train-housing-custom-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"custom-housing-model-{ENV_ID}"
+
+CUSTOM_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
+CSV_ZIP_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "California-housing.zip")
+CSV_FILE_LOCAL_PATH = "/custom-job/california_housing_train.csv"
+
+TABULAR_DATASET = lambda bucket_name: {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {"input_config": {"gcs_source": {"uri": [f"gs://{bucket_name}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}}},
+        Value(),
+    ),
+}
+
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+
+LOCAL_TRAINING_SCRIPT_PATH = "/custom-job/california_housing_training_script.py"
+
+
+with models.DAG(
+    f"{DAG_ID}_custom",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "custom_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=CUSTOM_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /custom-job && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /custom-job/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=CUSTOM_GCS_BUCKET_NAME,
+    )
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET(CUSTOM_GCS_BUCKET_NAME),
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
+    create_custom_training_job = CreateCustomTrainingJobOperator(
+        task_id="custom_task",
+        staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
+        display_name=CUSTOM_DISPLAY_NAME,
+        script_path=LOCAL_TRAINING_SCRIPT_PATH,
+        container_uri=CONTAINER_URI,
+        requirements=["gcsfs==0.7.1"],
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=tabular_dataset_id,
+        replica_count=REPLICA_COUNT,
+        model_display_name=MODEL_DISPLAY_NAME,
+        sync=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_custom_training_job_operator]
+    delete_custom_training_job = DeleteCustomTrainingJobOperator(
+        task_id="delete_custom_training_job",
+        training_pipeline_id=create_custom_training_job.output["training_id"],
+        custom_job_id=create_custom_training_job.output["custom_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_custom_training_job_operator]
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=CUSTOM_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # ALL_DONE: remove the unzipped local files even if an earlier teardown task failed.
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -rf /custom-job/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        # TEST BODY
+        >> create_custom_training_job
+        # TEST TEARDOWN
+        >> delete_custom_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # Teardown tasks use ALL_DONE, so the leaf task can succeed even when the test body failed;
+    # the watcher task fails whenever any task fails, so the DAG run is reported as failed.
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
new file mode 100644
index 0000000000..c3a2a27c73
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Custom Python Package Training Job operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomPythonPackageTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_custom_job_operations"
+REGION = "us-central1"
+PACKAGE_DISPLAY_NAME = f"train-housing-py-package-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"py-package-housing-model-{ENV_ID}"
+
+CUSTOM_PYTHON_GCS_BUCKET_NAME = f"bucket_python_{DAG_ID}_{ENV_ID}"
+
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
+RESOURCES_PATH = Path(__file__).parent / "resources"
+CSV_ZIP_FILE_LOCAL_PATH = str(RESOURCES_PATH / "California-housing.zip")
+CSV_FILE_LOCAL_PATH = "/custom-job/california_housing_train.csv"
+TAR_FILE_LOCAL_PATH = "/custom-job/custom_trainer_script-0.1.tar"
+FILES_TO_UPLOAD = [
+    CSV_FILE_LOCAL_PATH,
+    TAR_FILE_LOCAL_PATH,
+]
+
+TABULAR_DATASET = lambda bucket_name: {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {"input_config": {"gcs_source": {"uri": [f"gs://{bucket_name}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}}},
+        Value(),
+    ),
+}
+
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}/vertex-ai/custom_trainer_script-0.1.tar"
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+
+with models.DAG(
+    f"{DAG_ID}_python_package",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "custom_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=CUSTOM_PYTHON_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /custom-job && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /custom-job/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=FILES_TO_UPLOAD,
+        dst="vertex-ai/",
+        bucket=CUSTOM_PYTHON_GCS_BUCKET_NAME,
+    )
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET(CUSTOM_PYTHON_GCS_BUCKET_NAME),
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    # [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+    create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
+        task_id="python_package_task",
+        staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
+        display_name=PACKAGE_DISPLAY_NAME,
+        python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
+        python_module_name=PYTHON_MODULE_NAME,
+        container_uri=CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=tabular_dataset_id,
+        model_display_name=MODEL_DISPLAY_NAME,
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+
+    delete_custom_training_job = DeleteCustomTrainingJobOperator(
+        task_id="delete_custom_training_job",
+        training_pipeline_id=create_custom_python_package_training_job.output["training_id"],
+        custom_job_id=create_custom_python_package_training_job.output["custom_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=CUSTOM_PYTHON_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # ALL_DONE: remove the unzipped local files even if an earlier teardown task failed.
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -rf /custom-job/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        # TEST BODY
+        >> create_custom_python_package_training_job
+        # TEST TEARDOWN
+        >> delete_custom_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # Teardown tasks use ALL_DONE, so the leaf task can succeed even when the test body failed;
+    # the watcher task fails whenever any task fails, so the DAG run is reported as failed.
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
new file mode 100644
index 0000000000..270103b7f1
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Dataset operations.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_dataset_operations"
+REGION = "us-central1"
+
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+RESOURCES_PATH = Path(__file__).parent / "resources"
+ALL_DATASETS_ZIP_CSV_FILE_LOCAL_PATH = str(RESOURCES_PATH / "all-datasets.zip")
+
+CSV_FILES_LOCAL_PATH = [
+    "/all-datasets/forecast-dataset.csv",
+    "/all-datasets/image-dataset.csv",
+    "/all-datasets/tabular-dataset.csv",
+    "/all-datasets/text-dataset.csv",
+    "/all-datasets/video-dataset.csv",
+]
+
+TIME_SERIES_DATASET = {
+    "display_name": f"time-series-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.time_series,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/forecast-dataset.csv"]}
+            }
+        },
+        Value(),
+    ),
+}
+IMAGE_DATASET = {
+    "display_name": f"image-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.image,
+    "metadata": Value(string_value="image-dataset"),
+}
+TABULAR_DATASET = {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/tabular-dataset.csv"]}
+            }
+        },
+        Value(),
+    ),
+}
+TEXT_DATASET = {
+    "display_name": f"text-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.text,
+    "metadata": Value(string_value="text-dataset"),
+}
+VIDEO_DATASET = {
+    "display_name": f"video-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.video,
+    "metadata": Value(string_value="video-dataset"),
+}
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "dataset"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {ALL_DATASETS_ZIP_CSV_FILE_LOCAL_PATH} -d /all-datasets/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=CSV_FILES_LOCAL_PATH,
+        dst="vertex-ai/",
+        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_dataset_operator]
+    create_image_dataset_job = CreateDatasetOperator(
+        task_id="image_dataset",
+        dataset=IMAGE_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_tabular_dataset_job = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_text_dataset_job = CreateDatasetOperator(
+        task_id="text_dataset",
+        dataset=TEXT_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_video_dataset_job = CreateDatasetOperator(
+        task_id="video_dataset",
+        dataset=VIDEO_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    create_time_series_dataset_job = CreateDatasetOperator(
+        task_id="time_series_dataset",
+        dataset=TIME_SERIES_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_dataset_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_dataset_operator]
+    delete_dataset_job = DeleteDatasetOperator(
+        task_id="delete_dataset",
+        dataset_id=create_text_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_delete_dataset_operator]
+
+    # [START how_to_cloud_vertex_ai_get_dataset_operator]
+    get_dataset = GetDatasetOperator(
+        task_id="get_dataset",
+        project_id=PROJECT_ID,
+        region=REGION,
+        dataset_id=create_tabular_dataset_job.output["dataset_id"],
+    )
+    # [END how_to_cloud_vertex_ai_get_dataset_operator]
+
+    # [START how_to_cloud_vertex_ai_export_data_operator]
+    export_data_job = ExportDataOperator(
+        task_id="export_data",
+        dataset_id=create_image_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        export_config=TEST_EXPORT_CONFIG,
+    )
+    # [END how_to_cloud_vertex_ai_export_data_operator]
+
+    # [START how_to_cloud_vertex_ai_import_data_operator]
+    import_data_job = ImportDataOperator(
+        task_id="import_data",
+        dataset_id=create_image_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=TEST_IMPORT_CONFIG,
+    )
+    # [END how_to_cloud_vertex_ai_import_data_operator]
+
+    # [START how_to_cloud_vertex_ai_list_dataset_operator]
+    list_dataset_job = ListDatasetsOperator(
+        task_id="list_dataset",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_dataset_operator]
+
+    # [START how_to_cloud_vertex_ai_update_dataset_operator]
+    update_dataset_job = UpdateDatasetOperator(
+        task_id="update_dataset",
+        project_id=PROJECT_ID,
+        region=REGION,
+        dataset_id=create_video_dataset_job.output["dataset_id"],
+        dataset=DATASET_TO_UPDATE,
+        update_mask=TEST_UPDATE_MASK,
+    )
+    # [END how_to_cloud_vertex_ai_update_dataset_operator]
+
+    delete_time_series_dataset_job = DeleteDatasetOperator(
+        task_id="delete_time_series_dataset",
+        dataset_id=create_time_series_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset_job = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=create_tabular_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_image_dataset_job = DeleteDatasetOperator(
+        task_id="delete_image_dataset",
+        dataset_id=create_image_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_video_dataset_job = DeleteDatasetOperator(
+        task_id="delete_video_dataset",
+        dataset_id=create_video_dataset_job.output["dataset_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # ALL_DONE: remove the unzipped local files even if an earlier teardown task failed.
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -rf /all-datasets/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        # TEST BODY: every chain must start only after the data files are uploaded.
+        >> [
+            create_time_series_dataset_job,
+            create_text_dataset_job,
+            create_tabular_dataset_job,
+            create_image_dataset_job,
+            create_video_dataset_job,
+            list_dataset_job,
+        ]
+    )
+    # Declared separately: `a >> b` evaluates to `b`, so nesting these chains inside the
+    # list above would attach upload_files to the *last* task of each chain instead of the first.
+    create_time_series_dataset_job >> delete_time_series_dataset_job
+    create_text_dataset_job >> delete_dataset_job
+    create_tabular_dataset_job >> get_dataset >> delete_tabular_dataset_job
+    create_image_dataset_job >> import_data_job >> export_data_job >> delete_image_dataset_job
+    create_video_dataset_job >> update_dataset_job >> delete_video_dataset_job
+    (
+        # TEST TEARDOWN
+        [
+            delete_time_series_dataset_job,
+            delete_dataset_job,
+            delete_tabular_dataset_job,
+            delete_image_dataset_job,
+            delete_video_dataset_job,
+            list_dataset_job,
+        ]
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # Teardown tasks use ALL_DONE, so the leaf task can succeed even when the test body failed;
+    # the watcher task fails whenever any task fails, so the DAG run is reported as failed.
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
new file mode 100644
index 0000000000..da18746cf4
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Endpoint Service operations.
+
+Setup trains an AutoML image classification model on a small image dataset; the test body then
+creates an endpoint, deploys the trained model to it, undeploys it, deletes the endpoint and lists
+endpoints. Teardown removes the training job, the dataset, the bucket and the local scratch files.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud import aiplatform
+from google.cloud.aiplatform import schema
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
+    CreateAutoMLImageTrainingJobOperator,
+    DeleteAutoMLTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ImportDataOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.endpoint_service import (
+    CreateEndpointOperator,
+    DeleteEndpointOperator,
+    DeployModelOperator,
+    ListEndpointsOperator,
+    UndeployModelOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_endpoint_service_operations"
+REGION = "us-central1"
+IMAGE_DISPLAY_NAME = f"auto-ml-image-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"auto-ml-image-model-{ENV_ID}"
+
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/image-dataset.csv"
+IMAGE_ZIP_CSV_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "image-dataset.csv.zip")
+IMAGE_CSV_FILE_LOCAL_PATH = "/endpoint/image-dataset.csv"
+
+IMAGE_DATASET = {
+    "display_name": f"image-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.image,
+    "metadata": Value(string_value="image-dataset"),
+}
+IMAGE_DATA_CONFIG = [
+    {
+        "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
+        "gcs_source": {"uris": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]},
+    },
+]
+
+ENDPOINT_CONF = {
+    "display_name": f"endpoint_test_{ENV_ID}",
+}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    render_template_as_native_obj=True,
+    tags=["example", "vertex_ai", "endpoint_service"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"unzip {IMAGE_ZIP_CSV_FILE_LOCAL_PATH} -d /endpoint/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=IMAGE_CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+    )
+    create_image_dataset = CreateDatasetOperator(
+        task_id="image_dataset",
+        dataset=IMAGE_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    image_dataset_id = create_image_dataset.output["dataset_id"]
+
+    import_image_dataset = ImportDataOperator(
+        task_id="import_image_data",
+        dataset_id=image_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        import_configs=IMAGE_DATA_CONFIG,
+    )
+
+    create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
+        task_id="auto_ml_image_task",
+        display_name=IMAGE_DISPLAY_NAME,
+        dataset_id=image_dataset_id,
+        prediction_type="classification",
+        multi_label=False,
+        model_type="CLOUD",
+        training_fraction_split=0.6,
+        validation_fraction_split=0.2,
+        test_fraction_split=0.2,
+        budget_milli_node_hours=8000,
+        model_display_name=MODEL_DISPLAY_NAME,
+        disable_early_stopping=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    DEPLOYED_MODEL = {
+        # format: 'projects/{project}/locations/{location}/models/{model}'
+        "model": "{{ti.xcom_pull('auto_ml_image_task')['name']}}",
+        "display_name": f"temp_endpoint_test_{ENV_ID}",
+        "dedicated_resources": {
+            "machine_spec": {
+                "machine_type": "n1-standard-2",
+                "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
+                "accelerator_count": 1,
+            },
+            "min_replica_count": 1,
+            "max_replica_count": 1,
+        },
+    }
+
+    # [START how_to_cloud_vertex_ai_create_endpoint_operator]
+    create_endpoint = CreateEndpointOperator(
+        task_id="create_endpoint",
+        endpoint=ENDPOINT_CONF,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_endpoint_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_endpoint_operator]
+    delete_endpoint = DeleteEndpointOperator(
+        task_id="delete_endpoint",
+        endpoint_id=create_endpoint.output["endpoint_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_delete_endpoint_operator]
+
+    # [START how_to_cloud_vertex_ai_list_endpoints_operator]
+    list_endpoints = ListEndpointsOperator(
+        task_id="list_endpoints",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_endpoints_operator]
+
+    # [START how_to_cloud_vertex_ai_deploy_model_operator]
+    deploy_model = DeployModelOperator(
+        task_id="deploy_model",
+        endpoint_id=create_endpoint.output["endpoint_id"],
+        deployed_model=DEPLOYED_MODEL,
+        traffic_split={"0": 100},
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_deploy_model_operator]
+
+    # [START how_to_cloud_vertex_ai_undeploy_model_operator]
+    undeploy_model = UndeployModelOperator(
+        task_id="undeploy_model",
+        endpoint_id=create_endpoint.output["endpoint_id"],
+        deployed_model_id=deploy_model.output["deployed_model_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_undeploy_model_operator]
+
+    delete_auto_ml_image_training_job = DeleteAutoMLTrainingJobOperator(
+        task_id="delete_auto_ml_training_job",
+        training_pipeline_id=create_auto_ml_image_training_job.output["training_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_image_dataset = DeleteDatasetOperator(
+        task_id="delete_image_dataset",
+        dataset_id=image_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /endpoint/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        [
+            create_bucket,
+            create_image_dataset,
+        ]
+        >> unzip_file
+        >> upload_files
+        >> import_image_dataset
+        >> create_auto_ml_image_training_job
+        # TEST BODY
+        >> create_endpoint
+        >> deploy_model
+        >> undeploy_model
+        >> delete_endpoint
+        >> list_endpoints
+        # TEST TEARDOWN
+        >> delete_auto_ml_image_training_job
+        >> delete_image_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
new file mode 100644
index 0000000000..1976a026ec
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Hyperparameter Tuning Job operations.
+
+NOTE: this DAG does not build the training container image; it expects
+``gcr.io/<PROJECT_ID>/horse-human:hypertune`` to already exist before the DAG runs.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from google.cloud import aiplatform
+
+from airflow import models
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job import (
+    CreateHyperparameterTuningJobOperator,
+    DeleteHyperparameterTuningJobOperator,
+    GetHyperparameterTuningJobOperator,
+    ListHyperparameterTuningJobOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_hyperparameter_tuning_job_operations"
+REGION = "us-central1"
+DISPLAY_NAME = f"hyperparameter-tuning-job-{ENV_ID}"
+
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_hyperparameter_tuning_job_{ENV_ID}"
+STAGING_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+WORKER_POOL_SPECS = [
+    {
+        "machine_spec": {
+            "machine_type": MACHINE_TYPE,
+            "accelerator_type": ACCELERATOR_TYPE,
+            "accelerator_count": ACCELERATOR_COUNT,
+        },
+        "replica_count": REPLICA_COUNT,
+        "container_spec": {
+            "image_uri": f"gcr.io/{PROJECT_ID}/horse-human:hypertune",
+        },
+    }
+]
+PARAM_SPECS = {
+    "learning_rate": aiplatform.hyperparameter_tuning.DoubleParameterSpec(min=0.01, max=1, scale="log"),
+    "momentum": aiplatform.hyperparameter_tuning.DoubleParameterSpec(min=0, max=1, scale="linear"),
+    "num_neurons": aiplatform.hyperparameter_tuning.DiscreteParameterSpec(
+        values=[64, 128, 512], scale="linear"
+    ),
+}
+METRIC_SPEC = {
+    "accuracy": "maximize",
+}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "hyperparameter_tuning_job"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+
+    # [START how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
+    create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
+        task_id="create_hyperparameter_tuning_job",
+        staging_bucket=STAGING_BUCKET,
+        display_name=DISPLAY_NAME,
+        worker_pool_specs=WORKER_POOL_SPECS,
+        sync=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+        parameter_spec=PARAM_SPECS,
+        metric_spec=METRIC_SPEC,
+        max_trial_count=15,
+        parallel_trial_count=3,
+    )
+    # [END how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
+
+    # [START how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
+    get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
+        task_id="get_hyperparameter_tuning_job",
+        project_id=PROJECT_ID,
+        region=REGION,
+        hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
+    )
+    # [END how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
+    delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
+        task_id="delete_hyperparameter_tuning_job",
+        project_id=PROJECT_ID,
+        region=REGION,
+        hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
+
+    # [START how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
+    list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
+        task_id="list_hyperparameter_tuning_job",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        # TEST BODY
+        >> create_hyperparameter_tuning_job
+        >> get_hyperparameter_tuning_job
+        >> delete_hyperparameter_tuning_job
+        >> list_hyperparameter_tuning_job
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
new file mode 100644
index 0000000000..1876139efc
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service listing Custom Training Jobs.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import ListCustomTrainingJobOperator
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_custom_job_operations"
+REGION = "us-central1"
+
+dag = models.DAG(
+    dag_id=f"{DAG_ID}_list_custom_job",
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "vertex_ai", "custom_job"],
+)
+
+with dag:
+    # [START how_to_cloud_vertex_ai_list_custom_training_job_operator]
+    list_custom_training_job = ListCustomTrainingJobOperator(
+        task_id="list_custom_training_job",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_custom_training_job_operator]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
new file mode 100644
index 0000000000..98e0c8d20d
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
@@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# mypy ignore arg types (for templated fields)
+# type: ignore[arg-type]
+
+"""
+Example Airflow DAG for Google Vertex AI service testing Model Service operations.
+
+Setup trains a custom model on the California housing dataset; the test body then uploads the
+trained artifacts as a new model, exports it to the staging bucket, deletes it and lists models.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from google.cloud.aiplatform import schema
+from google.protobuf.json_format import ParseDict
+from google.protobuf.struct_pb2 import Value
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.model_service import (
+    DeleteModelOperator,
+    ExportModelOperator,
+    ListModelsOperator,
+    UploadModelOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "vertex_ai_model_service_operations"
+REGION = "us-central1"
+TRAIN_DISPLAY_NAME = f"train-housing-custom-{ENV_ID}"
+MODEL_DISPLAY_NAME = f"custom-housing-model-{ENV_ID}"
+
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+STAGING_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}"
+
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
+CSV_FILE_LOCAL_PATH = "/model_service/california_housing_train.csv"
+RESOURCES_PATH = Path(__file__).parent / "resources"
+CSV_ZIP_FILE_LOCAL_PATH = str(RESOURCES_PATH / "California-housing.zip")
+
+TABULAR_DATASET = {
+    "display_name": f"tabular-dataset-{ENV_ID}",
+    "metadata_schema_uri": schema.dataset.metadata.tabular,
+    "metadata": ParseDict(
+        {
+            "input_config": {
+                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}
+            }
+        },
+        Value(),
+    ),
+}
+
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+
+# Presumably shipped inside California-housing.zip and extracted by the unzip_csv_data_file task.
+LOCAL_TRAINING_SCRIPT_PATH = "/model_service/california_housing_training_script.py"
+
+MODEL_OUTPUT_CONFIG = {
+    "artifact_destination": {
+        "output_uri_prefix": STAGING_BUCKET,
+    },
+    "export_format_id": "custom-trained",
+}
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+MODEL_OBJ = {
+    "display_name": f"model-{ENV_ID}",
+    "artifact_uri": "{{ti.xcom_pull('custom_task')['artifactUri']}}",
+    "container_spec": {
+        "image_uri": MODEL_SERVING_CONTAINER_URI,
+        "command": [],
+        "args": [],
+        "env": [],
+        "ports": [],
+        "predict_route": "",
+        "health_route": "",
+    },
+}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    render_template_as_native_obj=True,
+    tags=["example", "vertex_ai", "model_service"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    unzip_file = BashOperator(
+        task_id="unzip_csv_data_file",
+        bash_command=f"mkdir -p /model_service && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /model_service/",
+    )
+    upload_files = LocalFilesystemToGCSOperator(
+        task_id="upload_file_to_bucket",
+        src=CSV_FILE_LOCAL_PATH,
+        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+    )
+    create_tabular_dataset = CreateDatasetOperator(
+        task_id="tabular_dataset",
+        dataset=TABULAR_DATASET,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
+
+    create_custom_training_job = CreateCustomTrainingJobOperator(
+        task_id="custom_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=TRAIN_DISPLAY_NAME,
+        script_path=LOCAL_TRAINING_SCRIPT_PATH,
+        container_uri=CONTAINER_URI,
+        requirements=["gcsfs==0.7.1"],
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=tabular_dataset_id,
+        replica_count=1,
+        model_display_name=MODEL_DISPLAY_NAME,
+        sync=False,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
+    # [START how_to_cloud_vertex_ai_upload_model_operator]
+    upload_model = UploadModelOperator(
+        task_id="upload_model",
+        region=REGION,
+        project_id=PROJECT_ID,
+        model=MODEL_OBJ,
+    )
+    # [END how_to_cloud_vertex_ai_upload_model_operator]
+
+    # [START how_to_cloud_vertex_ai_export_model_operator]
+    export_model = ExportModelOperator(
+        task_id="export_model",
+        project_id=PROJECT_ID,
+        region=REGION,
+        model_id=upload_model.output["model_id"],
+        output_config=MODEL_OUTPUT_CONFIG,
+    )
+    # [END how_to_cloud_vertex_ai_export_model_operator]
+
+    # [START how_to_cloud_vertex_ai_delete_model_operator]
+    delete_model = DeleteModelOperator(
+        task_id="delete_model",
+        project_id=PROJECT_ID,
+        region=REGION,
+        model_id=upload_model.output["model_id"],
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END how_to_cloud_vertex_ai_delete_model_operator]
+
+    # [START how_to_cloud_vertex_ai_list_models_operator]
+    list_models = ListModelsOperator(
+        task_id="list_models",
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_list_models_operator]
+
+    delete_custom_training_job = DeleteCustomTrainingJobOperator(
+        task_id="delete_custom_training_job",
+        training_pipeline_id=create_custom_training_job.output["training_id"],
+        custom_job_id=create_custom_training_job.output["custom_job_id"],
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_tabular_dataset = DeleteDatasetOperator(
+        task_id="delete_tabular_dataset",
+        dataset_id=tabular_dataset_id,
+        region=REGION,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    clear_folder = BashOperator(
+        task_id="clear_folder",
+        bash_command="rm -r /model_service/*",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> unzip_file
+        >> upload_files
+        >> create_tabular_dataset
+        >> create_custom_training_job
+        # TEST BODY
+        >> upload_model
+        >> export_model
+        >> delete_model
+        >> list_models
+        # TEST TEARDOWN
+        >> delete_custom_training_job
+        >> delete_tabular_dataset
+        >> delete_bucket
+        >> clear_folder
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/California-housing.zip b/tests/system/providers/google/cloud/vertex_ai/resources/California-housing.zip
new file mode 100644
index 0000000000..1ac6fc83a4
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/California-housing.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/__init__.py b/tests/system/providers/google/cloud/vertex_ai/resources/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/vertex_ai/resources/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/all-datasets.zip b/tests/system/providers/google/cloud/vertex_ai/resources/all-datasets.zip
new file mode 100644
index 0000000000..00c7a27029
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/all-datasets.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/forecast-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/forecast-dataset.csv.zip
new file mode 100644
index 0000000000..fbb59761a9
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/forecast-dataset.csv.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/image-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/image-dataset.csv.zip
new file mode 100644
index 0000000000..5dc45deeb7
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/image-dataset.csv.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/tabular-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/tabular-dataset.csv.zip
new file mode 100644
index 0000000000..33f50119ab
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/tabular-dataset.csv.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/text-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/text-dataset.csv.zip
new file mode 100644
index 0000000000..2a38cbb6e7
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/text-dataset.csv.zip differ
diff --git a/tests/system/providers/google/cloud/vertex_ai/resources/video-dataset.csv.zip b/tests/system/providers/google/cloud/vertex_ai/resources/video-dataset.csv.zip
new file mode 100644
index 0000000000..658ce398d9
Binary files /dev/null and b/tests/system/providers/google/cloud/vertex_ai/resources/video-dataset.csv.zip differ