You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/08/20 08:06:14 UTC

[pulsar-client-go] branch master updated: [Issue:30] Support seek logic and add test case (#56)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 49b7a3c  [Issue:30] Support seek logic and add test case (#56)
49b7a3c is described below

commit 49b7a3c43807aae16ef117a2370484dfe46d965e
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Tue Aug 20 16:06:09 2019 +0800

    [Issue:30] Support seek logic and add test case (#56)
    
    * [Issue:30] Support seek logic and add test case
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix a little
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
 pulsar/consumer_test.go           | 68 +++++++++++++++++++++++++++++++++++++++
 pulsar/impl_consumer.go           |  8 ++---
 pulsar/impl_partition_consumer.go | 17 ++++------
 pulsar/impl_partition_producer.go |  2 +-
 pulsar/internal/connection.go     | 33 +++++++++++++++++--
 pulsar/producer.go                |  2 +-
 6 files changed, 111 insertions(+), 19 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 97b4264..f5e9ba3 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -722,3 +722,71 @@ func TestConsumer_Shared(t *testing.T) {
 	res := util.RemoveDuplicateElement(msgList)
 	assert.Equal(t, 10, len(res))
 }
+
+func TestConsumer_Seek(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "persistent://public/default/testSeek"
+	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testSeek"
+	makeHTTPCall(t, http.MethodPut, testURL, "1")
+	subName := "sub-testSeek"
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	assert.Equal(t, producer.Topic(), topicName)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: subName,
+	})
+	assert.Nil(t, err)
+	assert.Equal(t, consumer.Topic(), topicName)
+	assert.Equal(t, consumer.Subscription(), subName)
+	defer consumer.Close()
+
+	ctx := context.Background()
+
+	// Send 10 messages synchronously
+	t.Log("Publishing 10 messages synchronously")
+	for msgNum := 0; msgNum < 10; msgNum++ {
+		if err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	t.Log("Trying to receive 10 messages")
+	idList := make([]MessageID, 0, 10)
+	for msgNum := 0; msgNum < 10; msgNum++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		idList = append(idList, msg.ID())
+		fmt.Println(string(msg.Payload()))
+	}
+
+	for index, id := range idList {
+		if index == 4 {
+			// seek to fourth message, expected receive fourth message.
+			err = consumer.Seek(id)
+			assert.Nil(t, err)
+			break
+		}
+	}
+
+	// Sleeping for 500ms to wait for consumer re-connect
+	time.Sleep(500 * time.Millisecond)
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	t.Logf("again received message:%+v", msg.ID())
+	assert.Equal(t, "msg-content-4", string(msg.Payload()))
+}
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
index 1f800b5..dcc1df8 100644
--- a/pulsar/impl_consumer.go
+++ b/pulsar/impl_consumer.go
@@ -268,10 +268,10 @@ func (c *consumer) Seek(msgID MessageID) error {
 
 	partition := id.GetPartition()
 
-    // current topic is non-partition topic, we only need to get the first value in the consumers.
-    if partition < 0 {
-        partition = 0
-    }
+	// current topic is non-partition topic, we only need to get the first value in the consumers.
+	if partition < 0 {
+		partition = 0
+	}
 	return c.consumers[partition].Seek(msgID)
 }
 
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 3a729cd..4e29d25 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -80,7 +80,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
 		consumerID:   client.rpcClient.NewConsumerID(),
 		partitionIdx: partitionID,
 		partitionNum: partitionNum,
-		eventsChan:   make(chan interface{}),
+		eventsChan:   make(chan interface{}, 1),
 		subQueue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
 	}
 
@@ -167,7 +167,7 @@ func (pc *partitionConsumer) grabCnx() error {
 		return err
 	}
 
-	pc.log.Debug("Lookup result: ", lr)
+	pc.log.Infof("Lookup result: %v", lr)
 	requestID := pc.client.rpcClient.NewRequestID()
 	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
 		pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
