You are viewing a plain text version of this content. The canonical link for it is available from the original mailing-list archive (the hyperlink was lost in this plain-text extraction).
Posted to commits@airflow.apache.org by el...@apache.org on 2023/10/13 10:57:21 UTC

[airflow] branch main updated: Refactor DataFusionInstanceLink usage (#34514)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d27d0bb60b Refactor DataFusionInstanceLink usage (#34514)
d27d0bb60b is described below

commit d27d0bb60b08ed8550491d4801ba5bf3c0e3da9b
Author: max <42...@users.noreply.github.com>
AuthorDate: Fri Oct 13 12:57:12 2023 +0200

    Refactor DataFusionInstanceLink usage (#34514)
---
 .../providers/google/cloud/operators/datafusion.py  | 21 +++++++++++++++------
 airflow/providers/google/cloud/utils/helpers.py     | 21 +++++++++++++++++++++
 .../google/cloud/operators/test_datafusion.py       | 17 +++++++++++++----
 tests/providers/google/cloud/utils/test_helpers.py  | 19 ++++++++++++++++++-
 4 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py
index b2149495f9..4f62b82407 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -24,7 +24,7 @@ from google.api_core.retry import exponential_sleep_generator
 from googleapiclient.errors import HttpError
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.hooks.datafusion import SUCCESS_STATES, DataFusionHook, PipelineStates
 from airflow.providers.google.cloud.links.datafusion import (
     DataFusionInstanceLink,
@@ -34,16 +34,25 @@ from airflow.providers.google.cloud.links.datafusion import (
 from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
 from airflow.providers.google.cloud.triggers.datafusion import DataFusionStartPipelineTrigger
 from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType
+from airflow.providers.google.cloud.utils.helpers import resource_path_to_dict
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
 class DataFusionPipelineLinkHelper:
-    """Helper class for Pipeline links."""
+    """
+    Helper class for Pipeline links.
+
+    .. warning::
+        This class is deprecated. Consider using ``resource_path_to_dict()`` instead.
+    """
 
     @staticmethod
     def get_project_id(instance):
+        raise AirflowProviderDeprecationWarning(
+            "DataFusionPipelineLinkHelper is deprecated. Consider using resource_path_to_dict() instead."
+        )
         instance = instance["name"]
         project_id = next(x for x in instance.split("/") if x.startswith("airflow"))
         return project_id
@@ -114,7 +123,7 @@ class CloudDataFusionRestartInstanceOperator(GoogleCloudBaseOperator):
         instance = hook.wait_for_operation(operation)
         self.log.info("Instance %s restarted successfully", self.instance_name)
 
-        project_id = self.project_id or DataFusionPipelineLinkHelper.get_project_id(instance)
+        project_id = resource_path_to_dict(resource_name=instance["name"])["projects"]
         DataFusionInstanceLink.persist(
             context=context,
             task_instance=self,
@@ -272,7 +281,7 @@ class CloudDataFusionCreateInstanceOperator(GoogleCloudBaseOperator):
                     instance_name=self.instance_name, location=self.location, project_id=self.project_id
                 )
 
-        project_id = self.project_id or DataFusionPipelineLinkHelper.get_project_id(instance)
+        project_id = resource_path_to_dict(resource_name=instance["name"])["projects"]
         DataFusionInstanceLink.persist(
             context=context,
             task_instance=self,
@@ -361,7 +370,7 @@ class CloudDataFusionUpdateInstanceOperator(GoogleCloudBaseOperator):
         instance = hook.wait_for_operation(operation)
         self.log.info("Instance %s updated successfully", self.instance_name)
 
-        project_id = self.project_id or DataFusionPipelineLinkHelper.get_project_id(instance)
+        project_id = resource_path_to_dict(resource_name=instance["name"])["projects"]
         DataFusionInstanceLink.persist(
             context=context,
             task_instance=self,
@@ -432,7 +441,7 @@ class CloudDataFusionGetInstanceOperator(GoogleCloudBaseOperator):
             project_id=self.project_id,
         )
 
-        project_id = self.project_id or DataFusionPipelineLinkHelper.get_project_id(instance)
+        project_id = resource_path_to_dict(resource_name=instance["name"])["projects"]
         DataFusionInstanceLink.persist(
             context=context,
             task_instance=self,
diff --git a/airflow/providers/google/cloud/utils/helpers.py b/airflow/providers/google/cloud/utils/helpers.py
index 72216ec20b..a0ff3e58ef 100644
--- a/airflow/providers/google/cloud/utils/helpers.py
+++ b/airflow/providers/google/cloud/utils/helpers.py
@@ -21,3 +21,24 @@ from __future__ import annotations
 def normalize_directory_path(source_object: str | None) -> str | None:
     """Makes sure dir path ends with a slash."""
     return source_object + "/" if source_object and not source_object.endswith("/") else source_object
+
+
+def resource_path_to_dict(resource_name: str) -> dict[str, str]:
+    """Converts a path-like GCP resource name into a dictionary.
+
+    For example, the path `projects/my-project/locations/my-location/instances/my-instance` will be converted
+    to a dict:
+    `{"projects": "my-project",
+    "locations": "my-location",
+    "instances": "my-instance",}`
+    """
+    if not resource_name:
+        return {}
+    path_items = resource_name.split("/")
+    if len(path_items) % 2:
+        raise ValueError(
+            "Invalid resource_name. Expected the path-like name consisting of key/value pairs "
+            "'key1/value1/key2/value2/...', for example 'projects/<project>/locations/<location>'."
+        )
+    iterator = iter(path_items)
+    return dict(zip(iterator, iterator))
diff --git a/tests/providers/google/cloud/operators/test_datafusion.py b/tests/providers/google/cloud/operators/test_datafusion.py
index a06b019f5e..2783d3fc62 100644
--- a/tests/providers/google/cloud/operators/test_datafusion.py
+++ b/tests/providers/google/cloud/operators/test_datafusion.py
@@ -39,6 +39,7 @@ from airflow.providers.google.cloud.triggers.datafusion import DataFusionStartPi
 from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType
 
 HOOK_STR = "airflow.providers.google.cloud.operators.datafusion.DataFusionHook"
+RESOURCE_PATH_TO_DICT_STR = "airflow.providers.google.cloud.operators.datafusion.resource_path_to_dict"
 
 TASK_ID = "test_task"
 LOCATION = "test-location"
@@ -54,9 +55,11 @@ RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}
 
 
 class TestCloudDataFusionUpdateInstanceOperator:
+    @mock.patch(RESOURCE_PATH_TO_DICT_STR)
     @mock.patch(HOOK_STR)
-    def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, mock_hook, mock_resource_to_dict):
         update_maks = "instance.name"
+        mock_resource_to_dict.return_value = {"projects": PROJECT_ID}
         op = CloudDataFusionUpdateInstanceOperator(
             task_id="test_tasks",
             instance_name=INSTANCE_NAME,
@@ -78,8 +81,10 @@ class TestCloudDataFusionUpdateInstanceOperator:
 
 
 class TestCloudDataFusionRestartInstanceOperator:
+    @mock.patch(RESOURCE_PATH_TO_DICT_STR)
     @mock.patch(HOOK_STR)
-    def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, mock_hook, mock_resource_path_to_dict):
+        mock_resource_path_to_dict.return_value = {"projects": PROJECT_ID}
         op = CloudDataFusionRestartInstanceOperator(
             task_id="test_tasks",
             instance_name=INSTANCE_NAME,
@@ -95,8 +100,10 @@ class TestCloudDataFusionRestartInstanceOperator:
 
 
 class TestCloudDataFusionCreateInstanceOperator:
+    @mock.patch(RESOURCE_PATH_TO_DICT_STR)
     @mock.patch(HOOK_STR)
-    def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, mock_hook, mock_resource_path_to_dict):
+        mock_resource_path_to_dict.return_value = {"projects": PROJECT_ID}
         op = CloudDataFusionCreateInstanceOperator(
             task_id="test_tasks",
             instance_name=INSTANCE_NAME,
@@ -133,8 +140,10 @@ class TestCloudDataFusionDeleteInstanceOperator:
 
 
 class TestCloudDataFusionGetInstanceOperator:
+    @mock.patch(RESOURCE_PATH_TO_DICT_STR)
     @mock.patch(HOOK_STR)
-    def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, mock_hook, mock_resource_path_to_dict):
+        mock_resource_path_to_dict.return_value = {"projects": PROJECT_ID}
         op = CloudDataFusionGetInstanceOperator(
             task_id="test_tasks",
             instance_name=INSTANCE_NAME,
diff --git a/tests/providers/google/cloud/utils/test_helpers.py b/tests/providers/google/cloud/utils/test_helpers.py
index 9055af89da..c277b8470b 100644
--- a/tests/providers/google/cloud/utils/test_helpers.py
+++ b/tests/providers/google/cloud/utils/test_helpers.py
@@ -16,7 +16,9 @@
 # under the License.
 from __future__ import annotations
 
-from airflow.providers.google.cloud.utils.helpers import normalize_directory_path
+import pytest
+
+from airflow.providers.google.cloud.utils.helpers import normalize_directory_path, resource_path_to_dict
 
 
 class TestHelpers:
@@ -24,3 +26,18 @@ class TestHelpers:
         assert normalize_directory_path("dir_path") == "dir_path/"
         assert normalize_directory_path("dir_path/") == "dir_path/"
         assert normalize_directory_path(None) is None
+
+    def test_resource_path_to_dict(self):
+        resource_name = "key1/value1/key2/value2"
+        expected_dict = {"key1": "value1", "key2": "value2"}
+        actual_dict = resource_path_to_dict(resource_name=resource_name)
+        assert set(actual_dict.items()) == set(expected_dict.items())
+
+    def test_resource_path_to_dict_empty(self):
+        resource_name = ""
+        expected_dict = {}
+        assert resource_path_to_dict(resource_name=resource_name) == expected_dict
+
+    def test_resource_path_to_dict_fail(self):
+        with pytest.raises(ValueError):
+            resource_path_to_dict(resource_name="key/value/key")