You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/10/13 10:57:21 UTC
[airflow] branch main updated: Refactor DataFusionInstanceLink usage (#34514)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d27d0bb60b Refactor DataFusionInstanceLink usage (#34514)
d27d0bb60b is described below
commit d27d0bb60b08ed8550491d4801ba5bf3c0e3da9b
Author: max <42...@users.noreply.github.com>
AuthorDate: Fri Oct 13 12:57:12 2023 +0200
Refactor DataFusionInstanceLink usage (#34514)
---
.../providers/google/cloud/operators/datafusion.py | 21 +++++++++++++++------
airflow/providers/google/cloud/utils/helpers.py | 21 +++++++++++++++++++++
.../google/cloud/operators/test_datafusion.py | 17 +++++++++++++----
tests/providers/google/cloud/utils/test_helpers.py | 19 ++++++++++++++++++-
4 files changed, 67 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py
index b2149495f9..4f62b82407 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -24,7 +24,7 @@ from google.api_core.retry import exponential_sleep_generator
from googleapiclient.errors import HttpError
from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.hooks.datafusion import SUCCESS_STATES, DataFusionHook, PipelineStates
from airflow.providers.google.cloud.links.datafusion import (
DataFusionInstanceLink,
@@ -34,16 +34,25 @@ from airflow.providers.google.cloud.links.datafusion import (
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.cloud.triggers.datafusion import DataFusionStartPipelineTrigger
from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType
+from airflow.providers.google.cloud.utils.helpers import resource_path_to_dict
if TYPE_CHECKING:
from airflow.utils.context import Context
class DataFusionPipelineLinkHelper:
- """Helper class for Pipeline links."""
+ """
+ Helper class for Pipeline links.
+
+ .. warning::
+ This class is deprecated. Consider using ``resource_path_to_dict()`` instead.
+ """
@staticmethod
def get_project_id(instance):
+ raise AirflowProviderDeprecationWarning(
+ "DataFusionPipelineLinkHelper is deprecated. Consider using resource_path_to_dict() instead."
+ )
instance = instance["name"]
project_id = next(x for x in instance.split("/") if x.startswith("airflow"))
return project_id
@@ -114,7 +123,7 @@ class CloudDataFusionRestartInstanceOperator(GoogleCloudBaseOperator):
instance = hook.wait_for_operation(operation)
self.log.info("Instance %s restarted successfully", self.instance_name)
- project_id = self.project_id or DataFusionPipelineLinkHelper.get_project_id(instance)
+ project_id = resource_path_to_dict(resource_name=instance["name"])["projects"]
DataFusionInstanceLink.persist(
context=context,
task_instance=self,
@@ -272,7 +281,7 @@ class CloudDataFusionCreateInstanceOperator(GoogleCloudBaseOperator):
instance_name=self.instance_name, location=self.location, project_id=self.project_id
)
- project_id = self.project_id or DataFusionPipelineLinkHelper.get_project_id(instance)
+ project_id = resource_path_to_dict(resource_name=instance["name"])["projects"]
DataFusionInstanceLink.persist(
context=context,
task_instance=self,
@@ -361,7 +370,7 @@ class CloudDataFusionUpdateInstanceOperator(GoogleCloudBaseOperator):
instance = hook.wait_for_operation(operation)
self.log.info("Instance %s updated successfully", self.instance_name)
- project_id = self.project_id or DataFusionPipelineLinkHelper.get_project_id(instance)
+ project_id = resource_path_to_dict(resource_name=instance["name"])["projects"]
DataFusionInstanceLink.persist(
context=context,
task_instance=self,
@@ -432,7 +441,7 @@ class CloudDataFusionGetInstanceOperator(GoogleCloudBaseOperator):
project_id=self.project_id,
)
- project_id = self.project_id or DataFusionPipelineLinkHelper.get_project_id(instance)
+ project_id = resource_path_to_dict(resource_name=instance["name"])["projects"]
DataFusionInstanceLink.persist(
context=context,
task_instance=self,
diff --git a/airflow/providers/google/cloud/utils/helpers.py b/airflow/providers/google/cloud/utils/helpers.py
index 72216ec20b..a0ff3e58ef 100644
--- a/airflow/providers/google/cloud/utils/helpers.py
+++ b/airflow/providers/google/cloud/utils/helpers.py
@@ -21,3 +21,24 @@ from __future__ import annotations
def normalize_directory_path(source_object: str | None) -> str | None:
"""Makes sure dir path ends with a slash."""
return source_object + "/" if source_object and not source_object.endswith("/") else source_object
+
+
+def resource_path_to_dict(resource_name: str) -> dict[str, str]:
+ """Converts a path-like GCP resource name into a dictionary.
+
+ For example, the path `projects/my-project/locations/my-location/instances/my-instance` will be converted
+ to a dict:
+ `{"projects": "my-project",
+ "locations": "my-location",
+ "instances": "my-instance",}`
+ """
+ if not resource_name:
+ return {}
+ path_items = resource_name.split("/")
+ if len(path_items) % 2:
+ raise ValueError(
+ "Invalid resource_name. Expected the path-like name consisting of key/value pairs "
+ "'key1/value1/key2/value2/...', for example 'projects/<project>/locations/<location>'."
+ )
+ iterator = iter(path_items)
+ return dict(zip(iterator, iterator))
diff --git a/tests/providers/google/cloud/operators/test_datafusion.py b/tests/providers/google/cloud/operators/test_datafusion.py
index a06b019f5e..2783d3fc62 100644
--- a/tests/providers/google/cloud/operators/test_datafusion.py
+++ b/tests/providers/google/cloud/operators/test_datafusion.py
@@ -39,6 +39,7 @@ from airflow.providers.google.cloud.triggers.datafusion import DataFusionStartPi
from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType
HOOK_STR = "airflow.providers.google.cloud.operators.datafusion.DataFusionHook"
+RESOURCE_PATH_TO_DICT_STR = "airflow.providers.google.cloud.operators.datafusion.resource_path_to_dict"
TASK_ID = "test_task"
LOCATION = "test-location"
@@ -54,9 +55,11 @@ RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}
class TestCloudDataFusionUpdateInstanceOperator:
+ @mock.patch(RESOURCE_PATH_TO_DICT_STR)
@mock.patch(HOOK_STR)
- def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
+ def test_execute_check_hook_call_should_execute_successfully(self, mock_hook, mock_resource_to_dict):
update_maks = "instance.name"
+ mock_resource_to_dict.return_value = {"projects": PROJECT_ID}
op = CloudDataFusionUpdateInstanceOperator(
task_id="test_tasks",
instance_name=INSTANCE_NAME,
@@ -78,8 +81,10 @@ class TestCloudDataFusionUpdateInstanceOperator:
class TestCloudDataFusionRestartInstanceOperator:
+ @mock.patch(RESOURCE_PATH_TO_DICT_STR)
@mock.patch(HOOK_STR)
- def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
+ def test_execute_check_hook_call_should_execute_successfully(self, mock_hook, mock_resource_path_to_dict):
+ mock_resource_path_to_dict.return_value = {"projects": PROJECT_ID}
op = CloudDataFusionRestartInstanceOperator(
task_id="test_tasks",
instance_name=INSTANCE_NAME,
@@ -95,8 +100,10 @@ class TestCloudDataFusionRestartInstanceOperator:
class TestCloudDataFusionCreateInstanceOperator:
+ @mock.patch(RESOURCE_PATH_TO_DICT_STR)
@mock.patch(HOOK_STR)
- def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
+ def test_execute_check_hook_call_should_execute_successfully(self, mock_hook, mock_resource_path_to_dict):
+ mock_resource_path_to_dict.return_value = {"projects": PROJECT_ID}
op = CloudDataFusionCreateInstanceOperator(
task_id="test_tasks",
instance_name=INSTANCE_NAME,
@@ -133,8 +140,10 @@ class TestCloudDataFusionDeleteInstanceOperator:
class TestCloudDataFusionGetInstanceOperator:
+ @mock.patch(RESOURCE_PATH_TO_DICT_STR)
@mock.patch(HOOK_STR)
- def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
+ def test_execute_check_hook_call_should_execute_successfully(self, mock_hook, mock_resource_path_to_dict):
+ mock_resource_path_to_dict.return_value = {"projects": PROJECT_ID}
op = CloudDataFusionGetInstanceOperator(
task_id="test_tasks",
instance_name=INSTANCE_NAME,
diff --git a/tests/providers/google/cloud/utils/test_helpers.py b/tests/providers/google/cloud/utils/test_helpers.py
index 9055af89da..c277b8470b 100644
--- a/tests/providers/google/cloud/utils/test_helpers.py
+++ b/tests/providers/google/cloud/utils/test_helpers.py
@@ -16,7 +16,9 @@
# under the License.
from __future__ import annotations
-from airflow.providers.google.cloud.utils.helpers import normalize_directory_path
+import pytest
+
+from airflow.providers.google.cloud.utils.helpers import normalize_directory_path, resource_path_to_dict
class TestHelpers:
@@ -24,3 +26,18 @@ class TestHelpers:
assert normalize_directory_path("dir_path") == "dir_path/"
assert normalize_directory_path("dir_path/") == "dir_path/"
assert normalize_directory_path(None) is None
+
+ def test_resource_path_to_dict(self):
+ resource_name = "key1/value1/key2/value2"
+ expected_dict = {"key1": "value1", "key2": "value2"}
+ actual_dict = resource_path_to_dict(resource_name=resource_name)
+ assert set(actual_dict.items()) == set(expected_dict.items())
+
+ def test_resource_path_to_dict_empty(self):
+ resource_name = ""
+ expected_dict = {}
+ assert resource_path_to_dict(resource_name=resource_name) == expected_dict
+
+ def test_resource_path_to_dict_fail(self):
+ with pytest.raises(ValueError):
+ resource_path_to_dict(resource_name="key/value/key")