You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/14 11:19:03 UTC
[airflow] branch master updated: Support google-cloud-tasks>=2.0.0
(#13347)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new ef8617e Support google-cloud-tasks>=2.0.0 (#13347)
ef8617e is described below
commit ef8617ec9d6e4b7c433a29bd388f5102a7a17c11
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Thu Jan 14 12:18:49 2021 +0100
Support google-cloud-tasks>=2.0.0 (#13347)
---
airflow/providers/google/ADDITIONAL_INFO.md | 4 +-
airflow/providers/google/cloud/hooks/tasks.py | 118 +++++++++++----------
airflow/providers/google/cloud/operators/tasks.py | 39 ++++---
setup.py | 2 +-
tests/providers/google/cloud/hooks/test_tasks.py | 86 +++++++--------
.../providers/google/cloud/operators/test_tasks.py | 65 ++++++++++--
6 files changed, 176 insertions(+), 138 deletions(-)
diff --git a/airflow/providers/google/ADDITIONAL_INFO.md b/airflow/providers/google/ADDITIONAL_INFO.md
index 800703b..c696e1b 100644
--- a/airflow/providers/google/ADDITIONAL_INFO.md
+++ b/airflow/providers/google/ADDITIONAL_INFO.md
@@ -32,10 +32,10 @@ Details are covered in the UPDATING.md files for each library, but there are som
| [``google-cloud-automl``](https://pypi.org/project/google-cloud-automl/) | ``>=0.4.0,<2.0.0`` | ``>=2.1.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-bigquery-automl/blob/master/UPGRADING.md) |
| [``google-cloud-bigquery-datatransfer``](https://pypi.org/project/google-cloud-bigquery-datatransfer/) | ``>=0.4.0,<2.0.0`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-bigquery-datatransfer/blob/master/UPGRADING.md) |
| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
+| [``google-cloud-kms``](https://pypi.org/project/google-cloud-kms/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
-| [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
-
+| [``google-cloud-tasks``](https://pypi.org/project/google-cloud-tasks/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-tasks/blob/master/UPGRADING.md) |
### The field names use the snake_case convention
diff --git a/airflow/providers/google/cloud/hooks/tasks.py b/airflow/providers/google/cloud/hooks/tasks.py
index 1c3223d..633f227 100644
--- a/airflow/providers/google/cloud/hooks/tasks.py
+++ b/airflow/providers/google/cloud/hooks/tasks.py
@@ -21,11 +21,13 @@ This module contains a CloudTasksHook
which allows you to connect to Google Cloud Tasks service,
performing actions to queues or tasks.
"""
+
from typing import Dict, List, Optional, Sequence, Tuple, Union
from google.api_core.retry import Retry
-from google.cloud.tasks_v2 import CloudTasksClient, enums
-from google.cloud.tasks_v2.types import FieldMask, Queue, Task
+from google.cloud.tasks_v2 import CloudTasksClient
+from google.cloud.tasks_v2.types import Queue, Task
+from google.protobuf.field_mask_pb2 import FieldMask
from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@@ -120,20 +122,19 @@ class CloudTasksHook(GoogleBaseHook):
client = self.get_conn()
if queue_name:
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
if isinstance(task_queue, Queue):
task_queue.name = full_queue_name
elif isinstance(task_queue, dict):
task_queue['name'] = full_queue_name
else:
raise AirflowException('Unable to set queue_name.')
- full_location_path = CloudTasksClient.location_path(project_id, location)
+ full_location_path = f"projects/{project_id}/locations/{location}"
return client.create_queue(
- parent=full_location_path,
- queue=task_queue,
+ request={'parent': full_location_path, 'queue': task_queue},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
@GoogleBaseHook.fallback_to_default_project_id
@@ -167,7 +168,7 @@ class CloudTasksHook(GoogleBaseHook):
:param update_mask: A mask used to specify which fields of the queue are being updated.
If empty, then all fields will be updated.
If a dict is provided, it must be of the same form as the protobuf message.
- :type update_mask: dict or google.cloud.tasks_v2.types.FieldMask
+ :type update_mask: dict or google.protobuf.field_mask_pb2.FieldMask
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -182,7 +183,7 @@ class CloudTasksHook(GoogleBaseHook):
client = self.get_conn()
if queue_name and location:
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
if isinstance(task_queue, Queue):
task_queue.name = full_queue_name
elif isinstance(task_queue, dict):
@@ -190,11 +191,10 @@ class CloudTasksHook(GoogleBaseHook):
else:
raise AirflowException('Unable to set queue_name.')
return client.update_queue(
- queue=task_queue,
- update_mask=update_mask,
+ request={'queue': task_queue, 'update_mask': update_mask},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
@GoogleBaseHook.fallback_to_default_project_id
@@ -230,8 +230,10 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
- return client.get_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
+ return client.get_queue(
+ request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
+ )
@GoogleBaseHook.fallback_to_default_project_id
def list_queues(
@@ -270,14 +272,12 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_location_path = CloudTasksClient.location_path(project_id, location)
+ full_location_path = f"projects/{project_id}/locations/{location}"
queues = client.list_queues(
- parent=full_location_path,
- filter_=results_filter,
- page_size=page_size,
+ request={'parent': full_location_path, 'filter': results_filter, 'page_size': page_size},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
return list(queues)
@@ -313,8 +313,10 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
- client.delete_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
+ client.delete_queue(
+ request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
+ )
@GoogleBaseHook.fallback_to_default_project_id
def purge_queue(
@@ -349,8 +351,10 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
- return client.purge_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
+ return client.purge_queue(
+ request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
+ )
@GoogleBaseHook.fallback_to_default_project_id
def pause_queue(
@@ -385,8 +389,10 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
- return client.pause_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
+ return client.pause_queue(
+ request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
+ )
@GoogleBaseHook.fallback_to_default_project_id
def resume_queue(
@@ -421,8 +427,10 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
- return client.resume_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
+ return client.resume_queue(
+ request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
+ )
@GoogleBaseHook.fallback_to_default_project_id
def create_task(
@@ -432,7 +440,7 @@ class CloudTasksHook(GoogleBaseHook):
task: Union[Dict, Task],
project_id: str,
task_name: Optional[str] = None,
- response_view: Optional[enums.Task.View] = None,
+ response_view: Optional = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
@@ -455,7 +463,7 @@ class CloudTasksHook(GoogleBaseHook):
:type task_name: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
- :type response_view: google.cloud.tasks_v2.enums.Task.View
+ :type response_view: google.cloud.tasks_v2.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -470,21 +478,21 @@ class CloudTasksHook(GoogleBaseHook):
client = self.get_conn()
if task_name:
- full_task_name = CloudTasksClient.task_path(project_id, location, queue_name, task_name)
+ full_task_name = (
+ f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
+ )
if isinstance(task, Task):
task.name = full_task_name
elif isinstance(task, dict):
task['name'] = full_task_name
else:
raise AirflowException('Unable to set task_name.')
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.create_task(
- parent=full_queue_name,
- task=task,
- response_view=response_view,
+ request={'parent': full_queue_name, 'task': task, 'response_view': response_view},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
@GoogleBaseHook.fallback_to_default_project_id
@@ -494,7 +502,7 @@ class CloudTasksHook(GoogleBaseHook):
queue_name: str,
task_name: str,
project_id: str,
- response_view: Optional[enums.Task.View] = None,
+ response_view: Optional = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
@@ -513,7 +521,7 @@ class CloudTasksHook(GoogleBaseHook):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
- :type response_view: google.cloud.tasks_v2.enums.Task.View
+ :type response_view: google.cloud.tasks_v2.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -527,13 +535,12 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_task_name = CloudTasksClient.task_path(project_id, location, queue_name, task_name)
+ full_task_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
return client.get_task(
- name=full_task_name,
- response_view=response_view,
+ request={'name': full_task_name, 'response_view': response_view},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
@GoogleBaseHook.fallback_to_default_project_id
@@ -542,7 +549,7 @@ class CloudTasksHook(GoogleBaseHook):
location: str,
queue_name: str,
project_id: str,
- response_view: Optional[enums.Task.View] = None,
+ response_view: Optional = None,
page_size: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
@@ -560,7 +567,7 @@ class CloudTasksHook(GoogleBaseHook):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
- :type response_view: google.cloud.tasks_v2.enums.Task.View
+ :type response_view: google.cloud.tasks_v2.Task.View
:param page_size: (Optional) The maximum number of resources contained in the
underlying API response.
:type page_size: int
@@ -576,14 +583,12 @@ class CloudTasksHook(GoogleBaseHook):
:rtype: list[google.cloud.tasks_v2.types.Task]
"""
client = self.get_conn()
- full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name)
+ full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
tasks = client.list_tasks(
- parent=full_queue_name,
- response_view=response_view,
- page_size=page_size,
+ request={'parent': full_queue_name, 'response_view': response_view, 'page_size': page_size},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
return list(tasks)
@@ -622,8 +627,10 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_task_name = CloudTasksClient.task_path(project_id, location, queue_name, task_name)
- client.delete_task(name=full_task_name, retry=retry, timeout=timeout, metadata=metadata)
+ full_task_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
+ client.delete_task(
+ request={'name': full_task_name}, retry=retry, timeout=timeout, metadata=metadata or ()
+ )
@GoogleBaseHook.fallback_to_default_project_id
def run_task(
@@ -632,7 +639,7 @@ class CloudTasksHook(GoogleBaseHook):
queue_name: str,
task_name: str,
project_id: str,
- response_view: Optional[enums.Task.View] = None,
+ response_view: Optional = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
@@ -651,7 +658,7 @@ class CloudTasksHook(GoogleBaseHook):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
- :type response_view: google.cloud.tasks_v2.enums.Task.View
+ :type response_view: google.cloud.tasks_v2.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -665,11 +672,10 @@ class CloudTasksHook(GoogleBaseHook):
"""
client = self.get_conn()
- full_task_name = CloudTasksClient.task_path(project_id, location, queue_name, task_name)
+ full_task_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
return client.run_task(
- name=full_task_name,
- response_view=response_view,
+ request={'name': full_task_name, 'response_view': response_view},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
diff --git a/airflow/providers/google/cloud/operators/tasks.py b/airflow/providers/google/cloud/operators/tasks.py
index 7867d29..2834b32 100644
--- a/airflow/providers/google/cloud/operators/tasks.py
+++ b/airflow/providers/google/cloud/operators/tasks.py
@@ -25,9 +25,8 @@ from typing import Dict, Optional, Sequence, Tuple, Union
from google.api_core.exceptions import AlreadyExists
from google.api_core.retry import Retry
-from google.cloud.tasks_v2 import enums
-from google.cloud.tasks_v2.types import FieldMask, Queue, Task
-from google.protobuf.json_format import MessageToDict
+from google.cloud.tasks_v2.types import Queue, Task
+from google.protobuf.field_mask_pb2 import FieldMask
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook
@@ -136,7 +135,7 @@ class CloudTasksQueueCreateOperator(BaseOperator):
metadata=self.metadata,
)
- return MessageToDict(queue)
+ return Queue.to_dict(queue)
class CloudTasksQueueUpdateOperator(BaseOperator):
@@ -159,7 +158,7 @@ class CloudTasksQueueUpdateOperator(BaseOperator):
:param update_mask: A mask used to specify which fields of the queue are being updated.
If empty, then all fields will be updated.
If a dict is provided, it must be of the same form as the protobuf message.
- :type update_mask: dict or google.cloud.tasks_v2.types.FieldMask
+ :type update_mask: dict or google.protobuf.field_mask_pb2.FieldMask
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -237,7 +236,7 @@ class CloudTasksQueueUpdateOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return MessageToDict(queue)
+ return Queue.to_dict(queue)
class CloudTasksQueueGetOperator(BaseOperator):
@@ -320,7 +319,7 @@ class CloudTasksQueueGetOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return MessageToDict(queue)
+ return Queue.to_dict(queue)
class CloudTasksQueuesListOperator(BaseOperator):
@@ -408,7 +407,7 @@ class CloudTasksQueuesListOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return [MessageToDict(q) for q in queues]
+ return [Queue.to_dict(q) for q in queues]
class CloudTasksQueueDeleteOperator(BaseOperator):
@@ -571,7 +570,7 @@ class CloudTasksQueuePurgeOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return MessageToDict(queue)
+ return Queue.to_dict(queue)
class CloudTasksQueuePauseOperator(BaseOperator):
@@ -654,7 +653,7 @@ class CloudTasksQueuePauseOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return MessageToDict(queue)
+ return Queue.to_dict(queue)
class CloudTasksQueueResumeOperator(BaseOperator):
@@ -737,7 +736,7 @@ class CloudTasksQueueResumeOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return MessageToDict(queue)
+ return Queue.to_dict(queue)
class CloudTasksTaskCreateOperator(BaseOperator):
@@ -803,7 +802,7 @@ class CloudTasksTaskCreateOperator(BaseOperator):
task: Union[Dict, Task],
project_id: Optional[str] = None,
task_name: Optional[str] = None,
- response_view: Optional[enums.Task.View] = None,
+ response_view: Optional = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
@@ -840,7 +839,7 @@ class CloudTasksTaskCreateOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return MessageToDict(task)
+ return Task.to_dict(task)
class CloudTasksTaskGetOperator(BaseOperator):
@@ -900,7 +899,7 @@ class CloudTasksTaskGetOperator(BaseOperator):
queue_name: str,
task_name: str,
project_id: Optional[str] = None,
- response_view: Optional[enums.Task.View] = None,
+ response_view: Optional = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
@@ -935,7 +934,7 @@ class CloudTasksTaskGetOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return MessageToDict(task)
+ return Task.to_dict(task)
class CloudTasksTasksListOperator(BaseOperator):
@@ -994,7 +993,7 @@ class CloudTasksTasksListOperator(BaseOperator):
location: str,
queue_name: str,
project_id: Optional[str] = None,
- response_view: Optional[enums.Task.View] = None,
+ response_view: Optional = None,
page_size: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
@@ -1030,7 +1029,7 @@ class CloudTasksTasksListOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return [MessageToDict(t) for t in tasks]
+ return [Task.to_dict(t) for t in tasks]
class CloudTasksTaskDeleteOperator(BaseOperator):
@@ -1134,7 +1133,7 @@ class CloudTasksTaskRunOperator(BaseOperator):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
- :type response_view: google.cloud.tasks_v2.enums.Task.View
+ :type response_view: google.cloud.tasks_v2.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -1176,7 +1175,7 @@ class CloudTasksTaskRunOperator(BaseOperator):
queue_name: str,
task_name: str,
project_id: Optional[str] = None,
- response_view: Optional[enums.Task.View] = None,
+ response_view: Optional = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
@@ -1211,4 +1210,4 @@ class CloudTasksTaskRunOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- return MessageToDict(task)
+ return Task.to_dict(task)
diff --git a/setup.py b/setup.py
index 6a5c07e..16a4e2a 100644
--- a/setup.py
+++ b/setup.py
@@ -300,7 +300,7 @@ google = [
'google-cloud-spanner>=1.10.0,<2.0.0',
'google-cloud-speech>=0.36.3,<2.0.0',
'google-cloud-storage>=1.30,<2.0.0',
- 'google-cloud-tasks>=1.2.1,<2.0.0',
+ 'google-cloud-tasks>=2.0.0,<3.0.0',
'google-cloud-texttospeech>=0.4.0,<2.0.0',
'google-cloud-translate>=1.5.0,<2.0.0',
'google-cloud-videointelligence>=1.7.0,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_tasks.py b/tests/providers/google/cloud/hooks/test_tasks.py
index 8be6686..6504595 100644
--- a/tests/providers/google/cloud/hooks/test_tasks.py
+++ b/tests/providers/google/cloud/hooks/test_tasks.py
@@ -72,11 +72,10 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertIs(result, API_RESPONSE)
get_conn.return_value.create_queue.assert_called_once_with(
- parent=FULL_LOCATION_PATH,
- queue=Queue(name=FULL_QUEUE_PATH),
+ request=dict(parent=FULL_LOCATION_PATH, queue=Queue(name=FULL_QUEUE_PATH)),
retry=None,
timeout=None,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -94,11 +93,10 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertIs(result, API_RESPONSE)
get_conn.return_value.update_queue.assert_called_once_with(
- queue=Queue(name=FULL_QUEUE_PATH, state=3),
- update_mask=None,
+ request=dict(queue=Queue(name=FULL_QUEUE_PATH, state=3), update_mask=None),
retry=None,
timeout=None,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -111,30 +109,28 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertIs(result, API_RESPONSE)
get_conn.return_value.get_queue.assert_called_once_with(
- name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None
+ request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.list_queues.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.list_queues.return_value": [Queue(name=FULL_QUEUE_PATH)]}, # type: ignore
)
def test_list_queues(self, get_conn):
result = self.hook.list_queues(location=LOCATION, project_id=PROJECT_ID)
- self.assertEqual(result, list(API_RESPONSE))
+ self.assertEqual(result, [Queue(name=FULL_QUEUE_PATH)])
get_conn.return_value.list_queues.assert_called_once_with(
- parent=FULL_LOCATION_PATH,
- filter_=None,
- page_size=None,
+ request=dict(parent=FULL_LOCATION_PATH, filter=None, page_size=None),
retry=None,
timeout=None,
- metadata=None,
+ metadata=(),
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.delete_queue.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.delete_queue.return_value": None}, # type: ignore
)
def test_delete_queue(self, get_conn):
result = self.hook.delete_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
@@ -142,51 +138,51 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertEqual(result, None)
get_conn.return_value.delete_queue.assert_called_once_with(
- name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None
+ request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.purge_queue.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.purge_queue.return_value": Queue(name=FULL_QUEUE_PATH)}, # type: ignore
)
def test_purge_queue(self, get_conn):
result = self.hook.purge_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
- self.assertEqual(result, API_RESPONSE)
+ self.assertEqual(result, Queue(name=FULL_QUEUE_PATH))
get_conn.return_value.purge_queue.assert_called_once_with(
- name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None
+ request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.pause_queue.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.pause_queue.return_value": Queue(name=FULL_QUEUE_PATH)}, # type: ignore
)
def test_pause_queue(self, get_conn):
result = self.hook.pause_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
- self.assertEqual(result, API_RESPONSE)
+ self.assertEqual(result, Queue(name=FULL_QUEUE_PATH))
get_conn.return_value.pause_queue.assert_called_once_with(
- name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None
+ request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.resume_queue.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.resume_queue.return_value": Queue(name=FULL_QUEUE_PATH)}, # type: ignore
)
def test_resume_queue(self, get_conn):
result = self.hook.resume_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
- self.assertEqual(result, API_RESPONSE)
+ self.assertEqual(result, Queue(name=FULL_QUEUE_PATH))
get_conn.return_value.resume_queue.assert_called_once_with(
- name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None
+ request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.create_task.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.create_task.return_value": Task(name=FULL_TASK_PATH)}, # type: ignore
)
def test_create_task(self, get_conn):
result = self.hook.create_task(
@@ -197,20 +193,18 @@ class TestCloudTasksHook(unittest.TestCase):
task_name=TASK_NAME,
)
- self.assertEqual(result, API_RESPONSE)
+ self.assertEqual(result, Task(name=FULL_TASK_PATH))
get_conn.return_value.create_task.assert_called_once_with(
- parent=FULL_QUEUE_PATH,
- task=Task(name=FULL_TASK_PATH),
- response_view=None,
+ request=dict(parent=FULL_QUEUE_PATH, task=Task(name=FULL_TASK_PATH), response_view=None),
retry=None,
timeout=None,
- metadata=None,
+ metadata=(),
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.get_task.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.get_task.return_value": Task(name=FULL_TASK_PATH)}, # type: ignore
)
def test_get_task(self, get_conn):
result = self.hook.get_task(
@@ -220,37 +214,34 @@ class TestCloudTasksHook(unittest.TestCase):
project_id=PROJECT_ID,
)
- self.assertEqual(result, API_RESPONSE)
+ self.assertEqual(result, Task(name=FULL_TASK_PATH))
get_conn.return_value.get_task.assert_called_once_with(
- name=FULL_TASK_PATH,
- response_view=None,
+ request=dict(name=FULL_TASK_PATH, response_view=None),
retry=None,
timeout=None,
- metadata=None,
+ metadata=(),
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.list_tasks.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.list_tasks.return_value": [Task(name=FULL_TASK_PATH)]}, # type: ignore
)
def test_list_tasks(self, get_conn):
result = self.hook.list_tasks(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
- self.assertEqual(result, list(API_RESPONSE))
+ self.assertEqual(result, [Task(name=FULL_TASK_PATH)])
get_conn.return_value.list_tasks.assert_called_once_with(
- parent=FULL_QUEUE_PATH,
- response_view=None,
- page_size=None,
+ request=dict(parent=FULL_QUEUE_PATH, response_view=None, page_size=None),
retry=None,
timeout=None,
- metadata=None,
+ metadata=(),
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.delete_task.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.delete_task.return_value": None}, # type: ignore
)
def test_delete_task(self, get_conn):
result = self.hook.delete_task(
@@ -263,12 +254,12 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertEqual(result, None)
get_conn.return_value.delete_task.assert_called_once_with(
- name=FULL_TASK_PATH, retry=None, timeout=None, metadata=None
+ request=dict(name=FULL_TASK_PATH), retry=None, timeout=None, metadata=()
)
@mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
- **{"return_value.run_task.return_value": API_RESPONSE}, # type: ignore
+ **{"return_value.run_task.return_value": Task(name=FULL_TASK_PATH)}, # type: ignore
)
def test_run_task(self, get_conn):
result = self.hook.run_task(
@@ -278,12 +269,11 @@ class TestCloudTasksHook(unittest.TestCase):
project_id=PROJECT_ID,
)
- self.assertEqual(result, API_RESPONSE)
+ self.assertEqual(result, Task(name=FULL_TASK_PATH))
get_conn.return_value.run_task.assert_called_once_with(
- name=FULL_TASK_PATH,
- response_view=None,
+ request=dict(name=FULL_TASK_PATH, response_view=None),
retry=None,
timeout=None,
- metadata=None,
+ metadata=(),
)
diff --git a/tests/providers/google/cloud/operators/test_tasks.py b/tests/providers/google/cloud/operators/test_tasks.py
index b7e886d..ed76911 100644
--- a/tests/providers/google/cloud/operators/test_tasks.py
+++ b/tests/providers/google/cloud/operators/test_tasks.py
@@ -57,7 +57,7 @@ class TestCloudTasksQueueCreate(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'name': FULL_QUEUE_PATH}, result)
+ self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -81,7 +81,7 @@ class TestCloudTasksQueueUpdate(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'name': FULL_QUEUE_PATH}, result)
+ self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -106,7 +106,7 @@ class TestCloudTasksQueueGet(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'name': FULL_QUEUE_PATH}, result)
+ self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -129,7 +129,7 @@ class TestCloudTasksQueuesList(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual([{'name': FULL_QUEUE_PATH}], result)
+ self.assertEqual([{'name': FULL_QUEUE_PATH, 'state': 0}], result)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -176,7 +176,7 @@ class TestCloudTasksQueuePurge(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'name': FULL_QUEUE_PATH}, result)
+ self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -199,7 +199,7 @@ class TestCloudTasksQueuePause(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'name': FULL_QUEUE_PATH}, result)
+ self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -222,7 +222,7 @@ class TestCloudTasksQueueResume(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'name': FULL_QUEUE_PATH}, result)
+ self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -247,7 +247,16 @@ class TestCloudTasksTaskCreate(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'appEngineHttpRequest': {}}, result)
+ self.assertEqual(
+ {
+ 'app_engine_http_request': {'body': '', 'headers': {}, 'http_method': 0, 'relative_uri': ''},
+ 'dispatch_count': 0,
+ 'name': '',
+ 'response_count': 0,
+ 'view': 0,
+ },
+ result,
+ )
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -275,7 +284,16 @@ class TestCloudTasksTaskGet(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'appEngineHttpRequest': {}}, result)
+ self.assertEqual(
+ {
+ 'app_engine_http_request': {'body': '', 'headers': {}, 'http_method': 0, 'relative_uri': ''},
+ 'dispatch_count': 0,
+ 'name': '',
+ 'response_count': 0,
+ 'view': 0,
+ },
+ result,
+ )
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -300,7 +318,23 @@ class TestCloudTasksTasksList(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual([{'appEngineHttpRequest': {}}], result)
+ self.assertEqual(
+ [
+ {
+ 'app_engine_http_request': {
+ 'body': '',
+ 'headers': {},
+ 'http_method': 0,
+ 'relative_uri': '',
+ },
+ 'dispatch_count': 0,
+ 'name': '',
+ 'response_count': 0,
+ 'view': 0,
+ }
+ ],
+ result,
+ )
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,
@@ -353,7 +387,16 @@ class TestCloudTasksTaskRun(unittest.TestCase):
result = operator.execute(context=None)
- self.assertEqual({'appEngineHttpRequest': {}}, result)
+ self.assertEqual(
+ {
+ 'app_engine_http_request': {'body': '', 'headers': {}, 'http_method': 0, 'relative_uri': ''},
+ 'dispatch_count': 0,
+ 'name': '',
+ 'response_count': 0,
+ 'view': 0,
+ },
+ result,
+ )
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None,