You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/21 16:12:08 UTC
[plc4x] 02/03: refactor(plc4go/spi): move worker starting into its own method
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 430655fdf995364e800a7a5b5e938dd42cebcafb
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Jun 21 17:47:35 2023 +0200
refactor(plc4go/spi): move worker starting into its own method
---
plc4go/spi/default/DefaultCodec.go | 20 ++++++-----
plc4go/spi/default/DefaultCodec_test.go | 61 +++++++++++++++++++++++++--------
2 files changed, 58 insertions(+), 23 deletions(-)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 209dff1cd8..6791b150d2 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -149,8 +149,7 @@ func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
}
m.log.Debug().Msg("Message codec currently not running, starting worker now")
- m.activeWorker.Add(1)
- go m.Work(m.DefaultCodecRequirements)
+ m.startWorker()
m.running.Store(true)
m.log.Trace().Msg("connected")
return nil
@@ -277,7 +276,13 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
return messageHandled
}
-func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
+func (m *defaultCodec) startWorker() {
+ m.log.Trace().Msg("starting worker")
+ m.activeWorker.Add(1)
+ go m.Work()
+}
+
+func (m *defaultCodec) Work() {
defer m.activeWorker.Done()
workerLog := m.log.With().Logger()
if !m.traceDefaultMessageCodecWorker {
@@ -286,19 +291,18 @@ func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
workerLog.Trace().Msg("Starting work")
defer workerLog.Trace().Msg("work ended")
- defer func(workerLog zerolog.Logger) {
+ defer func() {
if err := recover(); err != nil {
// TODO: If this is an error, cast it to an error and log it with "Err(err)"
m.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
}
if m.running.Load() {
workerLog.Warn().Msg("Keep running")
- m.activeWorker.Add(1)
- go m.Work(codec)
+ m.startWorker()
} else {
workerLog.Info().Msg("Worker terminated")
}
- }(workerLog)
+ }()
// Start an endless loop
mainLoop:
@@ -361,7 +365,7 @@ mainLoop:
if m.customMessageHandling != nil {
workerLog.Trace().Msg("Executing custom handling")
start := time.Now()
- handled := m.customMessageHandling(codec, message)
+ handled := m.customMessageHandling(m.DefaultCodecRequirements, message)
workerLog.Trace().Msgf("custom handling took %s", time.Since(start))
if handled {
workerLog.Trace().Msg("Custom handling handled the message")
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index c0d00b5ac0..ca46f49059 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -884,14 +884,10 @@ func Test_defaultCodec_Work(t *testing.T) {
running bool
customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool
}
- type args struct {
- codec DefaultCodecRequirements
- }
tests := []struct {
name string
fields fields
- args args
- mockSetup func(t *testing.T, fields *fields, args *args)
+ mockSetup func(t *testing.T, fields *fields)
manipulator func(t *testing.T, codec *defaultCodec)
}{
{
@@ -900,7 +896,7 @@ func Test_defaultCodec_Work(t *testing.T) {
codec.running.Store(true)
codec.activeWorker.Add(1)
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
fields.DefaultCodecRequirements = requirements
},
@@ -941,7 +937,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
fields.DefaultCodecRequirements = requirements
@@ -987,7 +983,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(nil, nil)
fields.DefaultCodecRequirements = requirements
@@ -1033,7 +1029,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
fields.DefaultCodecRequirements = requirements
@@ -1057,7 +1053,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
fields.DefaultCodecRequirements = requirements
@@ -1103,7 +1099,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
fields.DefaultCodecRequirements = requirements
@@ -1152,7 +1148,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
fields.DefaultCodecRequirements = requirements
@@ -1201,7 +1197,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
fields.DefaultCodecRequirements = requirements
@@ -1215,7 +1211,7 @@ func Test_defaultCodec_Work(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.mockSetup != nil {
- tt.mockSetup(t, &tt.fields, &tt.args)
+ tt.mockSetup(t, &tt.fields)
}
m := &defaultCodec{
DefaultCodecRequirements: tt.fields.DefaultCodecRequirements,
@@ -1232,7 +1228,7 @@ func Test_defaultCodec_Work(t *testing.T) {
time.Sleep(200 * time.Millisecond)
m.running.Store(false)
}()
- m.Work(tt.args.codec)
+ m.Work()
})
}
}
@@ -1300,3 +1296,38 @@ func Test_defaultCodec_String(t *testing.T) {
})
}
}
+
+func Test_defaultCodec_startWorker(t *testing.T) {
+ type fields struct {
+ DefaultCodecRequirements DefaultCodecRequirements
+ transportInstance transports.TransportInstance
+ expectations []spi.Expectation
+ defaultIncomingMessageChannel chan spi.Message
+ customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool
+ receiveTimeout time.Duration
+ traceDefaultMessageCodecWorker bool
+ }
+ tests := []struct {
+ name string
+ fields fields
+ }{
+ {
+ name: "start it not running",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := &defaultCodec{
+ DefaultCodecRequirements: tt.fields.DefaultCodecRequirements,
+ transportInstance: tt.fields.transportInstance,
+ expectations: tt.fields.expectations,
+ defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
+ customMessageHandling: tt.fields.customMessageHandling,
+ receiveTimeout: tt.fields.receiveTimeout,
+ traceDefaultMessageCodecWorker: tt.fields.traceDefaultMessageCodecWorker,
+ log: testutils.ProduceTestingLogger(t),
+ }
+ m.startWorker()
+ })
+ }
+}