You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:12 UTC
[pulsar] 18/27: [Issue 9495][c++ client] add 'encrypted' option in
commands.newproducer() (#9542)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c9752435fe7936382ff5037bff115ab3b657fc5c
Author: wangyuwei <wa...@protonmail.ch>
AuthorDate: Thu Feb 11 11:37:22 2021 +0800
[Issue 9495][c++ client] add 'encrypted' option in commands.newproducer() (#9542)
Fixes #9495
### Modifications
pass an option if encrption is enabled in producer when `commands.newProducer`
### Verifying this change
redo the sample in issue #9495
(cherry picked from commit f65b29701d7aad3b0a4c8e18c1f9392de1994263)
---
pulsar-client-cpp/lib/Commands.cc | 3 +-
pulsar-client-cpp/lib/Commands.h | 2 +-
pulsar-client-cpp/lib/ProducerImpl.cc | 6 +-
pulsar-client-cpp/pulsar-test-service-start.sh | 8 +++
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 79 +++++++++++++-------------
5 files changed, 55 insertions(+), 43 deletions(-)
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 239b9f0..364ea30 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -319,7 +319,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
- bool userProvidedProducerName) {
+ bool userProvidedProducerName, bool encrypted) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
@@ -328,6 +328,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
producer->set_request_id(requestId);
producer->set_epoch(epoch);
producer->set_user_provided_producer_name(userProvidedProducerName);
+ producer->set_encrypted(encrypted);
for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
it++) {
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 18f0049..bbe2a7c 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -96,7 +96,7 @@ class Commands {
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
- bool userProvidedProducerName);
+ bool userProvidedProducerName, bool encrypted);
static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
proto::CommandAck_AckType ackType, int validationError);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 232708d..fcd02b5 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -142,9 +142,9 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
- SharedBuffer cmd =
- Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties(),
- conf_.getSchema(), epoch_, userProvidedProducerName_);
+ SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
+ conf_.getProperties(), conf_.getSchema(), epoch_,
+ userProvidedProducerName_, conf_.isEncryptionEnabled());
cnx->sendRequestWithId(cmd, requestId)
.addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
std::placeholders::_1, std::placeholders::_2));
diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh b/pulsar-client-cpp/pulsar-test-service-start.sh
index 89b46df..d6da230 100755
--- a/pulsar-client-cpp/pulsar-test-service-start.sh
+++ b/pulsar-client-cpp/pulsar-test-service-start.sh
@@ -98,6 +98,14 @@ $PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-3 \
--actions produce,consume \
--role "anonymous"
+# Create "public/default-4" with encryption required
+$PULSAR_DIR/bin/pulsar-admin namespaces create public/default-4 \
+ --clusters standalone
+$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-4 \
+ --actions produce,consume \
+ --role "anonymous"
+$PULSAR_DIR/bin/pulsar-admin namespaces set-encryption-required public/default-4 -e
+
# Create "private" tenant
$PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c "standalone"
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index bf9479f..255a563 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1284,7 +1284,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
TEST(BasicEndToEndTest, testRSAEncryption) {
ClientConfiguration config;
Client client(lookupUrl);
- std::string topicName = "my-rsaenctopic";
+ std::string topicNames[] = {"my-rsaenctopic", "persistent://public/default-4/my-rsaenctopic"};
std::string subName = "my-sub-name";
Producer producer;
@@ -1301,48 +1301,51 @@ TEST(BasicEndToEndTest, testRSAEncryption) {
conf.addEncryptionKey("client-rsa.pem");
conf.setCryptoKeyReader(keyReader);
- Promise<Result, Producer> producerPromise;
- client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
- Future<Result, Producer> producerFuture = producerPromise.getFuture();
- Result result = producerFuture.get(producer);
- ASSERT_EQ(ResultOk, result);
+ for (const auto &topicName : topicNames) {
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
- ConsumerConfiguration consConfig;
- consConfig.setCryptoKeyReader(keyReader);
- // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
+ ConsumerConfiguration consConfig;
+ consConfig.setCryptoKeyReader(keyReader);
+ // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
- Consumer consumer;
- Promise<Result, Consumer> consumerPromise;
- client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
- Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
- result = consumerFuture.get(consumer);
- ASSERT_EQ(ResultOk, result);
+ Consumer consumer;
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, consConfig,
+ WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
- // Send 1000 messages synchronously
- std::string msgContent = "msg-content";
- LOG_INFO("Publishing 1000 messages synchronously");
- int msgNum = 0;
- for (; msgNum < 1000; msgNum++) {
- std::stringstream stream;
- stream << msgContent << msgNum;
- Message msg = MessageBuilder().setContent(stream.str()).build();
- ASSERT_EQ(ResultOk, producer.send(msg));
- }
+ // Send 1000 messages synchronously
+ std::string msgContent = "msg-content";
+ LOG_INFO("Publishing 1000 messages synchronously");
+ int msgNum = 0;
+ for (; msgNum < 1000; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
- LOG_INFO("Trying to receive 1000 messages");
- Message msgReceived;
- for (msgNum = 0; msgNum < 1000; msgNum++) {
- consumer.receive(msgReceived, 1000);
- LOG_DEBUG("Received message :" << msgReceived.getMessageId());
- std::stringstream expected;
- expected << msgContent << msgNum;
- ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
- ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
- }
+ LOG_INFO("Trying to receive 1000 messages");
+ Message msgReceived;
+ for (msgNum = 0; msgNum < 1000; msgNum++) {
+ consumer.receive(msgReceived, 1000);
+ LOG_DEBUG("Received message :" << msgReceived.getMessageId());
+ std::stringstream expected;
+ expected << msgContent << msgNum;
+ ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+ ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+ }
- ASSERT_EQ(ResultOk, consumer.unsubscribe());
- ASSERT_EQ(ResultAlreadyClosed, consumer.close());
- ASSERT_EQ(ResultOk, producer.close());
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+ ASSERT_EQ(ResultOk, producer.close());
+ }
ASSERT_EQ(ResultOk, client.close());
}