Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/29 01:05:55 UTC

[GitHub] [pulsar-client-cpp] shibd opened a new pull request, #160: [fix] Redeliver messages that can't be decrypted.

shibd opened a new pull request, #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160

   Master Issue: https://github.com/apache/pulsar/pull/3097
   
   ### Motivation
   Consumers that fail to decrypt a message must be given a chance to decrypt it again if redelivery is enabled.
   
   ### Modifications
   - When decrypting a message fails, add the message to `unAckedMessageTrackerPtr_` so that it waits for redelivery.
   
   ### Verifying this change
    - Added `testRedeliveryOfDecryptionFailedMessages` to cover it.
    
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #160: [fix] Redeliver messages that can't be decrypted.

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1108076041


##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+    int numberOfMessages = 20;
+    std::string msgContent = "msg-content";
+    Message msg;
+    for (int i = 0; i < numberOfMessages; i++) {
+        std::stringstream stream;
+        stream << msgContent << i;
+        msg = MessageBuilder().setContent(stream.str()).build();

Review Comment:
   We can concatenate an integer and a string by converting the integer to a string, e.g.
   
   ```c++
            msg = MessageBuilder().setContent(msgContent + std::to_string(i)).build();
   ```
   
   This would simplify the code.
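
As a minimal sketch of the suggestion in this comment, the two ways of building the payload can be compared side by side. The helper names `makeContent` and `makeContentWithStream` are hypothetical and exist only for this illustration; they are not part of the test or of pulsar-client-cpp.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper: builds the payload with std::to_string (C++11).
std::string makeContent(const std::string& prefix, int i) {
    return prefix + std::to_string(i);
}

// Hypothetical helper: the stream-based form used in the original diff.
std::string makeContentWithStream(const std::string& prefix, int i) {
    std::stringstream stream;
    stream << prefix << i;
    return stream.str();
}
```

For plain `int` values both helpers produce the same text, so the shorter form should be a drop-in replacement here.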



##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem";

Review Comment:
   ```suggestion
       std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem";
       std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem";
   ```



##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));

Review Comment:
   Do we need another consumer for this test? I think only one consumer is enough to verify the case that messages cannot be decrypted.





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #160: [fix] Redeliver messages that can't be decrypted.

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1113050168


##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,91 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+    int numberOfMessages = 20;
+    std::string msgContent = "msg-content";
+    Message msg;
+    for (int i = 0; i < numberOfMessages; i++) {
+        msg = MessageBuilder().setContent(msgContent + std::to_string(i)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // Consuming from consumer 2 and 3
+    // no message should be returned since they can't decrypt the message
+    ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+    ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+    // All messages would be received by consumer 1
+    std::set<std::string> valuesSent;
+    for (int i = 0; i < numberOfMessages; i++) {
+        auto value = msgContent + std::to_string(i);
+        valuesSent.emplace(value);
+        msg = MessageBuilder().setContent(value).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // All messages would be received by consumer 1
+    std::set<std::string> valuesReceived;
+    for (int i = 0; i < numberOfMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
+        ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
+        valuesReceived.emplace(msg.getDataAsString());
+    }
+    ASSERT_EQ(valuesSent, valuesReceived);
+
+    // Consuming from consumer 2 and 3 again just to be sure
+    // no message should be returned since they can't decrypt the message
+    ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+    ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+    ASSERT_EQ(ResultOk, client.close());
+}

Review Comment:
   When `consumer2` or `consumer3` receives a message and fails to decrypt it, the message is added to `unAckedMessageTracker` and redelivered. So only `consumer1` will receive all messages.
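
The redelivery behaviour described in this comment can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyUnAckedTracker` and `onMessage` are hypothetical names, time is passed in explicitly as milliseconds, and none of this is the actual `UnAckedMessageTrackerEnabled` implementation.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for an unacked-message tracker; NOT the pulsar-client-cpp class.
class ToyUnAckedTracker {
   public:
    explicit ToyUnAckedTracker(long timeoutMs) : timeoutMs_(timeoutMs) {}

    // Remember when a message was seen without being acknowledged.
    void add(const std::string& messageId, long nowMs) { pending_[messageId] = nowMs; }

    // Acknowledged messages no longer need redelivery.
    void remove(const std::string& messageId) { pending_.erase(messageId); }

    // Return (and forget) every message whose ack timeout has expired.
    std::vector<std::string> takeExpired(long nowMs) {
        std::vector<std::string> expired;
        for (auto it = pending_.begin(); it != pending_.end();) {
            if (nowMs - it->second >= timeoutMs_) {
                expired.push_back(it->first);
                it = pending_.erase(it);
            } else {
                ++it;
            }
        }
        return expired;
    }

   private:
    long timeoutMs_;
    std::map<std::string, long> pending_;
};

// Models the consumer-side branch: a failed decryption tracks the message
// instead of delivering it. Returns true if the application would see the message.
bool onMessage(ToyUnAckedTracker& tracker, const std::string& id, bool canDecrypt, long nowMs) {
    if (!canDecrypt) {
        tracker.add(id, nowMs);
        return false;
    }
    return true;
}
```

In this model a message that cannot be decrypted is never returned to the caller, which matches the `ResultTimeout` expectations for `consumer2` and `consumer3`; once the timeout expires the message becomes eligible for redelivery, and on a shared subscription the broker may then dispatch it to a consumer that can decrypt it.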





[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #160: [fix] Redeliver messages that can't be decrypted.

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1112867142


##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,91 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+    int numberOfMessages = 20;
+    std::string msgContent = "msg-content";
+    Message msg;
+    for (int i = 0; i < numberOfMessages; i++) {
+        msg = MessageBuilder().setContent(msgContent + std::to_string(i)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // Consuming from consumer 2 and 3
+    // no message should be returned since they can't decrypt the message
+    ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+    ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+    // All messages would be received by consumer 1
+    std::set<std::string> valuesSent;
+    for (int i = 0; i < numberOfMessages; i++) {
+        auto value = msgContent + std::to_string(i);
+        valuesSent.emplace(value);
+        msg = MessageBuilder().setContent(value).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // All messages would be received by consumer 1
+    std::set<std::string> valuesReceived;
+    for (int i = 0; i < numberOfMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
+        ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
+        valuesReceived.emplace(msg.getDataAsString());
+    }
+    ASSERT_EQ(valuesSent, valuesReceived);
+
+    // Consuming from consumer 2 and 3 again just to be sure
+    // no message should be returned since they can't decrypt the message
+    ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+    ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+    ASSERT_EQ(ResultOk, client.close());
+}

Review Comment:
   Where do we verify the message is added to unAckedMessageTrackerPtr_ correctly?





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #160: [fix] Redeliver messages that can't be decrypted.

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1108246157


##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));

Review Comment:
   Yes. `consumer2` is configured with `NoOpsCryptoKeyReader` to cover [this change](https://github.com/apache/pulsar-client-cpp/pull/160/files#diff-102603faf4c8eaedb96f25866948c8b38eaf6064f1a77410572a46bec3f48d8dR712-R713), and `consumer3` has no `CryptoKeyReader` set, to cover [this change](https://github.com/apache/pulsar-client-cpp/pull/160/files#diff-102603faf4c8eaedb96f25866948c8b38eaf6064f1a77410572a46bec3f48d8dR690-R691).
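
The two failure paths mentioned in this reply can be pictured with a small decision function. This is a toy model, not the client's code: the enums and `handleEncrypted` are hypothetical, the three actions only mirror the documented meaning of `FAIL`, `DISCARD` and `CONSUME`, and it is an assumption of this sketch that `NoOpsCryptoKeyReader` behaves like a reader that never yields a usable key.

```cpp
// Toy mirror of the three crypto failure actions; names are local to this sketch.
enum class FailureAction { Fail, Discard, Consume };

enum class Outcome { Delivered, DeliveredEncrypted, Discarded, TrackedForRedelivery };

// hasKeyReader == false models consumer3 (no reader configured).
// hasKeyReader == true with keyFound == false models consumer2
// (assumed: a reader that returns no usable key).
Outcome handleEncrypted(bool hasKeyReader, bool keyFound, FailureAction action) {
    const bool decrypted = hasKeyReader && keyFound;
    if (decrypted) {
        return Outcome::Delivered;
    }
    switch (action) {
        case FailureAction::Consume:
            return Outcome::DeliveredEncrypted;  // application must decrypt itself
        case FailureAction::Discard:
            return Outcome::Discarded;  // acknowledged and dropped
        case FailureAction::Fail:
            return Outcome::TrackedForRedelivery;  // the path this PR is about
    }
    return Outcome::TrackedForRedelivery;  // not reached for valid enum values
}
```

Both consumers in the test use the `Fail` action, so in this model they end on the same outcome through different branches, which is why the test keeps two of them.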
   
   





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #160: [fix] Redeliver messages that can't be decrypted.

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1108087040


##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+    int numberOfMessages = 20;
+    std::string msgContent = "msg-content";
+    Message msg;
+    for (int i = 0; i < numberOfMessages; i++) {
+        std::stringstream stream;
+        stream << msgContent << i;
+        msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // Consuming from consumer 2 and 3
+    // no message should be returned since they can't decrypt the message
+    ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+    ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+    // All messages would be received by consumer 1
+    std::unordered_set<std::string> receivedMsgs;
+    for (int i = 0; i < numberOfMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
+        ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
+        receivedMsgs.insert(msg.getDataAsString());
+    }
+
+    ASSERT_EQ(receivedMsgs.size(), numberOfMessages);
+    for (int i = 0; i < numberOfMessages; i++) {
+        std::stringstream expected;
+        expected << msgContent << i;
+        ASSERT_TRUE(receivedMsgs.count(expected.str()));

Review Comment:
   It's better not to rely on the implicit conversion from `size_t` to `bool`; check `xxx.count(key) > 0` instead.
   
   BTW, in this case, we can store the messages in an ordered set and then just compare the sent messages and received messages.
   
   ```c++
       std::set<std::string> valuesSent;
       for (int i = 0; i < numberOfMessages; i++) {
           auto value = msgContent + std::to_string(i);
           valuesSent.emplace(value);
           msg = MessageBuilder().setContent(value).build();
           ASSERT_EQ(ResultOk, producer.send(msg));
       }
   
       /* ... */
   
       // All messages would be received by consumer 1
       std::set<std::string> valuesReceived;
       for (int i = 0; i < numberOfMessages; i++) {
           ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
           ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
           valuesReceived.emplace(msg.getDataAsString());
       }
       ASSERT_EQ(valuesSent, valuesReceived);
   ```
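
Both points in this comment can be shown in isolation. The following is a minimal sketch; `sameValues` and `contains` are hypothetical helpers written for this illustration and do not appear in the PR.

```cpp
#include <set>
#include <string>
#include <vector>

// Returns true when both sequences hold the same distinct values, in any order.
bool sameValues(const std::vector<std::string>& sent, const std::vector<std::string>& received) {
    std::set<std::string> valuesSent(sent.begin(), sent.end());
    std::set<std::string> valuesReceived(received.begin(), received.end());
    return valuesSent == valuesReceived;
}

// Explicit comparison instead of relying on the size_t -> bool conversion.
bool contains(const std::set<std::string>& values, const std::string& key) {
    return values.count(key) > 0;
}
```

One design consequence worth keeping in mind: a set collapses duplicates, so comparing two sets checks that the same values arrived but does not detect a value that was delivered more than once.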






[GitHub] [pulsar-client-cpp] shibd merged pull request #160: [fix] Redeliver messages that can't be decrypted.

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd merged PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160

