Posted to commits@airflow.apache.org by po...@apache.org on 2023/08/10 21:32:54 UTC
[airflow] branch main updated: Fix AutoML system tests (#33197)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ef6851ab9 Fix AutoML system tests (#33197)
7ef6851ab9 is described below
commit 7ef6851ab94266720727a7cf4c6c7d7c6d6f9f93
Author: VladaZakharova <80...@users.noreply.github.com>
AuthorDate: Thu Aug 10 23:32:46 2023 +0200
Fix AutoML system tests (#33197)
---
.../google/cloud/automl/example_automl_dataset.py | 47 ++++-----
.../google/cloud/automl/example_automl_model.py | 28 ++---
.../example_automl_nl_text_classification.py | 102 +++++++++++++-----
.../automl/example_automl_nl_text_extraction.py | 34 +++---
.../automl/example_automl_nl_text_sentiment.py | 93 ++++++++++------
.../cloud/automl/example_automl_translation.py | 117 +++++++++++++++------
...ple_automl_video_intelligence_classification.py | 94 +++++++++++------
.../example_automl_video_intelligence_tracking.py | 94 +++++++++++------
.../automl/example_automl_vision_classification.py | 88 +++++++++++-----
.../example_automl_vision_object_detection.py | 101 ++++++++++++------
10 files changed, 530 insertions(+), 268 deletions(-)
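
Each of the ten example DAGs ends with the same system-test scaffolding, and the diffs below are truncated before that tail (several stop right at the from tests.system.utils.watcher import watcher line). For context, a sketch of that shared tail, assuming the standard helpers from tests/system/utils rather than anything shown in this diff:

    # at the end of the "with models.DAG(...)" block
    from tests.system.utils.watcher import watcher

    # The watcher must observe every task so that a failed teardown task
    # (running with trigger_rule=ALL_DONE) still fails the whole test run.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see tests/system/README.md).
test_run = get_test_run(dag)
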
diff --git a/tests/system/providers/google/cloud/automl/example_automl_dataset.py b/tests/system/providers/google/cloud/automl/example_automl_dataset.py
index e3c5abf671..1d7da91c8f 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_dataset.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_dataset.py
@@ -43,21 +43,20 @@ from airflow.providers.google.cloud.operators.gcs import (
)
from airflow.utils.trigger_rule import TriggerRule
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "automl_dataset"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_dataset"
GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
GCP_AUTOML_LOCATION = "us-central1"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-RESOURCE_DATA_BUCKET = "system-tests-resources"
-
-DATASET_NAME = "test_dataset_tabular"
+DATASET_NAME = f"ds_tabular_{ENV_ID}".replace("-", "_")
DATASET = {
"display_name": DATASET_NAME,
"tables_dataset_metadata": {"target_column_spec_id": ""},
}
-AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/bank-marketing.csv"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/tabular-classification.csv"
IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
extract_object_id = CloudAutoMLHook.extract_object_id
@@ -78,7 +77,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "automl"],
+ tags=["example", "automl", "dataset"],
user_defined_macros={
"get_target_column_spec": get_target_column_spec,
"target": "Class",
@@ -95,7 +94,7 @@ with models.DAG(
move_dataset_file = GCSSynchronizeBucketsOperator(
task_id="move_dataset_to_bucket",
source_bucket=RESOURCE_DATA_BUCKET,
- source_object="automl",
+ source_object="automl/datasets/tabular",
destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
destination_object="automl",
recursive=True,
@@ -112,8 +111,8 @@ with models.DAG(
# [END howto_operator_automl_create_dataset]
# [START howto_operator_automl_import_data]
- import_dataset_task = AutoMLImportDataOperator(
- task_id="import_dataset_task",
+ import_dataset = AutoMLImportDataOperator(
+ task_id="import_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
input_config=IMPORT_INPUT_CONFIG,
@@ -121,8 +120,8 @@ with models.DAG(
# [END howto_operator_automl_import_data]
# [START howto_operator_automl_specs]
- list_tables_spec_task = AutoMLTablesListTableSpecsOperator(
- task_id="list_tables_spec_task",
+ list_tables_spec = AutoMLTablesListTableSpecsOperator(
+ task_id="list_tables_spec",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
@@ -130,8 +129,8 @@ with models.DAG(
# [END howto_operator_automl_specs]
# [START howto_operator_automl_column_specs]
- list_columns_spec_task = AutoMLTablesListColumnSpecsOperator(
- task_id="list_columns_spec_task",
+ list_columns_spec = AutoMLTablesListColumnSpecsOperator(
+ task_id="list_columns_spec",
dataset_id=dataset_id,
table_spec_id="{{ extract_object_id(task_instance.xcom_pull('list_tables_spec_task')[0]) }}",
location=GCP_AUTOML_LOCATION,
@@ -146,16 +145,16 @@ with models.DAG(
"target_column_spec_id"
] = "{{ get_target_column_spec(task_instance.xcom_pull('list_columns_spec_task'), target) }}"
- update_dataset_task = AutoMLTablesUpdateDatasetOperator(
- task_id="update_dataset_task",
+ update_dataset = AutoMLTablesUpdateDatasetOperator(
+ task_id="update_dataset",
dataset=update,
location=GCP_AUTOML_LOCATION,
)
# [END howto_operator_automl_update_dataset]
# [START howto_operator_list_dataset]
- list_datasets_task = AutoMLListDatasetOperator(
- task_id="list_datasets_task",
+ list_datasets = AutoMLListDatasetOperator(
+ task_id="list_datasets",
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
@@ -178,11 +177,11 @@ with models.DAG(
# TEST SETUP
[create_bucket >> move_dataset_file, create_dataset]
# TEST BODY
- >> import_dataset_task
- >> list_tables_spec_task
- >> list_columns_spec_task
- >> update_dataset_task
- >> list_datasets_task
+ >> import_dataset
+ >> list_tables_spec
+ >> list_columns_spec
+ >> update_dataset
+ >> list_datasets
# TEST TEARDOWN
>> delete_dataset
>> delete_bucket
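
A note on the reshaped TEST SETUP above: [create_bucket >> move_dataset_file, create_dataset] >> import_dataset fans two independent branches into one downstream task, so bucket preparation and dataset creation run in parallel instead of in series. A self-contained sketch of the same pattern with stand-in tasks (hypothetical DAG, not part of this commit):

from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator

with models.DAG("fan_in_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False):
    create_bucket = EmptyOperator(task_id="create_bucket")
    move_dataset_file = EmptyOperator(task_id="move_dataset_file")
    create_dataset = EmptyOperator(task_id="create_dataset")
    import_dataset = EmptyOperator(task_id="import_dataset")

    # "a >> b" evaluates to b, so the list is [move_dataset_file, create_dataset]
    # and both of those become direct upstreams of import_dataset.
    [create_bucket >> move_dataset_file, create_dataset] >> import_dataset
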
diff --git a/tests/system/providers/google/cloud/automl/example_automl_model.py b/tests/system/providers/google/cloud/automl/example_automl_model.py
index 5e7560a022..274dd83e95 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_model.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_model.py
@@ -50,33 +50,35 @@ from airflow.providers.google.cloud.operators.gcs import (
)
from airflow.utils.trigger_rule import TriggerRule
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
DAG_ID = "example_automl_model"
GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
GCP_AUTOML_LOCATION = "us-central1"
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-RESOURCE_DATA_BUCKET = "system-tests-resources"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-DATASET_NAME = "test_dataset_model"
+DATASET_NAME = f"md_tabular_{ENV_ID}".replace("-", "_")
DATASET = {
"display_name": DATASET_NAME,
"tables_dataset_metadata": {"target_column_spec_id": ""},
}
-AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl-model/bank-marketing.csv"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/bank-marketing-split.csv"
IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
IMPORT_OUTPUT_CONFIG = {
- "gcs_destination": {"output_uri_prefix": f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl-model"}
+ "gcs_destination": {"output_uri_prefix": f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl"}
}
-MODEL_NAME = "test_model"
+# AutoML display names allow only letters, digits and underscores
+MODEL_NAME = f"md_tabular_{ENV_ID}".replace("-", "_")
MODEL = {
"display_name": MODEL_NAME,
"tables_model_metadata": {"train_budget_milli_node_hours": 1000},
}
PREDICT_VALUES = [
+ Value(string_value="TRAINING"),
Value(string_value="51"),
Value(string_value="blue-collar"),
Value(string_value="married"),
@@ -115,10 +117,10 @@ with models.DAG(
catchup=False,
user_defined_macros={
"get_target_column_spec": get_target_column_spec,
- "target": "Class",
+ "target": "Deposit",
"extract_object_id": extract_object_id,
},
- tags=["example", "automl"],
+ tags=["example", "automl", "model"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket",
@@ -130,9 +132,9 @@ with models.DAG(
move_dataset_file = GCSSynchronizeBucketsOperator(
task_id="move_data_to_bucket",
source_bucket=RESOURCE_DATA_BUCKET,
- source_object="automl-model",
+ source_object="automl/datasets/model",
destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
- destination_object="automl-model",
+ destination_object="automl",
recursive=True,
)
@@ -255,9 +257,7 @@ with models.DAG(
(
# TEST SETUP
- create_bucket
- >> move_dataset_file
- >> create_dataset
+ [create_bucket >> move_dataset_file, create_dataset]
>> import_dataset
>> list_tables_spec
>> list_columns_spec
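
Both tabular examples register CloudAutoMLHook.extract_object_id as a user-defined macro, so Jinja templates such as "{{ extract_object_id(task_instance.xcom_pull('list_tables_spec')[0]) }}" can pull an id out of a full AutoML resource name at render time. A sketch of what that macro does; the body is paraphrased from the hook and should be read as an assumption, not a quote:

def extract_object_id(obj: dict) -> str:
    # AutoML API objects carry a resource name such as
    # "projects/<p>/locations/<l>/datasets/<ds>/tableSpecs/<id>";
    # only the trailing id segment is returned.
    return obj["name"].rpartition("/")[-1]

table_spec = {"name": "projects/p/locations/us-central1/datasets/ds/tableSpecs/12345"}
assert extract_object_id(table_spec) == "12345"
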
diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py
index 3912916209..0a04b3b361 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py
@@ -31,47 +31,77 @@ from airflow.providers.google.cloud.operators.automl import (
AutoMLCreateDatasetOperator,
AutoMLDeleteDatasetOperator,
AutoMLDeleteModelOperator,
+ AutoMLDeployModelOperator,
AutoMLImportDataOperator,
AutoMLTrainModelOperator,
)
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+ GCSSynchronizeBucketsOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_text_cls"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_TEXT_CLS_BUCKET = os.environ.get("GCP_AUTOML_TEXT_CLS_BUCKET", "gs://INVALID BUCKET NAME")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-# Example model
+MODEL_NAME = "text_clss_test_model"
MODEL = {
- "display_name": "auto_model_1",
+ "display_name": MODEL_NAME,
"text_classification_model_metadata": {},
}
-# Example dataset
+DATASET_NAME = f"ds_clss_{ENV_ID}".replace("-", "_")
DATASET = {
- "display_name": "test_text_cls_dataset",
+ "display_name": DATASET_NAME,
"text_classification_dataset_metadata": {"classification_type": "MULTICLASS"},
}
-
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TEXT_CLS_BUCKET]}}
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/text_classification.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
extract_object_id = CloudAutoMLHook.extract_object_id
# Example DAG for AutoML Natural Language Text Classification
with models.DAG(
- "example_automl_text_cls",
+ DAG_ID,
+ schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "automl", "text-classification"],
) as dag:
- create_dataset_task = AutoMLCreateDatasetOperator(
- task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ storage_class="REGIONAL",
+ location=GCP_AUTOML_LOCATION,
)
- dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
- MODEL["dataset_id"] = dataset_id
+ move_dataset_file = GCSSynchronizeBucketsOperator(
+ task_id="move_dataset_to_bucket",
+ source_bucket=RESOURCE_DATA_BUCKET,
+ source_object="automl/datasets/text",
+ destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+ destination_object="automl",
+ recursive=True,
+ )
+
+ create_dataset = AutoMLCreateDatasetOperator(
+ task_id="create_dataset",
+ dataset=DATASET,
+ location=GCP_AUTOML_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ )
- import_dataset_task = AutoMLImportDataOperator(
- task_id="import_dataset_task",
+ dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
+ MODEL["dataset_id"] = dataset_id
+ import_dataset = AutoMLImportDataOperator(
+ task_id="import_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
input_config=IMPORT_INPUT_CONFIG,
@@ -81,29 +111,43 @@ with models.DAG(
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
model_id = cast(str, XComArg(create_model, key="model_id"))
- delete_model_task = AutoMLDeleteModelOperator(
- task_id="delete_model_task",
+ deploy_model = AutoMLDeployModelOperator(
+ task_id="deploy_model",
model_id=model_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- delete_datasets_task = AutoMLDeleteDatasetOperator(
- task_id="delete_datasets_task",
+ delete_model = AutoMLDeleteModelOperator(
+ task_id="delete_model",
+ model_id=model_id,
+ location=GCP_AUTOML_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ )
+
+ delete_dataset = AutoMLDeleteDatasetOperator(
+ task_id="delete_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- # TEST BODY
- import_dataset_task >> create_model
- # TEST TEARDOWN
- delete_model_task >> delete_datasets_task
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
- # Task dependencies created via `XComArgs`:
- # create_dataset_task >> import_dataset_task
- # create_dataset_task >> create_model
- # create_dataset_task >> delete_datasets_task
+ (
+ # TEST SETUP
+ [create_bucket >> move_dataset_file, create_dataset]
+ # TEST BODY
+ >> import_dataset
+ >> create_model
+ >> deploy_model
+ # TEST TEARDOWN
+ >> delete_model
+ >> delete_dataset
+ >> delete_bucket
+ )
from tests.system.utils.watcher import watcher
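
The dataset_id plumbing above (dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))) is worth a note: XComArg is a lazy reference to another task's XCom, resolved only when a downstream task renders its arguments, and the cast() exists purely for static type checkers. A minimal sketch with a stand-in operator (hypothetical, not part of this commit):

from datetime import datetime
from typing import cast

from airflow import models
from airflow.models.xcom_arg import XComArg
from airflow.operators.empty import EmptyOperator

with models.DAG("xcomarg_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False):
    create_dataset = EmptyOperator(task_id="create_dataset")

    # No value exists at parse time; the "dataset_id" XCom is looked up when
    # a downstream task's templated fields are rendered.
    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
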
diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py
index fd6a4fd6a7..260a7d84c6 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py
@@ -41,18 +41,18 @@ from airflow.providers.google.cloud.operators.gcs import (
)
from airflow.utils.trigger_rule import TriggerRule
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_automl_text"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_text_extr"
GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
GCP_AUTOML_LOCATION = "us-central1"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-RESOURCE_DATA_BUCKET = "system-tests-resources"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
-DATASET_NAME = "test_entity_extr"
+DATASET_NAME = f"ds_extr_{ENV_ID}".replace("-", "_")
DATASET = {"display_name": DATASET_NAME, "text_extraction_dataset_metadata": {}}
-AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl-text/dataset.csv"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/text_extraction.csv"
IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
MODEL_NAME = "entity_extr_test_model"
@@ -70,7 +70,7 @@ with models.DAG(
start_date=datetime(2021, 1, 1),
catchup=False,
user_defined_macros={"extract_object_id": extract_object_id},
- tags=["example", "automl"],
+ tags=["example", "automl", "text-extraction"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket",
@@ -82,23 +82,23 @@ with models.DAG(
move_dataset_file = GCSSynchronizeBucketsOperator(
task_id="move_data_to_bucket",
source_bucket=RESOURCE_DATA_BUCKET,
- source_object="automl-text",
+ source_object="automl/datasets/text",
destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
- destination_object="automl-text",
+ destination_object="automl",
recursive=True,
)
- create_dataset_task = AutoMLCreateDatasetOperator(
- task_id="create_dataset_task",
+ create_dataset = AutoMLCreateDatasetOperator(
+ task_id="create_dataset",
dataset=DATASET,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+ dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
MODEL["dataset_id"] = dataset_id
- import_dataset_task = AutoMLImportDataOperator(
- task_id="import_dataset_task",
+ import_dataset = AutoMLImportDataOperator(
+ task_id="import_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
input_config=IMPORT_INPUT_CONFIG,
@@ -137,11 +137,9 @@ with models.DAG(
(
# TEST SETUP
- create_bucket
- >> move_dataset_file
- >> create_dataset_task
- >> import_dataset_task
+ [create_bucket >> move_dataset_file, create_dataset]
# TEST BODY
+ >> import_dataset
>> create_model
# TEST TEARDOWN
>> delete_model_task
diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py
index 6bbe0d2bfb..3559339755 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py
@@ -34,44 +34,71 @@ from airflow.providers.google.cloud.operators.automl import (
AutoMLImportDataOperator,
AutoMLTrainModelOperator,
)
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+ GCSSynchronizeBucketsOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_SENTIMENT_BUCKET = os.environ.get("GCP_AUTOML_SENTIMENT_BUCKET", "gs://INVALID BUCKET NAME")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_text_sent"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-# Example model
+MODEL_NAME = "text_sent_test_model"
MODEL = {
- "display_name": "auto_model_1",
+ "display_name": MODEL_NAME,
"text_sentiment_model_metadata": {},
}
-# Example dataset
+DATASET_NAME = f"ds_sent_{ENV_ID}".replace("-", "_")
DATASET = {
- "display_name": "test_text_sentiment_dataset",
- "text_sentiment_dataset_metadata": {"sentiment_max": 10},
+ "display_name": DATASET_NAME,
+ "text_sentiment_dataset_metadata": {"sentiment_max": 5},
}
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_SENTIMENT_BUCKET]}}
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/text_sentiment.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
extract_object_id = CloudAutoMLHook.extract_object_id
# Example DAG for AutoML Natural Language Text Sentiment
with models.DAG(
- "example_automl_text_sentiment",
+ DAG_ID,
+ schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
user_defined_macros={"extract_object_id": extract_object_id},
- tags=["example"],
+ tags=["example", "automl", "text-sentiment"],
) as dag:
- create_dataset_task = AutoMLCreateDatasetOperator(
- task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ storage_class="REGIONAL",
+ location=GCP_AUTOML_LOCATION,
)
- dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+ move_dataset_file = GCSSynchronizeBucketsOperator(
+ task_id="move_dataset_to_bucket",
+ source_bucket=RESOURCE_DATA_BUCKET,
+ source_object="automl/datasets/text",
+ destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+ destination_object="automl",
+ recursive=True,
+ )
+
+ create_dataset = AutoMLCreateDatasetOperator(
+ task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ )
+
+ dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
MODEL["dataset_id"] = dataset_id
- import_dataset_task = AutoMLImportDataOperator(
- task_id="import_dataset_task",
+ import_dataset = AutoMLImportDataOperator(
+ task_id="import_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
input_config=IMPORT_INPUT_CONFIG,
@@ -80,33 +107,39 @@ with models.DAG(
MODEL["dataset_id"] = dataset_id
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
model_id = cast(str, XComArg(create_model, key="model_id"))
- delete_model_task = AutoMLDeleteModelOperator(
- task_id="delete_model_task",
+ delete_model = AutoMLDeleteModelOperator(
+ task_id="delete_model",
model_id=model_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- delete_datasets_task = AutoMLDeleteDatasetOperator(
- task_id="delete_datasets_task",
+ delete_dataset = AutoMLDeleteDatasetOperator(
+ task_id="delete_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- # TEST BODY
- import_dataset_task >> create_model
- # TEST TEARDOWN
- delete_model_task >> delete_datasets_task
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
- # Task dependencies created via `XComArgs`:
- # create_dataset_task >> import_dataset_task
- # create_dataset_task >> create_model
- # create_model >> delete_model_task
- # create_dataset_task >> delete_datasets_task
+ (
+ # TEST SETUP
+ [create_bucket >> move_dataset_file, create_dataset]
+ # TEST BODY
+ >> import_dataset
+ >> create_model
+ # TEST TEARDOWN
+ >> delete_model
+ >> delete_dataset
+ >> delete_bucket
+ )
from tests.system.utils.watcher import watcher
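
Every teardown added in this commit gives delete_bucket trigger_rule=TriggerRule.ALL_DONE. With the default ALL_SUCCESS rule, a failure anywhere in the test body would skip the cleanup and leak the bucket; ALL_DONE runs it once all upstream tasks have finished in any state. A stand-alone sketch (hypothetical DAG, not part of this commit):

from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with models.DAG("teardown_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False):
    test_body = EmptyOperator(task_id="test_body")

    # Runs whether test_body succeeded, failed, or was skipped.
    delete_bucket = EmptyOperator(task_id="delete_bucket", trigger_rule=TriggerRule.ALL_DONE)

    test_body >> delete_bucket
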
diff --git a/tests/system/providers/google/cloud/automl/example_automl_translation.py b/tests/system/providers/google/cloud/automl/example_automl_translation.py
index 87bf7166dc..e03453f03a 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_translation.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_translation.py
@@ -24,7 +24,10 @@ import os
from datetime import datetime
from typing import cast
+from google.cloud import storage
+
from airflow import models
+from airflow.decorators import task
from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
from airflow.providers.google.cloud.operators.automl import (
@@ -34,51 +37,92 @@ from airflow.providers.google.cloud.operators.automl import (
AutoMLImportDataOperator,
AutoMLTrainModelOperator,
)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_TRANSLATION_BUCKET = os.environ.get(
- "GCP_AUTOML_TRANSLATION_BUCKET", "gs://INVALID BUCKET NAME/file"
-)
+DAG_ID = "example_automl_translate"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-# Example model
+
+MODEL_NAME = "translate_test_model"
MODEL = {
- "display_name": "auto_model_1",
+ "display_name": MODEL_NAME,
"translation_model_metadata": {},
}
-# Example dataset
+DATASET_NAME = f"ds_translate_{ENV_ID}".replace("-", "_")
DATASET = {
- "display_name": "test_translation_dataset",
+ "display_name": DATASET_NAME,
"translation_dataset_metadata": {
"source_language_code": "en",
"target_language_code": "es",
},
}
-
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TRANSLATION_BUCKET]}}
+CSV_FILE_NAME = "en-es.csv"
+TSV_FILE_NAME = "en-es.tsv"
+GCS_FILE_PATH = f"automl/datasets/translate/{CSV_FILE_NAME}"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/{CSV_FILE_NAME}"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
extract_object_id = CloudAutoMLHook.extract_object_id
# Example DAG for AutoML Translation
with models.DAG(
- "example_automl_translation",
- start_date=datetime(2021, 1, 1),
+ DAG_ID,
schedule="@once",
+ start_date=datetime(2021, 1, 1),
catchup=False,
user_defined_macros={"extract_object_id": extract_object_id},
- tags=["example"],
+ tags=["example", "automl", "translate"],
) as dag:
- create_dataset_task = AutoMLCreateDatasetOperator(
- task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ storage_class="REGIONAL",
+ location=GCP_AUTOML_LOCATION,
)
- dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+ @task
+ def upload_csv_file_to_gcs():
+ # download file into memory
+ storage_client = storage.Client()
+ bucket = storage_client.bucket(RESOURCE_DATA_BUCKET)
+ blob = bucket.blob(GCS_FILE_PATH)
+ contents = blob.download_as_string().decode()
+
+ # update memory content
+ updated_contents = contents.replace("template-bucket", DATA_SAMPLE_GCS_BUCKET_NAME)
+
+ # upload updated content to bucket
+ destination_bucket = storage_client.bucket(DATA_SAMPLE_GCS_BUCKET_NAME)
+ destination_blob = destination_bucket.blob(f"automl/{CSV_FILE_NAME}")
+ destination_blob.upload_from_string(updated_contents)
+
+ upload_csv_file_to_gcs_task = upload_csv_file_to_gcs()
+
+ copy_dataset_file = GCSToGCSOperator(
+ task_id="copy_dataset_file",
+ source_bucket=RESOURCE_DATA_BUCKET,
+ source_object=f"automl/datasets/translate/{TSV_FILE_NAME}",
+ destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+ destination_object=f"automl/{TSV_FILE_NAME}",
+ )
+
+ create_dataset = AutoMLCreateDatasetOperator(
+ task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ )
+
+ dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
- import_dataset_task = AutoMLImportDataOperator(
- task_id="import_dataset_task",
+ import_dataset = AutoMLImportDataOperator(
+ task_id="import_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
input_config=IMPORT_INPUT_CONFIG,
@@ -87,33 +131,40 @@ with models.DAG(
MODEL["dataset_id"] = dataset_id
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
model_id = cast(str, XComArg(create_model, key="model_id"))
- delete_model_task = AutoMLDeleteModelOperator(
- task_id="delete_model_task",
+ delete_model = AutoMLDeleteModelOperator(
+ task_id="delete_model",
model_id=model_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- delete_datasets_task = AutoMLDeleteDatasetOperator(
- task_id="delete_datasets_task",
+ delete_dataset = AutoMLDeleteDatasetOperator(
+ task_id="delete_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- # TEST BODY
- import_dataset_task >> create_model
- # TEST TEARDOWN
- delete_model_task >> delete_datasets_task
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
- # Task dependencies created via `XComArgs`:
- # create_dataset_task >> import_dataset_task
- # create_dataset_task >> create_model
- # create_model >> delete_model_task
- # create_dataset_task >> delete_datasets_task
+ (
+ # TEST SETUP
+ [create_bucket >> upload_csv_file_to_gcs_task >> copy_dataset_file]
+ # TEST BODY
+ >> create_dataset
+ >> import_dataset
+ >> create_model
+ # TEST TEARDOWN
+ >> delete_dataset
+ >> delete_model
+ >> delete_bucket
+ )
from tests.system.utils.watcher import watcher
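
One remark on the new upload_csv_file_to_gcs task above: Blob.download_as_string() is deprecated in google-cloud-storage in favor of download_as_bytes()/download_as_text(). A sketch of the same rewrite step with the newer call; the constants mirror those defined in the DAG file and are repeated here only to keep the snippet self-contained:

from google.cloud import storage

RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
DATA_SAMPLE_GCS_BUCKET_NAME = "bucket-example-automl-translate-default"
CSV_FILE_NAME = "en-es.csv"
GCS_FILE_PATH = f"automl/datasets/translate/{CSV_FILE_NAME}"

storage_client = storage.Client()
blob = storage_client.bucket(RESOURCE_DATA_BUCKET).blob(GCS_FILE_PATH)

# download_as_text() decodes the payload, replacing download_as_string().decode()
contents = blob.download_as_text()
updated_contents = contents.replace("template-bucket", DATA_SAMPLE_GCS_BUCKET_NAME)

destination = storage_client.bucket(DATA_SAMPLE_GCS_BUCKET_NAME).blob(f"automl/{CSV_FILE_NAME}")
destination.upload_from_string(updated_contents)
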
diff --git a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py
index 63447a1230..7993494fff 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py
@@ -34,47 +34,73 @@ from airflow.providers.google.cloud.operators.automl import (
AutoMLImportDataOperator,
AutoMLTrainModelOperator,
)
-
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_VIDEO_BUCKET = os.environ.get(
- "GCP_AUTOML_VIDEO_BUCKET", "gs://INVALID BUCKET NAME/hmdb_split1.csv"
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+ GCSSynchronizeBucketsOperator,
)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_video_clss"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-# Example model
+MODEL_NAME = "video_clss_test_model"
MODEL = {
- "display_name": "auto_model_1",
+ "display_name": MODEL_NAME,
"video_classification_model_metadata": {},
}
-# Example dataset
+DATASET_NAME = f"ds_video_clss_{ENV_ID}".replace("-", "_")
DATASET = {
- "display_name": "test_video_dataset",
+ "display_name": DATASET_NAME,
"video_classification_dataset_metadata": {},
}
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_VIDEO_BUCKET]}}
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/video_classification.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
+
extract_object_id = CloudAutoMLHook.extract_object_id
# Example DAG for AutoML Video Intelligence Classification
with models.DAG(
- "example_automl_video",
+ DAG_ID,
+ schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
user_defined_macros={"extract_object_id": extract_object_id},
- tags=["example"],
+ tags=["example", "automl", "video-clss"],
) as dag:
- create_dataset_task = AutoMLCreateDatasetOperator(
- task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ storage_class="REGIONAL",
+ location=GCP_AUTOML_LOCATION,
+ )
+
+ move_dataset_file = GCSSynchronizeBucketsOperator(
+ task_id="move_dataset_to_bucket",
+ source_bucket=RESOURCE_DATA_BUCKET,
+ source_object="automl/datasets/video",
+ destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+ destination_object="automl",
+ recursive=True,
)
- dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+ create_dataset = AutoMLCreateDatasetOperator(
+ task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ )
+
+ dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
MODEL["dataset_id"] = dataset_id
- import_dataset_task = AutoMLImportDataOperator(
- task_id="import_dataset_task",
+ import_dataset = AutoMLImportDataOperator(
+ task_id="import_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
input_config=IMPORT_INPUT_CONFIG,
@@ -83,33 +109,39 @@ with models.DAG(
MODEL["dataset_id"] = dataset_id
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
model_id = cast(str, XComArg(create_model, key="model_id"))
- delete_model_task = AutoMLDeleteModelOperator(
- task_id="delete_model_task",
+ delete_model = AutoMLDeleteModelOperator(
+ task_id="delete_model",
model_id=model_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- delete_datasets_task = AutoMLDeleteDatasetOperator(
- task_id="delete_datasets_task",
+ delete_dataset = AutoMLDeleteDatasetOperator(
+ task_id="delete_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- # TEST BODY
- import_dataset_task >> create_model
- # TEST TEARDOWN
- delete_model_task >> delete_datasets_task
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
- # Task dependencies created via `XComArgs`:
- # create_dataset_task >> import_dataset_task
- # create_dataset_task >> create_model
- # create_model >> delete_model_task
- # create_dataset_task >> delete_datasets_task
+ (
+ # TEST SETUP
+ [create_bucket >> move_dataset_file, create_dataset]
+ # TEST BODY
+ >> import_dataset
+ >> create_model
+ # TEST TEARDOWN
+ >> delete_model
+ >> delete_dataset
+ >> delete_bucket
+ )
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py
index 5ac8fa7457..61802077b3 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py
@@ -34,49 +34,73 @@ from airflow.providers.google.cloud.operators.automl import (
AutoMLImportDataOperator,
AutoMLTrainModelOperator,
)
-
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_TRACKING_BUCKET = os.environ.get(
- "GCP_AUTOML_TRACKING_BUCKET",
- "gs://INVALID BUCKET NAME/youtube_8m_videos_animal_tiny.csv",
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+ GCSSynchronizeBucketsOperator,
)
+from airflow.utils.trigger_rule import TriggerRule
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_video_track"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-# Example model
+MODEL_NAME = "video_track_test_model"
MODEL = {
- "display_name": "auto_model_1",
+ "display_name": MODEL_NAME,
"video_object_tracking_model_metadata": {},
}
-# Example dataset
+DATASET_NAME = f"ds_video_track_{ENV_ID}".replace("-", "_")
DATASET = {
- "display_name": "test_video_tracking_dataset",
+ "display_name": DATASET_NAME,
"video_object_tracking_dataset_metadata": {},
}
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TRACKING_BUCKET]}}
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/video_tracking.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
+
extract_object_id = CloudAutoMLHook.extract_object_id
# Example DAG for AutoML Video Intelligence Object Tracking
with models.DAG(
- "example_automl_video_tracking",
+ DAG_ID,
+ schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
user_defined_macros={"extract_object_id": extract_object_id},
- tags=["example"],
+ tags=["example", "automl", "video-tracking"],
) as dag:
- create_dataset_task = AutoMLCreateDatasetOperator(
- task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ storage_class="REGIONAL",
+ location=GCP_AUTOML_LOCATION,
+ )
+
+ move_dataset_file = GCSSynchronizeBucketsOperator(
+ task_id="move_dataset_to_bucket",
+ source_bucket=RESOURCE_DATA_BUCKET,
+ source_object="automl/datasets/video",
+ destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+ destination_object="automl",
+ recursive=True,
)
- dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+ create_dataset = AutoMLCreateDatasetOperator(
+ task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
+ )
+
+ dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
MODEL["dataset_id"] = dataset_id
- import_dataset_task = AutoMLImportDataOperator(
- task_id="import_dataset_task",
+ import_dataset = AutoMLImportDataOperator(
+ task_id="import_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
input_config=IMPORT_INPUT_CONFIG,
@@ -85,33 +109,39 @@ with models.DAG(
MODEL["dataset_id"] = dataset_id
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
model_id = cast(str, XComArg(create_model, key="model_id"))
- delete_model_task = AutoMLDeleteModelOperator(
- task_id="delete_model_task",
+ delete_model = AutoMLDeleteModelOperator(
+ task_id="delete_model",
model_id=model_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- delete_datasets_task = AutoMLDeleteDatasetOperator(
- task_id="delete_datasets_task",
+ delete_dataset = AutoMLDeleteDatasetOperator(
+ task_id="delete_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- # TEST BODY
- import_dataset_task >> create_model
- # TEST TEARDOWN
- delete_model_task >> delete_datasets_task
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
- # Task dependencies created via `XComArgs`:
- # create_dataset_task >> import_dataset_task
- # create_dataset_task >> create_model
- # create_model >> delete_model_task
- # create_dataset_task >> delete_datasets_task
+ (
+ # TEST SETUP
+ [create_bucket >> move_dataset_file, create_dataset]
+ # TEST BODY
+ >> import_dataset
+ >> create_model
+ # TEST TEARDOWN
+ >> delete_model
+ >> delete_dataset
+ >> delete_bucket
+ )
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py b/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py
index 5d6ea7b932..a85078779b 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py
@@ -24,7 +24,10 @@ import os
from datetime import datetime
from typing import cast
+from google.cloud import storage
+
from airflow import models
+from airflow.decorators import task
from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
from airflow.providers.google.cloud.operators.automl import (
@@ -37,33 +40,36 @@ from airflow.providers.google.cloud.operators.automl import (
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
)
+from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.trigger_rule import TriggerRule
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_automl_vision"
+DAG_ID = "example_automl_vision_clss"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-
GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-RESOURCE_DATA_BUCKET = "system-tests-resources"
-DATASET_NAME = "test_dataset_vision"
+DATASET_NAME = f"ds_vision_clss_{ENV_ID}".replace("-", "_")
DATASET = {
"display_name": DATASET_NAME,
"image_classification_dataset_metadata": {"classification_type": "MULTILABEL"},
}
-AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl-vision/data.csv"
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/vision_classification.csv"
IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
-MODEL_NAME = "test_model"
+MODEL_NAME = "vision_clss_test_model"
MODEL = {
"display_name": MODEL_NAME,
"image_classification_model_metadata": {"train_budget": 1},
}
+CSV_FILE_NAME = "vision_classification.csv"
+GCS_FILE_PATH = f"automl/datasets/vision/{CSV_FILE_NAME}"
+DESTINATION_FILE_PATH = f"/tmp/{CSV_FILE_NAME}"
+
extract_object_id = CloudAutoMLHook.extract_object_id
# Example DAG for AutoML Vision Classification
@@ -73,7 +79,7 @@ with models.DAG(
start_date=datetime(2021, 1, 1),
catchup=False,
user_defined_macros={"extract_object_id": extract_object_id},
- tags=["example", "automl"],
+ tags=["example", "automl", "vision-clss"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket",
@@ -82,14 +88,43 @@ with models.DAG(
location=GCP_AUTOML_LOCATION,
)
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_data_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="automl-vision",
- destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
- destination_object="automl-vision",
- recursive=True,
- )
+ @task
+ def upload_csv_file_to_gcs():
+ # download file to local storage
+ storage_client = storage.Client()
+ bucket = storage_client.bucket(RESOURCE_DATA_BUCKET)
+ blob = bucket.blob(GCS_FILE_PATH)
+ blob.download_to_filename(DESTINATION_FILE_PATH)
+
+ # update file content
+ with open(DESTINATION_FILE_PATH) as file:
+ lines = file.readlines()
+
+ updated_lines = [line.replace("template-bucket", DATA_SAMPLE_GCS_BUCKET_NAME) for line in lines]
+
+ with open(DESTINATION_FILE_PATH, "w") as file:
+ file.writelines(updated_lines)
+
+ # upload updated file to bucket storage
+ destination_bucket = storage_client.bucket(DATA_SAMPLE_GCS_BUCKET_NAME)
+ destination_blob = destination_bucket.blob(f"automl/{CSV_FILE_NAME}")
+ generation_match_precondition = 0
+ destination_blob.upload_from_filename(
+ DESTINATION_FILE_PATH, if_generation_match=generation_match_precondition
+ )
+
+ upload_csv_file_to_gcs_task = upload_csv_file_to_gcs()
+
+ copy_folder_tasks = [
+ GCSToGCSOperator(
+ task_id=f"copy_dataset_folder_{folder}",
+ source_bucket=RESOURCE_DATA_BUCKET,
+ source_object=f"automl/datasets/vision/{folder}",
+ destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+ destination_object=f"automl/{folder}",
+ )
+ for folder in ("cirrus", "cumulonimbus", "cumulus")
+ ]
create_dataset_task = AutoMLCreateDatasetOperator(
task_id="create_dataset_task",
@@ -110,16 +145,16 @@ with models.DAG(
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
model_id = cast(str, XComArg(create_model, key="model_id"))
- delete_model_task = AutoMLDeleteModelOperator(
- task_id="delete_model_task",
+ delete_model = AutoMLDeleteModelOperator(
+ task_id="delete_model",
model_id=model_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_datasets_task = AutoMLDeleteDatasetOperator(
- task_id="delete_datasets_task",
+ delete_dataset = AutoMLDeleteDatasetOperator(
+ task_id="delete_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
@@ -135,14 +170,15 @@ with models.DAG(
(
# TEST SETUP
create_bucket
- >> move_dataset_file
+ >> upload_csv_file_to_gcs_task
+ >> copy_folder_tasks
+ # TEST BODY
>> create_dataset_task
>> import_dataset_task
- # TEST BODY
>> create_model
# TEST TEARDOWN
- >> delete_model_task
- >> delete_datasets_task
+ >> delete_model
+ >> delete_dataset
>> delete_bucket
)
diff --git a/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py b/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py
index 933c2760cb..894b467dd3 100644
--- a/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py
+++ b/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py
@@ -31,50 +31,75 @@ from airflow.providers.google.cloud.operators.automl import (
AutoMLCreateDatasetOperator,
AutoMLDeleteDatasetOperator,
AutoMLDeleteModelOperator,
+ AutoMLDeployModelOperator,
AutoMLImportDataOperator,
AutoMLTrainModelOperator,
)
-
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
-GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
-GCP_AUTOML_DETECTION_BUCKET = os.environ.get(
- "GCP_AUTOML_DETECTION_BUCKET", "gs://INVALID BUCKET NAME/img/openimage/csv/salads_ml_use.csv"
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+ GCSSynchronizeBucketsOperator,
)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_automl_vision_obj_detect"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+GCP_AUTOML_LOCATION = "us-central1"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-# Example model
+MODEL_NAME = "vision_detect_test_model"
MODEL = {
- "display_name": "auto_model",
+ "display_name": MODEL_NAME,
"image_object_detection_model_metadata": {},
}
-# Example dataset
+DATASET_NAME = f"ds_vision_detect_{ENV_ID}".replace("-", "_")
DATASET = {
- "display_name": "test_detection_dataset",
+ "display_name": DATASET_NAME,
"image_object_detection_dataset_metadata": {},
}
-IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_DETECTION_BUCKET]}}
-
+AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/object_detection.csv"
+IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}}
extract_object_id = CloudAutoMLHook.extract_object_id
# Example DAG for AutoML Vision Object Detection
with models.DAG(
- "example_automl_vision_detection",
+ DAG_ID,
+ schedule="@once", # Override to match your needs
start_date=datetime(2021, 1, 1),
catchup=False,
user_defined_macros={"extract_object_id": extract_object_id},
- tags=["example"],
+ tags=["example", "automl", "object-detection"],
) as dag:
- create_dataset_task = AutoMLCreateDatasetOperator(
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ storage_class="REGIONAL",
+ location=GCP_AUTOML_LOCATION,
+ )
+
+ move_dataset_file = GCSSynchronizeBucketsOperator(
+ task_id="move_data_to_bucket",
+ source_bucket=RESOURCE_DATA_BUCKET,
+ source_object="automl/datasets/vision",
+ destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+ destination_object="automl",
+ recursive=True,
+ )
+
+ create_dataset = AutoMLCreateDatasetOperator(
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
)
- dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id"))
+ dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))
MODEL["dataset_id"] = dataset_id
- import_dataset_task = AutoMLImportDataOperator(
- task_id="import_dataset_task",
+ import_dataset = AutoMLImportDataOperator(
+ task_id="import_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
input_config=IMPORT_INPUT_CONFIG,
@@ -83,33 +108,47 @@ with models.DAG(
MODEL["dataset_id"] = dataset_id
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
-
model_id = cast(str, XComArg(create_model, key="model_id"))
- delete_model_task = AutoMLDeleteModelOperator(
- task_id="delete_model_task",
+ deploy_model = AutoMLDeployModelOperator(
+ task_id="deploy_model",
+ model_id=model_id,
+ location=GCP_AUTOML_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ )
+
+ delete_model = AutoMLDeleteModelOperator(
+ task_id="delete_model",
model_id=model_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- delete_datasets_task = AutoMLDeleteDatasetOperator(
- task_id="delete_datasets_task",
+ delete_dataset = AutoMLDeleteDatasetOperator(
+ task_id="delete_dataset",
dataset_id=dataset_id,
location=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
- # TEST BODY
- import_dataset_task >> create_model
- # TEST TEARDOWN
- delete_model_task >> delete_datasets_task
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket",
+ bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
- # Task dependencies created via `XComArgs`:
- # create_dataset_task >> import_dataset_task
- # create_dataset_task >> create_model
- # create_model >> delete_model_task
- # create_dataset_task >> delete_datasets_task
+ (
+ # TEST SETUP
+ [create_bucket >> move_dataset_file, create_dataset]
+ # TEST BODY
+ >> import_dataset
+ >> create_model
+ >> deploy_model
+ # TEST TEARDOWN
+ >> delete_model
+ >> delete_dataset
+ >> delete_bucket
+ )
from tests.system.utils.watcher import watcher