Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/15 16:32:48 UTC

[GitHub] [flink] syhily opened a new pull request, #19972: [FLINK-27399][Connector/Pulsar] Modify start cursor and stop cursor, change initial position setting logic.

syhily opened a new pull request, #19972:
URL: https://github.com/apache/flink/pull/19972

   ## What is the purpose of the change
   
   ## Brief change log
   
   This task modifies the `flink-connector-pulsar` module, adding new mechanisms for setting the initial consuming position.
   
   - Change `StartCursor`: add new useful methods and rename the confusing `fromMessageTime()` method.
   - Introduce a new `SplitAssigner` for assigning splits among the Pulsar readers.
   - Change the seeking mechanism from the Pulsar consumer to the Pulsar admin API, which fixes a lot of issues.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as:
   
   - `PulsarSourceEnumeratorTest`
   - `PulsarOrderedPartitionSplitReaderTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] imaffe commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
imaffe commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r928515414


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This assigner is used for {@link SubscriptionType#Failover} and {@link
+ * SubscriptionType#Exclusive} subscriptions.
+ */
+@Internal
+public class NormalSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8412586087991597092L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields would be saved into checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
+    private boolean initialized;
+
+    public NormalSplitAssigner(

Review Comment:
   nit: `NormalSplitAssigner` makes me think the others are "abnormal". I would suggest `ExclusiveAndFailoverSplitAssigner` or `NonSharedSplitAssigner`, but these names are rather long, which is not desirable either. Any thoughts here?





[GitHub] [flink] imaffe commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
imaffe commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r917344225


##########
docs/content.zh/docs/connectors/datastream/pulsar.md:
##########
@@ -429,9 +448,10 @@ By default, the Pulsar Source consumes data in streaming mode. Unless the job fails
   {{< /tab >}}
   {{< /tabs >}}
 
-  {{< hint warning >}}
-  StopCursor.atEventTime(long) is now deprecated.
-  {{< /hint >}}
+- Stop at a given message publish timestamp, for example `Message<byte[]>.getPublishTime()`; messages with this timestamp are included in the consuming result.
+  ```java
+  StopCursor.afterPublishTime(long);

Review Comment:
   Nice renaming~ 



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -248,17 +230,19 @@ private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
                 });
 
         // Assign splits to downstream readers.
-        assignmentState.assignSplits(pendingReaders).ifPresent(context::assignSplits);
+        splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits);
 
         // If periodically partition discovery is disabled and the initializing discovery has done,
-        // signal NoMoreSplitsEvent to pending readers
-        if (assignmentState.noMoreNewPartitionSplits()) {
-            LOG.debug(
-                    "No more PulsarPartitionSplits to assign."
-                            + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
-                    pendingReaders,
-                    sourceConfiguration.getSubscriptionDesc());
-            pendingReaders.forEach(this.context::signalNoMoreSplits);
+        // signal NoMoreSplitsEvent to pending readers.
+        for (Integer reader : pendingReaders) {
+            if (splitAssigner.noMoreSplits(reader)) {
+                LOG.debug(
+                        "No more PulsarPartitionSplits to assign."
+                                + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
+                        reader,
+                        sourceConfiguration.getSubscriptionDesc());
+                context.signalNoMoreSplits(reader);

Review Comment:
   When will an individual reader receive the no-more-splits signal?
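
One plausible answer, sketched under assumptions: the code comment in the hunk above says the signal is sent once periodic partition discovery is disabled and the initial discovery has finished. The sketch below additionally assumes that a reader must have no pending splits left. `NoMoreSplitsSketch` and `pendingSplitsByReader` are hypothetical names, and the real `SplitAssigner#noMoreSplits` may use a different condition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a split assigner; splits are plain strings. */
final class NoMoreSplitsSketch {
    final boolean enablePartitionDiscovery;
    boolean initialized;
    final Map<Integer, Set<String>> pendingSplitsByReader = new HashMap<>();

    NoMoreSplitsSketch(boolean enablePartitionDiscovery) {
        this.enablePartitionDiscovery = enablePartitionDiscovery;
    }

    /**
     * Assumed condition: discovery is off, the first discovery has run,
     * and nothing is queued for this particular reader.
     */
    boolean noMoreSplits(int reader) {
        Set<String> pending = pendingSplitsByReader.get(reader);
        boolean nothingPending = pending == null || pending.isEmpty();
        return !enablePartitionDiscovery && initialized && nothingPending;
    }
}
```

Under these assumptions a reader with splits still queued would not be signalled, while another reader in the same call could be.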





[GitHub] [flink] imaffe commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
imaffe commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r928513674


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */
+@Internal
+public class SharedSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8468503133499402491L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields would be saved into checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
+    private final Map<Integer, Set<String>> readerAssignedSplits;
+    private boolean initialized;
+
+    public SharedSplitAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
+        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> newPartitions = new ArrayList<>();
+
+        for (TopicPartition partition : fetchedPartitions) {
+            if (!appendedPartitions.contains(partition)) {
+                appendedPartitions.add(partition);
+                newPartitions.add(partition);
+            }
+        }
+
+        if (!initialized) {
+            initialized = true;
+        }
+
+        return newPartitions;
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        Set<PulsarPartitionSplit> pendingPartitionSplits =
+                sharedPendingPartitionSplits.computeIfAbsent(subtaskId, id -> new HashSet<>());
+        pendingPartitionSplits.addAll(splits);
+    }
+
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
+        for (Integer reader : readers) {
+            Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.remove(reader);
+            if (pendingSplits == null) {
+                pendingSplits = new HashSet<>();
+            }
+
+            Set<String> assignedSplits =
+                    readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
+
+            for (TopicPartition partition : appendedPartitions) {
+                String partitionName = partition.toString();
+                if (!assignedSplits.contains(partitionName)) {
+                    pendingSplits.add(new PulsarPartitionSplit(partition, stopCursor));
+                    assignedSplits.add(partitionName);
+                }
+            }
+
+            if (!pendingSplits.isEmpty()) {

Review Comment:
   `pendingSplits` contains (1) splits added back after a reader crash and (2) possibly new splits from newly subscribed topics. Is this correct?
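
The two sources of pending splits can be seen in a simplified, self-contained version of the loop quoted above. This is only a sketch: partitions and splits are plain strings, `SharedAssignSketch` is a hypothetical name, and because the body of the final `if` is cut off in the diff, the sketch assumes it records the pending splits for the reader.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for the shared assigner; partitions and splits are strings. */
final class SharedAssignSketch {
    final Set<String> appendedPartitions = new LinkedHashSet<>();
    final Map<Integer, Set<String>> sharedPendingSplits = new HashMap<>();
    final Map<Integer, Set<String>> readerAssignedSplits = new HashMap<>();

    /** Source 1: splits handed back by a failed reader are queued for that reader. */
    void addSplitsBack(List<String> splits, int reader) {
        sharedPendingSplits.computeIfAbsent(reader, id -> new HashSet<>()).addAll(splits);
    }

    /** Merges added-back splits with partitions the reader has not been given yet. */
    Map<Integer, List<String>> createAssignment(List<Integer> readers) {
        Map<Integer, List<String>> assignMap = new HashMap<>();
        for (Integer reader : readers) {
            Set<String> pending = sharedPendingSplits.remove(reader);
            if (pending == null) {
                pending = new HashSet<>();
            }
            Set<String> assigned =
                    readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
            // Source 2: every known partition this reader has never been assigned.
            for (String partition : appendedPartitions) {
                if (assigned.add(partition)) {
                    pending.add(partition);
                }
            }
            if (!pending.isEmpty()) {
                // Assumed continuation of the truncated diff.
                assignMap.put(reader, new ArrayList<>(pending));
            }
        }
        return assignMap;
    }
}
```

In this model every reader of a shared subscription ends up with a split for every partition, which is why the bookkeeping is kept per reader.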





[GitHub] [flink] imaffe commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
imaffe commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r928516418


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This assigner is used for {@link SubscriptionType#Failover} and {@link
+ * SubscriptionType#Exclusive} subscriptions.

Review Comment:
   Shouldn't KEY_SHARED use this split assigner as well?





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r931924511


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This assigner is used for {@link SubscriptionType#Failover} and {@link
+ * SubscriptionType#Exclusive} subscriptions.

Review Comment:
   Yep





[GitHub] [flink] tisonkun merged pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
tisonkun merged PR #19972:
URL: https://github.com/apache/flink/pull/19972




[GitHub] [flink] rmetzger commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
rmetzger commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r941153631


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java:
##########
@@ -33,8 +33,10 @@
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Disabled;
 
 /** Unite test class for {@link PulsarSource}. */
+@Disabled("Could not reproduce on local machine.")

Review Comment:
   Thanks for clarifying. I didn't check the commits after your approval in detail.





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r915990147


##########
flink-python/pyflink/datastream/connectors/pulsar.py:
##########
@@ -138,10 +138,10 @@ def latest() -> 'StartCursor':
         return StartCursor(JStartCursor.latest())
 
     @staticmethod
-    def from_message_time(timestamp: int) -> 'StartCursor':
+    def from_publish_time(timestamp: int) -> 'StartCursor':

Review Comment:
   Adding this breaking change may not be a good choice. I added a new method and deprecated the old one.
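
The deprecate-and-delegate approach described here can be illustrated with a minimal Java sketch. `StartCursorSketch` is a hypothetical stand-in that only records a timestamp; the Java method name `fromPublishTime` is assumed by analogy with the Python `from_publish_time` in the diff, and the real `StartCursor` does more than this.

```java
/** Hypothetical stand-in for a start cursor; it only records the timestamp. */
final class StartCursorSketch {
    final long publishTimestamp;

    private StartCursorSketch(long publishTimestamp) {
        this.publishTimestamp = publishTimestamp;
    }

    /** New, clearer name: start from a message publish time. */
    static StartCursorSketch fromPublishTime(long timestamp) {
        return new StartCursorSketch(timestamp);
    }

    /** Old name kept so existing callers still compile; it delegates to the new one. */
    @Deprecated
    static StartCursorSketch fromMessageTime(long timestamp) {
        return fromPublishTime(timestamp);
    }
}
```

Existing callers keep working and only see a deprecation warning, which avoids the breaking change mentioned above.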





[GitHub] [flink] tisonkun commented on pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #19972:
URL: https://github.com/apache/flink/pull/19972#issuecomment-1181779188

   @MartijnVisser Thank you. I'll review the patch this week. Actually, I have tried to request myself as a reviewer but forgot several times >_<




[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r921757700


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java:
##########
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
      * code</a> for understanding pulsar internal logic.
      *
      * @param messageId The message id for start position.
-     * @param inclusive Should we include the start message id in consuming result.
+     * @param inclusive Whether we include the start message id in consuming result. This works only
+     *     if we provide a specified message id instead of {@link MessageId#earliest} or {@link
+     *     MessageId#latest}.
      */
     public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
-        if (inclusive) {
-            this.messageId = messageId;
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+        if (MessageId.earliest.equals(idImpl) || MessageId.latest.equals(idImpl) || inclusive) {
+            this.messageId = idImpl;
         } else {
-            checkState(
-                    messageId instanceof MessageIdImpl,
-                    "We only support normal message id and batch message id.");
-            MessageIdImpl id = (MessageIdImpl) messageId;
-            if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
-                this.messageId = messageId;
-            } else {
-                this.messageId =
-                        new MessageIdImpl(
-                                id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
-            }
+            this.messageId = nextMessageId(idImpl);

Review Comment:
   Yep. We just reuse the same logic here in favor of DRY.
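
As a rough illustration of the inclusive and exclusive start logic in this hunk, the sketch below uses a hypothetical `SimpleMessageId` in place of Pulsar's `MessageIdImpl`. The `nextMessageId` here mirrors the removed lines (same ledger, entry id plus one); the real helper may handle more cases, and the `EARLIEST` and `LATEST` values are placeholders chosen for the sketch.

```java
import java.util.Objects;

/** Hypothetical, simplified message id: (ledgerId, entryId, partitionIndex). */
final class SimpleMessageId {
    static final SimpleMessageId EARLIEST = new SimpleMessageId(-1L, -1L, -1);
    static final SimpleMessageId LATEST =
            new SimpleMessageId(Long.MAX_VALUE, Long.MAX_VALUE, -1);

    final long ledgerId;
    final long entryId;
    final int partitionIndex;

    SimpleMessageId(long ledgerId, long entryId, int partitionIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleMessageId)) {
            return false;
        }
        SimpleMessageId other = (SimpleMessageId) o;
        return ledgerId == other.ledgerId
                && entryId == other.entryId
                && partitionIndex == other.partitionIndex;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, partitionIndex);
    }
}

final class StartPositionSketch {
    /** Same ledger and partition, next entry (as in the removed lines of the diff). */
    static SimpleMessageId nextMessageId(SimpleMessageId id) {
        return new SimpleMessageId(id.ledgerId, id.entryId + 1, id.partitionIndex);
    }

    /** The inclusive flag only matters for a concrete id, not for earliest or latest. */
    static SimpleMessageId resolveStart(SimpleMessageId id, boolean inclusive) {
        if (SimpleMessageId.EARLIEST.equals(id) || SimpleMessageId.LATEST.equals(id) || inclusive) {
            return id;
        }
        return nextMessageId(id);
    }
}
```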





[GitHub] [flink] tisonkun commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r941065417


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java:
##########
@@ -33,8 +33,10 @@
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Disabled;
 
 /** Unite test class for {@link PulsarSource}. */
+@Disabled("Could not reproduce on local machine.")

Review Comment:
   > unconfident
   
   There are several follow-up commits, including ones that disable tests. I don't think we should mix a test fix with disabling a test.
   
   It seems the current status is stuck because the failure cannot be reproduced locally. I'll try to help on this point this week.





[GitHub] [flink] tisonkun commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r932957550


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java:
##########
@@ -18,36 +18,29 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.cursor.stop;
 
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
 
 /**
  * A stop cursor that initialize the position to the latest message id. The offsets initialization
  * are taken care of by the {@code PulsarPartitionSplitReaderBase} instead of by the {@code
- * PulsarSourceEnumerator}.
+ * PulsarSourceEnumerator}. We would include the latest message available in Pulsar by default.
  */
-public class LatestMessageStopCursor implements StopCursor {
+public class LatestMessageStopCursor extends MessageIdStopCursor {

Review Comment:
   I'd prefer that this class `implements StopCursor` directly, and that `MessageIdStopCursor` keeps holding a final `messageId`.





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r931794267


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */

Review Comment:
   They are using `NormalSplitAssigner.java`.
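
Taken together with the earlier reply about KEY_SHARED, the mapping from subscription type to assigner could look roughly like the sketch below. It is an assumption-based illustration, not the factory's actual code: `SubscriptionKind` is a local stand-in for Pulsar's `SubscriptionType`, `AssignerChoiceSketch` is hypothetical, and the method returns class names as strings instead of constructing assigners.

```java
/** Local stand-in for Pulsar's SubscriptionType enum. */
enum SubscriptionKind {
    Exclusive,
    Failover,
    Key_Shared,
    Shared
}

final class AssignerChoiceSketch {
    /** Returns the name of the assigner assumed to serve the given subscription kind. */
    static String assignerFor(SubscriptionKind kind) {
        switch (kind) {
            case Exclusive:
            case Failover:
            case Key_Shared:
                // Per the thread, Key_Shared shares the non-shared assigner.
                return "NormalSplitAssigner";
            case Shared:
                return "SharedSplitAssigner";
            default:
                throw new IllegalArgumentException("Unsupported subscription: " + kind);
        }
    }
}
```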





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r931803283


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+
+/** The factory for creating split assigner. */
+@Internal
+public final class SplitAssignerFactory {
+
+    private SplitAssignerFactory() {
+        // No public constructor.
+    }
+
+    /** Create blank assigner. */
+    public static SplitAssigner create(
+            StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
+        return create(stopCursor, sourceConfiguration, initialState());
+    }
+
+    /** Create assigner from checkpoint state. */
+    public static SplitAssigner create(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
+        if (subscriptionType == Exclusive
+                || subscriptionType == Failover
+                || subscriptionType == Key_Shared) {
+            return new NormalSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        } else if (subscriptionType == Shared) {
+            return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        } else {
+            throw new IllegalArgumentException(
+                    "We don't support this subscription type: " + subscriptionType);
+        }

Review Comment:
   I prefer `if else`.



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java:
##########
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
      * code</a> for understanding pulsar internal logic.
      *
      * @param messageId The message id for start position.
-     * @param inclusive Should we include the start message id in consuming result.
+     * @param inclusive Whether we include the start message id in consuming result. This works only
+     *     if we provide a specified message id instead of {@link MessageId#earliest} or {@link
+     *     MessageId#latest}.
      */
     public MessageIdStartCursor(MessageId messageId, boolean inclusive) {

Review Comment:
   I prefer using `inclusive` because it has been used in `StartCursor`.





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r932145488


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java:
##########
@@ -32,21 +38,23 @@ public class MessageIdStopCursor implements StopCursor {
 
     private final MessageId messageId;
 
-    private final boolean exclusive;
+    private final boolean inclusive;
 
-    public MessageIdStopCursor(MessageId messageId) {
-        this(messageId, true);
-    }
+    public MessageIdStopCursor(MessageId messageId, boolean inclusive) {
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+        checkArgument(!earliest.equals(idImpl), "MessageId.earliest is not supported.");
+        checkArgument(
+                !latest.equals(idImpl),
+                "MessageId.latest is not supported, use LatestMessageStopCursor instead.");
 
-    public MessageIdStopCursor(MessageId messageId, boolean exclusive) {
-        this.messageId = messageId;
-        this.exclusive = exclusive;
+        this.messageId = idImpl;
+        this.inclusive = inclusive;
     }
 
     @Override
     public boolean shouldStop(Message<?> message) {
         MessageId id = message.getMessageId();
-        if (exclusive) {
+        if (inclusive) {
             return id.compareTo(messageId) > 0;

Review Comment:
   No, that was an incorrect usage in `PulsarPartitionSplitReaderBase`; we are just fixing the bug here.
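
For readers following the `exclusive` to `inclusive` rename, the stop semantics can be sketched as below. This is a minimal, self-contained illustration rather than the connector's code: `EntryId` and `StopSketch` are hypothetical stand-ins for Pulsar's `MessageId` and for `MessageIdStopCursor`, and the non-inclusive branch (`>= 0`) is an assumption, since the hunk above is cut off right after the inclusive branch.

```java
/** Hypothetical stand-in for a Pulsar message id, ordered by ledger id and then entry id. */
final class EntryId implements Comparable<EntryId> {
    final long ledgerId;
    final long entryId;

    EntryId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(EntryId other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

/** Hypothetical sketch of a stop cursor that is bounded by a message id. */
final class StopSketch {
    private final EntryId stopAt;
    private final boolean inclusive;

    StopSketch(EntryId stopAt, boolean inclusive) {
        this.stopAt = stopAt;
        this.inclusive = inclusive;
    }

    /** Returns true when the given id lies beyond the range that should be consumed. */
    boolean shouldStop(EntryId id) {
        if (inclusive) {
            // The stop id itself is still consumed; stop on the first id after it.
            return id.compareTo(stopAt) > 0;
        } else {
            // Assumed branch: the stop id itself is not consumed.
            return id.compareTo(stopAt) >= 0;
        }
    }
}
```

With `inclusive = true`, the message whose id equals the stop id is still emitted, which matches what the flag name suggests.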





[GitHub] [flink] tisonkun commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r940792766


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java:
##########
@@ -33,8 +33,10 @@
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Disabled;
 
 /** Unite test class for {@link PulsarSource}. */
+@Disabled("Could not reproduce on local machine.")

Review Comment:
   +1 This is the heaviest and most unstable test of the Pulsar connector.
   
   If you simply disable it, you'd better do so in a separate patch with a reference. Otherwise, I can't be confident in the other changes in this patch.





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r921758376


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java:
##########
@@ -52,22 +52,19 @@ public CursorPosition(@Nullable Long timestamp) {
         this.timestamp = timestamp;

Review Comment:
   Yep.





[GitHub] [flink] MartijnVisser commented on pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #19972:
URL: https://github.com/apache/flink/pull/19972#issuecomment-1180062144

   @PatrickRen Can you also have a look at this PR? As I understand it, this PR should help resolve the blocker test-stability issue [FLINK-26721](https://issues.apache.org/jira/browse/FLINK-26721).




[GitHub] [flink] imaffe commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
imaffe commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r928515937


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */

Review Comment:
   What is the current status of the `Key_Shared` subscription type?





[GitHub] [flink] imaffe commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
imaffe commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r928523317


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java:
##########
@@ -65,28 +65,48 @@ static StopCursor latest() {
     }
 
     /**
-     * Stop when the messageId is equal or greater than the specified messageId. Message that is
-     * equal to the specified messageId will not be consumed.
+     * Stop consuming when the messageId is equal or greater than the specified messageId. Message
+     * that is equal to the specified messageId will not be consumed.
      */
     static StopCursor atMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(true);
+        } else {
+            return new MessageIdStopCursor(messageId);
+        }
     }
 
     /**
-     * Stop when the messageId is greater than the specified messageId. Message that is equal to the
-     * specified messageId will be consumed.
+     * Stop consuming when the messageId is greater than the specified messageId. Message that is
+     * equal to the specified messageId will be consumed.
      */
     static StopCursor afterMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId, false);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(false);
+        } else {
+            return new MessageIdStopCursor(messageId, false);
+        }
     }
 
-    @Deprecated
+    /** Stop consuming when message eventTime is greater than or equals the specified timestamp. */
     static StopCursor atEventTime(long timestamp) {
         return new EventTimestampStopCursor(timestamp);
     }
 
-    /** Stop when message publishTime is greater than the specified timestamp. */
+    /** Stop consuming when message eventTime is greater than the specified timestamp. */
+    static StopCursor afterEventTime(long timestamp) {
+        return new EventTimestampStopCursor(timestamp + 1);
+    }
+
+    /**
+     * Stop consuming when message publishTime is greater than or equals the specified timestamp.
+     */
     static StopCursor atPublishTime(long timestamp) {
         return new PublishTimestampStopCursor(timestamp);
     }
+
+    /** Stop consuming when message publishTime is greater than the specified timestamp. */
+    static StopCursor afterPublishTime(long timestamp) {
+        return new PublishTimestampStopCursor(timestamp + 1);
+    }

Review Comment:
   To me, `at` vs. `after` plays the same role as an `exclusive` field. What would the semantics be if we added an `exclusive` field?
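
To make the `at` / `after` distinction in the hunk above concrete, here is a small sketch under stated assumptions. `TimestampStopSketch` is a hypothetical class, not part of the connector; it models a stop condition as a predicate on the message time and assumes, following the Javadoc in the diff, that `at` stops on "greater than or equal" while `after` is built by shifting the bound by one.

```java
import java.util.function.LongPredicate;

/** Hypothetical sketch: a timestamp stop condition modelled as a predicate on the message time. */
final class TimestampStopSketch {

    private TimestampStopSketch() {
        // Static helpers only.
    }

    /** Stop at the first message whose time is greater than or equal to the timestamp. */
    static LongPredicate at(long timestamp) {
        return messageTime -> messageTime >= timestamp;
    }

    /** Stop at the first message whose time is strictly greater than the timestamp. */
    static LongPredicate after(long timestamp) {
        // Same trick as in the diff: reuse the "at" condition with the bound shifted by one.
        // Caveat: timestamp + 1 overflows when timestamp is Long.MAX_VALUE.
        return at(timestamp + 1);
    }
}
```

On whole-number timestamps, `after(t)` and `at(t + 1)` accept exactly the same messages, so an extra `exclusive` flag would only be a second way to express the same choice.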





[GitHub] [flink] MartijnVisser commented on pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #19972:
URL: https://github.com/apache/flink/pull/19972#issuecomment-1181764921

   @tisonkun I understand from @wuchong that you might also want to help with or take a look at this PR, so I've tagged you.




[GitHub] [flink] syhily commented on pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on PR #19972:
URL: https://github.com/apache/flink/pull/19972#issuecomment-1213014404

   @tisonkun The CI has finally turned green.




[GitHub] [flink] imaffe commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
imaffe commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r928515414


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This assigner is used for {@link SubscriptionType#Failover} and {@link
+ * SubscriptionType#Exclusive} subscriptions.
+ */
+@Internal
+public class NormalSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8412586087991597092L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields would be saved into checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
+    private boolean initialized;
+
+    public NormalSplitAssigner(

Review Comment:
   nit: `NormalSplitAssigner` makes me think the other assigners are "abnormal". I would suggest `ExclusiveAndFailoverSplitAssigner`, but that name is too long and not ideal either. Any thoughts?





[GitHub] [flink] tisonkun commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r921051194


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java:
##########
@@ -80,12 +80,18 @@ protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consum
 
         // Reset the start position for ordered pulsar consumer.
         if (latestConsumedId != null) {
-            StartCursor startCursor = StartCursor.fromMessageId(latestConsumedId, false);
-            TopicPartition partition = split.getPartition();
-
+            LOG.debug("Start seeking from the checkpoint {}", latestConsumedId);
             try {
-                startCursor.seekPosition(
-                        partition.getTopic(), partition.getPartitionId(), consumer);
+                MessageId initialPosition;
+                if (latestConsumedId == MessageId.latest
+                        || latestConsumedId == MessageId.earliest) {
+                    // This logic is added only the compatible.

Review Comment:
   ```suggestion
                       //  for compatibility
   ```



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.cursor;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The helper class for Pulsar's message id. */
+@Internal
+public final class MessageIdUtils {
+
+    private MessageIdUtils() {
+        // No public constructor.
+    }
+
+    /**
+     * The implementation from the <a
+     * href="https://github.com/apache/pulsar/blob/7c8dc3201baad7d02d886dbc26db5c03abce77d6/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java#L85">this
+     * code</a> to get the next message id.
+     */

Review Comment:
   ```suggestion
       /**
        * The implementation from <a
        * href="https://github.com/apache/pulsar/blob/7c8dc3201baad7d02d886dbc26db5c03abce77d6/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java#L85">this
        * code snippet</a> to get next message id.
        */
   ```



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+
+/** The factory for creating split assigner. */
+@Internal
+public final class SplitAssignerFactory {
+
+    private SplitAssignerFactory() {
+        // No public constructor.
+    }
+
+    /** Create blank assigner. */
+    public static SplitAssigner create(
+            StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
+        return create(stopCursor, sourceConfiguration, initialState());
+    }
+
+    /** Create assigner from checkpoint state. */
+    public static SplitAssigner create(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
+        if (subscriptionType == Exclusive
+                || subscriptionType == Failover
+                || subscriptionType == Key_Shared) {
+            return new NormalSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        } else if (subscriptionType == Shared) {
+            return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        } else {
+            throw new IllegalArgumentException(
+                    "We don't support this subscription type: " + subscriptionType);
+        }

Review Comment:
   ```suggestion
           switch (subscriptionType) {
               case Exclusive:
               case Failover:
               case Key_Shared:
                   return new NormalSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
               case Shared:
                   return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
               default:
                   throw new IllegalArgumentException("Unknown subscription type: " + subscriptionType);
           }
   ```
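
The two dispatch styles discussed for `SplitAssignerFactory` differ in one detail that is easy to miss, sketched below. `SubscriptionKind` and `AssignerChoice` are hypothetical stand-ins (the real code switches on Pulsar's `SubscriptionType` and returns assigner instances, not names). The sketch only shows that a `switch` over a `null` enum value throws `NullPointerException`, while an `==` chain falls through to its final `else`.

```java
/** Hypothetical mirror of the four subscription types named in the diff. */
enum SubscriptionKind {
    EXCLUSIVE,
    FAILOVER,
    KEY_SHARED,
    SHARED
}

/** Hypothetical sketch returning the assigner name instead of an assigner instance. */
final class AssignerChoice {

    private AssignerChoice() {
        // Static helpers only.
    }

    /** Dispatch with an if/else chain, as in the PR. */
    static String withIfElse(SubscriptionKind kind) {
        if (kind == SubscriptionKind.EXCLUSIVE
                || kind == SubscriptionKind.FAILOVER
                || kind == SubscriptionKind.KEY_SHARED) {
            return "NormalSplitAssigner";
        } else if (kind == SubscriptionKind.SHARED) {
            return "SharedSplitAssigner";
        } else {
            // A null kind also ends up here.
            throw new IllegalArgumentException("Unknown subscription type: " + kind);
        }
    }

    /** Dispatch with a switch, as in the suggestion. */
    static String withSwitch(SubscriptionKind kind) {
        // A null kind throws NullPointerException before any case is evaluated.
        switch (kind) {
            case EXCLUSIVE:
            case FAILOVER:
            case KEY_SHARED:
                return "NormalSplitAssigner";
            case SHARED:
                return "SharedSplitAssigner";
            default:
                throw new IllegalArgumentException("Unknown subscription type: " + kind);
        }
    }
}
```

For every non-null value both variants pick the same assigner, so the choice is mostly a matter of style unless the subscription type can be `null`.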



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -248,17 +230,19 @@ private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
                 });
 
         // Assign splits to downstream readers.
-        assignmentState.assignSplits(pendingReaders).ifPresent(context::assignSplits);
+        splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits);
 
         // If periodically partition discovery is disabled and the initializing discovery has done,
-        // signal NoMoreSplitsEvent to pending readers
-        if (assignmentState.noMoreNewPartitionSplits()) {
-            LOG.debug(
-                    "No more PulsarPartitionSplits to assign."
-                            + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
-                    pendingReaders,
-                    sourceConfiguration.getSubscriptionDesc());
-            pendingReaders.forEach(this.context::signalNoMoreSplits);
+        // signal NoMoreSplitsEvent to pending readers.
+        for (Integer reader : pendingReaders) {
+            if (splitAssigner.noMoreSplits(reader)) {
+                LOG.debug(
+                        "No more PulsarPartitionSplits to assign."
+                                + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
+                        reader,
+                        sourceConfiguration.getSubscriptionDesc());
+                context.signalNoMoreSplits(reader);

Review Comment:
   There is no explicit happens-before relation here. The event is queued to the source operator's event thread; when it is processed, the source reader sets its internal state, which is later checked and then produces `InputStatus.END_OF_INPUT`.
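
The ordering described in this comment can be illustrated with a deliberately simplified, single-threaded model. Everything here is hypothetical: `ReaderSketch`, its `Status` enum, and the mailbox are illustrative names, not Flink classes. The only point is that signalling merely enqueues an event, and the reader reports end of input only after it has processed that event.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical reader whose "no more splits" flag is set only when its mailbox is drained. */
final class ReaderSketch {

    /** Simplified stand-in for a reader's polling status. */
    enum Status {
        NOTHING_AVAILABLE,
        END_OF_INPUT
    }

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean noMoreSplits;

    /** Called from the enumerator side: only enqueues the event, does not apply it. */
    void signalNoMoreSplits() {
        mailbox.add(() -> noMoreSplits = true);
    }

    /** Called on the reader side: applies all queued events in arrival order. */
    void drainMailbox() {
        Runnable event;
        while ((event = mailbox.poll()) != null) {
            event.run();
        }
    }

    /** Reports end of input only once the queued signal has been processed. */
    Status pollNext() {
        return noMoreSplits ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
    }
}
```

Between `signalNoMoreSplits()` and `drainMailbox()` the reader still looks idle rather than finished, which is why the enumerator cannot assume the reader has stopped right after signalling.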



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java:
##########
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
      * code</a> for understanding pulsar internal logic.
      *
      * @param messageId The message id for start position.
-     * @param inclusive Should we include the start message id in consuming result.
+     * @param inclusive Whether we include the start message id in consuming result. This works only
+     *     if we provide a specified message id instead of {@link MessageId#earliest} or {@link
+     *     MessageId#latest}.
      */
     public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
-        if (inclusive) {
-            this.messageId = messageId;
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+        if (MessageId.earliest.equals(idImpl) || MessageId.latest.equals(idImpl) || inclusive) {
+            this.messageId = idImpl;
         } else {
-            checkState(
-                    messageId instanceof MessageIdImpl,
-                    "We only support normal message id and batch message id.");
-            MessageIdImpl id = (MessageIdImpl) messageId;
-            if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
-                this.messageId = messageId;
-            } else {
-                this.messageId =
-                        new MessageIdImpl(
-                                id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
-            }
+            this.messageId = nextMessageId(idImpl);

Review Comment:
   @syhily IIRC you once told me that this PR changes the logic for getting the next message id, but it looks the same before and after. Or is the `if (idImpl.getEntryId() < 0)` check the significant part?
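
As a way to reason about this question, the sketch below compares the removed `entryId + 1` arithmetic with a guarded variant. It rests on an assumption: that the `getEntryId() < 0` check maps a negative entry id to entry 0, in the spirit of the linked `PositionImpl` code. `PositionSketch` is a hypothetical class, not the connector's `MessageIdUtils`.

```java
/** Hypothetical stand-in for a (ledgerId, entryId) message position. */
final class PositionSketch {
    final long ledgerId;
    final long entryId;

    PositionSketch(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    /** The arithmetic from the removed lines: always entryId + 1. */
    PositionSketch nextUnguarded() {
        return new PositionSketch(ledgerId, entryId + 1);
    }

    /** Assumed guarded variant: any negative entry id restarts at entry 0. */
    PositionSketch nextGuarded() {
        long next = entryId < 0 ? 0 : entryId + 1;
        return new PositionSketch(ledgerId, next);
    }
}
```

Under this assumption the two variants agree for every entry id of -1 or greater and differ only for entry ids below -1, so the guard matters only if such ids can reach this code path.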



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java:
##########
@@ -30,13 +30,26 @@
 /**
  * A stop cursor that initialize the position to the latest message id. The offsets initialization
  * are taken care of by the {@code PulsarPartitionSplitReaderBase} instead of by the {@code
- * PulsarSourceEnumerator}.
+ * PulsarSourceEnumerator}. We would include the latest message available in Pulsar by default.
  */
 public class LatestMessageStopCursor implements StopCursor {
     private static final long serialVersionUID = 1702059838323965723L;
 
     private MessageId messageId;
 
+    /**
+     * Set this to false would include the latest available message when the flink pipeline start.
+     */
+    private final boolean exclusive;
+
+    public LatestMessageStopCursor() {
+        this.exclusive = false;
+    }

Review Comment:
   ```suggestion
   ```
   
   I'm inclined to remove this constructor, since `exclusive = false` is not an intuitive default.



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java:
##########
@@ -52,22 +52,19 @@ public CursorPosition(@Nullable Long timestamp) {
         this.timestamp = timestamp;

Review Comment:
   Shouldn't the argument of these two constructors be `Nonnull`?



##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.defaultStopCursor;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test utils for split assigners. */
+abstract class SplitAssignerTestBase<T extends SplitAssigner> {

Review Comment:
   ```suggestion
   abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger {
   ```



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java:
##########
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
      * code</a> for understanding pulsar internal logic.
      *
      * @param messageId The message id for start position.
-     * @param inclusive Should we include the start message id in consuming result.
+     * @param inclusive Whether we include the start message id in consuming result. This works only
+     *     if we provide a specified message id instead of {@link MessageId#earliest} or {@link
+     *     MessageId#latest}.
      */
     public MessageIdStartCursor(MessageId messageId, boolean inclusive) {

Review Comment:
   For consistency, can this take an `exclusive` argument?



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java:
##########
@@ -65,28 +65,48 @@ static StopCursor latest() {
     }
 
     /**
-     * Stop when the messageId is equal or greater than the specified messageId. Message that is
-     * equal to the specified messageId will not be consumed.
+     * Stop consuming when the messageId is equal or greater than the specified messageId. Message
+     * that is equal to the specified messageId will not be consumed.
      */
     static StopCursor atMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(true);
+        } else {
+            return new MessageIdStopCursor(messageId);
+        }
     }
 
     /**
-     * Stop when the messageId is greater than the specified messageId. Message that is equal to the
-     * specified messageId will be consumed.
+     * Stop consuming when the messageId is greater than the specified messageId. Message that is
+     * equal to the specified messageId will be consumed.
      */
     static StopCursor afterMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId, false);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(false);
+        } else {
+            return new MessageIdStopCursor(messageId, false);
+        }
     }
 
-    @Deprecated
+    /** Stop consuming when message eventTime is greater than or equals the specified timestamp. */
     static StopCursor atEventTime(long timestamp) {
         return new EventTimestampStopCursor(timestamp);
     }
 
-    /** Stop when message publishTime is greater than the specified timestamp. */
+    /** Stop consuming when message eventTime is greater than the specified timestamp. */
+    static StopCursor afterEventTime(long timestamp) {
+        return new EventTimestampStopCursor(timestamp + 1);
+    }
+
+    /**
+     * Stop consuming when message publishTime is greater than or equals the specified timestamp.
+     */
     static StopCursor atPublishTime(long timestamp) {
         return new PublishTimestampStopCursor(timestamp);
     }
+
+    /** Stop consuming when message publishTime is greater than the specified timestamp. */
+    static StopCursor afterPublishTime(long timestamp) {
+        return new PublishTimestampStopCursor(timestamp + 1);
+    }

Review Comment:
   Could these constructors have an `exclusive` field?
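
The `timestamp + 1` in the `after*` factories relies on timestamps being whole numbers: stopping at "greater than or equal to t + 1" is the same as stopping at "greater than t". A minimal sketch of that equivalence follows. `TimestampStopSketch` and its methods are hypothetical stand-ins, not the connector's cursor classes.

```java
import java.util.function.LongPredicate;

final class TimestampStopSketch {

    /** Stop once a message time is greater than or equal to the bound ("at" semantics). */
    static LongPredicate atTime(long timestamp) {
        return messageTime -> messageTime >= timestamp;
    }

    /** Stop once a message time is strictly greater than the bound ("after" semantics). */
    static LongPredicate afterTime(long timestamp) {
        // No long value lies between t and t + 1, so ">= t + 1" behaves like "> t".
        return atTime(timestamp + 1);
    }

    public static void main(String[] args) {
        long bound = 1_000L;
        System.out.println(atTime(bound).test(1_000L));    // true: the bound itself already stops
        System.out.println(afterTime(bound).test(1_000L)); // false: the bound is still consumed
        System.out.println(afterTime(bound).test(1_001L)); // true: first value past the bound
    }
}
```

One caveat of this encoding is that `timestamp + 1` overflows for `Long.MAX_VALUE`. An explicit `exclusive` flag, as asked about above, would avoid that edge case.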



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r931924203


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */
+@Internal
+public class SharedSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8468503133499402491L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields would be saved into checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
+    private final Map<Integer, Set<String>> readerAssignedSplits;
+    private boolean initialized;
+
+    public SharedSplitAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
+        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> newPartitions = new ArrayList<>();
+
+        for (TopicPartition partition : fetchedPartitions) {
+            if (!appendedPartitions.contains(partition)) {
+                appendedPartitions.add(partition);
+                newPartitions.add(partition);
+            }
+        }
+
+        if (!initialized) {
+            initialized = true;
+        }
+
+        return newPartitions;
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        Set<PulsarPartitionSplit> pendingPartitionSplits =
+                sharedPendingPartitionSplits.computeIfAbsent(subtaskId, id -> new HashSet<>());
+        pendingPartitionSplits.addAll(splits);
+    }
+
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
+        for (Integer reader : readers) {
+            Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.remove(reader);
+            if (pendingSplits == null) {
+                pendingSplits = new HashSet<>();
+            }
+
+            Set<String> assignedSplits =
+                    readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
+
+            for (TopicPartition partition : appendedPartitions) {
+                String partitionName = partition.toString();
+                if (!assignedSplits.contains(partitionName)) {
+                    pendingSplits.add(new PulsarPartitionSplit(partition, stopCursor));
+                    assignedSplits.add(partitionName);
+                }
+            }
+
+            if (!pendingSplits.isEmpty()) {

Review Comment:
   Yep
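
The loop under review hands every known partition to every reader exactly once, which is what a Shared subscription needs. A minimal sketch of that bookkeeping follows, using plain strings in place of `TopicPartition` and `PulsarPartitionSplit`. `SharedAssignmentSketch` and its members are hypothetical, and the sketch omits the pending splits restored through `addSplitsBack`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SharedAssignmentSketch {
    private final Set<String> appendedPartitions = new LinkedHashSet<>();
    private final Map<Integer, Set<String>> readerAssignedSplits = new HashMap<>();

    void registerPartitions(Collection<String> partitions) {
        appendedPartitions.addAll(partitions);
    }

    /** Returns, per reader, the partitions that reader has not been given yet. */
    Map<Integer, List<String>> createAssignment(List<Integer> readers) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (Integer reader : readers) {
            Set<String> assigned =
                    readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
            List<String> fresh = new ArrayList<>();
            for (String partition : appendedPartitions) {
                // Set.add returns true only on first insertion, so a reader sees each partition once.
                if (assigned.add(partition)) {
                    fresh.add(partition);
                }
            }
            // Readers with nothing new are left out of the assignment entirely.
            if (!fresh.isEmpty()) {
                assignment.put(reader, fresh);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        SharedAssignmentSketch assigner = new SharedAssignmentSketch();
        assigner.registerPartitions(Arrays.asList("topic-partition-0", "topic-partition-1"));
        System.out.println(assigner.createAssignment(Arrays.asList(0, 1)));
        System.out.println(assigner.createAssignment(Arrays.asList(0, 1)));
    }
}
```

Calling `createAssignment` a second time with the same readers yields an empty map until new partitions are registered.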





[GitHub] [flink] tisonkun commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r932127024


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java:
##########
@@ -32,21 +38,23 @@ public class MessageIdStopCursor implements StopCursor {
 
     private final MessageId messageId;
 
-    private final boolean exclusive;
+    private final boolean inclusive;
 
-    public MessageIdStopCursor(MessageId messageId) {
-        this(messageId, true);
-    }
+    public MessageIdStopCursor(MessageId messageId, boolean inclusive) {
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+        checkArgument(!earliest.equals(idImpl), "MessageId.earliest is not supported.");
+        checkArgument(
+                !latest.equals(idImpl),
+                "MessageId.latest is not supported, use LatestMessageStopCursor instead.");
 
-    public MessageIdStopCursor(MessageId messageId, boolean exclusive) {
-        this.messageId = messageId;
-        this.exclusive = exclusive;
+        this.messageId = idImpl;
+        this.inclusive = inclusive;
     }
 
     @Override
     public boolean shouldStop(Message<?> message) {
         MessageId id = message.getMessageId();
-        if (exclusive) {
+        if (inclusive) {
             return id.compareTo(messageId) > 0;

Review Comment:
   It seems we have somehow inverted the boolean logic here?
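
For comparison, the intended semantics of an `inclusive` stop flag can be sketched with plain `long` ids in place of `MessageId`. `MessageIdStopSketch` is hypothetical, and the non-inclusive branch is an assumption because the hunk above does not show it.

```java
final class MessageIdStopSketch {

    /** Returns true when consumption should stop at the given id. */
    static boolean shouldStop(long id, long stopId, boolean inclusive) {
        if (inclusive) {
            // The message equal to stopId is still consumed; stop only once we are past it.
            return id > stopId;
        }
        // Assumed counterpart branch: the message equal to stopId is not consumed.
        return id >= stopId;
    }

    public static void main(String[] args) {
        System.out.println(shouldStop(10L, 10L, true));  // false: id 10 is still consumed
        System.out.println(shouldStop(10L, 10L, false)); // true: id 10 is excluded
    }
}
```

Read this way, `compareTo(messageId) > 0` under `inclusive` keeps the stop message in the result, so the comparison in the hunk is unchanged while the flag name is flipped.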





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r931794018


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This assigner is used for {@link SubscriptionType#Failover} and {@link
+ * SubscriptionType#Exclusive} subscriptions.
+ */
+@Internal
+public class NormalSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8412586087991597092L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields would be saved into checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
+    private boolean initialized;
+
+    public NormalSplitAssigner(

Review Comment:
   Cool. This would be better.





[GitHub] [flink] flinkbot commented on pull request #19972: [FLINK-27399][Connector/Pulsar] Modify start cursor and stop cursor, change initial position setting logic.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19972:
URL: https://github.com/apache/flink/pull/19972#issuecomment-1156690944

   ## CI report:
   
   * da9be03ef90e2e12873a3ef5c72996f2fa8fa964 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] MartijnVisser commented on pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #19972:
URL: https://github.com/apache/flink/pull/19972#issuecomment-1191425615

   @syhily Can you resolve the latest review comments? 




[GitHub] [flink] deadwind4 commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
deadwind4 commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r902125914


##########
flink-python/pyflink/datastream/connectors/pulsar.py:
##########
@@ -138,10 +138,10 @@ def latest() -> 'StartCursor':
         return StartCursor(JStartCursor.latest())
 
     @staticmethod
-    def from_message_time(timestamp: int) -> 'StartCursor':
+    def from_publish_time(timestamp: int) -> 'StartCursor':

Review Comment:
   Should we do it like this to keep forward compatibility?
   
   https://github.com/apache/flink/blob/314e276f6c6bff990e82515d6ce90fd6a7c9561d/flink-python/pyflink/datastream/connectors/pulsar.py#L192-L207





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r913230836


##########
flink-python/pyflink/datastream/connectors/pulsar.py:
##########
@@ -138,10 +138,10 @@ def latest() -> 'StartCursor':
         return StartCursor(JStartCursor.latest())
 
     @staticmethod
-    def from_message_time(timestamp: int) -> 'StartCursor':
+    def from_publish_time(timestamp: int) -> 'StartCursor':

Review Comment:
   We have changed MessageTime to PublishTime in the Java API. The old name was a mistake in both the Java and Python implementations, so I am simply introducing this breaking change.





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r921758152


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java:
##########
@@ -65,28 +65,48 @@ static StopCursor latest() {
     }
 
     /**
-     * Stop when the messageId is equal or greater than the specified messageId. Message that is
-     * equal to the specified messageId will not be consumed.
+     * Stop consuming when the messageId is equal or greater than the specified messageId. Message
+     * that is equal to the specified messageId will not be consumed.
      */
     static StopCursor atMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(true);
+        } else {
+            return new MessageIdStopCursor(messageId);
+        }
     }
 
     /**
-     * Stop when the messageId is greater than the specified messageId. Message that is equal to the
-     * specified messageId will be consumed.
+     * Stop consuming when the messageId is greater than the specified messageId. Message that is
+     * equal to the specified messageId will be consumed.
      */
     static StopCursor afterMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId, false);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(false);
+        } else {
+            return new MessageIdStopCursor(messageId, false);
+        }
     }
 
-    @Deprecated
+    /** Stop consuming when message eventTime is greater than or equals the specified timestamp. */
     static StopCursor atEventTime(long timestamp) {
         return new EventTimestampStopCursor(timestamp);
     }
 
-    /** Stop when message publishTime is greater than the specified timestamp. */
+    /** Stop consuming when message eventTime is greater than the specified timestamp. */
+    static StopCursor afterEventTime(long timestamp) {
+        return new EventTimestampStopCursor(timestamp + 1);
+    }
+
+    /**
+     * Stop consuming when message publishTime is greater than or equals the specified timestamp.
+     */
     static StopCursor atPublishTime(long timestamp) {
         return new PublishTimestampStopCursor(timestamp);
     }
+
+    /** Stop consuming when message publishTime is greater than the specified timestamp. */
+    static StopCursor afterPublishTime(long timestamp) {
+        return new PublishTimestampStopCursor(timestamp + 1);
+    }

Review Comment:
   Yep.





[GitHub] [flink] syhily commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r921758247


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java:
##########
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
      * code</a> for understanding pulsar internal logic.
      *
      * @param messageId The message id for start position.
-     * @param inclusive Should we include the start message id in consuming result.
+     * @param inclusive Whether we include the start message id in consuming result. This works only
+     *     if we provide a specified message id instead of {@link MessageId#earliest} or {@link
+     *     MessageId#latest}.
      */
     public MessageIdStartCursor(MessageId messageId, boolean inclusive) {

Review Comment:
   Yep.





[GitHub] [flink] MartijnVisser commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r934562396


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java:
##########
@@ -33,8 +33,10 @@
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Disabled;
 
 /** Unite test class for {@link PulsarSource}. */
+@Disabled("Could not reproduce on local machine.")

Review Comment:
   Will you create a follow-up Jira ticket to address this issue? We should not disable a test without any reference on why and what. 





[GitHub] [flink] rmetzger commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
rmetzger commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r940997077


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java:
##########
@@ -33,8 +33,10 @@
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Disabled;
 
 /** Unite test class for {@link PulsarSource}. */
+@Disabled("Could not reproduce on local machine.")

Review Comment:
   Confident or unconfident? I think you approved this pull request earlier?
   
   > you'd better do it in a separate patch with a reference
   
   I agree. It would be nice to have a JIRA ticket to re-enable this, mention the JIRA in the `@Disabled` annotation, and do this in a separate commit (but it can be in this PR).





[GitHub] [flink] tisonkun commented on a diff in pull request #19972: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r944089759


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java:
##########
@@ -91,8 +91,15 @@ protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consum
                     initialPosition = nextMessageId(latestConsumedId);
                 }
 
-                consumer.seek(initialPosition);
-            } catch (PulsarClientException e) {
+                // Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
+                // See https://github.com/apache/pulsar/issues/16757 for more details.
+                pulsarAdmin
+                        .topics()
+                        .resetCursor(
+                                split.getPartition().getFullTopicName(),
+                                sourceConfiguration.getSubscriptionName(),
+                                initialPosition);
+            } catch (PulsarAdminException e) {

Review Comment:
   Cool.
   
   It seems CI failed on the Kafka tests. You may retrigger it, and we can merge this one once CI is green.


