You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/19 20:31:57 UTC

[airflow] branch main updated: Add DataFlow operations to Azure DataFactory hook (#26345)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 24d88e8fee Add DataFlow operations to Azure DataFactory hook (#26345)
24d88e8fee is described below

commit 24d88e8feedcb11edc316f0d3f20f4ea54dc23b8
Author: Phani Kumar <94...@users.noreply.github.com>
AuthorDate: Tue Sep 20 02:01:48 2022 +0530

    Add DataFlow operations to Azure DataFactory hook (#26345)
---
 .../microsoft/azure/hooks/data_factory.py          | 113 ++++++++++++++++++++-
 .../azure/hooks/test_azure_data_factory.py         |  62 +++++++++++
 2 files changed, 174 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/microsoft/azure/hooks/data_factory.py b/airflow/providers/microsoft/azure/hooks/data_factory.py
index bbe8e6baab..bc25aae884 100644
--- a/airflow/providers/microsoft/azure/hooks/data_factory.py
+++ b/airflow/providers/microsoft/azure/hooks/data_factory.py
@@ -25,6 +25,7 @@
     PipelineRun
     TriggerResource
     datafactory
+    DataFlow
     mgmt
 """
 from __future__ import annotations
@@ -39,6 +40,7 @@ from azure.identity import ClientSecretCredential, DefaultAzureCredential
 from azure.mgmt.datafactory import DataFactoryManagementClient
 from azure.mgmt.datafactory.models import (
     CreateRunResponse,
+    DataFlow,
     DatasetResource,
     Factory,
     LinkedServiceResource,
@@ -479,12 +481,121 @@ class AzureDataFactoryHook(BaseHook):
         Delete the dataset.
 
         :param dataset_name: The dataset name.
-        :param resource_group_name: The dataset name.
+        :param resource_group_name: The resource group name.
         :param factory_name: The factory name.
         :param config: Extra parameters for the ADF client.
         """
         self.get_conn().datasets.delete(resource_group_name, factory_name, dataset_name, **config)
 
+    @provide_targeted_factory
+    def get_dataflow(
+        self,
+        dataflow_name: str,
+        resource_group_name: str | None = None,
+        factory_name: str | None = None,
+        **config: Any,
+    ) -> DataFlow:
+        """
+        Get the dataflow.
+
+        :param dataflow_name: The dataflow name.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The dataflow.
+        """
+        return self.get_conn().data_flows.get(resource_group_name, factory_name, dataflow_name, **config)
+
+    def _dataflow_exists(
+        self,
+        dataflow_name: str,
+        resource_group_name: str | None = None,
+        factory_name: str | None = None,
+    ) -> bool:
+        """Return whether the dataflow already exists."""
+        dataflows = {
+            dataflow.name
+            for dataflow in self.get_conn().data_flows.list_by_factory(resource_group_name, factory_name)
+        }
+
+        return dataflow_name in dataflows
+
+    @provide_targeted_factory
+    def update_dataflow(
+        self,
+        dataflow_name: str,
+        dataflow: DataFlow,
+        resource_group_name: str | None = None,
+        factory_name: str | None = None,
+        **config: Any,
+    ) -> DataFlow:
+        """
+        Update the dataflow.
+
+        :param dataflow_name: The dataflow name.
+        :param dataflow: The dataflow resource definition.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :raise AirflowException: If the dataflow does not exist.
+        :return: The dataflow.
+        """
+        if not self._dataflow_exists(
+            dataflow_name,
+            resource_group_name,
+            factory_name,
+        ):
+            raise AirflowException(f"Dataflow {dataflow_name!r} does not exist.")
+
+        return self.get_conn().data_flows.create_or_update(
+            resource_group_name, factory_name, dataflow_name, dataflow, **config
+        )
+
+    @provide_targeted_factory
+    def create_dataflow(
+        self,
+        dataflow_name: str,
+        dataflow: DataFlow,
+        resource_group_name: str | None = None,
+        factory_name: str | None = None,
+        **config: Any,
+    ) -> DataFlow:
+        """
+        Create the dataflow.
+
+        :param dataflow_name: The dataflow name.
+        :param dataflow: The dataflow resource definition.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :raise AirflowException: If the dataflow already exists.
+        :return: The dataflow.
+        """
+        if self._dataflow_exists(dataflow_name, resource_group_name, factory_name):
+            raise AirflowException(f"Dataflow {dataflow_name!r} already exists.")
+
+        return self.get_conn().data_flows.create_or_update(
+            resource_group_name, factory_name, dataflow_name, dataflow, **config
+        )
+
+    @provide_targeted_factory
+    def delete_dataflow(
+        self,
+        dataflow_name: str,
+        resource_group_name: str | None = None,
+        factory_name: str | None = None,
+        **config: Any,
+    ) -> None:
+        """
+        Delete the dataflow.
+
+        :param dataflow_name: The dataflow name.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        """
+        self.get_conn().data_flows.delete(resource_group_name, factory_name, dataflow_name, **config)
+
     @provide_targeted_factory
     def get_pipeline(
         self,
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_data_factory.py b/tests/providers/microsoft/azure/hooks/test_azure_data_factory.py
index 956d1a0ef0..cfc387c8f9 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_data_factory.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_data_factory.py
@@ -119,6 +119,7 @@ def hook():
             "pipeline_runs",
             "triggers",
             "trigger_runs",
+            "data_flows",
         ]
     )
 
@@ -342,6 +343,67 @@ def test_delete_dataset(hook: AzureDataFactoryHook, user_args, sdk_args):
     hook._conn.datasets.delete.assert_called_with(*sdk_args)
 
 
+@parametrize(
+    explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
+    implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
+)
+def test_get_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
+    hook.get_dataflow(*user_args)
+
+    hook._conn.data_flows.get.assert_called_with(*sdk_args)
+
+
+@parametrize(
+    explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
+    implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
+)
+def test_create_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
+    hook.create_dataflow(*user_args)
+
+    hook._conn.data_flows.create_or_update.assert_called_with(*sdk_args)
+
+
+@parametrize(
+    explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
+    implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
+)
+def test_update_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
+    with patch.object(hook, "_dataflow_exists") as mock_dataflow_exists:
+        mock_dataflow_exists.return_value = True
+        hook.update_dataflow(*user_args)
+
+    hook._conn.data_flows.create_or_update.assert_called_with(*sdk_args)
+
+
+@parametrize(
+    explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
+    implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
+)
+def test_update_dataflow_non_existent(hook: AzureDataFactoryHook, user_args, sdk_args):
+    with patch.object(hook, "_dataflow_exists") as mock_dataflow_exists:
+        mock_dataflow_exists.return_value = False
+        # The call must run while the patch is active, or the stub is never consulted.
+        with pytest.raises(AirflowException, match=r"Dataflow .+ does not exist"):
+            hook.update_dataflow(*user_args)
+
+
+@parametrize(
+    explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
+    implicit_factory=(
+        (NAME,),
+        (
+            DEFAULT_RESOURCE_GROUP,
+            DEFAULT_FACTORY,
+            NAME,
+        ),
+    ),
+)
+def test_delete_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
+    hook.delete_dataflow(*user_args)
+
+    hook._conn.data_flows.delete.assert_called_with(*sdk_args)
+
+
 @parametrize(
     explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
     implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),