You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/10/09 09:11:21 UTC
[pulsar-client-go] branch master updated: Fix deadlock when
connection closed (#376)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c03f45f Fix deadlock when connection closed (#376)
c03f45f is described below
commit c03f45fe8191cffbad9f1f658b06bba8e8ddbd4b
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri Oct 9 17:11:13 2020 +0800
Fix deadlock when connection closed (#376)
Fixes #366
### Motivation
In the current code of `pulsar/internal/connection.go` we have 2 channels, closeCh and incomingRequestsCh. When the connection closes, the current misuse of these 2 channels may cause a deadlock.
PR #366 has detailed steps to reproduce and the root cause [analysis](https://github.com/apache/pulsar-client-go/pull/366#issuecomment-696759873) .
This PR tries to fix the deadlock.
### Modifications
- make the close logic independent, rather than handling it in the same loop as normal event handling.
- when the connection is closed, handle the requests remaining in the channel and return an error to avoid deadlock.
### Verifying this change
- passed the reproduction tests from #366
- existing unit tests passed
---
go.mod | 2 ++
pulsar/consumer_partition.go | 36 +++++++++++++++--------
pulsar/consumer_regex_test.go | 28 ++----------------
pulsar/internal/commands.go | 2 ++
pulsar/internal/connection.go | 54 ++++++++++++++++++++++------------
pulsar/internal/lookup_service_test.go | 3 +-
pulsar/internal/rpc_client.go | 7 ++---
pulsar/producer_partition.go | 10 ++++---
8 files changed, 78 insertions(+), 64 deletions(-)
diff --git a/go.mod b/go.mod
index 45b6241..3e41d06 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,8 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/kr/pretty v0.2.0 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9f555d2..8de3f38 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -150,10 +150,11 @@ type partitionConsumer struct {
startMessageID trackingMessageID
lastDequeuedMsg trackingMessageID
- eventsCh chan interface{}
- connectedCh chan struct{}
- closeCh chan struct{}
- clearQueueCh chan func(id trackingMessageID)
+ eventsCh chan interface{}
+ connectedCh chan struct{}
+ connectClosedCh chan connectionClosed
+ closeCh chan struct{}
+ clearQueueCh chan func(id trackingMessageID)
nackTracker *negativeAcksTracker
dlq *dlqRouter
@@ -174,12 +175,13 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: int32(options.partitionIdx),
- eventsCh: make(chan interface{}, 3),
+ eventsCh: make(chan interface{}, 10),
queueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
startMessageID: options.startMessageID,
connectedCh: make(chan struct{}),
messageCh: messageCh,
+ connectClosedCh: make(chan connectionClosed, 10),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
@@ -566,7 +568,8 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) b
func (pc *partitionConsumer) ConnectionClosed() {
// Trigger reconnection in the consumer goroutine
- pc.eventsCh <- &connectionClosed{}
+ pc.log.Debug("connection closed and send to connectClosedCh")
+ pc.connectClosedCh <- connectionClosed{}
}
// Flow command gives additional permits to send messages to the consumer.
@@ -733,11 +736,22 @@ func (pc *partitionConsumer) runEventsLoop() {
defer func() {
pc.log.Debug("exiting events loop")
}()
+ pc.log.Debug("get into runEventsLoop")
+
+ go func() {
+ for {
+ select {
+ case <-pc.closeCh:
+ return
+ case <-pc.connectClosedCh:
+ pc.log.Debug("runEventsLoop will reconnect")
+ pc.reconnectToBroker()
+ }
+ }
+ }()
+
for {
- select {
- case <-pc.closeCh:
- return
- case i := <-pc.eventsCh:
+ for i := range pc.eventsCh {
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
@@ -751,8 +765,6 @@ func (pc *partitionConsumer) runEventsLoop() {
pc.internalSeek(v)
case *seekByTimeRequest:
pc.internalSeekByTime(v)
- case *connectionClosed:
- pc.reconnectToBroker()
case *closeRequest:
pc.internalClose(v)
return
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index ed27373..228fc2d 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -176,23 +176,11 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
if err != nil {
t.Fatal(err)
}
-
rc.discover()
- time.Sleep(300 * time.Millisecond)
+ time.Sleep(2000 * time.Millisecond)
consumers = cloneConsumers(rc)
assert.Equal(t, 1, len(consumers))
-
- // delete the topic
- if err := deleteTopic(topic); err != nil {
- t.Fatal(err)
- }
-
- rc.discover()
- time.Sleep(300 * time.Millisecond)
-
- consumers = cloneConsumers(rc)
- assert.Equal(t, 0, len(consumers))
}
func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string) {
@@ -228,7 +216,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
defer deleteTopic(myTopic)
rc.discover()
- time.Sleep(300 * time.Millisecond)
+ time.Sleep(2000 * time.Millisecond)
consumers = cloneConsumers(rc)
assert.Equal(t, 0, len(consumers))
@@ -241,20 +229,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
}
rc.discover()
- time.Sleep(300 * time.Millisecond)
+ time.Sleep(2000 * time.Millisecond)
consumers = cloneConsumers(rc)
assert.Equal(t, 1, len(consumers))
-
- // delete the topic
- err = deleteTopic(fooTopic)
- assert.Nil(t, err)
-
- rc.discover()
- time.Sleep(300 * time.Millisecond)
-
- consumers = cloneConsumers(rc)
- assert.Equal(t, 0, len(consumers))
}
func TestRegexConsumer(t *testing.T) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 2536bae..a4d5e5f 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -45,6 +45,8 @@ var ErrCorruptedMessage = errors.New("corrupted message")
// ErrEOM is the error returned by ReadMessage when no more input is available.
var ErrEOM = errors.New("EOF")
+var ErrConnectionClosed = errors.New("connection closed")
+
func NewMessageReader(headersAndPayload Buffer) *MessageReader {
return &MessageReader{
buffer: headersAndPayload,
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index aff263b..7270c30 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -89,7 +89,7 @@ type ConnectionListener interface {
// Connection is a interface of client cnx.
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
- SendRequestNoWait(req *pb.BaseCommand)
+ SendRequestNoWait(req *pb.BaseCommand) error
WriteData(data Buffer)
RegisterListener(id uint64, listener ConnectionListener)
UnregisterListener(id uint64)
@@ -110,21 +110,15 @@ type ConsumerHandler interface {
type connectionState int32
const (
- connectionInit = 0
- connectionConnecting = 1
- connectionTCPConnected = 2
- connectionReady = 3
- connectionClosed = 4
+ connectionInit = 0
+ connectionReady = 1
+ connectionClosed = 2
)
func (s connectionState) String() string {
switch s {
case connectionInit:
return "Initializing"
- case connectionConnecting:
- return "Connecting"
- case connectionTCPConnected:
- return "TCPConnected"
case connectionReady:
return "Ready"
case connectionClosed:
@@ -286,8 +280,6 @@ func (c *connection) connect() bool {
c.log.Info("TCP connection established")
c.Unlock()
- c.changeState(connectionTCPConnected)
-
return true
}
@@ -358,11 +350,20 @@ func (c *connection) waitUntilReady() error {
return nil
}
+func (c *connection) failLeftRequestsWhenClose() {
+ for req := range c.incomingRequestsCh {
+ c.internalSendRequest(req)
+ }
+ close(c.incomingRequestsCh)
+}
+
func (c *connection) run() {
// All reads come from the reader goroutine
go c.reader.readFromConnection()
go c.runPingCheck()
+ c.log.Debugf("Connection run start channel %+v, requestLength %d", c, len(c.incomingRequestsCh))
+
defer func() {
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
@@ -379,6 +380,7 @@ func (c *connection) run() {
for {
select {
case <-c.closeCh:
+ c.failLeftRequestsWhenClose()
return
case req := <-c.incomingRequestsCh:
@@ -563,19 +565,28 @@ func (c *connection) Write(data Buffer) {
func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
- c.incomingRequestsCh <- &request{
- id: &requestID,
- cmd: req,
- callback: callback,
+ if c.state == connectionClosed {
+ callback(req, ErrConnectionClosed)
+ } else {
+ c.incomingRequestsCh <- &request{
+ id: &requestID,
+ cmd: req,
+ callback: callback,
+ }
}
}
-func (c *connection) SendRequestNoWait(req *pb.BaseCommand) {
+func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
+ if c.state == connectionClosed {
+ return ErrConnectionClosed
+ }
+
c.incomingRequestsCh <- &request{
id: nil,
cmd: req,
callback: nil,
}
+ return nil
}
func (c *connection) internalSendRequest(req *request) {
@@ -584,7 +595,14 @@ func (c *connection) internalSendRequest(req *request) {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
- c.writeCommand(req.cmd)
+ if c.state == connectionClosed {
+ c.log.Warnf("internalSendRequest failed for connectionClosed")
+ if req.callback != nil {
+ req.callback(req.cmd, ErrConnectionClosed)
+ }
+ } else {
+ c.writeCommand(req.cmd)
+ }
}
func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index fa22f18..aea9356 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -99,8 +99,9 @@ func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType
return nil, nil
}
-func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
+func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
assert.Fail(c.t, "Shouldn't be called")
+ return nil
}
func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupTopicResponse_LookupType {
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index c7d810a..f53c16b 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -59,7 +59,7 @@ type RPCClient interface {
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
- RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)
+ RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error
RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
}
@@ -103,7 +103,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
}
ch := make(chan Res, 10)
- // TODO: in here, the error of callback always nil
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
ch <- Res{&RPCResult{
Cnx: cnx,
@@ -162,9 +161,9 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba
return rpcResult, rpcErr
}
-func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
+func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
rpcRequestCount.Inc()
- cnx.SendRequestNoWait(baseCommand(cmdType, message))
+ return cnx.SendRequestNoWait(baseCommand(cmdType, message))
}
func (c *rpcClient) NewRequestID() uint64 {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e8cf0f7..d245c33 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -101,6 +101,8 @@ type partitionProducer struct {
// Channel where app is posting messages to be published
eventsChan chan interface{}
+ connectClosedCh chan connectionClosed
+
publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
lastSequenceID int64
@@ -133,6 +135,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
options: options,
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
+ connectClosedCh: make(chan connectionClosed, 10),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
@@ -236,7 +239,7 @@ func (p *partitionProducer) GetBuffer() internal.Buffer {
func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
- p.eventsChan <- &connectionClosed{}
+ p.connectClosedCh <- connectionClosed{}
}
func (p *partitionProducer) reconnectToBroker() {
@@ -267,15 +270,14 @@ func (p *partitionProducer) runEventsLoop() {
switch v := i.(type) {
case *sendRequest:
p.internalSend(v)
- case *connectionClosed:
- p.reconnectToBroker()
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
p.internalClose(v)
return
}
-
+ case <-p.connectClosedCh:
+ p.reconnectToBroker()
case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}