Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/28 21:13:41 UTC

[GitHub] [airflow] josh-fell opened a new pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

josh-fell opened a new pull request #17885:
URL: https://github.com/apache/airflow/pull/17885


   Closes: #17495, #14677
   
   This PR adds the first operator and sensor to integrate with Azure Data Factory:
   - `AzureDataFactoryRunPipelineOperator` will execute a given pipeline within a given data factory with a configurable option to wait for the pipeline execution completion
   - `AzureDataFactoryPipelineRunStatusSensor` will check the status of a given pipeline execution
   
   Additionally, the connection form for the Azure Data Factory connection type has been updated with explicit fields for values currently embedded within the classic `Extras` field.
   
   Finally, a small update has been made to a few function signatures within the `AzureDataFactoryHook` to annotate the correct type.
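   
   As a quick illustration of how these pieces might be wired together, here is a minimal DAG sketch. The pipeline name, connection ID, and module paths are assumptions based on this PR's diff, not guaranteed final values:
   
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
   from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor

   with DAG(
       dag_id="example_adf_run_pipeline",
       start_date=datetime(2021, 8, 1),
       schedule_interval=None,
   ) as dag:
       # Trigger the pipeline but do not block; the operator pushes ``run_id`` to XCom.
       run_pipeline = AzureDataFactoryRunPipelineOperator(
           task_id="run_pipeline",
           azure_data_factory_conn_id="azure_data_factory_default",  # assumed connection ID
           pipeline_name="my_adf_pipeline",  # assumed pipeline name
           wait_for_completion=False,
       )

       # Wait asynchronously for the run to finish, pulling the run_id pushed above.
       wait_for_run = AzureDataFactoryPipelineRunStatusSensor(
           task_id="wait_for_run",
           azure_data_factory_conn_id="azure_data_factory_default",
           run_id="{{ ti.xcom_pull(task_ids='run_pipeline', key='run_id') }}",
           poke_interval=60,
       )

       run_pipeline >> wait_for_run
   ```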
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #17885:
URL: https://github.com/apache/airflow/pull/17885


   





[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r701946651



##########
File path: airflow/providers/microsoft/azure/hooks/data_factory.py
##########
@@ -583,6 +584,77 @@ def get_pipeline_run(
         """
         return self.get_conn().pipeline_runs.get(resource_group_name, factory_name, run_id, **config)
 
+    def check_pipeline_run_status(

Review comment:
       Much better implementation; these functions should only perform one action. I'll go ahead and make these changes.







[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r704591000



##########
File path: airflow/providers/microsoft/azure/operators/data_factory.py
##########
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook
+
+
+class AzureDataFactoryRunPipelineOperator(BaseOperator):
+    """
+    Executes a data factory pipeline.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureDataFactoryRunPipelineOperator`
+
+    :param azure_data_factory_conn_id: The connection identifier for connecting to Azure Data Factory.
+    :type azure_data_factory_conn_id: str
+    :param pipeline_name: The name of the pipeline to execute.
+    :type pipeline_name: str
+    :param wait_for_completion: Flag to wait on a pipeline run's completion.  By default, this feature is
+        enabled but could be disabled to perform an asynchronous wait for a long-running pipeline execution
+        using the ``AzureDataFactoryPipelineRunSensor``.
+    :type wait_for_completion: bool
+    :param resource_group_name: The resource group name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the resource group name provided in the corresponding
+        connection.
+    :type resource_group_name: str
+    :param factory_name: The data factory name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the factory name provided in the corresponding
+        connection.
+    :type factory_name: str
+    :param reference_pipeline_run_id: The pipeline run identifier. If this run ID is specified the parameters
+        of the specified run will be used to create a new run.
+    :type reference_pipeline_run_id: str
+    :param is_recovery: Recovery mode flag. If recovery mode is set to `True`, the specified referenced
+        pipeline run and the new run will be grouped under the same ``groupId``.
+    :type is_recovery: bool
+    :param start_activity_name: In recovery mode, the rerun will start from this activity. If not specified,
+        all activities will run.
+    :type start_activity_name: str
+    :param start_from_failure: In recovery mode, if set to true, the rerun will start from failed activities.
+        The property will be used only if ``start_activity_name`` is not specified.
+    :type start_from_failure: bool
+    :param parameters: Parameters of the pipeline run. These parameters are referenced in a pipeline via
+        ``@pipeline().parameters.parameterName`` and will be used only if the ``reference_pipeline_run_id`` is
+        not specified.
+    :type parameters: Dict[str, Any]
+    :param timeout: Time in seconds to wait for a pipeline to reach a terminal status for non-asynchronous
+        waits. Used only if ``wait_for_completion`` is True.
+    :type timeout: int
+    :param check_interval: Time in seconds to check on a pipeline run's status for non-asynchronous waits.
+        Used only if ``wait_for_completion`` is True.
+    :type check_interval: int
+    """
+
+    template_fields = (
+        "azure_data_factory_conn_id",
+        "resource_group_name",
+        "factory_name",
+        "pipeline_name",
+        "reference_pipeline_run_id",
+        "parameters",
+    )
+    template_fields_renderers = {"parameters": "json"}
+
+    def __init__(
+        self,
+        *,
+        azure_data_factory_conn_id: str,
+        pipeline_name: str,
+        wait_for_completion: bool = True,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        reference_pipeline_run_id: Optional[str] = None,
+        is_recovery: Optional[bool] = None,
+        start_activity_name: Optional[str] = None,
+        start_from_failure: Optional[bool] = None,
+        parameters: Optional[Dict[str, Any]] = None,
+        timeout: Optional[int] = 60 * 60 * 24 * 7,
+        check_interval: Optional[int] = 60,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.azure_data_factory_conn_id = azure_data_factory_conn_id
+        self.pipeline_name = pipeline_name
+        self.wait_for_completion = wait_for_completion
+        self.resource_group_name = resource_group_name
+        self.factory_name = factory_name
+        self.reference_pipeline_run_id = reference_pipeline_run_id
+        self.is_recovery = is_recovery
+        self.start_activity_name = start_activity_name
+        self.start_from_failure = start_from_failure
+        self.parameters = parameters
+        self.timeout = timeout
+        self.check_interval = check_interval
+
+    def execute(self, context: Dict) -> None:
+        self.hook = AzureDataFactoryHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
+        self.log.info(f"Executing the '{self.pipeline_name}' pipeline.")
+        response = self.hook.run_pipeline(
+            pipeline_name=self.pipeline_name,
+            resource_group_name=self.resource_group_name,
+            factory_name=self.factory_name,
+            reference_pipeline_run_id=self.reference_pipeline_run_id,
+            is_recovery=self.is_recovery,
+            start_activity_name=self.start_activity_name,
+            start_from_failure=self.start_from_failure,
+            parameters=self.parameters,
+        )
+        self.run_id = vars(response)["run_id"]
+        # Push the ``run_id`` value to XCom regardless of what happens during execution. This allows for
+        # retrieval of the executed pipeline's ``run_id`` for downstream tasks, especially if performing an
+        # asynchronous wait.
+        context["ti"].xcom_push(key="run_id", value=self.run_id)
+
+        if self.wait_for_completion:
+            self.log.info(f"Waiting for pipeline run {self.run_id} to complete.")
+            self.hook.check_pipeline_run_status(
+                run_id=self.run_id,
+                check_interval=self.check_interval,
+                timeout=self.timeout,
+                resource_group_name=self.resource_group_name,
+                factory_name=self.factory_name,
+            )
+            self.log.info(f"Pipeline run {self.run_id} has completed successfully.")
+
+    def on_kill(self) -> None:
+        if self.run_id:
+            self.hook.cancel_pipeline_run(
+                run_id=self.run_id,
+                resource_group_name=self.resource_group_name,
+                factory_name=self.factory_name,
+            )

Review comment:
       This check can be done via the new `wait_for_pipeline_run_status` function.
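   
       For reference, a rough sketch of what that could look like in ``on_kill``, assuming the imports already present in the operator module; the ``wait_for_pipeline_run_status`` signature here is an assumption, not the merged API:
   
   ```python
       def on_kill(self) -> None:
           if self.run_id:
               self.hook.cancel_pipeline_run(
                   run_id=self.run_id,
                   resource_group_name=self.resource_group_name,
                   factory_name=self.factory_name,
               )
               # Confirm the run actually ended up in the Cancelled state (hypothetical signature).
               if self.hook.wait_for_pipeline_run_status(
                   run_id=self.run_id,
                   expected_statuses=AzureDataFactoryPipelineRunStatus.CANCELLED,
                   check_interval=self.check_interval,
                   timeout=self.timeout,
                   resource_group_name=self.resource_group_name,
                   factory_name=self.factory_name,
               ):
                   self.log.info("Pipeline run %s has been cancelled successfully.", self.run_id)
               else:
                   raise AirflowException(f"Pipeline run {self.run_id} was not cancelled.")
   ```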







[GitHub] [airflow] eladkal commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r698040885



##########
File path: airflow/providers/microsoft/azure/sensors/azure_data_factory.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.microsoft.azure.hooks.azure_data_factory import (
+    AzureDataFactoryHook,
+    AzureDataFactoryPipelineRunStatus,
+)
+from airflow.sensors.base import BaseSensorOperator
+
+
+class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
+    """
+    Checks the status of a pipeline run.
+
+    :param conn_id: The connection identifier for connecting to Azure Data Factory.
+    :type conn_id: str
+    :param run_id: The pipeline run identifier.
+    :type run_id: str
+    :param resource_group_name: The resource group name.
+    :type resource_group_name: str
+    :param factory_name: The data factory name.
+    :type factory_name: str
+    """
+
+    template_fields = ("conn_id", "resource_group_name", "factory_name", "run_id")
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,

Review comment:
       This comment applies to every `conn_id` in your code.
   The convention is to always use at least the provider prefix in the `conn_id` attribute name.
   
   Examples:
   aws:
   https://github.com/apache/airflow/blob/95279bdb8989fc0e433799792dab2ed38373e1f8/airflow/providers/amazon/aws/operators/dms_create_task.py#L93
   
   In Azure it's usually `azure_{service}_conn_id`:
   https://github.com/apache/airflow/blob/95279bdb8989fc0e433799792dab2ed38373e1f8/airflow/providers/microsoft/azure/operators/azure_batch.py#L165
   https://github.com/apache/airflow/blob/95279bdb8989fc0e433799792dab2ed38373e1f8/airflow/providers/microsoft/azure/operators/adls_list.py#L56
   
   If there is a reason to do something other than the convention, please explain why.
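   
   For illustration, following the convention would look roughly like this in the sensor's constructor (the default connection name is an assumption):
   
   ```python
   from airflow.sensors.base import BaseSensorOperator


   class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
       def __init__(
           self,
           *,
           run_id: str,
           azure_data_factory_conn_id: str = "azure_data_factory_default",  # provider-prefixed, per convention
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.azure_data_factory_conn_id = azure_data_factory_conn_id
           self.run_id = run_id
   ```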
   

##########
File path: airflow/providers/microsoft/azure/sensors/azure_data_factory.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.microsoft.azure.hooks.azure_data_factory import (
+    AzureDataFactoryHook,
+    AzureDataFactoryPipelineRunStatus,
+)
+from airflow.sensors.base import BaseSensorOperator
+
+
+class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
+    """
+    Checks the status of a pipeline run.
+
+    :param conn_id: The connection identifier for connecting to Azure Data Factory.
+    :type conn_id: str
+    :param run_id: The pipeline run identifier.
+    :type run_id: str
+    :param resource_group_name: The resource group name.
+    :type resource_group_name: str
+    :param factory_name: The data factory name.
+    :type factory_name: str
+    """
+
+    template_fields = ("conn_id", "resource_group_name", "factory_name", "run_id")
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,
+        run_id: str,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.conn_id = conn_id
+        self.run_id = run_id
+        self.resource_group_name = resource_group_name
+        self.factory_name = factory_name
+
+    def poke(self, context: Dict) -> bool:
+        self.log.info(f"Checking the status for pipeline run {self.run_id}.")
+        self.hook = AzureDataFactoryHook(conn_id=self.conn_id)
+        pipeline_run = self.hook.get_pipeline_run(
+            run_id=self.run_id,
+            factory_name=self.factory_name,
+            resource_group_name=self.resource_group_name,
+        )
+        pipeline_run_status = pipeline_run.status
+        self.log.info(f"Current status for pipeline run {self.run_id}: {pipeline_run_status}.")
+
+        if pipeline_run_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+            self.log.info(f"The pipeline run {self.run_id} has succeeded.")
+            return True
+        elif pipeline_run_status in {
+            AzureDataFactoryPipelineRunStatus.FAILED,
+            AzureDataFactoryPipelineRunStatus.CANCELLED,
+        }:
+            raise AirflowException(
+                f"Pipeline run {self.run_id} is in a negative terminal status: {pipeline_run_status}"
+            )
+
+        return False

Review comment:
       Again, I would recommend using a function that returns a job status by id/name (same comment that I gave for the operator); take a look at TableauJobStatusSensor. The code is much cleaner and simpler when the logic is in the hook.
   
   https://github.com/apache/airflow/blob/95279bdb8989fc0e433799792dab2ed38373e1f8/airflow/providers/tableau/sensors/tableau_job_status.py#L66-L73
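   
   Applied here, that pattern could look roughly like this. ``get_pipeline_run_status`` is a hypothetical hook method mirroring the Tableau example, and the imports/class context are those of the quoted sensor file:
   
   ```python
       def poke(self, context: Dict) -> bool:
           # The hook owns the status lookup; the sensor only interprets the result.
           hook = AzureDataFactoryHook(conn_id=self.conn_id)
           pipeline_run_status = hook.get_pipeline_run_status(
               run_id=self.run_id,
               resource_group_name=self.resource_group_name,
               factory_name=self.factory_name,
           )
           if pipeline_run_status in (
               AzureDataFactoryPipelineRunStatus.FAILED,
               AzureDataFactoryPipelineRunStatus.CANCELLED,
           ):
               raise AirflowException(
                   f"Pipeline run {self.run_id} is in a negative terminal status: {pipeline_run_status}"
               )
           return pipeline_run_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED
   ```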

##########
File path: airflow/providers/microsoft/azure/provider.yaml
##########
@@ -97,6 +99,9 @@ operators:
   - integration-name: Microsoft Azure Blob Storage
     python-modules:
       - airflow.providers.microsoft.azure.operators.wasb_delete_blob
+  - integration-name: Microsoft Azure Data Factory
+    python-modules:
+      - airflow.providers.microsoft.azure.operators.azure_data_factory

Review comment:
       ```suggestion
         - airflow.providers.microsoft.azure.operators.data_factory
   ```
   see sensor path comment

##########
File path: airflow/providers/microsoft/azure/operators/azure_data_factory.py
##########
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Any, Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.azure_data_factory import (
+    AzureDataFactoryHook,
+    AzureDataFactoryPipelineRunStatus,
+)
+
+
+class AzureDataFactoryRunPipelineOperator(BaseOperator):
+    """
+    Executes a data factory pipeline.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureDataFactoryRunPipelineOperator`
+
+    :param conn_id: The connection identifier for connecting to Azure Data Factory.
+    :type conn_id: str
+    :param pipeline_name: The name of the pipeline to execute.
+    :type pipeline_name: str
+    :param resource_group_name: The resource group name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the resource group name provided in the corresponding
+        connection.
+    :type resource_group_name: str
+    :param factory_name: The data factory name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the factory name provided in the corresponding
+        connection.
+    :type factory_name: str
+    :param reference_pipeline_run_id: The pipeline run identifier. If this run ID is specified the parameters
+        of the specified run will be used to create a new run.
+    :type reference_pipeline_run_id: str
+    :param is_recovery: Recovery mode flag. If recovery mode is set to `True`, the specified referenced
+        pipeline run and the new run will be grouped under the same ``groupId``.
+    :type is_recovery: bool
+    :param start_activity_name: In recovery mode, the rerun will start from this activity. If not specified,
+        all activities will run.
+    :type start_activity_name: str
+    :param start_from_failure: In recovery mode, if set to true, the rerun will start from failed activities.
+        The property will be used only if ``start_activity_name`` is not specified.
+    :type start_from_failure: bool
+    :param parameters: Parameters of the pipeline run. These parameters are referenced in a pipeline via
+        ``@pipeline().parameters.parameterName`` and will be used only if the ``reference_pipeline_run_id`` is
+        not specified.
+    :type parameters: Dict[str, Any]
+    :param wait_for_completion: Flag to wait on a pipeline run's completion.  By default, this feature is
+        enabled but could be disabled to wait for a long-running pipeline execution using the
+        ``AzureDataFactoryPipelineRunSensor`` rather than this operator.
+    :type wait_for_completion: bool
+    :param timeout: Time in seconds to wait for a pipeline to reach a terminal status for non-asynchronous
+        waits. Used only if ``wait_for_completion`` is True.
+    :type timeout: int
+    :param poke_interval: Time in seconds to check on a pipeline run's status for non-asynchronous waits. Used
+        only if ``wait_for_completion`` is True.
+    :type poke_interval: int
+    """
+
+    template_fields = (
+        "conn_id",
+        "resource_group_name",
+        "factory_name",
+        "pipeline_name",
+        "reference_pipeline_run_id",
+        "parameters",
+    )
+    template_fields_renderers = {"parameters": "json"}
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,
+        pipeline_name: str,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        reference_pipeline_run_id: Optional[str] = None,
+        is_recovery: Optional[bool] = None,
+        start_activity_name: Optional[str] = None,
+        start_from_failure: Optional[bool] = None,
+        parameters: Optional[Dict[str, Any]] = None,
+        wait_for_completion: Optional[bool] = True,
+        timeout: Optional[int] = 60 * 60 * 24 * 7,
+        poke_interval: Optional[int] = 30,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.conn_id = conn_id
+        self.pipeline_name = pipeline_name
+        self.resource_group_name = resource_group_name
+        self.factory_name = factory_name
+        self.reference_pipeline_run_id = reference_pipeline_run_id
+        self.is_recovery = is_recovery
+        self.start_activity_name = start_activity_name
+        self.start_from_failure = start_from_failure
+        self.parameters = parameters
+        self.wait_for_completion = wait_for_completion
+        self.timeout = timeout
+        self.poke_interval = poke_interval
+
+    def execute(self, context: Dict) -> None:
+        self.hook = AzureDataFactoryHook(conn_id=self.conn_id)
+        self.log.info(f"Executing the '{self.pipeline_name}' pipeline.")
+        response = self.hook.run_pipeline(
+            pipeline_name=self.pipeline_name,
+            resource_group_name=self.resource_group_name,
+            factory_name=self.factory_name,
+            reference_pipeline_run_id=self.reference_pipeline_run_id,
+            is_recovery=self.is_recovery,
+            start_activity_name=self.start_activity_name,
+            start_from_failure=self.start_from_failure,
+            parameters=self.parameters,
+        )
+        self.run_id = vars(response)["run_id"]
+        # Push the ``run_id`` value to XCom regardless of what happens during execution. This allows for
+        # retrieval of the executed pipeline's ``run_id`` for downstream tasks, especially if performing an
+        # asynchronous wait.
+        context["ti"].xcom_push(key="run_id", value=self.run_id)
+
+        if self.wait_for_completion:
+            self.log.info(f"Waiting for run ID {self.run_id} of pipeline {self.pipeline_name} to complete.")
+            start_time = time.monotonic()
+            pipeline_run_status = None
+            while pipeline_run_status not in AzureDataFactoryPipelineRunStatus.TERMINAL_STATUSES:
+                # Check to see if the pipeline-run duration has exceeded the ``timeout`` configured.
+                if start_time + self.timeout < time.monotonic():
+                    raise AirflowException(
+                        f"Pipeline run {self.run_id} has not reached a terminal status after "
+                        f"{self.timeout} seconds."
+                    )
+
+                # Wait to check the status of the pipeline based on the ``poke_interval`` configured.
+                time.sleep(self.poke_interval)
+
+                self.log.info(f"Checking on the status of run ID {self.run_id}.")
+                pipeline_run = self.hook.get_pipeline_run(
+                    run_id=self.run_id,
+                    factory_name=self.factory_name,
+                    resource_group_name=self.resource_group_name,
+                )
+                pipeline_run_status = pipeline_run.status
+                self.log.info(f"Run ID {self.run_id} is in a status of {pipeline_run_status}.")
+
+            if pipeline_run_status == AzureDataFactoryPipelineRunStatus.CANCELLED:
+                raise AirflowException(f"Pipeline run {self.run_id} has been cancelled.")
+
+            if pipeline_run_status == AzureDataFactoryPipelineRunStatus.FAILED:
+                raise AirflowException(f"Pipeline run {self.run_id} has failed.")

Review comment:
       This feels like a lot of logic that just shouldn't be at the operator level.
   I would suggest taking a look at the Tableau provider. It's an almost identical case.
   
   https://github.com/apache/airflow/blob/95279bdb8989fc0e433799792dab2ed38373e1f8/airflow/providers/tableau/operators/tableau.py#L123-L129

##########
File path: airflow/providers/microsoft/azure/provider.yaml
##########
@@ -105,6 +110,9 @@ sensors:
   - integration-name: Microsoft Azure Blob Storage
     python-modules:
       - airflow.providers.microsoft.azure.sensors.wasb
+  - integration-name: Microsoft Azure Data Factory
+    python-modules:
+      - airflow.providers.microsoft.azure.sensors.azure_data_factory

Review comment:
       This doesn't comply with `AIP-21: Changes in import paths`.
   ```suggestion
         - airflow.providers.microsoft.azure.sensors.data_factory
   ```
   
   I know there are other Azure operators/sensors that don't comply as well. We will need to fix them someday, but let's not introduce new operators/sensors that aren't compatible with the AIP.







[GitHub] [airflow] github-actions[bot] commented on pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#issuecomment-922429032


   The PR is likely OK to be merged with just a subset of tests for the default Python and Database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] eladkal commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r701857606



##########
File path: airflow/providers/microsoft/azure/hooks/data_factory.py
##########
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations

Review comment:
       Changing the path is OK, but we can't drop `hooks/azure_data_factory.py` yet because this is a breaking change.
   
   The file should still exist with a deprecation notice and point to the updated hook.
   See 
   https://github.com/apache/airflow/blob/bff580602bc619afe1bee2f7a5c3ded5fc6e39dd/airflow/providers/amazon/aws/hooks/aws_dynamodb.py#L18-L28
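   
   For reference, a minimal sketch of such a shim module, modelled on the linked aws_dynamodb example (the exact wording is an assumption):
   
   ```python
   """This module is deprecated. Please use `airflow.providers.microsoft.azure.hooks.data_factory`."""

   import warnings

   # Re-export everything from the new location so existing imports keep working.
   from airflow.providers.microsoft.azure.hooks.data_factory import *  # noqa

   warnings.warn(
       "This module is deprecated. Please use `airflow.providers.microsoft.azure.hooks.data_factory`.",
       DeprecationWarning,
       stacklevel=2,
   )
   ```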

##########
File path: airflow/providers/microsoft/azure/operators/data_factory.py
##########
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook
+
+
+class AzureDataFactoryRunPipelineOperator(BaseOperator):
+    """
+    Executes a data factory pipeline.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureDataFactoryRunPipelineOperator`
+
+    :param azure_data_factory_conn_id: The connection identifier for connecting to Azure Data Factory.
+    :type azure_data_factory_conn_id: str
+    :param pipeline_name: The name of the pipeline to execute.
+    :type pipeline_name: str
+    :param wait_for_completion: Flag to wait on a pipeline run's completion.  By default, this feature is
+        enabled but could be disabled to perform an asynchronous wait for a long-running pipeline execution
+        using the ``AzureDataFactoryPipelineRunSensor``.
+    :type wait_for_completion: bool
+    :param resource_group_name: The resource group name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the resource group name provided in the corresponding
+        connection.
+    :type resource_group_name: str
+    :param factory_name: The data factory name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the factory name provided in the corresponding
+        connection.
+    :type factory_name: str
+    :param reference_pipeline_run_id: The pipeline run identifier. If this run ID is specified the parameters
+        of the specified run will be used to create a new run.
+    :type reference_pipeline_run_id: str
+    :param is_recovery: Recovery mode flag. If recovery mode is set to `True`, the specified referenced
+        pipeline run and the new run will be grouped under the same ``groupId``.
+    :type is_recovery: bool
+    :param start_activity_name: In recovery mode, the rerun will start from this activity. If not specified,
+        all activities will run.
+    :type start_activity_name: str
+    :param start_from_failure: In recovery mode, if set to true, the rerun will start from failed activities.
+        The property will be used only if ``start_activity_name`` is not specified.
+    :type start_from_failure: bool
+    :param parameters: Parameters of the pipeline run. These parameters are referenced in a pipeline via
+        ``@pipeline().parameters.parameterName`` and will be used only if the ``reference_pipeline_run_id`` is
+        not specified.
+    :type parameters: Dict[str, Any]
+    :param timeout: Time in seconds to wait for a pipeline to reach a terminal status for non-asynchronous
+        waits. Used only if ``wait_for_completion`` is True.
+    :type timeout: int
+    :param check_interval: Time in seconds to check on a pipeline run's status for non-asynchronous waits.
+        Used only if ``wait_for_completion`` is True.
+    :type check_interval: int
+    """
+
+    template_fields = (
+        "azure_data_factory_conn_id",
+        "resource_group_name",
+        "factory_name",
+        "pipeline_name",
+        "reference_pipeline_run_id",
+        "parameters",
+    )
+    template_fields_renderers = {"parameters": "json"}
+
+    def __init__(
+        self,
+        *,
+        azure_data_factory_conn_id: str,
+        pipeline_name: str,
+        wait_for_completion: bool = True,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        reference_pipeline_run_id: Optional[str] = None,
+        is_recovery: Optional[bool] = None,
+        start_activity_name: Optional[str] = None,
+        start_from_failure: Optional[bool] = None,
+        parameters: Optional[Dict[str, Any]] = None,
+        timeout: Optional[int] = 60 * 60 * 24 * 7,
+        check_interval: Optional[int] = 60,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.azure_data_factory_conn_id = azure_data_factory_conn_id
+        self.pipeline_name = pipeline_name
+        self.wait_for_completion = wait_for_completion
+        self.resource_group_name = resource_group_name
+        self.factory_name = factory_name
+        self.reference_pipeline_run_id = reference_pipeline_run_id
+        self.is_recovery = is_recovery
+        self.start_activity_name = start_activity_name
+        self.start_from_failure = start_from_failure
+        self.parameters = parameters
+        self.timeout = timeout
+        self.check_interval = check_interval
+
+    def execute(self, context: Dict) -> None:
+        self.hook = AzureDataFactoryHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
+        self.log.info(f"Executing the '{self.pipeline_name}' pipeline.")
+        response = self.hook.run_pipeline(
+            pipeline_name=self.pipeline_name,
+            resource_group_name=self.resource_group_name,
+            factory_name=self.factory_name,
+            reference_pipeline_run_id=self.reference_pipeline_run_id,
+            is_recovery=self.is_recovery,
+            start_activity_name=self.start_activity_name,
+            start_from_failure=self.start_from_failure,
+            parameters=self.parameters,
+        )
+        self.run_id = vars(response)["run_id"]
+        # Push the ``run_id`` value to XCom regardless of what happens during execution. This allows for
+        # retrieval of the executed pipeline's ``run_id`` for downstream tasks, especially if performing an
+        # asynchronous wait.
+        context["ti"].xcom_push(key="run_id", value=self.run_id)
+
+        if self.wait_for_completion:
+            self.log.info(f"Waiting for pipeline run {self.run_id} to complete.")
+            self.hook.check_pipeline_run_status(
+                run_id=self.run_id,
+                check_interval=self.check_interval,
+                timeout=self.timeout,
+                resource_group_name=self.resource_group_name,
+                factory_name=self.factory_name,
+            )
+            self.log.info(f"Pipeline run {self.run_id} has completed successfully.")
+
+    def on_kill(self) -> None:
+        if self.run_id:
+            self.hook.cancel_pipeline_run(
+                run_id=self.run_id,
+                resource_group_name=self.resource_group_name,
+                factory_name=self.factory_name,
+            )

Review comment:
       Is there a way we can indicate whether cancelling the pipeline was successful?

##########
File path: airflow/providers/microsoft/azure/hooks/data_factory.py
##########
@@ -583,6 +584,77 @@ def get_pipeline_run(
         """
         return self.get_conn().pipeline_runs.get(resource_group_name, factory_name, run_id, **config)
 
+    def check_pipeline_run_status(

Review comment:
       This function does more than its name suggests.
   It checks the status and also lets you wait for a status, but it doesn't let you do that for any status you want.
   This function is tightly bound to the operator/sensor you added and doesn't give much flexibility for custom usages.
   
   WDYT about splitting it into:
   `check_pipeline_run_status` - just check whether the run_id status matches any of the provided statuses.
   `wait_pipeline_run_termination` - wait for the pipeline to end in any of the terminal states.
   







[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r698116308



##########
File path: airflow/providers/microsoft/azure/sensors/azure_data_factory.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.microsoft.azure.hooks.azure_data_factory import (
+    AzureDataFactoryHook,
+    AzureDataFactoryPipelineRunStatus,
+)
+from airflow.sensors.base import BaseSensorOperator
+
+
+class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
+    """
+    Checks the status of a pipeline run.
+
+    :param conn_id: The connection identifier for connecting to Azure Data Factory.
+    :type conn_id: str
+    :param run_id: The pipeline run identifier.
+    :type run_id: str
+    :param resource_group_name: The resource group name.
+    :type resource_group_name: str
+    :param factory_name: The data factory name.
+    :type factory_name: str
+    """
+
+    template_fields = ("conn_id", "resource_group_name", "factory_name", "run_id")
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,

Review comment:
       Seems as though the only current Azure integration that disregards this convention entirely is the Data Factory hook.  I was following the existing `conn_id` name; happy to change this.  I'll make sure to update the notes of this PR as well.







[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r704591128



##########
File path: airflow/providers/microsoft/azure/hooks/data_factory.py
##########
@@ -583,6 +584,77 @@ def get_pipeline_run(
         """
         return self.get_conn().pipeline_runs.get(resource_group_name, factory_name, run_id, **config)
 
+    def check_pipeline_run_status(

Review comment:
       For now I chose to have two functions:
   `get_pipeline_run_status` - simply retrieves the current status of the pipeline run
   `wait_for_pipeline_run_status` - periodically checks whether the pipeline run status matches a terminal status or any input status(es)
   
   After iterating a few times, splitting the functionality the way we discussed always led to unnecessary extra connections to ADF (e.g. at the end of a run, a connection is made to ADF to wait for termination via `wait_for_pipeline_run_termination`, the pipeline reaches a terminal status, and then another connection is made to check whether the status was expected via `check_pipeline_run_status`, even though the logic had just done that work). It felt like the solution was functionally too isolated. The `wait_for_pipeline_run_status` function seems aligned with some other similar implementations, but let me know what you think.
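   
   As a rough sketch of the two functions inside ``AzureDataFactoryHook`` (signatures are assumptions and may differ from the merged code; ``time``, ``AirflowException``, and the typing names are assumed to be imported in the hook module):
   
   ```python
       def get_pipeline_run_status(
           self,
           run_id: str,
           resource_group_name: Optional[str] = None,
           factory_name: Optional[str] = None,
       ) -> str:
           """Return the current status of the given pipeline run."""
           pipeline_run = self.get_pipeline_run(
               run_id=run_id,
               factory_name=factory_name,
               resource_group_name=resource_group_name,
           )
           return pipeline_run.status

       def wait_for_pipeline_run_status(
           self,
           run_id: str,
           expected_statuses: Union[str, Set[str]],
           check_interval: int = 60,
           timeout: int = 60 * 60 * 24 * 7,
           resource_group_name: Optional[str] = None,
           factory_name: Optional[str] = None,
       ) -> bool:
           """Poll the run until it matches ``expected_statuses`` or reaches a terminal status."""
           expected = {expected_statuses} if isinstance(expected_statuses, str) else set(expected_statuses)
           start_time = time.monotonic()
           status = self.get_pipeline_run_status(run_id, resource_group_name, factory_name)
           while status not in expected and status not in AzureDataFactoryPipelineRunStatus.TERMINAL_STATUSES:
               # Fail if the run has not settled within the configured timeout.
               if start_time + timeout < time.monotonic():
                   raise AirflowException(
                       f"Pipeline run {run_id} has not reached a terminal status after {timeout} seconds."
                   )
               time.sleep(check_interval)
               status = self.get_pipeline_run_status(run_id, resource_group_name, factory_name)
           return status in expected
   ```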







[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r698137923



##########
File path: airflow/providers/microsoft/azure/operators/azure_data_factory.py
##########
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Any, Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.azure_data_factory import (
+    AzureDataFactoryHook,
+    AzureDataFactoryPipelineRunStatus,
+)
+
+
+class AzureDataFactoryRunPipelineOperator(BaseOperator):
+    """
+    Executes a data factory pipeline.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureDataFactoryRunPipelineOperator`
+
+    :param conn_id: The connection identifier for connecting to Azure Data Factory.
+    :type conn_id: str
+    :param pipeline_name: The name of the pipeline to execute.
+    :type pipeline_name: str
+    :param resource_group_name: The resource group name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the resource group name provided in the corresponding
+        connection.
+    :type resource_group_name: str
+    :param factory_name: The data factory name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the factory name provided in the corresponding
+        connection.
+    :type factory_name: str
+    :param reference_pipeline_run_id: The pipeline run identifier. If this run ID is specified the parameters
+        of the specified run will be used to create a new run.
+    :type reference_pipeline_run_id: str
+    :param is_recovery: Recovery mode flag. If recovery mode is set to `True`, the specified referenced
+        pipeline run and the new run will be grouped under the same ``groupId``.
+    :type is_recovery: bool
+    :param start_activity_name: In recovery mode, the rerun will start from this activity. If not specified,
+        all activities will run.
+    :type start_activity_name: str
+    :param start_from_failure: In recovery mode, if set to true, the rerun will start from failed activities.
+        The property will be used only if ``start_activity_name`` is not specified.
+    :type start_from_failure: bool
+    :param parameters: Parameters of the pipeline run. These parameters are referenced in a pipeline via
+        ``@pipeline().parameters.parameterName`` and will be used only if the ``reference_pipeline_run_id`` is
+        not specified.
+    :type parameters: Dict[str, Any]
+    :param wait_for_completion: Flag to wait on a pipeline run's completion.  By default, this feature is
+        enabled but could be disabled to wait for a long-running pipeline execution using the
+        ``AzureDataFactoryPipelineRunSensor`` rather than this operator.
+    :type wait_for_completion: bool
+    :param timeout: Time in seconds to wait for a pipeline to reach a terminal status for non-asynchronous
+        waits. Used only if ``wait_for_completion`` is True.
+    :type timeout: int
+    :param poke_interval: Time in seconds to check on a pipeline run's status for non-asynchronous waits. Used
+        only if ``wait_for_completion`` is True.
+    :type poke_interval: int
+    """
+
+    template_fields = (
+        "conn_id",
+        "resource_group_name",
+        "factory_name",
+        "pipeline_name",
+        "reference_pipeline_run_id",
+        "parameters",
+    )
+    template_fields_renderers = {"parameters": "json"}
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,
+        pipeline_name: str,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        reference_pipeline_run_id: Optional[str] = None,
+        is_recovery: Optional[bool] = None,
+        start_activity_name: Optional[str] = None,
+        start_from_failure: Optional[bool] = None,
+        parameters: Optional[Dict[str, Any]] = None,
+        wait_for_completion: Optional[bool] = True,
+        timeout: Optional[int] = 60 * 60 * 24 * 7,
+        poke_interval: Optional[int] = 30,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.conn_id = conn_id
+        self.pipeline_name = pipeline_name
+        self.resource_group_name = resource_group_name
+        self.factory_name = factory_name
+        self.reference_pipeline_run_id = reference_pipeline_run_id
+        self.is_recovery = is_recovery
+        self.start_activity_name = start_activity_name
+        self.start_from_failure = start_from_failure
+        self.parameters = parameters
+        self.wait_for_completion = wait_for_completion
+        self.timeout = timeout
+        self.poke_interval = poke_interval
+
+    def execute(self, context: Dict) -> None:
+        self.hook = AzureDataFactoryHook(conn_id=self.conn_id)
+        self.log.info(f"Executing the '{self.pipeline_name}' pipeline.")
+        response = self.hook.run_pipeline(
+            pipeline_name=self.pipeline_name,
+            resource_group_name=self.resource_group_name,
+            factory_name=self.factory_name,
+            reference_pipeline_run_id=self.reference_pipeline_run_id,
+            is_recovery=self.is_recovery,
+            start_activity_name=self.start_activity_name,
+            start_from_failure=self.start_from_failure,
+            parameters=self.parameters,
+        )
+        self.run_id = vars(response)["run_id"]
+        # Push the ``run_id`` value to XCom regardless of what happens during execution. This allows for
+        # retrieval of the executed pipeline's ``run_id`` for downstream tasks, especially if performing an
+        # asynchronous wait.
+        context["ti"].xcom_push(key="run_id", value=self.run_id)
+
+        if self.wait_for_completion:
+            self.log.info(f"Waiting for run ID {self.run_id} of pipeline {self.pipeline_name} to complete.")
+            start_time = time.monotonic()
+            pipeline_run_status = None
+            while pipeline_run_status not in AzureDataFactoryPipelineRunStatus.TERMINAL_STATUSES:
+                # Check to see if the pipeline-run duration has exceeded the ``timeout`` configured.
+                if start_time + self.timeout < time.monotonic():
+                    raise AirflowException(
+                        f"Pipeline run {self.run_id} has not reached a terminal status after "
+                        f"{self.timeout} seconds."
+                    )
+
+                # Wait to check the status of the pipeline based on the ``poke_interval`` configured.
+                time.sleep(self.poke_interval)
+
+                self.log.info(f"Checking on the status of run ID {self.run_id}.")
+                pipeline_run = self.hook.get_pipeline_run(
+                    run_id=self.run_id,
+                    factory_name=self.factory_name,
+                    resource_group_name=self.resource_group_name,
+                )
+                pipeline_run_status = pipeline_run.status
+                self.log.info(f"Run ID {self.run_id} is in a status of {pipeline_run_status}.")
+
+            if pipeline_run_status == AzureDataFactoryPipelineRunStatus.CANCELLED:
+                raise AirflowException(f"Pipeline run {self.run_id} has been cancelled.")
+
+            if pipeline_run_status == AzureDataFactoryPipelineRunStatus.FAILED:
+                raise AirflowException(f"Pipeline run {self.run_id} has failed.")

Review comment:
       Fair point. I'll refactor this piece of the operator and sensor.







[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r701952693



##########
File path: airflow/providers/microsoft/azure/hooks/data_factory.py
##########
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations

Review comment:
       👍  Excellent point, thanks for the tip! I'll be more aware of changes like these in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r698116849



##########
File path: airflow/providers/microsoft/azure/provider.yaml
##########
@@ -105,6 +110,9 @@ sensors:
   - integration-name: Microsoft Azure Blob Storage
     python-modules:
       - airflow.providers.microsoft.azure.sensors.wasb
+  - integration-name: Microsoft Azure Data Factory
+    python-modules:
+      - airflow.providers.microsoft.azure.sensors.azure_data_factory

Review comment:
       Again just following existing Azure conventions but certainly don't want to stray from an AIP. I'll update this. Thanks for the note.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r704591128



##########
File path: airflow/providers/microsoft/azure/hooks/data_factory.py
##########
@@ -583,6 +584,77 @@ def get_pipeline_run(
         """
         return self.get_conn().pipeline_runs.get(resource_group_name, factory_name, run_id, **config)
 
+    def check_pipeline_run_status(

Review comment:
       For now I chose to have two functions: 
   `get_pipeline_run_status` - Simply retrieves the current status of the pipeline run
   `wait_for_pipeline_run_status` - Periodically checks if the pipeline run status matches a terminal status or any input status(es)
   
   After iterating a few times, splitting the functionality the way we had discussed always led to unnecessary extra connections to ADF (e.g. at the end of a run, a connection is made to ADF to wait for termination via `wait_for_pipeline_run_termination`, the pipeline reaches a terminal status, and then another connection is made to check whether the status was expected via `check_pipeline_run_status`, even though the logic had just done that). That solution felt too functionally isolated. The `wait_for_pipeline_run_status` function seems aligned with some other similar implementations, but let me know what you think.
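   To make the split concrete, here is a rough sketch of what the two helpers could look like, written as standalone functions against the existing hook (in the PR they would be hook methods). The signatures, default values, and polling logic mirror the operator code above but are otherwise assumptions, not the final implementation:
   
   ```python
   import time
   from typing import Optional, Set, Union
   
   from airflow.exceptions import AirflowException
   
   
   def get_pipeline_run_status(hook, run_id, resource_group_name=None, factory_name=None) -> str:
       """Make a single call to ADF and return the run's current status."""
       pipeline_run = hook.get_pipeline_run(
           run_id=run_id,
           factory_name=factory_name,
           resource_group_name=resource_group_name,
       )
       return pipeline_run.status
   
   
   def wait_for_pipeline_run_status(
       hook,
       run_id: str,
       expected_statuses: Union[str, Set[str]],
       check_interval: int = 60,
       timeout: int = 60 * 60 * 24 * 7,
       resource_group_name: Optional[str] = None,
       factory_name: Optional[str] = None,
   ) -> bool:
       """Poll until the run reaches one of the expected statuses or the timeout elapses.
   
       A fuller implementation would also bail out early if the run reaches an
       unexpected terminal status instead of polling until the timeout.
       """
       expected = {expected_statuses} if isinstance(expected_statuses, str) else set(expected_statuses)
       start_time = time.monotonic()
       status = get_pipeline_run_status(hook, run_id, resource_group_name, factory_name)
       while status not in expected:
           if start_time + timeout < time.monotonic():
               raise AirflowException(
                   f"Pipeline run {run_id} has not reached a status of {expected} after {timeout} seconds."
               )
           time.sleep(check_interval)
           status = get_pipeline_run_status(hook, run_id, resource_group_name, factory_name)
       return True
   ```
   
   With something like this in place, the operator's `wait_for_completion` branch reduces to a single `wait_for_pipeline_run_status` call that checks for the succeeded status.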




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r698042354



##########
File path: airflow/providers/microsoft/azure/provider.yaml
##########
@@ -105,6 +110,9 @@ sensors:
   - integration-name: Microsoft Azure Blob Storage
     python-modules:
       - airflow.providers.microsoft.azure.sensors.wasb
+  - integration-name: Microsoft Azure Data Factory
+    python-modules:
+      - airflow.providers.microsoft.azure.sensors.azure_data_factory

Review comment:
       This doesn't comply with `AIP-21: Changes in import paths`.
   ```suggestion
         - airflow.providers.microsoft.azure.sensors.data_factory
   ```
   
   I know there are other Azure operators/sensors that don't comply as well. We will need to fix them someday, but let's not introduce new operators/sensors that aren't compatible with the AIP. Created https://github.com/apache/airflow/issues/17898 as a follow-up.
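   In practice the difference is only the module path users import from, for example:
   
   ```python
   # AIP-21 compliant: the provider package already says "azure", so the module name does not repeat it.
   from airflow.providers.microsoft.azure.sensors import data_factory
   
   # rather than:
   # from airflow.providers.microsoft.azure.sensors import azure_data_factory
   ```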




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #17885: Creating ADF pipeline run operator, sensor + ADF custom conn fields

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17885:
URL: https://github.com/apache/airflow/pull/17885#discussion_r701953958



##########
File path: airflow/providers/microsoft/azure/operators/data_factory.py
##########
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook
+
+
+class AzureDataFactoryRunPipelineOperator(BaseOperator):
+    """
+    Executes a data factory pipeline.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureDataFactoryRunPipelineOperator`
+
+    :param azure_data_factory_conn_id: The connection identifier for connecting to Azure Data Factory.
+    :type azure_data_factory_conn_id: str
+    :param pipeline_name: The name of the pipeline to execute.
+    :type pipeline_name: str
+    :param wait_for_completion: Flag to wait on a pipeline run's completion.  By default, this feature is
+        enabled but could be disabled to perform an asynchronous wait for a long-running pipeline execution
+        using the ``AzureDataFactoryPipelineRunSensor``.
+    :type wait_for_completion: bool
+    :param resource_group_name: The resource group name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the resource group name provided in the corresponding
+        connection.
+    :type resource_group_name: str
+    :param factory_name: The data factory name. If a value is not passed in to the operator, the
+        ``AzureDataFactoryHook`` will attempt to use the factory name provided in the corresponding
+        connection.
+    :type factory_name: str
+    :param reference_pipeline_run_id: The pipeline run identifier. If this run ID is specified the parameters
+        of the specified run will be used to create a new run.
+    :type reference_pipeline_run_id: str
+    :param is_recovery: Recovery mode flag. If recovery mode is set to `True`, the specified referenced
+        pipeline run and the new run will be grouped under the same ``groupId``.
+    :type is_recovery: bool
+    :param start_activity_name: In recovery mode, the rerun will start from this activity. If not specified,
+        all activities will run.
+    :type start_activity_name: str
+    :param start_from_failure: In recovery mode, if set to true, the rerun will start from failed activities.
+        The property will be used only if ``start_activity_name`` is not specified.
+    :type start_from_failure: bool
+    :param parameters: Parameters of the pipeline run. These parameters are referenced in a pipeline via
+        ``@pipeline().parameters.parameterName`` and will be used only if the ``reference_pipeline_run_id`` is
+        not specified.
+    :type parameters: Dict[str, Any]
+    :param timeout: Time in seconds to wait for a pipeline to reach a terminal status for non-asynchronous
+        waits. Used only if ``wait_for_completion`` is True.
+    :type timeout: int
+    :param check_interval: Time in seconds to check on a pipeline run's status for non-asynchronous waits.
+        Used only if ``wait_for_completion`` is True.
+    :type check_interval: int
+    """
+
+    template_fields = (
+        "azure_data_factory_conn_id",
+        "resource_group_name",
+        "factory_name",
+        "pipeline_name",
+        "reference_pipeline_run_id",
+        "parameters",
+    )
+    template_fields_renderers = {"parameters": "json"}
+
+    def __init__(
+        self,
+        *,
+        azure_data_factory_conn_id: str,
+        pipeline_name: str,
+        wait_for_completion: bool = True,
+        resource_group_name: Optional[str] = None,
+        factory_name: Optional[str] = None,
+        reference_pipeline_run_id: Optional[str] = None,
+        is_recovery: Optional[bool] = None,
+        start_activity_name: Optional[str] = None,
+        start_from_failure: Optional[bool] = None,
+        parameters: Optional[Dict[str, Any]] = None,
+        timeout: Optional[int] = 60 * 60 * 24 * 7,
+        check_interval: Optional[int] = 60,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.azure_data_factory_conn_id = azure_data_factory_conn_id
+        self.pipeline_name = pipeline_name
+        self.wait_for_completion = wait_for_completion
+        self.resource_group_name = resource_group_name
+        self.factory_name = factory_name
+        self.reference_pipeline_run_id = reference_pipeline_run_id
+        self.is_recovery = is_recovery
+        self.start_activity_name = start_activity_name
+        self.start_from_failure = start_from_failure
+        self.parameters = parameters
+        self.timeout = timeout
+        self.check_interval = check_interval
+
+    def execute(self, context: Dict) -> None:
+        self.hook = AzureDataFactoryHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
+        self.log.info(f"Executing the '{self.pipeline_name}' pipeline.")
+        response = self.hook.run_pipeline(
+            pipeline_name=self.pipeline_name,
+            resource_group_name=self.resource_group_name,
+            factory_name=self.factory_name,
+            reference_pipeline_run_id=self.reference_pipeline_run_id,
+            is_recovery=self.is_recovery,
+            start_activity_name=self.start_activity_name,
+            start_from_failure=self.start_from_failure,
+            parameters=self.parameters,
+        )
+        self.run_id = vars(response)["run_id"]
+        # Push the ``run_id`` value to XCom regardless of what happens during execution. This allows for
+        # retrieval of the executed pipeline's ``run_id`` for downstream tasks, especially if performing an
+        # asynchronous wait.
+        context["ti"].xcom_push(key="run_id", value=self.run_id)
+
+        if self.wait_for_completion:
+            self.log.info(f"Waiting for pipeline run {self.run_id} to complete.")
+            self.hook.check_pipeline_run_status(
+                run_id=self.run_id,
+                check_interval=self.check_interval,
+                timeout=self.timeout,
+                resource_group_name=self.resource_group_name,
+                factory_name=self.factory_name,
+            )
+            self.log.info(f"Pipeline run {self.run_id} has completed successfully.")
+
+    def on_kill(self) -> None:
+        if self.run_id:
+            self.hook.cancel_pipeline_run(
+                run_id=self.run_id,
+                resource_group_name=self.resource_group_name,
+                factory_name=self.factory_name,
+            )

Review comment:
       I believe so. I imagine we could use the new functions here as well to make sure the pipeline is actually canceled. I'll implement and test.
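   A sketch of how the operator's `on_kill` shown above might be extended, assuming the `wait_for_pipeline_run_status` helper discussed earlier ends up on the hook (its name, its signature, and the import location of the status constants are assumptions here, not part of this diff):
   
   ```python
   # Assumed import location for the status constants used by the operator above.
   from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryPipelineRunStatus
   
   def on_kill(self) -> None:
       if self.run_id:
           self.hook.cancel_pipeline_run(
               run_id=self.run_id,
               resource_group_name=self.resource_group_name,
               factory_name=self.factory_name,
           )
           # Confirm the run actually reached a cancelled state rather than assuming
           # the cancel request succeeded.
           self.hook.wait_for_pipeline_run_status(
               run_id=self.run_id,
               expected_statuses=AzureDataFactoryPipelineRunStatus.CANCELLED,
               check_interval=self.check_interval,
               timeout=self.timeout,
               resource_group_name=self.resource_group_name,
               factory_name=self.factory_name,
           )
           self.log.info("Pipeline run %s has been cancelled successfully.", self.run_id)
   ```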




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org