You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/08/22 04:21:09 UTC
[pulsar-client-go] branch master updated: Add some test cases for project (#58)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new de147fa Add some test cases for project (#58)
de147fa is described below
commit de147fa4f37447ea4207bda2f64b5e858941993b
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Thu Aug 22 12:21:05 2019 +0800
Add some test cases for project (#58)
### Motivation
- Add the following test cases:
  - TestConsumer_EventTime
  - TestNonPersistentTopic
  - TestConsumer_Flow
- Fix consumers not being notified when their connection is closed
- Add a `pprof` HTTP endpoint to the perf tool for debugging
- Fix the `flow` command logic (track `receivedSinceFlow` on the partition consumer)
---
.gitignore | 2 ++
perf/pulsar-perf-go.go | 16 +++++++++
pulsar/consumer_test.go | 75 +++++++++++++++++++++++++++++++++++++++
pulsar/error.go | 5 ++-
pulsar/impl_partition_consumer.go | 61 ++++++++++++++++---------------
pulsar/internal/connection.go | 26 +++++++++-----
pulsar/producer_test.go | 23 ++++++++++++
7 files changed, 171 insertions(+), 37 deletions(-)
diff --git a/.gitignore b/.gitignore
index b37e915..7daaebc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,5 @@
# Output of the go coverage tool
*.out
+
+perf/perf
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 06ac059..f831079 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -18,6 +18,11 @@
package main
import (
+ "fmt"
+ "net"
+ "net/http"
+ _ "net/http/pprof"
+
"github.com/spf13/cobra"
log "github.com/sirupsen/logrus"
@@ -30,6 +35,17 @@ type ClientArgs struct {
var clientArgs ClientArgs
func main() {
+ // use `go tool pprof http://localhost:3000/debug/pprof/profile` to get pprof file(cpu info)
+ // use `go tool pprof http://localhost:3000/debug/pprof/heap` to get inuse_space file
+ go func() {
+ listenAddr := net.JoinHostPort("localhost", "3000")
+ fmt.Printf("Profile server listening on %s\n", listenAddr)
+ profileRedirect := http.RedirectHandler("/debug/pprof", http.StatusSeeOther)
+ http.Handle("/", profileRedirect)
+ err := fmt.Errorf("%v", http.ListenAndServe(listenAddr, nil))
+ fmt.Println(err.Error())
+ }()
+
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
TimestampFormat: "15:04:05.000",
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f5e9ba3..cda9506 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -790,3 +790,78 @@ func TestConsumer_Seek(t *testing.T) {
t.Logf("again received message:%+v", msg.ID())
assert.Equal(t, "msg-content-4", string(msg.Payload()))
}
+
+func TestConsumer_EventTime(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-event-time"
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ et := timeFromUnixTimestampMillis(uint64(5))
+ err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte("test"),
+ EventTime: &et,
+ })
+ assert.Nil(t, err)
+
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, et, msg.EventTime())
+ assert.Equal(t, "test", string(msg.Payload()))
+}
+
+func TestConsumer_Flow(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-received-since-flow"
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ ReceiverQueueSize: 4,
+ })
+
+ for msgNum := 0; msgNum < 100; msgNum++ {
+ if err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for msgNum := 0; msgNum < 100; msgNum++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d", msgNum), string(msg.Payload()))
+ }
+}
diff --git a/pulsar/error.go b/pulsar/error.go
index ec20844..bd7e037 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -45,7 +45,8 @@ const (
//BrokerMetadataError Result = 10 // Broker failed in updating metadata
//BrokerPersistenceError Result = 11 // Broker failed to persist entry
//ChecksumError Result = 12 // Corrupt message checksum failure
- //ConsumerBusy Result = 13 // Exclusive consumer is already connected
+ // ConsumerBusy means Exclusive consumer is already connected
+ ConsumerBusy Result = 13
//NotConnectedError Result = 14 // Producer/Consumer is not currently connected to broker
//AlreadyClosedError Result = 15 // Producer/Consumer is already closed and not accepting any operation
//InvalidMessage Result = 16 // Error in publishing an already used message
@@ -104,6 +105,8 @@ func getResultStr(r Result) string {
return "InvalidTopicName"
case ResultConnectError:
return "ConnectError"
+ case ConsumerBusy:
+ return "ConsumerBusy"
default:
return fmt.Sprintf("Result(%d)", r)
}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 4e29d25..6600ca4 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -63,25 +63,27 @@ type partitionConsumer struct {
omu sync.Mutex // protects following
redeliverMessages []*pb.MessageIdData
- unAckTracker *UnackedMessageTracker
+ unAckTracker *UnackedMessageTracker
+ receivedSinceFlow uint32
eventsChan chan interface{}
partitionIdx int
partitionNum int
}
-func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan ConsumerMessage) (*partitionConsumer, error) {
+func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan<- ConsumerMessage) (*partitionConsumer, error) {
c := &partitionConsumer{
- state: consumerInit,
- client: client,
- topic: topic,
- options: options,
- log: log.WithField("topic", topic),
- consumerID: client.rpcClient.NewConsumerID(),
- partitionIdx: partitionID,
- partitionNum: partitionNum,
- eventsChan: make(chan interface{}, 1),
- subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize),
+ state: consumerInit,
+ client: client,
+ topic: topic,
+ options: options,
+ log: log.WithField("topic", topic),
+ consumerID: client.rpcClient.NewConsumerID(),
+ partitionIdx: partitionID,
+ partitionNum: partitionNum,
+ eventsChan: make(chan interface{}, 1),
+ subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize),
+ receivedSinceFlow: 0,
}
c.setDefault(options)
@@ -167,7 +169,7 @@ func (pc *partitionConsumer) grabCnx() error {
return err
}
- pc.log.Infof("Lookup result: %v", lr)
+ pc.log.Debugf("Lookup result: %v", lr)
requestID := pc.client.rpcClient.NewRequestID()
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
@@ -262,26 +264,33 @@ func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
return nil
}
-func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow uint32) error {
- highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
- if receivedSinceFlow >= highwater {
- if err := pc.internalFlow(receivedSinceFlow); err != nil {
- pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+func (pc *partitionConsumer) increaseAvailablePermits() error {
+ pc.receivedSinceFlow++
+ highWater := uint32(math.Max(float64(pc.options.ReceiverQueueSize/2), 1))
+
+ pc.log.Debugf("receivedSinceFlow size is: %d, highWater size is: %d", pc.receivedSinceFlow, highWater)
+
+ // send flow request after 1/2 of the queue has been consumed
+ if pc.receivedSinceFlow >= highWater {
+ pc.log.Debugf("send flow command to broker, permits size is: %d", pc.receivedSinceFlow)
+ err := pc.internalFlow(pc.receivedSinceFlow)
+ if err != nil {
+ pc.log.Errorf("Send flow cmd error:%s", err.Error())
+ pc.receivedSinceFlow = 0
return err
}
- receivedSinceFlow = 0
+ pc.receivedSinceFlow = 0
}
return nil
}
-func (pc *partitionConsumer) messageProcessed(msgID MessageID, receivedSinceFlow uint32) error {
+func (pc *partitionConsumer) messageProcessed(msgID MessageID) error {
err := pc.trackMessage(msgID)
if err != nil {
return err
}
- receivedSinceFlow++
- err = pc.increaseAvailablePermits(receivedSinceFlow)
+ err = pc.increaseAvailablePermits()
if err != nil {
return err
}
@@ -303,19 +312,16 @@ func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err
}
func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
- var receivedSinceFlow uint32
-
for {
select {
case tmpMsg, ok := <-pc.subQueue:
if ok {
msgs <- tmpMsg
- err := pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
+ err := pc.messageProcessed(tmpMsg.ID())
if err != nil {
return err
}
- receivedSinceFlow = 0
continue
}
break
@@ -326,13 +332,12 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
}
func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
- var receivedSinceFlow uint32
var err error
select {
case tmpMsg, ok := <-pc.subQueue:
if ok {
- err = pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
+ err = pc.messageProcessed(tmpMsg.ID())
callback(tmpMsg.Message, err)
if err != nil {
pc.log.Errorf("processed messages error:%s", err.Error())
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 7319662..8084017 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -314,30 +314,36 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
switch *cmd.Type {
case pb.BaseCommand_SUCCESS:
- c.handleResponse(*cmd.Success.RequestId, cmd)
+ c.handleResponse(cmd.Success.GetRequestId(), cmd)
case pb.BaseCommand_PRODUCER_SUCCESS:
- c.handleResponse(*cmd.ProducerSuccess.RequestId, cmd)
+ c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
- c.handleResponse(*cmd.PartitionMetadataResponse.RequestId, cmd)
+ c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)
case pb.BaseCommand_LOOKUP_RESPONSE:
- c.handleResponse(*cmd.LookupTopicResponse.RequestId, cmd)
+ lookupResult := cmd.LookupTopicResponse
+ c.handleResponse(lookupResult.GetRequestId(), cmd)
case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
- c.handleResponse(*cmd.ConsumerStatsResponse.RequestId, cmd)
+ c.handleResponse(cmd.ConsumerStatsResponse.GetRequestId(), cmd)
case pb.BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE:
- c.handleResponse(*cmd.GetLastMessageIdResponse.RequestId, cmd)
+ c.handleResponse(cmd.GetLastMessageIdResponse.GetRequestId(), cmd)
case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE:
- c.handleResponse(*cmd.GetTopicsOfNamespaceResponse.RequestId, cmd)
+ c.handleResponse(cmd.GetTopicsOfNamespaceResponse.GetRequestId(), cmd)
case pb.BaseCommand_GET_SCHEMA_RESPONSE:
- c.handleResponse(*cmd.GetSchemaResponse.RequestId, cmd)
+ c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
case pb.BaseCommand_ERROR:
+ if cmd.Error != nil {
+ c.log.Errorf("Error: %s, Error Message: %s", cmd.Error.GetError(), cmd.Error.GetMessage())
+ c.Close()
+ return
+ }
case pb.BaseCommand_CLOSE_PRODUCER:
c.handleCloseProducer(cmd.GetCloseProducer())
case pb.BaseCommand_CLOSE_CONSUMER:
@@ -501,6 +507,10 @@ func (c *connection) Close() {
for _, listener := range c.listeners {
listener.ConnectionClosed()
}
+
+ for _, cnx := range c.connWrapper.Consumers {
+ cnx.ConnectionClosed()
+ }
}
func (c *connection) changeState(state connectionState) {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 391c38f..ae3df35 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -486,3 +486,26 @@ func TestMessageRouter(t *testing.T) {
assert.NotNil(t, msg)
assert.Equal(t, string(msg.Payload()), "hello")
}
+
+func TestNonPersistentTopic(t *testing.T) {
+ topicName := "non-persistent://public/default/testNonPersistentTopic"
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+}