You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/14 02:28:02 UTC
[pulsar-client-go] branch master updated: [Issue 52]Add interceptor
(#314)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b434511 [Issue 52]Add interceptor (#314)
b434511 is described below
commit b4345119bc9b0ba36b2fa132a4f5674d40e3cabd
Author: Lijingfeng <17...@qq.com>
AuthorDate: Tue Jul 14 10:24:59 2020 +0800
[Issue 52]Add interceptor (#314)
### Motivation
Add a chain of interceptors for Producer and Consumer as an option. These interceptors are called at defined points and can be used for tracing, metrics, and so on.
### Modifications
Add two files for interceptor definition.
Call the interceptors' methods at the appropriate points.
* review test
Co-authored-by: lijingfeng <li...@laiye.com>
---
pulsar/consumer.go | 3 +
pulsar/consumer_impl.go | 5 ++
pulsar/consumer_interceptor.go | 51 +++++++++++++
pulsar/consumer_partition.go | 14 ++++
pulsar/consumer_partition_test.go | 3 +
pulsar/consumer_test.go | 146 ++++++++++++++++++++++++++++++++++++++
pulsar/producer.go | 3 +
pulsar/producer_impl.go | 4 ++
pulsar/producer_interceptor.go | 43 +++++++++++
pulsar/producer_partition.go | 10 ++-
pulsar/producer_test.go | 102 ++++++++++++++++++++++++++
11 files changed, 382 insertions(+), 2 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 8d3c771..c1fe454 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -137,6 +137,9 @@ type ConsumerOptions struct {
// Mark the subscription as replicated to keep it in sync across clusters
ReplicateSubscriptionState bool
+
+ // A chain of interceptors that are called at the points defined in the ConsumerInterceptor interface.
+ Interceptors ConsumerInterceptors
}
// Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e3db670..ef93037 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -94,6 +94,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
options.ReceiverQueueSize = 1000
}
+ if options.Interceptors == nil {
+ options.Interceptors = defaultConsumerInterceptors
+ }
+
if options.Name == "" {
options.Name = generateRandomName()
}
@@ -262,6 +266,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
startMessageID: messageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
+ interceptors: c.options.Interceptors,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
ch <- ConsumerError{
diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go
new file mode 100644
index 0000000..db46b78
--- /dev/null
+++ b/pulsar/consumer_interceptor.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+// ConsumerInterceptor declares the hooks a consumer invokes around message delivery,
+// acknowledgment and negative-acknowledgment redelivery.
+type ConsumerInterceptor interface {
+ // BeforeConsume is called just before the message is sent to the Consumer's ConsumerMessage channel.
+ BeforeConsume(message ConsumerMessage)
+
+ // OnAcknowledge is called when the consumer sends the acknowledgment to the broker.
+ OnAcknowledge(consumer Consumer, msgID MessageID)
+
+ // OnNegativeAcksSend is called when a redelivery from a negative acknowledge occurs.
+ OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
+}
+
+// ConsumerInterceptors is an ordered chain of ConsumerInterceptor values. Each method
+// forwards the call to every element of the chain in slice order.
+type ConsumerInterceptors []ConsumerInterceptor
+
+// BeforeConsume invokes BeforeConsume on every interceptor in the chain.
+func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) {
+ for _, interceptor := range x {
+ interceptor.BeforeConsume(message)
+ }
+}
+
+// OnAcknowledge invokes OnAcknowledge on every interceptor in the chain.
+func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID) {
+ for _, interceptor := range x {
+ interceptor.OnAcknowledge(consumer, msgID)
+ }
+}
+
+// OnNegativeAcksSend invokes OnNegativeAcksSend on every interceptor in the chain.
+func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
+ for _, interceptor := range x {
+ interceptor.OnNegativeAcksSend(consumer, msgIDs)
+ }
+}
+
+// defaultConsumerInterceptors is the empty, non-nil chain used when the options carry no interceptors.
+var defaultConsumerInterceptors = ConsumerInterceptors{}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index acf897d..0c723c8 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -114,6 +114,7 @@ type partitionConsumerOpts struct {
subscriptionMode subscriptionMode
readCompacted bool
disableForceTopicCreation bool
+ interceptors ConsumerInterceptors
}
type partitionConsumer struct {
@@ -274,6 +275,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
msgID: msgID,
}
pc.eventsCh <- req
+
+ pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
}
}
@@ -284,6 +287,12 @@ func (pc *partitionConsumer) NackID(msgID messageID) {
+// Redeliver queues a redeliveryRequest for msgIds on the events channel and then
+// reports the same IDs to the configured interceptors through OnNegativeAcksSend.
func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
pc.eventsCh <- &redeliveryRequest{msgIds}
+
+ // The interceptor hook takes the exported MessageID interface, so build a parallel
+ // slice whose entries point at the elements of msgIds (the IDs are not copied).
+ iMsgIds := make([]MessageID, len(msgIds))
+ for i := range iMsgIds {
+ iMsgIds[i] = &msgIds[i]
+ }
+ pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds)
}
func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
@@ -498,6 +507,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
}
}
+ pc.options.interceptors.BeforeConsume(ConsumerMessage{
+ Consumer: pc.parentConsumer,
+ Message: msg,
+ })
+
messages = append(messages, msg)
}
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 0fcbdc5..01831a4 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -34,6 +34,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
+ options: &partitionConsumerOpts{},
}
headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
@@ -63,6 +64,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
+ options: &partitionConsumerOpts{},
}
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
@@ -92,6 +94,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
+ options: &partitionConsumerOpts{},
}
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4031f7d..a3a22b6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -23,6 +23,7 @@ import (
"log"
"net/http"
"strconv"
+ "sync/atomic"
"testing"
"time"
@@ -1343,6 +1344,151 @@ func TestProducerName(t *testing.T) {
}
}
+// noopConsumerInterceptor is a ConsumerInterceptor whose hooks all do nothing.
+type noopConsumerInterceptor struct{}
+
+// BeforeConsume ignores the message.
+func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {
+ // intentionally empty
+}
+
+// OnAcknowledge ignores the acknowledgment.
+func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {
+ // intentionally empty
+}
+
+// OnNegativeAcksSend ignores the negative acknowledgments.
+func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
+ // intentionally empty
+}
+
+// copyPropertyInterceptor duplicates every entry of a message's properties map
+// under a new key formed by appending suffix to the original key.
+type copyPropertyInterceptor struct {
+ suffix string
+}
+
+// BeforeConsume adds a "<key><suffix>" entry for every property the message already has.
+func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
+ props := message.Properties()
+
+ // Stage the new entries in a separate map first so that props is not written to
+ // while it is being ranged over.
+ suffixed := make(map[string]string, len(props))
+ for key, value := range props {
+ suffixed[key+x.suffix] = value
+ }
+ for key, value := range suffixed {
+ props[key] = value
+ }
+}
+
+// OnAcknowledge does nothing.
+func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {
+}
+
+// OnNegativeAcksSend does nothing.
+func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
+}
+
+// metricConsumerInterceptor counts acknowledgments and negatively acknowledged
+// message IDs. Both counters are updated with atomic adds.
+type metricConsumerInterceptor struct {
+ ackn int32
+ nackn int32
+}
+
+// BeforeConsume does nothing; only acks and nacks are counted.
+func (m *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {
+}
+
+// OnAcknowledge adds one to ackn.
+func (m *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {
+ atomic.AddInt32(&m.ackn, 1)
+}
+
+// OnNegativeAcksSend adds the number of IDs in msgIDs to nackn.
+func (m *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
+ redelivered := int32(len(msgIDs))
+ atomic.AddInt32(&m.nackn, redelivered)
+}
+
+// TestConsumerWithInterceptors subscribes with a chain of three interceptors and checks that
+// the properties copied in BeforeConsume reach the receiver, that OnAcknowledge is counted for
+// the five acked messages, and that OnNegativeAcksSend is counted for the five nacked IDs.
+func TestConsumerWithInterceptors(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ metric := &metricConsumerInterceptor{}
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ NackRedeliveryDelay: time.Second, // for testing nack
+ Interceptors: ConsumerInterceptors{
+ noopConsumerInterceptor{},
+ copyPropertyInterceptor{suffix: "-copy"},
+ metric,
+ },
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ if _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ }); err != nil {
+ // t.Fatal (not log.Fatal) so the deferred Close calls still run and other tests continue.
+ t.Fatal(err)
+ }
+ }
+
+ var nackIds []MessageID
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ "key-1-copy": "pulsar-1", // check properties copy by interceptor
+ }
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
+
+ // ack the even messages, remember the odd ones for the nack phase
+ if i%2 == 0 {
+ consumer.Ack(msg)
+ } else {
+ nackIds = append(nackIds, msg.ID())
+ }
+ }
+ assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))
+
+ for i := range nackIds {
+ consumer.NackID(nackIds[i])
+ }
+
+ // receive 5 nack messages
+ for i := 0; i < 5; i++ {
+ msg, err := consumer.Receive(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // the redelivered messages are the odd-numbered ones: 1, 3, 5, 7, 9
+ expectMsg := fmt.Sprintf("hello-%d", i*2+1)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ "key-1-copy": "pulsar-1", // check properties copy by interceptor
+ }
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
+
+ // ack message
+ consumer.Ack(msg)
+ }
+
+ assert.Equal(t, int32(5), atomic.LoadInt32(&metric.nackn))
+}
+
func TestConsumerName(t *testing.T) {
assert := assert.New(t)
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 7d44a56..cb38e3e 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -135,6 +135,9 @@ type ProducerOptions struct {
// If set to a value greater than 1, messages will be queued until this threshold is reached or
// BatchingMaxMessages (see above) has been reached or the batch interval has elapsed.
BatchingMaxSize uint
+
+ // A chain of interceptors that are called at the points defined in the ProducerInterceptor interface.
+ Interceptors ProducerInterceptors
}
// Producer is used to publish messages on a topic
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 01c5d76..4ee0d8d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -97,6 +97,10 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
}
+ if options.Interceptors == nil {
+ options.Interceptors = defaultProducerInterceptors
+ }
+
if options.MessageRouter == nil {
internalRouter := internal.NewDefaultRouter(
internal.NewSystemClock(),
diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go
new file mode 100644
index 0000000..cb2cc15
--- /dev/null
+++ b/pulsar/producer_interceptor.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+// ProducerInterceptor declares the hooks a producer invokes around sending a message.
+type ProducerInterceptor interface {
+ // BeforeSend is called before the message is sent to the brokers. The message is passed by
+ // pointer, so an interceptor is allowed to modify it.
+ BeforeSend(producer Producer, message *ProducerMessage)
+
+ // OnSendAcknowledgement is called when the message sent to the broker has been acknowledged,
+ // or when sending the message fails.
+ // NOTE(review): no error is passed to this hook, and the only call visible in this change is
+ // on the send-receipt path; confirm that the failure case is actually reported.
+ OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)
+}
+
+// ProducerInterceptors is an ordered chain of ProducerInterceptor values. Each method
+// forwards the call to every element of the chain in slice order.
+type ProducerInterceptors []ProducerInterceptor
+
+// BeforeSend invokes BeforeSend on every interceptor in the chain.
+func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) {
+ for _, interceptor := range x {
+ interceptor.BeforeSend(producer, message)
+ }
+}
+
+// OnSendAcknowledgement invokes OnSendAcknowledgement on every interceptor in the chain.
+func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
+ for _, interceptor := range x {
+ interceptor.OnSendAcknowledgement(producer, message, msgID)
+ }
+}
+
+// defaultProducerInterceptors is the empty, non-nil chain used when the options carry no interceptors.
+var defaultProducerInterceptors = ProducerInterceptors{}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b8dc13d..3832d69 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -444,6 +444,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
flushImmediately: flushImmediately,
publishTime: time.Now(),
}
+ p.options.Interceptors.BeforeSend(p, msg)
messagesPending.Inc()
bytesPending.Add(float64(len(sr.msg.Payload)))
@@ -488,14 +489,19 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
bytesPending.Sub(payloadSize)
}
- if sr.callback != nil {
+ if sr.callback != nil || len(p.options.Interceptors) > 0 {
msgID := newMessageID(
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
int32(idx),
p.partitionIdx,
)
- sr.callback(msgID, sr.msg, nil)
+
+ if sr.callback != nil {
+ sr.callback(msgID, sr.msg, nil)
+ }
+
+ p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
}
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 8d389cb..d06ddbb 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -817,3 +817,105 @@ func TestMaxMessageSize(t *testing.T) {
}
}
}
+
+// noopProduceInterceptor is a ProducerInterceptor whose hooks all do nothing.
+type noopProduceInterceptor struct{}
+
+// BeforeSend ignores the message.
+func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {
+ // intentionally empty
+}
+
+// OnSendAcknowledgement ignores the acknowledgment.
+func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
+ // intentionally empty
+}
+
+// metricProduceInterceptor counts how many times each producer hook has been invoked.
+// NOTE(review): the counters are plain ints, unlike metricConsumerInterceptor which uses
+// sync/atomic; confirm the hooks and the reader never run on different goroutines.
+type metricProduceInterceptor struct {
+ sendn int
+ ackn int
+}
+
+// BeforeSend counts one more message handed to the producer.
+func (m *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {
+ m.sendn++
+}
+
+// OnSendAcknowledgement counts one more acknowledged message.
+func (m *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
+ m.ackn++
+}
+
+// TestProducerWithInterceptors publishes ten messages, alternating Send and SendAsync, through a
+// producer configured with two interceptors and checks that the messages arrive unchanged and
+// that BeforeSend and OnSendAcknowledgement were each counted ten times.
+func TestProducerWithInterceptors(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // A fresh topic per run keeps earlier runs' messages out of the "my-sub" subscription.
+ topic := newTopicName()
+ ctx := context.Background()
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ metric := &metricProduceInterceptor{}
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ Interceptors: ProducerInterceptors{
+ noopProduceInterceptor{},
+ metric,
+ },
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages: even ones synchronously, odd ones asynchronously
+ for i := 0; i < 10; i++ {
+ if i%2 == 0 {
+ _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ } else {
+ // SendAsync returns nothing; its error is only available inside the callback.
+ producer.SendAsync(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
+ assert.Nil(t, err)
+ })
+ }
+ }
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(ctx)
+ if err != nil {
+ // t.Fatal (not log.Fatal) so the deferred Close calls still run and other tests continue.
+ t.Fatal(err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ }
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
+
+ // ack message
+ consumer.Ack(msg)
+ }
+
+ // NOTE(review): OnSendAcknowledgement runs after the send callback on the receipt path, so
+ // ackn may be read here without synchronization; confirm this cannot race or undercount.
+ assert.Equal(t, 10, metric.sendn)
+ assert.Equal(t, 10, metric.ackn)
+}