You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/07/12 02:59:25 UTC

[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #560: Encryption support producer

cckellogg commented on a change in pull request #560:
URL: https://github.com/apache/pulsar-client-go/pull/560#discussion_r667584019



##########
File path: pulsar/internal/commands.go
##########
@@ -269,6 +270,117 @@ func serializeBatch(wb Buffer,
 	wb.PutUint32(checksum, checksumIdx)
 }
 
+// copy of the method serializeBatch(....) with an extension to encrypt payload
+func serializeBatchWithEncryption(wb Buffer,

Review comment:
       What are the differences between serializeBatch and serializeBatchWithEncryption? It looks like a lot of duplicated code.

##########
File path: pulsar/internal/commands.go
##########
@@ -269,6 +270,117 @@ func serializeBatch(wb Buffer,
 	wb.PutUint32(checksum, checksumIdx)
 }
 
+// copy of the method serializeBatch(....) with an extension to encrypt payload
+func serializeBatchWithEncryption(wb Buffer,
+	cmdSend *pb.BaseCommand,
+	msgMetadata *pb.MessageMetadata,
+	uncompressedPayload Buffer,
+	compressionProvider compression.Provider,
+	KeyReader crypto.KeyReader,
+	encryptionKeys []string,
+	msgCrypto crypto.MessageCrypto,
+	cryptoFailureAction int,
+) {
+	// Wire format
+	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+
+	// compress the payload
+	compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())
+
+	encryptedPayload := encryptPayload(msgMetadata,
+		msgCrypto,
+		KeyReader,
+		encryptionKeys,
+		compressedPayload,
+		cryptoFailureAction)
+
+	// there was a error in encrypting the payload and
+	// crypto failure action is set to crypto.ProducerCryptoFailureActionFail
+	if encryptedPayload == nil {

Review comment:
       I'm not sure panic is the correct behavior here. 

##########
File path: pulsar/internal/commands.go
##########
@@ -269,6 +270,117 @@ func serializeBatch(wb Buffer,
 	wb.PutUint32(checksum, checksumIdx)
 }
 
+// copy of the method serializeBatch(....) with an extension to encrypt payload
+func serializeBatchWithEncryption(wb Buffer,
+	cmdSend *pb.BaseCommand,
+	msgMetadata *pb.MessageMetadata,
+	uncompressedPayload Buffer,
+	compressionProvider compression.Provider,
+	KeyReader crypto.KeyReader,
+	encryptionKeys []string,
+	msgCrypto crypto.MessageCrypto,
+	cryptoFailureAction int,
+) {
+	// Wire format
+	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+
+	// compress the payload
+	compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())
+
+	encryptedPayload := encryptPayload(msgMetadata,
+		msgCrypto,
+		KeyReader,
+		encryptionKeys,
+		compressedPayload,
+		cryptoFailureAction)
+
+	// there was a error in encrypting the payload and
+	// crypto failure action is set to crypto.ProducerCryptoFailureActionFail
+	if encryptedPayload == nil {
+		panic(fmt.Errorf("error in encrypting the payload and message is not sent"))
+	}
+
+	compressedPayload = encryptedPayload
+
+	cmdSize := uint32(proto.Size(cmdSend))
+	msgMetadataSize := uint32(proto.Size(msgMetadata))
+
+	frameSizeIdx := wb.WriterIndex()
+	wb.WriteUint32(0) // Skip frame size until we now the size
+	frameStartIdx := wb.WriterIndex()
+
+	// Write cmd
+	wb.WriteUint32(cmdSize)
+	wb.ResizeIfNeeded(cmdSize)
+	_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
+	if err != nil {
+		panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
+	}
+	wb.WrittenBytes(cmdSize)
+
+	// Create checksum placeholder
+	wb.WriteUint16(magicCrc32c)
+	checksumIdx := wb.WriterIndex()
+	wb.WriteUint32(0) // skip 4 bytes of checksum
+
+	// Write metadata
+	metadataStartIdx := wb.WriterIndex()
+	wb.WriteUint32(msgMetadataSize)
+	wb.ResizeIfNeeded(msgMetadataSize)
+	_, err = msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize])
+	if err != nil {
+		panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: %v", err))
+	}
+	wb.WrittenBytes(msgMetadataSize)
+
+	wb.Write(compressedPayload)
+
+	// Write checksum at created checksum-placeholder
+	frameEndIdx := wb.WriterIndex()
+	checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))
+
+	// Set Sizes and checksum in the fixed-size header
+	wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
+	wb.PutUint32(checksum, checksumIdx)
+}
+
+func encryptPayload(msgMetadata *pb.MessageMetadata,
+	msgCrypto crypto.MessageCrypto,
+	KeyReader crypto.KeyReader,
+	encryptionKeys []string,
+	compressedPayload []byte,
+	cryptoFailureAction int,
+) []byte {
+
+	// encryption is enabled but KeyReader interface is not implemented
+	if KeyReader == nil {
+		// crypto failure action is set to send
+		// so send unencrypted message
+		if cryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
+			return compressedPayload
+		}
+		return nil
+	}
+
+	// encrypt payload
+	encryptedPayload, err := msgCrypto.Encrypt(encryptionKeys,
+		KeyReader,
+		crypto.NewMessageMetadataSupplier(msgMetadata),
+		compressedPayload)
+
+	if err != nil {

Review comment:
       Should this error be logged somewhere?

##########
File path: pulsar/producer_partition.go
##########
@@ -148,6 +179,26 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	return p, nil
 }
 
