You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/02 01:17:16 UTC

[GitHub] merlimat closed pull request #1129: Added end to end encryption in C++ client

merlimat closed pull request #1129: Added end to end encryption in C++ client
URL: https://github.com/apache/incubator-pulsar/pull/1129
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index c73f9422a..274abd52e 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -24,6 +24,8 @@
 #include <pulsar/Result.h>
 #include <pulsar/ConsumerType.h>
 #include <pulsar/Message.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
+#include <pulsar/CryptoKeyReader.h>
 
 #pragma GCC visibility push(default)
 namespace pulsar {
@@ -139,6 +141,14 @@ class ConsumerConfiguration {
      * @return the configured timeout in milliseconds caching BrokerConsumerStats.
      */
     long getBrokerConsumerStatsCacheTimeInMs() const;
+
+    bool isEncryptionEnabled() const;
+    const CryptoKeyReaderPtr getCryptoKeyReader() const;
+    ConsumerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
+
+    ConsumerCryptoFailureAction getCryptoFailureAction() const;
+    ConsumerConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action);
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h b/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h
new file mode 100644
index 000000000..d9d845c76
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CONSUMERCRYPTOFAILUREACTION_H_
+#define CONSUMERCRYPTOFAILUREACTION_H_
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+enum class ConsumerCryptoFailureAction
+{
+    FAIL,     // This is the default option to fail consume until crypto succeeds
+    DISCARD,  // Message is silently acknowledged and not delivered to the application
+    CONSUME   // Deliver the encrypted message to the application. It's the application's
+              // responsibility to decrypt the message. If message is also compressed,
+              // decompression will fail. If message contain batch messages, client will
+              // not be able to retrieve individual messages in the batch
+};
+
+} /* namespace pulsar */
+
+#pragma GCC visibility pop
+
+#endif /* CONSUMERCRYPTOFAILUREACTION_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
new file mode 100644
index 000000000..cb4cf1516
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CRYPTOKEYREADER_H_
+#define CRYPTOKEYREADER_H_
+
+#include <pulsar/Result.h>
+#include <pulsar/EncryptionKeyInfo.h>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+class CryptoKeyReader {
+   public:
+    CryptoKeyReader() {}
+
+    /*
+     * Return the encryption key corresponding to the key name in the argument
+     * <p>
+     * This method should be implemented to return the EncryptionKeyInfo. This method will be
+     * called at the time of producer creation as well as consumer receiving messages.
+     * Hence, application should not make any blocking calls within the implementation.
+     * <p>
+     *
+     * @param keyName
+     *            Unique name to identify the key
+     * @param metadata
+     *            Additional information needed to identify the key
+     * @param encKeyInfo updated with details about the public key
+     * @return Result ResultOk is returned for success
+     *
+     */
+    virtual Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                                EncryptionKeyInfo& encKeyInfo) const = 0;
+
+    /*
+     * @param keyName
+     *            Unique name to identify the key
+     * @param metadata
+     *            Additional information needed to identify the key
+     * @param encKeyInfo updated with details about the private key
+     * @return Result ResultOk is returned for success
+     */
+    virtual Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                                 EncryptionKeyInfo& encKeyInfo) const = 0;
+
+}; /* namespace pulsar */
+
+typedef boost::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr;
+}  // namespace pulsar
+
+#pragma GCC visibility pop
+
+#endif /* CRYPTOKEYREADER_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h b/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h
new file mode 100644
index 000000000..9461b14b2
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef ENCRYPTIONKEYINFO_H_
+#define ENCRYPTIONKEYINFO_H_
+
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <map>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+class EncryptionKeyInfoImpl;
+class PulsarWrapper;
+
+typedef boost::shared_ptr<EncryptionKeyInfoImpl> EncryptionKeyInfoImplPtr;
+
+class EncryptionKeyInfo {
+    /*
+     * This object contains the encryption key and corresponding metadata which contains
+     * additional information about the key such as version, timestammp
+     */
+
+   public:
+    typedef std::map<std::string, std::string> StringMap;
+
+    EncryptionKeyInfo();
+
+    EncryptionKeyInfo(std::string key, StringMap& metadata);
+
+    std::string& getKey();
+
+    void setKey(std::string key);
+
+    StringMap& getMetadata(void);
+
+    void setMetadata(StringMap& metadata);
+
+   private:
+    explicit EncryptionKeyInfo(EncryptionKeyInfoImplPtr);
+
+    EncryptionKeyInfoImplPtr impl_;
+
+    friend class PulsarWrapper;
+};
+
+} /* namespace pulsar */
+
+#pragma GCC visibility pop
+
+#endif /* ENCRYPTIONKEYINFO_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index d12b65dda..9f7bf1fe1 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -23,6 +23,11 @@
 #include <pulsar/Result.h>
 #include <pulsar/Message.h>
 #include <boost/function.hpp>
+#include <pulsar/ProducerCryptoFailureAction.h>
+#include <pulsar/CryptoKeyReader.h>
+
+#include <set>
+
 #pragma GCC visibility push(default)
 
 namespace pulsar {
@@ -112,6 +117,16 @@ class ProducerConfiguration {
     ProducerConfiguration& setBatchingMaxPublishDelayMs(const unsigned long& batchingMaxPublishDelayMs);
     const unsigned long& getBatchingMaxPublishDelayMs() const;
 
+    const CryptoKeyReaderPtr getCryptoKeyReader() const;
+    ProducerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
+
+    ProducerCryptoFailureAction getCryptoFailureAction() const;
+    ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction action);
+
+    std::set<std::string>& getEncryptionKeys();
+    bool isEncryptionEnabled() const;
+    ProducerConfiguration& addEncryptionKey(std::string key);
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h b/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h
new file mode 100644
index 000000000..0e9a5d93d
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PRODUCERCRYPTOFAILUREACTION_H_
+#define PRODUCERCRYPTOFAILUREACTION_H_
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+enum class ProducerCryptoFailureAction
+{
+    FAIL,  // This is the default option to fail send if crypto operation fails
+    SEND   // Ignore crypto failure and proceed with sending unencrypted messages
+};
+
+} /* namespace pulsar */
+
+#pragma GCC visibility pop
+
+#endif /* PRODUCERCRYPTOFAILUREACTION_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index 8085d4837..68480e85f 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -72,8 +72,8 @@ enum Result
     ResultSubscriptionNotFound,                   /// Subscription not found
     ResultConsumerNotFound,                       /// Consumer not found
     ResultUnsupportedVersionError,  /// Error when an older client/version doesn't support a required feature
-
-    ResultTopicTerminated  /// Topic was already terminated
+    ResultTopicTerminated,          /// Topic was already terminated
+    ResultCryptoError               /// Error when crypto operation fails
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 01be50312..098688703 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -93,6 +93,10 @@ void BatchMessageContainer::sendMessage() {
     impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
     compressPayLoad();
 
+    SharedBuffer encryptedPayload;
+    producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload);
+    impl_->payload = encryptedPayload;
+
     Message msg;
     msg.impl_ = impl_;
 
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 17d2c4b7a..1b99f584f 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -82,4 +82,23 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
     }
     impl_->unAckedMessagesTimeoutMs = milliSeconds;
 }
