You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/24 04:27:48 UTC

[GitHub] [flink] deadwind4 commented on a change in pull request #18388: [FLINK-25530][python][connector/pulsar] Support Pulsar source connector in Python DataStream API.

deadwind4 commented on a change in pull request #18388:
URL: https://github.com/apache/flink/pull/18388#discussion_r790406471



##########
File path: flink-python/pyflink/datastream/connectors.py
##########
@@ -1079,6 +1085,320 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
 
+class PulsarDeserializationSchema(object):
+    """
+    A schema bridge for deserializing a Pulsar Message into a Flink-managed instance. Both
+    Pulsar's self-managed schema and Flink-managed schemas are supported.
+    """
+
+    def __init__(self, _j_pulsar_deserialization_schema):
+        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema
+
+    @staticmethod
+    def flink_schema(deserialization_schema: DeserializationSchema) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the flink's DeserializationSchema. It would
+        consume the pulsar message as byte array and decode the message by using flink's logic.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
+            deserialization_schema._j_deserialization_schema)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+    @staticmethod
+    def flink_type_info(type_information: TypeInformation, execution_config: ExecutionConfig) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
+        only used for handling messages that were written into Pulsar by TypeInformation.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_execution_config = execution_config._j_execution_config \
+            if execution_config is not None else None
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
+            type_information.get_java_type_info(), _j_execution_config)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+
+class SubscriptionType(Enum):
+    """
+    Types of subscription supported by Pulsar.
+
+    :data: `Exclusive`:
+
+    There can be only 1 consumer on the same topic with the same subscription name.
+
+    :data: `Shared`:
+
+    Multiple consumers will be able to use the same subscription name and the messages will be
+    dispatched according to a round-robin rotation between the connected consumers. In this mode,
+    the consumption order is not guaranteed.
+
+    :data: `Failover`:
+
+    Multiple consumers will be able to use the same subscription name but only 1 consumer will
+    receive the messages. If that consumer disconnects, one of the other connected consumers will
+    start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
+    partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
+    assignments will be split across the available consumers. On each partition, at most one
+    consumer will be active at a given point in time.
+
+    :data: `Key_Shared`:
+
+    Multiple consumers will be able to use the same subscription and all messages with the same key
+    will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
+    message ordering.
+    """
+
+    Exclusive = 0,
+    Shared = 1,
+    Failover = 2,
+    Key_Shared = 3
+
+    def _to_j_subscription_type(self):
+        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
+        return getattr(JSubscriptionType, self.name)
+
+
+class StartCursor(object):
+    """
+    An interface for users to specify the start position of a Pulsar subscription.
+    Because it is serialized into the split,
+    the implementation of this interface should be well considered.
+    Adding extra internal state to an implementation is not recommended.
+
+    This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
+    """
+
+    def __init__(self, _j_start_cursor):
+        self._j_start_cursor = _j_start_cursor
+
+    @staticmethod
+    def default_start_cursor() -> 'StartCursor':
+        return StartCursor.earliest()
+
+    @staticmethod
+    def earliest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.earliest())
+
+    @staticmethod
+    def latest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.latest())
+
+    @staticmethod
+    def from_message_time(timestamp: int) -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.fromMessageTime(timestamp))
+
+
+class StopCursor(object):
+    """
+    An interface for users to specify the stop position of a Pulsar subscription. Because it is
+    serialized into the split, the implementation of this interface should be well considered.
+    Adding extra internal state to an implementation is not recommended.
+    """
+
+    def __init__(self, _j_stop_cursor):
+        self._j_stop_cursor = _j_stop_cursor
+
+    @staticmethod
+    def default_stop_cursor() -> 'StopCursor':
+        return StopCursor.never()
+
+    @staticmethod
+    def never() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.never())
+
+    @staticmethod
+    def latest() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.latest())
+
+    @staticmethod
+    def at_event_time(timestamp: int) -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.atEventTime(timestamp))
+
+
+class PulsarSource(Source):
+    """
+    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
+    PulsarSource.
+    """
+
+    def __init__(self, j_pulsar_source):
+        super(PulsarSource, self).__init__(source=j_pulsar_source)
+
+    @staticmethod
+    def builder() -> 'PulsarSourceBuilder':
+        """
+        Get a PulsarSourceBuilder to build a PulsarSource.
+        """
+        return PulsarSourceBuilder()
+
+
+class PulsarSourceBuilder(object):
+    """
+    The builder class for PulsarSource to make it easier for the users to construct a PulsarSource.
+
+    The service url, admin url, subscription name, topics to consume, and the record deserializer
+    are required fields that must be set.
+
+    To specify the starting position of PulsarSource, one can call set_start_cursor(StartCursor).
+
+    By default the PulsarSource runs in Boundedness.CONTINUOUS_UNBOUNDED mode and never stops
+    until the Flink job is canceled or fails. To let the PulsarSource run in
+    Boundedness.CONTINUOUS_UNBOUNDED mode but stop at some given offsets, one can call
+    set_unbounded_stop_cursor(StopCursor).

Review comment:
       have added examples.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org