Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/01 06:24:07 UTC

[GitHub] [airflow] bharanidharan14 opened a new pull request, #25436: Implement Azure Service Bus Topic create Operator

bharanidharan14 opened a new pull request, #25436:
URL: https://github.com/apache/airflow/pull/25436

   - Added Azure Service Bus Topic Create Operator
   - Added Test case 
   - Added Example for the operator


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] bharanidharan14 commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1205405312

   > > Yes, the existing flow will throw an error if the topic name already exists. I have now added a `try`/`except`: before creating the topic, the operator checks whether it exists and, if so, logs a message and returns the topic name. The delete operator is handled similarly.
   > 
   > I wonder if there should be a parameter for users to have the option of overwriting a topic that might exist. WDYT?
   
   Just thinking: overwriting an existing topic is a kind of update operation, right?
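
The check-before-create behaviour described in the quoted reply above could look roughly like the sketch below. It is not the code from the PR: `FakeAdminClient`, `TopicNotFound`, and `create_topic_if_missing` are hypothetical names, and the client is an in-memory stand-in rather than the real Azure administration client.

```python
import logging

log = logging.getLogger(__name__)


class TopicNotFound(Exception):
    """Stand-in for a client's 'resource not found' error (hypothetical)."""


class FakeAdminClient:
    """In-memory stand-in for a Service Bus administration client (hypothetical)."""

    def __init__(self):
        self.topics = set()

    def get_topic(self, topic_name):
        if topic_name not in self.topics:
            raise TopicNotFound(topic_name)
        return topic_name

    def create_topic(self, topic_name):
        self.topics.add(topic_name)
        return topic_name


def create_topic_if_missing(client, topic_name):
    """Return the topic name, creating the topic only when it does not exist yet."""
    try:
        existing = client.get_topic(topic_name)
    except TopicNotFound:
        existing = None

    if existing is not None:
        # Nothing to do: log and hand back the existing name.
        log.info("Topic %s already exists", existing)
        return existing

    created = client.create_topic(topic_name)
    log.info("Created topic %s", created)
    return created
```

Calling `create_topic_if_missing` twice with the same name leaves a single topic in place and returns the same name both times.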
   
   




[GitHub] [airflow] ashb commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r936378164


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -206,6 +208,120 @@ def execute(self, context: "Context") -> None:
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to live value. This is
+     the duration after which the message expires, starting from when the message is sent to Service
+     Bus. This is the default value used when TimeToLive is not set on a message itself.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
+     memory allocated for the topic.
+    :param requires_duplicate_detection: A value indicating if this topic requires duplicate
+     detection.
+    :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
+     duration of the duplicate detection history. The default value is 10 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_batched_operations: Value that indicates whether server-side batched operations
+     are enabled.
+    :param size_in_bytes: The size of the topic, in bytes.
+    :param filtering_messages_before_publishing: Filter messages before publishing.
+    :param authorization_rules: List of Authorization rules for resource.
+    :param support_ordering: A value that indicates whether the topic supports ordering.
+    :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
+     automatically deleted. The minimum duration is 5 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_partitioning: A value that indicates whether the topic is to be partitioned
+     across multiple message brokers.
+    :param enable_express: A value that indicates whether Express Entities are enabled. An express
+     queue holds a message in memory temporarily before writing it to persistent storage.
+    :param user_metadata: Metadata associated with the topic.
+    :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
+     can be accepted by the queue. This feature is only available when using a Premium namespace
+     and Service Bus API version "2021-05" or higher.
+     The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        max_size_in_megabytes: Optional[int] = None,
+        requires_duplicate_detection: Optional[bool] = None,
+        duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None,
+        enable_batched_operations: Optional[bool] = None,
+        size_in_bytes: Optional[int] = None,
+        filtering_messages_before_publishing: Optional[bool] = None,
+        authorization_rules: Optional[List[AuthorizationRule]] = None,
+        support_ordering: Optional[bool] = None,
+        auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
+        enable_partitioning: Optional[bool] = None,
+        enable_express: Optional[bool] = None,
+        user_metadata: Optional[str] = None,
+        max_message_size_in_kilobytes: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+        self.default_message_time_to_live = default_message_time_to_live
+        self.max_size_in_megabytes = max_size_in_megabytes
+        self.requires_duplicate_detection = requires_duplicate_detection
+        self.duplicate_detection_history_time_window = duplicate_detection_history_time_window
+        self.enable_batched_operations = enable_batched_operations
+        self.size_in_bytes = size_in_bytes
+        self.filtering_messages_before_publishing = filtering_messages_before_publishing
+        self.authorization_rules = authorization_rules
+        self.support_ordering = support_ordering
+        self.auto_delete_on_idle = auto_delete_on_idle
+        self.enable_partitioning = enable_partitioning
+        self.enable_express = enable_express
+        self.user_metadata = user_metadata
+        self.max_message_size_in_kilobytes = max_message_size_in_kilobytes
+
+    def execute(self, context: "Context") -> None:
+        """Creates Topic in Service Bus namespace, by connecting to Service Bus Admin client"""
+        if self.topic_name is None:
+            raise TypeError("Topic name cannot be None.")
+
+        # Create the hook
+        hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        with hook.get_conn() as service_mgmt_conn:
+            topic = service_mgmt_conn.create_topic(
+                topic_name=self.topic_name,
+                default_message_time_to_live=self.default_message_time_to_live,
+                max_size_in_megabytes=self.max_size_in_megabytes,
+                requires_duplicate_detection=self.requires_duplicate_detection,
+                duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
+                enable_batched_operations=self.enable_batched_operations,
+                size_in_bytes=self.size_in_bytes,
+                filtering_messages_before_publishing=self.filtering_messages_before_publishing,
+                authorization_rules=self.authorization_rules,
+                support_ordering=self.support_ordering,
+                auto_delete_on_idle=self.auto_delete_on_idle,
+                enable_partitioning=self.enable_partitioning,
+                enable_express=self.enable_express,
+                user_metadata=self.user_metadata,
+                max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
+            )
+            self.log.info("Created Topic %s", topic.name)

Review Comment:
   Given that the topic name is templated, it does make sense to return the value, as the user might not know exactly what value it will resolve to (doubly so if this happens to be paired with dynamic task mapping, although that is perhaps unlikely for this specific operator).
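
To illustrate the point about templated names: Airflow renders template fields before `execute` runs and pushes the value returned by `execute` to XCom, so returning the rendered name makes it available downstream. The toy sketch below imitates that with the standard library's `string.Template` in place of Jinja. `CreateTopicTask` and its `render` method are hypothetical stand-ins, not Airflow classes.

```python
from string import Template


class CreateTopicTask:
    """Toy stand-in for an operator with a templated ``topic_name`` (hypothetical)."""

    def __init__(self, topic_name):
        self.topic_name = topic_name

    def render(self, context):
        # Stand-in for template rendering that happens before execute().
        self.topic_name = Template(self.topic_name).substitute(context)

    def execute(self):
        # A real operator would call the admin client here. The point is only
        # that the rendered name is returned for downstream tasks to consume.
        return self.topic_name


task = CreateTopicTask("orders-$ds")
task.render({"ds": "2022-08-01"})
created_name = task.execute()
```

The DAG author wrote `orders-$ds`, but the name actually created depends on the run, which is why a downstream task benefits from receiving the returned value.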





[GitHub] [airflow] bharanidharan14 commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1210193678

   > > > Just thinking: overwriting an existing topic is a kind of update operation, right?
   > > 
   > > 
   > > Sure, depending on how Azure Service Bus management operates and/or if someone wanted to patch a particular attribute (if that's even possible). What if you wanted to completely rebuild the topic? Yes, you could check whether it exists, delete it, and then rebuild it, but I was thinking of a feature analogous to file operations.
   > > I'm not saying it _should_ or even _can_ be done. I haven't used ASB in a long time. Just tossing spaghetti to see if it sticks. You can tell me to go back to my corner, and it could be something for the future (or never).
   > 
   > Yes, there is an option to patch/update the topic. I was planning to add an update-topic operator next; with it, the user can update any attribute that is allowed to be updated.
   > 
   > I also need a suggestion on how this create-topic operator should behave:
   > 
   > 1. Only create, and if the topic exists, just return it?
   > 2. Create the topic; if it exists, delete it and create it with the new properties?
   
   @josh-fell 




[GitHub] [airflow] josh-fell commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
josh-fell commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1211461946

   > > > > Just thinking: overwriting an existing topic is a kind of update operation, right?
   > > > 
   > > > 
   > > > Sure, depending on how Azure Service Bus management operates and/or if someone wanted to patch a particular attribute (if that's even possible). What if you wanted to completely rebuild the topic? Yes, you could check whether it exists, delete it, and then rebuild it, but I was thinking of a feature analogous to file operations.
   > > > I'm not saying it _should_ or even _can_ be done. I haven't used ASB in a long time. Just tossing spaghetti to see if it sticks. You can tell me to go back to my corner, and it could be something for the future (or never).
   > > 
   > > 
   > > Yes, there is an option to patch/update the topic. I was planning to add an update-topic operator next; with it, the user can update any attribute that is allowed to be updated.
   > > I also need a suggestion on how this create-topic operator should behave:
   > > 
   > > 1. Only create, and if the topic exists, just return it?
   > > 2. Create the topic; if it exists, delete it and create it with the new properties?
   > 
   > @josh-fell
   
   I think option 1 is fine for now. If users find that option 2 would indeed be helpful, they are more than welcome to log a feature request or submit a PR adding the functionality. Thinking about my suggestion, it was a little much and out of scope for the initial creation of these operators.
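
For comparison, the two options discussed above can be sketched side by side behind a single flag. This is an illustration only: `recreate_if_exists` is a hypothetical parameter that the PR does not add, and `FakeAdminClient` is an in-memory stand-in, not the Azure client.

```python
class FakeAdminClient:
    """In-memory stand-in mapping topic names to their properties (hypothetical)."""

    def __init__(self):
        self.topics = {}

    def create_topic(self, name, **props):
        self.topics[name] = dict(props)
        return name

    def delete_topic(self, name):
        del self.topics[name]


def create_topic(client, name, *, recreate_if_exists=False, **props):
    """Create ``name``; optionally rebuild it when it already exists."""
    if name in client.topics:
        if not recreate_if_exists:
            # Option 1: leave the existing topic untouched and return its name.
            return name
        # Option 2: drop the existing topic, then create it with the new properties.
        client.delete_topic(name)
    return client.create_topic(name, **props)
```

With the default, a second call leaves the original properties in place; passing `recreate_if_exists=True` replaces them. In this sketch option 2 discards everything stored for the topic, which is one reason to keep it opt-in.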




[GitHub] [airflow] potiuk commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r935955757


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 import datetime
-from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union
+
+from azure.servicebus.management._models import AuthorizationRule

Review Comment:
   Why, I wonder? We don't do that elsewhere? Is there a particular reason why you want to have it under `TYPE_CHECKING`?





[GitHub] [airflow] bharanidharan14 commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r936276847


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -206,6 +208,120 @@ def execute(self, context: "Context") -> None:
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to live value. This is
+     the duration after which the message expires, starting from when the message is sent to Service
+     Bus. This is the default value used when TimeToLive is not set on a message itself.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
+     memory allocated for the topic.
+    :param requires_duplicate_detection: A value indicating if this topic requires duplicate
+     detection.
+    :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
+     duration of the duplicate detection history. The default value is 10 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_batched_operations: Value that indicates whether server-side batched operations
+     are enabled.
+    :param size_in_bytes: The size of the topic, in bytes.
+    :param filtering_messages_before_publishing: Filter messages before publishing.
+    :param authorization_rules: List of Authorization rules for resource.
+    :param support_ordering: A value that indicates whether the topic supports ordering.
+    :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
+     automatically deleted. The minimum duration is 5 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_partitioning: A value that indicates whether the topic is to be partitioned
+     across multiple message brokers.
+    :param enable_express: A value that indicates whether Express Entities are enabled. An express
+     queue holds a message in memory temporarily before writing it to persistent storage.
+    :param user_metadata: Metadata associated with the topic.
+    :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
+     can be accepted by the queue. This feature is only available when using a Premium namespace
+     and Service Bus API version "2021-05" or higher.
+     The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        max_size_in_megabytes: Optional[int] = None,
+        requires_duplicate_detection: Optional[bool] = None,
+        duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None,
+        enable_batched_operations: Optional[bool] = None,
+        size_in_bytes: Optional[int] = None,
+        filtering_messages_before_publishing: Optional[bool] = None,
+        authorization_rules: Optional[List[AuthorizationRule]] = None,
+        support_ordering: Optional[bool] = None,
+        auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
+        enable_partitioning: Optional[bool] = None,
+        enable_express: Optional[bool] = None,
+        user_metadata: Optional[str] = None,
+        max_message_size_in_kilobytes: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+        self.default_message_time_to_live = default_message_time_to_live
+        self.max_size_in_megabytes = max_size_in_megabytes
+        self.requires_duplicate_detection = requires_duplicate_detection
+        self.duplicate_detection_history_time_window = duplicate_detection_history_time_window
+        self.enable_batched_operations = enable_batched_operations
+        self.size_in_bytes = size_in_bytes
+        self.filtering_messages_before_publishing = filtering_messages_before_publishing
+        self.authorization_rules = authorization_rules
+        self.support_ordering = support_ordering
+        self.auto_delete_on_idle = auto_delete_on_idle
+        self.enable_partitioning = enable_partitioning
+        self.enable_express = enable_express
+        self.user_metadata = user_metadata
+        self.max_message_size_in_kilobytes = max_message_size_in_kilobytes
+
+    def execute(self, context: "Context") -> None:
+        """Creates Topic in Service Bus namespace, by connecting to Service Bus Admin client"""
+        if self.topic_name is None:
+            raise TypeError("Topic name cannot be None.")
+
+        # Create the hook
+        hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        with hook.get_conn() as service_mgmt_conn:
+            topic = service_mgmt_conn.create_topic(
+                topic_name=self.topic_name,
+                default_message_time_to_live=self.default_message_time_to_live,
+                max_size_in_megabytes=self.max_size_in_megabytes,
+                requires_duplicate_detection=self.requires_duplicate_detection,
+                duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
+                enable_batched_operations=self.enable_batched_operations,
+                size_in_bytes=self.size_in_bytes,
+                filtering_messages_before_publishing=self.filtering_messages_before_publishing,
+                authorization_rules=self.authorization_rules,
+                support_ordering=self.support_ordering,
+                auto_delete_on_idle=self.auto_delete_on_idle,
+                enable_partitioning=self.enable_partitioning,
+                enable_express=self.enable_express,
+                user_metadata=self.user_metadata,
+                max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
+            )
+            self.log.info("Created Topic %s", topic.name)


Review Comment:
   @josh-fell We can return `topic.name`, but my one thought is this: if the user creates the topic with a specific name and wants to use that name in a downstream task, they can reuse the same name that was passed to the `create_topic` operator. In that case, do we need to return `topic.name`?





[GitHub] [airflow] ashb commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1203641169

   What is the behaviour if the topic you try to create already exists? I'm assuming the underlying hook would fail with an exception, but I think the operator should catch that error so that the operator can be used idempotently: i.e. we should think of the job of this operator to ensure that the specific topic exists. (Similarly for delete operator)
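
One way to read the suggestion above is to attempt the create and treat "already exists" as success. The sketch below assumes the client signals a name conflict with an exception. `TopicAlreadyExists`, `FakeAdminClient`, and `ensure_topic` are hypothetical stand-ins, and the exception a real client raises may differ.

```python
class TopicAlreadyExists(Exception):
    """Stand-in for the error a client raises on a name conflict (hypothetical)."""


class FakeAdminClient:
    """In-memory stand-in for a Service Bus administration client (hypothetical)."""

    def __init__(self):
        self.topics = set()

    def create_topic(self, topic_name):
        if topic_name in self.topics:
            raise TopicAlreadyExists(topic_name)
        self.topics.add(topic_name)
        return topic_name


def ensure_topic(client, topic_name):
    """Make sure the topic exists; an existing topic is not an error."""
    try:
        return client.create_topic(topic_name)
    except TopicAlreadyExists:
        # The goal (the topic exists) is already met, so report success.
        return topic_name
```

Running `ensure_topic` any number of times with the same name has the same end state, so a retried or re-run task does not fail.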




[GitHub] [airflow] ashb commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r936376701


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 import datetime
-from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union
+
+from azure.servicebus.management._models import AuthorizationRule

Review Comment:
   Because its sole use is in a type annotation, and in this particular case, it feels like a "private" module (the underscore prefix).
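
A minimal sketch of the pattern being suggested: guard the import with `typing.TYPE_CHECKING` and quote the annotation, so the private module is only seen by static type checkers. The import path is the one from the diff above; `describe_rules` is a hypothetical function added only to show the annotation in use.

```python
from typing import TYPE_CHECKING, List, Optional

if TYPE_CHECKING:
    # Only static type checkers follow this branch. At runtime TYPE_CHECKING is
    # False, so the private module is never imported.
    from azure.servicebus.management._models import AuthorizationRule


def describe_rules(authorization_rules: "Optional[List[AuthorizationRule]]" = None) -> str:
    """The quoted annotation is stored as a string and not evaluated at call time."""
    count = 0 if authorization_rules is None else len(authorization_rules)
    return f"{count} authorization rule(s)"
```

Because nothing is imported at runtime, this block runs even where the Azure SDK is not installed, while a type checker still resolves `AuthorizationRule`.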





[GitHub] [airflow] ashb commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1205394602

   > Would it be misleading for a task to succeed if the operation it should perform does not occur and the user didn't explicitly choose to be indifferent about the operation?
   
   Not to my mind. Good tasks are idempotent, so deleting a non-existent topic has achieved the goal: to ensure that the topic does not exist.
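
The same reasoning applied to deletion can be sketched as follows, assuming the client raises when the topic is missing. `TopicNotFound`, `FakeAdminClient`, and `ensure_topic_absent` are hypothetical stand-ins, not the Azure SDK or the operator from the PR.

```python
class TopicNotFound(Exception):
    """Stand-in for a client's 'resource not found' error (hypothetical)."""


class FakeAdminClient:
    """In-memory stand-in for a Service Bus administration client (hypothetical)."""

    def __init__(self, topics=()):
        self.topics = set(topics)

    def delete_topic(self, topic_name):
        if topic_name not in self.topics:
            raise TopicNotFound(topic_name)
        self.topics.remove(topic_name)


def ensure_topic_absent(client, topic_name):
    """Return True if a topic was deleted, False if it was already gone."""
    try:
        client.delete_topic(topic_name)
    except TopicNotFound:
        # The goal (the topic does not exist) is already met.
        return False
    return True
```

Either way the function finishes without error and the topic is absent afterwards; the boolean only records whether any work was needed.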


[GitHub] [airflow] bharanidharan14 commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1205980607

   > > Just thinking, overwriting the existing topic is kind of update operation right ?
   > 
   > Sure, depending on how Azure Service Bus management operates and/or if someone wanted to patch a particular attribute (if that's even possible). What if you wanted to completely rebuild the topic? Yes, you could check to see if it exists, delete it, and then rebuild it, but I was thinking some analogous feature to file operations.
   > 
   > I'm not saying it _should_ or even _can_ be done. I haven't used ASB in a long time. Just tossing spaghetti to see if it sticks. You can tell me to go back to my corner and it could be something for the future (or never) maybe.
   
   Yes, there is an option to patch/update the topic. I was planning to add an update topic operator as the next operator. With it, the user can update whichever attributes they want (among those that are allowed to be updated).
   
   I also need a suggestion on how this create topic operator should behave:
   1. Only create the topic, and if it already exists just return it?
   2. Create the topic, and if it already exists delete it and recreate it with the new properties?


[GitHub] [airflow] bharanidharan14 commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1204776480

   > What is the behaviour if the topic you try to create already exists? I'm assuming the underlying hook would fail with an exception, but I think the operator should catch that error so that the operator can be used idempotently: i.e. we should think of the job of this operator to ensure that the specific topic exists. (Similarly for delete operator)
   
   Yes, the existing flow will throw an error if the topic name already exists. I have now added a `try`/`except` so that, if the topic already exists, the operator returns the existing topic name without failing.
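
A rough sketch of that pattern, under stated assumptions: `FakeAdminClient` is an in-memory stand-in and `TopicAlreadyExists` is a hypothetical exception standing in for whatever the real client raises. It is not the code from this PR.

```python
class TopicAlreadyExists(Exception):
    """Hypothetical error; stands in for whatever the real client raises."""


class FakeAdminClient:
    """Hypothetical in-memory stand-in for a Service Bus admin client."""

    def __init__(self):
        self.topics = {}

    def create_topic(self, topic_name, **properties):
        if topic_name in self.topics:
            raise TopicAlreadyExists(topic_name)
        self.topics[topic_name] = properties
        return topic_name


def ensure_topic_exists(client, topic_name, **properties):
    """Create the topic, or return its name unchanged if it is already there."""
    try:
        return client.create_topic(topic_name, **properties)
    except TopicAlreadyExists:
        # The existing topic is left untouched; its properties are not updated.
        return topic_name
```

Note the design choice in this sketch: a second call with different properties does not modify the existing topic, it only reports the name.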


[GitHub] [airflow] josh-fell commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
josh-fell commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1205422907

   > Just thinking, overwriting the existing topic is kind of update operation right ?
   
   Sure, depending on how Azure Service Bus management operates and/or if someone wanted to patch a particular attribute (if that's even possible). What if you wanted to completely rebuild the topic? Yes, you could check to see if it exists, delete it, and then rebuild it, but I was thinking some analogous feature to file operations.
   
   I'm not saying it _should_ or even _can_ be done. I haven't used ASB in a long time. Just tossing spaghetti to see if it sticks. You can tell me to go back to my corner and it could be something for the future (or never) maybe.


[GitHub] [airflow] josh-fell commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r936181426


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -206,6 +208,120 @@ def execute(self, context: "Context") -> None:
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to live value. This is
+     the duration after which the message expires, starting from when the message is sent to Service
+     Bus. This is the default value used when TimeToLive is not set on a message itself.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
+     memory allocated for the topic.
+    :param requires_duplicate_detection: A value indicating if this topic requires duplicate
+     detection.
+    :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
+     duration of the duplicate detection history. The default value is 10 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_batched_operations: Value that indicates whether server-side batched operations
+     are enabled.
+    :param size_in_bytes: The size of the topic, in bytes.
+    :param filtering_messages_before_publishing: Filter messages before publishing.
+    :param authorization_rules: List of Authorization rules for resource.
+    :param support_ordering: A value that indicates whether the topic supports ordering.
+    :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
+     automatically deleted. The minimum duration is 5 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_partitioning: A value that indicates whether the topic is to be partitioned
+     across multiple message brokers.
+    :param enable_express: A value that indicates whether Express Entities are enabled. An express
+     queue holds a message in memory temporarily before writing it to persistent storage.
+    :param user_metadata: Metadata associated with the topic.
+    :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
+     can be accepted by the queue. This feature is only available when using a Premium namespace
+     and Service Bus API version "2021-05" or higher.
+     The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        max_size_in_megabytes: Optional[int] = None,
+        requires_duplicate_detection: Optional[bool] = None,
+        duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None,
+        enable_batched_operations: Optional[bool] = None,
+        size_in_bytes: Optional[int] = None,
+        filtering_messages_before_publishing: Optional[bool] = None,
+        authorization_rules: Optional[List[AuthorizationRule]] = None,
+        support_ordering: Optional[bool] = None,
+        auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
+        enable_partitioning: Optional[bool] = None,
+        enable_express: Optional[bool] = None,
+        user_metadata: Optional[str] = None,
+        max_message_size_in_kilobytes: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+        self.default_message_time_to_live = default_message_time_to_live
+        self.max_size_in_megabytes = max_size_in_megabytes
+        self.requires_duplicate_detection = requires_duplicate_detection
+        self.duplicate_detection_history_time_window = duplicate_detection_history_time_window
+        self.enable_batched_operations = enable_batched_operations
+        self.size_in_bytes = size_in_bytes
+        self.filtering_messages_before_publishing = filtering_messages_before_publishing
+        self.authorization_rules = authorization_rules
+        self.support_ordering = support_ordering
+        self.auto_delete_on_idle = auto_delete_on_idle
+        self.enable_partitioning = enable_partitioning
+        self.enable_express = enable_express
+        self.user_metadata = user_metadata
+        self.max_message_size_in_kilobytes = max_message_size_in_kilobytes
+
+    def execute(self, context: "Context") -> None:
+        """Creates Topic in Service Bus namespace, by connecting to Service Bus Admin client"""
+        if self.topic_name is None:
+            raise TypeError("Topic name cannot be None.")
+
+        # Create the hook
+        hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        with hook.get_conn() as service_mgmt_conn:
+            topic = service_mgmt_conn.create_topic(
+                topic_name=self.topic_name,
+                default_message_time_to_live=self.default_message_time_to_live,
+                max_size_in_megabytes=self.max_size_in_megabytes,
+                requires_duplicate_detection=self.requires_duplicate_detection,
+                duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
+                enable_batched_operations=self.enable_batched_operations,
+                size_in_bytes=self.size_in_bytes,
+                filtering_messages_before_publishing=self.filtering_messages_before_publishing,
+                authorization_rules=self.authorization_rules,
+                support_ordering=self.support_ordering,
+                auto_delete_on_idle=self.auto_delete_on_idle,
+                enable_partitioning=self.enable_partitioning,
+                enable_express=self.enable_express,
+                user_metadata=self.user_metadata,
+                max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
+            )
+            self.log.info("Created Topic %s", topic.name)

Review Comment:
   Would it be useful to return `topic.name` from the operator? There may be use cases in which a topic is created and downstream tasks reference the newly created topic's name (especially if it is dynamically generated via a template field).
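
As a toy illustration of the idea (no Airflow import; `CreateTopicTask` and `run_task` are hypothetical stand-ins), the value returned from `execute` is what a runner can hand to downstream tasks. In Airflow itself, an operator's return value is pushed to XCom by default.

```python
class CreateTopicTask:
    """Hypothetical stand-in for the operator."""

    def __init__(self, topic_name):
        self.topic_name = topic_name

    def execute(self, context):
        # Pretend the topic is created here, then return its name.
        return self.topic_name


def run_task(task_id, task, store):
    """Toy runner: keep the return value of execute() keyed by task id."""
    result = task.execute(context={})
    if result is not None:
        store[task_id] = result
    return result
```

With `-> None` and no `return`, the store stays empty and a downstream task has nothing to look up.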



##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -206,6 +208,120 @@ def execute(self, context: "Context") -> None:
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to live value. This is

Review Comment:
   It would be nice to have hanging indents for the param descriptions for consistency with the code base and improved readability, but you could also consider this a nit. Up to you.
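
For illustration, a hanging indent here means continuation lines are indented further than the `:param` line they belong to. The function below is hypothetical and exists only to carry the docstring; the parameter text is copied from the diff above.

```python
def create_topic(topic_name, auto_delete_on_idle=None):
    """
    Hypothetical function used only to show the docstring layout.

    :param topic_name: Name of the topic.
    :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
        automatically deleted. The minimum duration is 5 minutes.
    """
    return topic_name
```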



[GitHub] [airflow] ashb merged pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
ashb merged PR #25436:
URL: https://github.com/apache/airflow/pull/25436


[GitHub] [airflow] ashb commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r935730445


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -206,6 +208,120 @@ def execute(self, context: "Context") -> None:
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to live value. This is
+     the duration after which the message expires, starting from when the message is sent to Service
+     Bus. This is the default value used when TimeToLive is not set on a message itself.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
+     memory allocated for the topic.
+    :param requires_duplicate_detection: A value indicating if this topic requires duplicate
+     detection.
+    :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
+     duration of the duplicate detection history. The default value is 10 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_batched_operations: Value that indicates whether server-side batched operations
+     are enabled.
+    :param size_in_bytes: The size of the topic, in bytes.
+    :param filtering_messages_before_publishing: Filter messages before publishing.
+    :param authorization_rules: List of Authorization rules for resource.
+    :param support_ordering: A value that indicates whether the topic supports ordering.
+    :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
+     automatically deleted. The minimum duration is 5 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_partitioning: A value that indicates whether the topic is to be partitioned
+     across multiple message brokers.
+    :param enable_express: A value that indicates whether Express Entities are enabled. An express
+     queue holds a message in memory temporarily before writing it to persistent storage.
+    :param user_metadata: Metadata associated with the topic.
+    :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
+     can be accepted by the queue. This feature is only available when using a Premium namespace
+     and Service Bus API version "2021-05" or higher.
+     The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        max_size_in_megabytes: Optional[int] = None,
+        requires_duplicate_detection: Optional[bool] = None,
+        duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None,
+        enable_batched_operations: Optional[bool] = None,
+        size_in_bytes: Optional[int] = None,
+        filtering_messages_before_publishing: Optional[bool] = None,
+        authorization_rules: Optional[List[AuthorizationRule]] = None,

Review Comment:
   ```suggestion
           authorization_rules: Optional[List["AuthorizationRule"]] = None,
   ```



##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 import datetime
-from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union
+
+from azure.servicebus.management._models import AuthorizationRule

Review Comment:
   Please move this to inside the `if TYPE_CHECKING` block
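
A minimal sketch of the pattern, using the standard-library `Fraction` as a stand-in for `AuthorizationRule` so the snippet runs without the Azure SDK installed; `count_rules` is a hypothetical function:

```python
from typing import TYPE_CHECKING, List, Optional

if TYPE_CHECKING:
    # This branch is seen by static type checkers only; it is skipped at runtime.
    from fractions import Fraction  # stand-in for AuthorizationRule


def count_rules(rules: Optional[List["Fraction"]] = None) -> int:
    # The quoted annotation is a forward reference, so the name does not
    # need to be importable when the module is executed.
    return len(rules or [])
```

The quoted name is what makes this work: without the quotes, the annotation would be evaluated at definition time and fail with a `NameError`.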



[GitHub] [airflow] bharanidharan14 commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r937355217


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 import datetime
-from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union
+
+from azure.servicebus.management._models import AuthorizationRule

Review Comment:
   Moved the import inside the `if TYPE_CHECKING` block.



##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -206,6 +208,120 @@ def execute(self, context: "Context") -> None:
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to live value. This is
+     the duration after which the message expires, starting from when the message is sent to Service
+     Bus. This is the default value used when TimeToLive is not set on a message itself.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
+     memory allocated for the topic.
+    :param requires_duplicate_detection: A value indicating if this topic requires duplicate
+     detection.
+    :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
+     duration of the duplicate detection history. The default value is 10 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_batched_operations: Value that indicates whether server-side batched operations
+     are enabled.
+    :param size_in_bytes: The size of the topic, in bytes.
+    :param filtering_messages_before_publishing: Filter messages before publishing.
+    :param authorization_rules: List of Authorization rules for resource.
+    :param support_ordering: A value that indicates whether the topic supports ordering.
+    :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
+     automatically deleted. The minimum duration is 5 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_partitioning: A value that indicates whether the topic is to be partitioned
+     across multiple message brokers.
+    :param enable_express: A value that indicates whether Express Entities are enabled. An express
+     queue holds a message in memory temporarily before writing it to persistent storage.
+    :param user_metadata: Metadata associated with the topic.
+    :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
+     can be accepted by the queue. This feature is only available when using a Premium namespace
+     and Service Bus API version "2021-05" or higher.
+     The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        max_size_in_megabytes: Optional[int] = None,
+        requires_duplicate_detection: Optional[bool] = None,
+        duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None,
+        enable_batched_operations: Optional[bool] = None,
+        size_in_bytes: Optional[int] = None,
+        filtering_messages_before_publishing: Optional[bool] = None,
+        authorization_rules: Optional[List[AuthorizationRule]] = None,

Review Comment:
   Fixed



[GitHub] [airflow] bharanidharan14 commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r937356982


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -206,6 +208,120 @@ def execute(self, context: "Context") -> None:
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to live value. This is
+     the duration after which the message expires, starting from when the message is sent to Service
+     Bus. This is the default value used when TimeToLive is not set on a message itself.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
+     memory allocated for the topic.
+    :param requires_duplicate_detection: A value indicating if this topic requires duplicate
+     detection.
+    :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
+     duration of the duplicate detection history. The default value is 10 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_batched_operations: Value that indicates whether server-side batched operations
+     are enabled.
+    :param size_in_bytes: The size of the topic, in bytes.
+    :param filtering_messages_before_publishing: Filter messages before publishing.
+    :param authorization_rules: List of Authorization rules for resource.
+    :param support_ordering: A value that indicates whether the topic supports ordering.
+    :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
+     automatically deleted. The minimum duration is 5 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_partitioning: A value that indicates whether the topic is to be partitioned
+     across multiple message brokers.
+    :param enable_express: A value that indicates whether Express Entities are enabled. An express
+     queue holds a message in memory temporarily before writing it to persistent storage.
+    :param user_metadata: Metadata associated with the topic.
+    :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
+     can be accepted by the queue. This feature is only available when using a Premium namespace
+     and Service Bus API version "2021-05" or higher.
+     The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        max_size_in_megabytes: Optional[int] = None,
+        requires_duplicate_detection: Optional[bool] = None,
+        duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None,
+        enable_batched_operations: Optional[bool] = None,
+        size_in_bytes: Optional[int] = None,
+        filtering_messages_before_publishing: Optional[bool] = None,
+        authorization_rules: Optional[List[AuthorizationRule]] = None,
+        support_ordering: Optional[bool] = None,
+        auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
+        enable_partitioning: Optional[bool] = None,
+        enable_express: Optional[bool] = None,
+        user_metadata: Optional[str] = None,
+        max_message_size_in_kilobytes: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+        self.default_message_time_to_live = default_message_time_to_live
+        self.max_size_in_megabytes = max_size_in_megabytes
+        self.requires_duplicate_detection = requires_duplicate_detection
+        self.duplicate_detection_history_time_window = duplicate_detection_history_time_window
+        self.enable_batched_operations = enable_batched_operations
+        self.size_in_bytes = size_in_bytes
+        self.filtering_messages_before_publishing = filtering_messages_before_publishing
+        self.authorization_rules = authorization_rules
+        self.support_ordering = support_ordering
+        self.auto_delete_on_idle = auto_delete_on_idle
+        self.enable_partitioning = enable_partitioning
+        self.enable_express = enable_express
+        self.user_metadata = user_metadata
+        self.max_message_size_in_kilobytes = max_message_size_in_kilobytes
+
+    def execute(self, context: "Context") -> None:
+        """Creates Topic in Service Bus namespace, by connecting to Service Bus Admin client"""
+        if self.topic_name is None:
+            raise TypeError("Topic name cannot be None.")
+
+        # Create the hook
+        hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        with hook.get_conn() as service_mgmt_conn:
+            topic = service_mgmt_conn.create_topic(
+                topic_name=self.topic_name,
+                default_message_time_to_live=self.default_message_time_to_live,
+                max_size_in_megabytes=self.max_size_in_megabytes,
+                requires_duplicate_detection=self.requires_duplicate_detection,
+                duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
+                enable_batched_operations=self.enable_batched_operations,
+                size_in_bytes=self.size_in_bytes,
+                filtering_messages_before_publishing=self.filtering_messages_before_publishing,
+                authorization_rules=self.authorization_rules,
+                support_ordering=self.support_ordering,
+                auto_delete_on_idle=self.auto_delete_on_idle,
+                enable_partitioning=self.enable_partitioning,
+                enable_express=self.enable_express,
+                user_metadata=self.user_metadata,
+                max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
+            )
+            self.log.info("Created Topic %s", topic.name)

Review Comment:
   Returned the topic name 



[GitHub] [airflow] josh-fell commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r937921158


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -467,3 +588,44 @@ def execute(self, context: "Context") -> None:
 
         # delete subscription with name
         hook.delete_subscription(self.subscription_name, self.topic_name)
+
+
+class AzureServiceBusTopicDeleteOperator(BaseOperator):
+    """
+    Deletes the topic in the Azure Service Bus namespace
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicDeleteOperator`
+
+    :param topic_name: Name of the topic to be deleted.
+    :param azure_service_bus_conn_id: Reference to the
+        :ref:`Azure Service Bus connection <howto/connection:azure_service_bus>`.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+    def execute(self, context: "Context") -> None:
+        """Delete topic in Service Bus namespace, by connecting to Service Bus Admin client"""
+        if self.topic_name is None:
+            raise TypeError("Topic name cannot be None.")
+        hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        with hook.get_conn() as service_mgmt_conn:
+            topic_properties = service_mgmt_conn.get_topic(self.topic_name)
+            if topic_properties and topic_properties.name == self.topic_name:
+                service_mgmt_conn.delete_topic(self.topic_name)
+            else:
+                self.log.info("Topic does not exists")

Review Comment:
   ```suggestion
                   self.log.info("Topic %s does not exist.", self.topic_name)
   ```
   Minor suggestion to add the name as well. Might be good to see it especially if the `topic_name` is dynamically generated.
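
A minimal, self-contained sketch of the suggested logging pattern, using only the standard `logging` module rather than Airflow's `self.log`. The logger name, the `ListHandler` helper, and the sample topic name are assumptions made for illustration; they do not come from the PR.

```python
import logging
from typing import List


class ListHandler(logging.Handler):
    """Collects rendered log messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        # getMessage() applies the %-style arguments to the template.
        self.messages.append(record.getMessage())


log = logging.getLogger("asb_logging_sketch")  # hypothetical logger name
log.setLevel(logging.INFO)
handler = ListHandler()
log.addHandler(handler)

# Hypothetical value standing in for a dynamically generated topic name.
topic_name = "orders-2022-08-01"

# Pass the name as an argument instead of formatting the string up front;
# logging interpolates it only when the record is actually handled.
log.info("Topic %s does not exist.", topic_name)
print(handler.messages[-1])
```

With the name in the message, a log line from a templated `topic_name` shows which topic was looked up.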





[GitHub] [airflow] josh-fell commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
josh-fell commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1205401820

   > Not to my mind. Good tasks are idempotent, so deleting a non-existent topic has achieved the goal: to ensure that the topic does not exist.
   
   Fair point.




[GitHub] [airflow] josh-fell commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
josh-fell commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1205366750

   > Yes, the existing flow will throw an error if the topic name exists. I have now added a `try`/`except` so that, before creating the topic, the operator checks whether it exists; if it does, it logs a message and returns the topic name. The delete operator does the same.
   
   I wonder if there should be a parameter for users to have the option of overwriting a topic that might exist. WDYT?
   
   Also, for deleting a topic, if the topic doesn't exist should the task fail? Would it be misleading for a task to succeed if the operation it should perform does not occur and the user didn't explicitly choose to be indifferent about the operation? I could imagine a scenario where users expect topics to have been deleted when they haven't been, even though "Airflow says" the deletion task succeeded. Just some thoughts.
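
One way to reconcile the two positions is an opt-in flag: deletion is idempotent by default, and users who care can ask for a failure when the topic is absent. The sketch below is only an illustration under stated assumptions. `FakeAdminClient`, `TopicNotFound`, and the `fail_if_missing` parameter are all hypothetical; none of them is part of the PR or of the Azure SDK, which signals a missing entity in its own way.

```python
class TopicNotFound(Exception):
    """Hypothetical stand-in for an SDK "resource not found" error."""


class FakeAdminClient:
    """In-memory stand-in for a Service Bus admin client (hypothetical)."""

    def __init__(self, topics):
        self.topics = set(topics)

    def delete_topic(self, name: str) -> None:
        if name not in self.topics:
            raise TopicNotFound(name)
        self.topics.remove(name)


def delete_topic(client, topic_name: str, fail_if_missing: bool = False) -> bool:
    """Delete a topic; return True if it was deleted, False if already absent.

    With fail_if_missing=True (hypothetical flag) a missing topic is an error.
    """
    try:
        client.delete_topic(topic_name)
    except TopicNotFound:
        if fail_if_missing:
            raise
        # Default: the goal "topic does not exist" already holds.
        return False
    return True


client = FakeAdminClient({"orders"})
print(delete_topic(client, "orders"))  # topic existed and was removed
print(delete_topic(client, "orders"))  # already gone; succeeds by default
```

Calling `delete_topic(client, "orders", fail_if_missing=True)` after the topic is gone would raise `TopicNotFound`, so only users who explicitly opt in see a failed task.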




[GitHub] [airflow] bharanidharan14 commented on pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on PR #25436:
URL: https://github.com/apache/airflow/pull/25436#issuecomment-1216175399

   @josh-fell @ashb Can you review this PR? I have addressed the review comments.

