You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/12/15 19:52:57 UTC

(airflow) 26/42: Change retry type for Google Dataflow Client to async one (#36141)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d3e6283c643da5bf2d134a1174e52c5ef753a80d
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Dec 9 22:14:27 2023 +0100

    Change retry type for Google Dataflow Client to async one (#36141)
    
    Google Dataflow Client 0.8.6 implemented a bugfix where the retry type
    was changed to an async one. This caused our canary builds to fail.
    
    We change the client to use the async retry type now and bump the
    minimum version of the client to 0.8.6.
    
    (cherry picked from commit 8d0c5d900875ce3b9dda1a86f1de534759e9d7f6)
---
 .../providers/google/cloud/hooks/bigquery_dts.py   |  3 +-
 .../providers/google/cloud/hooks/cloud_build.py    |  3 +-
 .../providers/google/cloud/hooks/cloud_composer.py |  7 ++--
 airflow/providers/google/cloud/hooks/dataplex.py   |  3 +-
 airflow/providers/google/cloud/hooks/dataproc.py   | 37 +++++++++++-----------
 .../providers/google/cloud/operators/dataproc.py   | 11 ++++---
 airflow/providers/google/provider.yaml             | 14 ++++----
 generated/provider_dependencies.json               | 14 ++++----
 .../cloud/dataproc/example_dataproc_batch.py       |  4 +--
 9 files changed, 51 insertions(+), 45 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery_dts.py b/airflow/providers/google/cloud/hooks/bigquery_dts.py
index c3f5780ea8..c7ef15fb44 100644
--- a/airflow/providers/google/cloud/hooks/bigquery_dts.py
+++ b/airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -38,6 +38,7 @@ from airflow.providers.google.common.hooks.base_google import (
 
 if TYPE_CHECKING:
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from googleapiclient.discovery import Resource
 
 
@@ -321,7 +322,7 @@ class AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
         run_id: str,
         project_id: str | None,
         location: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ):
diff --git a/airflow/providers/google/cloud/hooks/cloud_build.py b/airflow/providers/google/cloud/hooks/cloud_build.py
index 5cd9b798ea..189303a9ce 100644
--- a/airflow/providers/google/cloud/hooks/cloud_build.py
+++ b/airflow/providers/google/cloud/hooks/cloud_build.py
@@ -33,6 +33,7 @@ from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
 if TYPE_CHECKING:
     from google.api_core.operation import Operation
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 # Time to sleep between active checks of the operation results
@@ -645,7 +646,7 @@ class CloudBuildAsyncHook(GoogleBaseHook):
         self,
         id_: str,
         project_id: str = PROVIDE_PROJECT_ID,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
         location: str = "global",
diff --git a/airflow/providers/google/cloud/hooks/cloud_composer.py b/airflow/providers/google/cloud/hooks/cloud_composer.py
index 63170a4513..01e88df8a3 100644
--- a/airflow/providers/google/cloud/hooks/cloud_composer.py
+++ b/airflow/providers/google/cloud/hooks/cloud_composer.py
@@ -35,6 +35,7 @@ if TYPE_CHECKING:
     from google.api_core.operation import Operation
     from google.api_core.operation_async import AsyncOperation
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from google.cloud.orchestration.airflow.service_v1.services.environments.pagers import (
         ListEnvironmentsPager,
     )
@@ -332,7 +333,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
         project_id: str,
         region: str,
         environment: Environment | dict,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
@@ -361,7 +362,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
         project_id: str,
         region: str,
         environment_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
@@ -389,7 +390,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
         environment_id: str,
         environment: Environment | dict,
         update_mask: dict | FieldMask,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
diff --git a/airflow/providers/google/cloud/hooks/dataplex.py b/airflow/providers/google/cloud/hooks/dataplex.py
index 4eeefa2a3d..3832026886 100644
--- a/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -40,6 +40,7 @@ from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHoo
 if TYPE_CHECKING:
     from google.api_core.operation import Operation
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from googleapiclient.discovery import Resource
 
 PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
@@ -896,7 +897,7 @@ class DataplexAsyncHook(GoogleBaseAsyncHook):
         region: str,
         data_scan_id: str | None = None,
         job_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Any:
diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py
index 349a8488bc..10459118c0 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -51,6 +51,7 @@ if TYPE_CHECKING:
     from google.api_core.operation_async import AsyncOperation
     from google.api_core.operations_v1.operations_client import OperationsClient
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from google.protobuf.duration_pb2 import Duration
     from google.protobuf.field_mask_pb2 import FieldMask
 
@@ -256,7 +257,7 @@ class DataprocHook(GoogleBaseHook):
         self,
         operation: Operation,
         timeout: float | None = None,
-        result_retry: Retry | _MethodDefault = DEFAULT,
+        result_retry: AsyncRetry | _MethodDefault = DEFAULT,
     ) -> Any:
         """Wait for a long-lasting operation to complete."""
         try:
@@ -997,7 +998,7 @@ class DataprocHook(GoogleBaseHook):
         region: str,
         project_id: str,
         wait_check_interval: int = 10,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Batch:
@@ -1132,7 +1133,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         virtual_cluster_config: dict | None = None,
         labels: dict[str, str] | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
@@ -1199,7 +1200,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         project_id: str,
         cluster_uuid: str | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
@@ -1242,7 +1243,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         region: str,
         cluster_name: str,
         project_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> str:
@@ -1277,7 +1278,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         region: str,
         cluster_name: str,
         project_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Cluster:
@@ -1309,7 +1310,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         filter_: str,
         project_id: str,
         page_size: int | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ):
