You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/19 20:31:57 UTC
[airflow] branch main updated: Add DataFlow operations to Azure DataFactory hook (#26345)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 24d88e8fee Add DataFlow operations to Azure DataFactory hook (#26345)
24d88e8fee is described below
commit 24d88e8feedcb11edc316f0d3f20f4ea54dc23b8
Author: Phani Kumar <94...@users.noreply.github.com>
AuthorDate: Tue Sep 20 02:01:48 2022 +0530
Add DataFlow operations to Azure DataFactory hook (#26345)
---
.../microsoft/azure/hooks/data_factory.py | 113 ++++++++++++++++++++-
.../azure/hooks/test_azure_data_factory.py | 62 +++++++++++
2 files changed, 174 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/microsoft/azure/hooks/data_factory.py b/airflow/providers/microsoft/azure/hooks/data_factory.py
index bbe8e6baab..bc25aae884 100644
--- a/airflow/providers/microsoft/azure/hooks/data_factory.py
+++ b/airflow/providers/microsoft/azure/hooks/data_factory.py
@@ -25,6 +25,7 @@
PipelineRun
TriggerResource
datafactory
+ DataFlow
mgmt
"""
from __future__ import annotations
@@ -39,6 +40,7 @@ from azure.identity import ClientSecretCredential, DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
CreateRunResponse,
+ DataFlow,
DatasetResource,
Factory,
LinkedServiceResource,
@@ -479,12 +481,121 @@ class AzureDataFactoryHook(BaseHook):
Delete the dataset.
:param dataset_name: The dataset name.
- :param resource_group_name: The dataset name.
+ :param resource_group_name: The resource group name.
:param factory_name: The factory name.
:param config: Extra parameters for the ADF client.
"""
self.get_conn().datasets.delete(resource_group_name, factory_name, dataset_name, **config)
+ @provide_targeted_factory
+ def get_dataflow(
+ self,
+ dataflow_name: str,
+ resource_group_name: str | None = None,
+ factory_name: str | None = None,
+ **config: Any,
+ ) -> DataFlow:
+ """
+ Get the dataflow.
+
+ :param dataflow_name: The dataflow name.
+ :param resource_group_name: The resource group name.
+ :param factory_name: The factory name.
+ :param config: Extra parameters for the ADF client.
+ :return: The dataflow.
+ """
+ return self.get_conn().data_flows.get(resource_group_name, factory_name, dataflow_name, **config)
+
+ def _dataflow_exists(
+ self,
+ dataflow_name: str,
+ resource_group_name: str | None = None,
+ factory_name: str | None = None,
+ ) -> bool:
+ """Return whether the dataflow already exists."""
+ dataflows = {
+ dataflow.name
+ for dataflow in self.get_conn().data_flows.list_by_factory(resource_group_name, factory_name)
+ }
+
+ return dataflow_name in dataflows
+
+ @provide_targeted_factory
+ def update_dataflow(
+ self,
+ dataflow_name: str,
+ dataflow: DataFlow,
+ resource_group_name: str | None = None,
+ factory_name: str | None = None,
+ **config: Any,
+ ) -> DataFlow:
+ """
+ Update the dataflow.
+
+ :param dataflow_name: The dataflow name.
+ :param dataflow: The dataflow resource definition.
+ :param resource_group_name: The resource group name.
+ :param factory_name: The factory name.
+ :param config: Extra parameters for the ADF client.
+ :raise AirflowException: If the dataflow does not exist.
+ :return: The dataflow.
+ """
+ if not self._dataflow_exists(
+ dataflow_name,
+ resource_group_name,
+ factory_name,
+ ):
+ raise AirflowException(f"Dataflow {dataflow_name!r} does not exist.")
+
+ return self.get_conn().data_flows.create_or_update(
+ resource_group_name, factory_name, dataflow_name, dataflow, **config
+ )
+
+ @provide_targeted_factory
+ def create_dataflow(
+ self,
+ dataflow_name: str,
+ dataflow: DataFlow,
+ resource_group_name: str | None = None,
+ factory_name: str | None = None,
+ **config: Any,
+ ) -> DataFlow:
+ """
+ Create the dataflow.
+
+ :param dataflow_name: The dataflow name.
+ :param dataflow: The dataflow resource definition.
+ :param resource_group_name: The resource group name.
+ :param factory_name: The factory name.
+ :param config: Extra parameters for the ADF client.
+ :raise AirflowException: If the dataflow already exists.
+ :return: The dataflow.
+ """
+ if self._dataflow_exists(dataflow_name, resource_group_name, factory_name):
+ raise AirflowException(f"Dataflow {dataflow_name!r} already exists.")
+
+ return self.get_conn().data_flows.create_or_update(
+ resource_group_name, factory_name, dataflow_name, dataflow, **config
+ )
+
+ @provide_targeted_factory
+ def delete_dataflow(
+ self,
+ dataflow_name: str,
+ resource_group_name: str | None = None,
+ factory_name: str | None = None,
+ **config: Any,
+ ) -> None:
+ """
+ Delete the dataflow.
+
+ :param dataflow_name: The dataflow name.
+ :param resource_group_name: The resource group name.
+ :param factory_name: The factory name.
+ :param config: Extra parameters for the ADF client.
+ """
+ self.get_conn().data_flows.delete(resource_group_name, factory_name, dataflow_name, **config)
+
@provide_targeted_factory
def get_pipeline(
self,
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_data_factory.py b/tests/providers/microsoft/azure/hooks/test_azure_data_factory.py
index 956d1a0ef0..cfc387c8f9 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_data_factory.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_data_factory.py
@@ -119,6 +119,7 @@ def hook():
"pipeline_runs",
"triggers",
"trigger_runs",
+ "data_flows",
]
)
@@ -342,6 +343,67 @@ def test_delete_dataset(hook: AzureDataFactoryHook, user_args, sdk_args):
hook._conn.datasets.delete.assert_called_with(*sdk_args)
+@parametrize(
+ explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
+ implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
+)
+def test_get_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
+ hook.get_dataflow(*user_args)
+
+ hook._conn.data_flows.get.assert_called_with(*sdk_args)
+
+
+@parametrize(
+ explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
+ implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
+)
+def test_create_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
+ hook.create_dataflow(*user_args)
+
+ hook._conn.data_flows.create_or_update.assert_called_with(*sdk_args)
+
+
+@parametrize(
+ explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
+ implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
+)
+def test_update_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
+ with patch.object(hook, "_dataflow_exists") as mock_dataflow_exists:
+ mock_dataflow_exists.return_value = True
+ hook.update_dataflow(*user_args)
+
+ hook._conn.data_flows.create_or_update.assert_called_with(*sdk_args)
+
+
+@parametrize(
+ explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
+ implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
+)
+def test_update_dataflow_non_existent(hook: AzureDataFactoryHook, user_args, sdk_args):
+ with patch.object(hook, "_dataflow_exists") as mock_dataflow_exists:
+ mock_dataflow_exists.return_value = False
+
+ with pytest.raises(AirflowException, match=r"Dataflow .+ does not exist"):
+ hook.update_dataflow(*user_args)
+
+
+@parametrize(
+ explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
+ implicit_factory=(
+ (NAME,),
+ (
+ DEFAULT_RESOURCE_GROUP,
+ DEFAULT_FACTORY,
+ NAME,
+ ),
+ ),
+)
+def test_delete_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
+ hook.delete_dataflow(*user_args)
+
+ hook._conn.data_flows.delete.assert_called_with(*sdk_args)
+
+
@parametrize(
explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),