You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/14 02:28:02 UTC

[pulsar-client-go] branch master updated: [Issue 52]Add interceptor (#314)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b434511  [Issue 52]Add interceptor (#314)
b434511 is described below

commit b4345119bc9b0ba36b2fa132a4f5674d40e3cabd
Author: Lijingfeng <17...@qq.com>
AuthorDate: Tue Jul 14 10:24:59 2020 +0800

    [Issue 52]Add interceptor (#314)
    
    ### Motivation
    Add a chain of interceptors for Producer and Consumer as an option. These interceptors are called at defined points and can be used for tracing, metrics, and so on.
    
    ### Modifications
    Add two files for interceptor definition.
    Call the interceptors' methods at the appropriate positions.
    * review test
    
    Co-authored-by: lijingfeng <li...@laiye.com>
---
 pulsar/consumer.go                |   3 +
 pulsar/consumer_impl.go           |   5 ++
 pulsar/consumer_interceptor.go    |  51 +++++++++++++
 pulsar/consumer_partition.go      |  14 ++++
 pulsar/consumer_partition_test.go |   3 +
 pulsar/consumer_test.go           | 146 ++++++++++++++++++++++++++++++++++++++
 pulsar/producer.go                |   3 +
 pulsar/producer_impl.go           |   4 ++
 pulsar/producer_interceptor.go    |  43 +++++++++++
 pulsar/producer_partition.go      |  10 ++-
 pulsar/producer_test.go           | 102 ++++++++++++++++++++++++++
 11 files changed, 382 insertions(+), 2 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 8d3c771..c1fe454 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -137,6 +137,9 @@ type ConsumerOptions struct {
 
 	// Mark the subscription as replicated to keep it in sync across clusters
 	ReplicateSubscriptionState bool
+
+	// A chain of interceptors. These interceptors are called at the points defined in the ConsumerInterceptor interface.
+	Interceptors ConsumerInterceptors
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e3db670..ef93037 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -94,6 +94,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		options.ReceiverQueueSize = 1000
 	}
 
+	if options.Interceptors == nil {
+		options.Interceptors = defaultConsumerInterceptors
+	}
+
 	if options.Name == "" {
 		options.Name = generateRandomName()
 	}
@@ -262,6 +266,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 				startMessageID:             messageID{},
 				subscriptionMode:           durable,
 				readCompacted:              c.options.ReadCompacted,
+				interceptors:               c.options.Interceptors,
 			}
 			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
 			ch <- ConsumerError{
diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go
new file mode 100644
index 0000000..db46b78
--- /dev/null
+++ b/pulsar/consumer_interceptor.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+type ConsumerInterceptor interface {
+	// BeforeConsume is called just before the message is sent to the Consumer's ConsumerMessage channel.
+	BeforeConsume(message ConsumerMessage)
+
+	// OnAcknowledge is called when the consumer sends the acknowledgment to the broker.
+	OnAcknowledge(consumer Consumer, msgID MessageID)
+
+	// OnNegativeAcksSend is called when a redelivery from a negative acknowledgment occurs.
+	OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
+}
+
+type ConsumerInterceptors []ConsumerInterceptor
+
+func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) {
+	for i := range x {
+		x[i].BeforeConsume(message)
+	}
+}
+
+func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID) {
+	for i := range x {
+		x[i].OnAcknowledge(consumer, msgID)
+	}
+}
+
+func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
+	for i := range x {
+		x[i].OnNegativeAcksSend(consumer, msgIDs)
+	}
+}
+
+var defaultConsumerInterceptors = make(ConsumerInterceptors, 0)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index acf897d..0c723c8 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -114,6 +114,7 @@ type partitionConsumerOpts struct {
 	subscriptionMode           subscriptionMode
 	readCompacted              bool
 	disableForceTopicCreation  bool
+	interceptors               ConsumerInterceptors
 }
 
 type partitionConsumer struct {
@@ -274,6 +275,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
 			msgID: msgID,
 		}
 		pc.eventsCh <- req
+
+		pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
 	}
 }
 
@@ -284,6 +287,12 @@ func (pc *partitionConsumer) NackID(msgID messageID) {
 
 func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
 	pc.eventsCh <- &redeliveryRequest{msgIds}
+
+	iMsgIds := make([]MessageID, len(msgIds))
+	for i := range iMsgIds {
+		iMsgIds[i] = &msgIds[i]
+	}
+	pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds)
 }
 
 func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