+func (p *partitionProducer) sheduleDataKeyUpdate() {
+	for t := range p.dataKeyTicker.C {
+		p.log.Infof("Refreshing data key :%v", t)
+		err := p.updateDataKey()
+		if err != nil {
+			p.log.Errorf("Error refreshing data key : %v", err)
+		}
+	}
+}
+
+func (p *partitionProducer) updateDataKey() error {

Review comment:
       Does this need to be thread-safe? Also, can we move the key refreshing to another PR?

##########
File path: pulsar/producer_partition.go
##########
@@ -197,30 +248,32 @@ func (p *partitionProducer) grabCnx() error {
 	}
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
+
+	var provider internal.BatcherBuilderProvider
 	if p.options.DisableBatching {
-		provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder)
-		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
-			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
-			compression.Level(p.options.CompressionLevel),
-			p,
-			p.log)
-		if err != nil {
-			return err
-		}
+		provider, _ = GetBatcherBuilderProvider(DefaultBatchBuilder)
+
 	} else if p.batchBuilder == nil {
-		provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)

Review comment:
       Why change this? The `p` masks the partitionProducer and makes the code harder to follow.

##########
File path: pulsar/internal/commands.go
##########
@@ -269,6 +270,117 @@ func serializeBatch(wb Buffer,
 	wb.PutUint32(checksum, checksumIdx)
 }
 