+
+bool ConsumerConfiguration::isEncryptionEnabled() const { return (impl_->cryptoKeyReader != NULL); }
+
+const CryptoKeyReaderPtr ConsumerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
+
+ConsumerConfiguration& ConsumerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
+    impl_->cryptoKeyReader = cryptoKeyReader;
+    return *this;
+}
+
+ConsumerCryptoFailureAction ConsumerConfiguration::getCryptoFailureAction() const {
+    return impl_->cryptoFailureAction;
+}
+
+ConsumerConfiguration& ConsumerConfiguration::setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+    impl_->cryptoFailureAction = action;
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 1fafc9aa7..91434a2fd 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -32,6 +32,8 @@ struct ConsumerConfigurationImpl {
     int maxTotalReceiverQueueSizeAcrossPartitions;
     std::string consumerName;
     long brokerConsumerStatsCacheTimeInMs;
+    CryptoKeyReaderPtr cryptoKeyReader;
+    ConsumerCryptoFailureAction cryptoFailureAction;
     ConsumerConfigurationImpl()
         : unAckedMessagesTimeoutMs(0),
           consumerType(ConsumerExclusive),
@@ -39,7 +41,9 @@ struct ConsumerConfigurationImpl {
           hasMessageListener(false),
           brokerConsumerStatsCacheTimeInMs(30 * 1000),  // 30 seconds
           receiverQueueSize(1000),
-          maxTotalReceiverQueueSizeAcrossPartitions(50000) {}
+          maxTotalReceiverQueueSizeAcrossPartitions(50000),
+          cryptoKeyReader(),
+          cryptoFailureAction(ConsumerCryptoFailureAction::FAIL) {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index ce2074788..1c62023b2 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -57,7 +57,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
       messageListenerRunning_(true),
       batchAcknowledgementTracker_(topic_, subscription, (long)consumerId_),
       brokerConsumerStats_(),
-      consumerStatsBasePtr_() {
+      consumerStatsBasePtr_(),
+      msgCrypto_() {
     std::stringstream consumerStrStream;
     consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
     consumerStr_ = consumerStrStream.str();
@@ -81,6 +82,11 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
     } else {
         consumerStatsBasePtr_ = boost::make_shared<ConsumerStatsDisabled>();
     }
+
+    // Create msgCrypto
+    if (conf.isEncryptionEnabled()) {
+        msgCrypto_ = boost::make_shared<MessageCrypto>(consumerStr_, false);
+    }
 }
 
 ConsumerImpl::~ConsumerImpl() {
@@ -249,6 +255,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
                                    SharedBuffer& payload) {
     LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());
 
+    if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
+        // Message was discarded or not consumed due to decryption failure
+        return;
+    }
+
     if (!uncompressMessageIfNeeded(cnx, msg, metadata, payload)) {
         // Message was discarded on decompression error
         return;
@@ -339,6 +350,48 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
     return batchSize - skippedMessages;
 }
 
+bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
+                                          const proto::MessageMetadata& metadata, SharedBuffer& payload) {
+    if (!metadata.encryption_keys_size()) {
+        return true;
+    }
+
+    // If KeyReader is not configured throw exception based on config param
+    if (!config_.isEncryptionEnabled()) {
+        if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
+            LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message.");
+            return true;
+        } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
+            LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config "
+                                  "is set to discard");
+            discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::DecryptionError);
+        } else {
+            LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to "
+                                   "consume encrypted message");
+        }
+        return false;
+    }
+
+    SharedBuffer decryptedPayload;
+    if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
+        payload = decryptedPayload;
+        return true;
+    }
+
+    if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
+        // Note, batch message will fail to consume even if config is set to consume
+        LOG_WARN(
+            getName() << "Decryption failed. Consuming encrypted message since config is set to consume.");
+        return true;
+    } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
+        LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard");
+        discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::DecryptionError);
+    } else {
+        LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message");
+    }
+    return false;
+}
+
 bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
                                              const proto::MessageMetadata& metadata, SharedBuffer& payload) {
     if (!metadata.has_compression()) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index c68abf48f..d115ed83e 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -32,6 +32,7 @@
 #include "ExecutorService.h"
 #include "ConsumerImplBase.h"
 #include "lib/UnAckedMessageTrackerDisabled.h"
+#include "MessageCrypto.h"
 
 #include "CompressionCodec.h"
 #include <boost/dynamic_bitset.hpp>
@@ -51,6 +52,7 @@ class ConsumerImpl;
 class BatchAcknowledgementTracker;
 typedef boost::shared_ptr<ConsumerImpl> ConsumerImplPtr;
 typedef boost::weak_ptr<ConsumerImpl> ConsumerImplWeakPtr;
+typedef boost::shared_ptr<MessageCrypto> MessageCryptoPtr;
 
 enum ConsumerTopicType
 {
@@ -124,6 +126,9 @@ class ConsumerImpl : public ConsumerImplBase,
     uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage);
     void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback);
 
+    bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
+                                const proto::MessageMetadata& metadata, SharedBuffer& payload);
+
     // TODO - Convert these functions to lambda when we move to C++11
     Result receiveHelper(Message& msg);
     Result receiveHelper(Message& msg, int timeout);
@@ -157,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase,
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
 
+    MessageCryptoPtr msgCrypto_;
+
     friend class PulsarFriend;
 };
 
diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfo.cc b/pulsar-client-cpp/lib/EncryptionKeyInfo.cc
new file mode 100644
index 000000000..68c6c0a88
--- /dev/null
+++ b/pulsar-client-cpp/lib/EncryptionKeyInfo.cc
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/EncryptionKeyInfo.h>
+
+#include "EncryptionKeyInfoImpl.h"
+
+namespace pulsar {
+
+EncryptionKeyInfo::EncryptionKeyInfo() : impl_(new EncryptionKeyInfoImpl()) {}
+
+EncryptionKeyInfo::EncryptionKeyInfo(EncryptionKeyInfoImplPtr impl) : impl_(impl) {}
+
+std::string& EncryptionKeyInfo::getKey() { return impl_->getKey(); }
+
+void EncryptionKeyInfo::setKey(std::string key) { impl_->setKey(key); }
+
+EncryptionKeyInfo::StringMap& EncryptionKeyInfo::getMetadata() { return impl_->getMetadata(); }
+
+void EncryptionKeyInfo::setMetadata(StringMap& metadata) { impl_->setMetadata(metadata); }
+
+}; /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc
new file mode 100644
index 000000000..1409c5004
--- /dev/null
+++ b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "EncryptionKeyInfoImpl.h"
+
+namespace pulsar {
+
+EncryptionKeyInfoImpl::EncryptionKeyInfoImpl() : key_(), metadata_() {}
+
+EncryptionKeyInfoImpl::EncryptionKeyInfoImpl(std::string key, StringMap& metadata) {
+    key_ = key;
+    metadata_ = metadata;
+}
+
+std::string& EncryptionKeyInfoImpl::getKey() { return key_; }
+
+void EncryptionKeyInfoImpl::setKey(std::string key) { key_ = key; }
+
+EncryptionKeyInfoImpl::StringMap& EncryptionKeyInfoImpl::getMetadata() { return metadata_; }
+
+void EncryptionKeyInfoImpl::setMetadata(StringMap& metadata) { metadata_ = metadata; }
+
+}; /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h
new file mode 100644
index 000000000..80bfae164
--- /dev/null
+++ b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef LIB_ENCRYPTIONKEYINFOIMPL_H_
+#define LIB_ENCRYPTIONKEYINFOIMPL_H_
+
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <map>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+class EncryptionKeyInfoImpl {
+   public:
+    typedef std::map<std::string, std::string> StringMap;
+
+    EncryptionKeyInfoImpl();
+
+    EncryptionKeyInfoImpl(std::string key, StringMap& metadata);
+
+    std::string& getKey();
+
+    void setKey(std::string key);
+
+    StringMap& getMetadata(void);
+
+    void setMetadata(StringMap& metadata);
+
+   private:
+    StringMap metadata_;
+    std::string key_;
+};
+
+} /* namespace pulsar */
+
+#pragma GCC visibility pop
+
+#endif /* LIB_ENCRYPTIONKEYINFOIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc b/pulsar-client-cpp/lib/MessageCrypto.cc
new file mode 100644
index 000000000..5e3bd3cbc
--- /dev/null
+++ b/pulsar-client-cpp/lib/MessageCrypto.cc
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "LogUtils.h"
+#include "MessageCrypto.h"
+
+namespace pulsar {
+
+DECLARE_LOG_OBJECT()
+
+MessageCrypto::MessageCrypto(std::string& logCtx, bool keyGenNeeded)
+    : dataKeyLen_(32),
+      dataKey_(new unsigned char[dataKeyLen_]),
+      tagLen_(16),
+      ivLen_(12),
+      iv_(new unsigned char[ivLen_]),
+      logCtx_(logCtx) {
+    SSL_library_init();
+    SSL_load_error_strings();
+
+    if (!keyGenNeeded) {
+        mdCtx_ = EVP_MD_CTX_create();
+        EVP_MD_CTX_init(mdCtx_);
+        return;
+    }
+
+    RAND_bytes(dataKey_.get(), dataKeyLen_);
+    RAND_bytes(iv_.get(), ivLen_);
+}
+
+MessageCrypto::~MessageCrypto() {}
+
+RSA* MessageCrypto::loadPublicKey(std::string& pubKeyStr) {
+    BIO* pubBio = NULL;
+    RSA* rsaPub = NULL;
+
+    pubBio = BIO_new_mem_buf((char*)pubKeyStr.c_str(), -1);
+    if (pubBio == NULL) {
+        LOG_ERROR(logCtx_ << " Failed to get memory for public key");
+        return rsaPub;
+    }
+
+    rsaPub = PEM_read_bio_RSA_PUBKEY(pubBio, NULL, NULL, NULL);
+    if (rsaPub == NULL) {
+        LOG_ERROR(logCtx_ << " Failed to load public key");
+    }
+
+    BIO_free(pubBio);
+    return rsaPub;
+}
+
+RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) {
+    BIO* privBio = NULL;
+    RSA* rsaPriv = NULL;
+
+    privBio = BIO_new_mem_buf((char*)privateKeyStr.c_str(), -1);
+    if (privBio == NULL) {
+        LOG_ERROR(logCtx_ << " Failed to get memory for private key");
+        return rsaPriv;
+    }
+
+    rsaPriv = PEM_read_bio_RSAPrivateKey(privBio, NULL, NULL, NULL);
+    if (rsaPriv == NULL) {
+        LOG_ERROR(logCtx_ << " Failed to load private key");
+    }
+
+    BIO_free(privBio);
+    return rsaPriv;
+}
+
+bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen,
+                              unsigned char keyDigest[], unsigned int& digestLen) {
+    if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) {
+        LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + keyName);
+        return false;
+    }
+
+    digestLen = 0;
+    if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) {
+        LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName);
+        return false;
+    }
+
+    if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) {
+        LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + keyName);
+        return false;
+    }
+
+    return true;
+}
+
+void MessageCrypto::removeExpiredDataKey() {
+    boost::posix_time::ptime now = boost::posix_time::second_clock::universal_time();
+    boost::posix_time::time_duration expireTime = boost::posix_time::hours(4);
+
+    auto dataKeyCacheIter = dataKeyCache_.begin();
+    while (dataKeyCacheIter != dataKeyCache_.end()) {
+        auto dataKeyEntry = dataKeyCacheIter->second;
+        boost::posix_time::time_duration td = now - dataKeyEntry.second;
+
+        if ((now - dataKeyEntry.second) > expireTime) {
+            dataKeyCache_.erase(dataKeyCacheIter++);
+        } else {
+            dataKeyCacheIter++;
+        }
+    }
+}
+
+Result MessageCrypto::addPublicKeyCipher(std::set<std::string>& keyNames,
+                                         const CryptoKeyReaderPtr keyReader) {
+    Lock lock(mutex_);
+
+    // Generate data key
+    RAND_bytes(dataKey_.get(), dataKeyLen_);
+
+    Result result = ResultOk;
+    for (auto it = keyNames.begin(); it != keyNames.end(); it++) {
+        result = addPublicKeyCipher(*it, keyReader);
+        if (result != ResultOk) {
+            return result;
+        }
+    }
+    return result;
+}
+
+Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) {
+    if (keyName.empty()) {
+        LOG_ERROR(logCtx_ + "Keyname is empty ");
+        return ResultCryptoError;
+    }
+
+    // Read the public key and its info using callback
+    StringMap keyMeta;
+    EncryptionKeyInfo keyInfo;
+    Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo);
+    if (result != ResultOk) {
+        LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " + keyName);
+        return result;
+    }
+
+    RSA* pubKey = loadPublicKey(keyInfo.getKey());
+    if (pubKey == NULL) {
+        LOG_ERROR(logCtx_ + "Failed to load public key " + keyName);
+        return ResultCryptoError;
+    }
+
+    int inSize = RSA_size(pubKey);
+    boost::scoped_array<unsigned char> encryptedKey(new unsigned char[inSize]);
+
+    int outSize =
+        RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING);
+
+    if (inSize != outSize) {
+        LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key length for key " + keyName);
+        return ResultCryptoError;
+    }
+    std::string encryptedKeyStr(reinterpret_cast<char*>(encryptedKey.get()), inSize);
+    std::shared_ptr<EncryptionKeyInfo> eki(new EncryptionKeyInfo());
+    eki->setKey(encryptedKeyStr);
+    eki->setMetadata(keyInfo.getMetadata());
+
+    encryptedDataKeyMap_.insert(std::make_pair(keyName, eki));
+    return ResultOk;
+}
+
+bool MessageCrypto::removeKeyCipher(std::string& keyName) {
+    if (!keyName.size()) {
+        return false;
+    }
+    encryptedDataKeyMap_.erase(keyName);
+    return true;
+}
+
+bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReaderPtr keyReader,
+                            proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
+                            SharedBuffer& encryptedPayload) {
+    if (!encKeys.size()) {
+        return false;
+    }
+    SharedBuffer emptyBuffer;
+
+    Lock lock(mutex_);
+
+    // Update message metadata with encrypted data key
+    for (auto it = encKeys.begin(); it != encKeys.end(); it++) {
+        const std::string& keyName = *it;
+        auto keyInfoIter = encryptedDataKeyMap_.find(keyName);
+
+        if (keyInfoIter == encryptedDataKeyMap_.end()) {
+            // Attempt to load the key. This will allow us to load keys as soon as
+            // a new key is added to producer config
+            Result result = addPublicKeyCipher(keyName, keyReader);
+            if (result != ResultOk) {
+                return false;
+            }
+
+            keyInfoIter = encryptedDataKeyMap_.find(keyName);
+
+            if (keyInfoIter == encryptedDataKeyMap_.end()) {
+                LOG_ERROR(logCtx_ + "Unable to find encrypted data key for " + keyName);
+                return false;
+            }
+        }
+        EncryptionKeyInfo* keyInfo = keyInfoIter->second.get();
+
+        proto::EncryptionKeys* encKeys = proto::EncryptionKeys().New();
+        encKeys->set_key(keyName);
+        encKeys->set_value(keyInfo->getKey());
+
+        if (keyInfo->getMetadata().size()) {
+            for (auto metaIter = keyInfo->getMetadata().begin(); metaIter != keyInfo->getMetadata().end();
+                 metaIter++) {
+                proto::KeyValue* keyValue = proto::KeyValue().New();
+                keyValue->set_key(metaIter->first);
+                keyValue->set_value(metaIter->second);
+                encKeys->mutable_metadata()->AddAllocated(keyValue);
+            }
+        }
+
+        msgMetadata.mutable_encryption_keys()->AddAllocated(encKeys);
+    }
+
+    // TODO: Replace random with counter and periodic refreshing based on timer/counter value
+    RAND_bytes(iv_.get(), ivLen_);
+    msgMetadata.set_encryption_param(reinterpret_cast<char*>(iv_.get()), ivLen_);
+
+    EVP_CIPHER_CTX* cipherCtx = NULL;
+    encryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_);
+    int encLen = 0;
+
+    if (!(cipherCtx = EVP_CIPHER_CTX_new())) {
+        LOG_ERROR(logCtx_ + " Failed to cipher ctx.");
+        return false;
+    }
+
+    if (EVP_EncryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, dataKey_.get(), iv_.get()) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to init cipher ctx.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to set cipher padding.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    if (EVP_EncryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
+                          &encLen, reinterpret_cast<unsigned const char*>(payload.data()),
+                          payload.readableBytes()) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to encrypt payload.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    encryptedPayload.bytesWritten(encLen);
+    encLen = 0;
+
+    if (EVP_EncryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
+                            &encLen) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to finalize encryption.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    encryptedPayload.bytesWritten(encLen);
+
+    if (EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_GET_TAG, tagLen_, encryptedPayload.mutableData()) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to get cipher tag info.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    encryptedPayload.bytesWritten(tagLen_);
+
+    EVP_CIPHER_CTX_free(cipherCtx);
+
+    return true;
+}
+
+bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string& encryptedDataKey,
+                                   const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta,
+                                   const CryptoKeyReaderPtr keyReader) {
+    StringMap keyMeta;
+    for (auto iter = encKeyMeta.begin(); iter != encKeyMeta.end(); iter++) {
+        keyMeta[iter->key()] = iter->value();
+    }
+
+    // Read the private key info using callback
+    EncryptionKeyInfo keyInfo;
+    keyReader->getPrivateKey(keyName, keyMeta, keyInfo);
+
+    // Convert key from string to RSA key
+    RSA* privKey = loadPrivateKey(keyInfo.getKey());
+    if (privKey == NULL) {
+        LOG_ERROR(logCtx_ + " Failed to load private key " + keyName);
+        return false;
+    }
+
+    // Decrypt data key
+    int outSize = RSA_private_decrypt(encryptedDataKey.size(),
+                                      reinterpret_cast<unsigned const char*>(encryptedDataKey.c_str()),
+                                      dataKey_.get(), privKey, RSA_PKCS1_OAEP_PADDING);
+
+    if (outSize == -1) {
+        LOG_ERROR(logCtx_ + "Failed to decrypt AES key for " + keyName);
+        return false;
+    }
+
+    unsigned char keyDigest[EVP_MAX_MD_SIZE];
+    unsigned int digestLen = 0;
+    if (!getDigest(keyName, encryptedDataKey.c_str(), encryptedDataKey.size(), keyDigest, digestLen)) {
+        LOG_ERROR(logCtx_ + "Failed to get digest for data key " + keyName);
+        return false;
+    }
+
+    std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen);
+    std::string dataKeyStr(reinterpret_cast<char*>(dataKey_.get()), dataKeyLen_);
+    dataKeyCache_[keyDigestStr] = make_pair(dataKeyStr, boost::posix_time::second_clock::universal_time());
+
+    // Remove expired entries from the cache
+    removeExpiredDataKey();
+    return true;
+}
+
+bool MessageCrypto::decryptData(const std::string& dataKeySecret, const proto::MessageMetadata& msgMetadata,
+                                SharedBuffer& payload, SharedBuffer& decryptedPayload) {
+    // unpack iv and encrypted data
+    msgMetadata.encryption_param().copy(reinterpret_cast<char*>(iv_.get()),
+                                        msgMetadata.encryption_param().size());
+
+    EVP_CIPHER_CTX* cipherCtx = NULL;
+    decryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_);
+
+    if (!(cipherCtx = EVP_CIPHER_CTX_new())) {
+        LOG_ERROR(logCtx_ + " Failed to get cipher ctx");
+        return false;
+    }
+
+    if (!EVP_DecryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL,
+                            reinterpret_cast<unsigned const char*>(dataKeySecret.c_str()),
+                            reinterpret_cast<unsigned const char*>(iv_.get()))) {
+        LOG_ERROR(logCtx_ + " Failed to init decrypt cipher ctx");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to set cipher padding");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    int cipherLen = payload.readableBytes() - tagLen_;
+    int decLen = 0;
+    if (!EVP_DecryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()),
+                           &decLen, reinterpret_cast<unsigned const char*>(payload.data()), cipherLen)) {
+        LOG_ERROR(logCtx_ + " Failed to decrypt update");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    };
+    decryptedPayload.bytesWritten(decLen);
+
+    if (!EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_SET_TAG, tagLen_, (void*)(payload.data() + cipherLen))) {
+        LOG_ERROR(logCtx_ + " Failed to set gcm tag");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    if (!EVP_DecryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()),
+                             &decLen)) {
+        LOG_ERROR(logCtx_ + " Failed to finalize encrypted message");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    decryptedPayload.bytesWritten(decLen);
+
+    EVP_CIPHER_CTX_free(cipherCtx);
+
+    return true;
+}
+
+bool MessageCrypto::getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
+                                         SharedBuffer& decryptedPayload) {
+    SharedBuffer decryptedData;
+    bool dataDecrypted = false;
+
+    for (auto iter = msgMetadata.encryption_keys().begin(); iter != msgMetadata.encryption_keys().end();
+         iter++) {
+        const std::string& keyName = iter->key();
+        const std::string& encDataKey = iter->value();
+        unsigned char keyDigest[EVP_MAX_MD_SIZE];
+        unsigned int digestLen = 0;
+        getDigest(keyName, encDataKey.c_str(), encDataKey.size(), keyDigest, digestLen);
+
+        std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen);
+
+        auto dataKeyCacheIter = dataKeyCache_.find(keyDigestStr);
+        if (dataKeyCacheIter != dataKeyCache_.end()) {
+            // Taking a small performance hit here if the hash collides. When it
+            // retruns a different key, decryption fails. At this point, we would
+            // call decryptDataKey to refresh the cache and come here again to decrypt.
+            auto dataKeyEntry = dataKeyCacheIter->second;
+            if (decryptData(dataKeyEntry.first, msgMetadata, payload, decryptedPayload)) {
+                dataDecrypted = true;
+                break;
+            }
+        } else {
+            // First time, entry won't be present in cache
+            LOG_DEBUG(logCtx_ + " Failed to decrypt data or data key is not in cache for " + keyName +
+                      ". Will attempt to refresh.");
+        }
+    }
+    return dataDecrypted;
+}
+
+bool MessageCrypto::decrypt(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
+                            const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload) {
+    // Attempt to decrypt using the existing key
+    if (getKeyAndDecryptData(msgMetadata, payload, decryptedPayload)) {
+        return true;
+    }
+
+    // Either first time, or decryption failed. Attempt to regenerate data key
+    bool isDataKeyDecrypted = false;
+    for (int index = 0; index < msgMetadata.encryption_keys_size(); index++) {
+        const proto::EncryptionKeys& encKeys = msgMetadata.encryption_keys(index);
+
+        const std::string& encDataKey = encKeys.value();
+        const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta = encKeys.metadata();
+        if (decryptDataKey(encKeys.key(), encDataKey, encKeyMeta, keyReader)) {
+            isDataKeyDecrypted = true;
+            break;
+        }
+    }
+
+    if (!isDataKeyDecrypted) {
+        // Unable to decrypt data key
+        return false;
+    }
+
+    return getKeyAndDecryptData(msgMetadata, payload, decryptedPayload);
+}
+
+} /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/MessageCrypto.h b/pulsar-client-cpp/lib/MessageCrypto.h
new file mode 100644
index 000000000..016ce54f9
--- /dev/null
+++ b/pulsar-client-cpp/lib/MessageCrypto.h
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef LIB_MESSAGECRYPTO_H_
+#define LIB_MESSAGECRYPTO_H_
+
+#include <iostream>
+#include <map>
+#include <set>
+#include <boost/thread/mutex.hpp>
+#include <boost/scoped_array.hpp>
+
+#include <openssl/ssl.h>
+#include <openssl/rand.h>
+#include <openssl/bio.h>
+#include <openssl/evp.h>
+#include <openssl/rsa.h>
+#include <openssl/engine.h>
+
+#include "SharedBuffer.h"
+#include "ExecutorService.h"
+#include "pulsar/CryptoKeyReader.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+class MessageCrypto {
+   public:
+    typedef std::map<std::string, std::string> StringMap;
+    typedef std::map<std::string, std::pair<std::string, boost::posix_time::ptime>> DataKeyCacheMap;
+
+    MessageCrypto(std::string& logCtx, bool keyGenNeeded);
+    ~MessageCrypto();
+
+    /*
+     * Encrypt data key using the public key(s) in the argument. <p> If more than one key name is specified,
+     * data key is encrypted using each of those keys. If the public key is expired or changed, application is
+     * responsible to remove the old key and add the new key <p>
+     *
+     * @param keyNames List of public keys to encrypt data key
+     * @param keyReader Implementation to read the key values
+     * @return ResultOk if succeeded
+     *
+     */
+    Result addPublicKeyCipher(std::set<std::string>& keyNames, const CryptoKeyReaderPtr keyReader);
+
+    /*
+     * Remove a key <p> Remove the key identified by the keyName from the list of keys.<p>
+     *
+     * @param keyName Unique name to identify the key
+     * @return true if succeeded, false otherwise
+     */
+    bool removeKeyCipher(std::string& keyName);
+
+    /*
+     * Encrypt the payload using the data key and update message metadata with the keyname & encrypted data
+     * key
+     *
+     * @param encKeys One or more public keys to encrypt data key
+     * @param keyReader Implementation to read the key values
+     * @param msgMetadata Message Metadata
+     * @param payload Message which needs to be encrypted
+     * @param encryptedPayload Contains encrypted payload if success
+     *
+     * @return true if success
+     */
+    bool encrypt(std::set<std::string>& encKeys, const CryptoKeyReaderPtr keyReader,
+                 proto::MessageMetadata& msgMetadata, SharedBuffer& payload, SharedBuffer& encryptedPayload);
+
+    /*
+     * Decrypt the payload using the data key. Keys used to encrypt data key can be retrieved from msgMetadata
+     *
+     * @param msgMetadata Message Metadata
+     * @param payload Message which needs to be decrypted
+     * @param keyReader KeyReader implementation to retrieve key value
+     * @param decryptedPayload Contains decrypted payload if success
+     *
+     * @return true if success
+     */
+    bool decrypt(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
+                 const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload);
+
+   private:
+    typedef boost::unique_lock<boost::mutex> Lock;
+    boost::mutex mutex_;
+
+    int dataKeyLen_;
+    boost::scoped_array<unsigned char> dataKey_;
+
+    int tagLen_;
+    int ivLen_;
+    boost::scoped_array<unsigned char> iv_;
+
+    std::string logCtx_;
+
+    /* This cache uses the digest of encrypted data key as it's key. It's possible
+     * for consumers to receive messages with data key encrypted using older or
+     * newer version of public key. If we use the key name as the key for dataKeyCache,
+     * we will end up decrypting data key way too often which is costly.
+     */
+    DataKeyCacheMap dataKeyCache_;
+
+    // Map of key name and encrypted gcm key, metadata pair which is sent with encrypted message
+    std::map<std::string, std::shared_ptr<EncryptionKeyInfo>> encryptedDataKeyMap_;
+
+    EVP_MD_CTX* mdCtx_;
+
+    RSA* loadPublicKey(std::string& pubKeyStr);
+    RSA* loadPrivateKey(std::string& privateKeyStr);
+    bool getDigest(const std::string& keyName, const void* input, unsigned int inputLen,
+                   unsigned char keyDigest[], unsigned int& digestLen);
+    void removeExpiredDataKey();
+
+    Result addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader);
+
+    bool decryptDataKey(const std::string& keyName, const std::string& encryptedDataKey,
+                        const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta,
+                        const CryptoKeyReaderPtr keyReader);
+    bool decryptData(const std::string& dataKeySecret, const proto::MessageMetadata& msgMetadata,
+                     SharedBuffer& payload, SharedBuffer& decPayload);
+    bool getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
+                              SharedBuffer& decryptedPayload);
+};
+
+} /* namespace pulsar */
+
+#endif /* LIB_MESSAGECRYPTO_H_ */
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index c010a9f36..ab70db225 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -157,4 +157,32 @@ ProducerConfiguration& ProducerConfiguration::setBatchingMaxPublishDelayMs(
 const unsigned long& ProducerConfiguration::getBatchingMaxPublishDelayMs() const {
     return impl_->batchingMaxPublishDelayMs;
 }
+
+const CryptoKeyReaderPtr ProducerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
+
+ProducerConfiguration& ProducerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
+    impl_->cryptoKeyReader = cryptoKeyReader;
+    return *this;
+}
+
+ProducerCryptoFailureAction ProducerConfiguration::getCryptoFailureAction() const {
+    return impl_->cryptoFailureAction;
+}
+
+ProducerConfiguration& ProducerConfiguration::setCryptoFailureAction(ProducerCryptoFailureAction action) {
+    impl_->cryptoFailureAction = action;
+    return *this;
+}
+
+std::set<std::string>& ProducerConfiguration::getEncryptionKeys() { return impl_->encryptionKeys; }
+
+bool ProducerConfiguration::isEncryptionEnabled() const {
+    return (!impl_->encryptionKeys.empty() && (impl_->cryptoKeyReader != NULL));
+}
+
+ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key) {
+    impl_->encryptionKeys.insert(key);
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 11fe7ccd1..3f788a95c 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -41,6 +41,9 @@ struct ProducerConfigurationImpl {
     unsigned int batchingMaxMessages;
     unsigned long batchingMaxAllowedSizeInBytes;
     unsigned long batchingMaxPublishDelayMs;
+    CryptoKeyReaderPtr cryptoKeyReader;
+    std::set<std::string> encryptionKeys;
+    ProducerCryptoFailureAction cryptoFailureAction;
     ProducerConfigurationImpl()
         : sendTimeoutMs(30000),
           compressionType(CompressionNone),
@@ -52,8 +55,10 @@ struct ProducerConfigurationImpl {
           batchingEnabled(false),
           batchingMaxMessages(1000),
           batchingMaxAllowedSizeInBytes(128 * 1024),  // 128 KB
-          batchingMaxPublishDelayMs(10) {             // 10 milli seconds
-    }
+          batchingMaxPublishDelayMs(10),              // 10 milli seconds
+          cryptoKeyReader(),
+          encryptionKeys(),
+          cryptoFailureAction(ProducerCryptoFailureAction::FAIL) {}
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 2d36c7f9e..b2ed33f52 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -50,7 +50,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const
       producerStr_("[" + topic_ + ", " + producerName_ + "] "),
       producerId_(client->newProducerId()),
       msgSequenceGenerator_(0),
-      sendTimer_() {
+      sendTimer_(),
+      msgCrypto_(),
+      dataKeyGenIntervalSec_(4 * 60 * 60) {
     LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
                                 << " id: " << producerId_);
 
@@ -70,10 +72,26 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const
     } else {
         producerStatsBasePtr_ = boost::make_shared<ProducerStatsDisabled>();
     }
+
+    if (conf_.isEncryptionEnabled()) {
+        std::ostringstream logCtxStream;
+        logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]";
+        std::string logCtx = logCtxStream.str();
+        msgCrypto_ = boost::make_shared<MessageCrypto>(logCtx, true);
+        msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
+
+        dataKeyGenTImer_ = executor_->createDeadlineTimer();
+        dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
+        dataKeyGenTImer_->async_wait(
+            boost::bind(&pulsar::ProducerImpl::refreshEncryptionKey, this, boost::asio::placeholders::error));
+    }
 }
 
 ProducerImpl::~ProducerImpl() {
     LOG_DEBUG(getName() << "~ProducerImpl");
+    if (dataKeyGenTImer_) {
+        dataKeyGenTImer_->cancel();
+    }
     closeAsync(ResultCallback());
     printStats();
 }
@@ -84,6 +102,19 @@ const std::string& ProducerImpl::getProducerName() const { return producerName_;
 
 int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished_; }
 
+void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
+    if (ec) {
+        LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
+        return;
+    }
+
+    msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
+
+    dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
+    dataKeyGenTImer_->async_wait(
+        boost::bind(&pulsar::ProducerImpl::refreshEncryptionKey, this, boost::asio::placeholders::error));
+}
+
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     Lock lock(mutex_);
     if (state_ == Closed) {
@@ -262,14 +293,23 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     SharedBuffer& payload = msg.impl_->payload;
 
     uint32_t uncompressedSize = payload.readableBytes();
+    uint32_t payloadSize = uncompressedSize;
 
     if (!batchMessageContainer) {
         // If batching is enabled we compress all the payloads together before sending the batch
         payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
+        payloadSize = payload.readableBytes();
+
+        // Encrypt the payload if enabled
+        SharedBuffer encryptedPayload;
+        if (!encryptMessage(msg.impl_->metadata, payload, encryptedPayload)) {
+            cb(ResultCryptoError, msg);
+            return;
+        }
+        payload = encryptedPayload;
     }
-    uint32_t compressedSize = payload.readableBytes();
-    if (compressedSize > Commands::MaxMessageSize) {
-        LOG_DEBUG(getName() << " - compressed Message payload size" << compressedSize << "cannot exceed "
+    if (payloadSize > Commands::MaxMessageSize) {
+        LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
                             << Commands::MaxMessageSize << " bytes");
         cb(ResultMessageTooBig, msg);
         return;
@@ -559,6 +599,17 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) {
     }
 }
 
+bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
+                                  SharedBuffer& encryptedPayload) {
+    if (!conf_.isEncryptionEnabled() || msgCrypto_ == NULL) {
+        encryptedPayload = payload;
+        return true;
+    }
+
+    return msgCrypto_->encrypt(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader(), metadata, payload,
+                               encryptedPayload);
+}
+
 void ProducerImpl::disconnectProducer() {
     LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
     Lock lock(mutex_);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index f89823f43..2907d405d 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -27,6 +27,7 @@
 #include "HandlerBase.h"
 #include "SharedBuffer.h"
 #include "CompressionCodec.h"
+#include "MessageCrypto.h"
 #include "stats/ProducerStatsDisabled.h"
 #include "stats/ProducerStatsImpl.h"
 
@@ -37,6 +38,7 @@ namespace pulsar {
 class BatchMessageContainer;
 
 typedef boost::shared_ptr<BatchMessageContainer> BatchMessageContainerPtr;
+typedef boost::shared_ptr<MessageCrypto> MessageCryptoPtr;
 
 class PulsarFriend;
 
@@ -120,6 +122,10 @@ class ProducerImpl : public HandlerBase,
 
     void resendMessages(ClientConnectionPtr cnx);
 
+    void refreshEncryptionKey(const boost::system::error_code& ec);
+    bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
+                        SharedBuffer& encryptedPayload);
+
     typedef boost::unique_lock<boost::mutex> Lock;
 
     ProducerConfiguration conf_;
@@ -144,6 +150,10 @@ class ProducerImpl : public HandlerBase,
     Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
 
     void failPendingMessages(Result result);
+
+    MessageCryptoPtr msgCrypto_;
+    DeadlineTimerPtr dataKeyGenTImer_;
+    uint32_t dataKeyGenIntervalSec_;
 };
 
 struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 1deb5f2e5..64e23e1f2 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -122,6 +122,9 @@ const char* pulsar::strResult(Result result) {
 
         case ResultTopicTerminated:
             return "TopicTerminated";
+
+        case ResultCryptoError:
+            return "CryptoError";
     };
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/perf/PerfConsumer.cc b/pulsar-client-cpp/perf/PerfConsumer.cc
index 0ddef60d3..97aa8db89 100644
--- a/pulsar-client-cpp/perf/PerfConsumer.cc
+++ b/pulsar-client-cpp/perf/PerfConsumer.cc
@@ -68,6 +68,8 @@ struct Arguments {
     int receiverQueueSize;
     int ioThreads;
     int listenerThreads;
+    std::string encKeyName;
+    std::string encKeyValueFile;
 };
 
 namespace pulsar {
@@ -87,6 +89,37 @@ class PulsarFriend {
 #include <atomic>
 #endif
 
+class EncKeyReader: public CryptoKeyReader {
+
+  private:
+    std::string privKeyContents;
+
+    void readFile(std::string fileName, std::string& fileContents) const {
+        std::ifstream ifs(fileName);
+        std::stringstream fileStream;
+        fileStream << ifs.rdbuf();
+        fileContents = fileStream.str();
+    }
+
+  public:
+
+    EncKeyReader(std::string keyFile) {
+        if (keyFile.empty()) {
+            return;
+        }
+        readFile(keyFile, privKeyContents);
+    }
+
+    Result getPublicKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
+        return ResultInvalidConfiguration;
+    }
+
+    Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
+        encKeyInfo.setKey(privKeyContents);
+        return ResultOk;
+    }
+};
+
 // Counters
 std::atomic<uint32_t> messagesReceived;
 std::atomic<uint32_t> bytesReceived;
