Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/06 16:28:26 UTC

[GitHub] [airflow] MaksYermak opened a new pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

MaksYermak opened a new pull request #20077:
URL: https://github.com/apache/airflow/pull/20077


   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   Create operators for working with CustomJob and Datasets for the Vertex AI service. Includes operators, hooks, example DAGs, tests, and docs.
   
   Co-authored-by: Wojciech Januszek januszek@google.com
   Co-authored-by: Lukasz Wyszomirski wyszomirski@google.com
   Co-authored-by: Maksim Yermakou maksimy@google.com
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] MaksYermak edited a comment on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak edited a comment on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1017683960


   > @ashb Should the `:type:` directives in the docstrings here be removed preemptively because of #20951?
   
   @josh-fell I have deleted `:type` in the last commit
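
   For readers following this thread, the edit josh-fell asked about is driven by Sphinx now picking up parameter types from the Python type hints (#20951), so the duplicated directives can simply be dropped. A minimal, hypothetical sketch of that kind of change (not the actual docstrings from this PR):

```python
# Before: the parameter type is stated twice, in the signature and in the docstring.
def create_training_pipeline(project_id: str, region: str) -> None:
    """
    Create a Vertex AI training pipeline.

    :param project_id: Required. The Google Cloud project to run the pipeline in.
    :type project_id: str
    :param region: Required. The region to run the pipeline in.
    :type region: str
    """


# After: the ``:type:`` lines are removed; Sphinx reads the types from the annotations.
def create_training_pipeline(project_id: str, region: str) -> None:
    """
    Create a Vertex AI training pipeline.

    :param project_id: Required. The Google Cloud project to run the pipeline in.
    :param region: Required. The region to run the pipeline in.
    """
```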





[GitHub] [airflow] ashb commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1017615769


   Yes please!





[GitHub] [airflow] MaksYermak commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r786061371



##########
File path: airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
##########
@@ -0,0 +1,594 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Vertex AI operators."""
+
+from typing import Dict, List, Optional, Sequence, Tuple, Union
+
+from google.api_core.exceptions import NotFound
+from google.api_core.retry import Retry
+from google.cloud.aiplatform.models import Model
+from google.cloud.aiplatform_v1.types.dataset import Dataset
+from google.cloud.aiplatform_v1.types.training_pipeline import TrainingPipeline
+
+from airflow.models import BaseOperator, BaseOperatorLink
+from airflow.models.taskinstance import TaskInstance
+from airflow.providers.google.cloud.hooks.vertex_ai.custom_job import CustomJobHook
+
+VERTEX_AI_BASE_LINK = "https://console.cloud.google.com/vertex-ai"
+VERTEX_AI_MODEL_LINK = (
+    VERTEX_AI_BASE_LINK + "/locations/{region}/models/{model_id}/deploy?project={project_id}"
+)
+VERTEX_AI_TRAINING_PIPELINES_LINK = VERTEX_AI_BASE_LINK + "/training/training-pipelines?project={project_id}"
+
+
+class VertexAIModelLink(BaseOperatorLink):
+    """Helper class for constructing Vertex AI Model link"""
+
+    name = "Vertex AI Model"
+
+    def get_link(self, operator, dttm):
+        ti = TaskInstance(task=operator, execution_date=dttm)
+        model_conf = ti.xcom_pull(task_ids=operator.task_id, key="model_conf")
+        return (
+            VERTEX_AI_MODEL_LINK.format(
+                region=model_conf["region"],
+                model_id=model_conf["model_id"],
+                project_id=model_conf["project_id"],
+            )
+            if model_conf
+            else ""
+        )
+
+
+class VertexAITrainingPipelinesLink(BaseOperatorLink):
+    """Helper class for constructing Vertex AI Training Pipelines link"""
+
+    name = "Vertex AI Training Pipelines"
+
+    def get_link(self, operator, dttm):
+        ti = TaskInstance(task=operator, execution_date=dttm)
+        project_id = ti.xcom_pull(task_ids=operator.task_id, key="project_id")
+        return (
+            VERTEX_AI_TRAINING_PIPELINES_LINK.format(
+                project_id=project_id,
+            )
+            if project_id
+            else ""
+        )
+
+
+class CustomTrainingJobBaseOperator(BaseOperator):
+    """The base class for operators that launch Custom jobs on VertexAI."""
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        display_name: str,
+        container_uri: str,
+        model_serving_container_image_uri: Optional[str] = None,
+        model_serving_container_predict_route: Optional[str] = None,
+        model_serving_container_health_route: Optional[str] = None,
+        model_serving_container_command: Optional[Sequence[str]] = None,
+        model_serving_container_args: Optional[Sequence[str]] = None,
+        model_serving_container_environment_variables: Optional[Dict[str, str]] = None,
+        model_serving_container_ports: Optional[Sequence[int]] = None,
+        model_description: Optional[str] = None,
+        model_instance_schema_uri: Optional[str] = None,
+        model_parameters_schema_uri: Optional[str] = None,
+        model_prediction_schema_uri: Optional[str] = None,
+        labels: Optional[Dict[str, str]] = None,
+        training_encryption_spec_key_name: Optional[str] = None,
+        model_encryption_spec_key_name: Optional[str] = None,
+        staging_bucket: Optional[str] = None,
+        # RUN
+        dataset_id: Optional[str] = None,
+        annotation_schema_uri: Optional[str] = None,
+        model_display_name: Optional[str] = None,
+        model_labels: Optional[Dict[str, str]] = None,
+        base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
+        network: Optional[str] = None,
+        bigquery_destination: Optional[str] = None,
+        args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        replica_count: int = 1,
+        machine_type: str = "n1-standard-4",
+        accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
+        accelerator_count: int = 0,
+        boot_disk_type: str = "pd-ssd",
+        boot_disk_size_gb: int = 100,
+        training_fraction_split: Optional[float] = None,
+        validation_fraction_split: Optional[float] = None,
+        test_fraction_split: Optional[float] = None,
+        training_filter_split: Optional[str] = None,
+        validation_filter_split: Optional[str] = None,
+        test_filter_split: Optional[str] = None,
+        predefined_split_column_name: Optional[str] = None,
+        timestamp_split_column_name: Optional[str] = None,
+        tensorboard: Optional[str] = None,
+        sync=True,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.display_name = display_name
+        # START Custom
+        self.container_uri = container_uri
+        self.model_serving_container_image_uri = model_serving_container_image_uri
+        self.model_serving_container_predict_route = model_serving_container_predict_route
+        self.model_serving_container_health_route = model_serving_container_health_route
+        self.model_serving_container_command = model_serving_container_command
+        self.model_serving_container_args = model_serving_container_args
+        self.model_serving_container_environment_variables = model_serving_container_environment_variables
+        self.model_serving_container_ports = model_serving_container_ports
+        self.model_description = model_description
+        self.model_instance_schema_uri = model_instance_schema_uri
+        self.model_parameters_schema_uri = model_parameters_schema_uri
+        self.model_prediction_schema_uri = model_prediction_schema_uri
+        self.labels = labels
+        self.training_encryption_spec_key_name = training_encryption_spec_key_name
+        self.model_encryption_spec_key_name = model_encryption_spec_key_name
+        self.staging_bucket = staging_bucket
+        # END Custom
+        # START Run param
+        self.dataset = Dataset(name=dataset_id) if dataset_id else None
+        self.annotation_schema_uri = annotation_schema_uri
+        self.model_display_name = model_display_name
+        self.model_labels = model_labels
+        self.base_output_dir = base_output_dir
+        self.service_account = service_account
+        self.network = network
+        self.bigquery_destination = bigquery_destination
+        self.args = args
+        self.environment_variables = environment_variables
+        self.replica_count = replica_count
+        self.machine_type = machine_type
+        self.accelerator_type = accelerator_type
+        self.accelerator_count = accelerator_count
+        self.boot_disk_type = boot_disk_type
+        self.boot_disk_size_gb = boot_disk_size_gb
+        self.training_fraction_split = training_fraction_split
+        self.validation_fraction_split = validation_fraction_split
+        self.test_fraction_split = test_fraction_split
+        self.training_filter_split = training_filter_split
+        self.validation_filter_split = validation_filter_split
+        self.test_filter_split = test_filter_split
+        self.predefined_split_column_name = predefined_split_column_name
+        self.timestamp_split_column_name = timestamp_split_column_name
+        self.tensorboard = tensorboard
+        self.sync = sync
+        # END Run param
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.hook: Optional[CustomJobHook] = None
+
+    def execute(self, context):
+        self.hook = CustomJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def on_kill(self) -> None:
+        """
+        Callback called when the operator is killed.
+        Cancel any running job.
+        """
+        if self.hook:
+            self.hook.cancel_job()
+
+
+class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
+    """Create Custom Container Training job"""
+
+    template_fields = [
+        'region',
+        'command',
+        'impersonation_chain',
+    ]
+    operator_extra_links = (VertexAIModelLink(),)
+
+    def __init__(
+        self,
+        *,
+        command: Optional[Sequence[str]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.command = command
+
+    def execute(self, context):
+        super().execute(context)
+        model = self.hook.create_custom_container_training_job(
+            project_id=self.project_id,
+            region=self.region,
+            display_name=self.display_name,
+            container_uri=self.container_uri,
+            command=self.command,
+            model_serving_container_image_uri=self.model_serving_container_image_uri,
+            model_serving_container_predict_route=self.model_serving_container_predict_route,
+            model_serving_container_health_route=self.model_serving_container_health_route,
+            model_serving_container_command=self.model_serving_container_command,
+            model_serving_container_args=self.model_serving_container_args,
+            model_serving_container_environment_variables=self.model_serving_container_environment_variables,
+            model_serving_container_ports=self.model_serving_container_ports,
+            model_description=self.model_description,
+            model_instance_schema_uri=self.model_instance_schema_uri,
+            model_parameters_schema_uri=self.model_parameters_schema_uri,
+            model_prediction_schema_uri=self.model_prediction_schema_uri,
+            labels=self.labels,
+            training_encryption_spec_key_name=self.training_encryption_spec_key_name,
+            model_encryption_spec_key_name=self.model_encryption_spec_key_name,
+            staging_bucket=self.staging_bucket,
+            # RUN
+            dataset=self.dataset,
+            annotation_schema_uri=self.annotation_schema_uri,
+            model_display_name=self.model_display_name,
+            model_labels=self.model_labels,
+            base_output_dir=self.base_output_dir,
+            service_account=self.service_account,
+            network=self.network,
+            bigquery_destination=self.bigquery_destination,
+            args=self.args,
+            environment_variables=self.environment_variables,
+            replica_count=self.replica_count,
+            machine_type=self.machine_type,
+            accelerator_type=self.accelerator_type,
+            accelerator_count=self.accelerator_count,
+            boot_disk_type=self.boot_disk_type,
+            boot_disk_size_gb=self.boot_disk_size_gb,
+            training_fraction_split=self.training_fraction_split,
+            validation_fraction_split=self.validation_fraction_split,
+            test_fraction_split=self.test_fraction_split,
+            training_filter_split=self.training_filter_split,
+            validation_filter_split=self.validation_filter_split,
+            test_filter_split=self.test_filter_split,
+            predefined_split_column_name=self.predefined_split_column_name,
+            timestamp_split_column_name=self.timestamp_split_column_name,
+            tensorboard=self.tensorboard,
+            sync=True,
+        )
+
+        result = Model.to_dict(model)
+        model_id = self.hook.extract_model_id(result)
+        self.xcom_push(
+            context,
+            key="model_conf",
+            value={
+                "model_id": model_id,
+                "region": self.region,
+                "project_id": self.project_id,
+            },
+        )
+        return result
+
+
+class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator):
+    """Create Custom Python Package Training job"""
+
+    template_fields = [
+        'region',
+        'impersonation_chain',
+    ]
+    operator_extra_links = (VertexAIModelLink(),)
+
+    def __init__(
+        self,
+        *,
+        python_package_gcs_uri: str,
+        python_module_name: str,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.python_package_gcs_uri = python_package_gcs_uri
+        self.python_module_name = python_module_name
+
+    def execute(self, context):
+        super().execute(context)
+        model = self.hook.create_custom_python_package_training_job(
+            project_id=self.project_id,
+            region=self.region,
+            display_name=self.display_name,
+            python_package_gcs_uri=self.python_package_gcs_uri,
+            python_module_name=self.python_module_name,
+            container_uri=self.container_uri,
+            model_serving_container_image_uri=self.model_serving_container_image_uri,
+            model_serving_container_predict_route=self.model_serving_container_predict_route,
+            model_serving_container_health_route=self.model_serving_container_health_route,
+            model_serving_container_command=self.model_serving_container_command,
+            model_serving_container_args=self.model_serving_container_args,
+            model_serving_container_environment_variables=self.model_serving_container_environment_variables,
+            model_serving_container_ports=self.model_serving_container_ports,
+            model_description=self.model_description,
+            model_instance_schema_uri=self.model_instance_schema_uri,
+            model_parameters_schema_uri=self.model_parameters_schema_uri,
+            model_prediction_schema_uri=self.model_prediction_schema_uri,
+            labels=self.labels,
+            training_encryption_spec_key_name=self.training_encryption_spec_key_name,
+            model_encryption_spec_key_name=self.model_encryption_spec_key_name,
+            staging_bucket=self.staging_bucket,
+            # RUN
+            dataset=self.dataset,
+            annotation_schema_uri=self.annotation_schema_uri,
+            model_display_name=self.model_display_name,
+            model_labels=self.model_labels,
+            base_output_dir=self.base_output_dir,
+            service_account=self.service_account,
+            network=self.network,
+            bigquery_destination=self.bigquery_destination,
+            args=self.args,
+            environment_variables=self.environment_variables,
+            replica_count=self.replica_count,
+            machine_type=self.machine_type,
+            accelerator_type=self.accelerator_type,
+            accelerator_count=self.accelerator_count,
+            boot_disk_type=self.boot_disk_type,
+            boot_disk_size_gb=self.boot_disk_size_gb,
+            training_fraction_split=self.training_fraction_split,
+            validation_fraction_split=self.validation_fraction_split,
+            test_fraction_split=self.test_fraction_split,
+            training_filter_split=self.training_filter_split,
+            validation_filter_split=self.validation_filter_split,
+            test_filter_split=self.test_filter_split,
+            predefined_split_column_name=self.predefined_split_column_name,
+            timestamp_split_column_name=self.timestamp_split_column_name,
+            tensorboard=self.tensorboard,
+            sync=True,
+        )
+
+        result = Model.to_dict(model)
+        model_id = self.hook.extract_model_id(result)
+        self.xcom_push(
+            context,
+            key="model_conf",
+            value={
+                "model_id": model_id,
+                "region": self.region,
+                "project_id": self.project_id,
+            },
+        )
+        return result
+
+
+class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
+    """Create Custom Training job"""
+
+    template_fields = [
+        'region',
+        'script_path',
+        'requirements',
+        'impersonation_chain',
+    ]
+    operator_extra_links = (VertexAIModelLink(),)
+
+    def __init__(
+        self,
+        *,
+        script_path: str,
+        requirements: Optional[Sequence[str]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.requirements = requirements
+        self.script_path = script_path
+
+    def execute(self, context):
+        super().execute(context)
+        model = self.hook.create_custom_training_job(
+            project_id=self.project_id,
+            region=self.region,
+            display_name=self.display_name,
+            script_path=self.script_path,
+            container_uri=self.container_uri,
+            requirements=self.requirements,
+            model_serving_container_image_uri=self.model_serving_container_image_uri,
+            model_serving_container_predict_route=self.model_serving_container_predict_route,
+            model_serving_container_health_route=self.model_serving_container_health_route,
+            model_serving_container_command=self.model_serving_container_command,
+            model_serving_container_args=self.model_serving_container_args,
+            model_serving_container_environment_variables=self.model_serving_container_environment_variables,
+            model_serving_container_ports=self.model_serving_container_ports,
+            model_description=self.model_description,
+            model_instance_schema_uri=self.model_instance_schema_uri,
+            model_parameters_schema_uri=self.model_parameters_schema_uri,
+            model_prediction_schema_uri=self.model_prediction_schema_uri,
+            labels=self.labels,
+            training_encryption_spec_key_name=self.training_encryption_spec_key_name,
+            model_encryption_spec_key_name=self.model_encryption_spec_key_name,
+            staging_bucket=self.staging_bucket,
+            # RUN
+            dataset=self.dataset,
+            annotation_schema_uri=self.annotation_schema_uri,
+            model_display_name=self.model_display_name,
+            model_labels=self.model_labels,
+            base_output_dir=self.base_output_dir,
+            service_account=self.service_account,
+            network=self.network,
+            bigquery_destination=self.bigquery_destination,
+            args=self.args,
+            environment_variables=self.environment_variables,
+            replica_count=self.replica_count,
+            machine_type=self.machine_type,
+            accelerator_type=self.accelerator_type,
+            accelerator_count=self.accelerator_count,
+            boot_disk_type=self.boot_disk_type,
+            boot_disk_size_gb=self.boot_disk_size_gb,
+            training_fraction_split=self.training_fraction_split,
+            validation_fraction_split=self.validation_fraction_split,
+            test_fraction_split=self.test_fraction_split,
+            training_filter_split=self.training_filter_split,
+            validation_filter_split=self.validation_filter_split,
+            test_filter_split=self.test_filter_split,
+            predefined_split_column_name=self.predefined_split_column_name,
+            timestamp_split_column_name=self.timestamp_split_column_name,
+            tensorboard=self.tensorboard,
+            sync=True,
+        )
+
+        result = Model.to_dict(model)
+        model_id = self.hook.extract_model_id(result)
+        self.xcom_push(
+            context,
+            key="model_conf",
+            value={
+                "model_id": model_id,
+                "region": self.region,
+                "project_id": self.project_id,
+            },
+        )
+        return result
+
+
+class DeleteCustomTrainingJobOperator(BaseOperator):
+    """Deletes a CustomTrainingJob, CustomPythonTrainingJob, or CustomContainerTrainingJob."""
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        training_pipeline_id: str,
+        custom_job_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.training_pipeline = training_pipeline_id
+        self.custom_job = custom_job_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):

Review comment:
       I have changed `Dict` to `Context` in the last commit
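
    For context, the `Dict` to `Context` change follows the typing pattern Airflow 2.x uses for `execute()` signatures, with `Context` imported only under `TYPE_CHECKING`. A minimal sketch under that assumption (illustrative operator name, not the exact diff from this PR):

```python
from typing import TYPE_CHECKING

from airflow.models import BaseOperator

if TYPE_CHECKING:
    # Context lives in airflow.utils.context in Airflow 2.x; imported only for type checking.
    from airflow.utils.context import Context


class ExampleVertexAIOperator(BaseOperator):
    def execute(self, context: "Context"):
        # The operator body is irrelevant here; the signature is the point of the sketch.
        pass
```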







[GitHub] [airflow] MaksYermak commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1016403983


   @potiuk @josh-fell Hi guys, could you look at my PR one more time and approve/merge it if all looks good?





[GitHub] [airflow] MaksYermak commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1017683960


   > @ashb Should the `:type:` directives in the docstrings here be removed preemptively because of #20951?
   
   I have deleted `:type` in the last commit





[GitHub] [airflow] josh-fell commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r787804525



##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,313 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_VERTEX_AI_BUCKET - Google Cloud Storage bucket where the model will be saved
+after training process was finished.
+* CUSTOM_CONTAINER_URI - path to container with model.
+* PYTHON_PACKAGE_GSC_URI - path to test model in archive.
+* LOCAL_TRAINING_SCRIPT_PATH - path to local training script.
+* DATASET_ID - ID of dataset which will be used in training process.
+"""
+import os
+from datetime import datetime
+from uuid import uuid4
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    CreateCustomPythonPackageTrainingJobOperator,
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+    ListCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+REGION = os.environ.get("GCP_LOCATION", "us-central1")
+BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
+
+STAGING_BUCKET = f"gs://{BUCKET}"
+DISPLAY_NAME = str(uuid4())  # Create random display name
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
+
+TRAINING_PIPELINE_ID = "test-training-pipeline-id"
+CUSTOM_JOB_ID = "test-custom-job-id"
+
+IMAGE_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
+    "metadata": "test-image-dataset",
+}
+TABULAR_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
+    "metadata": "test-tabular-dataset",
+}
+TEXT_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
+    "metadata": "test-text-dataset",
+}
+VIDEO_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+TIME_SERIES_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+with models.DAG(
+    "example_gcp_vertex_ai_custom_jobs",
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as custom_jobs_dag:
+    # [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+    create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
+        task_id="custom_container_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-container-{DISPLAY_NAME}",
+        container_uri=CUSTOM_CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        command=["python3", "task.py"],
+        model_display_name=f"container-housing-model-{DISPLAY_NAME}",
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+    create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
+        task_id="python_package_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-py-package-{DISPLAY_NAME}",
+        python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
+        python_module_name=PYTHON_MODULE_NAME,
+        container_uri=CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        model_display_name=f"py-package-housing-model-{DISPLAY_NAME}",
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
+    create_custom_training_job = CreateCustomTrainingJobOperator(
+        task_id="custom_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-custom-{DISPLAY_NAME}",
+        script_path=LOCAL_TRAINING_SCRIPT_PATH,
+        container_uri=CONTAINER_URI,
+        requirements=["gcsfs==0.7.1"],
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        replica_count=1,
+        model_display_name=f"custom-housing-model-{DISPLAY_NAME}",
+        sync=False,

Review comment:
       I think my confusion is in the `sync` parameter description of "...execute this method synchronously". The "method", I guess, is some underlying API method from what you're saying. I assumed the description was for the operator, but if it's related to the job running on AI Platform, maybe it's worth updating slightly? WDYT?
   
   Something like "Whether to execute the AI Platform job synchronously...."?
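
    Concretely, the rewording being floated might look like the following hypothetical docstring (illustrative function name and text, not what was ultimately committed):

```python
def run_custom_job(display_name: str, sync: bool = True) -> None:
    """
    Run a Vertex AI custom training job.

    :param display_name: Required. The user-defined name of the training job.
    :param sync: Whether to execute the AI Platform job synchronously. If False, the call
        returns as soon as the job is submitted and the job keeps running on AI Platform;
        if True, the call blocks until the job completes.
    """
```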







[GitHub] [airflow] josh-fell commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r763591527



##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,306 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists.

Review comment:
       Should this be "GCP_VERTEX_AI_BUCKET" instead of "GCP_BUCKET_NAME"?

##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,306 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists.
+"""
+import os
+from uuid import uuid4
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    CreateCustomPythonPackageTrainingJobOperator,
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+    ListCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+from airflow.utils.dates import days_ago
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+REGION = os.environ.get("GCP_LOCATION", "us-central1")
+BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
+
+STAGING_BUCKET = f"gs://{BUCKET}"
+DISPLAY_NAME = str(uuid4())  # Create random display name
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
+
+TRAINING_PIPELINE_ID = "test-training-pipeline-id"
+CUSTOM_JOB_ID = "test-custom-job-id"
+
+IMAGE_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
+    "metadata": "test-image-dataset",
+}
+TABULAR_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
+    "metadata": "test-tabular-dataset",
+}
+TEXT_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
+    "metadata": "test-text-dataset",
+}
+VIDEO_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+TIME_SERIES_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+with models.DAG(
+    "example_gcp_vertex_ai_custom_jobs",
+    start_date=days_ago(1),

Review comment:
       Applicable to both DAGs in this file:
   
   - Example DAGs should use a static `start_date` value instead of a dynamic one as best practice. The value doesn't matter.
   
   - Also, we've been adding `catchup=False` to initially ward off new-user headaches when these example DAGs are copied and `start_date` is modified to fit needs. 
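
    For reference, that pattern produces a DAG header roughly like the one in the later revision quoted earlier in this thread, with a static `start_date` and `catchup=False`:

```python
from datetime import datetime

from airflow import models

with models.DAG(
    "example_gcp_vertex_ai_custom_jobs",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),  # static value; the exact date does not matter
    catchup=False,  # avoids backfilling runs if a copied DAG's start_date is moved back
) as custom_jobs_dag:
    ...
```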

##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,306 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists.
+"""
+import os
+from uuid import uuid4
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    CreateCustomPythonPackageTrainingJobOperator,
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+    ListCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+from airflow.utils.dates import days_ago
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+REGION = os.environ.get("GCP_LOCATION", "us-central1")
+BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
+
+STAGING_BUCKET = f"gs://{BUCKET}"
+DISPLAY_NAME = str(uuid4())  # Create random display name
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
+
+TRAINING_PIPELINE_ID = "test-training-pipeline-id"
+CUSTOM_JOB_ID = "test-custom-job-id"
+
+IMAGE_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
+    "metadata": "test-image-dataset",
+}
+TABULAR_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
+    "metadata": "test-tabular-dataset",
+}
+TEXT_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
+    "metadata": "test-text-dataset",
+}
+VIDEO_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+TIME_SERIES_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+with models.DAG(
+    "example_gcp_vertex_ai_custom_jobs",

Review comment:
       Right now all of these tasks will execute in parallel and have no dependencies on each other. If there is a use case in which some of these tasks _could_ flow together in a pipeline, it would be nice to demonstrate that here.
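
    To make that concrete, a hypothetical wiring of a few of these tasks into a pipeline could look like the sketch below. Stand-in `DummyOperator` tasks are used so it stays self-contained; the example DAG in this PR keeps its tasks independent:

```python
from datetime import datetime

from airflow import models
from airflow.operators.dummy import DummyOperator

with models.DAG(
    "example_gcp_vertex_ai_custom_jobs_chained",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # Stand-ins for CreateDatasetOperator, CreateCustomTrainingJobOperator and
    # DeleteCustomTrainingJobOperator from the example DAG.
    create_dataset = DummyOperator(task_id="create_dataset")
    create_custom_training_job = DummyOperator(task_id="custom_task")
    delete_custom_training_job = DummyOperator(task_id="delete_custom_training_job")

    # Airflow's bitshift syntax declares the dependency chain.
    create_dataset >> create_custom_training_job >> delete_custom_training_job
```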







[GitHub] [airflow] MaksYermak commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1026990060


   @potiuk @jedcunningham I have created a new PR https://github.com/apache/airflow/pull/21253





[GitHub] [airflow] jedcunningham commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1024772604


   This had a bunch of failing mypy issues once it landed in main. #21203 reverted it.
   
   @MaksYermak, can you rebase your changes on current main, fix the mypy issues, and open a new PR? Thanks!
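
   The thread does not list the specific mypy errors behind #21203, but a common class of failure in new provider code is an implicit-`Optional` default. A hypothetical before/after of that kind of fix:

```python
from typing import Optional, Sequence


# Before: mypy (with implicit Optional disabled) rejects a None default whose
# annotation does not include Optional.
def submit_job(command: Sequence[str] = None) -> None:
    ...


# After: the annotation is widened so the None default type-checks.
def submit_job_fixed(command: Optional[Sequence[str]] = None) -> None:
    ...
```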





[GitHub] [airflow] josh-fell commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1016527738


   > @potiuk @josh-fell Hi guys, could you look at my PR one more time and approve/merge it if all looks good?
   
   Just a small comment on a possible clarification but LGTM.





[GitHub] [airflow] MaksYermak commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r764153850



##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,306 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists.
+"""
+import os
+from uuid import uuid4
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    CreateCustomPythonPackageTrainingJobOperator,
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+    ListCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+from airflow.utils.dates import days_ago
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+REGION = os.environ.get("GCP_LOCATION", "us-central1")
+BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
+
+STAGING_BUCKET = f"gs://{BUCKET}"
+DISPLAY_NAME = str(uuid4())  # Create random display name
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
+
+TRAINING_PIPELINE_ID = "test-training-pipeline-id"
+CUSTOM_JOB_ID = "test-custom-job-id"
+
+IMAGE_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
+    "metadata": "test-image-dataset",
+}
+TABULAR_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
+    "metadata": "test-tabular-dataset",
+}
+TEXT_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
+    "metadata": "test-text-dataset",
+}
+VIDEO_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+TIME_SERIES_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+with models.DAG(
+    "example_gcp_vertex_ai_custom_jobs",

Review comment:
       @josh-fell Right now we don't have any dependencies between these tasks, but in future iterations we may add some.







[GitHub] [airflow] MaksYermak commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r764146751



##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,306 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists.
+"""
+import os
+from uuid import uuid4
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    CreateCustomPythonPackageTrainingJobOperator,
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+    ListCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+from airflow.utils.dates import days_ago
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+REGION = os.environ.get("GCP_LOCATION", "us-central1")
+BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
+
+STAGING_BUCKET = f"gs://{BUCKET}"
+DISPLAY_NAME = str(uuid4())  # Create random display name
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
+
+TRAINING_PIPELINE_ID = "test-training-pipeline-id"
+CUSTOM_JOB_ID = "test-custom-job-id"
+
+IMAGE_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
+    "metadata": "test-image-dataset",
+}
+TABULAR_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
+    "metadata": "test-tabular-dataset",
+}
+TEXT_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
+    "metadata": "test-text-dataset",
+}
+VIDEO_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+TIME_SERIES_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+with models.DAG(
+    "example_gcp_vertex_ai_custom_jobs",
+    start_date=days_ago(1),

Review comment:
       @josh-fell I have changed it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] MaksYermak commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1008739369


   @potiuk could we merge this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r784126348



##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,313 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_VERTEX_AI_BUCKET - Google Cloud Storage bucket where the model will be saved
+after training process was finished.
+* CUSTOM_CONTAINER_URI - path to container with model.
+* PYTHON_PACKAGE_GSC_URI - path to test model in archive.
+* LOCAL_TRAINING_SCRIPT_PATH - path to local training script.
+* DATASET_ID - ID of dataset which will be used in training process.
+"""
+import os
+from datetime import datetime
+from uuid import uuid4
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    CreateCustomPythonPackageTrainingJobOperator,
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+    ListCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+REGION = os.environ.get("GCP_LOCATION", "us-central1")
+BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
+
+STAGING_BUCKET = f"gs://{BUCKET}"
+DISPLAY_NAME = str(uuid4())  # Create random display name
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
+
+TRAINING_PIPELINE_ID = "test-training-pipeline-id"
+CUSTOM_JOB_ID = "test-custom-job-id"
+
+IMAGE_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
+    "metadata": "test-image-dataset",
+}
+TABULAR_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
+    "metadata": "test-tabular-dataset",
+}
+TEXT_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
+    "metadata": "test-text-dataset",
+}
+VIDEO_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+TIME_SERIES_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+with models.DAG(
+    "example_gcp_vertex_ai_custom_jobs",
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as custom_jobs_dag:
+    # [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+    create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
+        task_id="custom_container_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-container-{DISPLAY_NAME}",
+        container_uri=CUSTOM_CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        command=["python3", "task.py"],
+        model_display_name=f"container-housing-model-{DISPLAY_NAME}",
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+    create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
+        task_id="python_package_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-py-package-{DISPLAY_NAME}",
+        python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
+        python_module_name=PYTHON_MODULE_NAME,
+        container_uri=CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        model_display_name=f"py-package-housing-model-{DISPLAY_NAME}",
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
+    create_custom_training_job = CreateCustomTrainingJobOperator(
+        task_id="custom_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-custom-{DISPLAY_NAME}",
+        script_path=LOCAL_TRAINING_SCRIPT_PATH,
+        container_uri=CONTAINER_URI,
+        requirements=["gcsfs==0.7.1"],
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        replica_count=1,
+        model_display_name=f"custom-housing-model-{DISPLAY_NAME}",
+        sync=False,

Review comment:
       In the operator doc, this operator and a couple of others will "...wait for the operation to complete", but the example shows this operator running asynchronously. Should this operator and the others mentioned in the doc _always_ execute synchronously (it looks like `sync` is forced to `True` when calling the underlying hook method in the operator), or is it configurable to run the job in an async manner?

##########
File path: airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
##########
@@ -0,0 +1,594 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Vertex AI operators."""
+
+from typing import Dict, List, Optional, Sequence, Tuple, Union
+
+from google.api_core.exceptions import NotFound
+from google.api_core.retry import Retry
+from google.cloud.aiplatform.models import Model
+from google.cloud.aiplatform_v1.types.dataset import Dataset
+from google.cloud.aiplatform_v1.types.training_pipeline import TrainingPipeline
+
+from airflow.models import BaseOperator, BaseOperatorLink
+from airflow.models.taskinstance import TaskInstance
+from airflow.providers.google.cloud.hooks.vertex_ai.custom_job import CustomJobHook
+
+VERTEX_AI_BASE_LINK = "https://console.cloud.google.com/vertex-ai"
+VERTEX_AI_MODEL_LINK = (
+    VERTEX_AI_BASE_LINK + "/locations/{region}/models/{model_id}/deploy?project={project_id}"
+)
+VERTEX_AI_TRAINING_PIPELINES_LINK = VERTEX_AI_BASE_LINK + "/training/training-pipelines?project={project_id}"
+
+
+class VertexAIModelLink(BaseOperatorLink):
+    """Helper class for constructing Vertex AI Model link"""
+
+    name = "Vertex AI Model"
+
+    def get_link(self, operator, dttm):
+        ti = TaskInstance(task=operator, execution_date=dttm)
+        model_conf = ti.xcom_pull(task_ids=operator.task_id, key="model_conf")
+        return (
+            VERTEX_AI_MODEL_LINK.format(
+                region=model_conf["region"],
+                model_id=model_conf["model_id"],
+                project_id=model_conf["project_id"],
+            )
+            if model_conf
+            else ""
+        )
+
+
+class VertexAITrainingPipelinesLink(BaseOperatorLink):
+    """Helper class for constructing Vertex AI Training Pipelines link"""
+
+    name = "Vertex AI Training Pipelines"
+
+    def get_link(self, operator, dttm):
+        ti = TaskInstance(task=operator, execution_date=dttm)
+        project_id = ti.xcom_pull(task_ids=operator.task_id, key="project_id")
+        return (
+            VERTEX_AI_TRAINING_PIPELINES_LINK.format(
+                project_id=project_id,
+            )
+            if project_id
+            else ""
+        )
+
+
+class CustomTrainingJobBaseOperator(BaseOperator):
+    """The base class for operators that launch Custom jobs on VertexAI."""
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        display_name: str,
+        container_uri: str,
+        model_serving_container_image_uri: Optional[str] = None,
+        model_serving_container_predict_route: Optional[str] = None,
+        model_serving_container_health_route: Optional[str] = None,
+        model_serving_container_command: Optional[Sequence[str]] = None,
+        model_serving_container_args: Optional[Sequence[str]] = None,
+        model_serving_container_environment_variables: Optional[Dict[str, str]] = None,
+        model_serving_container_ports: Optional[Sequence[int]] = None,
+        model_description: Optional[str] = None,
+        model_instance_schema_uri: Optional[str] = None,
+        model_parameters_schema_uri: Optional[str] = None,
+        model_prediction_schema_uri: Optional[str] = None,
+        labels: Optional[Dict[str, str]] = None,
+        training_encryption_spec_key_name: Optional[str] = None,
+        model_encryption_spec_key_name: Optional[str] = None,
+        staging_bucket: Optional[str] = None,
+        # RUN
+        dataset_id: Optional[str] = None,
+        annotation_schema_uri: Optional[str] = None,
+        model_display_name: Optional[str] = None,
+        model_labels: Optional[Dict[str, str]] = None,
+        base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
+        network: Optional[str] = None,
+        bigquery_destination: Optional[str] = None,
+        args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        replica_count: int = 1,
+        machine_type: str = "n1-standard-4",
+        accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
+        accelerator_count: int = 0,
+        boot_disk_type: str = "pd-ssd",
+        boot_disk_size_gb: int = 100,
+        training_fraction_split: Optional[float] = None,
+        validation_fraction_split: Optional[float] = None,
+        test_fraction_split: Optional[float] = None,
+        training_filter_split: Optional[str] = None,
+        validation_filter_split: Optional[str] = None,
+        test_filter_split: Optional[str] = None,
+        predefined_split_column_name: Optional[str] = None,
+        timestamp_split_column_name: Optional[str] = None,
+        tensorboard: Optional[str] = None,
+        sync=True,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.display_name = display_name
+        # START Custom
+        self.container_uri = container_uri
+        self.model_serving_container_image_uri = model_serving_container_image_uri
+        self.model_serving_container_predict_route = model_serving_container_predict_route
+        self.model_serving_container_health_route = model_serving_container_health_route
+        self.model_serving_container_command = model_serving_container_command
+        self.model_serving_container_args = model_serving_container_args
+        self.model_serving_container_environment_variables = model_serving_container_environment_variables
+        self.model_serving_container_ports = model_serving_container_ports
+        self.model_description = model_description
+        self.model_instance_schema_uri = model_instance_schema_uri
+        self.model_parameters_schema_uri = model_parameters_schema_uri
+        self.model_prediction_schema_uri = model_prediction_schema_uri
+        self.labels = labels
+        self.training_encryption_spec_key_name = training_encryption_spec_key_name
+        self.model_encryption_spec_key_name = model_encryption_spec_key_name
+        self.staging_bucket = staging_bucket
+        # END Custom
+        # START Run param
+        self.dataset = Dataset(name=dataset_id) if dataset_id else None
+        self.annotation_schema_uri = annotation_schema_uri
+        self.model_display_name = model_display_name
+        self.model_labels = model_labels
+        self.base_output_dir = base_output_dir
+        self.service_account = service_account
+        self.network = network
+        self.bigquery_destination = bigquery_destination
+        self.args = args
+        self.environment_variables = environment_variables
+        self.replica_count = replica_count
+        self.machine_type = machine_type
+        self.accelerator_type = accelerator_type
+        self.accelerator_count = accelerator_count
+        self.boot_disk_type = boot_disk_type
+        self.boot_disk_size_gb = boot_disk_size_gb
+        self.training_fraction_split = training_fraction_split
+        self.validation_fraction_split = validation_fraction_split
+        self.test_fraction_split = test_fraction_split
+        self.training_filter_split = training_filter_split
+        self.validation_filter_split = validation_filter_split
+        self.test_filter_split = test_filter_split
+        self.predefined_split_column_name = predefined_split_column_name
+        self.timestamp_split_column_name = timestamp_split_column_name
+        self.tensorboard = tensorboard
+        self.sync = sync
+        # END Run param
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.hook: Optional[CustomJobHook] = None
+
+    def execute(self, context):
+        self.hook = CustomJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def on_kill(self) -> None:
+        """
+        Callback called when the operator is killed.
+        Cancel any running job.
+        """
+        if self.hook:
+            self.hook.cancel_job()
+
+
+class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
+    """Create Custom Container Training job"""
+
+    template_fields = [
+        'region',
+        'command',
+        'impersonation_chain',
+    ]
+    operator_extra_links = (VertexAIModelLink(),)
+
+    def __init__(
+        self,
+        *,
+        command: Optional[Sequence[str]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.command = command
+
+    def execute(self, context):
+        super().execute(context)
+        model = self.hook.create_custom_container_training_job(
+            project_id=self.project_id,
+            region=self.region,
+            display_name=self.display_name,
+            container_uri=self.container_uri,
+            command=self.command,
+            model_serving_container_image_uri=self.model_serving_container_image_uri,
+            model_serving_container_predict_route=self.model_serving_container_predict_route,
+            model_serving_container_health_route=self.model_serving_container_health_route,
+            model_serving_container_command=self.model_serving_container_command,
+            model_serving_container_args=self.model_serving_container_args,
+            model_serving_container_environment_variables=self.model_serving_container_environment_variables,
+            model_serving_container_ports=self.model_serving_container_ports,
+            model_description=self.model_description,
+            model_instance_schema_uri=self.model_instance_schema_uri,
+            model_parameters_schema_uri=self.model_parameters_schema_uri,
+            model_prediction_schema_uri=self.model_prediction_schema_uri,
+            labels=self.labels,
+            training_encryption_spec_key_name=self.training_encryption_spec_key_name,
+            model_encryption_spec_key_name=self.model_encryption_spec_key_name,
+            staging_bucket=self.staging_bucket,
+            # RUN
+            dataset=self.dataset,
+            annotation_schema_uri=self.annotation_schema_uri,
+            model_display_name=self.model_display_name,
+            model_labels=self.model_labels,
+            base_output_dir=self.base_output_dir,
+            service_account=self.service_account,
+            network=self.network,
+            bigquery_destination=self.bigquery_destination,
+            args=self.args,
+            environment_variables=self.environment_variables,
+            replica_count=self.replica_count,
+            machine_type=self.machine_type,
+            accelerator_type=self.accelerator_type,
+            accelerator_count=self.accelerator_count,
+            boot_disk_type=self.boot_disk_type,
+            boot_disk_size_gb=self.boot_disk_size_gb,
+            training_fraction_split=self.training_fraction_split,
+            validation_fraction_split=self.validation_fraction_split,
+            test_fraction_split=self.test_fraction_split,
+            training_filter_split=self.training_filter_split,
+            validation_filter_split=self.validation_filter_split,
+            test_filter_split=self.test_filter_split,
+            predefined_split_column_name=self.predefined_split_column_name,
+            timestamp_split_column_name=self.timestamp_split_column_name,
+            tensorboard=self.tensorboard,
+            sync=True,
+        )
+
+        result = Model.to_dict(model)
+        model_id = self.hook.extract_model_id(result)
+        self.xcom_push(
+            context,
+            key="model_conf",
+            value={
+                "model_id": model_id,
+                "region": self.region,
+                "project_id": self.project_id,
+            },
+        )
+        return result
+
+
+class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator):
+    """Create Custom Python Package Training job"""
+
+    template_fields = [
+        'region',
+        'impersonation_chain',
+    ]
+    operator_extra_links = (VertexAIModelLink(),)
+
+    def __init__(
+        self,
+        *,
+        python_package_gcs_uri: str,
+        python_module_name: str,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.python_package_gcs_uri = python_package_gcs_uri
+        self.python_module_name = python_module_name
+
+    def execute(self, context):
+        super().execute(context)
+        model = self.hook.create_custom_python_package_training_job(
+            project_id=self.project_id,
+            region=self.region,
+            display_name=self.display_name,
+            python_package_gcs_uri=self.python_package_gcs_uri,
+            python_module_name=self.python_module_name,
+            container_uri=self.container_uri,
+            model_serving_container_image_uri=self.model_serving_container_image_uri,
+            model_serving_container_predict_route=self.model_serving_container_predict_route,
+            model_serving_container_health_route=self.model_serving_container_health_route,
+            model_serving_container_command=self.model_serving_container_command,
+            model_serving_container_args=self.model_serving_container_args,
+            model_serving_container_environment_variables=self.model_serving_container_environment_variables,
+            model_serving_container_ports=self.model_serving_container_ports,
+            model_description=self.model_description,
+            model_instance_schema_uri=self.model_instance_schema_uri,
+            model_parameters_schema_uri=self.model_parameters_schema_uri,
+            model_prediction_schema_uri=self.model_prediction_schema_uri,
+            labels=self.labels,
+            training_encryption_spec_key_name=self.training_encryption_spec_key_name,
+            model_encryption_spec_key_name=self.model_encryption_spec_key_name,
+            staging_bucket=self.staging_bucket,
+            # RUN
+            dataset=self.dataset,
+            annotation_schema_uri=self.annotation_schema_uri,
+            model_display_name=self.model_display_name,
+            model_labels=self.model_labels,
+            base_output_dir=self.base_output_dir,
+            service_account=self.service_account,
+            network=self.network,
+            bigquery_destination=self.bigquery_destination,
+            args=self.args,
+            environment_variables=self.environment_variables,
+            replica_count=self.replica_count,
+            machine_type=self.machine_type,
+            accelerator_type=self.accelerator_type,
+            accelerator_count=self.accelerator_count,
+            boot_disk_type=self.boot_disk_type,
+            boot_disk_size_gb=self.boot_disk_size_gb,
+            training_fraction_split=self.training_fraction_split,
+            validation_fraction_split=self.validation_fraction_split,
+            test_fraction_split=self.test_fraction_split,
+            training_filter_split=self.training_filter_split,
+            validation_filter_split=self.validation_filter_split,
+            test_filter_split=self.test_filter_split,
+            predefined_split_column_name=self.predefined_split_column_name,
+            timestamp_split_column_name=self.timestamp_split_column_name,
+            tensorboard=self.tensorboard,
+            sync=True,
+        )
+
+        result = Model.to_dict(model)
+        model_id = self.hook.extract_model_id(result)
+        self.xcom_push(
+            context,
+            key="model_conf",
+            value={
+                "model_id": model_id,
+                "region": self.region,
+                "project_id": self.project_id,
+            },
+        )
+        return result
+
+
+class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
+    """Create Custom Training job"""
+
+    template_fields = [
+        'region',
+        'script_path',
+        'requirements',
+        'impersonation_chain',
+    ]
+    operator_extra_links = (VertexAIModelLink(),)
+
+    def __init__(
+        self,
+        *,
+        script_path: str,
+        requirements: Optional[Sequence[str]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.requirements = requirements
+        self.script_path = script_path
+
+    def execute(self, context):
+        super().execute(context)
+        model = self.hook.create_custom_training_job(
+            project_id=self.project_id,
+            region=self.region,
+            display_name=self.display_name,
+            script_path=self.script_path,
+            container_uri=self.container_uri,
+            requirements=self.requirements,
+            model_serving_container_image_uri=self.model_serving_container_image_uri,
+            model_serving_container_predict_route=self.model_serving_container_predict_route,
+            model_serving_container_health_route=self.model_serving_container_health_route,
+            model_serving_container_command=self.model_serving_container_command,
+            model_serving_container_args=self.model_serving_container_args,
+            model_serving_container_environment_variables=self.model_serving_container_environment_variables,
+            model_serving_container_ports=self.model_serving_container_ports,
+            model_description=self.model_description,
+            model_instance_schema_uri=self.model_instance_schema_uri,
+            model_parameters_schema_uri=self.model_parameters_schema_uri,
+            model_prediction_schema_uri=self.model_prediction_schema_uri,
+            labels=self.labels,
+            training_encryption_spec_key_name=self.training_encryption_spec_key_name,
+            model_encryption_spec_key_name=self.model_encryption_spec_key_name,
+            staging_bucket=self.staging_bucket,
+            # RUN
+            dataset=self.dataset,
+            annotation_schema_uri=self.annotation_schema_uri,
+            model_display_name=self.model_display_name,
+            model_labels=self.model_labels,
+            base_output_dir=self.base_output_dir,
+            service_account=self.service_account,
+            network=self.network,
+            bigquery_destination=self.bigquery_destination,
+            args=self.args,
+            environment_variables=self.environment_variables,
+            replica_count=self.replica_count,
+            machine_type=self.machine_type,
+            accelerator_type=self.accelerator_type,
+            accelerator_count=self.accelerator_count,
+            boot_disk_type=self.boot_disk_type,
+            boot_disk_size_gb=self.boot_disk_size_gb,
+            training_fraction_split=self.training_fraction_split,
+            validation_fraction_split=self.validation_fraction_split,
+            test_fraction_split=self.test_fraction_split,
+            training_filter_split=self.training_filter_split,
+            validation_filter_split=self.validation_filter_split,
+            test_filter_split=self.test_filter_split,
+            predefined_split_column_name=self.predefined_split_column_name,
+            timestamp_split_column_name=self.timestamp_split_column_name,
+            tensorboard=self.tensorboard,
+            sync=True,
+        )
+
+        result = Model.to_dict(model)
+        model_id = self.hook.extract_model_id(result)
+        self.xcom_push(
+            context,
+            key="model_conf",
+            value={
+                "model_id": model_id,
+                "region": self.region,
+                "project_id": self.project_id,
+            },
+        )
+        return result
+
+
+class DeleteCustomTrainingJobOperator(BaseOperator):
+    """Deletes a CustomTrainingJob, CustomPythonTrainingJob, or CustomContainerTrainingJob."""
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        training_pipeline_id: str,
+        custom_job_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.training_pipeline = training_pipeline_id
+        self.custom_job = custom_job_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):

Review comment:
       FYI - As of Airflow 2.2.3, there is a specific `Context` object available to use for typing, but to make sure the operator doesn't have an implicit Airflow-version dependency you'll need to use `TYPE_CHECKING`.
   
   Something like:
   ```python
   if TYPE_CHECKING:
       from airflow.utils.context import Context
   ...
   
   def execute(self, context: "Context"): ...
   ```
   This can be done for `ListCustomTrainingJobOperator` and the Dataset operators as well.
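
   A slightly fuller, self-contained sketch of that pattern (the operator class here is hypothetical, not one from this PR):

    ```python
    # Sketch only: the TYPE_CHECKING guard keeps the Context import out of the
    # runtime path, so the module still imports on Airflow versions without it.
    from typing import TYPE_CHECKING

    from airflow.models import BaseOperator

    if TYPE_CHECKING:
        from airflow.utils.context import Context


    class ExampleVertexAIOperator(BaseOperator):  # hypothetical operator
        def execute(self, context: "Context"):
            self.log.info("Running task %s", self.task_id)
    ```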




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] MaksYermak commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r788584510



##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,313 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_VERTEX_AI_BUCKET - Google Cloud Storage bucket where the model will be saved
+after training process was finished.
+* CUSTOM_CONTAINER_URI - path to container with model.
+* PYTHON_PACKAGE_GSC_URI - path to test model in archive.
+* LOCAL_TRAINING_SCRIPT_PATH - path to local training script.
+* DATASET_ID - ID of dataset which will be used in training process.
+"""
+import os
+from datetime import datetime
+from uuid import uuid4
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    CreateCustomPythonPackageTrainingJobOperator,
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+    ListCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+REGION = os.environ.get("GCP_LOCATION", "us-central1")
+BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
+
+STAGING_BUCKET = f"gs://{BUCKET}"
+DISPLAY_NAME = str(uuid4())  # Create random display name
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
+
+TRAINING_PIPELINE_ID = "test-training-pipeline-id"
+CUSTOM_JOB_ID = "test-custom-job-id"
+
+IMAGE_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
+    "metadata": "test-image-dataset",
+}
+TABULAR_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
+    "metadata": "test-tabular-dataset",
+}
+TEXT_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
+    "metadata": "test-text-dataset",
+}
+VIDEO_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+TIME_SERIES_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+with models.DAG(
+    "example_gcp_vertex_ai_custom_jobs",
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as custom_jobs_dag:
+    # [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+    create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
+        task_id="custom_container_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-container-{DISPLAY_NAME}",
+        container_uri=CUSTOM_CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        command=["python3", "task.py"],
+        model_display_name=f"container-housing-model-{DISPLAY_NAME}",
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+    create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
+        task_id="python_package_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-py-package-{DISPLAY_NAME}",
+        python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
+        python_module_name=PYTHON_MODULE_NAME,
+        container_uri=CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        model_display_name=f"py-package-housing-model-{DISPLAY_NAME}",
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
+    create_custom_training_job = CreateCustomTrainingJobOperator(
+        task_id="custom_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-custom-{DISPLAY_NAME}",
+        script_path=LOCAL_TRAINING_SCRIPT_PATH,
+        container_uri=CONTAINER_URI,
+        requirements=["gcsfs==0.7.1"],
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        replica_count=1,
+        model_display_name=f"custom-housing-model-{DISPLAY_NAME}",
+        sync=False,

Review comment:
       I think it is a good idea. I will update the docstring in the next PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] MaksYermak commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r785735655



##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,313 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_VERTEX_AI_BUCKET - Google Cloud Storage bucket where the model will be saved
+after training process was finished.
+* CUSTOM_CONTAINER_URI - path to container with model.
+* PYTHON_PACKAGE_GSC_URI - path to test model in archive.
+* LOCAL_TRAINING_SCRIPT_PATH - path to local training script.
+* DATASET_ID - ID of dataset which will be used in training process.
+"""
+import os
+from datetime import datetime
+from uuid import uuid4
+
+from airflow import models
+from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
+    CreateCustomContainerTrainingJobOperator,
+    CreateCustomPythonPackageTrainingJobOperator,
+    CreateCustomTrainingJobOperator,
+    DeleteCustomTrainingJobOperator,
+    ListCustomTrainingJobOperator,
+)
+from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
+    CreateDatasetOperator,
+    DeleteDatasetOperator,
+    ExportDataOperator,
+    GetDatasetOperator,
+    ImportDataOperator,
+    ListDatasetsOperator,
+    UpdateDatasetOperator,
+)
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+REGION = os.environ.get("GCP_LOCATION", "us-central1")
+BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
+
+STAGING_BUCKET = f"gs://{BUCKET}"
+DISPLAY_NAME = str(uuid4())  # Create random display name
+CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
+CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
+MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
+REPLICA_COUNT = 1
+MACHINE_TYPE = "n1-standard-4"
+ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
+ACCELERATOR_COUNT = 0
+TRAINING_FRACTION_SPLIT = 0.7
+TEST_FRACTION_SPLIT = 0.15
+VALIDATION_FRACTION_SPLIT = 0.15
+
+PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
+PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
+
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
+
+TRAINING_PIPELINE_ID = "test-training-pipeline-id"
+CUSTOM_JOB_ID = "test-custom-job-id"
+
+IMAGE_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
+    "metadata": "test-image-dataset",
+}
+TABULAR_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
+    "metadata": "test-tabular-dataset",
+}
+TEXT_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
+    "metadata": "test-text-dataset",
+}
+VIDEO_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+TIME_SERIES_DATASET = {
+    "display_name": str(uuid4()),
+    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
+    "metadata": "test-video-dataset",
+}
+DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
+TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
+TEST_IMPORT_CONFIG = [
+    {
+        "data_item_labels": {
+            "test-labels-name": "test-labels-value",
+        },
+        "import_schema_uri": (
+            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
+        ),
+        "gcs_source": {
+            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
+        },
+    },
+]
+DATASET_TO_UPDATE = {"display_name": "test-name"}
+TEST_UPDATE_MASK = {"paths": ["displayName"]}
+
+with models.DAG(
+    "example_gcp_vertex_ai_custom_jobs",
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as custom_jobs_dag:
+    # [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+    create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
+        task_id="custom_container_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-container-{DISPLAY_NAME}",
+        container_uri=CUSTOM_CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        command=["python3", "task.py"],
+        model_display_name=f"container-housing-model-{DISPLAY_NAME}",
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+    create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
+        task_id="python_package_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-py-package-{DISPLAY_NAME}",
+        python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
+        python_module_name=PYTHON_MODULE_NAME,
+        container_uri=CONTAINER_URI,
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        model_display_name=f"py-package-housing-model-{DISPLAY_NAME}",
+        replica_count=REPLICA_COUNT,
+        machine_type=MACHINE_TYPE,
+        accelerator_type=ACCELERATOR_TYPE,
+        accelerator_count=ACCELERATOR_COUNT,
+        training_fraction_split=TRAINING_FRACTION_SPLIT,
+        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
+        test_fraction_split=TEST_FRACTION_SPLIT,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
+
+    # [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
+    create_custom_training_job = CreateCustomTrainingJobOperator(
+        task_id="custom_task",
+        staging_bucket=STAGING_BUCKET,
+        display_name=f"train-housing-custom-{DISPLAY_NAME}",
+        script_path=LOCAL_TRAINING_SCRIPT_PATH,
+        container_uri=CONTAINER_URI,
+        requirements=["gcsfs==0.7.1"],
+        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
+        # run params
+        dataset_id=DATASET_ID,
+        replica_count=1,
+        model_display_name=f"custom-housing-model-{DISPLAY_NAME}",
+        sync=False,

Review comment:
       This `sync` relates to aiplatfrom job not to our operator. Our hook runs this job with specific parameters and than this job create CustomJob inside Google Cloud after that our operator waits for CustomJob finished. Our operator works as usual.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] MaksYermak commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1012173051


   @josh-fell @mik-laj @turbaszek @vikramkoka @ashb @potiuk hi guys, what about this PR, could we merge it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] MaksYermak commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1023996380


   @ashb @josh-fell @potiuk Hi all, could we merge this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #20077:
URL: https://github.com/apache/airflow/pull/20077


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1024869686


   Yeah. So the up-to-date checker did not prevent it because it was really last run 8 days ago :( . Up-to-date was pretty useless actually (or maybe not :) - this was the ONLY change with this problem. I wonder how many we prevented ) 





[GitHub] [airflow] MaksYermak commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r765677432



##########
File path: scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
##########
@@ -151,7 +151,10 @@ def parse_module_data(provider_data, resource_type, yaml_file_path):
     package_dir = ROOT_DIR + "/" + os.path.dirname(yaml_file_path)
     provider_package = os.path.dirname(yaml_file_path).replace(os.sep, ".")
     py_files = chain(
-        glob(f"{package_dir}/**/{resource_type}/*.py"), glob(f"{package_dir}/{resource_type}/*.py")
+        glob(f"{package_dir}/**/{resource_type}/*.py"),
+        glob(f"{package_dir}/{resource_type}/*.py"),
+        glob(f"{package_dir}/**/{resource_type}/**/*.py"),
+        glob(f"{package_dir}/{resource_type}/**/*.py"),

Review comment:
       I have made these changes because for Vertex AI we use a package for the code rather than a single module, and for packages our static check doesn't work.
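
       To illustrate the layouts involved, a small sketch (the paths and the `recursive=True` flag are examples for this comment, not a quote of the pre-commit script):

```python
# Example layouts the check needs to cover; the paths are illustrative.
from glob import glob

package_dir = "airflow/providers/google"
resource_type = "operators"

# Old patterns: only modules sitting directly in an `operators/` directory,
# e.g. airflow/providers/google/cloud/operators/bigquery.py
flat_modules = glob(f"{package_dir}/**/{resource_type}/*.py", recursive=True)

# New patterns: also modules grouped into a sub-package,
# e.g. airflow/providers/google/cloud/operators/vertex_ai/custom_job.py
package_modules = glob(f"{package_dir}/**/{resource_type}/**/*.py", recursive=True)
```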







[GitHub] [airflow] potiuk commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-992947745


   Tests are failing - please fix and rebase.





[GitHub] [airflow] MaksYermak commented on a change in pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#discussion_r764145389



##########
File path: airflow/providers/google/cloud/example_dags/example_vertex_ai.py
##########
@@ -0,0 +1,306 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
+Cloud Platform.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists.

Review comment:
       @josh-fell yes, you are right, I have changed it.







[GitHub] [airflow] josh-fell commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1017599506


   @ashb Should the `:type:` directives in the docstrings here be removed preemptively because of #20951?





[GitHub] [airflow] ashb commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1017686432


   @MaksYermak You might have to add some local spelling dictionaries to your hook file too -- check out https://github.com/apache/airflow/pull/20951/commits/1149e63e45d9e5047935c11287a4d0b23e87d2bd
   
   Sorry about that.
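
   For anyone following along, a per-file word list of the kind being suggested looks roughly like this (a sketch assuming sphinxcontrib-spelling's `.. spelling::` directive; the words are placeholders):

```python
"""This module contains a Google Cloud Vertex AI hook.

.. spelling::

    aiplatform
    undeployed
"""
```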





[GitHub] [airflow] MaksYermak commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1018340484


   > @MaksYermak You might have to add some local spelling dictionaries to your hook file too -- check out [1149e63](https://github.com/apache/airflow/commit/1149e63e45d9e5047935c11287a4d0b23e87d2bd)
   > 
   > Sorry about that.
   
   @ashb I have run this command to check: `./breeze build-docs -- --spellcheck-only --package-filter apache-airflow-providers-google`, and I haven't seen any spelling errors in this code.





[GitHub] [airflow] github-actions[bot] commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1024582228


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk commented on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1024869686


   Yeah. So the up-to-date checker did not prevent it because it was really last run 8 hours ago :( . Up-to-date was pretty useless actually :( 





[GitHub] [airflow] potiuk edited a comment on pull request #20077: Create CustomJob and Datasets operators for Vertex AI service

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #20077:
URL: https://github.com/apache/airflow/pull/20077#issuecomment-1024869686


   Yeah. So the up-to-date checker did not prevent it because it was really last run 8 days ago :( . Up-to-date was pretty useless actually :( 

