You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/08 14:13:22 UTC

[pulsar] branch master updated: [pulsar-client-cpp] Support Seek on Partitioned Topic by Time (#7198)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2add228d [pulsar-client-cpp] Support Seek on Partitioned Topic by Time (#7198)
2add228d is described below

commit 2add228dd39ecf480179c16abaf87871461c4458
Author: k2la <mz...@gmail.com>
AuthorDate: Mon Jun 8 23:13:10 2020 +0900

    [pulsar-client-cpp] Support Seek on Partitioned Topic by Time (#7198)
---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 11 +++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 77 ++++++++++++++++++++++++
 2 files changed, 87 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 2a0ec4e..88a2f0a 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -546,7 +546,16 @@ void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
 }
 
 void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        stateLock.unlock();
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    stateLock.unlock();
+    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
+        (*i)->seekAsync(timestamp, callback);
+    }
 }
 
 void PartitionedConsumerImpl::runPartitionUpdateTask() {
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 25ff4ed..1e43957 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1579,6 +1579,83 @@ TEST(BasicEndToEndTest, testSeek) {
     ASSERT_EQ(ResultOk, client.close());
 }
 
+TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "persistent://public/default/testSeekOnPartitionedTopic";
+
+    std::string url =
+        adminUrl + "admin/v2/persistent/public/default/testSeekOnPartitionedTopic" + "/partitions";
+    int res = makePutRequest(url, "3");
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    std::string subName = "sub-testSeekOnPartitionedTopic";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consConfig;
+    consConfig.setReceiverQueueSize(1);
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    std::string temp = producer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    temp = consumer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    uint64_t timestampMillis = TimeUtils::currentTimeMillis();
+
+    // Send 100 messages synchronously
+    std::string msgContent = "msg-content";
+    LOG_INFO("Publishing 100 messages synchronously");
+    int msgNum = 0;
+    for (; msgNum < 100; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    LOG_INFO("Trying to receive 100 messages");
+    Message msgReceived;
+    for (msgNum = 0; msgNum < 100; msgNum++) {
+        consumer.receive(msgReceived, 3000);
+        LOG_DEBUG("Received message :" << msgReceived.getMessageId());
+        std::stringstream expected;
+        expected << msgContent << msgNum;
+        ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+    }
+
+    // seek to the time before sending messages, expected receive first message.
+    result = consumer.seek(timestampMillis);
+    // Sleeping for 500ms to wait for consumer re-connect
+    std::this_thread::sleep_for(std::chrono::microseconds(500 * 1000));
+
+    ASSERT_EQ(ResultOk, result);
+    consumer.receive(msgReceived, 3000);
+    LOG_ERROR("Received message :" << msgReceived.getMessageId());
+    std::stringstream expected;
+    msgNum = 0;
+    expected << msgContent << msgNum;
+    ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+    ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    ASSERT_EQ(ResultOk, consumer.close());
+    ASSERT_EQ(ResultOk, producer.close());
+    ASSERT_EQ(ResultOk, client.close());
+}
+
 TEST(BasicEndToEndTest, testUnAckedMessageTimeout) {
     Client client(lookupUrl);
     std::string topicName = "testUnAckedMessageTimeout";