@@ -149,6 +182,10 @@ void startPerfConsumer(const Arguments& args) {
     ConsumerConfiguration consumerConf;
     consumerConf.setMessageListener(messageListener);
     consumerConf.setReceiverQueueSize(args.receiverQueueSize);
+    boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>(args.encKeyValueFile);
+    if (!args.encKeyName.empty()) {
+        consumerConf.setCryptoKeyReader(keyReader);
+    }
 
     Latch latch(args.numTopics * args.numConsumers);
 
@@ -261,7 +298,12 @@ int main(int argc, char** argv) {
      "Number of IO threads to use")  //
 
     ("listener-threads,l", po::value<int>(&args.listenerThreads)->default_value(1),
-     "Number of listener threads");
+     "Number of listener threads") //
+
+    ("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""), "The private key name to decrypt payload") //
+
+    ("encryption-key-value-file,f", po::value<std::string>(&args.encKeyValueFile)->default_value(""),
+            "The file which contains the private key to decrypt payload"); //
 
     po::options_description hidden;
     hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic name");
diff --git a/pulsar-client-cpp/perf/PerfProducer.cc b/pulsar-client-cpp/perf/PerfProducer.cc
index d6cea4cf1..d591d3429 100644
--- a/pulsar-client-cpp/perf/PerfProducer.cc
+++ b/pulsar-client-cpp/perf/PerfProducer.cc
@@ -62,6 +62,8 @@ struct Arguments {
     unsigned int batchingMaxMessages;
     long batchingMaxAllowedSizeInBytes;
     long batchingMaxPublishDelayMs;
+    std::string encKeyName;
+    std::string encKeyValueFile;
 };
 
 namespace pulsar {
@@ -74,11 +76,43 @@ class PulsarFriend {
 };
 }
 
-// Stats
 unsigned long messagesProduced;
 unsigned long bytesProduced;
 using namespace boost::accumulators;
+using namespace pulsar;
+
+class EncKeyReader: public CryptoKeyReader {
+
+  private:
+    std::string pubKeyContents;
+
+    void readFile(std::string fileName, std::string& fileContents) const {
+        std::ifstream ifs(fileName);
+        std::stringstream fileStream;
+        fileStream << ifs.rdbuf();
+        fileContents = fileStream.str();
+    }
 
+  public:
+
+    EncKeyReader(std::string keyFile) {
+        if (keyFile.empty()) {
+            return;
+        }
+        readFile(keyFile, pubKeyContents);
+    }
+
+    Result getPublicKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
+        encKeyInfo.setKey(pubKeyContents);
+        return ResultOk;
+    }
+
+    Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
+        return ResultInvalidConfiguration;
+    }
+};
+
+// Stats
 typedef accumulator_set<uint64_t, stats<tag::mean, tag::p_square_quantile> > LatencyAccumulator;
 LatencyAccumulator e2eLatencyAccumulator(quantile_probability = 0.99);
 std::vector<pulsar::Producer> producerList;
