Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/12 13:16:40 UTC

[GitHub] [pulsar] 315157973 opened a new pull request #7518: Support partitioned topics in the Reader

315157973 opened a new pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518


   
   
   Master Issue: #7265
   
   ### Motivation
   Support partitioned topics in the Reader
   
   ### Modifications
   Class relationship:
   PulsarClientImpl -> MultiTopicsReaderImpl -> MultiTopicsConsumerImpl -> ConsumerImpl
   
   PulsarClientImpl supports building a MultiTopicsReader.
   MultiTopicsReader wraps one MultiTopicsConsumerImpl.
   MultiTopicsConsumerImpl contains multiple ConsumerImpl instances.
   
   MultiTopicsConsumerImpl now supports seek by messageId.
   
   Seek by time:
   Every ConsumerImpl seeks to the given time, so the Reader receives the messages returned by each partition.
   
   Seek by messageId:
   1) When seeking to latest/earliest, all partitions seek to latest/earliest, and the Reader receives the messages returned by each partition.
   2) When the messageId contains explicit partition information, the Reader receives messages only from that partition and not from the others. To keep the other partitions from returning messages in inclusive mode, the other consumers seek to latest and are added to an ignoredConsumers set so that their messages are ignored.
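
The seek rules above can be summarized in a small standalone sketch. It only models the behaviour described in this description and is not the client code: `SeekPlanSketch`, `Target`, `Action` and `plan` are hypothetical names, and each partition is represented by its index alone.

```java
// Hypothetical model of the seek rules described above; not the Pulsar client code.
final class SeekPlanSketch {

    /** What a single partition consumer is asked to do. */
    enum Action { SEEK_EARLIEST, SEEK_LATEST, SEEK_TO_MESSAGE, SEEK_LATEST_AND_IGNORE }

    /** Target position; SPECIFIC stands for a messageId bound to one partition. */
    enum Target { EARLIEST, LATEST, SPECIFIC }

    /**
     * Returns one action per partition index.
     * targetPartition is only consulted when target == SPECIFIC.
     */
    static Action[] plan(int partitions, Target target, int targetPartition) {
        if (target == Target.SPECIFIC && (targetPartition < 0 || targetPartition >= partitions)) {
            throw new IllegalArgumentException("messageId must name an existing partition");
        }
        Action[] actions = new Action[partitions];
        for (int p = 0; p < partitions; p++) {
            switch (target) {
                case EARLIEST:
                    actions[p] = Action.SEEK_EARLIEST;
                    break;
                case LATEST:
                    actions[p] = Action.SEEK_LATEST;
                    break;
                default:
                    // SPECIFIC: only the owning partition seeks to the message;
                    // the others seek to latest and have their messages ignored.
                    actions[p] = (p == targetPartition)
                            ? Action.SEEK_TO_MESSAGE
                            : Action.SEEK_LATEST_AND_IGNORE;
            }
        }
        return actions;
    }
}
```

For a topic with three partitions and a messageId that belongs to partition 1, `plan(3, Target.SPECIFIC, 1)` asks partition 1 to seek to the message and marks partitions 0 and 2 as ignored.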
   
   ### Verifying this change
   unit tests:
   TopicReaderTest
   MultiTopicsReaderTest
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-698814894


   /pulsarbot run-failure-checks
   
   





[GitHub] [pulsar] slinstaedt commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
slinstaedt commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-701702369


   Seeking to a specific MessageId seems to be mandatory though. How should one resume reading otherwise? 





[GitHub] [pulsar] 315157973 commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-698826599


   /pulsarbot run-failure-checks





[GitHub] [pulsar] 315157973 commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-699279959


   /pulsarbot run-failure-checks





[GitHub] [pulsar] 315157973 commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-698851687


   /pulsarbot run-failure-checks





[GitHub] [pulsar] jiazhai commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
jiazhai commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-695954288


   /pulsarbot run-failure-checks


[GitHub] [pulsar] 315157973 commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-698800772


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-725446833


   @sijie @jiazhai Please help review this PR, thanks.





[GitHub] [pulsar] codelipenghui commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-725446465


   @slinstaedt We will first support seeking the reader to the earliest or the latest position. Seeking to a specific position is confusing for a partitioned reader; we can try to support it later, since there is no good approach yet.





[GitHub] [pulsar] 315157973 commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-726723032


   /pulsarbot run-failure-checks





[GitHub] [pulsar] sijie commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-726390256


   @315157973 Can you rebase it to the latest master?


[GitHub] [pulsar] codelipenghui commented on a change in pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#discussion_r491861050



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -681,12 +713,45 @@ public void seek(long timestamp) throws PulsarClientException {
         try {
             seekAsync(timestamp).get();
         } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
     @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        MessageIdImpl targetMessageId = MessageIdImpl.convertToMessageIdImpl(messageId);
+        if (targetMessageId == null || isIllegalMultiTopicsMessageId(messageId)) {
+            resultFuture.completeExceptionally(
+                    new PulsarClientException("Illegal messageId, messageId can only be earliest、latest or determine partition"));
+            return resultFuture;
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
+        consumers.values().forEach(consumerImpl -> {
+            if (MessageId.latest.equals(messageId) || MessageId.earliest.equals(messageId)
+                    || consumerImpl.getPartitionIndex() == targetMessageId.getPartitionIndex()) {

Review comment:
       If the internal topics of the multi-topics consumer have the same partition index but different topic names, will that cause problems here?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -81,6 +83,9 @@
     // shared incoming queue was full
     private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
 
+    //When consumerImpl is added to the set, its messages will be ignored
+    private final ConcurrentOpenHashSet<ConsumerImpl<T>> ignoredConsumers;

Review comment:
       Why is a set needed for storing ignored consumers here? I noticed that if a consumer is added to this set, its messages are acked and users will miss them. Is this the expected behavior?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -1272,4 +1389,35 @@ public Timeout getPartitionsAutoUpdateTimeout() {
     }
 
     private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
+
+    public boolean addIgnoreConsumer(ConsumerImpl<T> consumer) {
+        return ignoredConsumers.add(consumer);
+    }
+
+    public boolean removeIgnoreConsumer(ConsumerImpl<T> consumer) {
+        return ignoredConsumers.remove(consumer);
+    }
+
+    public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {
+        //only support earliest/latest and messageId contains certain partition info
+        if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
+            return false;
+        }
+        MessageIdImpl messageIdImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        if (messageIdImpl != null && messageIdImpl.getPartitionIndex() >= 0 && messageIdImpl.getLedgerId() >= 0
+                && messageIdImpl.getEntryId() >= 0) {
+            return false;
+        }
+        return true;
+    }
+
+    public void tryAcknowledgeMessage(Message<T> msg) {
+        if (msg != null) {
+            BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()));
+            //Non-batching messages ack every time, batchMessage only need to ack the last one to avoid multi recycle
+            if (batchMessageId.getBatchIndex() < 0 || batchMessageId.getBatchSize() - 1 == batchMessageId.getBatchIndex()) {
+                acknowledgeCumulativeAsync(msg);

Review comment:
       I think this also assumes that all topics of the multi-topics consumer belong to one partitioned topic?
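
For reference, the two checks in the hunk above (`isIllegalMultiTopicsMessageId` and the batch condition in `tryAcknowledgeMessage`) can be restated without the client classes. This is a sketch under the assumption that plain numbers stand in for the fields of `MessageIdImpl` and `BatchMessageIdImpl`; `MultiTopicsChecksSketch` and its method names are hypothetical.

```java
// Standalone restatement of two checks from the diff above. Plain values stand in
// for Pulsar's MessageIdImpl and BatchMessageIdImpl (an assumption of this sketch).
final class MultiTopicsChecksSketch {

    /**
     * True when a seek target is neither earliest/latest nor a position with a
     * non-negative partition index, ledger id and entry id.
     */
    static boolean isIllegalSeekTarget(boolean earliestOrLatest, int partitionIndex,
                                       long ledgerId, long entryId) {
        if (earliestOrLatest) {
            return false;
        }
        boolean fullySpecified = partitionIndex >= 0 && ledgerId >= 0 && entryId >= 0;
        return !fullySpecified;
    }

    /**
     * True for a non-batched message (batchIndex < 0) or for the last entry of a
     * batch, so that each batch is acknowledged once.
     */
    static boolean shouldAcknowledge(int batchIndex, int batchSize) {
        return batchIndex < 0 || batchIndex == batchSize - 1;
    }
}
```

Neither method knows which topic a partition index belongs to, which is the assumption the review comments above ask about.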







[GitHub] [pulsar] codelipenghui merged pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518


   





[GitHub] [pulsar] sollecitom commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
sollecitom commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-694219738


   Any progress on this? :) 


[GitHub] [pulsar] sijie commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-695900760


   @sollecitom : @jiazhai @codelipenghui will review this pull request today





[GitHub] [pulsar] 315157973 commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-698320480


   There are still some details of this requirement that need to be discussed:
   Suppose there is a partitioned topic that contains 3 partitions.
   1) If a `MessageId` belongs to `partition-1` and the reader sets it as startMessageId, how should the messages of the other two partitions be handled? Start from earliest, latest, ...?
   2) If the user first seeks to earliest and then seeks to a specific `MessageId`, how should the messages of the other two partitions be handled? Should the other 2 partitions be prevented from returning messages, or should they continue to return them?
   
   Thanks to @codelipenghui for the suggestion:
   In this PR, MultiTopicsReader supports seeking only to `earliest` and `latest`; seeking to a specific `MessageId` is not supported.





[GitHub] [pulsar] 315157973 commented on pull request #7518: Support partitioned topics in the Reader

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#issuecomment-699656947


   /pulsarbot run-failure-checks

