You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/21 16:12:08 UTC

[plc4x] 02/03: refactor(plc4go/spi): move worker starting into a own method

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 430655fdf995364e800a7a5b5e938dd42cebcafb
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Jun 21 17:47:35 2023 +0200

    refactor(plc4go/spi): move worker starting into a own method
---
 plc4go/spi/default/DefaultCodec.go      | 20 ++++++-----
 plc4go/spi/default/DefaultCodec_test.go | 61 +++++++++++++++++++++++++--------
 2 files changed, 58 insertions(+), 23 deletions(-)

diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 209dff1cd8..6791b150d2 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -149,8 +149,7 @@ func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
 	}
 
 	m.log.Debug().Msg("Message codec currently not running, starting worker now")
-	m.activeWorker.Add(1)
-	go m.Work(m.DefaultCodecRequirements)
+	m.startWorker()
 	m.running.Store(true)
 	m.log.Trace().Msg("connected")
 	return nil
@@ -277,7 +276,13 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
 	return messageHandled
 }
 
-func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
+func (m *defaultCodec) startWorker() {
+	m.log.Trace().Msg("starting worker")
+	m.activeWorker.Add(1)
+	go m.Work()
+}
+
+func (m *defaultCodec) Work() {
 	defer m.activeWorker.Done()
 	workerLog := m.log.With().Logger()
 	if !m.traceDefaultMessageCodecWorker {
@@ -286,19 +291,18 @@ func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
 	workerLog.Trace().Msg("Starting work")
 	defer workerLog.Trace().Msg("work ended")
 
-	defer func(workerLog zerolog.Logger) {
+	defer func() {
 		if err := recover(); err != nil {
 			// TODO: If this is an error, cast it to an error and log it with "Err(err)"
 			m.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
 		}
 		if m.running.Load() {
 			workerLog.Warn().Msg("Keep running")
-			m.activeWorker.Add(1)
-			go m.Work(codec)
+			m.startWorker()
 		} else {
 			workerLog.Info().Msg("Worker terminated")
 		}
-	}(workerLog)
+	}()
 
 	// Start an endless loop
 mainLoop:
@@ -361,7 +365,7 @@ mainLoop:
 		if m.customMessageHandling != nil {
 			workerLog.Trace().Msg("Executing custom handling")
 			start := time.Now()
-			handled := m.customMessageHandling(codec, message)
+			handled := m.customMessageHandling(m.DefaultCodecRequirements, message)
 			workerLog.Trace().Msgf("custom handling took %s", time.Since(start))
 			if handled {
 				workerLog.Trace().Msg("Custom handling handled the message")
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index c0d00b5ac0..ca46f49059 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -884,14 +884,10 @@ func Test_defaultCodec_Work(t *testing.T) {
 		running                       bool
 		customMessageHandling         func(codec DefaultCodecRequirements, message spi.Message) bool
 	}
-	type args struct {
-		codec DefaultCodecRequirements
-	}
 	tests := []struct {
 		name        string
 		fields      fields
-		args        args
-		mockSetup   func(t *testing.T, fields *fields, args *args)
+		mockSetup   func(t *testing.T, fields *fields)
 		manipulator func(t *testing.T, codec *defaultCodec)
 	}{
 		{
@@ -900,7 +896,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 				codec.running.Store(true)
 				codec.activeWorker.Add(1)
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
+			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultCodecRequirements(t)
 				fields.DefaultCodecRequirements = requirements
 			},
@@ -941,7 +937,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
+			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultCodecRequirements(t)
 				requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
 				fields.DefaultCodecRequirements = requirements
@@ -987,7 +983,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
+			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultCodecRequirements(t)
 				requirements.EXPECT().Receive().Return(nil, nil)
 				fields.DefaultCodecRequirements = requirements
@@ -1033,7 +1029,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
+			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultCodecRequirements(t)
 				requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
 				fields.DefaultCodecRequirements = requirements
@@ -1057,7 +1053,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
+			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultCodecRequirements(t)
 				requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
 				fields.DefaultCodecRequirements = requirements
@@ -1103,7 +1099,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
+			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultCodecRequirements(t)
 				requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
 				fields.DefaultCodecRequirements = requirements
@@ -1152,7 +1148,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
+			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultCodecRequirements(t)
 				requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
 				fields.DefaultCodecRequirements = requirements
@@ -1201,7 +1197,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
+			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultCodecRequirements(t)
 				requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
 				fields.DefaultCodecRequirements = requirements
@@ -1215,7 +1211,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			if tt.mockSetup != nil {
-				tt.mockSetup(t, &tt.fields, &tt.args)
+				tt.mockSetup(t, &tt.fields)
 			}
 			m := &defaultCodec{
 				DefaultCodecRequirements:      tt.fields.DefaultCodecRequirements,
@@ -1232,7 +1228,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 				time.Sleep(200 * time.Millisecond)
 				m.running.Store(false)
 			}()
-			m.Work(tt.args.codec)
+			m.Work()
 		})
 	}
 }
@@ -1300,3 +1296,38 @@ func Test_defaultCodec_String(t *testing.T) {
 		})
 	}
 }
+
+func Test_defaultCodec_startWorker(t *testing.T) {
+	type fields struct {
+		DefaultCodecRequirements       DefaultCodecRequirements
+		transportInstance              transports.TransportInstance
+		expectations                   []spi.Expectation
+		defaultIncomingMessageChannel  chan spi.Message
+		customMessageHandling          func(codec DefaultCodecRequirements, message spi.Message) bool
+		receiveTimeout                 time.Duration
+		traceDefaultMessageCodecWorker bool
+	}
+	tests := []struct {
+		name   string
+		fields fields
+	}{
+		{
+			name: "start it not running",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m := &defaultCodec{
+				DefaultCodecRequirements:       tt.fields.DefaultCodecRequirements,
+				transportInstance:              tt.fields.transportInstance,
+				expectations:                   tt.fields.expectations,
+				defaultIncomingMessageChannel:  tt.fields.defaultIncomingMessageChannel,
+				customMessageHandling:          tt.fields.customMessageHandling,
+				receiveTimeout:                 tt.fields.receiveTimeout,
+				traceDefaultMessageCodecWorker: tt.fields.traceDefaultMessageCodecWorker,
+				log:                            testutils.ProduceTestingLogger(t),
+			}
+			m.startWorker()
+		})
+	}
+}