You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/08/20 08:06:14 UTC
[pulsar-client-go] branch master updated: [Issue:30] Support seek
logic and add test case (#56)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 49b7a3c [Issue:30] Support seek logic and add test case (#56)
49b7a3c is described below
commit 49b7a3c43807aae16ef117a2370484dfe46d965e
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Tue Aug 20 16:06:09 2019 +0800
[Issue:30] Support seek logic and add test case (#56)
* [Issue:30] Support seek logic and add test case
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix a little
Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
pulsar/consumer_test.go | 68 +++++++++++++++++++++++++++++++++++++++
pulsar/impl_consumer.go | 8 ++---
pulsar/impl_partition_consumer.go | 17 ++++------
pulsar/impl_partition_producer.go | 2 +-
pulsar/internal/connection.go | 33 +++++++++++++++++--
pulsar/producer.go | 2 +-
6 files changed, 111 insertions(+), 19 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 97b4264..f5e9ba3 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -722,3 +722,71 @@ func TestConsumer_Shared(t *testing.T) {
res := util.RemoveDuplicateElement(msgList)
assert.Equal(t, 10, len(res))
}
+
+// TestConsumer_Seek publishes ten messages, receives all of them while
+// recording their IDs, seeks the subscription back to the message at index 4
+// and verifies that the next message delivered is "msg-content-4".
+func TestConsumer_Seek(t *testing.T) {
+	const (
+		numMessages = 10
+		// seekIndex is zero-based: index 4 is the fifth message published,
+		// whose payload is "msg-content-4".
+		seekIndex = 4
+	)
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		// Abort immediately: nothing below can run without a client.
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	topicName := "persistent://public/default/testSeek"
+	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testSeek"
+	makeHTTPCall(t, http.MethodPut, testURL, "1")
+	subName := "sub-testSeek"
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+	assert.Equal(t, topicName, producer.Topic())
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: subName,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+	assert.Equal(t, topicName, consumer.Topic())
+	assert.Equal(t, subName, consumer.Subscription())
+
+	ctx := context.Background()
+
+	// Send 10 messages synchronously
+	t.Log("Publishing 10 messages synchronously")
+	for msgNum := 0; msgNum < numMessages; msgNum++ {
+		if err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	t.Log("Trying to receive 10 messages")
+	idList := make([]MessageID, 0, numMessages)
+	for msgNum := 0; msgNum < numMessages; msgNum++ {
+		msg, err := consumer.Receive(ctx)
+		if err != nil {
+			// msg is not usable when Receive fails; stop instead of dereferencing it.
+			t.Fatal(err)
+		}
+		idList = append(idList, msg.ID())
+		t.Log(string(msg.Payload()))
+	}
+
+	// Seek back to the recorded ID at seekIndex; the next Receive is expected
+	// to deliver that same message again. A failed seek is fatal because the
+	// Receive below would otherwise wait for a message that never arrives.
+	if err := consumer.Seek(idList[seekIndex]); err != nil {
+		t.Fatal(err)
+	}
+
+	// Sleeping for 500ms to wait for consumer re-connect
+	time.Sleep(500 * time.Millisecond)
+
+	msg, err := consumer.Receive(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Logf("again received message:%+v", msg.ID())
+	assert.Equal(t, fmt.Sprintf("msg-content-%d", seekIndex), string(msg.Payload()))
+}
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
index 1f800b5..dcc1df8 100644
--- a/pulsar/impl_consumer.go
+++ b/pulsar/impl_consumer.go
@@ -268,10 +268,10 @@ func (c *consumer) Seek(msgID MessageID) error {
partition := id.GetPartition()
- // current topic is non-partition topic, we only need to get the first value in the consumers.
- if partition < 0 {
- partition = 0
- }
+ // current topic is non-partition topic, we only need to get the first value in the consumers.
+ if partition < 0 {
+ partition = 0
+ }
return c.consumers[partition].Seek(msgID)
}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 3a729cd..4e29d25 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -80,7 +80,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: partitionID,
partitionNum: partitionNum,
- eventsChan: make(chan interface{}),
+ eventsChan: make(chan interface{}, 1),
subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize),
}
@@ -167,7 +167,7 @@ func (pc *partitionConsumer) grabCnx() error {
return err
}
- pc.log.Debug("Lookup result: ", lr)
+ pc.log.Infof("Lookup result: %v", lr)
requestID := pc.client.rpcClient.NewRequestID()
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
@@ -192,16 +192,13 @@ func (pc *partitionConsumer) grabCnx() error {
pc.cnx = res.Cnx
pc.log.WithField("cnx", res.Cnx).Debug("Connected consumer")
+ pc.cnx.AddConsumeHandler(pc.consumerID, pc)
msgType := res.Response.GetType()
switch msgType {
case pb.BaseCommand_SUCCESS:
- pc.cnx.AddConsumeHandler(pc.consumerID, pc)
- if err := pc.internalFlow(uint32(pc.options.ReceiverQueueSize)); err != nil {
- return err
- }
- return nil
+ return pc.internalFlow(uint32(pc.options.ReceiverQueueSize))
case pb.BaseCommand_ERROR:
errMsg := res.Response.GetError()
return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
@@ -593,7 +590,6 @@ func (pc *partitionConsumer) internalClose(req *handlerClose) {
pc.log.Info("Closed consumer")
pc.state = consumerClosed
close(pc.options.MessageChannel)
- //pc.cnx.UnregisterListener(pc.consumerID)
}
req.waitGroup.Done()
@@ -711,13 +707,13 @@ type handlerClose struct {
type handleConnectionClosed struct{}
func (pc *partitionConsumer) ConnectionClosed() {
- // Trigger reconnection in the produce goroutine
+ // Trigger reconnection in the consumer goroutine
pc.eventsChan <- &handleConnectionClosed{}
}
func (pc *partitionConsumer) reconnectToBroker() {
pc.log.Info("Reconnecting to broker")
- backoff := internal.Backoff{}
+ backoff := new(internal.Backoff)
for {
if pc.state != consumerReady {
// Consumer is already closing
@@ -727,6 +723,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
err := pc.grabCnx()
if err == nil {
// Successfully reconnected
+ pc.log.Info("Successfully reconnected")
return
}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index e00c1e1..7cfc45d 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -89,7 +89,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
topic: topic,
options: options,
producerID: client.rpcClient.NewProducerID(),
- eventsChan: make(chan interface{}),
+ eventsChan: make(chan interface{}, 1),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: make(util.Semaphore, maxPendingMessages),
pendingQueue: util.NewBlockingQueue(maxPendingMessages),
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2886a7c..7319662 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -28,10 +28,10 @@ import (
"sync/atomic"
"time"
- "github.com/golang/protobuf/proto"
-
"github.com/apache/pulsar-client-go/pkg/auth"
"github.com/apache/pulsar-client-go/pkg/pb"
+ "github.com/apache/pulsar-client-go/util"
+ "github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)
@@ -65,6 +65,9 @@ type Connection interface {
type ConsumerHandler interface {
MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
+
+ // ConnectionClosed notifies the handler that the broker has closed it, so that it can reconnect.
+ ConnectionClosed()
}
type connectionState int
@@ -336,7 +339,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
case pb.BaseCommand_ERROR:
case pb.BaseCommand_CLOSE_PRODUCER:
+ c.handleCloseProducer(cmd.GetCloseProducer())
case pb.BaseCommand_CLOSE_CONSUMER:
+ c.handleCloseConsumer(cmd.GetCloseConsumer())
case pb.BaseCommand_SEND_RECEIPT:
c.handleSendReceipt(cmd.GetSendReceipt())
@@ -398,7 +403,7 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
if producer, ok := c.listeners[producerID]; ok {
producer.ReceivedSendReceipt(response)
} else {
- c.log.WithField("producerId", producerID).Warn("Got unexpected send receipt for message: ", response.MessageId)
+ c.log.WithField("producerID", producerID).Warn("Got unexpected send receipt for message: ", response.MessageId)
}
}
@@ -438,6 +443,28 @@ func (c *connection) handlePing() {
c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
}
+// handleCloseConsumer reacts to a CLOSE_CONSUMER command received from the
+// broker by notifying the handler registered for that consumer ID, if any.
+func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
+	consumerID := closeConsumer.GetConsumerId()
+	c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
+
+	handler, found := c.connWrapper.Consumers[consumerID]
+	if !found {
+		c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
+		return
+	}
+
+	// An entry can be present while holding a nil handler; there is nobody
+	// to notify in that case.
+	if util.IsNil(handler) {
+		return
+	}
+	handler.ConnectionClosed()
+}
+
+// handleCloseProducer reacts to a CLOSE_PRODUCER command received from the
+// broker by notifying the listener registered for that producer ID, if any.
+//
+// NOTE(review): c.listeners is read here without taking the connection lock,
+// while RegisterListener writes to it under c.Lock() — confirm whether this
+// read needs the same protection.
+func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer) {
+	producerID := closeProducer.GetProducerId()
+	// This handler deals with producers; the message must say so, otherwise
+	// the log attributes a producer close to a consumer with the same ID.
+	c.log.Infof("Broker notification of Closed producer: %d", producerID)
+
+	producer, ok := c.listeners[producerID]
+	if !ok {
+		c.log.WithField("producerID", producerID).Warn("Producer with ID not found while closing producer")
+		return
+	}
+	producer.ConnectionClosed()
+}
+
func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
c.Lock()
defer c.Unlock()
diff --git a/pulsar/producer.go b/pulsar/producer.go
index ff27d3a..a199e23 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -66,7 +66,7 @@ type ProducerOptions struct {
// SendTimeout set the send timeout (default: 30 seconds)
// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
- // deduplication feature.
+ // duplication feature.
SendTimeout time.Duration
// MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.