@@ -498,6 +507,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 			}
 		}
 
+		pc.options.interceptors.BeforeConsume(ConsumerMessage{
+			Consumer: pc.parentConsumer,
+			Message:  msg,
+		})
+
 		messages = append(messages, msg)
 	}
 
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 0fcbdc5..01831a4 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -34,6 +34,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
 		queueCh:              make(chan []*message, 1),
 		eventsCh:             eventsCh,
 		compressionProviders: make(map[pb.CompressionType]compression.Provider),
+		options:              &partitionConsumerOpts{},
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
@@ -63,6 +64,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
 		queueCh:              make(chan []*message, 1),
 		eventsCh:             eventsCh,
 		compressionProviders: make(map[pb.CompressionType]compression.Provider),
+		options:              &partitionConsumerOpts{},
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
@@ -92,6 +94,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
 		queueCh:              make(chan []*message, 1),
 		eventsCh:             eventsCh,
 		compressionProviders: make(map[pb.CompressionType]compression.Provider),
+		options:              &partitionConsumerOpts{},
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4031f7d..a3a22b6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -23,6 +23,7 @@ import (
 	"log"
 	"net/http"
 	"strconv"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -1343,6 +1344,151 @@ func TestProducerName(t *testing.T) {
 	}
 }
 
+type noopConsumerInterceptor struct{}
+
+func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}
+
+func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}
+
+// copyPropertyInterceptor copies all keys in the message properties map and adds a suffix to each copy
+type copyPropertyInterceptor struct {
+	suffix string
+}
+
+func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
+	properties := message.Properties()
+	copy := make(map[string]string, len(properties))
+	for k, v := range properties {
+		copy[k+x.suffix] = v
+	}
+	for ck, v := range copy {
+		properties[ck] = v
+	}
+}
+
+func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}
+
+func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}
+
+type metricConsumerInterceptor struct {
+	ackn  int32
+	nackn int32
+}
+
+func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {
+	atomic.AddInt32(&x.ackn, 1)
+}
+
+func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
+	atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
+}
+
+func TestConsumerWithInterceptors(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	metric := &metricConsumerInterceptor{}
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topic,
+		SubscriptionName:    "my-sub",
+		Type:                Exclusive,
+		NackRedeliveryDelay: time.Second, // for testing nack
+		Interceptors: ConsumerInterceptors{
+			noopConsumerInterceptor{},
+			copyPropertyInterceptor{suffix: "-copy"},
+			metric,
+		},
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		if _, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			Key:     "pulsar",
+			Properties: map[string]string{
+				"key-1": "pulsar-1",
+			},
+		}); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	var nackIds []MessageID
+	// receive 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		expectProperties := map[string]string{
+			"key-1":      "pulsar-1",
+			"key-1-copy": "pulsar-1", // check properties copy by interceptor
+		}
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+		assert.Equal(t, "pulsar", msg.Key())
+		assert.Equal(t, expectProperties, msg.Properties())
+
+		// ack message
+		if i%2 == 0 {
+			consumer.Ack(msg)
+		} else {
+			nackIds = append(nackIds, msg.ID())
+		}
+	}
+	assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))
+
+	for i := range nackIds {
+		consumer.NackID(nackIds[i])
+	}
+
+	// receive 5 nack messages
+	for i := 0; i < 5; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		expectMsg := fmt.Sprintf("hello-%d", i*2+1)
+		expectProperties := map[string]string{
+			"key-1":      "pulsar-1",
+			"key-1-copy": "pulsar-1", // check properties copy by interceptor
+		}
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+		assert.Equal(t, "pulsar", msg.Key())
+		assert.Equal(t, expectProperties, msg.Properties())
+
+		// ack message
+		consumer.Ack(msg)
+	}
+
+	assert.Equal(t, int32(5), atomic.LoadInt32(&metric.nackn))
+}
+
 func TestConsumerName(t *testing.T) {
 	assert := assert.New(t)
 
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 7d44a56..cb38e3e 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -135,6 +135,9 @@ type ProducerOptions struct {
 	// If set to a value greater than 1, messages will be queued until this threshold is reached or
 	// BatchingMaxMessages (see above) has been reached or the batch interval has elapsed.
 	BatchingMaxSize uint
+
+	// A chain of interceptors. These interceptors are called at the points defined in the ProducerInterceptor interface.
+	Interceptors ProducerInterceptors
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 01c5d76..4ee0d8d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -97,6 +97,10 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
 	}
 
+	if options.Interceptors == nil {
+		options.Interceptors = defaultProducerInterceptors
+	}
+
 	if options.MessageRouter == nil {
 		internalRouter := internal.NewDefaultRouter(
 			internal.NewSystemClock(),
diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go
new file mode 100644
index 0000000..cb2cc15
--- /dev/null
+++ b/pulsar/producer_interceptor.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+type ProducerInterceptor interface {
+	// BeforeSend is called before the message is sent to the brokers. This method is allowed to modify the message.
+	BeforeSend(producer Producer, message *ProducerMessage)
+
+	// OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged,
+	// or when sending the message fails.
+	OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)
+}
+
+type ProducerInterceptors []ProducerInterceptor
+
+func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) {
+	for i := range x {
+		x[i].BeforeSend(producer, message)
+	}
+}
+
+func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
+	for i := range x {
+		x[i].OnSendAcknowledgement(producer, message, msgID)
+	}
+}
+
+var defaultProducerInterceptors = make(ProducerInterceptors, 0)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b8dc13d..3832d69 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -444,6 +444,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
 		flushImmediately: flushImmediately,
 		publishTime:      time.Now(),
 	}