@@ -230,7 +264,12 @@ int main(int argc, char** argv) {
             "Use only is batch-size > 1, Default is 128 KB") //
 
     ("max-batch-publish-delay-in-ms", po::value<long>(&args.batchingMaxPublishDelayMs)->default_value(3000),
-            "Use only is batch-size > 1, Default is 3 seconds");
+            "Use only is batch-size > 1, Default is 3 seconds") //
+
+    ("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""), "The public key name to encrypt payload") //
+
+    ("encryption-key-value-file,f", po::value<std::string>(&args.encKeyValueFile)->default_value(""),
+            "The file which contains the public key to encrypt payload"); //
 
     po::options_description hidden;
     hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic name");
@@ -291,6 +330,12 @@ int main(int argc, char** argv) {
 
     // Block if queue is full else we will start seeing errors in sendAsync
     producerConf.setBlockIfQueueFull(true);
+    boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>(args.encKeyValueFile);
+    if (!args.encKeyName.empty()) {
+        producerConf.addEncryptionKey(args.encKeyName);
+        producerConf.setCryptoKeyReader(keyReader);
+    }
+
     pulsar::ClientConfiguration conf;
     conf.setUseTls(args.isUseTls);
     conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 8191d99f4..0c3f2b427 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -65,6 +65,42 @@ static void sendCallBack(Result r, const Message& msg, std::string prefix, doubl
     sendCallBack(r, msg, prefix);
 }
 
+class EncKeyReader : public CryptoKeyReader {
+   private:
+    void readFile(std::string fileName, std::string& fileContents) const {
+        std::ifstream ifs(fileName);
+        std::stringstream fileStream;
+        fileStream << ifs.rdbuf();
+
+        fileContents = fileStream.str();
+    }
+
+   public:
+    EncKeyReader() {}
+
+    Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                        EncryptionKeyInfo& encKeyInfo) const {
+        std::string CERT_FILE_PATH =
+            "../../pulsar-broker/src/test/resources/certificate/public-key." + keyName;
+        std::string keyContents;
+        readFile(CERT_FILE_PATH, keyContents);
+
+        encKeyInfo.setKey(keyContents);
+        return ResultOk;
+    }
+
+    Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                         EncryptionKeyInfo& encKeyInfo) const {
+        std::string CERT_FILE_PATH =
+            "../../pulsar-broker/src/test/resources/certificate/private-key." + keyName;
+        std::string keyContents;
+        readFile(CERT_FILE_PATH, keyContents);
+
+        encKeyInfo.setKey(keyContents);
+        return ResultOk;
+    }
+};
+
 TEST(BasicEndToEndTest, testBatchMessages) {
     ClientConfiguration config;
     Client client(lookupUrl);
@@ -1103,3 +1139,173 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
         ASSERT_TRUE(receivedMsgIndex.find(boost::lexical_cast<std::string>(i)) != receivedMsgIndex.end());
     }
 }
