You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:53 UTC

[pulsar-client-go] 16/38: Producer close

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 98552f3665b7f2964aeef2484de3e3db2b150b4c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Apr 11 11:39:59 2019 -0700

    Producer close
---
 pulsar/impl/commands.go           |  2 ++
 pulsar/impl/connection.go         |  8 +++++
 pulsar/impl/rpc_client.go         | 20 ++++++++++++
 pulsar/impl_partition_producer.go | 65 ++++++++++++++++++++++++++++++++++++---
 4 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
index 08f8124..c2db791 100644
--- a/pulsar/impl/commands.go
+++ b/pulsar/impl/commands.go
@@ -29,6 +29,8 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
 		cmd.Pong = msg.(*pb.CommandPong)
 	case pb.BaseCommand_SEND:
 		cmd.Send = msg.(*pb.CommandSend)
+	case pb.BaseCommand_CLOSE_PRODUCER:
+		cmd.CloseProducer = msg.(*pb.CommandCloseProducer)
 	default:
 		log.Panic("Missing command type: ", cmdType)
 	}
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 0cba51d..8cdcf8b 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -25,6 +25,7 @@ type Connection interface {
 	SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
 	WriteData(data []byte)
 	RegisterListener(id uint64, listener ConnectionListener)
+	UnregisterListener(id uint64)
 	Close()
 }
 
@@ -351,6 +352,13 @@ func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
 	c.listeners[id] = listener
 }
 
+func (c *connection) UnregisterListener(id uint64) {
+	c.Lock()
+	defer c.Unlock()
+
+	delete(c.listeners, id)
+}
+
 func (c *connection) Close() {
 	c.Lock()
 	defer c.Unlock()
diff --git a/pulsar/impl/rpc_client.go b/pulsar/impl/rpc_client.go
index 2aa6106..706b9a1 100644
--- a/pulsar/impl/rpc_client.go
+++ b/pulsar/impl/rpc_client.go
@@ -26,6 +26,8 @@ type RpcClient interface {
 
 	Request(logicalAddr *url.URL, physicalAddr *url.URL, requestId uint64,
 		cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
+
+	RequestOnCnx(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
 }
 
 type rpcClient struct {
@@ -72,6 +74,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
 	return rpcResult, nil
 }
 
+func (c *rpcClient) RequestOnCnx(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type,
+	message proto.Message) (*RpcResult, error) {
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	rpcResult := &RpcResult{
+		Cnx: cnx,
+	}
+
+	cnx.SendRequest(requestId, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+		rpcResult.Response = response
+		wg.Done()
+	})
+
+	wg.Wait()
+	return rpcResult, nil
+}
+
 func (c *rpcClient) NewRequestId() uint64 {
 	return atomic.AddUint64(&c.requestIdGenerator, 1)
 }
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index d0cd8cf..c99ea67 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -11,12 +11,20 @@ import (
 	"time"
 )
 
+type producerState int
+
+const (
+	producerInit = iota
+	producerReady
+	producerClosing
+	producerClosed
+)
+
 type partitionProducer struct {
+	state  producerState
 	client *client
 	topic  string
 	log    *log.Entry
-	mutex  sync.Mutex
-	cond   *sync.Cond
 	cnx    impl.Connection
 
 	options             *ProducerOptions
@@ -52,6 +60,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	}
 
 	p := &partitionProducer{
+		state:            producerInit,
 		log:              log.WithField("topic", topic),
 		client:           client,
 		topic:            topic,
@@ -74,6 +83,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	} else {
 		p.log = p.log.WithField("name", *p.producerName)
 		p.log.Info("Created producer")
+		p.state = producerReady
 		go p.runEventsLoop()
 		return p, nil
 	}
@@ -136,6 +146,11 @@ func (p *partitionProducer) reconnectToBroker() {
 	p.log.Info("Reconnecting to broker")
 	backoff := impl.Backoff{}
 	for {
+		if p.state != producerReady {
+			// Producer is already closing
+			return
+		}
+
 		err := p.grabCnx()
 		if err == nil {
 			// Successfully reconnected
@@ -153,6 +168,10 @@ func (p *partitionProducer) runEventsLoop() {
 	for {
 		select {
 		case i := <-p.eventsChan:
+			if i == nil {
+				return
+			}
+
 			switch v := i.(type) {
 			case *sendRequest:
 				p.internalSend(v)
@@ -286,6 +305,33 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	}
 }
 
+func (p *partitionProducer) internalClose(req *closeProducer) {
+	if p.state != producerReady {
+		req.waitGroup.Done()
+		return
+	}
+
+	p.state = producerClosing
+	p.log.Info("Closing producer")
+
+	id := p.client.rpcClient.NewRequestId()
+	_, err := p.client.rpcClient.RequestOnCnx(p.cnx, id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
+		ProducerId: &p.producerId,
+		RequestId:  &id,
+	})
+
+	if err != nil {
+		req.err = err
+	} else {
+		p.log.Info("Closed producer")
+		p.state = producerClosed
+		p.cnx.UnregisterListener(p.producerId)
+		p.batchFlushTicker.Stop()
+	}
+
+	req.waitGroup.Done()
+}
+
 func (p *partitionProducer) LastSequenceID() int64 {
 	// TODO: return real last sequence id
 	return -1
@@ -296,8 +342,15 @@ func (p *partitionProducer) Flush() error {
 }
 
 func (p *partitionProducer) Close() error {
-	p.log.Info("Closing producer")
-	return nil
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	cp := &closeProducer{&wg, nil}
+	p.eventsChan <- cp
+
+	wg.Wait()
+	return cp.err
 }
 
 type sendRequest struct {
@@ -308,4 +361,6 @@ type sendRequest struct {
 }
 
 type closeProducer struct {
-}
\ No newline at end of file
+	waitGroup *sync.WaitGroup
+	err       error
+}