Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/06 12:25:24 UTC

[GitHub] [pulsar-client-cpp] shibd opened a new pull request, #139: [feat] Support Dead Letter Topic.

shibd opened a new pull request, #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139

   Master Issue: #114 
   
   ### Motivation
   #77, #114 
   
   ### Modifications
   - Add `DeadLetterPolicy`. When a message's `redeliveryCount > DeadLetterPolicy.maxRedeliveryCount`, the message is sent to the DLQ topic and then acknowledged.
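
The rule above can be sketched in isolation. This is a minimal, self-contained model and not the client's implementation: `Outcome` and `nackUntilDeadLettered` are hypothetical names, and the model assumes `maxRedeliverCount > 0` and a consumer that negatively acknowledges every delivery.

```cpp
#include <cassert>

// Hypothetical result type for this sketch.
struct Outcome {
    int deliveries;     // how many times the consumer received the message
    bool deadLettered;  // whether the message ended up in the DLQ
};

// Models one message that is nacked after every delivery.
// Assumption: the message is dead-lettered as soon as the next
// redelivery count would exceed maxRedeliverCount.
Outcome nackUntilDeadLettered(int maxRedeliverCount) {
    assert(maxRedeliverCount > 0);
    int redeliveryCount = 0;  // the first delivery is not a redelivery
    int deliveries = 0;
    while (true) {
        ++deliveries;  // consumer receives the message and nacks it
        const int nextRedeliveryCount = redeliveryCount + 1;
        if (nextRedeliveryCount > maxRedeliverCount) {
            return Outcome{deliveries, true};  // routed to the DLQ and acknowledged
        }
        redeliveryCount = nextRedeliveryCount;
    }
}
```

Under these assumptions, a limit of 3 gives four deliveries per message before it is dead-lettered, which is consistent with the `getMaxRedeliverCount() * num + num` receive loop in the PR's `testDLQWithSchema` test.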
   
   
   ### Verifying this change
   This change added tests and can be verified as follows:
   
   1. Add `DeadLetterQueueTest` to verify sending to the DLQ under the following scenarios:
   - Producer batching: [enabled, disabled]
   - Topic is [single, multi]
   - Consumer subscription: [Shared, Key_Shared]
   - Triggered by: [unack, ack_timeout]
   
   2. Add `DeadLetterQueueTest.testInitSubscription` to cover init DLQ topic subscription: https://github.com/apache/pulsar/pull/13355
   3. Add `DeadLetterQueueTest.testAutoSetDLQTopicName` to cover default DLQ topic name.
   4. Add `DeadLetterQueueTest.testAutoSchema` to cover this issue: https://github.com/apache/pulsar/pull/9970.
   Because the `AUTO_PUBLISH` schema type is not yet supported, this unit test passes only if the broker is configured with `isSchemaValidationEnforced = false`. Once `AUTO_PUBLISH` schema is supported, we should set `isSchemaValidationEnforced` to true and revalidate.
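
The default DLQ topic name covered by `testAutoSetDLQTopicName` can be illustrated with a small helper. The `<topic>-<subscription>-DLQ` pattern is taken from the expected value in that test; the function name `defaultDeadLetterTopic` is hypothetical, and the sketch assumes the topic name passed in is already fully qualified.

```cpp
#include <string>

// Hypothetical helper: builds "<topic>-<subscription>-DLQ".
// Assumes fullTopicName already carries its "persistent://tenant/namespace/" prefix.
std::string defaultDeadLetterTopic(const std::string& fullTopicName, const std::string& subscription) {
    return fullTopicName + "-" + subscription + "-DLQ";
}
```

For a topic `persistent://public/default/my-topic` and subscription `dlq-sub`, this yields `persistent://public/default/my-topic-dlq-sub-DLQ`.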
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1071952441


##########
lib/ProducerConfigurationImpl.h:
##########
@@ -50,6 +50,20 @@ struct ProducerConfigurationImpl {
     std::map<std::string, std::string> properties;
     bool chunkingEnabled{false};
     ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared};
+    std::string initialSubscriptionName;
+
+    /**
+     * Use this config to automatically create an initial subscription when creating the topic.
+     * If this field is not set, the initial subscription will not be created.
+     * This method is limited to internal use
+     *
+     * @param initialSubscriptionName Name of the initial subscription of the topic.
+     */
+    void setInitialSubscriptionName(const std::string& initialSubscriptionNameParam) {
+        initialSubscriptionName = initialSubscriptionNameParam;
+    }
+
+    const std::string& getInitialSubscriptionName() const { return initialSubscriptionName; }

Review Comment:
   It's better not to add a setter and getter to a POD class. `initialSubscriptionName` is a public field, so we can modify it directly. Adding a getter and setter to a POD class is redundant and adds unnecessary code.
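
A small sketch of this point, using a hypothetical `ProducerSettings` struct rather than the real `ProducerConfigurationImpl`: because the field is public, callers can assign it directly and no accessor pair is needed.

```cpp
#include <string>

// Hypothetical stand-in for a plain configuration struct with public fields.
struct ProducerSettings {
    bool chunkingEnabled{false};
    // In this sketch, an empty name means "do not create an initial subscription".
    std::string initialSubscriptionName;
};

// Callers assign the public field directly; no setter or getter is involved.
ProducerSettings makeSettingsWithInitialSubscription(const std::string& subscriptionName) {
    ProducerSettings settings;
    settings.initialSubscriptionName = subscriptionName;
    return settings;
}
```

The public wrapper class can still expose its own setter and forward to the field; only the accessors on the internal struct are redundant.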



##########
lib/ProducerConfiguration.cc:
##########
@@ -16,6 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include "pulsar/ProducerConfiguration.h"

Review Comment:
   ```suggestion
   #include <pulsar/ProducerConfiguration.h>
   ```
   
   We usually use `<>` for headers of the Pulsar client itself.





[GitHub] [pulsar-client-cpp] BewareMyPower merged pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139




[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1071930114


##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -554,6 +593,8 @@ class PULSAR_PUBLIC ConsumerConfiguration {
     bool isStartMessageIdInclusive() const;
 
     friend class PulsarWrapper;
+    friend class DeadLetterQueueTest;
+    friend class DeadLetterQueueTest_testWithoutConsumerReceiveImmediately_Test;

Review Comment:
   Could you replace them with `PulsarFriend` and add some methods to `PulsarFriend` to access the private fields of `ConsumerConfiguration`? If a test name changes, these friend declarations become meaningless.
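
The suggested pattern can be sketched as follows. The class names `ConsumerSettings` and `SettingsFriend` are hypothetical stand-ins for `ConsumerConfiguration` and `PulsarFriend`; the sketch only shows the shape of the idea, not the accessor methods the PR actually added.

```cpp
#include <string>
#include <utility>

class ConsumerSettings {
   public:
    explicit ConsumerSettings(std::string deadLetterTopic) : deadLetterTopic_(std::move(deadLetterTopic)) {}

   private:
    // One stable friend declaration instead of one per test class.
    friend class SettingsFriend;
    std::string deadLetterTopic_;
};

// Test-only accessor; tests call this instead of being friends themselves.
class SettingsFriend {
   public:
    static const std::string& getDeadLetterTopic(const ConsumerSettings& settings) {
        return settings.deadLetterTopic_;
    }
};
```

With this layout, renaming or adding a test never touches the public header, because only the accessor class is named in the `friend` declaration.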





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1050484205


##########
lib/ConsumerImpl.cc:
##########
@@ -460,6 +476,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         return;
     }
 
+    auto redeliveryCount = msg.redelivery_count();

Review Comment:
   It seems that we need to pass `redeliveryCount` to the `decryptMessageIfNeeded` method first. When decryption fails, `unAckedMessageTracker.add(m, redeliveryCount);` should be called. (I didn't find a related test, though.)





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1050452695


##########
lib/DeadLetterPolicyImpl.h:
##########
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include "climits"
+#include "string"

Review Comment:
   ```suggestion
   #include <climits>
   #include <string>
   ```
   
   We usually use angle brackets for headers from system libraries.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062203687


##########
include/pulsar/DeadLetterPolicy.h:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef DEAD_LETTER_POLICY_HPP_
+#define DEAD_LETTER_POLICY_HPP_
+
+#include <pulsar/defines.h>
+
+#include <memory>
+#include <string>
+
+namespace pulsar {
+
+struct DeadLetterPolicyImpl;
+
+/**
+ * Configuration for the "dead letter queue" feature in consumer.
+ *
+ * see @DeadLetterPolicyBuilder
+ */
+class PULSAR_PUBLIC DeadLetterPolicy {
+   public:
+    DeadLetterPolicy();
+
+    /**
+     * Get dead letter topic
+     *
+     * @return
+     */
+    std::string getDeadLetterTopic() const;

Review Comment:
   It's okay to return by value here because the copy cost of a topic name is not high. But we should keep the code style consistent: `ProducerConfiguration::getInitialSubscriptionName` returns a const reference, while `DeadLetterPolicy::getInitialSubscriptionName` returns a value.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062265293


##########
lib/ConsumerImpl.cc:
##########
@@ -1209,7 +1239,28 @@ void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& me
         redeliverUnacknowledgedMessages();
         return;
     }
-    redeliverMessages(messageIds);
+
+    ClientConnectionPtr cnx = getCnx().lock();
+    if (cnx) {
+        if (cnx->getServerProtocolVersion() >= proto::v2) {
+            auto needRedeliverMsgs = std::make_shared<std::set<MessageId>>();
+            auto needCallBack = std::make_shared<std::atomic<int>>(messageIds.size());
+            auto self = get_shared_this_ptr();
+            for (const auto& msgId : messageIds) {

Review Comment:
   You can add a TODO to add `MAX_REDELIVER_UNACKNOWLEDGED` support in the future.





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1050664001


##########
test-conf/standalone-ssl.conf:
##########
@@ -50,6 +50,9 @@ brokerShutdownTimeoutMs=3000
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
+# Disable schema validation: If a producer doesn’t carry a schema, the producer is allowed to connect to the topic and produce data.
+isSchemaValidationEnforced=true
+

Review Comment:
   > IMO, currently we don't need to handle this case. The default value of isSchemaValidationEnforced is false.
   
   I think we need to turn on validation. We should ensure that the schema still works when `isSchemaValidationEnforced` is true. 
   
   For example, there are scenarios where creating the DLQ producer fails (refer: verifying this change[5]). If we set `isSchemaValidationEnforced` to false, then the test passes.
   
   
   > I have an idea that we can add an option to download the latest schema info when creating producers without schema. e.g.
   
   Good point.  I will change #142 to support auto download schema first.
   
   BTW: I'm not going to expose this interface to users; I'll just use it in DLQ for the time being.







[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1071990162


##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -554,6 +593,8 @@ class PULSAR_PUBLIC ConsumerConfiguration {
     bool isStartMessageIdInclusive() const;
 
     friend class PulsarWrapper;
+    friend class DeadLetterQueueTest;
+    friend class DeadLetterQueueTest_testWithoutConsumerReceiveImmediately_Test;

Review Comment:
   Yes. This way is better.





[GitHub] [pulsar-client-cpp] Anonymitaet commented on pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#issuecomment-1375428445

   Thanks! LGTM from the doc perspective.




[GitHub] [pulsar-client-cpp] shibd commented on pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#issuecomment-1373407356

   @BewareMyPower @Anonymitaet Thanks for your reviews. All suggestions have been fixed. PTAL.




[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1065651467


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 10;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_FALSE(msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // set ack timeout is 10 ms.
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    consumerImpl.unAckedMessageTrackerPtr_.reset(
+        new UnAckedMessageTrackerEnabled(10, PulsarFriend::getClientImplPtr(client), consumerImpl));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    client.close();
+}
+
+TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic);
+
+    client.close();
+}
+
+class DeadLetterQueueTest : public ::testing::TestWithParam<std::tuple<bool, bool, ConsumerType>> {
+   public:
+    void SetUp() override {
+        isProducerBatch_ = std::get<0>(GetParam());
+        isMultiConsumer_ = std::get<1>(GetParam());
+        consumerType_ = std::get<2>(GetParam());
+        producerConf_ = ProducerConfiguration().setBatchingEnabled(isProducerBatch_);

Review Comment:
   Could you also initialize the `ConsumerConfiguration` here? (This test file is already very large, at 400+ lines.)





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1050474824


##########
include/pulsar/DeadLetterPolicyBuilder.h:
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef DEAD_LETTER_POLICY_BUILD_HPP_
+#define DEAD_LETTER_POLICY_BUILD_HPP_
+
+#include <pulsar/DeadLetterPolicy.h>
+#include <pulsar/defines.h>
+
+#include <memory>
+
+namespace pulsar {
+
+struct DeadLetterPolicyImpl;
+
+/**
+ * The builder to build a DeadLetterPolicyBuilder
+ *
+ * Example of building DeadLetterPolicy:
+ *
+ * ```c++
+ * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+ *                       .deadLetterTopic("dlq-topic")
+ *                       .maxRedeliverCount(10)
+ *                       .initialSubscriptionName("init-sub-name")
+ *                       .build();
+ * ```
+ */
+class PULSAR_PUBLIC DeadLetterPolicyBuilder {
+   public:
+    DeadLetterPolicyBuilder();
+
+    /**
+     * Set dead letter topic
+     *
+     * @return
+     */
+    DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic);
+
+    /**
+     * Set max redeliver count
+     *
+     * @return
+     */

Review Comment:
   When you add the API docs, please add the default value and explain the behavior. For example, from the code in `ConsumerImpl.cc` I see the dead letter policy only works when `maxRedeliverCount > 0`. And what if users didn't set the dead letter topic? Please make API docs readable and meaningful.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062256415


##########
lib/ConsumerImpl.cc:
##########
@@ -527,6 +544,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
                                 << startMessageId.value());
             return;
         }
+        if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
+            possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
+            increaseAvailablePermits(cnx);

Review Comment:
   If the redelivery logic needs extra tests, we can add a TODO here and add the tests in another PR.





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1052850477


##########
lib/ConsumerImpl.cc:
##########
@@ -460,6 +476,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         return;
     }
 
+    auto redeliveryCount = msg.redelivery_count();

Review Comment:
   It seems that this is a separate problem: we need to give a client that cannot decrypt the message a chance to decrypt it again, so we need to put the message in `unAckedMessageTracker` and wait for redelivery.
   
   https://github.com/apache/pulsar/pull/3097 
   
   
   As for why `redeliveryCount` is passed into the `decryptPayloadIfNeeded` method in the Java implementation: it is there to support [PIP-130](https://github.com/apache/pulsar/pull/13707).
   
   
   Note: I'll open a new PR to support both issues.
   





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1063266384


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic) != std::string::npos);
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // set ack timeout is 10 ms.
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    consumerImpl.unAckedMessageTrackerPtr_.reset(
+        new UnAckedMessageTrackerEnabled(10, PulsarFriend::getClientImplPtr(client), consumerImpl));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    sleep(2);
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    client.close();

Review Comment:
   This test is meant to cover this case: https://github.com/apache/pulsar/pull/3079





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1063260550


##########
lib/ConsumerImpl.cc:
##########
@@ -527,6 +544,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
                                 << startMessageId.value());
             return;
         }
+        if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
+            possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
+            increaseAvailablePermits(cnx);

Review Comment:
   > And should we call redeliverUnacknowledgedMessages here as well?
   
   Yes, we need to call `redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds)`. I misunderstood this before.
   
   The DLQ design works like this:
   
   When a received message has `redeliveryCount == deadLetterPolicy_.getMaxRedeliverCount()`, we add it to `possibleSendToDeadLetterTopicMessages_` and wait for the user's nack or an ack timeout to trigger sending it to the DLQ.
   
   https://github.com/apache/pulsar-client-cpp/blob/e818ead391dc52ef08bbd59c748a00492dcf0cb5/lib/NegativeAcksTracker.cc#L82-L84
   
   https://github.com/apache/pulsar-client-cpp/blob/1721e0005975bcc9cbd49566d6047760e6621a3b/lib/UnAckedMessageTrackerEnabled.cc#L68-L73
   
   This saves one redelivery of the message from the server to the client.
   
   But there are also unexpected scenarios, such as `transaction.abort` (apache/pulsar/pull/17287). That path does not go through `redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds)`, so on receipt we need to check directly whether the count is greater than the limit: `if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount())`
   
   I originally thought that the C++ client had no transactions, so it did not need to handle this scenario.
   
   But I found that the user can call this method:
   
   https://github.com/apache/pulsar-client-cpp/blob/2980bbe9d3ef47eef4a1d468c3898b5a989b4df0/include/pulsar/Consumer.h#L359
   
   It also makes the server redeliver messages directly, without going through the logic that sends them to the DLQ.
   
   I added this condition back and added a test to cover it: `DeadLetterQueueTest.testSendDLQTriggerByRedeliverUnacknowledgedMessages`
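
The flow described in this comment can be modelled with a small standalone sketch. It does not use the Pulsar client: `DlqModel`, `Action`, `onReceive`, `onNackOrAckTimeout`, and `onAck` are hypothetical names chosen for illustration, and only the two comparisons against the max redeliver count are taken from the comment. What happens to a message in each branch is simplified.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical stand-in for the consumer-side DLQ bookkeeping; not the real ConsumerImpl.
enum class Action { Deliver, DeliverAndTrack, SendToDlq, Redeliver };

class DlqModel {
   public:
    explicit DlqModel(int maxRedeliverCount) : maxRedeliverCount_(maxRedeliverCount) {}

    // Called when the server pushes a message to the client.
    Action onReceive(int64_t messageId, int redeliveryCount) {
        if (redeliveryCount > maxRedeliverCount_) {
            // Reached only when the server redelivered without the client-side
            // candidate check, e.g. after redeliverUnacknowledgedMessages().
            return Action::SendToDlq;
        }
        if (redeliveryCount == maxRedeliverCount_) {
            // Last attempt: hand it to the user, but remember it as a DLQ candidate.
            candidates_.insert(messageId);
            return Action::DeliverAndTrack;
        }
        return Action::Deliver;
    }

    // Called when the user nacks the message or the ack timeout fires.
    Action onNackOrAckTimeout(int64_t messageId) {
        if (candidates_.erase(messageId) > 0) {
            return Action::SendToDlq;  // saves one redelivery round trip
        }
        return Action::Redeliver;
    }

    // Called when the user acknowledges the message.
    void onAck(int64_t messageId) { candidates_.erase(messageId); }

   private:
    int maxRedeliverCount_;
    std::set<int64_t> candidates_;
};
```

With a limit of 3, a message nacked on its first delivery is redelivered as usual, a message nacked when its redelivery count equals 3 goes to the DLQ without another round trip, and a message that arrives with a count of 4 is routed to the DLQ by the second check.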





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1050508144


##########
test-conf/standalone-ssl.conf:
##########
@@ -50,6 +50,9 @@ brokerShutdownTimeoutMs=3000
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
+# Disable schema validation: If a producer doesn’t carry a schema, the producer is allowed to connect to the topic and produce data.
+isSchemaValidationEnforced=true
+

Review Comment:
   IMO, currently we don't need to handle this case. The default value of `isSchemaValidationEnforced` is false.
   
   I think you can continue the work of https://github.com/apache/pulsar-client-cpp/pull/142, but the goal should not be to support the auto-produce schema, because the C++ client only interacts with `SchemaInfo`, while `AutoProduceBytesSchema` is a `Schema<byte[]>`.
   
   I have an idea that we can add an option to download the latest schema info when creating producers without schema. e.g.
   
   ```
   // it should only work for the case when isSchemaValidationEnforced is true
   producerConf.setAutoDownloadSchema(true);
   ```
   
   Regarding message validation, users should still validate messages themselves. From the discussion [here](https://lists.apache.org/thread/2gbftzs7xwvvddythn818bor17rqpd4m), we can see that the Java client does not validate the message in `sendAsync`. Instead, it validates the message in `TypedMessageBuilder#value`. There is no corresponding concept in the C++ client.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1071945567


##########
lib/ProducerConfiguration.cc:
##########
@@ -16,6 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include "pulsar/ProducerConfiguration.h"

Review Comment:
   ```suggestion
   #include <pulsar/ProducerConfiguration.h>
   ```
   
   We usually use `<>` for headers of the Pulsar client itself. BTW, this header is already included in `ProducerConfigurationImpl.h`, so there is no need for this change.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062309898


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic);
+
+    client.close();
+}
+
+class DeadLetterQueueTest : public ::testing::TestWithParam<std::tuple<bool, bool, ConsumerType>> {
+   public:
+    void SetUp() override {
+        isProducerBatch_ = std::get<0>(GetParam());
+        isMultiConsumer_ = std::get<1>(GetParam());
+        consumerType_ = std::get<2>(GetParam());
+        producerConf_ = ProducerConfiguration().setBatchingEnabled(isProducerBatch_);
+    }
+
+    void TearDown() override { client_.close(); }
+
+    void initTopic(std::string topicName) {
+        if (isMultiConsumer_) {
+            // call admin api to make it partitioned
+            std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+            int res = makePutRequest(url, "5");
+            LOG_INFO("res = " << res);
+            ASSERT_FALSE(res != 204 && res != 409);
+        }
+    }
+
+   protected:
+    Client client_{lookupUrl};
+    ProducerConfiguration producerConf_;
+    bool isMultiConsumer_;
+    bool isProducerBatch_;
+    ConsumerType consumerType_;
+};
+
+TEST_P(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck) {
+    Client client(lookupUrl);
+    const std::string topic = "testSendDLQTriggerByAckTimeOut-" + std::to_string(time(nullptr)) +
+                              std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) +
+                              std::to_string(consumerType_);
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = topic + "-" + subName + "-DLQ";
+    initTopic(topic);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(consumerType_);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Reset the unack timeout
+    long unackTimeOut = 200;
+    if (isMultiConsumer_) {
+        auto multiConsumer = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+        multiConsumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+            unackTimeOut, PulsarFriend::getClientImplPtr(client), *multiConsumer));
+        multiConsumer->consumers_.forEachValue([&client, unackTimeOut](ConsumerImplPtr consumer) {
+            consumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                unackTimeOut, PulsarFriend::getClientImplPtr(client), *consumer));
+        });
+    } else {
+        auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+        consumerImpl.unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+            unackTimeOut, PulsarFriend::getClientImplPtr(client), consumerImpl));
+    }

Review Comment:
   I think a better way to set the `unackTimeout` is to modify the `impl_` field of `ConsumerConfiguration` directly, so that we can bypass the check in `ConsumerConfiguration::setUnAckedMessagesTimeoutMs`.
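
A minimal sketch of that idea, using stand-in types rather than the real client classes. `Config`, `ConfigImpl`, `ConfigFriend`, and `publicSetterRejects` are hypothetical names, and the 10000 ms lower bound is an assumed value for illustration; the actual check in `ConsumerConfiguration::setUnAckedMessagesTimeoutMs` may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>

// Stand-in for the configuration's implementation object.
struct ConfigImpl {
    uint64_t unAckedMessagesTimeoutMs{0};
};

class Config {
   public:
    Config() : impl_(std::make_shared<ConfigImpl>()) {}

    // Public setter: rejects short, non-zero timeouts (the bound is an assumption).
    void setUnAckedMessagesTimeoutMs(uint64_t ms) {
        if (ms != 0 && ms < 10000) {
            throw std::invalid_argument("unacked message timeout is too small");
        }
        impl_->unAckedMessagesTimeoutMs = ms;
    }

    uint64_t getUnAckedMessagesTimeoutMs() const { return impl_->unAckedMessagesTimeoutMs; }

   private:
    friend class ConfigFriend;  // test-only access
    std::shared_ptr<ConfigImpl> impl_;
};

// Would be defined in the test sources only.
class ConfigFriend {
   public:
    // Writes the field directly, so the validation in the public setter is skipped.
    static void setUnAckedMessagesTimeoutMs(Config& conf, uint64_t ms) {
        conf.impl_->unAckedMessagesTimeoutMs = ms;
    }
};

// Returns true if the public setter rejects the given value.
bool publicSetterRejects(uint64_t ms) {
    Config conf;
    try {
        conf.setUnAckedMessagesTimeoutMs(ms);
    } catch (const std::invalid_argument&) {
        return true;
    }
    return false;
}
```

Compared with replacing the tracker after `subscribe`, this keeps the test on the normal construction path: the consumer is created with the short timeout already in its configuration.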





[GitHub] [pulsar-client-cpp] shibd commented on pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#issuecomment-1396387216

   > I have left the last comments. This PR overall LGTM. Please avoid committing such a huge patch next time. If a PR had many code changes, it would be very hard to review all of them. Some potential issues might not be exposed.
   
   Got it. Thanks for your professional reviews!




[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1065647417


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+TEST_P(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck) {
+    Client client(lookupUrl);
+    const std::string topic = "testSendDLQTriggerByAckTimeOut-" + std::to_string(time(nullptr)) +
+                              std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) +
+                              std::to_string(consumerType_);
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = topic + "-" + subName + "-DLQ";
+    initTopic(topic);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(consumerType_);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Reset the unack timeout
+    long unackTimeOut = 200;
+    if (isMultiConsumer_) {
+        auto multiConsumer = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+        multiConsumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+            unackTimeOut, PulsarFriend::getClientImplPtr(client), *multiConsumer));
+        multiConsumer->consumers_.forEachValue([&client, unackTimeOut](ConsumerImplPtr consumer) {
+            consumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                unackTimeOut, PulsarFriend::getClientImplPtr(client), *consumer));
+        });
+    } else {
+        auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+        consumerImpl.unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+            unackTimeOut, PulsarFriend::getClientImplPtr(client), consumerImpl));
+    }

Review Comment:
   The class won't be exposed. The `PulsarFriend` is already exposed in these headers.
   
   In addition, the friend declaration does not **expose** anything. It only means that a class with that name, if you implement one, gets access to the private members. Exposing usually means the methods and fields are exposed to users.
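
The point about friend declarations can be checked with a small standalone sketch. `Tracker`, `TrackerFriend`, and `CanReadTimeout` are hypothetical names for illustration; the detection trait relies on access checking being part of template substitution (C++11 and later).

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

class Tracker {
   public:
    explicit Tracker(long timeoutMs) : timeoutMs_(timeoutMs) {}

   private:
    friend class TrackerFriend;  // names a class; grants nothing to ordinary users
    long timeoutMs_;
};

// Detects whether `timeoutMs_` can be read from ordinary (non-friend) code.
template <typename T, typename = void>
struct CanReadTimeout : std::false_type {};

template <typename T>
struct CanReadTimeout<T, decltype(void(std::declval<const T&>().timeoutMs_))> : std::true_type {};

// The friend declaration alone leaves the field private for users.
static_assert(!CanReadTimeout<Tracker>::value, "field stays private for users");

// Only a class with exactly this name gets access; it would live in the test sources.
class TrackerFriend {
   public:
    static long timeoutMs(const Tracker& t) { return t.timeoutMs_; }
};
```

User code that includes the header still cannot read `timeoutMs_`; only translation units that define `TrackerFriend` themselves, such as the tests, gain access.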





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062292149


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;

Review Comment:
   If we only send 1 message, the `num` variable and the following for loops can be removed.



##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());

Review Comment:
   ```suggestion
           ASSERT_FALSE(msg.getDataAsString().empty());
   ```



##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // set ack timeout is 10 ms.
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    consumerImpl.unAckedMessageTrackerPtr_.reset(
+        new UnAckedMessageTrackerEnabled(10, PulsarFriend::getClientImplPtr(client), consumerImpl));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    sleep(2);
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    client.close();

Review Comment:
   I don't follow the logic here. Even without any other logic, the consumer could still receive the message sent by the producer.



##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());

Review Comment:
   ```suggestion
           ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
   ```



##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // set ack timeout is 10 ms.
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    consumerImpl.unAckedMessageTrackerPtr_.reset(
+        new UnAckedMessageTrackerEnabled(10, PulsarFriend::getClientImplPtr(client), consumerImpl));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    sleep(2);

Review Comment:
   Isn't 2 seconds too long? You can use `std::chrono` with `std::this_thread::sleep_for` to sleep for N milliseconds.
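   
   As a rough sketch of that suggestion (the helper name `sleepForMs` is hypothetical and not part of the test code), a millisecond sleep with the standard library could look like this:

   ```cpp
   #include <chrono>
   #include <thread>

   // Hypothetical helper: blocks the calling thread for at least `ms` milliseconds
   // and returns the time that actually elapsed, measured on a monotonic clock.
   inline std::chrono::milliseconds sleepForMs(int ms) {
       const auto start = std::chrono::steady_clock::now();
       std::this_thread::sleep_for(std::chrono::milliseconds(ms));
       const auto elapsed = std::chrono::steady_clock::now() - start;
       return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
   }
   ```

   With the ack timeout set to 10 ms in this test, a wait of a few hundred milliseconds may already be enough; the exact value is a judgment call for the author.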





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1063275274


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // set ack timeout is 10 ms.
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    consumerImpl.unAckedMessageTrackerPtr_.reset(
+        new UnAckedMessageTrackerEnabled(10, PulsarFriend::getClientImplPtr(client), consumerImpl));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    sleep(2);
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    client.close();
+}
+
+TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic);
+
+    client.close();
+}
+
+class DeadLetterQueueTest : public ::testing::TestWithParam<std::tuple<bool, bool, ConsumerType>> {
+   public:
+    void SetUp() override {
+        isProducerBatch_ = std::get<0>(GetParam());
+        isMultiConsumer_ = std::get<1>(GetParam());
+        consumerType_ = std::get<2>(GetParam());
+        producerConf_ = ProducerConfiguration().setBatchingEnabled(isProducerBatch_);
+    }
+
+    void TearDown() override { client_.close(); }
+
+    void initTopic(std::string topicName) {
+        if (isMultiConsumer_) {
+            // call admin api to make it partitioned
+            std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+            int res = makePutRequest(url, "5");
+            LOG_INFO("res = " << res);
+            ASSERT_FALSE(res != 204 && res != 409);
+        }
+    }
+
+   protected:
+    Client client_{lookupUrl};
+    ProducerConfiguration producerConf_;
+    bool isMultiConsumer_;
+    bool isProducerBatch_;
+    ConsumerType consumerType_;
+};
+
+TEST_P(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck) {
+    Client client(lookupUrl);
+    const std::string topic = "testSendDLQTriggerByAckTimeOut-" + std::to_string(time(nullptr)) +
+                              std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) +
+                              std::to_string(consumerType_);
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = topic + "-" + subName + "-DLQ";
+    initTopic(topic);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(consumerType_);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Reset the unack timeout
+    long unackTimeOut = 200;
+    if (isMultiConsumer_) {
+        auto multiConsumer = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+        multiConsumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+            unackTimeOut, PulsarFriend::getClientImplPtr(client), *multiConsumer));
+        multiConsumer->consumers_.forEachValue([&client, unackTimeOut](ConsumerImplPtr consumer) {
+            consumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                unackTimeOut, PulsarFriend::getClientImplPtr(client), *consumer));
+        });
+    } else {
+        auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+        consumerImpl.unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+            unackTimeOut, PulsarFriend::getClientImplPtr(client), consumerImpl));
+    }

Review Comment:
   In this case we need to add a `friend` declaration to the `ConsumerConfiguration` header. This may expose the test class to the user.
   
   Is there a better way to do this?
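
To make the trade-off concrete, here is a minimal sketch of the forward-declared friend pattern. It is not the pulsar-client-cpp implementation: `DemoConsumerConfig`, `ConfigTestAccess`, and the 10000 ms lower bound are hypothetical names and values chosen for illustration. The point it shows is that the public header only has to carry a forward declaration and one `friend` line, while the accessor class itself is defined in the test sources.

```cpp
#include <cassert>

// Forward declaration only: users of the public header see the name, not a definition.
class ConfigTestAccess;

// Hypothetical stand-in for a public configuration class.
class DemoConsumerConfig {
   public:
    long getUnAckedTimeoutMs() const { return unAckedTimeoutMs_; }

    // The public setter enforces a lower bound (the 10000 ms value is illustrative).
    // Zero means "disabled" in this sketch.
    bool setUnAckedTimeoutMs(long ms) {
        if (ms != 0 && ms < 10000) {
            return false;
        }
        unAckedTimeoutMs_ = ms;
        return true;
    }

   private:
    friend class ConfigTestAccess;  // the only test-related line in the public header
    long unAckedTimeoutMs_ = 0;
};

// Defined in the test sources only; it would not be part of the installed headers.
class ConfigTestAccess {
   public:
    // Bypasses the lower bound so a test can use a very short timeout.
    static void forceUnAckedTimeoutMs(DemoConsumerConfig& conf, long ms) { conf.unAckedTimeoutMs_ = ms; }
};
```

With this layout a test can call `ConfigTestAccess::forceUnAckedTimeoutMs(conf, 200)`, while ordinary users are still limited to the validated setter. The friend name remains visible in the header, which is the exposure the comment above is concerned about.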





[GitHub] [pulsar-client-cpp] Anonymitaet commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1063150807


##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -398,6 +399,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     const BatchReceivePolicy& getBatchReceivePolicy() const;
 
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never
+     * stop. By using dead letter mechanism messages will has the max redelivery count, when message exceeding

Review Comment:
   ```suggestion
        * stopped. By using the dead letter mechanism, messages have a max redelivery count; when they exceed
   ```



##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -398,6 +399,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     const BatchReceivePolicy& getBatchReceivePolicy() const;
 
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never
+     * stop. By using dead letter mechanism messages will has the max redelivery count, when message exceeding
+     * the maximum number of redeliveries, message will send to the Dead Letter Topic and acknowledged

Review Comment:
   ```suggestion
        * the maximum number of redeliveries, they are sent to the dead letter topic and acknowledged
   ```



##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -398,6 +399,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     const BatchReceivePolicy& getBatchReceivePolicy() const;
 
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never

Review Comment:
   ```suggestion
        * By default, some messages are redelivered many times, even to the extent that they can never be
   ```
   
   Write in the simple present tense as much as possible if you are covering facts that were, are, and forever shall be true. https://docs.google.com/document/d/1lc5j4RtuLIzlEYCBo97AC8-U_3Erzs_lxpkDuseU0n4/edit#bookmark=id.e8uqh1awkcnp



##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -398,6 +399,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     const BatchReceivePolicy& getBatchReceivePolicy() const;
 
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never
+     * stop. By using dead letter mechanism messages will has the max redelivery count, when message exceeding
+     * the maximum number of redeliveries, message will send to the Dead Letter Topic and acknowledged
+     * automatic.
+     *
+     * You can enable the dead letter mechanism by setting dead letter policy.
+     * example:
+     *
+     * <pre>
+     * * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+     *                       .maxRedeliverCount(10)
+     *                       .build();
+     * </pre>
+     * Default dead letter topic name is {TopicName}-{Subscription}-DLQ.
+     * To setting a custom dead letter topic name

Review Comment:
   ```suggestion
        * To set a custom dead letter topic name
   ```
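
The default naming rule quoted in the hunk above, `{TopicName}-{Subscription}-DLQ`, can be sketched as a small standalone helper. The function names below are hypothetical and are not part of the client API; the sketch only assumes that an empty configured name means "use the default".

```cpp
#include <cassert>
#include <string>

// Hypothetical helper mirroring the documented default: {TopicName}-{Subscription}-DLQ.
std::string defaultDeadLetterTopic(const std::string& topic, const std::string& subscription) {
    return topic + "-" + subscription + "-DLQ";
}

// Assumption: an empty configured name means the default name is used.
std::string resolveDeadLetterTopic(const std::string& configured, const std::string& topic,
                                   const std::string& subscription) {
    return configured.empty() ? defaultDeadLetterTopic(topic, subscription) : configured;
}
```

For example, `resolveDeadLetterTopic("", "my-topic", "dlq-sub")` yields `my-topic-dlq-sub-DLQ`, while passing `"dlq-topic"` as the configured name returns it unchanged.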



##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -398,6 +399,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     const BatchReceivePolicy& getBatchReceivePolicy() const;
 
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never
+     * stop. By using dead letter mechanism messages will has the max redelivery count, when message exceeding
+     * the maximum number of redeliveries, message will send to the Dead Letter Topic and acknowledged
+     * automatic.

Review Comment:
   ```suggestion
        * automatically.
   ```
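
As a rough illustration of the rule this comment block describes, the sketch below takes the condition as worded in the pull request description (`redeliveryCount > maxRedeliverCount`) and counts how often a message that is never acknowledged reaches the consumer. Both function names are hypothetical, and the actual client logic may differ in detail.

```cpp
#include <cassert>
#include <climits>

// Hypothetical predicate; the condition follows the wording of the PR description.
bool shouldSendToDeadLetterTopic(int redeliveryCount, int maxRedeliverCount) {
    return redeliveryCount > maxRedeliverCount;
}

// Counts the deliveries of a message that is never acknowledged before it is
// routed to the dead letter topic. Intended for small thresholds only: with
// INT_MAX the counter would overflow before the predicate becomes true.
int deliveriesBeforeDeadLetter(int maxRedeliverCount) {
    int deliveries = 0;
    for (int redeliveryCount = 0; !shouldSendToDeadLetterTopic(redeliveryCount, maxRedeliverCount);
         ++redeliveryCount) {
        ++deliveries;
    }
    return deliveries;
}
```

Under this reading, a threshold of 3 gives four deliveries per message (redelivery counts 0 through 3), which matches the `getMaxRedeliverCount() * num + num` receive loops in the tests quoted in this thread.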



##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -398,6 +399,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     const BatchReceivePolicy& getBatchReceivePolicy() const;
 
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never
+     * stop. By using dead letter mechanism messages will has the max redelivery count, when message exceeding
+     * the maximum number of redeliveries, message will send to the Dead Letter Topic and acknowledged
+     * automatic.
+     *
+     * You can enable the dead letter mechanism by setting dead letter policy.
+     * example:
+     *
+     * <pre>
+     * * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+     *                       .maxRedeliverCount(10)
+     *                       .build();
+     * </pre>
+     * Default dead letter topic name is {TopicName}-{Subscription}-DLQ.
+     * To setting a custom dead letter topic name
+     * <pre>
+     * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+     *                       .deadLetterTopic("dlq-topic")
+     *                       .maxRedeliverCount(10)
+     *                       .initialSubscriptionName("init-sub-name")
+     *                       .build();
+     * </pre>
+     * @param deadLetterPolicy thd default is empty

Review Comment:
   ```suggestion
        * @param deadLetterPolicy Default value is empty
   ```



##########
include/pulsar/DeadLetterPolicyBuilder.h:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef DEAD_LETTER_POLICY_BUILD_HPP_
+#define DEAD_LETTER_POLICY_BUILD_HPP_
+
+#include <pulsar/DeadLetterPolicy.h>
+#include <pulsar/defines.h>
+
+#include <memory>
+
+namespace pulsar {
+
+struct DeadLetterPolicyImpl;
+
+/**
+ * The builder to build a DeadLetterPolicyBuilder
+ *
+ * Example of building DeadLetterPolicy:
+ *
+ * ```c++
+ * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+ *                       .deadLetterTopic("dlq-topic")
+ *                       .maxRedeliverCount(10)
+ *                       .initialSubscriptionName("init-sub-name")
+ *                       .build();
+ * ```
+ */
+class PULSAR_PUBLIC DeadLetterPolicyBuilder {
+   public:
+    DeadLetterPolicyBuilder();
+
+    /**
+     * Set dead letter topic.
+     *
+     * @param deadLetterTopic Name of the dead topic where the failing messages will be sent.

Review Comment:
   ```suggestion
        * @param deadLetterTopic Name of the dead letter topic where the failing messages are sent.
   ```



##########
include/pulsar/DeadLetterPolicyBuilder.h:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef DEAD_LETTER_POLICY_BUILD_HPP_
+#define DEAD_LETTER_POLICY_BUILD_HPP_
+
+#include <pulsar/DeadLetterPolicy.h>
+#include <pulsar/defines.h>
+
+#include <memory>
+
+namespace pulsar {
+
+struct DeadLetterPolicyImpl;
+
+/**
+ * The builder to build a DeadLetterPolicyBuilder
+ *
+ * Example of building DeadLetterPolicy:
+ *
+ * ```c++
+ * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+ *                       .deadLetterTopic("dlq-topic")
+ *                       .maxRedeliverCount(10)
+ *                       .initialSubscriptionName("init-sub-name")
+ *                       .build();
+ * ```
+ */
+class PULSAR_PUBLIC DeadLetterPolicyBuilder {
+   public:
+    DeadLetterPolicyBuilder();
+
+    /**
+     * Set dead letter topic.
+     *
+     * @param deadLetterTopic Name of the dead topic where the failing messages will be sent.
+     * The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ"
+     *
+     * @return
+     */
+    DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic);
+
+    /**
+     * Set max redeliver count
+     *
+     * @param maxRedeliverCount Maximum number of times that a message will be redelivered before being sent

Review Comment:
   ```suggestion
        * @param maxRedeliverCount Maximum number of times that a message is redelivered before being sent
   ```



##########
include/pulsar/DeadLetterPolicyBuilder.h:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef DEAD_LETTER_POLICY_BUILD_HPP_
+#define DEAD_LETTER_POLICY_BUILD_HPP_
+
+#include <pulsar/DeadLetterPolicy.h>
+#include <pulsar/defines.h>
+
+#include <memory>
+
+namespace pulsar {
+
+struct DeadLetterPolicyImpl;
+
+/**
+ * The builder to build a DeadLetterPolicyBuilder
+ *
+ * Example of building DeadLetterPolicy:
+ *
+ * ```c++
+ * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+ *                       .deadLetterTopic("dlq-topic")
+ *                       .maxRedeliverCount(10)
+ *                       .initialSubscriptionName("init-sub-name")
+ *                       .build();
+ * ```
+ */
+class PULSAR_PUBLIC DeadLetterPolicyBuilder {
+   public:
+    DeadLetterPolicyBuilder();
+
+    /**
+     * Set dead letter topic.
+     *
+     * @param deadLetterTopic Name of the dead topic where the failing messages will be sent.
+     * The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ"
+     *
+     * @return
+     */
+    DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic);
+
+    /**
+     * Set max redeliver count
+     *
+     * @param maxRedeliverCount Maximum number of times that a message will be redelivered before being sent
+     * to the dead letter queue.
+     * - The maxRedeliverCount must be greater than 0.
+     * - The default value is {INT_MAX} (DLQ is not enabled)
+     *
+     * @return
+     */
+    DeadLetterPolicyBuilder& maxRedeliverCount(int maxRedeliverCount);
+
+    /**
+     * Set initial subscription name
+     *
+     * @param initialSubscriptionName  Name of the initial subscription name of the dead letter topic.
+     * If this field is not set, the initial subscription for the dead letter topic will not be created.

Review Comment:
   ```suggestion
        * If this field is not set, the initial subscription for the dead letter topic is not created.
   ```
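
The constraints documented in the hunk above (the count must be greater than 0, the default is `INT_MAX` meaning the DLQ is not enabled, and no initial subscription is created when the name is unset) can be sketched with a standalone builder. `DemoPolicy`, `DemoPolicyBuilder`, and `isDeadLetterEnabled` are hypothetical names, and throwing `std::invalid_argument` on a non-positive count is an assumption of this sketch, not a statement about the real `DeadLetterPolicyBuilder`.

```cpp
#include <cassert>
#include <climits>
#include <stdexcept>
#include <string>

// Hypothetical value type holding the three documented settings.
struct DemoPolicy {
    std::string deadLetterTopic;          // empty: fall back to the default name
    int maxRedeliverCount = INT_MAX;      // INT_MAX: DLQ is not enabled
    std::string initialSubscriptionName;  // empty: no initial subscription is created
};

class DemoPolicyBuilder {
   public:
    DemoPolicyBuilder& deadLetterTopic(const std::string& topic) {
        policy_.deadLetterTopic = topic;
        return *this;
    }

    // Assumption: a non-positive count is rejected with an exception.
    DemoPolicyBuilder& maxRedeliverCount(int count) {
        if (count <= 0) {
            throw std::invalid_argument("maxRedeliverCount must be greater than 0");
        }
        policy_.maxRedeliverCount = count;
        return *this;
    }

    DemoPolicyBuilder& initialSubscriptionName(const std::string& name) {
        policy_.initialSubscriptionName = name;
        return *this;
    }

    DemoPolicy build() const { return policy_; }

   private:
    DemoPolicy policy_;
};

// The DLQ counts as enabled only when the count was changed from its default.
bool isDeadLetterEnabled(const DemoPolicy& policy) { return policy.maxRedeliverCount != INT_MAX; }
```

A call chain such as `DemoPolicyBuilder().deadLetterTopic("dlq-topic").maxRedeliverCount(10).build()` mirrors the usage shown in the header comment, and a default-built policy reports the DLQ as disabled.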



##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -398,6 +399,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     const BatchReceivePolicy& getBatchReceivePolicy() const;
 
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never
+     * stop. By using dead letter mechanism messages will has the max redelivery count, when message exceeding
+     * the maximum number of redeliveries, message will send to the Dead Letter Topic and acknowledged
+     * automatic.
+     *
+     * You can enable the dead letter mechanism by setting dead letter policy.
+     * example:

Review Comment:
   ```suggestion
        * Example:
   ```



##########
include/pulsar/DeadLetterPolicyBuilder.h:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef DEAD_LETTER_POLICY_BUILD_HPP_
+#define DEAD_LETTER_POLICY_BUILD_HPP_
+
+#include <pulsar/DeadLetterPolicy.h>
+#include <pulsar/defines.h>
+
+#include <memory>
+
+namespace pulsar {
+
+struct DeadLetterPolicyImpl;
+
+/**
+ * The builder to build a DeadLetterPolicyBuilder
+ *
+ * Example of building DeadLetterPolicy:
+ *
+ * ```c++
+ * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+ *                       .deadLetterTopic("dlq-topic")
+ *                       .maxRedeliverCount(10)
+ *                       .initialSubscriptionName("init-sub-name")
+ *                       .build();
+ * ```
+ */
+class PULSAR_PUBLIC DeadLetterPolicyBuilder {
+   public:
+    DeadLetterPolicyBuilder();
+
+    /**
+     * Set dead letter topic.
+     *
+     * @param deadLetterTopic Name of the dead topic where the failing messages will be sent.
+     * The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ"
+     *
+     * @return
+     */
+    DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic);
+
+    /**
+     * Set max redeliver count
+     *
+     * @param maxRedeliverCount Maximum number of times that a message will be redelivered before being sent
+     * to the dead letter queue.
+     * - The maxRedeliverCount must be greater than 0.
+     * - The default value is {INT_MAX} (DLQ is not enabled)
+     *
+     * @return
+     */
+    DeadLetterPolicyBuilder& maxRedeliverCount(int maxRedeliverCount);
+
+    /**
+     * Set initial subscription name
+     *
+     * @param initialSubscriptionName  Name of the initial subscription name of the dead letter topic.
+     * If this field is not set, the initial subscription for the dead letter topic will not be created.
+     * If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
+     * will fail to be created.

Review Comment:
   ```suggestion
        * fails to be created.
   ```



##########
include/pulsar/ConsumerConfiguration.h:
##########
@@ -398,6 +399,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     const BatchReceivePolicy& getBatchReceivePolicy() const;
 
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never
+     * stop. By using dead letter mechanism messages will has the max redelivery count, when message exceeding
+     * the maximum number of redeliveries, message will send to the Dead Letter Topic and acknowledged
+     * automatic.
+     *
+     * You can enable the dead letter mechanism by setting dead letter policy.

Review Comment:
   ```suggestion
        * You can enable the dead letter mechanism by setting the dead letter policy.
   ```





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062236859


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // set ack timeout is 10 ms.
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    consumerImpl.unAckedMessageTrackerPtr_.reset(
+        new UnAckedMessageTrackerEnabled(10, PulsarFriend::getClientImplPtr(client), consumerImpl));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    sleep(2);
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    client.close();
+}
+
+TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic);
+
+    client.close();
+}
+
+class DeadLetterQueueTest : public ::testing::TestWithParam<std::tuple<bool, bool, ConsumerType>> {
+   public:
+    void SetUp() override {
+        isProducerBatch_ = std::get<0>(GetParam());
+        isMultiConsumer_ = std::get<1>(GetParam());
+        consumerType_ = std::get<2>(GetParam());
+        producerConf_ = ProducerConfiguration().setBatchingEnabled(isProducerBatch_);
+    }
+
+    void TearDown() override { client_.close(); }
+
+    void initTopic(std::string topicName) {
+        if (isMultiConsumer_) {
+            // call admin api to make it partitioned
+            std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+            int res = makePutRequest(url, "5");
+            LOG_INFO("res = " << res);
+            ASSERT_FALSE(res != 204 && res != 409);
+        }
+    }
+
+   protected:
+    Client client_{lookupUrl};
+    ProducerConfiguration producerConf_;
+    bool isMultiConsumer_;
+    bool isProducerBatch_;
+    ConsumerType consumerType_;
+};
+
+TEST_P(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck) {
+    Client client(lookupUrl);
+    const std::string topic = "testSendDLQTriggerByAckTimeOut-" + std::to_string(time(nullptr)) +
+                              std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) +
+                              std::to_string(consumerType_);
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = topic + "-" + subName + "-DLQ";
+    initTopic(topic);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(consumerType_);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Reset the unack timeout
+    long unackTimeOut = 200;
+    if (isMultiConsumer_) {
+        auto multiConsumer = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+        multiConsumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+            unackTimeOut, PulsarFriend::getClientImplPtr(client), *multiConsumer));
+        multiConsumer->consumers_.forEachValue([&client, unackTimeOut](ConsumerImplPtr consumer) {
+            consumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                unackTimeOut, PulsarFriend::getClientImplPtr(client), *consumer));
+        });
+    } else {
+        auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+        consumerImpl.unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+            unackTimeOut, PulsarFriend::getClientImplPtr(client), consumerImpl));
+    }
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer));
+    const int num = 100;
+    Message msg;
+    for (int i = 0; i < num; ++i) {
+        msg = MessageBuilder()
+                  .setContent(std::to_string(i))
+                  .setPartitionKey("p-key")
+                  .setOrderingKey("o-key")
+                  .setProperty("pk-1", "pv-1")
+                  .build();
+        producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); });
+    }
+
+    // receive messages and don't ack.
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        // Randomly specify some messages manually negativeAcknowledge.
+        if (rand() % 2 == 0) {
+            consumer.negativeAcknowledge(msg);
+        }
+    }
+
+    // assert dlq msg.
+    Consumer deadLetterQueueConsumer;
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
+    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic, "dlq-sub", dlqConsumerConfig, deadLetterQueueConsumer));
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_EQ(msg.getPartitionKey(), "p-key");
+        ASSERT_EQ(msg.getOrderingKey(), "o-key");
+        ASSERT_EQ(msg.getProperty("pk-1"), "pv-1");
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+
+    ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200));
+}
+
+TEST_P(DeadLetterQueueTest, testSendDLQTriggerByNegativeAcknowledge) {
+    Client client(lookupUrl);
+    const std::string topic = "testSendDLQTriggerByNegativeAcknowledge-" + std::to_string(time(nullptr)) +
+                              std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) +
+                              std::to_string(consumerType_);
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = topic + subName + "DLQ";
+    initTopic(topic);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(consumerType_);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer));
+
+    const int num = 10;
+    Message msg;
+    for (int i = 0; i < num; ++i) {
+        msg = MessageBuilder()
+                  .setContent(std::to_string(i))
+                  .setPartitionKey("p-key")
+                  .setOrderingKey("o-key")
+                  .setProperty("pk-1", "pv-1")
+                  .build();
+        producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); });
+    }
+
+    // nack all msg.
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    Consumer deadLetterQueueConsumer;
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
+    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic, "dlq-sub", dlqConsumerConfig, deadLetterQueueConsumer));
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_EQ(msg.getPartitionKey(), "p-key");
+        ASSERT_EQ(msg.getOrderingKey(), "o-key");
+        ASSERT_EQ(msg.getProperty("pk-1"), "pv-1");
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200));
+}
+
+TEST_P(DeadLetterQueueTest, testInitSubscription) {
+    Client client(lookupUrl);
+    const std::string topic = "testInitSubscription-" + std::to_string(time(nullptr)) +
+                              std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) +
+                              std::to_string(consumerType_);
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = topic + subName + "DLQ";
+    const std::string dlqInitSub = "dlq-init-sub";
+    initTopic(topic);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .initialSubscriptionName(dlqInitSub)
+                         .deadLetterTopic(dlqTopic)
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(consumerType_);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    Consumer deadLetterQueueConsumer;
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
+    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic, "dlq-sub", dlqConsumerConfig, deadLetterQueueConsumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer));
+
+    const int num = 10;
+    Message msg;
+    for (int i = 0; i < num; ++i) {
+        msg = MessageBuilder().setContent(std::to_string(i)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // nack all msg.
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // Use this subscription to ensure that messages are sent to the DLQ.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic) != std::string::npos);
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+
+    // If there is no initial subscription, then the subscription will not receive the DLQ messages sent
+    // before the subscription.
+    Consumer initDLQConsumer;
+    ConsumerConfiguration initDLQConsumerConfig;
+    initDLQConsumerConfig.setSubscriptionInitialPosition(InitialPositionLatest);
+    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic, dlqInitSub, initDLQConsumerConfig, initDLQConsumer));
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, initDLQConsumer.receive(msg, 1000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic) != std::string::npos);
+        ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, initDLQConsumer.receive(msg, 200));
+}
+
+bool isBatchs[2] = {true, false};
+bool isMultiTopics[2] = {true, false};
+ConsumerType subTypes[2] = {ConsumerType::ConsumerShared, ConsumerType::ConsumerKeyShared};
+
+std::vector<std::tuple<bool, bool, ConsumerType>> getValues() {
+    std::vector<std::tuple<bool, bool, ConsumerType>> values;
+    for (const auto isBatch : isBatchs) {
+        for (const auto isMultiTopic : isMultiTopics) {
+            for (const auto subType : subTypes) {
+                values.emplace_back(std::make_tuple(isBatch, isMultiTopic, subType));
+            }
+        }
+    }
+    return values;
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, DeadLetterQueueTest, ::testing::ValuesIn(getValues()));

Review Comment:
   ```suggestion
   INSTANTIATE_TEST_SUITE_P(Pulsar, DeadLetterQueueTest,
                            testing::Combine(testing::Values(true, false), testing::Values(true, false),
                                             testing::Values(ConsumerType::ConsumerShared,
                                                             ConsumerType::ConsumerKeyShared)),
                            [](const testing::TestParamInfo<DeadLetterQueueTest::ParamType> &info) {
                                return "isBatch_" + std::to_string(std::get<0>(info.param)) + "_isMultiTopics_" +
                                       std::to_string(std::get<1>(info.param)) + "_subType_" +
                                       std::to_string(std::get<2>(info.param));
                            });
   ```
   
   P.S. The last argument here is optional. Without it, the tests will be:
   
   ```
   Pulsar/DeadLetterQueueTest.testInitSubscription/0
   Pulsar/DeadLetterQueueTest.testInitSubscription/1
   ...
   Pulsar/DeadLetterQueueTest.testInitSubscription/7
   ```
   
   With it, the tests will be:
   
   ```
   Pulsar/DeadLetterQueueTest.testInitSubscription/isBatch_1_isMultiTopics_1_subType_1
   Pulsar/DeadLetterQueueTest.testInitSubscription/isBatch_1_isMultiTopics_1_subType_3
   Pulsar/DeadLetterQueueTest.testInitSubscription/isBatch_1_isMultiTopics_0_subType_1
   Pulsar/DeadLetterQueueTest.testInitSubscription/isBatch_1_isMultiTopics_0_subType_3
   Pulsar/DeadLetterQueueTest.testInitSubscription/isBatch_0_isMultiTopics_1_subType_1
   Pulsar/DeadLetterQueueTest.testInitSubscription/isBatch_0_isMultiTopics_1_subType_3
   Pulsar/DeadLetterQueueTest.testInitSubscription/isBatch_0_isMultiTopics_0_subType_1
   Pulsar/DeadLetterQueueTest.testInitSubscription/isBatch_0_isMultiTopics_0_subType_3
   ```
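
To make the naming scheme above concrete, here is a minimal sketch of the name generator written as a standalone function, without GoogleTest. `ExampleConsumerType`, `ExampleParam` and `exampleParamName` are hypothetical names introduced only for illustration; the enum values 1 and 3 are assumptions taken from the `subType_` suffixes in the listing above.

```cpp
#include <string>
#include <tuple>

// Hypothetical stand-in for the consumer type enum (assumption: Shared == 1,
// KeyShared == 3, matching the suffixes shown in the listing above).
enum ExampleConsumerType { ExampleShared = 1, ExampleKeyShared = 3 };

// Same shape as the test parameter: (isBatch, isMultiTopics, subType).
using ExampleParam = std::tuple<bool, bool, ExampleConsumerType>;

// Builds the readable suffix for one parameter tuple, mirroring the lambda
// in the suggestion. The booleans are rendered as 0 or 1.
std::string exampleParamName(const ExampleParam& param) {
    return "isBatch_" + std::to_string(static_cast<int>(std::get<0>(param))) +
           "_isMultiTopics_" + std::to_string(static_cast<int>(std::get<1>(param))) +
           "_subType_" + std::to_string(static_cast<int>(std::get<2>(param)));
}
```

With a readable suffix, a single failing combination can be picked out of the test output directly instead of being decoded from an index.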
   





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1050474824


##########
include/pulsar/DeadLetterPolicyBuilder.h:
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef DEAD_LETTER_POLICY_BUILD_HPP_
+#define DEAD_LETTER_POLICY_BUILD_HPP_
+
+#include <pulsar/DeadLetterPolicy.h>
+#include <pulsar/defines.h>
+
+#include <memory>
+
+namespace pulsar {
+
+struct DeadLetterPolicyImpl;
+
+/**
+ * The builder to build a DeadLetterPolicy
+ *
+ * Example of building DeadLetterPolicy:
+ *
+ * ```c++
+ * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+ *                       .deadLetterTopic("dlq-topic")
+ *                       .maxRedeliverCount(10)
+ *                       .initialSubscriptionName("init-sub-name")
+ *                       .build();
+ * ```
+ */
+class PULSAR_PUBLIC DeadLetterPolicyBuilder {
+   public:
+    DeadLetterPolicyBuilder();
+
+    /**
+     * Set dead letter topic
+     *
+     * @return
+     */
+    DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic);
+
+    /**
+     * Set max redeliver count
+     *
+     * @return
+     */

Review Comment:
   When you add the API docs, please state the default value and explain the behavior. For example, from the code in `ConsumerImpl.cc` I see the dead letter policy only works when `maxRedeliverCount <= 0`. And what happens if users don't set the dead letter topic? Please make the API docs readable and meaningful.
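
As a rough illustration of what such API docs could look like, here is a sketch on a hypothetical builder. `ExamplePolicy` and `ExamplePolicyBuilder` are not the classes in this PR, and the defaults documented below (an empty topic, `INT_MAX` redeliveries) are assumptions chosen for the sketch rather than the behavior of the real `DeadLetterPolicyBuilder`.

```cpp
#include <climits>
#include <string>

// Hypothetical policy type used only in this sketch.
struct ExamplePolicy {
    std::string deadLetterTopic;          // empty unless set explicitly
    int maxRedeliverCount = INT_MAX;      // assumed default for this sketch
};

class ExamplePolicyBuilder {
   public:
    /**
     * Set the dead letter topic.
     *
     * Default (assumed here): empty, which leaves the choice of a topic name to the caller.
     *
     * @param topic the name of the topic that receives dead letters
     * @return this builder, so that calls can be chained
     */
    ExamplePolicyBuilder& deadLetterTopic(const std::string& topic) {
        policy_.deadLetterTopic = topic;
        return *this;
    }

    /**
     * Set the maximum number of redeliveries before a message counts as a dead letter.
     *
     * Default (assumed here): INT_MAX.
     *
     * @param count the redelivery limit
     * @return this builder, so that calls can be chained
     */
    ExamplePolicyBuilder& maxRedeliverCount(int count) {
        policy_.maxRedeliverCount = count;
        return *this;
    }

    /** @return a copy of the policy configured so far */
    ExamplePolicy build() const { return policy_; }

   private:
    ExamplePolicy policy_;
};
```

Each doc comment names the default, the parameter, and the return value, which is the level of detail asked for above.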





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062271300


##########
lib/ConsumerImpl.cc:
##########
@@ -1526,4 +1577,96 @@ void ConsumerImpl::cancelTimers() noexcept {
     checkExpiredChunkedTimer_->cancel(ec);
 }
 
+void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {
+    auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId);
+    if (!messages) {
+        cb(false);
+        return;
+    }
+
+    // Initialize deadLetterProducer_
+    if (!deadLetterProducer_) {
+        Lock createLock(createProducerLock_);
+        if (!deadLetterProducer_) {
+            deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>();
+            ProducerConfiguration producerConfiguration;
+            producerConfiguration.setSchema(config_.getSchema());
+            producerConfiguration.setBlockIfQueueFull(false);
+            if (!deadLetterPolicy_.getInitialSubscriptionName().empty()) {
+                producerConfiguration.setInitialSubscriptionName(
+                    deadLetterPolicy_.getInitialSubscriptionName());
+            }
+            ClientImplPtr client = client_.lock();
+            if (client) {
+                client->createProducerAsync(
+                    deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration,
+                    [this](Result res, Producer producer) {

Review Comment:
   We should capture `shared_from_this()` instead of `this`, so that the object cannot be destroyed before the callback runs.
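
A minimal sketch of the pattern, assuming a hypothetical `ExampleWorker` class that is always owned by a `std::shared_ptr` (none of these names come from the Pulsar client):

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical class used only for illustration.
class ExampleWorker : public std::enable_shared_from_this<ExampleWorker> {
   public:
    explicit ExampleWorker(std::string name) : name_(std::move(name)) {}

    // Returns a callback that owns a shared_ptr to this object, so the object
    // stays alive for as long as the callback itself exists.
    std::function<std::string()> makeCallback() {
        auto self = shared_from_this();
        return [self]() { return self->name_; };
    }

   private:
    std::string name_;
};
```

Had the lambda captured `this`, calling the callback after the last external `shared_ptr` was released would read a destroyed object.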



##########
lib/ConsumerImpl.cc:
##########
@@ -1526,4 +1577,96 @@ void ConsumerImpl::cancelTimers() noexcept {
     checkExpiredChunkedTimer_->cancel(ec);
 }
 
+void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {
+    auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId);
+    if (!messages) {
+        cb(false);
+        return;
+    }
+
+    // Initialize deadLetterProducer_
+    if (!deadLetterProducer_) {
+        Lock createLock(createProducerLock_);
+        if (!deadLetterProducer_) {
+            deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>();
+            ProducerConfiguration producerConfiguration;
+            producerConfiguration.setSchema(config_.getSchema());
+            producerConfiguration.setBlockIfQueueFull(false);
+            if (!deadLetterPolicy_.getInitialSubscriptionName().empty()) {
+                producerConfiguration.setInitialSubscriptionName(
+                    deadLetterPolicy_.getInitialSubscriptionName());
+            }
+            ClientImplPtr client = client_.lock();
+            if (client) {
+                client->createProducerAsync(
+                    deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration,
+                    [this](Result res, Producer producer) {
+                        if (res == ResultOk) {
+                            deadLetterProducer_->setValue(producer);
+                        } else {
+                            LOG_ERROR("Dead letter producer create exception with topic: "
+                                      << deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res);
+                            deadLetterProducer_.reset();
+                        }
+                    });
+            } else {
+                LOG_WARN(getName() << "Client is destroyed and cannot create dead letter producer.");
+            }
+        }
+        createLock.unlock();

Review Comment:
   ```suggestion
   ```
   
   The lock is released automatically when `createLock` goes out of scope, so the explicit `createLock.unlock()` is unnecessary. And since you never need to unlock before the end of the scope, you can change its type to `std::lock_guard<std::mutex>` instead of the default `Lock` (`std::unique_lock`).
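
A small sketch of lazy initialization guarded by `std::lock_guard`, assuming a hypothetical `ExampleLazyHolder` class (not code from this PR):

```cpp
#include <memory>
#include <mutex>
#include <string>

// Hypothetical holder used only for illustration.
class ExampleLazyHolder {
   public:
    // Creates the value on first use. The lock_guard releases the mutex in its
    // destructor, so no explicit unlock() call is needed on any return path.
    std::shared_ptr<std::string> getOrCreate() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!value_) {
            value_ = std::make_shared<std::string>("created");
            ++createCount_;
        }
        return value_;
    }

    // Number of times the value was actually constructed.
    int createCount() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return createCount_;
    }

   private:
    mutable std::mutex mutex_;
    std::shared_ptr<std::string> value_;
    int createCount_ = 0;
};
```

`std::unique_lock` is only needed when the lock has to be released early, moved, or passed to a condition variable.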



##########
lib/ConsumerImpl.cc:
##########
@@ -1526,4 +1577,96 @@ void ConsumerImpl::cancelTimers() noexcept {
     checkExpiredChunkedTimer_->cancel(ec);
 }
 
+void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {
+    auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId);
+    if (!messages) {
+        cb(false);
+        return;
+    }
+
+    // Initialize deadLetterProducer_
+    if (!deadLetterProducer_) {
+        Lock createLock(createProducerLock_);
+        if (!deadLetterProducer_) {
+            deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>();
+            ProducerConfiguration producerConfiguration;
+            producerConfiguration.setSchema(config_.getSchema());
+            producerConfiguration.setBlockIfQueueFull(false);
+            if (!deadLetterPolicy_.getInitialSubscriptionName().empty()) {
+                producerConfiguration.setInitialSubscriptionName(
+                    deadLetterPolicy_.getInitialSubscriptionName());
+            }
+            ClientImplPtr client = client_.lock();
+            if (client) {
+                client->createProducerAsync(
+                    deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration,
+                    [this](Result res, Producer producer) {
+                        if (res == ResultOk) {
+                            deadLetterProducer_->setValue(producer);
+                        } else {
+                            LOG_ERROR("Dead letter producer create exception with topic: "
+                                      << deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res);
+                            deadLetterProducer_.reset();
+                        }
+                    });
+            } else {
+                LOG_WARN(getName() << "Client is destroyed and cannot create dead letter producer.");
+            }
+        }
+        createLock.unlock();
+    }
+
+    for (const auto& message : messages.value()) {
+        auto self = get_shared_this_ptr();
+        deadLetterProducer_->getFuture().addListener([self, message, cb](Result res, Producer producer) {
+            auto originMessageId = message.getMessageId();
+            std::stringstream originMessageIdStr;
+            originMessageIdStr << originMessageId;
+            MessageBuilder msgBuilder;
+            msgBuilder.setAllocatedContent(const_cast<void*>(message.getData()), message.getLength())
+                .setProperties(message.getProperties())
+                .setProperty(PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr.str())
+                .setProperty(SYSTEM_PROPERTY_REAL_TOPIC, message.getTopicName());
+            if (message.hasPartitionKey()) {
+                msgBuilder.setPartitionKey(message.getPartitionKey());
+            }
+            if (message.hasOrderingKey()) {
+                msgBuilder.setOrderingKey(message.getOrderingKey());
+            }
+            producer.sendAsync(msgBuilder.build(), [self, originMessageId, cb](Result res,

Review Comment:
   I prefer capturing a `weak_ptr` instead of a `shared_ptr` here because the pending sends could extend the lifetime of the consumer. If the consumer is closed before the send completes, the callback should not be called.
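
A minimal sketch of the `weak_ptr` variant, assuming a hypothetical `ExampleConsumer` class (the names are illustrative and not taken from `ConsumerImpl`):

```cpp
#include <functional>
#include <memory>

// Hypothetical class used only for illustration.
class ExampleConsumer : public std::enable_shared_from_this<ExampleConsumer> {
   public:
    // Returns a completion callback that does not keep this object alive.
    // The callback returns true if the object still existed and was updated,
    // and false if it had already been destroyed.
    std::function<bool()> makeSendCallback() {
        std::weak_ptr<ExampleConsumer> weakSelf = shared_from_this();
        return [weakSelf]() {
            auto self = weakSelf.lock();
            if (!self) {
                return false;  // owner is gone: skip the completion work
            }
            ++self->completed_;
            return true;
        };
    }

    int completed() const { return completed_; }

   private:
    int completed_ = 0;
};
```

Because the callback holds only a `weak_ptr`, a queue of pending callbacks does not delay destruction of the owner.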





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062311753


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 1;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_TRUE(!msg.getDataAsString().empty());

Review Comment:
   Please change other places of `ASSERT_TRUE(!condition)` to `ASSERT_FALSE(condition)` as well.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1065663539


##########
include/pulsar/ProducerConfiguration.h:
##########
@@ -532,11 +532,15 @@ class PULSAR_PUBLIC ProducerConfiguration {
      */
     ProducerAccessMode getAccessMode() const;
 
-    friend class PulsarWrapper;
-
    private:
-    struct Impl;
-    std::shared_ptr<ProducerConfigurationImpl> impl_;
+    typedef std::shared_ptr<ProducerConfigurationImpl> ProducerConfigurationImplPtr;
+    ProducerConfigurationImplPtr impl_;
+
+    ProducerConfiguration(ProducerConfigurationImplPtr& impl);

Review Comment:
   Pass a `std::shared_ptr` by const reference or value, don't pass it by non-const reference. Actually, I found this constructor is never used. And could you avoid other unrelated changes (`typedef`, moving the position of `friend`) as well?
   
   ```diff
   diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h
   index dc708e9..ca11bd0 100644
   --- a/include/pulsar/ProducerConfiguration.h
   +++ b/include/pulsar/ProducerConfiguration.h
   @@ -532,15 +532,12 @@ class PULSAR_PUBLIC ProducerConfiguration {
         */
        ProducerAccessMode getAccessMode() const;
   
   -   private:
   -    typedef std::shared_ptr<ProducerConfigurationImpl> ProducerConfigurationImplPtr;
   -    ProducerConfigurationImplPtr impl_;
   -
   -    ProducerConfiguration(ProducerConfigurationImplPtr& impl);
   -
        friend class PulsarWrapper;
   -    friend class ConsumerImpl;
        friend class ProducerImpl;
   +    friend class ConsumerImpl;
   +
   +   private:
   +    std::shared_ptr<ProducerConfigurationImpl> impl_;
    };
    }  // namespace pulsar
    #endif /* PULSAR_PRODUCERCONFIGURATION_H_ */
   diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
   index 3c1bd24..0ee79f6 100644
   --- a/lib/ProducerConfiguration.cc
   +++ b/lib/ProducerConfiguration.cc
   @@ -32,8 +32,6 @@ ProducerConfiguration::~ProducerConfiguration() {}
   
    ProducerConfiguration::ProducerConfiguration(const ProducerConfiguration& x) : impl_(x.impl_) {}
   
   -ProducerConfiguration::ProducerConfiguration(ProducerConfigurationImplPtr& impl) : impl_(impl) {}
   -
    ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfiguration& x) {
        impl_ = x.impl_;
        return *this;
   ```
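
For reference, a short sketch of a constructor that takes a `std::shared_ptr` by value, assuming hypothetical `ExampleImpl` and `ExampleConfig` types (not the classes in this PR):

```cpp
#include <memory>
#include <utility>

// Hypothetical implementation type used only for illustration.
struct ExampleImpl {
    int value = 0;
};

class ExampleConfig {
   public:
    // Taking the pointer by value accepts both lvalues and temporaries.
    // The parameter is then moved into the member, avoiding a second copy.
    explicit ExampleConfig(std::shared_ptr<ExampleImpl> impl) : impl_(std::move(impl)) {}

    int value() const { return impl_->value; }
    long useCount() const { return impl_.use_count(); }

   private:
    std::shared_ptr<ExampleImpl> impl_;
};
```

A non-const lvalue reference parameter could not bind to a temporary such as the result of `std::make_shared`, and it would suggest to readers that the constructor modifies the caller's pointer.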





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1065711546


##########
test-conf/standalone-ssl.conf:
##########
@@ -50,6 +50,9 @@ brokerShutdownTimeoutMs=3000
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
+# Disable schema validation: If a producer doesn’t carry a schema, the producer is allowed to connect to the topic and produce data.
+isSchemaValidationEnforced=true
+

Review Comment:
   This conflicts with https://github.com/apache/pulsar-client-cpp/pull/157. Since the change is not required in this PR, I think you can revert it.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#issuecomment-1377197163

   BTW, let's prioritize addressing the comments in this PR over #157.




[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1063259612


##########
lib/ConsumerImpl.cc:
##########
@@ -1526,4 +1577,96 @@ void ConsumerImpl::cancelTimers() noexcept {
     checkExpiredChunkedTimer_->cancel(ec);
 }
 
+void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {
+    auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId);
+    if (!messages) {
+        cb(false);
+        return;
+    }
+
+    // Initialize deadLetterProducer_
+    if (!deadLetterProducer_) {
+        Lock createLock(createProducerLock_);
+        if (!deadLetterProducer_) {
+            deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>();
+            ProducerConfiguration producerConfiguration;
+            producerConfiguration.setSchema(config_.getSchema());
+            producerConfiguration.setBlockIfQueueFull(false);
+            if (!deadLetterPolicy_.getInitialSubscriptionName().empty()) {
+                producerConfiguration.setInitialSubscriptionName(
+                    deadLetterPolicy_.getInitialSubscriptionName());
+            }
+            ClientImplPtr client = client_.lock();
+            if (client) {
+                client->createProducerAsync(
+                    deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration,
+                    [this](Result res, Producer producer) {
+                        if (res == ResultOk) {
+                            deadLetterProducer_->setValue(producer);
+                        } else {
+                            LOG_ERROR("Dead letter producer create exception with topic: "
+                                      << deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res);
+                            deadLetterProducer_.reset();
+                        }
+                    });
+            } else {
+                LOG_WARN(getName() << "Client is destroyed and cannot create dead letter producer.");
+            }
+        }
+        createLock.unlock();
+    }
+
+    for (const auto& message : messages.value()) {
+        auto self = get_shared_this_ptr();
+        deadLetterProducer_->getFuture().addListener([self, message, cb](Result res, Producer producer) {
+            auto originMessageId = message.getMessageId();
+            std::stringstream originMessageIdStr;
+            originMessageIdStr << originMessageId;
+            MessageBuilder msgBuilder;
+            msgBuilder.setAllocatedContent(const_cast<void*>(message.getData()), message.getLength())
+                .setProperties(message.getProperties())
+                .setProperty(PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr.str())
+                .setProperty(SYSTEM_PROPERTY_REAL_TOPIC, message.getTopicName());
+            if (message.hasPartitionKey()) {
+                msgBuilder.setPartitionKey(message.getPartitionKey());
+            }
+            if (message.hasOrderingKey()) {
+                msgBuilder.setOrderingKey(message.getOrderingKey());
+            }
+            producer.sendAsync(msgBuilder.build(), [self, originMessageId, cb](Result res,

Review Comment:
   Makes sense.





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1066823262


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 10;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_FALSE(msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // set ack timeout is 10 ms.
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    consumerImpl.unAckedMessageTrackerPtr_.reset(
+        new UnAckedMessageTrackerEnabled(10, PulsarFriend::getClientImplPtr(client), consumerImpl));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    client.close();
+}
+
+TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic);
+
+    client.close();
+}
+
+class DeadLetterQueueTest : public ::testing::TestWithParam<std::tuple<bool, bool, ConsumerType>> {
+   public:
+    void SetUp() override {
+        isProducerBatch_ = std::get<0>(GetParam());
+        isMultiConsumer_ = std::get<1>(GetParam());
+        consumerType_ = std::get<2>(GetParam());
+        producerConf_ = ProducerConfiguration().setBatchingEnabled(isProducerBatch_);

Review Comment:
   Yes, I refactored it, PTAL.



##########
include/pulsar/ProducerConfiguration.h:
##########
@@ -532,11 +532,15 @@ class PULSAR_PUBLIC ProducerConfiguration {
      */
     ProducerAccessMode getAccessMode() const;
 
-    friend class PulsarWrapper;
-
    private:
-    struct Impl;
-    std::shared_ptr<ProducerConfigurationImpl> impl_;
+    typedef std::shared_ptr<ProducerConfigurationImpl> ProducerConfigurationImplPtr;
+    ProducerConfigurationImplPtr impl_;
+
+    ProducerConfiguration(ProducerConfigurationImplPtr& impl);

Review Comment:
   Yes, these modifications are related to #157. I reverted them.




[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1071989431


##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/ConsumerConfiguration.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ConsumerConfigurationImpl.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testDLQWithSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "test-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 10;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_FALSE(msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    // set ack timeout is 10 ms.
+    consumerConfig.impl_->unAckedMessagesTimeoutMs = 10;
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    client.close();
+}
+
+TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic);
+
+    client.close();
+}
+
+class DeadLetterQueueTest : public ::testing::TestWithParam<std::tuple<bool, bool, ConsumerType>> {
+   public:
+    void SetUp() override {
+        isProducerBatch_ = std::get<0>(GetParam());
+        isMultiConsumer_ = std::get<1>(GetParam());
+        consumerType_ = std::get<2>(GetParam());
+
+        std::string testSuiteName = testing::UnitTest::GetInstance()->current_test_info()->name();
+        replace(testSuiteName.begin(), testSuiteName.end(), '/', '_');

Review Comment:
   ```suggestion
           std::replace(testSuiteName.begin(), testSuiteName.end(), '/', '_');
   ```
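
A minimal sketch of why the qualified call is the safer spelling. An unqualified `replace` on `std::string` iterators typically compiles only because argument-dependent lookup happens to find `std::replace`, and that depends on how the standard library defines its iterator type. The helper name `sanitizeTestName` is hypothetical and not part of this PR.

```cpp
#include <algorithm>
#include <string>

// Hypothetical helper: make a test name that contains '/' usable as part of
// a topic name by replacing every '/' with '_'.
std::string sanitizeTestName(std::string name) {
    // Qualified call: does not rely on argument-dependent lookup.
    std::replace(name.begin(), name.end(), '/', '_');
    return name;
}
```

Including `<algorithm>` explicitly also avoids depending on a transitive include from another header.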



##########
lib/Commands.h:
##########
@@ -110,7 +110,8 @@ class Commands {
                                     const std::map<std::string, std::string>& metadata,
                                     const SchemaInfo& schemaInfo, uint64_t epoch,
                                     bool userProvidedProducerName, bool encrypted,
-                                    ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch);
+                                    ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
+                                    std::string initialSubscriptionName);

Review Comment:
   Pass a `std::string` by const reference (`const std::string&`), not value.
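
A sketch of the requested signature, assuming a simplified stand-in for the command builder (`ProducerCommand` and `newProducerCommand` are hypothetical names, not the real `Commands` API). Taking the argument by const reference avoids copying the string at the call boundary; the only copy happens where the value is stored.

```cpp
#include <string>

// Hypothetical stand-in for the command being built.
struct ProducerCommand {
    std::string initialSubscriptionName;
};

// The parameter is a const reference, so calling this function does not copy
// the caller's string.
ProducerCommand newProducerCommand(const std::string& initialSubscriptionName) {
    ProducerCommand cmd;
    cmd.initialSubscriptionName = initialSubscriptionName;  // the single copy
    return cmd;
}
```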



##########
lib/ConsumerImpl.cc:
##########
@@ -1526,4 +1593,108 @@ void ConsumerImpl::cancelTimers() noexcept {
     checkExpiredChunkedTimer_->cancel(ec);
 }
 
+void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {
+    auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId);
+    if (!messages) {
+        cb(false);
+        return;
+    }
+
+    // Initialize deadLetterProducer_
+    if (!deadLetterProducer_) {
+        std::lock_guard<std::mutex> createLock(createProducerLock_);
+        if (!deadLetterProducer_) {
+            deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>();
+            ProducerConfiguration producerConfiguration;
+            producerConfiguration.setSchema(config_.getSchema());
+            producerConfiguration.setBlockIfQueueFull(false);
+            producerConfiguration.impl_->setInitialSubscriptionName(
+                deadLetterPolicy_.getInitialSubscriptionName());
+            ClientImplPtr client = client_.lock();
+            if (client) {
+                auto self = get_shared_this_ptr();
+                client->createProducerAsync(
+                    deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration,
+                    [self](Result res, Producer producer) {
+                        if (res == ResultOk) {
+                            self->deadLetterProducer_->setValue(producer);
+                        } else {
+                            LOG_ERROR("Dead letter producer create exception with topic: "
+                                      << self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res);
+                            self->deadLetterProducer_.reset();
+                        }
+                    });
+            } else {
+                LOG_WARN(getName() << "Client is destroyed and cannot create dead letter producer.");

Review Comment:
   In this case, we should skip the code below because `deadLetterProducer_->getFuture()` will never complete.
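
A rough sketch of the early return, using hypothetical stand-in types (`FakeClient`, `DlqCallback`, `startDeadLetterProducer`) rather than the real `ConsumerImpl` members. The point is only that the failure is reported immediately when the client is gone, instead of waiting on a future that nothing will complete.

```cpp
#include <functional>
#include <memory>

// Hypothetical stand-ins; these are not the Pulsar types.
struct FakeClient {};
using DlqCallback = std::function<void(bool)>;

// Returns true when producer creation was started, false when it was skipped.
bool startDeadLetterProducer(const std::weak_ptr<FakeClient>& weakClient, const DlqCallback& cb) {
    std::shared_ptr<FakeClient> client = weakClient.lock();
    if (!client) {
        // No client means nothing will ever complete the producer future,
        // so report failure right away instead of registering a listener on it.
        cb(false);
        return false;
    }
    // A real implementation would create the producer asynchronously here
    // and attach the listener to its future.
    return true;
}
```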



##########
tests/DeadLetterQueueTest.cc:
##########
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/ConsumerConfiguration.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ConsumerConfigurationImpl.h"
+#include "lib/LogUtils.h"
+#include "lib/MessageIdUtil.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+TEST(DeadLetterQueueTest, testDLQWithSchema) {
+    Client client(lookupUrl);
+    const std::string topic = "testDLQWithSchema-" + std::to_string(time(nullptr));
+    const std::string subName = "test-sub";
+
+    static const std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
+
+    auto dlqPolicy = DeadLetterPolicyBuilder()
+                         .maxRedeliverCount(3)
+                         .deadLetterTopic(topic + subName + "-DLQ")
+                         .initialSubscriptionName("init-sub")
+                         .build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    consumerConfig.setSchema(schemaInfo);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
+    ConsumerConfiguration dlqConsumerConfig;
+    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    dlqConsumerConfig.setSchema(schemaInfo);
+    Consumer deadLetterConsumer;
+    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
+                                         deadLetterConsumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setSchema(schemaInfo);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string data = "{\"re\":2.1,\"im\":1.23}";
+    const int num = 10;
+    for (int i = 0; i < num; ++i) {
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
+    }
+
+    // nack all msg.
+    Message msg;
+    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // assert dlq msg.
+    for (int i = 0; i < num; i++) {
+        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
+        ASSERT_FALSE(msg.getDataAsString().empty());
+        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
+        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
+    }
+    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));
+
+    client.close();
+}
+
+// If the user never receives this message, the message should not be delivered to the DLQ.
+TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
+    Client client(lookupUrl);
+    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    // set ack timeout is 10 ms.
+    consumerConfig.impl_->unAckedMessagesTimeoutMs = 10;
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    // Wait a while, message should not be send to DLQ
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    client.close();
+}
+
+TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) {
+    Client client(lookupUrl);
+    const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr));
+    const std::string subName = "dlq-sub";
+    const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ";
+    auto dlqPolicy =
+        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setDeadLetterPolicy(dlqPolicy);
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+
+    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic);
+
+    client.close();
+}
+
+class DeadLetterQueueTest : public ::testing::TestWithParam<std::tuple<bool, bool, ConsumerType>> {
+   public:
+    void SetUp() override {
+        isProducerBatch_ = std::get<0>(GetParam());
+        isMultiConsumer_ = std::get<1>(GetParam());
+        consumerType_ = std::get<2>(GetParam());
+
+        std::string testSuiteName = testing::UnitTest::GetInstance()->current_test_info()->name();
+        replace(testSuiteName.begin(), testSuiteName.end(), '/', '_');
+        topic_ = testSuiteName + std::to_string(time(nullptr));
+        subName_ = "test-sub";
+        dlqTopic_ = topic_ + "-" + subName_ + "-DLQ";
+
+        if (isMultiConsumer_) {
+            // call admin api to make it partitioned
+            std::string url = adminUrl + "admin/v2/persistent/public/default/" + topic_ + "/partitions";
+            int res = makePutRequest(url, "5");
+            LOG_INFO("res = " << res);
+            ASSERT_FALSE(res != 204 && res != 409);
+        }
+
+        producerConf_.setBatchingEnabled(isProducerBatch_);
+        consumerConf_.setConsumerType(consumerType_);
+        consumerConf_.setDeadLetterPolicy(
+            DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic_).build());
+    }
+
+    void setConsumerUnAckMessageTimeoutMs(int unAckedMessagesTimeoutMs) {
+        consumerConf_.impl_->unAckedMessagesTimeoutMs = unAckedMessagesTimeoutMs;
+    }
+
+    void TearDown() override { client_.close(); }
+
+   protected:
+    Client client_{lookupUrl};
+    ProducerConfiguration producerConf_;
+    ConsumerConfiguration consumerConf_;
+    bool isMultiConsumer_;
+    bool isProducerBatch_;
+    ConsumerType consumerType_;

Review Comment:
   Change them to local variables because they are only used in `SetUp()`.



##########
lib/ConsumerImpl.cc:
##########
@@ -1220,7 +1287,7 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
             LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for Consumer - " << getConsumerId());
         }
     } else {
-        LOG_DEBUG("Connection not ready for Consumer - " << getConsumerId());
+        LOG_WARN("Connection not ready for Consumer - " << getConsumerId());

Review Comment:
   Please revert this unrelated change. It should not be a warning log when a `shared_ptr` is expired.




[GitHub] [pulsar-client-cpp] BewareMyPower commented on pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#issuecomment-1385127794

   I have left my final comments. Overall, this PR LGTM. Please avoid committing such a huge patch next time. When a PR contains many code changes, it is very hard to review all of them, and some potential issues might go unnoticed.



[GitHub] [pulsar-client-cpp] BewareMyPower commented on pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#issuecomment-1371896577

   @Anonymitaet Could you help review the documentation part (i.e. the API docs in `include/pulsar/*.h`)?



[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062215072


##########
include/pulsar/ProducerConfiguration.h:
##########
@@ -532,6 +532,18 @@ class PULSAR_PUBLIC ProducerConfiguration {
      */
     ProducerAccessMode getAccessMode() const;
 
+    /**
+     * Use this configuration to automatically create an initial subscription when creating a topic.
+     *
+     * If this field is not set, the initial subscription is not created.
+     */
+    ProducerConfiguration& setInitialSubscriptionName(const std::string& initialSubscriptionName);
+
+    /**
+     * Get initial subscription name.
+     */
+    const std::string& getInitialSubscriptionName() const;
+

Review Comment:
   These methods should not be exposed to users; see https://github.com/apache/pulsar/pull/13355/files. We should access `ProducerConfigurationImpl::initialSubscriptionName` directly.
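
A minimal sketch of the idea, with hypothetical class names (`Config`, `ConfigImpl`, `InternalUser`) standing in for the real configuration classes. The field lives only in the implementation struct, and internal code reaches it through the private `impl_` pointer via friendship, so no public setter or getter is needed.

```cpp
#include <memory>
#include <string>

// Hypothetical implementation struct holding the internal-only field.
struct ConfigImpl {
    std::string initialSubscriptionName;
};

// Hypothetical public configuration class: no accessor for the field.
class Config {
   public:
    Config() : impl_(std::make_shared<ConfigImpl>()) {}

   private:
    friend class InternalUser;  // only internal code may touch impl_
    std::shared_ptr<ConfigImpl> impl_;
};

// Hypothetical internal class that reads and writes the field directly.
class InternalUser {
   public:
    static void setInitialSubscription(Config& conf, const std::string& name) {
        conf.impl_->initialSubscriptionName = name;
    }
    static const std::string& initialSubscription(const Config& conf) {
        return conf.impl_->initialSubscriptionName;
    }
};
```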




[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1062248819


##########
lib/ConsumerImpl.cc:
##########
@@ -527,6 +544,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
                                 << startMessageId.value());
             return;
         }
+        if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
+            possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
+            increaseAvailablePermits(cnx);

Review Comment:
   ```suggestion
               if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
                   increaseAvailablePermits(cnx);
               }
   ```
   
   See the Java implementation [here](https://github.com/apache/pulsar/pull/17287/files#diff-f6e4c1c4091aa10525f331e48e66b29f22b9f7987755c1b4fb887e24f198bed6).
   
   And should we call `redeliverUnacknowledgedMessages` here as well?
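
The two conditions differ only at the boundary. A small sketch, with hypothetical helper names, of what each comparison accepts: when `maxRedeliverCount` is 3, a message whose redelivery count is exactly 3 satisfies `>=` but not `>`.

```cpp
// Hypothetical helpers that only spell out the two comparisons under discussion.

// True once the redelivery count has reached the configured maximum.
bool reachedLimit(int redeliveryCount, int maxRedeliverCount) {
    return redeliveryCount >= maxRedeliverCount;
}

// True only once the redelivery count has gone past the configured maximum.
bool exceededLimit(int redeliveryCount, int maxRedeliverCount) {
    return redeliveryCount > maxRedeliverCount;
}
```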




[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #139: [feat] Support Dead Letter Topic.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1050498383


##########
tests/ConsumerConfigurationTest.cc:
##########
@@ -20,12 +20,15 @@
 #include <lib/LogUtils.h>
 #include <pulsar/Client.h>
 
+#include <climits>
+
 #include "NoOpsCryptoKeyReader.h"
 
 DECLARE_LOG_OBJECT()
 
 #include "../lib/Future.h"
 #include "../lib/Utils.h"
+#include "pulsar/DeadLetterPolicyBuilder.h"

Review Comment:
   ```suggestion
   #include <pulsar/DeadLetterPolicyBuilder.h>
   ```
   
   Make the include style consistent.


