You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cc...@apache.org on 2021/09/03 16:07:27 UTC

[pulsar-client-go] branch master updated: Encryption support producer (#560)

This is an automated email from the ASF dual-hosted git repository.

cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d3a9cc  Encryption support  producer (#560)
1d3a9cc is described below

commit 1d3a9cc80e63e67a9769d351539ac111c55e56ae
Author: Garule Prabhudas <pr...@gmail.com>
AuthorDate: Fri Sep 3 21:37:24 2021 +0530

    Encryption support  producer (#560)
    
    * add ability to encrypt messages
    - use base crypto package for encryption
    
    * fix typo
    
    * lint fixes
    
    * address review suggestions
    
    * revert go mod
    
    * remove encryption context
     - move it to Consumer MR
    
    * try to fix check issues
    
    * remove unused code
    
    * remove embedded crypto struct
    
    * review suggestions
    
    * remove duplicate log
    
    * lint code style issue fix
    
    * return error from flush methods on serialization error
    
    * update test case and do lazy data key generation
    
    * address review changes
    
    * add comments on test case
    
    Co-authored-by: PGarule <PG...@fanatics.com>
---
 go.mod                                       |   2 -
 go.sum                                       |  15 ----
 pulsar/encryption.go                         |  36 +++++++++
 pulsar/internal/batch_builder.go             |  34 +++++----
 pulsar/internal/commands.go                  |  26 +++++--
 pulsar/internal/crypto/encryptor.go          |  27 +++++++
 pulsar/internal/crypto/noop_encryptor.go     |  33 ++++++++
 pulsar/internal/crypto/producer_encryptor.go |  73 ++++++++++++++++++
 pulsar/internal/key_based_batch_builder.go   |  19 +++--
 pulsar/producer.go                           |   3 +
 pulsar/producer_partition.go                 |  64 +++++++++++++++-
 pulsar/producer_test.go                      | 109 +++++++++++++++++++++++++++
 12 files changed, 390 insertions(+), 51 deletions(-)

diff --git a/go.mod b/go.mod
index 67a7bb7..354f5b4 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,6 @@ require (
 	github.com/google/uuid v1.1.2
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/klauspost/compress v1.10.8
-	github.com/kr/pretty v0.2.0 // indirect
 	github.com/linkedin/goavro/v2 v2.9.8
 	github.com/opentracing/opentracing-go v1.2.0
 	github.com/pierrec/lz4 v2.0.5+incompatible
@@ -27,7 +26,6 @@ require (
 	github.com/stretchr/testify v1.5.1
 	go.uber.org/atomic v1.7.0
 	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
-	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )
 
 replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/go.sum b/go.sum
index 2ccd39e..8b372c7 100644
--- a/go.sum
+++ b/go.sum
@@ -22,7 +22,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
-github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
 github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
@@ -32,8 +31,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
-github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
-github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
 github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
 github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
@@ -73,8 +70,6 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
-github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
@@ -82,7 +77,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
 github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
-github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
 github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko=
 github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
@@ -169,8 +163,6 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
-github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
@@ -190,7 +182,6 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
@@ -212,7 +203,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -227,7 +217,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -259,9 +248,5 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
-gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/pulsar/encryption.go b/pulsar/encryption.go
new file mode 100644
index 0000000..aade2ca
--- /dev/null
+++ b/pulsar/encryption.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "github.com/apache/pulsar-client-go/pulsar/crypto"
+
+// ProducerEncryptionInfo holds the encryption-related fields required by the producer.
+type ProducerEncryptionInfo struct {
+	// KeyReader reads RSA public/private key pairs; it must be non-nil when Keys is non-empty.
+	KeyReader crypto.KeyReader
+
+	// MessageCrypto is used to encrypt and decrypt the data and session keys.
+	MessageCrypto crypto.MessageCrypto
+
+	// Keys is the list of encryption key names used to encrypt the session key.
+	Keys []string
+
+	// ProducerCryptoFailureAction is the action to be taken when message encryption fails.
+	// The default is ProducerCryptoFailureActionFail.
+	ProducerCryptoFailureAction int
+}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 3e1601f..92d6249 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -23,6 +23,7 @@ import (
 	"github.com/gogo/protobuf/proto"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+	"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
 )
@@ -35,7 +36,7 @@ type BuffersPool interface {
 type BatcherBuilderProvider func(
 	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
-	bufferPool BuffersPool, logger log.Logger,
+	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error)
 
 // BatchBuilder is a interface of batch builders
@@ -51,12 +52,12 @@ type BatchBuilder interface {
 	) bool
 
 	// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
-	Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{})
+	Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error)
 
 	// Flush all the messages buffered in multiple batches and wait until all
 	// messages have been successfully persisted.
 	FlushBatches() (
-		batchData []Buffer, sequenceID []uint64, callbacks [][]interface{},
+		batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error,
 	)
 
 	// Return the batch container batch message in multiple batches.
@@ -93,13 +94,15 @@ type batchContainer struct {
 	buffersPool         BuffersPool
 
 	log log.Logger
+
+	encryptor crypto.Encryptor
 }
 
 // newBatchContainer init a batchContainer
 func newBatchContainer(
 	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
-	bufferPool BuffersPool, logger log.Logger,
+	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) batchContainer {
 
 	bc := batchContainer{
@@ -122,6 +125,7 @@ func newBatchContainer(
 		compressionProvider: getCompressionProvider(compressionType, level),
 		buffersPool:         bufferPool,
 		log:                 logger,
+		encryptor:           encryptor,
 	}
 
 	if compressionType != pb.CompressionType_NONE {
@@ -135,12 +139,12 @@ func newBatchContainer(
 func NewBatchBuilder(
 	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
-	bufferPool BuffersPool, logger log.Logger,
+	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error) {
 
 	bc := newBatchContainer(
 		maxMessages, maxBatchSize, producerName, producerID, compressionType,
-		level, bufferPool, logger,
+		level, bufferPool, logger, encryptor,
 	)
 
 	return &bc, nil
@@ -211,11 +215,11 @@ func (bc *batchContainer) reset() {
 
 // Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
 func (bc *batchContainer) Flush() (
-	batchData Buffer, sequenceID uint64, callbacks []interface{},
+	batchData Buffer, sequenceID uint64, callbacks []interface{}, err error,
 ) {
 	if bc.numMessages == 0 {
 		// No-Op for empty batch
-		return nil, 0, nil
+		return nil, 0, nil, nil
 	}
 	bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages)
 
@@ -229,19 +233,21 @@ func (bc *batchContainer) Flush() (
 	if buffer == nil {
 		buffer = NewBuffer(int(uncompressedSize * 3 / 2))
 	}
-	serializeBatch(
-		buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider,
-	)
+
+	if err = serializeBatch(
+		buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor,
+	); err == nil { // no error in serializing Batch
+		sequenceID = bc.cmdSend.Send.GetSequenceId()
+	}
 
 	callbacks = bc.callbacks
-	sequenceID = bc.cmdSend.Send.GetSequenceId()
 	bc.reset()
-	return buffer, sequenceID, callbacks
+	return buffer, sequenceID, callbacks, err
 }
 
 // FlushBatches only for multiple batches container
 func (bc *batchContainer) FlushBatches() (
-	batchData []Buffer, sequenceID []uint64, callbacks [][]interface{},
+	batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error,
 ) {
 	panic("single batch container not support FlushBatches(), please use Flush() instead")
 }
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index af6bac5..b91c0b6 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -24,6 +24,7 @@ import (
 	"github.com/gogo/protobuf/proto"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+	"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 )
 
@@ -221,9 +222,21 @@ func serializeBatch(wb Buffer,
 	cmdSend *pb.BaseCommand,
 	msgMetadata *pb.MessageMetadata,
 	uncompressedPayload Buffer,
-	compressionProvider compression.Provider) {
+	compressionProvider compression.Provider,
+	encryptor crypto.Encryptor) error {
 	// Wire format
 	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+
+	// compress the payload
+	compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())
+
+	// encrypt the compressed payload
+	encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
+	if err != nil {
+		// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
+		return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)
+	}
+
 	cmdSize := uint32(proto.Size(cmdSend))
 	msgMetadataSize := uint32(proto.Size(msgMetadata))
 
@@ -234,7 +247,7 @@ func serializeBatch(wb Buffer,
 	// Write cmd
 	wb.WriteUint32(cmdSize)
 	wb.ResizeIfNeeded(cmdSize)
-	_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
+	_, err = cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
 	if err != nil {
 		panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
 	}
@@ -255,12 +268,8 @@ func serializeBatch(wb Buffer,
 	}
 	wb.WrittenBytes(msgMetadataSize)
 
-	// Make sure the buffer has enough space to hold the compressed data
-	// and perform the compression in-place
-	maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
-	wb.ResizeIfNeeded(maxSize)
-	b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice())
-	wb.WrittenBytes(uint32(len(b)))
+	// add payload to the buffer
+	wb.Write(encryptedPayload)
 
 	// Write checksum at created checksum-placeholder
 	frameEndIdx := wb.WriterIndex()
@@ -269,6 +278,7 @@ func serializeBatch(wb Buffer,
 	// Set Sizes and checksum in the fixed-size header
 	wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
 	wb.PutUint32(checksum, checksumIdx)
+	return nil
 }
 
 // ConvertFromStringMap convert a string map to a KeyValue []byte
diff --git a/pulsar/internal/crypto/encryptor.go b/pulsar/internal/crypto/encryptor.go
new file mode 100644
index 0000000..7fdbf06
--- /dev/null
+++ b/pulsar/internal/crypto/encryptor.go
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+// Encryptor encrypts a payload; the message metadata is passed to the implementation as well.
+type Encryptor interface {
+	Encrypt([]byte, *pb.MessageMetadata) ([]byte, error)
+}
diff --git a/pulsar/internal/crypto/noop_encryptor.go b/pulsar/internal/crypto/noop_encryptor.go
new file mode 100644
index 0000000..4512e7b
--- /dev/null
+++ b/pulsar/internal/crypto/noop_encryptor.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+type noopEncryptor struct{}
+
+func NewNoopEncryptor() Encryptor {
+	return &noopEncryptor{}
+}
+
+// Encrypt is a no-op: it returns data unchanged and never reports an error.
+func (e *noopEncryptor) Encrypt(data []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) {
+	return data, nil
+}
diff --git a/pulsar/internal/crypto/producer_encryptor.go b/pulsar/internal/crypto/producer_encryptor.go
new file mode 100644
index 0000000..a5b972d
--- /dev/null
+++ b/pulsar/internal/crypto/producer_encryptor.go
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	"fmt"
+
+	"github.com/apache/pulsar-client-go/pulsar/crypto"
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+	"github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+type producerEncryptor struct {
+	keys                        []string
+	keyReader                   crypto.KeyReader
+	messageCrypto               crypto.MessageCrypto
+	logger                      log.Logger
+	producerCryptoFailureAction int
+}
+
+func NewProducerEncryptor(keys []string,
+	keyReader crypto.KeyReader,
+	messageCrypto crypto.MessageCrypto,
+	producerCryptoFailureAction int,
+	logger log.Logger) Encryptor {
+	return &producerEncryptor{
+		keys:                        keys,
+		keyReader:                   keyReader,
+		messageCrypto:               messageCrypto,
+		logger:                      logger,
+		producerCryptoFailureAction: producerCryptoFailureAction,
+	}
+}
+
+// Encrypt encrypts payload with the configured MessageCrypto, keys and key reader.
+func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) {
+	// encrypt the payload, exposing msgMetadata to the crypto layer through a metadata supplier
+	encryptedPayload, err := e.messageCrypto.Encrypt(e.keys,
+		e.keyReader,
+		crypto.NewMessageMetadataSupplier(msgMetadata),
+		payload)
+
+	// encrypting the payload failed
+	if err != nil {
+		// ProducerCryptoFailureAction is set to send:
+		// log a warning and fall back to returning
+		// the payload unencrypted without an error
+		if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
+			e.logger.
+				WithError(err).
+				Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send")
+			return payload, nil
+		}
+
+		return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err)
+	}
+	return encryptedPayload, nil
+}
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 545c2c8..940aa9f 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+	"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
 )
@@ -85,14 +86,14 @@ func (h *keyBasedBatches) Val(key string) *batchContainer {
 func NewKeyBasedBatchBuilder(
 	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
-	bufferPool BuffersPool, logger log.Logger,
+	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error) {
 
 	bb := &keyBasedBatchContainer{
 		batches: newKeyBasedBatches(),
 		batchContainer: newBatchContainer(
 			maxMessages, maxBatchSize, producerName, producerID,
-			compressionType, level, bufferPool, logger,
+			compressionType, level, bufferPool, logger, encryptor,
 		),
 		compressionType: compressionType,
 		level:           level,
@@ -144,7 +145,7 @@ func (bc *keyBasedBatchContainer) Add(
 		// create batchContainer for new key
 		t := newBatchContainer(
 			bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID,
-			bc.compressionType, bc.level, bc.buffersPool, bc.log,
+			bc.compressionType, bc.level, bc.buffersPool, bc.log, bc.encryptor,
 		)
 		batchPart = &t
 		bc.batches.Add(msgKey, &t)
@@ -179,11 +180,11 @@ func (bc *keyBasedBatchContainer) reset() {
 // Flush all the messages buffered in multiple batches and wait until all
 // messages have been successfully persisted.
 func (bc *keyBasedBatchContainer) FlushBatches() (
-	batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{},
+	batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}, errors []error,
 ) {
 	if bc.numMessages == 0 {
 		// No-Op for empty batch
-		return nil, nil, nil
+		return nil, nil, nil, nil
 	}
 
 	bc.log.Debug("keyBasedBatchContainer flush: messages: ", bc.numMessages)
@@ -194,6 +195,7 @@ func (bc *keyBasedBatchContainer) FlushBatches() (
 	batchesData = make([]Buffer, batchesLen)
 	sequenceIDs = make([]uint64, batchesLen)
 	callbacks = make([][]interface{}, batchesLen)
+	errors = make([]error, batchesLen)
 
 	bc.batches.l.RLock()
 	defer bc.batches.l.RUnlock()
@@ -203,21 +205,22 @@ func (bc *keyBasedBatchContainer) FlushBatches() (
 	sort.Strings(sortedKeys)
 	for _, k := range sortedKeys {
 		container := bc.batches.containers[k]
-		b, s, c := container.Flush()
+		b, s, c, err := container.Flush()
 		if b != nil {
 			batchesData[idx] = b
 			sequenceIDs[idx] = s
 			callbacks[idx] = c
+			errors[idx] = err
 		}
 		idx++
 	}
 
 	bc.reset()
-	return batchesData, sequenceIDs, callbacks
+	return batchesData, sequenceIDs, callbacks, errors
 }
 
 func (bc *keyBasedBatchContainer) Flush() (
-	batchData Buffer, sequenceID uint64, callbacks []interface{},
+	batchData Buffer, sequenceID uint64, callbacks []interface{}, err error,
 ) {
 	panic("multi batches container not support Flush(), please use FlushBatches() instead")
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index ffbdebb..07a8f75 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -163,6 +163,9 @@ type ProducerOptions struct {
 	// PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions
 	// Default is 1 minute
 	PartitionsAutoDiscoveryInterval time.Duration
+
+	// Encryption necessary fields to perform encryption of message
+	Encryption *ProducerEncryptionInfo
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ca6850d..4ae4e00 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,11 +19,14 @@ package pulsar
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/apache/pulsar-client-go/pulsar/crypto"
 	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+	internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
 
 	"github.com/gogo/protobuf/proto"
 
@@ -130,6 +133,24 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		p.producerName = options.Name
 	}
 
+	encryption := options.Encryption
+	// add default message crypto if not provided
+	if encryption != nil && len(encryption.Keys) > 0 {
+		if encryption.KeyReader == nil {
+			return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
+		}
+
+		if encryption.MessageCrypto == nil {
+			logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, p.producerName, p.producerID)
+			messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, true, logger)
+			if err != nil {
+				logger.WithError(err).Error("Unable to get MessageCrypto instance. Producer creation is abandoned")
+				return nil, err
+			}
+			p.options.Encryption.MessageCrypto = messageCrypto
+		}
+	}
+
 	err := p.grabCnx()
 	if err != nil {
 		logger.WithError(err).Error("Failed to create producer")
@@ -147,6 +168,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	if p.options.SendTimeout > 0 {
 		go p.failTimeoutMessages()
 	}
+
 	go p.runEventsLoop()
 
 	return p, nil
@@ -205,13 +227,25 @@ func (p *partitionProducer) grabCnx() error {
 	}
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
+
+	var encryptor internalcrypto.Encryptor
+	if p.options.Encryption != nil {
+		encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
+			p.options.Encryption.KeyReader,
+			p.options.Encryption.MessageCrypto,
+			p.options.Encryption.ProducerCryptoFailureAction, p.log)
+	} else {
+		encryptor = internalcrypto.NewNoopEncryptor()
+	}
+
 	if p.options.DisableBatching {
 		provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder)
 		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
 			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
 			compression.Level(p.options.CompressionLevel),
 			p,
-			p.log)
+			p.log,
+			encryptor)
 		if err != nil {
 			return err
 		}
@@ -225,7 +259,8 @@ func (p *partitionProducer) grabCnx() error {
 			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
 			compression.Level(p.options.CompressionLevel),
 			p,
-			p.log)
+			p.log,
+			encryptor)
 		if err != nil {
 			return err
 		}
@@ -470,11 +505,22 @@ type pendingItem struct {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
-	batchData, sequenceID, callbacks := p.batchBuilder.Flush()
+	batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
 	if batchData == nil {
 		return
 	}
 
+	// error occurred in batch flush
+	// report it using callback
+	if err != nil {
+		for _, cb := range callbacks {
+			if sr, ok := cb.(*sendRequest); ok {
+				sr.callback(nil, sr.msg, err)
+			}
+		}
+		return
+	}
+
 	p.pendingQueue.Put(&pendingItem{
 		sentAt:       time.Now(),
 		batchData:    batchData,
@@ -589,12 +635,22 @@ func (p *partitionProducer) failTimeoutMessages() {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatches() {
-	batchesData, sequenceIDs, callbacks := p.batchBuilder.FlushBatches()
+	batchesData, sequenceIDs, callbacks, errors := p.batchBuilder.FlushBatches()
 	if batchesData == nil {
 		return
 	}
 
 	for i := range batchesData {
+		// error occurred in processing batch
+		// report it using callback
+		if errors[i] != nil {
+			for _, cb := range callbacks[i] {
+				if sr, ok := cb.(*sendRequest); ok {
+					sr.callback(nil, sr.msg, errors[i])
+				}
+			}
+			continue
+		}
 		if batchesData[i] == nil {
 			continue
 		}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index dc7a5ef..f914017 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -30,6 +30,8 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/stretchr/testify/assert"
 
+	"github.com/apache/pulsar-client-go/pulsar/crypto"
+	plog "github.com/apache/pulsar-client-go/pulsar/log"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -996,6 +998,113 @@ func TestSendContextExpired(t *testing.T) {
 	makeHTTPCall(t, http.MethodDelete, quotaURL, "")
 }
 
+// TestProducerWithRSAEncryption checks that a producer with batching enabled and an RSA
+// key pair loaded from crypto/testdata can publish ten string messages without error.
+func TestProducerWithRSAEncryption(t *testing.T) {
+	client, err := NewClient(ClientOptions{URL: lookupURL})
+	if err != nil {
+		t.Fatalf("creating client: %v", err)
+	}
+	defer client.Close()
+
+	msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger())
+	if err != nil {
+		t.Fatalf("creating message crypto: %v", err)
+	}
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		Encryption: &ProducerEncryptionInfo{
+			KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+				"crypto/testdata/pri_key_rsa.pem"),
+			MessageCrypto: msgCrypto,
+			Keys:          []string{"my-app.key"},
+		},
+		Schema: NewStringSchema(nil),
+	})
+	if err != nil {
+		t.Fatalf("creating producer: %v", err)
+	}
+	defer producer.Close()
+
+	// Use t.Fatalf, not logrus' log.Fatal, which exits the test binary and skips the deferred Close calls.
+	ctx := context.Background()
+	for i := 0; i < 10; i++ {
+		msg := &ProducerMessage{Value: fmt.Sprintf("hello-%d", i)}
+		if _, err := producer.Send(ctx, msg); err != nil {
+			t.Fatalf("sending message %d: %v", i, err)
+		}
+	}
+}
+
+// TestProducuerCreationFailOnNilKeyReader expects CreateProducer to return an error and
+// no producer when encryption options are supplied with a MessageCrypto and key names
+// but without a KeyReader.
+func TestProducuerCreationFailOnNilKeyReader(t *testing.T) {
+	client, err := NewClient(ClientOptions{URL: lookupURL})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	defaultCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger())
+	assert.Nil(t, err)
+
+	// KeyReader is deliberately left unset in the encryption options.
+	encryption := &ProducerEncryptionInfo{
+		MessageCrypto: defaultCrypto,
+		Keys:          []string{"my-app.key"},
+	}
+
+	options := ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		Encryption:      encryption,
+		Schema:          NewStringSchema(nil),
+	}
+
+	producer, err := client.CreateProducer(options)
+	assert.NotNil(t, err)
+	assert.Nil(t, producer)
+}
+
+// TestProducuerSendFailOnInvalidKey checks that Send returns an error and no message ID
+// when the KeyReader is configured with an invalid public key file.
+func TestProducuerSendFailOnInvalidKey(t *testing.T) {
+	client, err := NewClient(ClientOptions{URL: lookupURL})
+	if err != nil {
+		t.Fatalf("creating client: %v", err)
+	}
+	defer client.Close()
+
+	msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger())
+	if err != nil {
+		t.Fatalf("creating message crypto: %v", err)
+	}
+
+	// Creation is expected to succeed even though the public key file is invalid.
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		Encryption: &ProducerEncryptionInfo{
+			KeyReader: crypto.NewFileKeyReader("crypto/testdata/invalid_pub_key_rsa.pem",
+				"crypto/testdata/pri_key_rsa.pem"),
+			MessageCrypto: msgCrypto,
+			Keys:          []string{"my-app.key"},
+		},
+		Schema: NewStringSchema(nil),
+	})
+	if err != nil || producer == nil {
+		t.Fatalf("creating producer: err=%v, nil producer=%t", err, producer == nil)
+	}
+	// Close the producer so it is not leaked when the test finishes.
+	defer producer.Close()
+
+	// Send should return an error because encrypting the message with the invalid public key fails.
+	mid, err := producer.Send(context.Background(), &ProducerMessage{Value: "test"})
+	assert.NotNil(t, err)
+	assert.Nil(t, mid)
+}
+
 type noopProduceInterceptor struct{}
 
 func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}