You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/07/19 01:51:44 UTC
[pulsar] branch master updated: [issue 4589] Fix redelivered
message logic of partition topic (#4653)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cc5f25b [issue 4589] Fix redelivered message logic of partition topic (#4653)
cc5f25b is described below
commit cc5f25bf1a97609ad065908b9a60866c382ee2e0
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Fri Jul 19 09:51:39 2019 +0800
[issue 4589] Fix redelivered message logic of partition topic (#4653)
Fixes #4589
Motivation
When using Partition-topic, the logic of redeliver messages will not be triggered when the time of ackTimeout arrives.
This is because the unAckedMessageTrackerPtr_->add(msg.getMessageId()) is not call in the listener handling of partitioned topic in cpp code
---
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 1 +
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 51 +++++++++++++++++++
pulsar-client-go/pulsar/c_consumer.go | 2 +-
pulsar-client-go/pulsar/c_error.go | 2 +-
pulsar-client-go/pulsar/consumer_test.go | 62 ++++++++++++++++++++++++
5 files changed, 116 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index ca4596e..3ba0033 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -365,6 +365,7 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
}
messages_.push(msg);
if (messageListener_) {
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(
std::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
}
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 80ca4e6..6258bbc 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1550,6 +1550,57 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeout) {
client.close();
}
+static long messagesReceived = 0;
+
+static void unackMessageListenerFunction(Consumer consumer, const Message &msg) { messagesReceived++; }
+
+TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
+ Client client(lookupUrl);
+ long unAckedMessagesTimeoutMs = 10000;
+
+ std::string topicName = "persistent://public/default/testPartitionTopicUnAckedMessageTimeout";
+
+ // call admin api to make it partitioned
+ std::string url =
+ adminUrl + "admin/v2/persistent/public/default/testPartitionTopicUnAckedMessageTimeout/partitions";
+ int res = makePutRequest(url, "3");
+
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ std::string subName = "my-sub-name";
+
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ ConsumerConfiguration consConfig;
+ consConfig.setMessageListener(
+ std::bind(unackMessageListenerFunction, std::placeholders::_1, std::placeholders::_2));
+ consConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ result = client.subscribe(topicName, subName, consConfig, consumer);
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+ for (int i = 0; i < 10; i++) {
+ Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
+ producer.sendAsync(msg, nullptr);
+ }
+
+ producer.flush();
+ long timeWaited = 0;
+ while (true) {
+ // maximum wait time
+ ASSERT_LE(timeWaited, unAckedMessagesTimeoutMs * 3);
+ if (messagesReceived >= 10 * 2) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ timeWaited += 500;
+ }
+}
+
TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
Client client(lookupUrl);
std::string topicName = "testUnAckedMessageTimeoutListener";
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index ac6d9ed..10d86e0 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -211,7 +211,7 @@ func subscribeAsync(client *client, options ConsumerOptions, schema Schema, call
C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)),
subName, conf, callbackPtr)
- for idx, _ := range options.Topics {
+ for idx := range options.Topics {
C.free(unsafe.Pointer(a[idx]))
}
diff --git a/pulsar-client-go/pulsar/c_error.go b/pulsar-client-go/pulsar/c_error.go
index a7c83ae..d24a7e3 100644
--- a/pulsar-client-go/pulsar/c_error.go
+++ b/pulsar-client-go/pulsar/c_error.go
@@ -42,7 +42,7 @@ type Error struct {
func newError(result C.pulsar_result, msg string) error {
return &Error{
- msg: fmt.Sprintf("%s: %s", msg, C.GoString(C.pulsar_result_str(result))),
+ msg: fmt.Sprintf("%s: %v", msg, C.GoString(C.pulsar_result_str(result))),
result: Result(result),
}
}
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 59cbff6..e83d341 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -656,3 +656,65 @@ func TestConsumerShared(t *testing.T) {
}
}()
}
+
+func TestConsumer_AckTimeout(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := fmt.Sprintf("my-topic-%d", time.Now().Unix())
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ AckTimeout: 10 * time.Second,
+ Name: "my-consumer-name",
+ Type: Shared,
+ })
+
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ assert.Equal(t, consumer.Topic(), "persistent://public/default/"+topic)
+ assert.Equal(t, consumer.Subscription(), "my-sub")
+
+ ctx := context.Background()
+
+ // send one message
+ if err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-pulsar")),
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ // receive message but not ack
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-pulsar"))
+
+ // wait ack timeout
+ time.Sleep(10 * time.Second)
+
+ // receive message again
+ msgAgain, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, string(msgAgain.Payload()), fmt.Sprintf("hello-pulsar"))
+
+ if err := consumer.Ack(msgAgain); err != nil {
+ assert.Nil(t, err)
+ }
+
+ if err := consumer.Unsubscribe(); err != nil {
+ assert.Nil(t, err)
+ }
+}