You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/23 09:31:38 UTC

[GitHub] [airflow] flvndh commented on a change in pull request #11015: Add Azure Data Factory hook

flvndh commented on a change in pull request #11015:
URL: https://github.com/apache/airflow/pull/11015#discussion_r510758299



##########
File path: airflow/providers/microsoft/azure/hooks/azure_data_factory.py
##########
@@ -0,0 +1,659 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+from functools import wraps
+from typing import Any, Callable, Optional
+
+from azure.mgmt.datafactory import DataFactoryManagementClient
+from azure.mgmt.datafactory.models import (
+    CreateRunResponse,
+    Dataset,
+    DatasetResource,
+    Factory,
+    LinkedService,
+    LinkedServiceResource,
+    PipelineResource,
+    PipelineRun,
+    Trigger,
+    TriggerResource,
+)
+from msrestazure.azure_operation import AzureOperationPoller
+
+from airflow.exceptions import AirflowException
+from airflow.providers.microsoft.azure.hooks.base_azure import AzureBaseHook
+
+
+def provide_targeted_factory(func: Callable) -> Callable:
+    """
+    Provide the targeted factory to the decorated function in case it isn't specified.
+
+    If ``resource_group_name`` or ``factory_name`` is not provided, it defaults to the value specified in
+    the connection extras.
+    """
+
+    signature = inspect.signature(func)
+
+    @wraps(func)
+    def wrapper(*args, **kwargs) -> Callable:
+        bound_args = signature.bind(*args, **kwargs)
+
+        def bind_argument(arg, default_key):
+            if arg not in bound_args.arguments:
+                self = args[0]
+                conn = self.get_connection(self.conn_id)
+                default_value = conn.extra_dejson.get(default_key)
+
+                if not default_value:
+                    raise AirflowException("Could not determine the targeted data factory.")
+
+                bound_args.arguments[arg] = conn.extra_dejson[default_key]
+
+        bind_argument("resource_group_name", "resourceGroup")
+        bind_argument("factory_name", "factory")
+
+        return func(*bound_args.args, **bound_args.kwargs)
+
+    return wrapper
+
+
+class AzureDataFactoryHook(AzureBaseHook):
+    """
+    A hook to interact with Azure Data Factory.
+
+    :param conn_id: The Azure Data Factory connection id.
+    """
+
+    def __init__(self, conn_id: str = "azure_data_factory_default"):
+        super().__init__(sdk_client=DataFactoryManagementClient, conn_id=conn_id)
+        self._conn = None
+
+    def get_conn(self) -> DataFactoryManagementClient:
+        if not self._conn:
+            self._conn = super().get_conn()
+
+        return self._conn
+
+    @provide_targeted_factory
+    def get_factory(
+        self, resource_group_name: Optional[str] = None, factory_name: Optional[str] = None, **config: Any
+    ) -> Factory:
+        """
+        Get the factory.
+
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The factory.
+        """
+
+        return self.get_conn().factories.get(resource_group_name, factory_name, **config)
+
+    @provide_targeted_factory
+    def update_factory(
+        self,
+        factory: Factory,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> Factory:
+        """
+        Update the factory.
+
+        :param factory: The factory resource definition.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The factory.
+        """
+
+        return self.get_conn().factories.create_or_update(
+            resource_group_name, factory_name, factory, **config
+        )
+
+    @provide_targeted_factory
+    def create_factory(
+        self,
+        factory: Factory,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> Factory:
+        """
+        Create the factory.
+
+        :param factory: The factory resource definition.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The factory.
+        """
+
+        return self.get_conn().factories.create_or_update(
+            resource_group_name, factory_name, factory, **config
+        )
+
+    @provide_targeted_factory
+    def delete_factory(
+        self, resource_group_name: Optional[str] = None, factory_name: Optional[str] = None, **config: Any
+    ) -> None:
+        """
+        Delete the factory.
+
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        """
+
+        self.get_conn().factories.delete(resource_group_name, factory_name, **config)
+
+    @provide_targeted_factory
+    def get_linked_service(
+        self,
+        linked_service_name: str,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> LinkedServiceResource:
+        """
+        Get the linked service.
+
+        :param linked_service_name: The linked service name.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The linked service.
+        """
+
+        return self.get_conn().linked_services.get(
+            resource_group_name, factory_name, linked_service_name, **config
+        )
+
+    @provide_targeted_factory
+    def update_linked_service(
+        self,
+        linked_service_name: str,
+        linked_service: LinkedService,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> LinkedServiceResource:
+        """
+        Update the linked service.
+
+        :param linked_service_name: The linked service name.
+        :param linked_service: The linked service resource definition.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The linked service.
+        """
+
+        return self.get_conn().linked_services.create_or_update(
+            resource_group_name, factory_name, linked_service_name, linked_service, **config
+        )
+
+    @provide_targeted_factory
+    def create_linked_service(
+        self,
+        linked_service_name: str,
+        linked_service: LinkedService,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> LinkedServiceResource:
+        """
+        Create the linked service.
+
+        :param linked_service_name: The linked service name.
+        :param linked_service: The linked service resource definition.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The linked service.
+        """
+
+        return self.get_conn().linked_services.create_or_update(
+            resource_group_name, factory_name, linked_service_name, linked_service, **config
+        )
+
+    @provide_targeted_factory
+    def delete_linked_service(
+        self,
+        linked_service_name: str,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> None:
+        """
+        Delete the linked service.
+
+        :param linked_service_name: The linked service name.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        """
+
+        self.get_conn().linked_services.delete(
+            resource_group_name, factory_name, linked_service_name, **config
+        )
+
+    @provide_targeted_factory
+    def get_dataset(
+        self,
+        dataset_name: str,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> DatasetResource:
+        """
+        Get the dataset.
+
+        :param dataset_name: The dataset name.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The dataset.
+        """
+
+        return self.get_conn().datasets.get(resource_group_name, factory_name, dataset_name, **config)
+
+    @provide_targeted_factory
+    def update_dataset(
+        self,
+        dataset_name: str,
+        dataset: Dataset,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> DatasetResource:
+        """
+        Update the dataset.
+
+        :param dataset_name: The dataset name.
+        :param dataset: The dataset resource definition.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The dataset.
+        """
+
+        return self.get_conn().datasets.create_or_update(
+            resource_group_name, factory_name, dataset_name, dataset, **config
+        )
+
+    @provide_targeted_factory
+    def create_dataset(
+        self,
+        dataset_name: str,
+        dataset: Dataset,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **config: Any,
+    ) -> DatasetResource:
+        """
+        Create the dataset.
+
+        :param dataset_name: The dataset name.
+        :param dataset: The dataset resource definition.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        :return: The dataset.
+        """
+
+        return self.get_conn().datasets.create_or_update(
+            resource_group_name, factory_name, dataset_name, dataset, **config
+        )

Review comment:
       I did it that way (separate `create` and `update` methods, even though both currently call `create_or_update`) so that we can potentially add existence checks to the `create` methods and throw an exception when the entity already exists. But that could also be left to the user code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org