You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/12 08:43:14 UTC

[GitHub] [flink] deadwind4 commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r871110043


##########
flink-python/pyflink/datastream/connectors.py:
##########
@@ -1449,6 +1455,314 @@ def build(self) -> 'PulsarSource':
         return PulsarSource(self._j_pulsar_source_builder.build())
 
 
+class DeliveryGuarantee(Enum):
+    """
+    DeliverGuarantees that can be chosen. In general your pipeline can only offer the lowest
+    delivery guarantee which is supported by your sources and sinks.
+
+    :data: `EXACTLY_ONCE`:
+    Records are only delivered exactly-once also under failover scenarios. To build a complete
+    exactly-once pipeline is required that the source and sink support exactly-once and are
+    properly configured.
+
+    :data: `AT_LEAST_ONCE`:
+    Records are ensured to be delivered but it may happen that the same record is delivered
+    multiple times. Usually, this guarantee is faster than the exactly-once delivery.
+
+    :data: `NONE`:
+    Records are delivered on a best effort basis. It is often the fastest way to process records
+    but it may happen that records are lost or duplicated.
+    """
+
+    EXACTLY_ONCE = 0,
+    AT_LEAST_ONCE = 1,
+    NONE = 2
+
+    def _to_j_delivery_guarantee(self):
+        JDeliveryGuarantee = get_gateway().jvm \
+            .org.apache.flink.connector.base.DeliveryGuarantee
+        return getattr(JDeliveryGuarantee, self.name)
+
+
+class PulsarSerializationSchema(object):
+    """
+    The serialization schema for how to serialize records into Pulsar.
+    """
+
+    def __init__(self, _j_pulsar_serialization_schema):
+        self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
+
+    @staticmethod
+    def flink_schema(serialization_schema: SerializationSchema) \
+            -> 'PulsarSerializationSchema':
+        """
+        Create a PulsarSerializationSchema by using the flink's SerializationSchema. It would
+        serialize the message into byte array and send it to Pulsar with Schema#BYTES.
+        """
+        JPulsarSerializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.sink.writer.serializer.PulsarSerializationSchema
+        _j_pulsar_serialization_schema = JPulsarSerializationSchema.flinkSchema(
+            serialization_schema._j_serialization_schema)
+        return PulsarSerializationSchema(_j_pulsar_serialization_schema)
+
+
+class TopicRoutingMode(Enum):
+    """
+    The routing policy for choosing the desired topic by the given message.
+
+    :data: `ROUND_ROBIN`:
+    The producer will publish messages across all partitions in a round-robin fashion to achieve
+    maximum throughput. Please note that round-robin is not done per individual message but
+    rather it's set to the same boundary of batching delay, to ensure batching is effective.
+
+    :data: `MESSAGE_KEY_HASH`:
+    If no key is provided, The partitioned producer will randomly pick one single topic partition
+    and publish all the messages into that partition. If a key is provided on the message, the
+    partitioned producer will hash the key and assign the message to a particular partition.
+
+    :data: `CUSTOM`:
+    Use custom topic router implementation that will be called to determine the partition for a
+    particular message.
+    """
+
+    ROUND_ROBIN = 0
+    MESSAGE_KEY_HASH = 1
+    CUSTOM = 2
+
+    def _to_j_topic_routing_mode(self):
+        JTopicRoutingMode = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode
+        return getattr(JTopicRoutingMode, self.name)
+
+
+class MessageDelayer(object):
+    """
+    A delayer for Pulsar broker passing the sent message to the downstream consumer. This is only
+    works in {@link SubscriptionType#Shared} subscription.
+
+    Read delayed message delivery
+    https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery for better
+    understanding this feature.
+    """
+    def __init__(self, _j_message_delayer):
+        self._j_message_delayer = _j_message_delayer
+
+    @staticmethod
+    def never() -> 'MessageDelayer':
+        """
+        All the messages should be consumed immediately.
+        """
+        JMessageDelayer = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer
+        return MessageDelayer(JMessageDelayer.never())
+
+    @staticmethod
+    def fixed(duration: Duration) -> 'MessageDelayer':
+        """
+        All the messages should be consumed in a fixed duration.
+        """
+        JMessageDelayer = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer
+        return MessageDelayer(JMessageDelayer.fixed(duration._j_duration))
+
+
+class PulsarSink(Sink):
+    """
+    The Sink implementation of Pulsar. Please use a PulsarSinkBuilder to construct a
+    PulsarSink. The following example shows how to create a PulsarSink receiving records of
+    String type.
+
+    Example:
+    ::
+
+        >>> sink = PulsarSink() \\
+        ...     .builder() \\
+        ...     .set_service_url(PULSAR_BROKER_URL) \\
+        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
+        ...     .set_topics(topic) \\
+        ...     .set_serialization_schema(
+        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .build()
+
+    The sink supports all delivery guarantees described by DeliveryGuarantee.
+
+    DeliveryGuarantee#NONE does not provide any guarantees: messages may be lost in
+    case of issues on the Pulsar broker and messages may be duplicated in case of a Flink
+    failure.
+
+    DeliveryGuarantee#AT_LEAST_ONCE the sink will wait for all outstanding records in
+    the Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint. No messages
+    will be lost in case of any issue with the Pulsar brokers but messages may be duplicated
+    when Flink restarts.
+
+    DeliveryGuarantee#EXACTLY_ONCE: In this mode the PulsarSink will write all messages
+    in a Pulsar transaction that will be committed to Pulsar on a checkpoint. Thus, no
+    duplicates will be seen in case of a Flink restart. However, this delays record writing
+     effectively until a checkpoint is written, so adjust the checkpoint duration accordingly.
+    Additionally, it is highly recommended to tweak Pulsar transaction timeout (link) >>
+    maximum checkpoint duration + maximum restart duration or data loss may happen when Pulsar
+    expires an uncommitted transaction.
+
+    See PulsarSinkBuilder for more details.
+    """
+
+    def __init__(self, j_pulsar_sink):
+        super(PulsarSink, self).__init__(sink=j_pulsar_sink)
+
+    @staticmethod
+    def builder() -> 'PulsarSinkBuilder':
+        """
+        Get a PulsarSinkBuilder to builder a PulsarSink.
+        """
+        return PulsarSinkBuilder()
+
+
+class PulsarSinkBuilder(object):
+    """
+    The builder class for PulsarSink to make it easier for the users to construct a PulsarSink.
+
+    The following example shows the minimum setup to create a PulsarSink that reads the String
+    values from a Pulsar topic.
+
+    Example:
+    ::
+
+        >>> sink = PulsarSink() \\
+        ...     .builder() \\
+        ...     .set_service_url(PULSAR_BROKER_URL) \\
+        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
+        ...     .set_topics([TOPIC1, TOPIC2]) \\
+        ...     .set_serialization_schema(
+        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .build()
+
+    The service url, admin url, and the record serializer are required fields that must be set. If
+    you don't set the topics, make sure you have provided a custom TopicRouter. Otherwise,
+    you must provide the topics to produce.
+
+    To specify the delivery guarantees of PulsarSink, one can call
+    #setDeliveryGuarantee(DeliveryGuarantee). The default value of the delivery guarantee is
+    DeliveryGuarantee#NONE, and it wouldn't promise the consistence when write the message into
+    Pulsar.
+
+    Example:
+    ::
+
+        >>> sink = PulsarSink() \\
+        ...     .builder() \\
+        ...     .set_service_url(PULSAR_BROKER_URL) \\
+        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
+        ...     .set_topics([TOPIC1, TOPIC2]) \\
+        ...     .set_serialization_schema(
+        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
+        ...     .build()
+    """
+
+    def __init__(self):
+        JPulsarSink = get_gateway().jvm.org.apache.flink.connector.pulsar.sink.PulsarSink
+        self._j_pulsar_sink_builder = JPulsarSink.builder()
+
+    def set_admin_url(self, admin_url: str) -> 'PulsarSinkBuilder':
+        """
+        Sets the admin endpoint for the PulsarAdmin of the PulsarSink.
+        """
+        self._j_pulsar_sink_builder.setAdminUrl(admin_url)
+        return self
+
+    def set_service_url(self, service_url: str) -> 'PulsarSinkBuilder':
+        """
+        Sets the server's link for the PulsarProducer of the PulsarSink.
+        """
+        self._j_pulsar_sink_builder.setServiceUrl(service_url)
+        return self
+
+    def set_producer_name(self, producer_name: str) -> 'PulsarSinkBuilder':
+        """
+        The producer name is informative, and it can be used to identify a particular producer
+        instance from the topic stats.
+        """
+        self._j_pulsar_sink_builder.setProducerName(producer_name)
+        return self
+
+    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSinkBuilder':
+        """
+        Set a pulsar topic list for flink sink. Some topic may not exist currently, write to this
+        non-existed topic wouldn't throw any exception.
+        """
+        if not isinstance(topics, list):
+            topics = [topics]
+        self._j_pulsar_sink_builder.setTopics(topics)
+        return self
+
+    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) -> 'PulsarSinkBuilder':
+        """
+        Sets the wanted the DeliveryGuarantee. The default delivery guarantee is
+        DeliveryGuarantee#NONE.
+        """
+        self._j_pulsar_sink_builder.setDeliveryGuarantee(
+            delivery_guarantee._to_j_delivery_guarantee())
+        return self
+
+    def set_topic_routing_mode(self, topic_routing_mode: TopicRoutingMode) -> 'PulsarSinkBuilder':
+        """
+        Set a routing mode for choosing right topic partition to send messages.
+        """
+        self._j_pulsar_sink_builder.setTopicRoutingMode(
+            topic_routing_mode._to_j_topic_routing_mode())
+        return self
+
+    def set_serialization_schema(self, pulsar_serialization_schema: PulsarSerializationSchema) \
+            -> 'PulsarSinkBuilder':
+        """
+        Sets the PulsarSerializationSchema that transforms incoming records to bytes.
+        """
+        self._j_pulsar_sink_builder.setSerializationSchema(
+            pulsar_serialization_schema._j_pulsar_serialization_schema)
+        return self
+
+    def enable_schema_evolution(self) \
+            -> 'PulsarSinkBuilder':
+        """
+        If you enable this option, we would consume and deserialize the message by using Pulsar
+        Schema.
+        """
+        self._j_pulsar_sink_builder.enableSchemaEvolution()
+        return self
+
+    def delay_sending_message(self, message_delayer: MessageDelayer):
+        """
+        Set a message delayer for enable Pulsar message delay delivery.
+        """
+        self._j_pulsar_sink_builder.delaySendingMessage(message_delayer._j_message_delayer)
+        return self
+
+    def set_config(self, key: ConfigOption, value) -> 'PulsarSinkBuilder':
+        """
+        Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be
+        found in PulsarSinkOptions and PulsarOptions.
+
+        Make sure the option could be set only once or with same value.
+        """
+        self._j_pulsar_sink_builder.setConfig(key._j_config_option, value)
+        return self
+
+    def set_config_with_dict(self, config: Dict) -> 'PulsarSinkBuilder':

Review Comment:
   I only changed the method name. As before, this method calls setConfig(JConfiguration.fromMap(config)). Because a Python dict is automatically converted to a Java HashMap, using setProperties is not convenient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org