@@ -1349,7 +1350,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         region: str,
         graceful_decommission_timeout: dict | Duration | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
@@ -1429,7 +1430,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         template: dict | WorkflowTemplate,
         project_id: str,
         region: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> WorkflowTemplate:
@@ -1465,7 +1466,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         version: int | None = None,
         request_id: str | None = None,
         parameters: dict[str, str] | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
@@ -1511,7 +1512,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         project_id: str,
         region: str,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
@@ -1554,7 +1555,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         job_id: str,
         project_id: str,
         region: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Job:
@@ -1588,7 +1589,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         project_id: str,
         region: str,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Job:
@@ -1624,7 +1625,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         job_id: str,
         project_id: str,
         region: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Job:
@@ -1658,7 +1659,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         batch: dict | Batch,
         batch_id: str | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
@@ -1703,7 +1704,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         batch_id: str,
         region: str,
         project_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> None:
@@ -1737,7 +1738,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         batch_id: str,
         region: str,
         project_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Batch:
@@ -1773,7 +1774,7 @@ class DataprocAsyncHook(GoogleBaseHook):
         project_id: str,
         page_size: int | None = None,
         page_token: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
         filter: str | None = None,
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index b489a79dc8..2d8e5f3d49 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -63,6 +63,7 @@ from airflow.utils import timezone
 
 if TYPE_CHECKING:
     from google.api_core import operation
+    from google.api_core.retry_async import AsyncRetry
     from google.protobuf.duration_pb2 import Duration
     from google.protobuf.field_mask_pb2 import FieldMask
 
@@ -592,7 +593,7 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
         request_id: str | None = None,
         delete_on_error: bool = True,
         use_if_exists: bool = True,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float = 1 * 60 * 60,
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
@@ -985,7 +986,7 @@ class DataprocDeleteClusterOperator(GoogleCloudBaseOperator):
         project_id: str | None = None,
         cluster_uuid: str | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float = 1 * 60 * 60,
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
@@ -1891,7 +1892,7 @@ class DataprocInstantiateWorkflowTemplateOperator(GoogleCloudBaseOperator):
         version: int | None = None,
         request_id: str | None = None,
         parameters: dict[str, str] | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
@@ -2340,7 +2341,7 @@ class DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
         region: str,
         request_id: str | None = None,
         project_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
@@ -2480,7 +2481,7 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        result_retry: Retry | _MethodDefault = DEFAULT,
+        result_retry: AsyncRetry | _MethodDefault = DEFAULT,
         asynchronous: bool = False,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 5,
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 1e6bcbe69b..4a7f9262f1 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -94,17 +94,17 @@ dependencies:
   - google-auth>=1.0.0
   - google-auth-httplib2>=0.0.1
   - google-cloud-aiplatform>=1.22.1
