You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/15 15:18:53 UTC
[plc4x] branch develop updated: fix(plc4go): fix concurrency issues
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new e6f661eab5 fix(plc4go): fix concurrency issues
e6f661eab5 is described below
commit e6f661eab583e06e014c25ea92851fea8e3c48dc
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jun 15 17:18:46 2023 +0200
fix(plc4go): fix concurrency issues
---
plc4go/internal/cbus/Connection.go | 22 ++++++++++++----------
plc4go/internal/cbus/Connection_test.go | 2 ++
plc4go/spi/pool/dynamicExecutor.go | 8 +++++++-
3 files changed, 21 insertions(+), 11 deletions(-)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 7a1652e17c..ec7a0b739f 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -247,54 +247,56 @@ func (c *Connection) startSubscriptionHandler() {
c.log.Debug().Msg("Starting SAL handler")
c.handlerWaitGroup.Add(1)
go func() {
+ salLogger := c.log.With().Str("handlerType", "SAL").Logger()
defer c.handlerWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
- c.log.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
+ salLogger.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
}
}()
- c.log.Debug().Msg("SAL handler stated")
+ salLogger.Debug().Msg("SAL handler started")
for c.IsConnected() {
for monitoredSal := range c.messageCodec.monitoredSALs {
handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredSAL(monitoredSal); ok {
- c.log.Debug().Msgf("\n%v handled\n%s", subscriber, monitoredSal)
+ salLogger.Debug().Msgf("\n%v handled\n%s", subscriber, monitoredSal)
handled = true
}
}
if !handled {
- c.log.Debug().Msgf("SAL was not handled:\n%s", monitoredSal)
+ salLogger.Debug().Msgf("SAL was not handled:\n%s", monitoredSal)
}
}
}
- c.log.Info().Msg("Ending SAL handler")
+ salLogger.Info().Msg("Ending SAL handler")
}()
c.log.Debug().Msg("Starting MMI handler")
c.handlerWaitGroup.Add(1)
go func() {
+ mmiLogger := c.log.With().Str("handlerType", "MMI").Logger()
defer c.handlerWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
- c.log.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
+ mmiLogger.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
}
}()
- c.log.Debug().Msg("default MMI started")
+ mmiLogger.Debug().Msg("default MMI started")
for c.IsConnected() {
for calReply := range c.messageCodec.monitoredMMIs {
handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredMMI(calReply); ok {
- c.log.Debug().Msgf("\n%v handled\n%s", subscriber, calReply)
+ mmiLogger.Debug().Msgf("\n%v handled\n%s", subscriber, calReply)
handled = true
}
}
if !handled {
- c.log.Debug().Msgf("MMI was not handled:\n%s", calReply)
+ mmiLogger.Debug().Msgf("MMI was not handled:\n%s", calReply)
}
}
}
- c.log.Info().Msg("Ending MMI handler")
+ mmiLogger.Info().Msg("Ending MMI handler")
}()
}
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 4a39a2d69a..65c66ba0f3 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -1789,6 +1789,8 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
fields.DefaultConnection = defaultConnection
codec := NewMessageCodec(nil, _options...)
+ codec.monitoredMMIs = make(chan readWriteModel.CALReply, 1)
+ codec.monitoredSALs = make(chan readWriteModel.MonitoredSAL, 1)
go func() {
codec.monitoredMMIs <- nil
codec.monitoredSALs <- nil
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 12a39665ca..7a1b7d5381 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -126,6 +126,7 @@ func (e *dynamicExecutor) Start() {
for e.IsRunning() {
workerLog.Trace().Msg("running")
mutex.Lock()
+ workersChanged := false
newWorkers := make([]*worker, 0)
for _, _worker := range e.worker {
deadline := time.Now().Add(-timeToBecomeUnused)
@@ -138,9 +139,14 @@ func (e *dynamicExecutor) Start() {
} else {
workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
newWorkers = append(newWorkers, _worker)
+ workersChanged = true
}
}
- e.worker = newWorkers
+ if workersChanged {
+ e.stateChange.Lock()
+ e.worker = newWorkers
+ e.stateChange.Unlock()
+ }
mutex.Unlock()
func() {
workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)