You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/14 19:56:02 UTC
[pulsar-client-go] branch master updated: Implemented decompression
on consumer side (#98)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ccd4547 Implemented decompression on consumer side (#98)
ccd4547 is described below
commit ccd4547fb0df5d3c42ac223bea327c7d36cf80bb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Nov 14 11:55:53 2019 -0800
Implemented decompression on consumer side (#98)
* Implemented decompression on consumer side
* Fixed format
---
pulsar/consumer_partition.go | 54 ++++++++++++++++++++++++-
pulsar/consumer_test.go | 85 ++++++++++++++++++++++++++++++++++++++++
pulsar/internal/batch_builder.go | 2 +
pulsar/internal/commands.go | 4 ++
pulsar/internal/connection.go | 11 ++----
5 files changed, 146 insertions(+), 10 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index d49626f..e377723 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -26,10 +26,20 @@ import (
log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pkg/compression"
"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
+// compressionProviders associates each compression type that message
+// metadata can declare with the provider used to decompress such payloads.
+// It is consulted by partitionConsumer.Decompress.
+var compressionProviders = map[pb.CompressionType]compression.Provider{
+	pb.CompressionType_NONE: compression.NoopProvider,
+	pb.CompressionType_LZ4:  compression.Lz4Provider,
+	pb.CompressionType_ZLIB: compression.ZLibProvider,
+	pb.CompressionType_ZSTD: compression.ZStdProvider,
+}
+
type consumerState int
const (
@@ -219,10 +229,19 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
reader := internal.NewMessageReader(headersAndPayload)
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
- // TODO send discardCorruptedMessage
+ pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch)
return err
}
+ uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
+ if err != nil {
+ pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
+ return err
+ }
+
+ // Reset the reader on the uncompressed buffer
+ reader.ResetBuffer(uncompressedHeadersAndPayload)
+
numMsgs := 1
if msgMeta.NumMessagesInBatch != nil {
numMsgs = int(msgMeta.GetNumMessagesInBatch())
@@ -236,7 +255,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
for i := 0; i < numMsgs; i++ {
smm, payload, err := reader.ReadMessage()
if err != nil {
- // TODO send corrupted message
+ pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
return err
}
@@ -536,3 +555,34 @@ func (pc *partitionConsumer) grabConn() error {
return newUnexpectedErrMsg(msgType, requestID)
}
}
+
+// Decompress returns the readable bytes of payload, decompressed with the
+// provider registered in compressionProviders for the compression type
+// declared in msgMeta.
+//
+// It returns a nil buffer and an error when no provider is registered for
+// that compression type (this case is also logged), or when the provider
+// fails to decompress the data.
+func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
+	compressionType := msgMeta.GetCompression()
+
+	provider, ok := compressionProviders[compressionType]
+	if !ok {
+		err := fmt.Errorf("Unsupported compression type: %v", compressionType)
+		pc.log.WithError(err).Error("Failed to decompress message.")
+		return nil, err
+	}
+
+	// The uncompressed size recorded in the metadata is handed to the
+	// provider together with the compressed bytes.
+	uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
+	if err != nil {
+		return nil, err
+	}
+
+	return internal.NewBufferWrapper(uncompressed), nil
+}
+
+// discardCorruptedMessage logs that the message identified by msgId failed
+// validation, then acknowledges it individually on the consumer's connection
+// through RequestOnCnxNoWait, attaching validationError to the ACK command.
+func (pc *partitionConsumer) discardCorruptedMessage(msgId *pb.MessageIdData, validationError pb.CommandAck_ValidationError) {
+	fields := log.Fields{
+		"msgId":           msgId,
+		"validationError": validationError,
+	}
+	pc.log.WithFields(fields).Error("Discarding corrupted message")
+
+	ack := &pb.CommandAck{
+		ConsumerId:      proto.Uint64(pc.consumerID),
+		MessageId:       []*pb.MessageIdData{msgId},
+		AckType:         pb.CommandAck_Individual.Enum(),
+		ValidationError: validationError.Enum(),
+	}
+	pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_ACK, ack)
+}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f34966b..f342db4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -674,3 +674,88 @@ func TestConsumerNack(t *testing.T) {
consumer.Ack(msg)
}
}
+
+// TestConsumerCompression publishes N messages through a producer configured
+// with LZ4 compression and checks that a consumer on the same topic receives
+// every payload unchanged and in publication order.
+func TestConsumerCompression(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		// Nothing below can run without a client; stop before deferring Close on nil.
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	topicName := newTopicName()
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topicName,
+		CompressionType: LZ4,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-1",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Release the subscription when the test ends.
+	defer consumer.Close()
+
+	const N = 100
+
+	for i := 0; i < N; i++ {
+		if err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		if err != nil {
+			// msg is not usable after a failed Receive; stop instead of dereferencing it.
+			t.Fatal(err)
+		}
+		assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+		consumer.Ack(msg)
+	}
+}
+
+// TestConsumerCompressionWithBatches publishes N messages asynchronously
+// through a producer configured with ZLib compression and a one-minute
+// batching delay, flushes the producer, and checks that a consumer on the
+// same topic receives every payload unchanged and in publication order.
+func TestConsumerCompressionWithBatches(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		// Nothing below can run without a client; stop before deferring Close on nil.
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	topicName := newTopicName()
+	ctx := context.Background()
+
+	// NOTE(review): the long publish delay presumably keeps messages buffered
+	// until the explicit Flush below — confirm against the producer's batching logic.
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		CompressionType:         ZLib,
+		BatchingMaxPublishDelay: 1 * time.Minute,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-1",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Release the subscription when the test ends.
+	defer consumer.Close()
+
+	const N = 100
+
+	for i := 0; i < N; i++ {
+		producer.SendAsync(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+		}, nil)
+	}
+
+	producer.Flush()
+
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		if err != nil {
+			// msg is not usable after a failed Receive; stop instead of dereferencing it.
+			t.Fatal(err)
+		}
+		assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+		consumer.Ack(msg)
+	}
+}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 53035ed..c3dffbd 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -153,7 +153,9 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks
bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
+ uncompressedSize := bb.buffer.ReadableBytes()
compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice())
+ bb.msgMetadata.UncompressedSize = &uncompressedSize
buffer := NewBuffer(4096)
serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed)
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 09d223f..c046f43 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -143,6 +143,10 @@ func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte,
return &meta, r.buffer.Read(uint32(meta.GetPayloadSize())), nil
}
+// ResetBuffer replaces the buffer this MessageReader reads from with buffer.
+func (r *MessageReader) ResetBuffer(buffer Buffer) {
+ r.buffer = buffer
+}
+
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
cmd := &pb.BaseCommand{
Type: &cmdType,
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index ba4147a..88ee673 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -344,7 +344,6 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buff
func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
c.setLastDataReceived(time.Now())
- var err error
switch *cmd.Type {
case pb.BaseCommand_SUCCESS:
@@ -386,10 +385,8 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
case pb.BaseCommand_SEND_ERROR:
case pb.BaseCommand_MESSAGE:
- err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
- if err != nil {
- c.Close()
- }
+ c.handleMessage(cmd.GetMessage(), headersAndPayload)
+
case pb.BaseCommand_PING:
c.handlePing()
case pb.BaseCommand_PONG:
@@ -465,19 +462,17 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
}
}
-func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) error {
+func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) {
c.log.Debug("Got Message: ", response)
consumerID := response.GetConsumerId()
if consumer, ok := c.consumerHandler(consumerID); ok {
err := consumer.MessageReceived(response, payload)
if err != nil {
c.log.WithField("consumerID", consumerID).Error("handle message err: ", response.MessageId)
- return errors.New("handler not found")
}
} else {
c.log.WithField("consumerID", consumerID).Warn("Got unexpected message: ", response.MessageId)
}
- return nil
}
func (c *connection) lastDataReceived() time.Time {