You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 09:53:28 UTC

[plc4x] branch develop updated: fix(plc4go/cbus): fixed go routine leak

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new a64125789 fix(plc4go/cbus): fixed go routine leak
a64125789 is described below

commit a6412578904c05634bc6124aa7b3b1c1811e50a6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 11:53:22 2022 +0200

    fix(plc4go/cbus): fixed go routine leak
    
    https://go.dev/blog/concurrency-timeouts
---
 plc4go/internal/cbus/Connection.go | 69 +++++++++++++++++++++++++-------------
 1 file changed, 45 insertions(+), 24 deletions(-)

diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index a6d51a302..96c0ebfff 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -205,22 +205,11 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
 		return
 	}
 	c.fireConnected(ch)
+	c.startSubscriptionHandler()
+	c.startDefaultIncomingMessageHandler()
+}
 
-	log.Debug().Msg("Starting subscription handler")
-	go func() {
-		log.Debug().Msg("Subscription handler stated")
-		for c.IsConnected() {
-			for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
-				for _, subscriber := range c.subscribers {
-					if ok := subscriber.handleMonitoredSal(monitoredSal); ok {
-						log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
-					}
-				}
-			}
-		}
-		log.Info().Msg("Ending subscription handler")
-	}()
-
+func (c *Connection) startDefaultIncomingMessageHandler() {
 	log.Debug().Msg("Starting default incoming message handler")
 	go func() {
 		log.Debug().Msg("default incoming message handler started")
@@ -252,14 +241,31 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
 	}()
 }
 
+func (c *Connection) startSubscriptionHandler() {
+	log.Debug().Msg("Starting subscription handler")
+	go func() {
+		log.Debug().Msg("Subscription handler stated")
+		for c.IsConnected() {
+			for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+				for _, subscriber := range c.subscribers {
+					if ok := subscriber.handleMonitoredSal(monitoredSal); ok {
+						log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
+					}
+				}
+			}
+		}
+		log.Info().Msg("Ending subscription handler")
+	}()
+}
+
 func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) (ok bool) {
 	log.Debug().Msgf("Send a reset (sendOutErrorNotification: %t)", sendOutErrorNotification)
 	requestTypeReset := readWriteModel.RequestType_RESET
 	requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeReset, requestTypeReset, &requestTypeReset, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
 	cBusMessage := readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)
 
-	receivedResetEchoChan := make(chan bool)
-	receivedResetEchoErrorChan := make(chan error)
+	receivedResetEchoChan := make(chan bool, 1)
+	receivedResetEchoErrorChan := make(chan error, 1)
 	if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
 		switch message := message.(type) {
 		case readWriteModel.CBusMessageToClientExactly:
@@ -276,16 +282,25 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 		switch message.(type) {
 		case readWriteModel.CBusMessageToClientExactly:
 			// This is the powerup notification
-			go func() { receivedResetEchoChan <- false }()
+			select {
+			case receivedResetEchoChan <- false:
+			default:
+			}
 		case readWriteModel.CBusMessageToServerExactly:
 			// This is the echo
-			go func() { receivedResetEchoChan <- true }()
+			select {
+			case receivedResetEchoChan <- true:
+			default:
+			}
 		default:
 			return errors.Errorf("Unmapped type %T", message)
 		}
 		return nil
 	}, func(err error) error {
-		receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
+		select {
+		case receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request"):
+		default:
+		}
 		return nil
 	}, c.GetTtl()); err != nil {
 		if sendOutErrorNotification {
@@ -376,8 +391,8 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
 	directCommand := readWriteModel.NewRequestDirectCommandAccess(calData /*we don't want a alpha otherwise the PCI will auto-switch*/, nil, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
 	cBusMessage := readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)
 
-	directCommandAckChan := make(chan bool)
-	directCommandAckErrorChan := make(chan error)
+	directCommandAckChan := make(chan bool, 1)
+	directCommandAckErrorChan := make(chan error, 1)
 	if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
 		switch message := message.(type) {
 		case readWriteModel.CBusMessageToClientExactly:
@@ -410,7 +425,10 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
 						switch data := encodedReply.GetCalReply().GetCalData().(type) {
 						case readWriteModel.CALDataAcknowledgeExactly:
 							if data.GetParamNo() == paramNo {
-								directCommandAckChan <- true
+								select {
+								case directCommandAckChan <- true:
+								default:
+								}
 							}
 						}
 					}
@@ -419,7 +437,10 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
 		}
 		return nil
 	}, func(err error) error {
-		directCommandAckErrorChan <- errors.Wrap(err, "got error processing request")
+		select {
+		case directCommandAckErrorChan <- errors.Wrap(err, "got error processing request"):
+		default:
+		}
 		return nil
 	}, c.GetTtl()); err != nil {
 		c.fireConnectionError(errors.Wrap(err, "Error during sending of write request"), ch)