You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the hyperlink did not survive in this copy).
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/14 19:56:02 UTC

[pulsar-client-go] branch master updated: Implemented decompression on consumer side (#98)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ccd4547  Implemented decompression on consumer side (#98)
ccd4547 is described below

commit ccd4547fb0df5d3c42ac223bea327c7d36cf80bb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Nov 14 11:55:53 2019 -0800

    Implemented decompression on consumer side (#98)
    
    * Implemented decompression on consumer side
    
    * Fixed format
---
 pulsar/consumer_partition.go     | 54 ++++++++++++++++++++++++-
 pulsar/consumer_test.go          | 85 ++++++++++++++++++++++++++++++++++++++++
 pulsar/internal/batch_builder.go |  2 +
 pulsar/internal/commands.go      |  4 ++
 pulsar/internal/connection.go    | 11 ++----
 5 files changed, 146 insertions(+), 10 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index d49626f..e377723 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -26,10 +26,20 @@ import (
 
 	log "github.com/sirupsen/logrus"
 
+	"github.com/apache/pulsar-client-go/pkg/compression"
 	"github.com/apache/pulsar-client-go/pkg/pb"
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
+// compressionProviders maps the compression types this consumer supports, as
+// recorded in message metadata, to the provider Decompress uses to decode them.
+var compressionProviders = map[pb.CompressionType]compression.Provider{
+	pb.CompressionType_NONE: compression.NoopProvider,
+	pb.CompressionType_LZ4:  compression.Lz4Provider,
+	pb.CompressionType_ZLIB: compression.ZLibProvider,
+	pb.CompressionType_ZSTD: compression.ZStdProvider,
+}
+
 type consumerState int
 
 const (
@@ -219,10 +229,19 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	reader := internal.NewMessageReader(headersAndPayload)
 	msgMeta, err := reader.ReadMessageMetadata()
 	if err != nil {
-		// TODO send discardCorruptedMessage
+		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch)
 		return err
 	}
 
+	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+	if err != nil {
+		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
+		return err
+	}
+
+	// Reset the reader on the uncompressed buffer
+	reader.ResetBuffer(uncompressedHeadersAndPayload)
+
 	numMsgs := 1
 	if msgMeta.NumMessagesInBatch != nil {
 		numMsgs = int(msgMeta.GetNumMessagesInBatch())
@@ -236,7 +255,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	for i := 0; i < numMsgs; i++ {
 		smm, payload, err := reader.ReadMessage()
 		if err != nil {
-			// TODO send corrupted message
+			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
 			return err
 		}
 
@@ -536,3 +555,34 @@ func (pc *partitionConsumer) grabConn() error {
 		return newUnexpectedErrMsg(msgType, requestID)
 	}
 }
+
+// Decompress decodes the readable bytes of payload using the codec and uncompressed size recorded in msgMeta.
+func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
+	provider, ok := compressionProviders[msgMeta.GetCompression()]
+	if !ok {
+		err := fmt.Errorf("unsupported compression type: %v", msgMeta.GetCompression())
+		pc.log.WithError(err).Error("Failed to decompress message.")
+		return nil, err
+	}
+
+	uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
+	if err != nil {
+		return nil, err
+	}
+	return internal.NewBufferWrapper(uncompressed), nil
+}
+
+// discardCorruptedMessage logs msgID and acks it individually, tagging the ack with validationError.
+func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, validationError pb.CommandAck_ValidationError) {
+	pc.log.WithFields(log.Fields{
+		"msgId":           msgID,
+		"validationError": validationError,
+	}).Error("Discarding corrupted message")
+
+	pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_ACK, &pb.CommandAck{
+		ConsumerId:      proto.Uint64(pc.consumerID),
+		MessageId:       []*pb.MessageIdData{msgID},
+		AckType:         pb.CommandAck_Individual.Enum(),
+		ValidationError: validationError.Enum(),
+	})
+}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f34966b..f342db4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -674,3 +674,88 @@ func TestConsumerNack(t *testing.T) {
 		consumer.Ack(msg)
 	}
 }
+
+// TestConsumerCompression verifies that LZ4-compressed messages are received intact and in order.
+func TestConsumerCompression(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topicName,
+		CompressionType: LZ4,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-1",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	const N = 100
+	for i := 0; i < N; i++ {
+		if err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+		consumer.Ack(msg)
+	}
+}
+
+// TestConsumerCompressionWithBatches verifies ZLib-compressed batches are split back into the original messages, in order.
+func TestConsumerCompressionWithBatches(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		CompressionType:         ZLib,
+		BatchingMaxPublishDelay: 1 * time.Minute, // long delay: batches go out on size limits or the Flush below, not the timer
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-1",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	const N = 100
+	for i := 0; i < N; i++ {
+		producer.SendAsync(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+		}, nil)
+	}
+
+	producer.Flush()
+
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+		consumer.Ack(msg)
+	}
+}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 53035ed..c3dffbd 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -153,7 +153,9 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks
 	bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
 	bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
 
+	uncompressedSize := bb.buffer.ReadableBytes()
 	compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice())
+	bb.msgMetadata.UncompressedSize = &uncompressedSize
 
 	buffer := NewBuffer(4096)
 	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed)
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 09d223f..c046f43 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -143,6 +143,10 @@ func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte,
 	return &meta, r.buffer.Read(uint32(meta.GetPayloadSize())), nil
 }
 
+// ResetBuffer swaps in buffer as the source for all subsequent reads; anything
+// left unread in the previous buffer is no longer reachable through r.
+func (r *MessageReader) ResetBuffer(buffer Buffer) { r.buffer = buffer }
+
 func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
 	cmd := &pb.BaseCommand{
 		Type: &cmdType,
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index ba4147a..88ee673 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -344,7 +344,6 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buff
 func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
 	c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
 	c.setLastDataReceived(time.Now())
-	var err error
 
 	switch *cmd.Type {
 	case pb.BaseCommand_SUCCESS:
@@ -386,10 +385,8 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
 	case pb.BaseCommand_SEND_ERROR:
 
 	case pb.BaseCommand_MESSAGE:
-		err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
-		if err != nil {
-			c.Close()
-		}
+		c.handleMessage(cmd.GetMessage(), headersAndPayload)
+
 	case pb.BaseCommand_PING:
 		c.handlePing()
 	case pb.BaseCommand_PONG:
@@ -465,19 +462,19 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
 	}
 }
 
-func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) error {
+// handleMessage forwards a MESSAGE command to the consumer registered under its
+// consumer ID. Errors returned by the consumer are logged here, not propagated.
+func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) {
 	c.log.Debug("Got Message: ", response)
 	consumerID := response.GetConsumerId()
 	if consumer, ok := c.consumerHandler(consumerID); ok {
 		err := consumer.MessageReceived(response, payload)
 		if err != nil {
-			c.log.WithField("consumerID", consumerID).Error("handle message err: ", response.MessageId)
-			return errors.New("handler not found")
+			c.log.WithField("consumerID", consumerID).WithError(err).Error("handle message err: ", response.MessageId)
 		}
 	} else {
 		c.log.WithField("consumerID", consumerID).Warn("Got unexpected message: ", response.MessageId)
 	}
-	return nil
 }
 
 func (c *connection) lastDataReceived() time.Time {