You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:53 UTC
[pulsar-client-go] 16/38: Producer close
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 98552f3665b7f2964aeef2484de3e3db2b150b4c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Apr 11 11:39:59 2019 -0700
Producer close
---
pulsar/impl/commands.go | 2 ++
pulsar/impl/connection.go | 8 +++++
pulsar/impl/rpc_client.go | 20 ++++++++++++
pulsar/impl_partition_producer.go | 65 ++++++++++++++++++++++++++++++++++++---
4 files changed, 90 insertions(+), 5 deletions(-)
diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
index 08f8124..c2db791 100644
--- a/pulsar/impl/commands.go
+++ b/pulsar/impl/commands.go
@@ -29,6 +29,8 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
cmd.Pong = msg.(*pb.CommandPong)
case pb.BaseCommand_SEND:
cmd.Send = msg.(*pb.CommandSend)
+ case pb.BaseCommand_CLOSE_PRODUCER:
+ cmd.CloseProducer = msg.(*pb.CommandCloseProducer)
default:
log.Panic("Missing command type: ", cmdType)
}
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 0cba51d..8cdcf8b 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -25,6 +25,7 @@ type Connection interface {
SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
WriteData(data []byte)
RegisterListener(id uint64, listener ConnectionListener)
+ UnregisterListener(id uint64)
Close()
}
@@ -351,6 +352,13 @@ func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
c.listeners[id] = listener
}
+// UnregisterListener removes the listener stored under id from c.listeners.
+// The map is modified while c's lock is held.
+func (c *connection) UnregisterListener(id uint64) {
+ c.Lock()
+ delete(c.listeners, id)
+ c.Unlock()
+}
+
func (c *connection) Close() {
c.Lock()
defer c.Unlock()
diff --git a/pulsar/impl/rpc_client.go b/pulsar/impl/rpc_client.go
index 2aa6106..706b9a1 100644
--- a/pulsar/impl/rpc_client.go
+++ b/pulsar/impl/rpc_client.go
@@ -26,6 +26,8 @@ type RpcClient interface {
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestId uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
+
+ RequestOnCnx(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
}
type rpcClient struct {
@@ -72,6 +74,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
return rpcResult, nil
}
+// RequestOnCnx sends a command of type cmdType carrying message on cnx and
+// blocks until the response callback passed to SendRequest has been invoked.
+// The returned RpcResult holds cnx and the received response; the returned
+// error is always nil.
+func (c *rpcClient) RequestOnCnx(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type,
+ message proto.Message) (*RpcResult, error) {
+ result := &RpcResult{Cnx: cnx}
+
+ // Closed by the response callback to release the caller waiting below.
+ done := make(chan struct{})
+
+ cnx.SendRequest(requestId, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+ result.Response = response
+ close(done)
+ })
+
+ <-done
+ return result, nil
+}
+
func (c *rpcClient) NewRequestId() uint64 {
return atomic.AddUint64(&c.requestIdGenerator, 1)
}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index d0cd8cf..c99ea67 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -11,12 +11,20 @@ import (
"time"
)
+// producerState identifies the lifecycle stage of a partitionProducer.
+type producerState int
+
+// The states are declared with an explicit producerState type (inherited by
+// every constant in the block) so they are not plain untyped integers.
+const (
+ producerInit producerState = iota
+ producerReady
+ producerClosing
+ producerClosed
+)
+
type partitionProducer struct {
+ state producerState
client *client
topic string
log *log.Entry
- mutex sync.Mutex
- cond *sync.Cond
cnx impl.Connection
options *ProducerOptions
@@ -52,6 +60,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
}
p := &partitionProducer{
+ state: producerInit,
log: log.WithField("topic", topic),
client: client,
topic: topic,
@@ -74,6 +83,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
} else {
p.log = p.log.WithField("name", *p.producerName)
p.log.Info("Created producer")
+ p.state = producerReady
go p.runEventsLoop()
return p, nil
}
@@ -136,6 +146,11 @@ func (p *partitionProducer) reconnectToBroker() {
p.log.Info("Reconnecting to broker")
backoff := impl.Backoff{}
for {
+ if p.state != producerReady {
+ // Producer is already closing
+ return
+ }
+
err := p.grabCnx()
if err == nil {
// Successfully reconnected
@@ -153,6 +168,10 @@ func (p *partitionProducer) runEventsLoop() {
for {
select {
case i := <-p.eventsChan:
+ if i == nil {
+ return
+ }
+
switch v := i.(type) {
case *sendRequest:
p.internalSend(v)
@@ -286,6 +305,33 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
}
}
+// internalClose processes a closeProducer request. If the producer is in the
+// ready state it moves to closing, sends CLOSE_PRODUCER on the current
+// connection and, when the request returns without error, marks the producer
+// closed, unregisters it from the connection and stops the batch flush
+// ticker. In every case req.waitGroup is signalled before returning.
+func (p *partitionProducer) internalClose(req *closeProducer) {
+ // Release whoever waits on the request, whichever path is taken below.
+ defer req.waitGroup.Done()
+
+ if p.state != producerReady {
+ // Only a ready producer is closed; any other state is left untouched.
+ return
+ }
+
+ p.state = producerClosing
+ p.log.Info("Closing producer")
+
+ requestID := p.client.rpcClient.NewRequestId()
+ closeCmd := &pb.CommandCloseProducer{
+ ProducerId: &p.producerId,
+ RequestId: &requestID,
+ }
+
+ if _, err := p.client.rpcClient.RequestOnCnx(p.cnx, requestID, pb.BaseCommand_CLOSE_PRODUCER, closeCmd); err != nil {
+ // Report the failure to the caller; the state stays at producerClosing.
+ req.err = err
+ return
+ }
+
+ p.log.Info("Closed producer")
+ p.state = producerClosed
+ p.cnx.UnregisterListener(p.producerId)
+ p.batchFlushTicker.Stop()
+}
+
func (p *partitionProducer) LastSequenceID() int64 {
// TODO: return real last sequence id
return -1
@@ -296,8 +342,15 @@ func (p *partitionProducer) Flush() error {
}
func (p *partitionProducer) Close() error {
- p.log.Info("Closing producer")
- return nil
+
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+
+ cp := &closeProducer{&wg, nil}
+ p.eventsChan <- cp
+
+ wg.Wait()
+ return cp.err
}
type sendRequest struct {
@@ -308,4 +361,6 @@ type sendRequest struct {
}
type closeProducer struct {
-}
\ No newline at end of file
+ waitGroup *sync.WaitGroup // signalled via Done() once the close request has been handled
+ err error // error recorded while handling the request; nil when none was set
+}