Posted to commits@airflow.apache.org by "pankajastro (via GitHub)" <gi...@apache.org> on 2023/03/02 09:27:11 UTC

[GitHub] [airflow] pankajastro commented on a diff in pull request #29835: Provider Google Cloud - Add bigquery bi engine reservation operators

pankajastro commented on code in PR #29835:
URL: https://github.com/apache/airflow/pull/29835#discussion_r1122802132


##########
airflow/providers/google/cloud/hooks/bigquery_reservation.py:
##########
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a BigQuery Reservation Hook."""
+from __future__ import annotations
+
+from typing import Sequence
+
+from google.cloud.bigquery_reservation_v1 import ReservationServiceClient
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.common.consts import CLIENT_INFO
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
+class BigQueryReservationServiceHook(GoogleBaseHook):
+    """
+    Hook for Google Bigquery Reservation API.
+
+    :param gcp_conn_id: The Airflow connection used for GCP credentials.
+    :param location: The location of the BigQuery resource.
+    :param delegate_to: This performs a task on one host with reference to other hosts.
+    :param impersonation_chain: This is the optional service account to impersonate using short term
+        credentials.
+    """
+
+    conn_name_attr = "gcp_conn_id"
+    default_conn_name = "google_cloud_bigquery_reservation_default"
+    conn_type = "gcpbigqueryreservation"
+    hook_name = "Google Bigquery Reservation"
+
+    def __init__(
+        self,
+        gcp_conn_id: str = GoogleBaseHook.default_conn_name,
+        location: str | None = None,
+        delegate_to: str | None = None,

Review Comment:
   `delegate_to` is deprecated in `GoogleBaseHook`, so can we remove it from here?
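A minimal sketch of the hook constructor with `delegate_to` dropped, assuming the base hook still accepts `gcp_conn_id` and `impersonation_chain`. `_StubGoogleBaseHook` is a hypothetical stand-in so the snippet runs without Airflow installed; in the provider it would be `GoogleBaseHook`, whose exact signature should be checked against the installed version.

```python
from __future__ import annotations

from typing import Sequence


class _StubGoogleBaseHook:
    """Hypothetical stand-in for GoogleBaseHook (assumed to take these two arguments)."""

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
    ) -> None:
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain


class BigQueryReservationServiceHook(_StubGoogleBaseHook):
    """Reservation hook sketch whose constructor no longer accepts ``delegate_to``."""

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        location: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
    ) -> None:
        # Forward only the non-deprecated credential arguments to the base hook.
        super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)
        self.location = location


hook = BigQueryReservationServiceHook(
    location="US", impersonation_chain="sa@example-project.iam.gserviceaccount.com"
)
```

With the parameter gone, a caller that still passes `delegate_to=...` gets a `TypeError` at construction time instead of an argument that is accepted but deprecated; `impersonation_chain` is the alternative the Google provider points users to.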



##########
airflow/providers/google/cloud/operators/bigquery_reservation.py:
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google BigQuery Reservation Service operators."""
+from __future__ import annotations
+
+from typing import Any, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.bigquery_reservation import BigQueryReservationServiceHook
+
+
+class BigQueryBiEngineReservationCreateOperator(BaseOperator):
+    """
+    Create or Update BI engine reservation.
+
+    :param location: The BI engine reservation location. (templated)
+    :param size: The BI Engine reservation memory size (GB). (templated)
+    :param project_id: (Optional) The name of the project where the reservation
+        will be attached from. (templated)
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud. (templated)
+    :param delegate_to: Account to impersonate using domain-wide delegation of authority,

Review Comment:
   Since `delegate_to` is deprecated in the Google provider, can we avoid adding it here?
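An operator-shaped sketch of the same idea on the operator side: no `delegate_to` argument and no `delegate_to` template field, with credentials flowing through `gcp_conn_id` and `impersonation_chain` only. `_RecordingHook`, its `create_bi_reservation` method, and `BiEngineReservationCreateSketch` are hypothetical stand-ins (no Airflow imports), not the PR's actual classes or hook API.

```python
from __future__ import annotations

from typing import Any, Sequence


class _RecordingHook:
    """Hypothetical stand-in for BigQueryReservationServiceHook that records its inputs."""

    def __init__(
        self,
        gcp_conn_id: str,
        location: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
    ) -> None:
        self.init_kwargs = {
            "gcp_conn_id": gcp_conn_id,
            "location": location,
            "impersonation_chain": impersonation_chain,
        }
        self.created: list[tuple[str | None, int]] = []

    def create_bi_reservation(self, project_id: str | None, size: int) -> None:
        # Hypothetical method name; the real hook API is whatever the PR defines.
        self.created.append((project_id, size))


class BiEngineReservationCreateSketch:
    """Operator-shaped sketch with no ``delegate_to`` parameter or template field."""

    template_fields: Sequence[str] = (
        "location",
        "size",
        "project_id",
        "gcp_conn_id",
        "impersonation_chain",
    )
    hook_class = _RecordingHook

    def __init__(
        self,
        *,
        location: str,
        size: int,
        project_id: str | None = None,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
    ) -> None:
        self.location = location
        self.size = size
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.hook: _RecordingHook | None = None

    def execute(self, context: Any) -> None:
        # Credentials are passed as gcp_conn_id + impersonation_chain only.
        self.hook = self.hook_class(
            gcp_conn_id=self.gcp_conn_id,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )
        self.hook.create_bi_reservation(project_id=self.project_id, size=self.size)
```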



##########
airflow/providers/google/cloud/example_dags/example_bigquery_reservation_bi_engine.py:
##########
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that performs BI Engine.
+
+This DAG reserves a 100GB BI Engine reservation from 8am to 7pm on each working day.
+This DAG executes this following workflow:
+    1. Create BI reservation
+    2. Wait the precise date
+    3. Delete BI reservation
+"""
+from __future__ import annotations
+
+import os
+
+from pendulum import Time, datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.bigquery_reservation import (
+    BigQueryBiEngineReservationCreateOperator,
+    BigQueryBiEngineReservationDeleteOperator,
+)
+from airflow.sensors.time_sensor import TimeSensor
+
+END_TIME = Time(19, 0, 0)
+BI_ENGINE_RESERVATION_SIZE = 100
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-project")
+LOCATION = os.getenv("GCP_PROJECT_LOCATION", "US")
+
+with models.DAG(

Review Comment:
   Not sure, but should we move this to system tests instead?
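Wherever the example ends up, the schedule its docstring describes (a 100GB reservation held from 8am to 7pm on working days, with `END_TIME = Time(19, 0, 0)`) can be pinned down as a small predicate, which makes the intended window easy to assert in a test. This is a hypothetical stdlib-only helper, not part of the PR; it assumes "working day" means Monday to Friday, a half-open window, and naive datetimes (no holidays, no time zones).

```python
from datetime import datetime, time

# Window from the example DAG's docstring; 19:00 matches END_TIME = Time(19, 0, 0).
RESERVATION_START = time(8, 0)
RESERVATION_END = time(19, 0)


def reservation_should_exist(moment: datetime) -> bool:
    """True if `moment` is Monday-Friday and within [08:00, 19:00).

    Assumes "working day" means Mon-Fri and ignores holidays and time zones
    (`moment` is compared exactly as given).
    """
    is_working_day = moment.weekday() < 5  # Monday == 0 ... Friday == 4
    return is_working_day and RESERVATION_START <= moment.time() < RESERVATION_END
```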



##########
tests/providers/google/cloud/operators/test_bigquery_reservation.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import unittest
+from unittest import mock
+
+from airflow.providers.google.cloud.operators.bigquery_reservation import (
+    BigQueryBiEngineReservationCreateOperator,
+    BigQueryBiEngineReservationDeleteOperator,
+)
+
+TASK_ID = "id"
+PROJECT_ID = "test-project"
+LOCATION = "US"
+SIZE = 100
+
+
+class TestBigQueryBiEngineReservationCreateOperator(unittest.TestCase):

Review Comment:
   ```suggestion
   class TestBigQueryBiEngineReservationCreateOperator:
   ```



##########
airflow/providers/google/cloud/example_dags/example_bigquery_reservation_bi_engine.py:
##########
@@ -0,0 +1,72 @@
+with models.DAG(

Review Comment:
   Oh, you have already added the system test. Do we still need the example here too?



##########
airflow/providers/google/cloud/hooks/bigquery_reservation.py:
##########
@@ -0,0 +1,129 @@
+    conn_type = "gcpbigqueryreservation"

Review Comment:
   Do we need a new connection type here?
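One way to see what reusing the existing Google Cloud connection would mean: a subclass that declares none of the connection attributes simply inherits them. Note that the PR's `__init__` already defaults `gcp_conn_id` to `GoogleBaseHook.default_conn_name`, so the class-level `default_conn_name = "google_cloud_bigquery_reservation_default"` is not what the constructor actually uses. In this sketch `_StubGoogleBaseHook` is a hypothetical stand-in, and its values are assumed to mirror `GoogleBaseHook`'s class attributes; check them against the installed provider.

```python
from __future__ import annotations


class _StubGoogleBaseHook:
    """Hypothetical stand-in; values assumed to mirror GoogleBaseHook's attributes."""

    conn_name_attr = "gcp_conn_id"
    default_conn_name = "google_cloud_default"
    conn_type = "google_cloud_platform"
    hook_name = "Google Cloud"


class HookWithOwnConnectionType(_StubGoogleBaseHook):
    """Mirrors the attributes declared in the PR as posted."""

    conn_name_attr = "gcp_conn_id"
    default_conn_name = "google_cloud_bigquery_reservation_default"
    conn_type = "gcpbigqueryreservation"
    hook_name = "Google Bigquery Reservation"


class HookReusingConnectionType(_StubGoogleBaseHook):
    """Declares nothing, so it inherits the base connection attributes."""


CONNECTION_ATTRS = ("conn_name_attr", "default_conn_name", "conn_type", "hook_name")


def overridden_connection_attrs(hook_cls: type, base: type = _StubGoogleBaseHook) -> set[str]:
    """Names of connection attributes whose value differs from the base hook's."""
    return {name for name in CONNECTION_ATTRS if getattr(hook_cls, name) != getattr(base, name)}
```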



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org