You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/08 14:13:22 UTC
[pulsar] branch master updated: [pulsar-client-cpp] Support Seek on
Partitioned Topic by Time (#7198)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2add228d [pulsar-client-cpp] Support Seek on Partitioned Topic by Time (#7198)
2add228d is described below
commit 2add228dd39ecf480179c16abaf87871461c4458
Author: k2la <mz...@gmail.com>
AuthorDate: Mon Jun 8 23:13:10 2020 +0900
[pulsar-client-cpp] Support Seek on Partitioned Topic by Time (#7198)
---
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 11 +++-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 77 ++++++++++++++++++++++++
2 files changed, 87 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 2a0ec4e..88a2f0a 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -546,7 +546,16 @@ void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
}
void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
- callback(ResultOperationNotSupported);
+ Lock stateLock(mutex_);
+ if (state_ != Ready) {
+ stateLock.unlock();
+ callback(ResultAlreadyClosed);
+ return;
+ }
+ stateLock.unlock();
+ for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
+ (*i)->seekAsync(timestamp, callback);
+ }
}
void PartitionedConsumerImpl::runPartitionUpdateTask() {
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 25ff4ed..1e43957 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1579,6 +1579,83 @@ TEST(BasicEndToEndTest, testSeek) {
ASSERT_EQ(ResultOk, client.close());
}
+TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+ std::string topicName = "persistent://public/default/testSeekOnPartitionedTopic";
+
+ std::string url =
+ adminUrl + "admin/v2/persistent/public/default/testSeekOnPartitionedTopic" + "/partitions";
+ int res = makePutRequest(url, "3");
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ std::string subName = "sub-testSeekOnPartitionedTopic";
+ Producer producer;
+
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ ConsumerConfiguration consConfig;
+ consConfig.setReceiverQueueSize(1);
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+ std::string temp = producer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ temp = consumer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+ uint64_t timestampMillis = TimeUtils::currentTimeMillis();
+
+ // Send 100 messages synchronously
+ std::string msgContent = "msg-content";
+ LOG_INFO("Publishing 100 messages synchronously");
+ int msgNum = 0;
+ for (; msgNum < 100; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ LOG_INFO("Trying to receive 100 messages");
+ Message msgReceived;
+ for (msgNum = 0; msgNum < 100; msgNum++) {
+ consumer.receive(msgReceived, 3000);
+ LOG_DEBUG("Received message :" << msgReceived.getMessageId());
+ std::stringstream expected;
+ expected << msgContent << msgNum;
+ ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+ ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+ }
+
+ // seek to the time before sending messages, expected receive first message.
+ result = consumer.seek(timestampMillis);
+ // Sleeping for 500ms to wait for consumer re-connect
+ std::this_thread::sleep_for(std::chrono::microseconds(500 * 1000));
+
+ ASSERT_EQ(ResultOk, result);
+ consumer.receive(msgReceived, 3000);
+ LOG_ERROR("Received message :" << msgReceived.getMessageId());
+ std::stringstream expected;
+ msgNum = 0;
+ expected << msgContent << msgNum;
+ ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+ ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ ASSERT_EQ(ResultOk, consumer.close());
+ ASSERT_EQ(ResultOk, producer.close());
+ ASSERT_EQ(ResultOk, client.close());
+}
+
TEST(BasicEndToEndTest, testUnAckedMessageTimeout) {
Client client(lookupUrl);
std::string topicName = "testUnAckedMessageTimeout";