You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/15 15:18:53 UTC

[plc4x] branch develop updated: fix(plc4go): fix concurrency issues

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new e6f661eab5 fix(plc4go): fix concurrency issues
e6f661eab5 is described below

commit e6f661eab583e06e014c25ea92851fea8e3c48dc
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jun 15 17:18:46 2023 +0200

    fix(plc4go): fix concurrency issues
---
 plc4go/internal/cbus/Connection.go      | 22 ++++++++++++----------
 plc4go/internal/cbus/Connection_test.go |  2 ++
 plc4go/spi/pool/dynamicExecutor.go      |  8 +++++++-
 3 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 7a1652e17c..ec7a0b739f 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -247,54 +247,56 @@ func (c *Connection) startSubscriptionHandler() {
 	c.log.Debug().Msg("Starting SAL handler")
 	c.handlerWaitGroup.Add(1)
 	go func() {
+		salLogger := c.log.With().Str("handlerType", "SAL").Logger()
 		defer c.handlerWaitGroup.Done()
 		defer func() {
 			if err := recover(); err != nil {
-				c.log.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
+				salLogger.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
 			}
 		}()
-		c.log.Debug().Msg("SAL handler stated")
+		salLogger.Debug().Msg("SAL handler started")
 		for c.IsConnected() {
 			for monitoredSal := range c.messageCodec.monitoredSALs {
 				handled := false
 				for _, subscriber := range c.subscribers {
 					if ok := subscriber.handleMonitoredSAL(monitoredSal); ok {
-						c.log.Debug().Msgf("\n%v handled\n%s", subscriber, monitoredSal)
+						salLogger.Debug().Msgf("\n%v handled\n%s", subscriber, monitoredSal)
 						handled = true
 					}
 				}
 				if !handled {
-					c.log.Debug().Msgf("SAL was not handled:\n%s", monitoredSal)
+					salLogger.Debug().Msgf("SAL was not handled:\n%s", monitoredSal)
 				}
 			}
 		}
-		c.log.Info().Msg("Ending SAL handler")
+		salLogger.Info().Msg("Ending SAL handler")
 	}()
 	c.log.Debug().Msg("Starting MMI handler")
 	c.handlerWaitGroup.Add(1)
 	go func() {
+		mmiLogger := c.log.With().Str("handlerType", "MMI").Logger()
 		defer c.handlerWaitGroup.Done()
 		defer func() {
 			if err := recover(); err != nil {
-				c.log.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
+				mmiLogger.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
 			}
 		}()
-		c.log.Debug().Msg("default MMI started")
+		mmiLogger.Debug().Msg("default MMI started")
 		for c.IsConnected() {
 			for calReply := range c.messageCodec.monitoredMMIs {
 				handled := false
 				for _, subscriber := range c.subscribers {
 					if ok := subscriber.handleMonitoredMMI(calReply); ok {
-						c.log.Debug().Msgf("\n%v handled\n%s", subscriber, calReply)
+						mmiLogger.Debug().Msgf("\n%v handled\n%s", subscriber, calReply)
 						handled = true
 					}
 				}
 				if !handled {
-					c.log.Debug().Msgf("MMI was not handled:\n%s", calReply)
+					mmiLogger.Debug().Msgf("MMI was not handled:\n%s", calReply)
 				}
 			}
 		}
-		c.log.Info().Msg("Ending MMI handler")
+		mmiLogger.Info().Msg("Ending MMI handler")
 	}()
 }
 
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 4a39a2d69a..65c66ba0f3 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -1789,6 +1789,8 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
 				fields.DefaultConnection = defaultConnection
 
 				codec := NewMessageCodec(nil, _options...)
+				codec.monitoredMMIs = make(chan readWriteModel.CALReply, 1)
+				codec.monitoredSALs = make(chan readWriteModel.MonitoredSAL, 1)
 				go func() {
 					codec.monitoredMMIs <- nil
 					codec.monitoredSALs <- nil
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 12a39665ca..7a1b7d5381 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -126,6 +126,7 @@ func (e *dynamicExecutor) Start() {
 		for e.IsRunning() {
 			workerLog.Trace().Msg("running")
 			mutex.Lock()
+			workersChanged := false
 			newWorkers := make([]*worker, 0)
 			for _, _worker := range e.worker {
 				deadline := time.Now().Add(-timeToBecomeUnused)
@@ -138,9 +139,14 @@ func (e *dynamicExecutor) Start() {
 				} else {
 					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
 					newWorkers = append(newWorkers, _worker)
+					workersChanged = true
 				}
 			}
-			e.worker = newWorkers
+			if workersChanged {
+				e.stateChange.Lock()
+				e.worker = newWorkers
+				e.stateChange.Unlock()
+			}
 			mutex.Unlock()
 			func() {
 				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)