You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/09/21 08:28:56 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #7518: Support partitioned topics in the Reader

codelipenghui commented on a change in pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#discussion_r491861050



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -681,12 +713,45 @@ public void seek(long timestamp) throws PulsarClientException {
         try {
             seekAsync(timestamp).get();
         } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
     @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        MessageIdImpl targetMessageId = MessageIdImpl.convertToMessageIdImpl(messageId);
+        if (targetMessageId == null || isIllegalMultiTopicsMessageId(messageId)) {
+            resultFuture.completeExceptionally(
+                    new PulsarClientException("Illegal messageId, messageId can only be earliest、latest or determine partition"));
+            return resultFuture;
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
+        consumers.values().forEach(consumerImpl -> {
+            if (MessageId.latest.equals(messageId) || MessageId.earliest.equals(messageId)
+                    || consumerImpl.getPartitionIndex() == targetMessageId.getPartitionIndex()) {

Review comment:
       If the internal topics of the multiple topics consumer  with the same partition index but different topic name, it will introduce some problems here?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -81,6 +83,9 @@
     // shared incoming queue was full
     private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
 
+    //When consumerImpl is added to the set, its messages will be ignored
+    private final ConcurrentOpenHashSet<ConsumerImpl<T>> ignoredConsumers;

Review comment:
       Why need a set for storing ignored consumers here? I noticed if a consumer is added to this set, the message will be acked and users will miss messages of this consumer. Is it an expected behavior?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -1272,4 +1389,35 @@ public Timeout getPartitionsAutoUpdateTimeout() {
     }
 
     private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
+
+    public boolean addIgnoreConsumer(ConsumerImpl<T> consumer) {
+        return ignoredConsumers.add(consumer);
+    }
+
+    public boolean removeIgnoreConsumer(ConsumerImpl<T> consumer) {
+        return ignoredConsumers.remove(consumer);
+    }
+
+    public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {
+        //only support earliest/latest and messageId contains certain partition info
+        if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
+            return false;
+        }
+        MessageIdImpl messageIdImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        if (messageIdImpl != null && messageIdImpl.getPartitionIndex() >= 0 && messageIdImpl.getLedgerId() >= 0
+                && messageIdImpl.getEntryId() >= 0) {
+            return false;
+        }
+        return true;
+    }
+
+    public void tryAcknowledgeMessage(Message<T> msg) {
+        if (msg != null) {
+            BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()));
+            //Non-batching messages ack every time, batchMessage only need to ack the last one to avoid multi recycle
+            if (batchMessageId.getBatchIndex() < 0 || batchMessageId.getBatchSize() - 1 == batchMessageId.getBatchIndex()) {
+                acknowledgeCumulativeAsync(msg);

Review comment:
       I think here also assume that all topics of the multiple topics consumer are under a partitioned topic?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org