You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/08 12:09:23 UTC

[GitHub] [pulsar-client-go] snowcrumble opened a new pull request #314: Add interceptor

snowcrumble opened a new pull request #314:
URL: https://github.com/apache/pulsar-client-go/pull/314


   ### Motivation
   
   Add a chain of interceptors for Producer and Consumer as an option. These interceptors are called at certain points and can be used for tracing, metrics, and so on.
   
   ### Modifications
   
   * Add two files for interceptor definition.
   * Call the interceptors' methods at the appropriate positions.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   - Added unit tests for all interceptor methods.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API: yes
     - The schema: no
     - The default values of configurations: no
     - The wire protocol: no
   
   ### Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? GoDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy merged pull request #314: [Issue 52]Add interceptor

Posted by GitBox <gi...@apache.org>.
wolfstudy merged pull request #314:
URL: https://github.com/apache/pulsar-client-go/pull/314


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #314: [Issue 52]Add interceptor

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #314:
URL: https://github.com/apache/pulsar-client-go/pull/314#issuecomment-657307348


   @snowcrumble Can you merge the master code and fix the conflicts? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #314: [Issue 52]Add interceptor

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #314:
URL: https://github.com/apache/pulsar-client-go/pull/314#discussion_r451932214



##########
File path: pulsar/consumer_test.go
##########
@@ -1342,3 +1344,130 @@ func TestProducerName(t *testing.T) {
 		consumer.Ack(msg)
 	}
 }
+
+type noopConsumerInterceptor struct{}
+
+func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}
+
+func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}
+
+// copyPropertyInterceptor copy all keys in message properties map and add a suffix
+type copyPropertyInterceptor struct {
+	suffix string
+}
+
+func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
+	properties := message.Properties()
+	copy := make(map[string]string, len(properties))
+	for k, v := range properties {
+		copy[k+x.suffix] = v
+	}
+	for ck, v := range copy {
+		properties[ck] = v
+	}
+}
+
+func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}
+
+func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}
+
+type metricConsumerInterceptor struct {
+	ackn  int32
+	nackn int32
+}
+
+func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {
+	atomic.AddInt32(&x.ackn, 1)
+}
+
+func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
+	atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
+}
+
+func TestConsumerWithInterceptors(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	rand.Seed(time.Now().Unix())
+	topic := fmt.Sprintf("persistent://public/default/test-topic-interceptors-%d", rand.Int())
+	ctx := context.Background()
+
+	metric := &metricConsumerInterceptor{}
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topic,
+		SubscriptionName:    "my-sub",
+		Type:                Exclusive,
+		NackRedeliveryDelay: time.Second, // for testing nack
+		Interceptors: ConsumerInterceptors{
+			noopConsumerInterceptor{},
+			copyPropertyInterceptor{suffix: "-copy"},
+			metric,
+		},
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		if _, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			Key:     "pulsar",
+			Properties: map[string]string{
+				"key-1": "pulsar-1",
+			},
+		}); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	var nackIds []MessageID
+	// receive 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		expectProperties := map[string]string{
+			"key-1":      "pulsar-1",
+			"key-1-copy": "pulsar-1", // check properties copy by interceptor
+		}
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+		assert.Equal(t, "pulsar", msg.Key())
+		assert.Equal(t, expectProperties, msg.Properties())
+
+		// ack message
+		if i%2 == 0 {
+			consumer.Ack(msg)
+		} else {
+			nackIds = append(nackIds, msg.ID())
+		}
+	}
+	assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))
+
+	for i := range nackIds {
+		consumer.NackID(nackIds[i])
+	}
+
+	time.Sleep(time.Second * 3) // waiting for nack actual perform

Review comment:
       Please avoid using sleep in the test, because the state of each server is uncertain and a fixed delay is therefore unreliable.

##########
File path: pulsar/consumer_test.go
##########
@@ -1342,3 +1344,130 @@ func TestProducerName(t *testing.T) {
 		consumer.Ack(msg)
 	}
 }
+
+type noopConsumerInterceptor struct{}
+
+func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}
+
+func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}
+
+// copyPropertyInterceptor copy all keys in message properties map and add a suffix
+type copyPropertyInterceptor struct {
+	suffix string
+}
+
+func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
+	properties := message.Properties()
+	copy := make(map[string]string, len(properties))
+	for k, v := range properties {
+		copy[k+x.suffix] = v
+	}
+	for ck, v := range copy {
+		properties[ck] = v
+	}
+}
+
+func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}
+
+func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}
+
+type metricConsumerInterceptor struct {
+	ackn  int32
+	nackn int32
+}
+
+func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {
+	atomic.AddInt32(&x.ackn, 1)
+}
+
+func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
+	atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
+}
+
+func TestConsumerWithInterceptors(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	rand.Seed(time.Now().Unix())
+	topic := fmt.Sprintf("persistent://public/default/test-topic-interceptors-%d", rand.Int())

Review comment:
       Maybe we can use `newTopicName()` to create a random topic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org