You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/08 04:21:35 UTC

[GitHub] [pulsar] k2la opened a new pull request #7198: [pulsar-client-cpp] Support Seek on Partitioned Topic

k2la opened a new pull request #7198:
URL: https://github.com/apache/pulsar/pull/7198


   ### Modifications
   Support seek on partitioned topic.
   
   ### Verifying this change
   - [ ] Make sure that the change passes the CI checks.
   - [ ] Make sure that seeking on partitioned topic is working.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #7198: [pulsar-client-cpp] Support Seek on Partitioned Topic

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #7198:
URL: https://github.com/apache/pulsar/pull/7198#discussion_r436455202



##########
File path: pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
##########
@@ -542,11 +542,27 @@ void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerS
 }
 
 void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    stateLock.unlock();
+    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
+        (*i)->seekAsync(msgId, callback);

Review comment:
       The message id is only relative to 1 partition. Seeking with same message id on multiple partition will position the subscription in the wrong position.

##########
File path: pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
##########
@@ -542,11 +542,27 @@ void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerS
 }
 
 void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    stateLock.unlock();
+    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
+        (*i)->seekAsync(msgId, callback);
+    }
 }
 
 void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);

Review comment:
       The mutex should be released before triggering the callback, since that might be blocking




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] k2la commented on pull request #7198: [pulsar-client-cpp] Support Seek on Partitioned Topic by Time

Posted by GitBox <gi...@apache.org>.
k2la commented on pull request #7198:
URL: https://github.com/apache/pulsar/pull/7198#issuecomment-640588277


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #7198: [pulsar-client-cpp] Support Seek on Partitioned Topic by Time

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7198:
URL: https://github.com/apache/pulsar/pull/7198


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] k2la commented on pull request #7198: [pulsar-client-cpp] Support Seek on Partitioned Topic by Time

Posted by GitBox <gi...@apache.org>.
k2la commented on pull request #7198:
URL: https://github.com/apache/pulsar/pull/7198#issuecomment-640481702


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] k2la commented on a change in pull request #7198: [pulsar-client-cpp] Support Seek on Partitioned Topic by Time

Posted by GitBox <gi...@apache.org>.
k2la commented on a change in pull request #7198:
URL: https://github.com/apache/pulsar/pull/7198#discussion_r436486338



##########
File path: pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
##########
@@ -542,11 +542,27 @@ void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerS
 }
 
 void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    stateLock.unlock();
+    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
+        (*i)->seekAsync(msgId, callback);

Review comment:
       I understood that.
   This PR decides to support only seeking on partitioned topic by time.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] k2la commented on a change in pull request #7198: [pulsar-client-cpp] Support Seek on Partitioned Topic by Time

Posted by GitBox <gi...@apache.org>.
k2la commented on a change in pull request #7198:
URL: https://github.com/apache/pulsar/pull/7198#discussion_r436486338



##########
File path: pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
##########
@@ -542,11 +542,27 @@ void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerS
 }
 
 void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    stateLock.unlock();
+    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
+        (*i)->seekAsync(msgId, callback);

Review comment:
       I understood that.
   This PR decides to support only seeking on partitioned topic by time.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org