@@ -192,16 +192,13 @@ func (pc *partitionConsumer) grabCnx() error {
 
 	pc.cnx = res.Cnx
 	pc.log.WithField("cnx", res.Cnx).Debug("Connected consumer")
+	pc.cnx.AddConsumeHandler(pc.consumerID, pc)
 
 	msgType := res.Response.GetType()
 
 	switch msgType {
 	case pb.BaseCommand_SUCCESS:
-		pc.cnx.AddConsumeHandler(pc.consumerID, pc)
-		if err := pc.internalFlow(uint32(pc.options.ReceiverQueueSize)); err != nil {
-			return err
-		}
-		return nil
+		return pc.internalFlow(uint32(pc.options.ReceiverQueueSize))
 	case pb.BaseCommand_ERROR:
 		errMsg := res.Response.GetError()
 		return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
@@ -593,7 +590,6 @@ func (pc *partitionConsumer) internalClose(req *handlerClose) {
 		pc.log.Info("Closed consumer")
 		pc.state = consumerClosed
 		close(pc.options.MessageChannel)
-		//pc.cnx.UnregisterListener(pc.consumerID)
 	}
 
 	req.waitGroup.Done()
@@ -711,13 +707,13 @@ type handlerClose struct {
 type handleConnectionClosed struct{}
 
 func (pc *partitionConsumer) ConnectionClosed() {
-	// Trigger reconnection in the produce goroutine
+	// Trigger reconnection in the consumer goroutine
 	pc.eventsChan <- &handleConnectionClosed{}
 }
 
 func (pc *partitionConsumer) reconnectToBroker() {
 	pc.log.Info("Reconnecting to broker")
-	backoff := internal.Backoff{}
+	backoff := new(internal.Backoff)
 	for {
 		if pc.state != consumerReady {
 			// Consumer is already closing
@@ -727,6 +723,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
 		err := pc.grabCnx()
 		if err == nil {
 			// Successfully reconnected
+			pc.log.Info("Successfully reconnected")
 			return
 		}
 
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index e00c1e1..7cfc45d 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -89,7 +89,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		topic:            topic,
 		options:          options,
 		producerID:       client.rpcClient.NewProducerID(),
-		eventsChan:       make(chan interface{}),
+		eventsChan:       make(chan interface{}, 1),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: make(util.Semaphore, maxPendingMessages),
 		pendingQueue:     util.NewBlockingQueue(maxPendingMessages),
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2886a7c..7319662 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -28,10 +28,10 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/golang/protobuf/proto"
-
 	"github.com/apache/pulsar-client-go/pkg/auth"
 	"github.com/apache/pulsar-client-go/pkg/pb"
+	"github.com/apache/pulsar-client-go/util"
+	"github.com/golang/protobuf/proto"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -65,6 +65,9 @@ type Connection interface {
 
 type ConsumerHandler interface {
 	MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
+
+	// ConnectionClosed close the TCP connection.
+	ConnectionClosed()
 }
 
 type connectionState int
@@ -336,7 +339,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
 
 	case pb.BaseCommand_ERROR:
 	case pb.BaseCommand_CLOSE_PRODUCER:
+		c.handleCloseProducer(cmd.GetCloseProducer())
 	case pb.BaseCommand_CLOSE_CONSUMER:
+		c.handleCloseConsumer(cmd.GetCloseConsumer())
 
 	case pb.BaseCommand_SEND_RECEIPT:
 		c.handleSendReceipt(cmd.GetSendReceipt())
@@ -398,7 +403,7 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
 	if producer, ok := c.listeners[producerID]; ok {
 		producer.ReceivedSendReceipt(response)
 	} else {
-		c.log.WithField("producerId", producerID).Warn("Got unexpected send receipt for message: ", response.MessageId)
+		c.log.WithField("producerID", producerID).Warn("Got unexpected send receipt for message: ", response.MessageId)
 	}
 }
 
@@ -438,6 +443,28 @@ func (c *connection) handlePing() {
 	c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
 }
 
+func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
+	c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
+	consumerID := closeConsumer.GetConsumerId()
+	if consumer, ok := c.connWrapper.Consumers[consumerID]; ok {
+		if !util.IsNil(consumer) {
+			consumer.ConnectionClosed()
+		}
+	} else {
+		c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
+	}
+}
+
+func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer) {
+	c.log.Infof("Broker notification of Closed consumer: %d", closeProducer.GetProducerId())
+	producerID := closeProducer.GetProducerId()
+	if producer, ok := c.listeners[producerID]; ok {
+		producer.ConnectionClosed()
+	} else {
+		c.log.WithField("producerID", producerID).Warn("Producer with ID not found while closing producer")
+	}
+}
+
 func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
 	c.Lock()
 	defer c.Unlock()
diff --git a/pulsar/producer.go b/pulsar/producer.go
index ff27d3a..a199e23 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -66,7 +66,7 @@ type ProducerOptions struct {
 	// SendTimeout set the send timeout (default: 30 seconds)
 	// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
 	// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
-	// deduplication feature.
+	// duplication feature.
 	SendTimeout time.Duration
 
 	// MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.