You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/02 15:22:46 UTC

[GitHub] [airflow] ashb commented on a diff in pull request #25436: Implement Azure Service Bus Topic (Create, Delete) Operators

ashb commented on code in PR #25436:
URL: https://github.com/apache/airflow/pull/25436#discussion_r935730445


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -206,6 +208,120 @@ def execute(self, context: "Context") -> None:
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to live value. This is
+     the duration after which the message expires, starting from when the message is sent to Service
+     Bus. This is the default value used when TimeToLive is not set on a message itself.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
+     memory allocated for the topic.
+    :param requires_duplicate_detection: A value indicating if this topic requires duplicate
+     detection.
+    :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
+     duration of the duplicate detection history. The default value is 10 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_batched_operations: Value that indicates whether server-side batched operations
+     are enabled.
+    :param size_in_bytes: The size of the topic, in bytes.
+    :param filtering_messages_before_publishing: Filter messages before publishing.
+    :param authorization_rules: List of Authorization rules for resource.
+    :param support_ordering: A value that indicates whether the topic supports ordering.
+    :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
+     automatically deleted. The minimum duration is 5 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
+     like "PT300S" is accepted.
+    :param enable_partitioning: A value that indicates whether the topic is to be partitioned
+     across multiple message brokers.
+    :param enable_express: A value that indicates whether Express Entities are enabled. An express
+     queue holds a message in memory temporarily before writing it to persistent storage.
+    :param user_metadata: Metadata associated with the topic.
+    :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
+     can be accepted by the queue. This feature is only available when using a Premium namespace
+     and Service Bus API version "2021-05" or higher.
+     The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        max_size_in_megabytes: Optional[int] = None,
+        requires_duplicate_detection: Optional[bool] = None,
+        duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None,
+        enable_batched_operations: Optional[bool] = None,
+        size_in_bytes: Optional[int] = None,
+        filtering_messages_before_publishing: Optional[bool] = None,
+        authorization_rules: Optional[List[AuthorizationRule]] = None,

Review Comment:
   ```suggestion
           authorization_rules: Optional[List["AuthorizationRule"]] = None,
   ```



##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 import datetime
-from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union
+
+from azure.servicebus.management._models import AuthorizationRule

Review Comment:
   Please move this import inside the `if TYPE_CHECKING:` block, since `AuthorizationRule` is only used in a type annotation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org