+// copy of the method serializeBatch(....) with an extension to encrypt payload
+func serializeBatchWithEncryption(wb Buffer,
+	cmdSend *pb.BaseCommand,
+	msgMetadata *pb.MessageMetadata,
+	uncompressedPayload Buffer,
+	compressionProvider compression.Provider,
+	KeyReader crypto.KeyReader,
+	encryptionKeys []string,
+	msgCrypto crypto.MessageCrypto,
+	cryptoFailureAction int,
+) {
+	// Wire format
+	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+
+	// compress the payload
+	compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())
+
+	encryptedPayload := encryptPayload(msgMetadata,
+		msgCrypto,
+		KeyReader,
+		encryptionKeys,
+		compressedPayload,
+		cryptoFailureAction)
+
+	// there was a error in encrypting the payload and
+	// crypto failure action is set to crypto.ProducerCryptoFailureActionFail
+	if encryptedPayload == nil {
+		panic(fmt.Errorf("error in encrypting the payload and message is not sent"))
+	}
+
+	compressedPayload = encryptedPayload
+
+	cmdSize := uint32(proto.Size(cmdSend))
+	msgMetadataSize := uint32(proto.Size(msgMetadata))
+
+	frameSizeIdx := wb.WriterIndex()
+	wb.WriteUint32(0) // Skip frame size until we now the size
+	frameStartIdx := wb.WriterIndex()
+
+	// Write cmd
+	wb.WriteUint32(cmdSize)
+	wb.ResizeIfNeeded(cmdSize)
+	_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
+	if err != nil {
+		panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
+	}
+	wb.WrittenBytes(cmdSize)
+
+	// Create checksum placeholder
+	wb.WriteUint16(magicCrc32c)
+	checksumIdx := wb.WriterIndex()
+	wb.WriteUint32(0) // skip 4 bytes of checksum
+
+	// Write metadata
+	metadataStartIdx := wb.WriterIndex()
+	wb.WriteUint32(msgMetadataSize)
+	wb.ResizeIfNeeded(msgMetadataSize)
+	_, err = msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize])
+	if err != nil {
+		panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: %v", err))
+	}
+	wb.WrittenBytes(msgMetadataSize)
+
+	wb.Write(compressedPayload)
+
+	// Write checksum at created checksum-placeholder
+	frameEndIdx := wb.WriterIndex()
+	checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))
+
+	// Set Sizes and checksum in the fixed-size header
+	wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
+	wb.PutUint32(checksum, checksumIdx)
+}
+
+func encryptPayload(msgMetadata *pb.MessageMetadata,

Review comment:
       Should this return an error?

##########
File path: pulsar/internal/batch_builder.go
##########
@@ -128,24 +137,56 @@ func newBatchContainer(
 		bc.msgMetadata.Compression = &compressionType
 	}
 
+	for _, opt := range options {
+		opt(&bc)
+	}
+
 	return bc
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(
 	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
-	bufferPool BuffersPool, logger log.Logger,
+	bufferPool BuffersPool, logger log.Logger, options ...func(*batchContainer),
 ) (BatchBuilder, error) {
 
 	bc := newBatchContainer(
 		maxMessages, maxBatchSize, producerName, producerID, compressionType,
-		level, bufferPool, logger,
+		level, bufferPool, logger, options...,
 	)
 
 	return &bc, nil
 }
 
+// UseEncryptionKeys encryption key names to use

Review comment:
       What about removing these config options and passing an Encrypter interface/struct instead? I think config functions work better when they are scoped in their own package - it makes the namespacing clearer. Also, it doesn't really match the rest of the code.
   ```
   type Encrypter interface {
    Encrypt(msgMetadata, payload []byte) ([]byte, error)
   }
   ```
   If encryption is not configured, a no-op encrypter can be used instead.
   

##########
File path: pulsar/producer.go
##########
@@ -163,6 +165,19 @@ type ProducerOptions struct {
 	// PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions
 	// Default is 1 minute
 	PartitionsAutoDiscoveryInterval time.Duration
+
+	// EncryptionKeys list of encryption key names to encrypt session key
+	EncryptionKeys []string

Review comment:
       Should we wrap these into a struct? Maybe something like this?
   ```
   type Encryption struct {
     Keys      []string
     KeyReader crypto.KeyReader
     // etc.
   }
   
   ```
   

##########
File path: go.mod
##########
@@ -14,10 +14,7 @@ require (
 	github.com/google/uuid v1.1.2
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/klauspost/compress v1.10.8

Review comment:
       Curious why the go.mod and go.sum files are changing. Can these changes be done in a separate PR?

##########
File path: pulsar/internal/commands.go
##########
@@ -269,6 +270,117 @@ func serializeBatch(wb Buffer,
 	wb.PutUint32(checksum, checksumIdx)
 }
 
+// copy of the method serializeBatch(....) with an extension to encrypt payload
+func serializeBatchWithEncryption(wb Buffer,
+	cmdSend *pb.BaseCommand,
+	msgMetadata *pb.MessageMetadata,
+	uncompressedPayload Buffer,
+	compressionProvider compression.Provider,
+	KeyReader crypto.KeyReader,
+	encryptionKeys []string,
+	msgCrypto crypto.MessageCrypto,
+	cryptoFailureAction int,
+) {
+	// Wire format
+	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+
+	// compress the payload
+	compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())

Review comment:
       Do we want to compress before encrypting?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org