You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by "BewareMyPower (via GitHub)" <gi...@apache.org> on 2023/02/16 07:01:57 UTC

[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #160: [fix] Redeliver messages that can't be decrypted.

BewareMyPower commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1108076041


##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+    int numberOfMessages = 20;
+    std::string msgContent = "msg-content";
+    Message msg;
+    for (int i = 0; i < numberOfMessages; i++) {
+        std::stringstream stream;
+        stream << msgContent << i;
+        msg = MessageBuilder().setContent(stream.str()).build();

Review Comment:
   We can concentrate an integer and a string by converting the integer to string, e.g.
   
   ```c++
            msg = MessageBuilder().setContent(msgContent + std::to_string(i)).build();
   ```
   
   The code above could simplify the code.



##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem";

Review Comment:
   ```suggestion
       std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem";
       std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem";
   ```



##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));

Review Comment:
   Do we need another consumer for this test? I think only one consumer is enough to verify the case that messages cannot be decrypted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org