-  - google-cloud-automl>=2.11.0
-  - google-cloud-bigquery-datatransfer>=3.11.0
+  - google-cloud-automl>=2.12.0
+  - google-cloud-bigquery-datatransfer>=3.13.0
   - google-cloud-bigtable>=2.17.0
-  - google-cloud-build>=3.13.0
+  - google-cloud-build>=3.22.0
   - google-cloud-compute>=1.10.0
   - google-cloud-container>=2.17.4
   - google-cloud-datacatalog>=3.11.1
-  - google-cloud-dataflow-client>=0.8.2
+  - google-cloud-dataflow-client>=0.8.6
   - google-cloud-dataform>=0.5.0
-  - google-cloud-dataplex>=1.4.2
-  - google-cloud-dataproc>=5.5.0
+  - google-cloud-dataplex>=1.10.0
+  - google-cloud-dataproc>=5.8.0
   - google-cloud-dataproc-metastore>=1.12.0
   - google-cloud-dlp>=3.12.0
   - google-cloud-kms>=2.15.0
@@ -112,7 +112,7 @@ dependencies:
   - google-cloud-logging>=3.5.0
   - google-cloud-memcache>=1.7.0
   - google-cloud-monitoring>=2.14.1
-  - google-cloud-orchestration-airflow>=1.7.0
+  - google-cloud-orchestration-airflow>=1.10.0
   - google-cloud-os-login>=2.9.1
   - google-cloud-pubsub>=2.15.0
   - google-cloud-redis>=2.12.0
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index c47736d9ac..60ffee762d 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -422,26 +422,26 @@
       "google-auth-httplib2>=0.0.1",
       "google-auth>=1.0.0",
       "google-cloud-aiplatform>=1.22.1",
-      "google-cloud-automl>=2.11.0",
+      "google-cloud-automl>=2.12.0",
       "google-cloud-batch>=0.13.0",
-      "google-cloud-bigquery-datatransfer>=3.11.0",
+      "google-cloud-bigquery-datatransfer>=3.13.0",
       "google-cloud-bigtable>=2.17.0",
-      "google-cloud-build>=3.13.0",
+      "google-cloud-build>=3.22.0",
       "google-cloud-compute>=1.10.0",
       "google-cloud-container>=2.17.4",
       "google-cloud-datacatalog>=3.11.1",
-      "google-cloud-dataflow-client>=0.8.2",
+      "google-cloud-dataflow-client>=0.8.6",
       "google-cloud-dataform>=0.5.0",
-      "google-cloud-dataplex>=1.4.2",
+      "google-cloud-dataplex>=1.10.0",
       "google-cloud-dataproc-metastore>=1.12.0",
-      "google-cloud-dataproc>=5.5.0",
+      "google-cloud-dataproc>=5.8.0",
       "google-cloud-dlp>=3.12.0",
       "google-cloud-kms>=2.15.0",
       "google-cloud-language>=2.9.0",
       "google-cloud-logging>=3.5.0",
       "google-cloud-memcache>=1.7.0",
       "google-cloud-monitoring>=2.14.1",
-      "google-cloud-orchestration-airflow>=1.7.0",
+      "google-cloud-orchestration-airflow>=1.10.0",
       "google-cloud-os-login>=2.9.1",
       "google-cloud-pubsub>=2.15.0",
       "google-cloud-redis>=2.12.0",
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
index 7dd5eff73a..7d126fc28b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 import os
 from datetime import datetime
 
-from google.api_core.retry import Retry
+from google.api_core.retry_async import AsyncRetry
 
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
@@ -75,7 +75,7 @@ with DAG(
         region=REGION,
         batch=BATCH_CONFIG,
         batch_id=BATCH_ID_2,
-        result_retry=Retry(maximum=10.0, initial=10.0, multiplier=1.0),
+        result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
     )
 
     create_batch_3 = DataprocCreateBatchOperator(