You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:12 UTC

[pulsar] 18/27: [Issue 9495][c++ client] add 'encrypted' option in commands.newproducer() (#9542)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c9752435fe7936382ff5037bff115ab3b657fc5c
Author: wangyuwei <wa...@protonmail.ch>
AuthorDate: Thu Feb 11 11:37:22 2021 +0800

    [Issue 9495][c++ client] add 'encrypted' option in commands.newproducer() (#9542)
    
    
    Fixes #9495
    
    
    
    ### Modifications
    
    pass an option if encrption is enabled in producer when `commands.newProducer`
    
    
    ### Verifying this change
    
    redo the sample in issue #9495
    
    
    (cherry picked from commit f65b29701d7aad3b0a4c8e18c1f9392de1994263)
---
 pulsar-client-cpp/lib/Commands.cc              |  3 +-
 pulsar-client-cpp/lib/Commands.h               |  2 +-
 pulsar-client-cpp/lib/ProducerImpl.cc          |  6 +-
 pulsar-client-cpp/pulsar-test-service-start.sh |  8 +++
 pulsar-client-cpp/tests/BasicEndToEndTest.cc   | 79 +++++++++++++-------------
 5 files changed, 55 insertions(+), 43 deletions(-)

diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 239b9f0..364ea30 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -319,7 +319,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
                                    const std::string& producerName, uint64_t requestId,
                                    const std::map<std::string, std::string>& metadata,
                                    const SchemaInfo& schemaInfo, uint64_t epoch,
-                                   bool userProvidedProducerName) {
+                                   bool userProvidedProducerName, bool encrypted) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::PRODUCER);
     CommandProducer* producer = cmd.mutable_producer();
@@ -328,6 +328,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
     producer->set_request_id(requestId);
     producer->set_epoch(epoch);
     producer->set_user_provided_producer_name(userProvidedProducerName);
+    producer->set_encrypted(encrypted);
 
     for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
          it++) {
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 18f0049..bbe2a7c 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -96,7 +96,7 @@ class Commands {
                                     const std::string& producerName, uint64_t requestId,
                                     const std::map<std::string, std::string>& metadata,
                                     const SchemaInfo& schemaInfo, uint64_t epoch,
-                                    bool userProvidedProducerName);
+                                    bool userProvidedProducerName, bool encrypted);
 
     static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
                                proto::CommandAck_AckType ackType, int validationError);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 232708d..fcd02b5 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -142,9 +142,9 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
 
-    SharedBuffer cmd =
-        Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties(),
-                              conf_.getSchema(), epoch_, userProvidedProducerName_);
+    SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
+                                             conf_.getProperties(), conf_.getSchema(), epoch_,
+                                             userProvidedProducerName_, conf_.isEncryptionEnabled());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
                                std::placeholders::_1, std::placeholders::_2));
diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh b/pulsar-client-cpp/pulsar-test-service-start.sh
index 89b46df..d6da230 100755
--- a/pulsar-client-cpp/pulsar-test-service-start.sh
+++ b/pulsar-client-cpp/pulsar-test-service-start.sh
@@ -98,6 +98,14 @@ $PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-3 \
                         --actions produce,consume \
                         --role "anonymous"
 
+# Create "public/default-4" with encryption required
+$PULSAR_DIR/bin/pulsar-admin namespaces create public/default-4 \
+                        --clusters standalone
+$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-4 \
+                        --actions produce,consume \
+                        --role "anonymous"
+$PULSAR_DIR/bin/pulsar-admin namespaces set-encryption-required public/default-4 -e
+
 # Create "private" tenant
 $PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c "standalone"
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index bf9479f..255a563 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1284,7 +1284,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
 TEST(BasicEndToEndTest, testRSAEncryption) {
     ClientConfiguration config;
     Client client(lookupUrl);
-    std::string topicName = "my-rsaenctopic";
+    std::string topicNames[] = {"my-rsaenctopic", "persistent://public/default-4/my-rsaenctopic"};
     std::string subName = "my-sub-name";
     Producer producer;
 
@@ -1301,48 +1301,51 @@ TEST(BasicEndToEndTest, testRSAEncryption) {
     conf.addEncryptionKey("client-rsa.pem");
     conf.setCryptoKeyReader(keyReader);
 
-    Promise<Result, Producer> producerPromise;
-    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
-    Future<Result, Producer> producerFuture = producerPromise.getFuture();
-    Result result = producerFuture.get(producer);
-    ASSERT_EQ(ResultOk, result);
+    for (const auto &topicName : topicNames) {
+        Promise<Result, Producer> producerPromise;
+        client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
+        Future<Result, Producer> producerFuture = producerPromise.getFuture();
+        Result result = producerFuture.get(producer);
+        ASSERT_EQ(ResultOk, result);
 
-    ConsumerConfiguration consConfig;
-    consConfig.setCryptoKeyReader(keyReader);
-    // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
+        ConsumerConfiguration consConfig;
+        consConfig.setCryptoKeyReader(keyReader);
+        // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
 
-    Consumer consumer;
-    Promise<Result, Consumer> consumerPromise;
-    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
-    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
-    result = consumerFuture.get(consumer);
-    ASSERT_EQ(ResultOk, result);
+        Consumer consumer;
+        Promise<Result, Consumer> consumerPromise;
+        client.subscribeAsync(topicName, subName, consConfig,
+                              WaitForCallbackValue<Consumer>(consumerPromise));
+        Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+        result = consumerFuture.get(consumer);
+        ASSERT_EQ(ResultOk, result);
 
-    // Send 1000 messages synchronously
-    std::string msgContent = "msg-content";
-    LOG_INFO("Publishing 1000 messages synchronously");
-    int msgNum = 0;
-    for (; msgNum < 1000; msgNum++) {
-        std::stringstream stream;
-        stream << msgContent << msgNum;
-        Message msg = MessageBuilder().setContent(stream.str()).build();
-        ASSERT_EQ(ResultOk, producer.send(msg));
-    }
+        // Send 1000 messages synchronously
+        std::string msgContent = "msg-content";
+        LOG_INFO("Publishing 1000 messages synchronously");
+        int msgNum = 0;
+        for (; msgNum < 1000; msgNum++) {
+            std::stringstream stream;
+            stream << msgContent << msgNum;
+            Message msg = MessageBuilder().setContent(stream.str()).build();
+            ASSERT_EQ(ResultOk, producer.send(msg));
+        }
 
-    LOG_INFO("Trying to receive 1000 messages");
-    Message msgReceived;
-    for (msgNum = 0; msgNum < 1000; msgNum++) {
-        consumer.receive(msgReceived, 1000);
-        LOG_DEBUG("Received message :" << msgReceived.getMessageId());
-        std::stringstream expected;
-        expected << msgContent << msgNum;
-        ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
-        ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
-    }
+        LOG_INFO("Trying to receive 1000 messages");
+        Message msgReceived;
+        for (msgNum = 0; msgNum < 1000; msgNum++) {
+            consumer.receive(msgReceived, 1000);
+            LOG_DEBUG("Received message :" << msgReceived.getMessageId());
+            std::stringstream expected;
+            expected << msgContent << msgNum;
+            ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+            ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+        }
 
-    ASSERT_EQ(ResultOk, consumer.unsubscribe());
-    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
-    ASSERT_EQ(ResultOk, producer.close());
+        ASSERT_EQ(ResultOk, consumer.unsubscribe());
+        ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+        ASSERT_EQ(ResultOk, producer.close());
+    }
     ASSERT_EQ(ResultOk, client.close());
 }