Posted to commits@flink.apache.org by di...@apache.org on 2022/06/09 05:38:55 UTC

[flink] branch master updated: [FLINK-27729][python][pulsar] Support constructing StartCursor and StopCursor from MessageId

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b35b65e2d5 [FLINK-27729][python][pulsar] Support constructing StartCursor and StopCursor from MessageId
8b35b65e2d5 is described below

commit 8b35b65e2d51aa5d166d4395ecbb5cfa21ed55e7
Author: pengmide <pe...@gmail.com>
AuthorDate: Wed Jun 8 20:18:27 2022 +0800

    [FLINK-27729][python][pulsar] Support constructing StartCursor and StopCursor from MessageId
    
    This closes #19917.
---
 .../docs/connectors/datastream/pulsar.md           | 36 +++++++++++++
 docs/content/docs/connectors/datastream/pulsar.md  | 36 +++++++++++++
 .../pyflink/datastream/connectors/pulsar.py        | 59 ++++++++++++++++++++++
 3 files changed, 131 insertions(+)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 0f84f6b492e..b7ca75e3c52 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -308,13 +308,31 @@ Pulsar Source uses the `setStartCursor(StartCursor)` method to specify the starting
   {{< /tabs >}}
 
 - Start consuming from the given message.
+  {{< tabs "pulsar-starting-position-from-message-id" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.fromMessageId(MessageId);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.from_message_id(message_id)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Unlike the previous option, the given message can be skipped, and consumption starts after it.
+  {{< tabs "pulsar-starting-position-from-message-id-bool" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.fromMessageId(MessageId, boolean);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.from_message_id(message_id, inclusive)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Start consuming from the given message time.
   {{< tabs "pulsar-starting-position-message-time" >}}
   {{< tab "Java" >}}
@@ -372,13 +390,31 @@ By default, Pulsar Source consumes data in streaming mode. Unless the job fails
   {{< /tabs >}}
 
 - Stop at a given message; the result does not include this message.
+  {{< tabs "pulsar-boundedness-at-message-id" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.atMessageId(MessageId);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.at_message_id(message_id)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Stop after a given message; the result includes this message.
+  {{< tabs "pulsar-boundedness-after-message-id" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.afterMessageId(MessageId);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.after_message_id(message_id)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Stop at a given message publish timestamp, such as `Message<byte[]>.getPublishTime()`.
   {{< tabs "pulsar-boundedness-publish-time" >}}
   {{< tab "Java" >}}
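The inclusive/exclusive choice behind `StartCursor.from_message_id(message_id, inclusive)` (Java `fromMessageId(MessageId, boolean)`) can be illustrated with a small pure-Python model. This is only a sketch, not the connector: `ModelMessageId` and `messages_from` are hypothetical names, the model assumes that message IDs within one partition order by (ledger ID, entry ID, batch index), and it does not model the fallback to the latest available message when the ID does not exist.

```python
from typing import List, NamedTuple


class ModelMessageId(NamedTuple):
    """Hypothetical stand-in for a Pulsar message ID (not a real API).

    Assumption: IDs within one partition order by (ledger_id, entry_id,
    batch_index); NamedTuple comparison is lexicographic over these fields.
    """
    ledger_id: int
    entry_id: int
    batch_index: int = -1


def messages_from(topic: List[ModelMessageId], start: ModelMessageId,
                  inclusive: bool = True) -> List[ModelMessageId]:
    """Model StartCursor.from_message_id(start, inclusive) on a finite topic.

    inclusive=True keeps `start` itself; inclusive=False skips it.
    """
    if inclusive:
        return [m for m in topic if m >= start]
    return [m for m in topic if m > start]


# Usage: five messages in ledger 1, start cursor placed at entry 2.
topic = [ModelMessageId(1, entry) for entry in range(5)]
from_inclusive = messages_from(topic, ModelMessageId(1, 2))
from_exclusive = messages_from(topic, ModelMessageId(1, 2), inclusive=False)
```

Because `inclusive` defaults to True, the one-argument form `from_message_id(message_id)` matches the Java `fromMessageId(MessageId)`, which includes the start message in the consuming result.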
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 66d050e540c..568110193e8 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -342,16 +342,34 @@ Built-in start cursors include:
 The Pulsar connector consumes from the latest available message if the message ID does not exist.
 
   The start message is included in the consuming result.
+  {{< tabs "pulsar-starting-position-from-message-id" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.fromMessageId(MessageId);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.from_message_id(message_id)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Start from a specified message between the earliest and the latest.
 The Pulsar connector consumes from the latest available message if the message ID doesn't exist.
 
   Include or exclude the start message by using the second boolean parameter.
+  {{< tabs "pulsar-starting-position-from-message-id-bool" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.fromMessageId(MessageId, boolean);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.from_message_id(message_id, inclusive)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
 - Start from the specified message time by `Message<byte[]>.getPublishTime()`.
   {{< tabs "pulsar-starting-position-message-time" >}}
@@ -415,13 +433,31 @@ Built-in stop cursors include:
   {{< /tabs >}}
 
 - Stop when the connector reaches the given message or any message produced after it. The given message is not included in the consuming result.
+  {{< tabs "pulsar-boundedness-at-message-id" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.atMessageId(MessageId);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.at_message_id(message_id)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Stop after the given message, including it in the consuming result.
+  {{< tabs "pulsar-boundedness-after-message-id" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.afterMessageId(MessageId);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.after_message_id(message_id)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
 - Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
   {{< tabs "pulsar-boundedness-publish-time" >}}
diff --git a/flink-python/pyflink/datastream/connectors/pulsar.py b/flink-python/pyflink/datastream/connectors/pulsar.py
index b6d58290830..79efca87f04 100644
--- a/flink-python/pyflink/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/datastream/connectors/pulsar.py
@@ -143,6 +143,25 @@ class StartCursor(object):
             .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
         return StartCursor(JStartCursor.fromMessageTime(timestamp))
 
+    @staticmethod
+    def from_message_id(message_id: bytes, inclusive: bool = True) -> 'StartCursor':
+        """
+        Find the available message ID and start consuming from it (inclusively by default). Call
+        ``serialize()`` on a Pulsar Python client ``MessageId`` to obtain the message ID bytes.
+
+        Example:
+        ::
+
+            >>> from pulsar import MessageId
+            >>> message_id_bytes = MessageId().serialize()
+            >>> start_cursor = StartCursor.from_message_id(message_id_bytes)
+        """
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        j_message_id = get_gateway().jvm.org.apache.pulsar.client.api.MessageId \
+            .fromByteArray(message_id)
+        return StartCursor(JStartCursor.fromMessageId(j_message_id, inclusive))
+
 
 class StopCursor(object):
     """
@@ -187,6 +206,46 @@ class StopCursor(object):
             .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
         return StopCursor(JStopCursor.atPublishTime(timestamp))
 
+    @staticmethod
+    def at_message_id(message_id: bytes) -> 'StopCursor':
+        """
+        Stop when a message ID is equal to or greater than the specified message ID. The message
+        with the specified message ID is not consumed. Call ``serialize()`` on a Pulsar Python
+        client ``MessageId`` to obtain the message ID bytes.
+
+        Example:
+        ::
+
+            >>> from pulsar import MessageId
+            >>> message_id_bytes = MessageId().serialize()
+            >>> stop_cursor = StopCursor.at_message_id(message_id_bytes)
+        """
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        j_message_id = get_gateway().jvm.org.apache.pulsar.client.api.MessageId \
+            .fromByteArray(message_id)
+        return StopCursor(JStopCursor.atMessageId(j_message_id))
+
+    @staticmethod
+    def after_message_id(message_id: bytes) -> 'StopCursor':
+        """
+        Stop when a message ID is greater than the specified message ID. The message with the
+        specified message ID is consumed. Call ``serialize()`` on a Pulsar Python client
+        ``MessageId`` to obtain the message ID bytes.
+
+        Example:
+        ::
+
+            >>> from pulsar import MessageId
+            >>> message_id_bytes = MessageId().serialize()
+            >>> stop_cursor = StopCursor.after_message_id(message_id_bytes)
+        """
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        j_message_id = get_gateway().jvm.org.apache.pulsar.client.api.MessageId \
+            .fromByteArray(message_id)
+        return StopCursor(JStopCursor.afterMessageId(j_message_id))
+
 
 class PulsarSource(Source):
     """
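The difference between `StopCursor.at_message_id` and `StopCursor.after_message_id` can be sketched the same way. `ModelMessageId` and `consume_until` are hypothetical helpers, not PyFlink or Pulsar APIs. The model assumes a single partition whose IDs order by (ledger ID, entry ID, batch index), and it replays the comparison described in the docstrings: stop on an ID equal to or greater than the target (target excluded) or strictly greater than it (target included).

```python
from typing import Iterable, List, NamedTuple


class ModelMessageId(NamedTuple):
    """Hypothetical stand-in for a Pulsar message ID (not a real API).

    Assumption: IDs within one partition order by (ledger_id, entry_id,
    batch_index); NamedTuple comparison is lexicographic over these fields.
    """
    ledger_id: int
    entry_id: int
    batch_index: int = -1


def consume_until(topic: Iterable[ModelMessageId], stop: ModelMessageId,
                  include_stop: bool) -> List[ModelMessageId]:
    """Read messages in order until the stop condition is met.

    include_stop=False models StopCursor.at_message_id(stop): reading stops at
    the first ID >= stop, so `stop` itself is not consumed.
    include_stop=True models StopCursor.after_message_id(stop): reading stops
    at the first ID > stop, so `stop` itself is consumed.
    """
    consumed = []
    for message_id in topic:
        reached = message_id > stop if include_stop else message_id >= stop
        if reached:
            break
        consumed.append(message_id)
    return consumed


# Usage: five messages in ledger 1, stop cursor placed at entry 2.
topic = [ModelMessageId(1, entry) for entry in range(5)]
at_result = consume_until(topic, ModelMessageId(1, 2), include_stop=False)
after_result = consume_until(topic, ModelMessageId(1, 2), include_stop=True)
```

The only difference between the two cursors in this model is `>=` versus `>`, which is exactly what decides whether the message carrying the specified ID ends up in the result.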