You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/07/19 01:51:44 UTC

[pulsar] branch master updated: [issue 4589] Fix redelivered message logic of partition topic (#4653)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cc5f25b  [issue 4589] Fix redelivered message logic of partition topic (#4653)
cc5f25b is described below

commit cc5f25bf1a97609ad065908b9a60866c382ee2e0
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Fri Jul 19 09:51:39 2019 +0800

    [issue 4589] Fix redelivered message logic of partition topic (#4653)
    
    Fixes #4589
    
    Motivation
    When using a partitioned topic, the message redelivery logic is not triggered when the ackTimeout expires.
    
    This is because unAckedMessageTrackerPtr_->add(msg.getMessageId()) is not called in the message-listener path of the partitioned-topic consumer in the C++ client.
---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc |  1 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 51 +++++++++++++++++++
 pulsar-client-go/pulsar/c_consumer.go            |  2 +-
 pulsar-client-go/pulsar/c_error.go               |  2 +-
 pulsar-client-go/pulsar/consumer_test.go         | 62 ++++++++++++++++++++++++
 5 files changed, 116 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index ca4596e..3ba0033 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -365,6 +365,7 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
         }
         messages_.push(msg);
         if (messageListener_) {
+            unAckedMessageTrackerPtr_->add(msg.getMessageId());
             listenerExecutor_->postWork(
                 std::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
         }
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 80ca4e6..6258bbc 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1550,6 +1550,57 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeout) {
     client.close();
 }
 
+static long messagesReceived = 0;
+
+static void unackMessageListenerFunction(Consumer consumer, const Message &msg) { messagesReceived++; }
+
+TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
+    Client client(lookupUrl);
+    long unAckedMessagesTimeoutMs = 10000;
+
+    std::string topicName = "persistent://public/default/testPartitionTopicUnAckedMessageTimeout";
+
+    // call admin api to make it partitioned
+    std::string url =
+        adminUrl + "admin/v2/persistent/public/default/testPartitionTopicUnAckedMessageTimeout/partitions";
+    int res = makePutRequest(url, "3");
+
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    std::string subName = "my-sub-name";
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consConfig;
+    consConfig.setMessageListener(
+        std::bind(unackMessageListenerFunction, std::placeholders::_1, std::placeholders::_2));
+    consConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+    result = client.subscribe(topicName, subName, consConfig, consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    for (int i = 0; i < 10; i++) {
+        Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
+        producer.sendAsync(msg, nullptr);
+    }
+
+    producer.flush();
+    long timeWaited = 0;
+    while (true) {
+        // maximum wait time
+        ASSERT_LE(timeWaited, unAckedMessagesTimeoutMs * 3);
+        if (messagesReceived >= 10 * 2) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        timeWaited += 500;
+    }
+}
+
 TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
     Client client(lookupUrl);
     std::string topicName = "testUnAckedMessageTimeoutListener";
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index ac6d9ed..10d86e0 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -211,7 +211,7 @@ func subscribeAsync(client *client, options ConsumerOptions, schema Schema, call
 		C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)),
 			subName, conf, callbackPtr)
 
-		for idx, _ := range options.Topics {
+		for idx := range options.Topics {
 			C.free(unsafe.Pointer(a[idx]))
 		}
 
diff --git a/pulsar-client-go/pulsar/c_error.go b/pulsar-client-go/pulsar/c_error.go
index a7c83ae..d24a7e3 100644
--- a/pulsar-client-go/pulsar/c_error.go
+++ b/pulsar-client-go/pulsar/c_error.go
@@ -42,7 +42,7 @@ type Error struct {
 
 func newError(result C.pulsar_result, msg string) error {
 	return &Error{
-		msg:    fmt.Sprintf("%s: %s", msg, C.GoString(C.pulsar_result_str(result))),
+		msg:    fmt.Sprintf("%s: %v", msg, C.GoString(C.pulsar_result_str(result))),
 		result: Result(result),
 	}
 }
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 59cbff6..e83d341 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -656,3 +656,65 @@ func TestConsumerShared(t *testing.T) {
 		}
 	}()
 }
+
+func TestConsumer_AckTimeout(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := fmt.Sprintf("my-topic-%d", time.Now().Unix())
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub",
+		AckTimeout:       10 * time.Second,
+		Name:             "my-consumer-name",
+		Type:             Shared,
+	})
+
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	assert.Equal(t, consumer.Topic(), "persistent://public/default/"+topic)
+	assert.Equal(t, consumer.Subscription(), "my-sub")
+
+	ctx := context.Background()
+
+	// send one message
+	if err := producer.Send(ctx, ProducerMessage{
+		Payload: []byte(fmt.Sprintf("hello-pulsar")),
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// receive message but not ack
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-pulsar"))
+
+	// wait ack timeout
+	time.Sleep(10 * time.Second)
+
+	// receive message again
+	msgAgain, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	assert.Equal(t, string(msgAgain.Payload()), fmt.Sprintf("hello-pulsar"))
+
+	if err := consumer.Ack(msgAgain); err != nil {
+		assert.Nil(t, err)
+	}
+
+	if err := consumer.Unsubscribe(); err != nil {
+		assert.Nil(t, err)
+	}
+}