Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/08/22 04:21:09 UTC

[pulsar-client-go] branch master updated: Add some test cases for project (#58)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new de147fa  Add some test cases for project (#58)
de147fa is described below

commit de147fa4f37447ea4207bda2f64b5e858941993b
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Thu Aug 22 12:21:05 2019 +0800

    Add some test cases for project (#58)
    
    ### Motivation
    
    - Add some test cases as follows:
    
        - TestConsumer_EventTime
        - TestNonPersistentTopic
        - TestConsumer_Flow
    
    - Fix consumer handling when the connection is closed
    - Add `pprof` for debugging the project
    - Fix `flow` command logic
---
 .gitignore                        |  2 ++
 perf/pulsar-perf-go.go            | 16 +++++++++
 pulsar/consumer_test.go           | 77 +++++++++++++++++++++++++++++++++++++++
 pulsar/error.go                   |  5 ++-
 pulsar/impl_partition_consumer.go | 61 ++++++++++++++++---------------
 pulsar/internal/connection.go     | 26 +++++++++-----
 pulsar/producer_test.go           | 23 ++++++++++++
 7 files changed, 173 insertions(+), 37 deletions(-)

diff --git a/.gitignore b/.gitignore
index b37e915..7daaebc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,5 @@
 
 # Output of the go coverage tool
 *.out
+
+perf/perf
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 06ac059..f831079 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -18,6 +18,11 @@
 package main
 
 import (
+	"fmt"
+	"net"
+	"net/http"
+	_ "net/http/pprof"
+
 	"github.com/spf13/cobra"
 
 	log "github.com/sirupsen/logrus"
@@ -30,6 +35,17 @@ type ClientArgs struct {
 var clientArgs ClientArgs
 
 func main() {
+	// use `go tool pprof http://localhost:3000/debug/pprof/profile` to collect a CPU profile
+	// use `go tool pprof http://localhost:3000/debug/pprof/heap` to collect an in-use heap (inuse_space) profile
+	go func() {
+		listenAddr := net.JoinHostPort("localhost", "3000")
+		fmt.Printf("Profile server listening on %s\n", listenAddr)
+		profileRedirect := http.RedirectHandler("/debug/pprof", http.StatusSeeOther)
+		http.Handle("/", profileRedirect)
+	err := http.ListenAndServe(listenAddr, nil)
+	fmt.Println(err)
+	}()
+
 	log.SetFormatter(&log.TextFormatter{
 		FullTimestamp:   true,
 		TimestampFormat: "15:04:05.000",
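
For reference, the profile server wired in above is all standard library; a
minimal standalone sketch of the same setup (localhost:3000 mirrors the perf
tool, but any free port works):

    package main

    import (
        "fmt"
        "net"
        "net/http"
        _ "net/http/pprof" // registers the /debug/pprof handlers on http.DefaultServeMux
    )

    func main() {
        listenAddr := net.JoinHostPort("localhost", "3000")
        // Redirect the root path to the pprof index for convenience.
        http.Handle("/", http.RedirectHandler("/debug/pprof", http.StatusSeeOther))
        fmt.Printf("Profile server listening on %s\n", listenAddr)
        // ListenAndServe only returns on failure, so its error is always worth printing.
        fmt.Println(http.ListenAndServe(listenAddr, nil))
    }

With the server running, `go tool pprof http://localhost:3000/debug/pprof/profile`
collects a 30-second CPU profile and `go tool pprof
http://localhost:3000/debug/pprof/heap` the in-use heap (inuse_space).
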
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f5e9ba3..cda9506 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -790,3 +790,80 @@ func TestConsumer_Seek(t *testing.T) {
 	t.Logf("again received message:%+v", msg.ID())
 	assert.Equal(t, "msg-content-4", string(msg.Payload()))
 }
+
+func TestConsumer_EventTime(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-event-time"
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-1",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	et := timeFromUnixTimestampMillis(uint64(5))
+	err = producer.Send(ctx, &ProducerMessage{
+		Payload:   []byte("test"),
+		EventTime: &et,
+	})
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	assert.Equal(t, et, msg.EventTime())
+	assert.Equal(t, "test", string(msg.Payload()))
+}
+
+func TestConsumer_Flow(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-received-since-flow"
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:             topicName,
+		SubscriptionName:  "sub-1",
+		ReceiverQueueSize: 4,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	for msgNum := 0; msgNum < 100; msgNum++ {
+		if err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for msgNum := 0; msgNum < 100; msgNum++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("msg-content-%d", msgNum), string(msg.Payload()))
+	}
+}
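
A note on the tests above: TestConsumer_EventTime round-trips the event time
because it is producer-assigned metadata carried verbatim to the consumer. A
sketch of the millisecond conversion the test leans on (an illustrative
stand-in for the client's internal timeFromUnixTimestampMillis helper, not the
committed code):

    import "time"

    // timeFromMillis converts a Unix timestamp in milliseconds to a time.Time.
    func timeFromMillis(ms uint64) time.Time {
        return time.Unix(int64(ms)/1000, (int64(ms)%1000)*int64(time.Millisecond))
    }

TestConsumer_Flow, meanwhile, deliberately pairs a ReceiverQueueSize of 4 with
100 messages, so all of them arrive only if the consumer keeps re-issuing flow
permits; see the sketch after the impl_partition_consumer.go diff below.
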
diff --git a/pulsar/error.go b/pulsar/error.go
index ec20844..bd7e037 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -45,7 +45,8 @@ const (
 	//BrokerMetadataError            Result = 10 // Broker failed in updating metadata
 	//BrokerPersistenceError         Result = 11 // Broker failed to persist entry
 	//ChecksumError                  Result = 12 // Corrupt message checksum failure
-	//ConsumerBusy                   Result = 13 // Exclusive consumer is already connected
+	// ConsumerBusy means an exclusive consumer is already connected
+	ConsumerBusy Result = 13
 	//NotConnectedError              Result = 14 // Producer/Consumer is not currently connected to broker
 	//AlreadyClosedError             Result = 15 // Producer/Consumer is already closed and not accepting any operation
 	//InvalidMessage                 Result = 16 // Error in publishing an already used message
@@ -104,6 +105,8 @@ func getResultStr(r Result) string {
 		return "InvalidTopicName"
 	case ResultConnectError:
 		return "ConnectError"
+	case ConsumerBusy:
+		return "ConsumerBusy"
 	default:
 		return fmt.Sprintf("Result(%d)", r)
 	}
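
With ConsumerBusy promoted from a commented-out placeholder to a live Result,
callers can tell a contended exclusive subscription apart from other failures.
A sketch, assuming Subscribe surfaces the *Error type defined in this file
(the handling shown is illustrative, not part of this commit):

    func subscribeExclusive(client Client, topic string) (Consumer, error) {
        consumer, err := client.Subscribe(ConsumerOptions{
            Topic:            topic,
            SubscriptionName: "exclusive-sub",
        })
        if err != nil {
            if perr, ok := err.(*Error); ok && perr.Result() == ConsumerBusy {
                // Another exclusive consumer already holds the subscription;
                // the caller can back off and retry, or pick another name.
            }
            return nil, err
        }
        return consumer, nil
    }
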
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 4e29d25..6600ca4 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -63,25 +63,27 @@ type partitionConsumer struct {
 	omu               sync.Mutex // protects following
 	redeliverMessages []*pb.MessageIdData
 
-	unAckTracker *UnackedMessageTracker
+	unAckTracker      *UnackedMessageTracker
+	receivedSinceFlow uint32
 
 	eventsChan   chan interface{}
 	partitionIdx int
 	partitionNum int
 }
 
-func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan ConsumerMessage) (*partitionConsumer, error) {
+func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan<- ConsumerMessage) (*partitionConsumer, error) {
 	c := &partitionConsumer{
-		state:        consumerInit,
-		client:       client,
-		topic:        topic,
-		options:      options,
-		log:          log.WithField("topic", topic),
-		consumerID:   client.rpcClient.NewConsumerID(),
-		partitionIdx: partitionID,
-		partitionNum: partitionNum,
-		eventsChan:   make(chan interface{}, 1),
-		subQueue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
+		state:             consumerInit,
+		client:            client,
+		topic:             topic,
+		options:           options,
+		log:               log.WithField("topic", topic),
+		consumerID:        client.rpcClient.NewConsumerID(),
+		partitionIdx:      partitionID,
+		partitionNum:      partitionNum,
+		eventsChan:        make(chan interface{}, 1),
+		subQueue:          make(chan ConsumerMessage, options.ReceiverQueueSize),
+		receivedSinceFlow: 0,
 	}
 
 	c.setDefault(options)
@@ -167,7 +169,7 @@ func (pc *partitionConsumer) grabCnx() error {
 		return err
 	}
 
-	pc.log.Infof("Lookup result: %v", lr)
+	pc.log.Debugf("Lookup result: %v", lr)
 	requestID := pc.client.rpcClient.NewRequestID()
 	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
 		pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
@@ -262,26 +264,33 @@ func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
 	return nil
 }
 
-func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow uint32) error {
-	highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
-	if receivedSinceFlow >= highwater {
-		if err := pc.internalFlow(receivedSinceFlow); err != nil {
-			pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+func (pc *partitionConsumer) increaseAvailablePermits() error {
+	pc.receivedSinceFlow++
+	highWater := uint32(math.Max(float64(pc.options.ReceiverQueueSize/2), 1))
+
+	pc.log.Debugf("receivedSinceFlow size is: %d, highWater size is: %d", pc.receivedSinceFlow, highWater)
+
+	// send flow request after 1/2 of the queue has been consumed
+	if pc.receivedSinceFlow >= highWater {
+		pc.log.Debugf("send flow command to broker, permits size is: %d", pc.receivedSinceFlow)
+		err := pc.internalFlow(pc.receivedSinceFlow)
+		if err != nil {
+			pc.log.Errorf("Send flow cmd error:%s", err.Error())
+			pc.receivedSinceFlow = 0
 			return err
 		}
-		receivedSinceFlow = 0
+		pc.receivedSinceFlow = 0
 	}
 	return nil
 }
 
-func (pc *partitionConsumer) messageProcessed(msgID MessageID, receivedSinceFlow uint32) error {
+func (pc *partitionConsumer) messageProcessed(msgID MessageID) error {
 	err := pc.trackMessage(msgID)
 	if err != nil {
 		return err
 	}
-	receivedSinceFlow++
 
-	err = pc.increaseAvailablePermits(receivedSinceFlow)
+	err = pc.increaseAvailablePermits()
 	if err != nil {
 		return err
 	}
@@ -303,19 +312,16 @@ func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err
 }
 
 func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
-	var receivedSinceFlow uint32
-
 	for {
 		select {
 		case tmpMsg, ok := <-pc.subQueue:
 			if ok {
 				msgs <- tmpMsg
 
-				err := pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
+				err := pc.messageProcessed(tmpMsg.ID())
 				if err != nil {
 					return err
 				}
-				receivedSinceFlow = 0
 				continue
 			}
 			break
@@ -326,13 +332,12 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
 }
 
 func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
-	var receivedSinceFlow uint32
 	var err error
 
 	select {
 	case tmpMsg, ok := <-pc.subQueue:
 		if ok {
-			err = pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
+			err = pc.messageProcessed(tmpMsg.ID())
 			callback(tmpMsg.Message, err)
 			if err != nil {
 				pc.log.Errorf("processed messages error:%s", err.Error())
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 7319662..8084017 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -314,30 +314,36 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
 
 	switch *cmd.Type {
 	case pb.BaseCommand_SUCCESS:
-		c.handleResponse(*cmd.Success.RequestId, cmd)
+		c.handleResponse(cmd.Success.GetRequestId(), cmd)
 
 	case pb.BaseCommand_PRODUCER_SUCCESS:
-		c.handleResponse(*cmd.ProducerSuccess.RequestId, cmd)
+		c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
 
 	case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
-		c.handleResponse(*cmd.PartitionMetadataResponse.RequestId, cmd)
+		c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)
 
 	case pb.BaseCommand_LOOKUP_RESPONSE:
-		c.handleResponse(*cmd.LookupTopicResponse.RequestId, cmd)
+		lookupResult := cmd.LookupTopicResponse
+		c.handleResponse(lookupResult.GetRequestId(), cmd)
 
 	case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
-		c.handleResponse(*cmd.ConsumerStatsResponse.RequestId, cmd)
+		c.handleResponse(cmd.ConsumerStatsResponse.GetRequestId(), cmd)
 
 	case pb.BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE:
-		c.handleResponse(*cmd.GetLastMessageIdResponse.RequestId, cmd)
+		c.handleResponse(cmd.GetLastMessageIdResponse.GetRequestId(), cmd)
 
 	case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE:
-		c.handleResponse(*cmd.GetTopicsOfNamespaceResponse.RequestId, cmd)
+		c.handleResponse(cmd.GetTopicsOfNamespaceResponse.GetRequestId(), cmd)
 
 	case pb.BaseCommand_GET_SCHEMA_RESPONSE:
-		c.handleResponse(*cmd.GetSchemaResponse.RequestId, cmd)
+		c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
 
 	case pb.BaseCommand_ERROR:
+		if cmd.Error != nil {
+			c.log.Errorf("Error: %s, Error Message: %s", cmd.Error.GetError(), cmd.Error.GetMessage())
+			c.Close()
+			return
+		}
 	case pb.BaseCommand_CLOSE_PRODUCER:
 		c.handleCloseProducer(cmd.GetCloseProducer())
 	case pb.BaseCommand_CLOSE_CONSUMER:
@@ -501,6 +507,10 @@ func (c *connection) Close() {
 	for _, listener := range c.listeners {
 		listener.ConnectionClosed()
 	}
+
+	for _, cnx := range c.connWrapper.Consumers {
+		cnx.ConnectionClosed()
+	}
 }
 
 func (c *connection) changeState(state connectionState) {
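
The new loop in Close() extends the existing listener notification to the
consumers tracked on the connection, so a dropped connection (including the
BaseCommand_ERROR path above, which now closes the connection outright) no
longer leaves consumers blocked on a dead socket. The shape of the pattern,
with illustrative names:

    // closeListener is the minimal contract the loops in Close() rely on.
    type closeListener interface {
        ConnectionClosed()
    }

    type conn struct {
        listeners []closeListener          // producers awaiting send receipts
        consumers map[uint64]closeListener // consumers keyed by consumer ID
    }

    func (c *conn) close() {
        // Every registered party hears about the close and can reconnect.
        for _, l := range c.listeners {
            l.ConnectionClosed()
        }
        for _, cs := range c.consumers {
            cs.ConnectionClosed()
        }
    }
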
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 391c38f..ae3df35 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -486,3 +486,26 @@ func TestMessageRouter(t *testing.T) {
 	assert.NotNil(t, msg)
 	assert.Equal(t, string(msg.Payload()), "hello")
 }
+
+func TestNonPersistentTopic(t *testing.T) {
+	topicName := "non-persistent://public/default/testNonPersistentTopic"
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "my-sub",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+}
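
TestNonPersistentTopic only verifies that a producer and consumer can be
created on a non-persistent topic. Because such topics keep nothing on the
broker, a message is delivered only if a consumer is already subscribed when
it is produced; a hypothetical extension of the test exercising that path:

    ctx := context.Background()
    if err := producer.Send(ctx, &ProducerMessage{
        Payload: []byte("hello"),
    }); err != nil {
        t.Fatal(err)
    }

    // The subscription above predates the send, so the message is delivered
    // rather than silently dropped.
    msg, err := consumer.Receive(ctx)
    assert.Nil(t, err)
    assert.Equal(t, "hello", string(msg.Payload()))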