+
+TEST(BasicEndToEndTest, testRSAEncryption) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "persistent://prop/unit/ns1/my-rsaenctopic";
+    std::string subName = "my-sub-name";
+    Producer producer;
+
+    boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>();
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    ConsumerConfiguration consConfig;
+    consConfig.setCryptoKeyReader(keyReader);
+    // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
+
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // Send 1000 messages synchronously
+    std::string msgContent = "msg-content";
+    LOG_INFO("Publishing 1000 messages synchronously");
+    int msgNum = 0;
+    for (; msgNum < 1000; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    LOG_INFO("Trying to receive 1000 messages");
+    Message msgReceived;
+    for (msgNum = 0; msgNum < 1000; msgNum++) {
+        consumer.receive(msgReceived, 1000);
+        LOG_DEBUG("Received message :" << msgReceived.getMessageId());
+        std::stringstream expected;
+        expected << msgContent << msgNum;
+        ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+    }
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+    ASSERT_EQ(ResultOk, producer.close());
+    ASSERT_EQ(ResultOk, client.close());
+}
+
+TEST(BasicEndToEndTest, testEncryptionFailure) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "persistent://prop/unit/ns1/my-rsaencfailtopic";
+    std::string subName = "my-sub-name";
+    Producer producer;
+
+    boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>();
+
+    ConsumerConfiguration consConfig;
+
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string msgContent = "msg-content";
+    int msgNum = 0;
+    int totalMsgs = 10;
+    std::stringstream stream;
+    stream << msgContent << msgNum;
+    Message msg = MessageBuilder().setContent(msgContent).build();
+
+    // 1. Non existing key
+
+    {
+        ProducerConfiguration prodConf;
+        prodConf.setCryptoKeyReader(keyReader);
+        prodConf.addEncryptionKey("client-non-existing-rsa.pem");
+
+        Promise<Result, Producer> producerPromise;
+        client.createProducerAsync(topicName, prodConf, WaitForCallbackValue<Producer>(producerPromise));
+        Future<Result, Producer> producerFuture = producerPromise.getFuture();
+        result = producerFuture.get(producer);
+        ASSERT_EQ(ResultOk, result);
+
+        ASSERT_EQ(ResultCryptoError, producer.send(msg));
+    }
+
+    // 2. Add valid key
+    {
+        ProducerConfiguration prodConf;
+        prodConf.setCryptoKeyReader(keyReader);
+        prodConf.addEncryptionKey("client-rsa.pem");
+
+        Promise<Result, Producer> producerPromise;
+        client.createProducerAsync(topicName, prodConf, WaitForCallbackValue<Producer>(producerPromise));
+        Future<Result, Producer> producerFuture = producerPromise.getFuture();
+        result = producerFuture.get(producer);
+        ASSERT_EQ(ResultOk, result);
+
+        msgNum++;
+        for (; msgNum < totalMsgs; msgNum++) {
+            std::stringstream stream;
+            stream << msgContent << msgNum;
+            Message msg = MessageBuilder().setContent(stream.str()).build();
+            ASSERT_EQ(ResultOk, producer.send(msg));
+        }
+    }
+
+    // 3. Key reader is not set by consumer
+    Message msgReceived;
+    ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000));
+    ASSERT_EQ(ResultOk, consumer.close());
+
+    // 4. Set consumer config to consume even if decryption fails
+    consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
+
+    Promise<Result, Consumer> consumerPromise2;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise2));
+    consumerFuture = consumerPromise2.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, consumer.receive(msgReceived, 1000));
+
+    // Received message 0. Skip message comparision since its encrypted
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(ResultOk, consumer.close());
+
+    // 5. Set valid keyreader and consume messages
+    msgNum = 1;
+    consConfig.setCryptoKeyReader(keyReader);
+    consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Promise<Result, Consumer> consumerPromise3;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise3));
+    consumerFuture = consumerPromise3.getFuture();
+    result = consumerFuture.get(consumer);
+
+    for (; msgNum < totalMsgs - 1; msgNum++) {
+        ASSERT_EQ(ResultOk, consumer.receive(msgReceived, 1000));
+        LOG_DEBUG("Received message :" << msgReceived.getMessageId());
+        std::stringstream expected;
+        expected << msgContent << msgNum;
+        ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+    }
+    ASSERT_EQ(ResultOk, consumer.close());
+
+    // 6. Discard message if decryption fails
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::DISCARD);
+
+    Promise<Result, Consumer> consumerPromise4;
+    client.subscribeAsync(topicName, subName, consConfig2, WaitForCallbackValue<Consumer>(consumerPromise4));
+    consumerFuture = consumerPromise4.getFuture();
+    result = consumerFuture.get(consumer);
+
+    // Since messag is discarded, no message will be received.
+    ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000));
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services