Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/10 06:41:38 UTC

[GitHub] [flink] imaffe commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

imaffe commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r917344225


##########
docs/content.zh/docs/connectors/datastream/pulsar.md:
##########
@@ -429,9 +448,10 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
   {{< /tab >}}
   {{< /tabs >}}
 
-  {{< hint warning >}}
-  StopCursor.atEventTime(long) 目前已经处于弃用状态。
-  {{< /hint >}}
+- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`,消费结果里包含此时间戳的消息。
+  ```java
+  StopCursor.afterPublishTime(long);

Review Comment:
   Nice renaming~ 



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -248,17 +230,19 @@ private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
                 });
 
         // Assign splits to downstream readers.
-        assignmentState.assignSplits(pendingReaders).ifPresent(context::assignSplits);
+        splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits);
 
         // If periodically partition discovery is disabled and the initializing discovery has done,
-        // signal NoMoreSplitsEvent to pending readers
-        if (assignmentState.noMoreNewPartitionSplits()) {
-            LOG.debug(
-                    "No more PulsarPartitionSplits to assign."
-                            + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
-                    pendingReaders,
-                    sourceConfiguration.getSubscriptionDesc());
-            pendingReaders.forEach(this.context::signalNoMoreSplits);
+        // signal NoMoreSplitsEvent to pending readers.
+        for (Integer reader : pendingReaders) {
+            if (splitAssigner.noMoreSplits(reader)) {
+                LOG.debug(
+                        "No more PulsarPartitionSplits to assign."
+                                + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
+                        reader,
+                        sourceConfiguration.getSubscriptionDesc());
+                context.signalNoMoreSplits(reader);

Review Comment:
   When will an individual reader receive the "no more splits" signal?


