You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/06 08:18:55 UTC

[GitHub] [pulsar-client-go] GPrabhudas opened a new pull request #612: Encryption support ext consumer

GPrabhudas opened a new pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612


   Breakdown of PR #552
   
   This PR includes the encryption/decryption changes at consumer side.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704606587



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       So, I think it's better to resend the message than doing `ack`. 
   Any other thoughts here ??




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r703778525



##########
File path: pulsar/internal/crypto/decryptor.go
##########
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+// Decryptor support decrypting of message
+type Decryptor interface {
+	Decrypt(payload []byte, msgID *pb.MessageIdData, msgMetadata *pb.MessageMetadata) ([]byte, error)
+	CryptoFailureAction() int

Review comment:
       Why is this needed? I think the Decryptor should just have Decrypt and return an error. The caller of this can then figure out what to do with the error.

##########
File path: pulsar/message.go
##########
@@ -116,6 +116,10 @@ type Message interface {
 
 	//Get the de-serialized value of the message, according the configured
 	GetSchemaValue(v interface{}) error
+
+	// GetEncryptionContext get the ecryption context of message
+	// It will be used by the application to parse undecrypted message
+	GetEncryptionContext() EncryptionContext

Review comment:
       Let's return a pointer or interface

##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+			return err
+		case crypto.ConsumerCryptoFailureActionDiscard:
+			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
+			return fmt.Errorf("discarding message on decryption error :%v", err)
+		case crypto.ConsumerCryptoFailureActionConsume:
+			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+			messages = append(messages, &message{
+				publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),

Review comment:
       Nit. we can use create a new variable here and move `messages := make([]*message, 0)` back to where it was
   ```
   messages := []*message{
   		{
   			
   		},
   	}
   ```

##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+			return err
+		case crypto.ConsumerCryptoFailureActionDiscard:
+			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
+			return fmt.Errorf("discarding message on decryption error :%v", err)
+		case crypto.ConsumerCryptoFailureActionConsume:
+			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+			messages = append(messages, &message{
+				publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+				eventTime:    timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+				key:          msgMeta.GetPartitionKey(),
+				producerName: msgMeta.GetProducerName(),
+				properties:   internal.ConvertToStringMap(msgMeta.GetProperties()),
+				topic:        pc.topic,
+				msgID: newMessageID(
+					int64(pbMsgID.GetLedgerId()),
+					int64(pbMsgID.GetEntryId()),
+					pbMsgID.GetBatchIndex(),
+					pc.partitionIdx,
+				),
+				payLoad:             headersAndPayload.ReadableSlice(),
+				schema:              pc.options.schema,
+				replicationClusters: msgMeta.GetReplicateTo(),
+				replicatedFrom:      msgMeta.GetReplicatedFrom(),
+				redeliveryCount:     response.GetRedeliveryCount(),
+				encryptionContext:   createEncryptionContext(msgMeta),

Review comment:
       Does the encryptionContext need to be added to messages that don't fail?

##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)

Review comment:
       remove this and add a log under ConsumerCryptoFailureActionDiscard so there is more context.

##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       The java clients add this to the unacked message tracker do we need to do the same?

##########
File path: pulsar/impl_message.go
##########
@@ -215,6 +233,7 @@ type message struct {
 	replicatedFrom      string
 	redeliveryCount     uint32
 	schema              Schema
+	encryptionContext   EncryptionContext

Review comment:
       Let's make this a pointer




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r717675114



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       @cckellogg looks like the CI is passed now. 
   Please do suggest if there are any other improvements on this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r712676881



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       Yes, we can do nack. I'll push changes with nack on failure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704567147



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+			return err
+		case crypto.ConsumerCryptoFailureActionDiscard:
+			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
+			return fmt.Errorf("discarding message on decryption error :%v", err)
+		case crypto.ConsumerCryptoFailureActionConsume:
+			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+			messages = append(messages, &message{
+				publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+				eventTime:    timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+				key:          msgMeta.GetPartitionKey(),
+				producerName: msgMeta.GetProducerName(),
+				properties:   internal.ConvertToStringMap(msgMeta.GetProperties()),
+				topic:        pc.topic,
+				msgID: newMessageID(
+					int64(pbMsgID.GetLedgerId()),
+					int64(pbMsgID.GetEntryId()),
+					pbMsgID.GetBatchIndex(),
+					pc.partitionIdx,
+				),
+				payLoad:             headersAndPayload.ReadableSlice(),
+				schema:              pc.options.schema,
+				replicationClusters: msgMeta.GetReplicateTo(),
+				replicatedFrom:      msgMeta.GetReplicatedFrom(),
+				redeliveryCount:     response.GetRedeliveryCount(),
+				encryptionContext:   createEncryptionContext(msgMeta),

Review comment:
       So the encryption context is only needed for failed messages?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704064533



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704575541



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+			return err
+		case crypto.ConsumerCryptoFailureActionDiscard:
+			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
+			return fmt.Errorf("discarding message on decryption error :%v", err)
+		case crypto.ConsumerCryptoFailureActionConsume:
+			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+			messages = append(messages, &message{
+				publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+				eventTime:    timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+				key:          msgMeta.GetPartitionKey(),
+				producerName: msgMeta.GetProducerName(),
+				properties:   internal.ConvertToStringMap(msgMeta.GetProperties()),
+				topic:        pc.topic,
+				msgID: newMessageID(
+					int64(pbMsgID.GetLedgerId()),
+					int64(pbMsgID.GetEntryId()),
+					pbMsgID.GetBatchIndex(),
+					pc.partitionIdx,
+				),
+				payLoad:             headersAndPayload.ReadableSlice(),
+				schema:              pc.options.schema,
+				replicationClusters: msgMeta.GetReplicateTo(),
+				replicatedFrom:      msgMeta.GetReplicatedFrom(),
+				redeliveryCount:     response.GetRedeliveryCount(),
+				encryptionContext:   createEncryptionContext(msgMeta),

Review comment:
       yes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704587154



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       yeah, but i don't think it is good idea to do `ack`.
   
   Let's say user wants to consume this message again by providing proper crypto configuration, if we do `ack`, then he may not be able to consume.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r710380516



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       good point maybe we should just Nack it here so it's redelivered on fail?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy merged pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
wolfstudy merged pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704064820



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+			return err
+		case crypto.ConsumerCryptoFailureActionDiscard:
+			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
+			return fmt.Errorf("discarding message on decryption error :%v", err)
+		case crypto.ConsumerCryptoFailureActionConsume:
+			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+			messages = append(messages, &message{
+				publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+				eventTime:    timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+				key:          msgMeta.GetPartitionKey(),
+				producerName: msgMeta.GetProducerName(),
+				properties:   internal.ConvertToStringMap(msgMeta.GetProperties()),
+				topic:        pc.topic,
+				msgID: newMessageID(
+					int64(pbMsgID.GetLedgerId()),
+					int64(pbMsgID.GetEntryId()),
+					pbMsgID.GetBatchIndex(),
+					pc.partitionIdx,
+				),
+				payLoad:             headersAndPayload.ReadableSlice(),
+				schema:              pc.options.schema,
+				replicationClusters: msgMeta.GetReplicateTo(),
+				replicatedFrom:      msgMeta.GetReplicatedFrom(),
+				redeliveryCount:     response.GetRedeliveryCount(),
+				encryptionContext:   createEncryptionContext(msgMeta),

Review comment:
       I don't there is a need to add this. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r709138229



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       ### java client
   The purpose of java client adding msg id to `tracker` is to trigger redelivery of message request to broker, so that the message can be resent.
   
   If it is not added to tracker it will not be resent to the  consumer until it restarted. And this tracker will be active only if [ackTimeoutMillis](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)  config is provided.
   
   ## go client
   > I don't think we have this so may we just need to ack the message so it's not resent?
   
   If we leave the changes as it is in this PR, i.e not adding msg id to tracker (since this client doesn't support this) => the message will not be resent.
   
   As per [Acknowledgement timeout](https://pulsar.apache.org/docs/en/concepts-messaging/#acknowledgement-timeout)  this doc, if we want to redeliver message then it is preferable to use negative acknowledgements over acknowledgement timeout. Java client is doing the later one(i.e triggering redelivery of message request on ack timeout).
   
   
   So to be able to resent the message, we can do Nack (negative acknowledgement).
   
   @cckellogg  what do you suggest ?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r709138229



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       ### java client
   The purpose of java client adding msg id to `tracker` is to trigger redelivery of message request to broker, so that the message can be resent.
   
   If it is not added to tracker it will not be resent to the  consumer until it restarted. And this tracker will be active only if `
   [ackTimeoutMillis](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)  config is provided.
   
   ## go client
   > I don't think we have this so may we just need to ack the message so it's not resent?
   
   If we leave the changes as it is in this PR, i.e not adding msg id to tracker (since this client doesn't support this) => the message will not be resent.
   
   [Acknowledgement timeout](https://pulsar.apache.org/docs/en/concepts-messaging/#acknowledgement-timeout)  this doc prefer negative acknowledgements over acknowledgement timeout. Java client is doing the later one(i.e triggering redelivery of message request on ack timeout).
   
   
   So to be able to resent the message, we can do Nack (negative acknowledgement).
   
   @cckellogg  what do you suggest ?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r714151701



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       Looks good to me will you please fix the CI issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704085300



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       Let me take a look at this..




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704071172



##########
File path: pulsar/internal/crypto/decryptor.go
##########
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+// Decryptor support decrypting of message
+type Decryptor interface {
+	Decrypt(payload []byte, msgID *pb.MessageIdData, msgMetadata *pb.MessageMetadata) ([]byte, error)
+	CryptoFailureAction() int

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704064820



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+			return err
+		case crypto.ConsumerCryptoFailureActionDiscard:
+			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
+			return fmt.Errorf("discarding message on decryption error :%v", err)
+		case crypto.ConsumerCryptoFailureActionConsume:
+			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+			messages = append(messages, &message{
+				publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+				eventTime:    timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+				key:          msgMeta.GetPartitionKey(),
+				producerName: msgMeta.GetProducerName(),
+				properties:   internal.ConvertToStringMap(msgMeta.GetProperties()),
+				topic:        pc.topic,
+				msgID: newMessageID(
+					int64(pbMsgID.GetLedgerId()),
+					int64(pbMsgID.GetEntryId()),
+					pbMsgID.GetBatchIndex(),
+					pc.partitionIdx,
+				),
+				payLoad:             headersAndPayload.ReadableSlice(),
+				schema:              pc.options.schema,
+				replicationClusters: msgMeta.GetReplicateTo(),
+				replicatedFrom:      msgMeta.GetReplicatedFrom(),
+				redeliveryCount:     response.GetRedeliveryCount(),
+				encryptionContext:   createEncryptionContext(msgMeta),

Review comment:
       I don't think there is a need to add this. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704064204



##########
File path: pulsar/impl_message.go
##########
@@ -215,6 +233,7 @@ type message struct {
 	replicatedFrom      string
 	redeliveryCount     uint32
 	schema              Schema
+	encryptionContext   EncryptionContext

Review comment:
       done

##########
File path: pulsar/message.go
##########
@@ -116,6 +116,10 @@ type Message interface {
 
 	//Get the de-serialized value of the message, according the configured
 	GetSchemaValue(v interface{}) error
+
+	// GetEncryptionContext get the ecryption context of message
+	// It will be used by the application to parse undecrypted message
+	GetEncryptionContext() EncryptionContext

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r717675114



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       @cckellogg looks like the CI is passed now. 
   Please do suggest if there are any other improvements on this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r709138229



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       ### java client
   The purpose of java client adding msg id to `tracker` is to trigger redelivery of message request to broker, so that the message can be resent.
   
   If it is not added to tracker it will not be resent to the  consumer until it restarted. And this tracker will be active only if `
   [ackTimeoutMillis](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)  config is provided.
   
   ## go client
   > I don't think we have this so may we just need to ack the message so it's not resent?
   
   If we leave the changes as it is this PR, i.e not adding msg id to tracker (since this client doesn't support this) => the message will not be resent.
   
   [Acknowledgement timeout](https://pulsar.apache.org/docs/en/concepts-messaging/#acknowledgement-timeout)  this doc prefer negative acknowledgements over acknowledgement timeout. Java client is doing the later one(i.e triggering redelivery of message request on ack timeout).
   
   
   So to be able to resent the message, we can do Nack (negative acknowledgement).
   
   @cckellogg  what do you suggest ?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r714900932



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       thanks @cckellogg 
   
   I don't think CI is failing due to last commit.
   
   I see most of the open PR's are failing with the same issue as below.
   
   `TestNamespaceTopicsNamespaceDoesNotExit (56.25s)
       client_impl_test.go:387: 
           	Error Trace:	client_impl_test.go:387
           	Error:      	Expected nil, but got: &errors.errorString{s:"server error: AuthorizationError: Exception occurred while trying to authorize GetTopicsOfNamespace"}
           	Test:       	TestNamespaceTopicsNamespaceDoesNotExit`
   
   And last commit do not have any changes that effects `client_impl.go`. More likely this has something to do with broker config that CI uses for running test cases.
   
   please let me know If I'm wrong here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r709138229



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       ### java client
   The purpose of java client adding msg id to `tracker` is to trigger redelivery of message request to broker, so that the message can be resent.
   
   If it is not added to tracker it will not be resent to the  consumer until it restarted. And this tracker will be active only if `
   [ackTimeoutMillis](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)  config is provided.
   
   ## go client
   > I don't think we have this so may we just need to ack the message so it's not resent?
   
   If we leave the changes as it is in this PR, i.e not adding msg id to tracker (since this client doesn't support this) => the message will not be resent.
   
   As per [Acknowledgement timeout](https://pulsar.apache.org/docs/en/concepts-messaging/#acknowledgement-timeout)  this doc, if we want to redeliver message then it is preferable to use negative acknowledgements over acknowledgement timeout. Java client is doing the later one(i.e triggering redelivery of message request on ack timeout).
   
   
   So to be able to resent the message, we can do Nack (negative acknowledgement).
   
   @cckellogg  what do you suggest ?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704068602



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+			return err
+		case crypto.ConsumerCryptoFailureActionDiscard:
+			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
+			return fmt.Errorf("discarding message on decryption error :%v", err)
+		case crypto.ConsumerCryptoFailureActionConsume:
+			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+			messages = append(messages, &message{
+				publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] GPrabhudas commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
GPrabhudas commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r712772071



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       @cckellogg I've made the changes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #612: Encryption support ext consumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #612:
URL: https://github.com/apache/pulsar-client-go/pull/612#discussion_r704566636



##########
File path: pulsar/consumer_partition.go
##########
@@ -479,7 +505,49 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		return err
 	}
 
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+	messages := make([]*message, 0)
+
+	// error decrypting the payload
+	if err != nil {
+		pc.log.Error(err)
+		switch pc.decryptor.CryptoFailureAction() {
+		case crypto.ConsumerCryptoFailureActionFail:
+			pc.log.Errorf("consuming message failed due to decryption err :%v", err)

Review comment:
       I don't think we have this so may we just need to ack the message so it's not resent?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org