You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/12/15 19:52:57 UTC
(airflow) 26/42: Change retry type for Google Dataflow Client to async one (#36141)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d3e6283c643da5bf2d134a1174e52c5ef753a80d
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Dec 9 22:14:27 2023 +0100
Change retry type for Google Dataflow Client to async one (#36141)
Google Dataflow Client 0.8.6 implemented a bugfix in which the retry type
was changed to an async one. This caused our canary builds to fail.
We now change the client to use the async retry type and bump the
minimum version of the client to 0.8.6.
(cherry picked from commit 8d0c5d900875ce3b9dda1a86f1de534759e9d7f6)
---
.../providers/google/cloud/hooks/bigquery_dts.py | 3 +-
.../providers/google/cloud/hooks/cloud_build.py | 3 +-
.../providers/google/cloud/hooks/cloud_composer.py | 7 ++--
airflow/providers/google/cloud/hooks/dataplex.py | 3 +-
airflow/providers/google/cloud/hooks/dataproc.py | 37 +++++++++++-----------
.../providers/google/cloud/operators/dataproc.py | 11 ++++---
airflow/providers/google/provider.yaml | 14 ++++----
generated/provider_dependencies.json | 14 ++++----
.../cloud/dataproc/example_dataproc_batch.py | 4 +--
9 files changed, 51 insertions(+), 45 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery_dts.py b/airflow/providers/google/cloud/hooks/bigquery_dts.py
index c3f5780ea8..c7ef15fb44 100644
--- a/airflow/providers/google/cloud/hooks/bigquery_dts.py
+++ b/airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -38,6 +38,7 @@ from airflow.providers.google.common.hooks.base_google import (
if TYPE_CHECKING:
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from googleapiclient.discovery import Resource
@@ -321,7 +322,7 @@ class AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
run_id: str,
project_id: str | None,
location: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
):
diff --git a/airflow/providers/google/cloud/hooks/cloud_build.py b/airflow/providers/google/cloud/hooks/cloud_build.py
index 5cd9b798ea..189303a9ce 100644
--- a/airflow/providers/google/cloud/hooks/cloud_build.py
+++ b/airflow/providers/google/cloud/hooks/cloud_build.py
@@ -33,6 +33,7 @@ from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
# Time to sleep between active checks of the operation results
@@ -645,7 +646,7 @@ class CloudBuildAsyncHook(GoogleBaseHook):
self,
id_: str,
project_id: str = PROVIDE_PROJECT_ID,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
diff --git a/airflow/providers/google/cloud/hooks/cloud_composer.py b/airflow/providers/google/cloud/hooks/cloud_composer.py
index 63170a4513..01e88df8a3 100644
--- a/airflow/providers/google/cloud/hooks/cloud_composer.py
+++ b/airflow/providers/google/cloud/hooks/cloud_composer.py
@@ -35,6 +35,7 @@ if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.operation_async import AsyncOperation
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from google.cloud.orchestration.airflow.service_v1.services.environments.pagers import (
ListEnvironmentsPager,
)
@@ -332,7 +333,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
project_id: str,
region: str,
environment: Environment | dict,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -361,7 +362,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
project_id: str,
region: str,
environment_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -389,7 +390,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
environment_id: str,
environment: Environment | dict,
update_mask: dict | FieldMask,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
diff --git a/airflow/providers/google/cloud/hooks/dataplex.py b/airflow/providers/google/cloud/hooks/dataplex.py
index 4eeefa2a3d..3832026886 100644
--- a/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -40,6 +40,7 @@ from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHoo
if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from googleapiclient.discovery import Resource
PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
@@ -896,7 +897,7 @@ class DataplexAsyncHook(GoogleBaseAsyncHook):
region: str,
data_scan_id: str | None = None,
job_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Any:
diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py
index 349a8488bc..10459118c0 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -51,6 +51,7 @@ if TYPE_CHECKING:
from google.api_core.operation_async import AsyncOperation
from google.api_core.operations_v1.operations_client import OperationsClient
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from google.protobuf.duration_pb2 import Duration
from google.protobuf.field_mask_pb2 import FieldMask
@@ -256,7 +257,7 @@ class DataprocHook(GoogleBaseHook):
self,
operation: Operation,
timeout: float | None = None,
- result_retry: Retry | _MethodDefault = DEFAULT,
+ result_retry: AsyncRetry | _MethodDefault = DEFAULT,
) -> Any:
"""Wait for a long-lasting operation to complete."""
try:
@@ -997,7 +998,7 @@ class DataprocHook(GoogleBaseHook):
region: str,
project_id: str,
wait_check_interval: int = 10,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Batch:
@@ -1132,7 +1133,7 @@ class DataprocAsyncHook(GoogleBaseHook):
virtual_cluster_config: dict | None = None,
labels: dict[str, str] | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1199,7 +1200,7 @@ class DataprocAsyncHook(GoogleBaseHook):
project_id: str,
cluster_uuid: str | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1242,7 +1243,7 @@ class DataprocAsyncHook(GoogleBaseHook):
region: str,
cluster_name: str,
project_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> str:
@@ -1277,7 +1278,7 @@ class DataprocAsyncHook(GoogleBaseHook):
region: str,
cluster_name: str,
project_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Cluster:
@@ -1309,7 +1310,7 @@ class DataprocAsyncHook(GoogleBaseHook):
filter_: str,
project_id: str,
page_size: int | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
):
@@ -1349,7 +1350,7 @@ class DataprocAsyncHook(GoogleBaseHook):
region: str,
graceful_decommission_timeout: dict | Duration | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1429,7 +1430,7 @@ class DataprocAsyncHook(GoogleBaseHook):
template: dict | WorkflowTemplate,
project_id: str,
region: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> WorkflowTemplate:
@@ -1465,7 +1466,7 @@ class DataprocAsyncHook(GoogleBaseHook):
version: int | None = None,
request_id: str | None = None,
parameters: dict[str, str] | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1511,7 +1512,7 @@ class DataprocAsyncHook(GoogleBaseHook):
project_id: str,
region: str,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1554,7 +1555,7 @@ class DataprocAsyncHook(GoogleBaseHook):
job_id: str,
project_id: str,
region: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Job:
@@ -1588,7 +1589,7 @@ class DataprocAsyncHook(GoogleBaseHook):
project_id: str,
region: str,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Job:
@@ -1624,7 +1625,7 @@ class DataprocAsyncHook(GoogleBaseHook):
job_id: str,
project_id: str,
region: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Job:
@@ -1658,7 +1659,7 @@ class DataprocAsyncHook(GoogleBaseHook):
batch: dict | Batch,
batch_id: str | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1703,7 +1704,7 @@ class DataprocAsyncHook(GoogleBaseHook):
batch_id: str,
region: str,
project_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> None:
@@ -1737,7 +1738,7 @@ class DataprocAsyncHook(GoogleBaseHook):
batch_id: str,
region: str,
project_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Batch:
@@ -1773,7 +1774,7 @@ class DataprocAsyncHook(GoogleBaseHook):
project_id: str,
page_size: int | None = None,
page_token: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
filter: str | None = None,
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index b489a79dc8..2d8e5f3d49 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -63,6 +63,7 @@ from airflow.utils import timezone
if TYPE_CHECKING:
from google.api_core import operation
+ from google.api_core.retry_async import AsyncRetry
from google.protobuf.duration_pb2 import Duration
from google.protobuf.field_mask_pb2 import FieldMask
@@ -592,7 +593,7 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
request_id: str | None = None,
delete_on_error: bool = True,
use_if_exists: bool = True,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float = 1 * 60 * 60,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
@@ -985,7 +986,7 @@ class DataprocDeleteClusterOperator(GoogleCloudBaseOperator):
project_id: str | None = None,
cluster_uuid: str | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float = 1 * 60 * 60,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
@@ -1891,7 +1892,7 @@ class DataprocInstantiateWorkflowTemplateOperator(GoogleCloudBaseOperator):
version: int | None = None,
request_id: str | None = None,
parameters: dict[str, str] | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
@@ -2340,7 +2341,7 @@ class DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
region: str,
request_id: str | None = None,
project_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
@@ -2480,7 +2481,7 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- result_retry: Retry | _MethodDefault = DEFAULT,
+ result_retry: AsyncRetry | _MethodDefault = DEFAULT,
asynchronous: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
polling_interval_seconds: int = 5,
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 1e6bcbe69b..4a7f9262f1 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -94,17 +94,17 @@ dependencies:
- google-auth>=1.0.0
- google-auth-httplib2>=0.0.1
- google-cloud-aiplatform>=1.22.1
- - google-cloud-automl>=2.11.0
- - google-cloud-bigquery-datatransfer>=3.11.0
+ - google-cloud-automl>=2.12.0
+ - google-cloud-bigquery-datatransfer>=3.13.0
- google-cloud-bigtable>=2.17.0
- - google-cloud-build>=3.13.0
+ - google-cloud-build>=3.22.0
- google-cloud-compute>=1.10.0
- google-cloud-container>=2.17.4
- google-cloud-datacatalog>=3.11.1
- - google-cloud-dataflow-client>=0.8.2
+ - google-cloud-dataflow-client>=0.8.6
- google-cloud-dataform>=0.5.0
- - google-cloud-dataplex>=1.4.2
- - google-cloud-dataproc>=5.5.0
+ - google-cloud-dataplex>=1.10.0
+ - google-cloud-dataproc>=5.8.0
- google-cloud-dataproc-metastore>=1.12.0
- google-cloud-dlp>=3.12.0
- google-cloud-kms>=2.15.0
@@ -112,7 +112,7 @@ dependencies:
- google-cloud-logging>=3.5.0
- google-cloud-memcache>=1.7.0
- google-cloud-monitoring>=2.14.1
- - google-cloud-orchestration-airflow>=1.7.0
+ - google-cloud-orchestration-airflow>=1.10.0
- google-cloud-os-login>=2.9.1
- google-cloud-pubsub>=2.15.0
- google-cloud-redis>=2.12.0
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index c47736d9ac..60ffee762d 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -422,26 +422,26 @@
"google-auth-httplib2>=0.0.1",
"google-auth>=1.0.0",
"google-cloud-aiplatform>=1.22.1",
- "google-cloud-automl>=2.11.0",
+ "google-cloud-automl>=2.12.0",
"google-cloud-batch>=0.13.0",
- "google-cloud-bigquery-datatransfer>=3.11.0",
+ "google-cloud-bigquery-datatransfer>=3.13.0",
"google-cloud-bigtable>=2.17.0",
- "google-cloud-build>=3.13.0",
+ "google-cloud-build>=3.22.0",
"google-cloud-compute>=1.10.0",
"google-cloud-container>=2.17.4",
"google-cloud-datacatalog>=3.11.1",
- "google-cloud-dataflow-client>=0.8.2",
+ "google-cloud-dataflow-client>=0.8.6",
"google-cloud-dataform>=0.5.0",
- "google-cloud-dataplex>=1.4.2",
+ "google-cloud-dataplex>=1.10.0",
"google-cloud-dataproc-metastore>=1.12.0",
- "google-cloud-dataproc>=5.5.0",
+ "google-cloud-dataproc>=5.8.0",
"google-cloud-dlp>=3.12.0",
"google-cloud-kms>=2.15.0",
"google-cloud-language>=2.9.0",
"google-cloud-logging>=3.5.0",
"google-cloud-memcache>=1.7.0",
"google-cloud-monitoring>=2.14.1",
- "google-cloud-orchestration-airflow>=1.7.0",
+ "google-cloud-orchestration-airflow>=1.10.0",
"google-cloud-os-login>=2.9.1",
"google-cloud-pubsub>=2.15.0",
"google-cloud-redis>=2.12.0",
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
index 7dd5eff73a..7d126fc28b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
@@ -22,7 +22,7 @@ from __future__ import annotations
import os
from datetime import datetime
-from google.api_core.retry import Retry
+from google.api_core.retry_async import AsyncRetry
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataproc import (
@@ -75,7 +75,7 @@ with DAG(
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_2,
- result_retry=Retry(maximum=10.0, initial=10.0, multiplier=1.0),
+ result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
)
create_batch_3 = DataprocCreateBatchOperator(