+	p.options.Interceptors.BeforeSend(p, msg)
 
 	messagesPending.Inc()
 	bytesPending.Add(float64(len(sr.msg.Payload)))
@@ -488,14 +489,19 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 			bytesPending.Sub(payloadSize)
 		}
 
-		if sr.callback != nil {
+		if sr.callback != nil || len(p.options.Interceptors) > 0 {
 			msgID := newMessageID(
 				int64(response.MessageId.GetLedgerId()),
 				int64(response.MessageId.GetEntryId()),
 				int32(idx),
 				p.partitionIdx,
 			)
-			sr.callback(msgID, sr.msg, nil)
+
+			if sr.callback != nil {
+				sr.callback(msgID, sr.msg, nil)
+			}
+
+			p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
 		}
 	}
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 8d389cb..d06ddbb 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -817,3 +817,105 @@ func TestMaxMessageSize(t *testing.T) {
 		}
 	}
 }
+
+type noopProduceInterceptor struct{}
+
+func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}
+
+func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
+}
+
+// metricProduceInterceptor counts how many times BeforeSend and OnSendAcknowledgement are invoked
+type metricProduceInterceptor struct {
+	sendn int
+	ackn  int
+}
+
+func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {
+	x.sendn++
+}
+
+func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
+	x.ackn++
+}
+
+func TestProducerWithInterceptors(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-topic-interceptors"
+	ctx := context.Background()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub",
+		Type:             Exclusive,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	metric := &metricProduceInterceptor{}
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+		Interceptors: ProducerInterceptors{
+			noopProduceInterceptor{},
+			metric,
+		},
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		if i%2 == 0 {
+			_, err := producer.Send(ctx, &ProducerMessage{
+				Payload: []byte(fmt.Sprintf("hello-%d", i)),
+				Key:     "pulsar",
+				Properties: map[string]string{
+					"key-1": "pulsar-1",
+				},
+			})
+			assert.Nil(t, err)
+		} else {
+			producer.SendAsync(ctx, &ProducerMessage{
+				Payload: []byte(fmt.Sprintf("hello-%d", i)),
+				Key:     "pulsar",
+				Properties: map[string]string{
+					"key-1": "pulsar-1",
+				},
+			}, func(_ MessageID, _ *ProducerMessage, err error) {
+				assert.Nil(t, err)
+			})
+			assert.Nil(t, err)
+		}
+	}
+
+	// receive 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		expectProperties := map[string]string{
+			"key-1": "pulsar-1",
+		}
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+		assert.Equal(t, "pulsar", msg.Key())
+		assert.Equal(t, expectProperties, msg.Properties())
+
+		// ack message
+		consumer.Ack(msg)
+	}
+
+	assert.Equal(t, 10, metric.sendn)
+	assert.Equal(t, 10, metric.ackn)
+}