You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 09:53:28 UTC
[plc4x] branch develop updated: fix(plc4go/cbus): fixed go routine leak
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new a64125789 fix(plc4go/cbus): fixed go routine leak
a64125789 is described below
commit a6412578904c05634bc6124aa7b3b1c1811e50a6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 11:53:22 2022 +0200
fix(plc4go/cbus): fixed go routine leak
https://go.dev/blog/concurrency-timeouts
---
plc4go/internal/cbus/Connection.go | 69 +++++++++++++++++++++++++-------------
1 file changed, 45 insertions(+), 24 deletions(-)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index a6d51a302..96c0ebfff 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -205,22 +205,11 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
return
}
c.fireConnected(ch)
+ c.startSubscriptionHandler()
+ c.startDefaultIncomingMessageHandler()
+}
- log.Debug().Msg("Starting subscription handler")
- go func() {
- log.Debug().Msg("Subscription handler stated")
- for c.IsConnected() {
- for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
- for _, subscriber := range c.subscribers {
- if ok := subscriber.handleMonitoredSal(monitoredSal); ok {
- log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
- }
- }
- }
- }
- log.Info().Msg("Ending subscription handler")
- }()
-
+func (c *Connection) startDefaultIncomingMessageHandler() {
log.Debug().Msg("Starting default incoming message handler")
go func() {
log.Debug().Msg("default incoming message handler started")
@@ -252,14 +241,31 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}()
}
+func (c *Connection) startSubscriptionHandler() {
+ log.Debug().Msg("Starting subscription handler")
+ go func() {
+ log.Debug().Msg("Subscription handler stated")
+ for c.IsConnected() {
+ for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+ for _, subscriber := range c.subscribers {
+ if ok := subscriber.handleMonitoredSal(monitoredSal); ok {
+ log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
+ }
+ }
+ }
+ }
+ log.Info().Msg("Ending subscription handler")
+ }()
+}
+
func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) (ok bool) {
log.Debug().Msgf("Send a reset (sendOutErrorNotification: %t)", sendOutErrorNotification)
requestTypeReset := readWriteModel.RequestType_RESET
requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeReset, requestTypeReset, &requestTypeReset, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
cBusMessage := readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)
- receivedResetEchoChan := make(chan bool)
- receivedResetEchoErrorChan := make(chan error)
+ receivedResetEchoChan := make(chan bool, 1)
+ receivedResetEchoErrorChan := make(chan error, 1)
if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
switch message := message.(type) {
case readWriteModel.CBusMessageToClientExactly:
@@ -276,16 +282,25 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
switch message.(type) {
case readWriteModel.CBusMessageToClientExactly:
// This is the powerup notification
- go func() { receivedResetEchoChan <- false }()
+ select {
+ case receivedResetEchoChan <- false:
+ default:
+ }
case readWriteModel.CBusMessageToServerExactly:
// This is the echo
- go func() { receivedResetEchoChan <- true }()
+ select {
+ case receivedResetEchoChan <- true:
+ default:
+ }
default:
return errors.Errorf("Unmapped type %T", message)
}
return nil
}, func(err error) error {
- receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
+ select {
+ case receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request"):
+ default:
+ }
return nil
}, c.GetTtl()); err != nil {
if sendOutErrorNotification {
@@ -376,8 +391,8 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
directCommand := readWriteModel.NewRequestDirectCommandAccess(calData /*we don't want a alpha otherwise the PCI will auto-switch*/, nil, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
cBusMessage := readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)
- directCommandAckChan := make(chan bool)
- directCommandAckErrorChan := make(chan error)
+ directCommandAckChan := make(chan bool, 1)
+ directCommandAckErrorChan := make(chan error, 1)
if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
switch message := message.(type) {
case readWriteModel.CBusMessageToClientExactly:
@@ -410,7 +425,10 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
switch data := encodedReply.GetCalReply().GetCalData().(type) {
case readWriteModel.CALDataAcknowledgeExactly:
if data.GetParamNo() == paramNo {
- directCommandAckChan <- true
+ select {
+ case directCommandAckChan <- true:
+ default:
+ }
}
}
}
@@ -419,7 +437,10 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
}
return nil
}, func(err error) error {
- directCommandAckErrorChan <- errors.Wrap(err, "got error processing request")
+ select {
+ case directCommandAckErrorChan <- errors.Wrap(err, "got error processing request"):
+ default:
+ }
return nil
}, c.GetTtl()); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error during sending of write request"), ch)