You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 16:46:17 UTC

[plc4x] branch develop updated (72432316d4 -> bf275e23d1)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 72432316d4 test(plc4go/cbus): fix failing new driver context test
     new 40cd416bca test(plc4go/spi): only set timeformat to nano when flag is set
     new ade41801b7 test(plc4go/spi): ensure no global logger is set
     new 9db3034ea5 fix(plc4go/spi): fix timeout output using the wrong duration
     new 65796b03e9 feat(plc4go/spi): improve logging for request transaction
     new 8e0f219335 feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained
     new 04662cf6d5 feat(plc4go/cbus): improved logging
     new bf275e23d1 fix(plc4go/cbus): fix error when reader doesn't get a alpha capable response

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/Browser.go                    |  35 ++---
 plc4go/internal/cbus/Browser_test.go               |  73 ++++------
 plc4go/internal/cbus/Connection.go                 |   6 +-
 plc4go/internal/cbus/MessageCodec.go               |  18 +--
 plc4go/internal/cbus/Reader.go                     | 159 ++++++++++++---------
 plc4go/internal/cbus/Reader_test.go                |  67 ++-------
 plc4go/spi/MessageCodec.go                         |   1 +
 plc4go/spi/default/DefaultCodec.go                 |  51 ++-----
 plc4go/spi/default/DefaultCodec_test.go            |  82 +++++------
 plc4go/spi/default/defaultExpectation.go           |  65 +++++++++
 plc4go/spi/default/mock_Expectation_test.go        |  41 ++++++
 plc4go/spi/mock_Expectation_test.go                |  41 ++++++
 plc4go/spi/pool/executor_test.go                   |   6 +-
 plc4go/spi/testutils/TestUtils.go                  |   4 +-
 plc4go/spi/transactions/RequestTransaction.go      |   9 +-
 .../transactions/RequestTransactionManager_test.go |   5 +-
 .../spi/transports/utils/TransportLogger_test.go   |   3 +-
 plc4go/spi/utils/Errors.go                         |   2 +-
 18 files changed, 377 insertions(+), 291 deletions(-)
 create mode 100644 plc4go/spi/default/defaultExpectation.go


[plc4x] 07/07: fix(plc4go/cbus): fix error when reader doesn't get a alpha capable response

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit bf275e23d1f043f70ff8328d320eb40d1ebd0180
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:43:53 2023 +0200

    fix(plc4go/cbus): fix error when reader doesn't get a alpha capable response
---
 plc4go/internal/cbus/Reader.go      | 15 ++++++---
 plc4go/internal/cbus/Reader_test.go | 67 ++++++++-----------------------------
 2 files changed, 25 insertions(+), 57 deletions(-)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 3a7063cb8f..55015da183 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -139,14 +139,16 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
 func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
 	// Send the over the wire
 	m.log.Trace().Msg("send over the wire")
+	ttl := time.Second * 5
 	if deadline, ok := ctx.Deadline(); ok {
-		m.log.Debug().Msgf("Message expires in %s", deadline.Sub(time.Now()))
+		ttl = -time.Since(deadline)
+		m.log.Debug().Msgf("setting ttl to %s", ttl)
 	}
 	if err := m.messageCodec.SendRequest(
 		ctx,
 		messageToSend,
 		func(cbusMessage spi.Message) bool {
-			m.log.Trace().Msgf("Checking\n%T", cbusMessage)
+			m.log.Trace().Msgf("Checking %T", cbusMessage)
 			messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
 			if !ok {
 				m.log.Trace().Msg("Not a message to client")
@@ -166,7 +168,12 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
 			}
 			actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
 			// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
-			expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+			alphaRetriever, ok := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha })
+			if !ok {
+				m.log.Trace().Msg("no alpha there")
+				return false
+			}
+			expectedAlpha := alphaRetriever.GetAlpha().GetCharacter()
 			m.log.Trace().Msgf("Comparing expected alpha '%c' to actual alpha '%c'", expectedAlpha, actualAlpha)
 			return actualAlpha == expectedAlpha
 		},
@@ -223,7 +230,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
 			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
 			return transaction.FailRequest(err)
 		},
-		time.Second*5); err != nil {
+		ttl); err != nil {
 		m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
 		addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
 		if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index cc017ac3a0..e66c1d7534 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -234,11 +234,7 @@ func TestReader_readSync(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				readRequest: spiModel.NewDefaultPlcReadRequest(
 					map[string]apiModel.PlcTag{
 						"blub": NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Type, 1),
@@ -397,11 +393,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx:           testutils.TestContext(t),
 				messageToSend: nil,
 				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
@@ -440,16 +432,12 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 			},
 		},
 		{
-			name: "Send message which responds with message to client",
+			name: "Send message which responds with message to server",
 			fields: fields{
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				messageToSend: readWriteModel.NewCBusMessageToServer(
 					readWriteModel.NewRequestReset(
 						readWriteModel.RequestType_RESET,
@@ -470,7 +458,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
-						assert.Equal(t, apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
+						assert.Equal(t, apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
 					}
 				},
 				tagName: "horst",
@@ -527,11 +515,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				messageToSend: readWriteModel.NewCBusMessageToServer(
 					readWriteModel.NewRequestReset(
 						readWriteModel.RequestType_RESET,
@@ -610,11 +594,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				messageToSend: readWriteModel.NewCBusMessageToServer(
 					readWriteModel.NewRequestDirectCommandAccess(
 						readWriteModel.NewCALDataIdentify(
@@ -695,11 +675,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				messageToSend: readWriteModel.NewCBusMessageToServer(
 					readWriteModel.NewRequestDirectCommandAccess(
 						readWriteModel.NewCALDataIdentify(
@@ -780,11 +756,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				messageToSend: readWriteModel.NewCBusMessageToServer(
 					readWriteModel.NewRequestDirectCommandAccess(
 						readWriteModel.NewCALDataIdentify(
@@ -865,11 +837,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				messageToSend: readWriteModel.NewCBusMessageToServer(
 					readWriteModel.NewRequestDirectCommandAccess(
 						readWriteModel.NewCALDataIdentify(
@@ -950,11 +918,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				messageToSend: readWriteModel.NewCBusMessageToServer(
 					readWriteModel.NewRequestDirectCommandAccess(
 						readWriteModel.NewCALDataIdentify(
@@ -1035,11 +999,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 			},
 			args: args{
-				ctx: func() context.Context {
-					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-					t.Cleanup(cancel)
-					return timeout
-				}(),
+				ctx: testutils.TestContext(t),
 				messageToSend: readWriteModel.NewCBusMessageToServer(
 					readWriteModel.NewRequestDirectCommandAccess(
 						readWriteModel.NewCALDataIdentify(
@@ -1125,10 +1085,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				alphaGenerator: tt.fields.alphaGenerator,
 				messageCodec:   tt.fields.messageCodec,
 				tm:             tt.fields.tm,
+				log:            testutils.ProduceTestingLogger(t),
 			}
 			m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t), tt.args.tagName, tt.args.addPlcValue(t))
 			t.Log("Waiting now")
-			timer := time.NewTimer(3 * time.Second)
+			timer := time.NewTimer(10 * time.Second)
 			defer utils.CleanupTimer(timer)
 			select {
 			case <-ch:


[plc4x] 03/07: fix(plc4go/spi): fix timeout output using the wrong duration

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 9db3034ea5d4d1e27819accac73aab4b48afcedf
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 16:51:57 2023 +0200

    fix(plc4go/spi): fix timeout output using the wrong duration
---
 plc4go/spi/MessageCodec.go                  |  1 +
 plc4go/spi/default/DefaultCodec.go          | 38 ++-----------
 plc4go/spi/default/DefaultCodec_test.go     | 82 ++++++++++++++---------------
 plc4go/spi/default/defaultExpectation.go    | 65 +++++++++++++++++++++++
 plc4go/spi/default/mock_Expectation_test.go | 41 +++++++++++++++
 plc4go/spi/mock_Expectation_test.go         | 41 +++++++++++++++
 plc4go/spi/utils/Errors.go                  |  2 +-
 7 files changed, 193 insertions(+), 77 deletions(-)

diff --git a/plc4go/spi/MessageCodec.go b/plc4go/spi/MessageCodec.go
index 94826da578..4127c4c688 100644
--- a/plc4go/spi/MessageCodec.go
+++ b/plc4go/spi/MessageCodec.go
@@ -28,6 +28,7 @@ import (
 type Expectation interface {
 	fmt.Stringer
 	GetContext() context.Context
+	GetCreationTime() time.Time
 	GetExpiration() time.Time
 	GetAcceptsMessage() AcceptsMessage
 	GetHandleMessage() HandleMessage
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index e365c5b095..91c48d0ce9 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -21,7 +21,6 @@ package _default
 
 import (
 	"context"
-	"fmt"
 	"runtime/debug"
 	"sync"
 	"sync/atomic"
@@ -56,14 +55,6 @@ func NewDefaultCodec(requirements DefaultCodecRequirements, transportInstance tr
 	return buildDefaultCodec(requirements, transportInstance, options...)
 }
 
-type DefaultExpectation struct {
-	Context        context.Context
-	Expiration     time.Time
-	AcceptsMessage spi.AcceptsMessage
-	HandleMessage  spi.HandleMessage
-	HandleError    spi.HandleError
-}
-
 type CustomMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
 
 func WithCustomMessageHandler(customMessageHandler CustomMessageHandler) options.WithOption {
@@ -126,30 +117,6 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
 ///////////////////////////////////////
 ///////////////////////////////////////
 
-func (d *DefaultExpectation) GetContext() context.Context {
-	return d.Context
-}
-
-func (d *DefaultExpectation) GetExpiration() time.Time {
-	return d.Expiration
-}
-
-func (d *DefaultExpectation) GetAcceptsMessage() spi.AcceptsMessage {
-	return d.AcceptsMessage
-}
-
-func (d *DefaultExpectation) GetHandleMessage() spi.HandleMessage {
-	return d.HandleMessage
-}
-
-func (d *DefaultExpectation) GetHandleError() spi.HandleError {
-	return d.HandleError
-}
-
-func (d *DefaultExpectation) String() string {
-	return fmt.Sprintf("Expectation(expires at %v)", d.Expiration)
-}
-
 func (m *defaultCodec) GetTransportInstance() transports.TransportInstance {
 	return m.transportInstance
 }
@@ -211,8 +178,9 @@ func (m *defaultCodec) IsRunning() bool {
 func (m *defaultCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
 	m.expectationsChangeMutex.Lock()
 	defer m.expectationsChangeMutex.Unlock()
-	expectation := &DefaultExpectation{
+	expectation := &defaultExpectation{
 		Context:        ctx,
+		CreationTime:   time.Now(),
 		Expiration:     time.Now().Add(ttl),
 		AcceptsMessage: acceptsMessage,
 		HandleMessage:  handleMessage,
@@ -247,7 +215,7 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) {
 			i--
 			// Call the error handler.
 			go func(expectation spi.Expectation) {
-				if err := expectation.GetHandleError()(utils.NewTimeoutError(now.Sub(expectation.GetExpiration()))); err != nil {
+				if err := expectation.GetHandleError()(utils.NewTimeoutError(expectation.GetExpiration().Sub(expectation.GetCreationTime()))); err != nil {
 					m.log.Error().Err(err).Msg("Got an error handling error on expectation")
 				}
 			}(expectation)
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index c6829c64a5..19bde057ac 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -59,7 +59,7 @@ func TestDefaultExpectation_GetAcceptsMessage(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			m := &DefaultExpectation{
+			m := &defaultExpectation{
 				Context:        tt.fields.Context,
 				Expiration:     tt.fields.Expiration,
 				AcceptsMessage: tt.fields.AcceptsMessage,
@@ -90,7 +90,7 @@ func TestDefaultExpectation_GetContext(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			m := &DefaultExpectation{
+			m := &defaultExpectation{
 				Context:        tt.fields.Context,
 				Expiration:     tt.fields.Expiration,
 				AcceptsMessage: tt.fields.AcceptsMessage,
@@ -121,7 +121,7 @@ func TestDefaultExpectation_GetExpiration(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			m := &DefaultExpectation{
+			m := &defaultExpectation{
 				Context:        tt.fields.Context,
 				Expiration:     tt.fields.Expiration,
 				AcceptsMessage: tt.fields.AcceptsMessage,
@@ -156,7 +156,7 @@ func TestDefaultExpectation_GetHandleError(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			m := &DefaultExpectation{
+			m := &defaultExpectation{
 				Context:        tt.fields.Context,
 				Expiration:     tt.fields.Expiration,
 				AcceptsMessage: tt.fields.AcceptsMessage,
@@ -191,7 +191,7 @@ func TestDefaultExpectation_GetHandleMessage(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			m := &DefaultExpectation{
+			m := &defaultExpectation{
 				Context:        tt.fields.Context,
 				Expiration:     tt.fields.Expiration,
 				AcceptsMessage: tt.fields.AcceptsMessage,
@@ -223,7 +223,7 @@ func TestDefaultExpectation_String(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			m := &DefaultExpectation{
+			m := &defaultExpectation{
 				Context:        tt.fields.Context,
 				Expiration:     tt.fields.Expiration,
 				AcceptsMessage: tt.fields.AcceptsMessage,
@@ -617,12 +617,12 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
 			name: "handle some",
 			fields: fields{
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // doesn't accept
+					&defaultExpectation{ // doesn't accept
 						AcceptsMessage: func(_ spi.Message) bool {
 							return false
 						},
 					},
-					&DefaultExpectation{ // accepts but fails
+					&defaultExpectation{ // accepts but fails
 						AcceptsMessage: func(_ spi.Message) bool {
 							return true
 						},
@@ -633,7 +633,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // accepts but fails and fails to handle the error
+					&defaultExpectation{ // accepts but fails and fails to handle the error
 						AcceptsMessage: func(_ spi.Message) bool {
 							return true
 						},
@@ -644,7 +644,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
 							return errors.New("I failed completely")
 						},
 					},
-					&DefaultExpectation{ // accepts
+					&defaultExpectation{ // accepts
 						AcceptsMessage: func(_ spi.Message) bool {
 							return true
 						},
@@ -652,7 +652,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // accepts
+					&defaultExpectation{ // accepts
 						AcceptsMessage: func(_ spi.Message) bool {
 							return true
 						},
@@ -660,7 +660,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // accepts
+					&defaultExpectation{ // accepts
 						AcceptsMessage: func(_ spi.Message) bool {
 							return true
 						},
@@ -823,26 +823,26 @@ func Test_defaultCodec_TimeoutExpectations(t *testing.T) {
 			name: "timeout some",
 			fields: fields{
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // Expired
+					&defaultExpectation{ // Expired
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // Expired errors
+					&defaultExpectation{ // Expired errors
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 					},
-					&DefaultExpectation{ // Fine
+					&defaultExpectation{ // Fine
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 						Expiration: time.Time{}.Add(3 * time.Hour),
 					},
-					&DefaultExpectation{ // Context error
+					&defaultExpectation{ // Context error
 						Context: func() context.Context {
 							ctx, cancelFunc := context.WithCancel(context.Background())
 							cancelFunc() // Cancel it instantly
@@ -902,26 +902,26 @@ func Test_defaultCodec_Work(t *testing.T) {
 			name: "work hard (panics everywhere)",
 			fields: fields{
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // Expired
+					&defaultExpectation{ // Expired
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // Expired errors
+					&defaultExpectation{ // Expired errors
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 					},
-					&DefaultExpectation{ // Fine
+					&defaultExpectation{ // Fine
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 						Expiration: time.Time{}.Add(3 * time.Hour),
 					},
-					&DefaultExpectation{ // Context error
+					&defaultExpectation{ // Context error
 						Context: func() context.Context {
 							ctx, cancelFunc := context.WithCancel(context.Background())
 							cancelFunc() // Cancel it instantly
@@ -943,26 +943,26 @@ func Test_defaultCodec_Work(t *testing.T) {
 			name: "work harder (nil message)",
 			fields: fields{
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // Expired
+					&defaultExpectation{ // Expired
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // Expired errors
+					&defaultExpectation{ // Expired errors
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 					},
-					&DefaultExpectation{ // Fine
+					&defaultExpectation{ // Fine
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 						Expiration: time.Time{}.Add(3 * time.Hour),
 					},
-					&DefaultExpectation{ // Context error
+					&defaultExpectation{ // Context error
 						Context: func() context.Context {
 							ctx, cancelFunc := context.WithCancel(context.Background())
 							cancelFunc() // Cancel it instantly
@@ -989,26 +989,26 @@ func Test_defaultCodec_Work(t *testing.T) {
 			name: "work harder (message)",
 			fields: fields{
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // Expired
+					&defaultExpectation{ // Expired
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // Expired errors
+					&defaultExpectation{ // Expired errors
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 					},
-					&DefaultExpectation{ // Fine
+					&defaultExpectation{ // Fine
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 						Expiration: time.Time{}.Add(3 * time.Hour),
 					},
-					&DefaultExpectation{ // Context error
+					&defaultExpectation{ // Context error
 						Context: func() context.Context {
 							ctx, cancelFunc := context.WithCancel(context.Background())
 							cancelFunc() // Cancel it instantly
@@ -1036,7 +1036,7 @@ func Test_defaultCodec_Work(t *testing.T) {
 			fields: fields{
 				defaultIncomingMessageChannel: make(chan spi.Message, 1),
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // Fine
+					&defaultExpectation{ // Fine
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
@@ -1059,26 +1059,26 @@ func Test_defaultCodec_Work(t *testing.T) {
 			name: "work harder (message receive error)",
 			fields: fields{
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // Expired
+					&defaultExpectation{ // Expired
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // Expired errors
+					&defaultExpectation{ // Expired errors
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 					},
-					&DefaultExpectation{ // Fine
+					&defaultExpectation{ // Fine
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 						Expiration: time.Time{}.Add(3 * time.Hour),
 					},
-					&DefaultExpectation{ // Context error
+					&defaultExpectation{ // Context error
 						Context: func() context.Context {
 							ctx, cancelFunc := context.WithCancel(context.Background())
 							cancelFunc() // Cancel it instantly
@@ -1108,26 +1108,26 @@ func Test_defaultCodec_Work(t *testing.T) {
 					return false
 				},
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // Expired
+					&defaultExpectation{ // Expired
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // Expired errors
+					&defaultExpectation{ // Expired errors
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 					},
-					&DefaultExpectation{ // Fine
+					&defaultExpectation{ // Fine
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 						Expiration: time.Time{}.Add(3 * time.Hour),
 					},
-					&DefaultExpectation{ // Context error
+					&defaultExpectation{ // Context error
 						Context: func() context.Context {
 							ctx, cancelFunc := context.WithCancel(context.Background())
 							cancelFunc() // Cancel it instantly
@@ -1157,26 +1157,26 @@ func Test_defaultCodec_Work(t *testing.T) {
 					return true
 				},
 				expectations: []spi.Expectation{
-					&DefaultExpectation{ // Expired
+					&defaultExpectation{ // Expired
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return nil
 						},
 					},
-					&DefaultExpectation{ // Expired errors
+					&defaultExpectation{ // Expired errors
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 					},
-					&DefaultExpectation{ // Fine
+					&defaultExpectation{ // Fine
 						Context: context.Background(),
 						HandleError: func(err error) error {
 							return errors.New("yep")
 						},
 						Expiration: time.Time{}.Add(3 * time.Hour),
 					},
-					&DefaultExpectation{ // Context error
+					&defaultExpectation{ // Context error
 						Context: func() context.Context {
 							ctx, cancelFunc := context.WithCancel(context.Background())
 							cancelFunc() // Cancel it instantly
diff --git a/plc4go/spi/default/defaultExpectation.go b/plc4go/spi/default/defaultExpectation.go
new file mode 100644
index 0000000000..fe11e0e081
--- /dev/null
+++ b/plc4go/spi/default/defaultExpectation.go
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package _default
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/apache/plc4x/plc4go/spi"
+)
+
+type defaultExpectation struct {
+	Context        context.Context
+	CreationTime   time.Time
+	Expiration     time.Time
+	AcceptsMessage spi.AcceptsMessage
+	HandleMessage  spi.HandleMessage
+	HandleError    spi.HandleError
+}
+
+func (d *defaultExpectation) GetContext() context.Context {
+	return d.Context
+}
+
+func (d *defaultExpectation) GetCreationTime() time.Time {
+	return d.CreationTime
+}
+
+func (d *defaultExpectation) GetExpiration() time.Time {
+	return d.Expiration
+}
+
+func (d *defaultExpectation) GetAcceptsMessage() spi.AcceptsMessage {
+	return d.AcceptsMessage
+}
+
+func (d *defaultExpectation) GetHandleMessage() spi.HandleMessage {
+	return d.HandleMessage
+}
+
+func (d *defaultExpectation) GetHandleError() spi.HandleError {
+	return d.HandleError
+}
+
+func (d *defaultExpectation) String() string {
+	return fmt.Sprintf("Expectation(expires at %v)", d.Expiration)
+}
diff --git a/plc4go/spi/default/mock_Expectation_test.go b/plc4go/spi/default/mock_Expectation_test.go
index 0ffc04e950..638d330a84 100644
--- a/plc4go/spi/default/mock_Expectation_test.go
+++ b/plc4go/spi/default/mock_Expectation_test.go
@@ -129,6 +129,47 @@ func (_c *MockExpectation_GetContext_Call) RunAndReturn(run func() context.Conte
 	return _c
 }
 
+// GetCreationTime provides a mock function with given fields:
+func (_m *MockExpectation) GetCreationTime() time.Time {
+	ret := _m.Called()
+
+	var r0 time.Time
+	if rf, ok := ret.Get(0).(func() time.Time); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(time.Time)
+	}
+
+	return r0
+}
+
+// MockExpectation_GetCreationTime_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCreationTime'
+type MockExpectation_GetCreationTime_Call struct {
+	*mock.Call
+}
+
+// GetCreationTime is a helper method to define mock.On call
+func (_e *MockExpectation_Expecter) GetCreationTime() *MockExpectation_GetCreationTime_Call {
+	return &MockExpectation_GetCreationTime_Call{Call: _e.mock.On("GetCreationTime")}
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) Run(run func()) *MockExpectation_GetCreationTime_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) Return(_a0 time.Time) *MockExpectation_GetCreationTime_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) RunAndReturn(run func() time.Time) *MockExpectation_GetCreationTime_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetExpiration provides a mock function with given fields:
 func (_m *MockExpectation) GetExpiration() time.Time {
 	ret := _m.Called()
diff --git a/plc4go/spi/mock_Expectation_test.go b/plc4go/spi/mock_Expectation_test.go
index c2f5369299..ede1ebbe16 100644
--- a/plc4go/spi/mock_Expectation_test.go
+++ b/plc4go/spi/mock_Expectation_test.go
@@ -127,6 +127,47 @@ func (_c *MockExpectation_GetContext_Call) RunAndReturn(run func() context.Conte
 	return _c
 }
 
+// GetCreationTime provides a mock function with given fields:
+func (_m *MockExpectation) GetCreationTime() time.Time {
+	ret := _m.Called()
+
+	var r0 time.Time
+	if rf, ok := ret.Get(0).(func() time.Time); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(time.Time)
+	}
+
+	return r0
+}
+
+// MockExpectation_GetCreationTime_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCreationTime'
+type MockExpectation_GetCreationTime_Call struct {
+	*mock.Call
+}
+
+// GetCreationTime is a helper method to define mock.On call
+func (_e *MockExpectation_Expecter) GetCreationTime() *MockExpectation_GetCreationTime_Call {
+	return &MockExpectation_GetCreationTime_Call{Call: _e.mock.On("GetCreationTime")}
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) Run(run func()) *MockExpectation_GetCreationTime_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) Return(_a0 time.Time) *MockExpectation_GetCreationTime_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) RunAndReturn(run func() time.Time) *MockExpectation_GetCreationTime_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetExpiration provides a mock function with given fields:
 func (_m *MockExpectation) GetExpiration() time.Time {
 	ret := _m.Called()
diff --git a/plc4go/spi/utils/Errors.go b/plc4go/spi/utils/Errors.go
index 339fb4b3c8..697dffaeee 100644
--- a/plc4go/spi/utils/Errors.go
+++ b/plc4go/spi/utils/Errors.go
@@ -91,7 +91,7 @@ func NewTimeoutError(timeout time.Duration) TimeoutError {
 }
 
 func (t TimeoutError) Error() string {
-	return fmt.Sprintf("got timeout after %v", t.timeout)
+	return fmt.Sprintf("got timeout after %s", t.timeout)
 }
 
 func (t TimeoutError) Is(target error) bool {


[plc4x] 05/07: feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 8e0f2193356967a11df6e3e0b160b458aa8152b9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:05:29 2023 +0200

    feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained
    
    + set buffer to 100 to allow for a bit of latency
---
 plc4go/spi/default/DefaultCodec.go | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 91c48d0ce9..78c3742f50 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -104,7 +104,7 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
 	return &defaultCodec{
 		DefaultCodecRequirements:      defaultCodecRequirements,
 		transportInstance:             transportInstance,
-		defaultIncomingMessageChannel: make(chan spi.Message),
+		defaultIncomingMessageChannel: make(chan spi.Message, 100),
 		expectations:                  []spi.Expectation{},
 		customMessageHandling:         customMessageHandler,
 		log:                           logger,
@@ -329,10 +329,14 @@ mainLoop:
 			time.Sleep(time.Millisecond * 10)
 			continue mainLoop
 		}
+		workerLog.Trace().Msgf("got message:\n%s", message)
 
 		if m.customMessageHandling != nil {
 			workerLog.Trace().Msg("Executing custom handling")
-			if m.customMessageHandling(codec, message) {
+			start := time.Now()
+			handled := m.customMessageHandling(codec, message)
+			workerLog.Trace().Msgf("custom handling took %s", time.Since(start))
+			if handled {
 				workerLog.Trace().Msg("Custom handling handled the message")
 				continue mainLoop
 			}
@@ -352,12 +356,9 @@ mainLoop:
 }
 
 func (m *defaultCodec) passToDefaultIncomingMessageChannel(workerLog zerolog.Logger, message spi.Message) {
-	timeout := time.NewTimer(time.Millisecond * 40)
-	defer utils.CleanupTimer(timeout)
 	select {
 	case m.defaultIncomingMessageChannel <- message:
-	case <-timeout.C:
-		timeout.Stop()
+	default:
 		workerLog.Warn().Msgf("Message discarded\n%s", message)
 	}
 }


[plc4x] 01/07: test(plc4go/spi): only set timeformat to nano when flag is set

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 40cd416bca8f9436d348cd8805db0dbcf424ec67
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 15:21:11 2023 +0200

    test(plc4go/spi): only set timeformat to nano when flag is set
---
 plc4go/spi/testutils/TestUtils.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go
index 3829067a1c..e3937a7ff3 100644
--- a/plc4go/spi/testutils/TestUtils.go
+++ b/plc4go/spi/testutils/TestUtils.go
@@ -140,7 +140,9 @@ func ProduceTestingLogger(t *testing.T) zerolog.Logger {
 
 			},
 			func(w *zerolog.ConsoleWriter) {
-				w.TimeFormat = time.StampNano
+				if highLogPrecision {
+					w.TimeFormat = time.StampNano
+				}
 			},
 		),
 	)


[plc4x] 02/07: test(plc4go/spi): ensure no global logger is set

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit ade41801b72fc437fe3925f39102b30ae4162748
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 16:08:40 2023 +0200

    test(plc4go/spi): ensure no global logger is set
---
 plc4go/spi/pool/executor_test.go                          | 6 ++----
 plc4go/spi/transactions/RequestTransactionManager_test.go | 5 +----
 plc4go/spi/transports/utils/TransportLogger_test.go       | 3 +--
 3 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 81dd7485f1..ee126bb989 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -74,7 +74,6 @@ func Test_executor_IsRunning(t *testing.T) {
 		queueDepth   int
 		workItems    chan workItem
 		traceWorkers bool
-		log          zerolog.Logger
 	}
 	tests := []struct {
 		name   string
@@ -94,7 +93,7 @@ func Test_executor_IsRunning(t *testing.T) {
 				queueDepth:   tt.fields.queueDepth,
 				workItems:    tt.fields.workItems,
 				traceWorkers: tt.fields.traceWorkers,
-				log:          tt.fields.log,
+				log:          produceTestingLogger(t),
 			}
 			assert.Equalf(t, tt.want, e.IsRunning(), "IsRunning()")
 		})
@@ -419,7 +418,6 @@ func Test_executor_String(t *testing.T) {
 		queueDepth   int
 		workItems    chan workItem
 		traceWorkers bool
-		log          zerolog.Logger
 	}
 	tests := []struct {
 		name   string
@@ -472,7 +470,7 @@ func Test_executor_String(t *testing.T) {
 				queueDepth:   tt.fields.queueDepth,
 				workItems:    tt.fields.workItems,
 				traceWorkers: tt.fields.traceWorkers,
-				log:          tt.fields.log,
+				log:          produceTestingLogger(t),
 			}
 			assert.Equalf(t, tt.want, e.String(), "String()")
 		})
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 3ebe3d2367..4113411419 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -260,7 +260,6 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 		currentTransactionId       int32
 		workLog                    list.List
 		executor                   pool.Executor
-		log                        zerolog.Logger
 	}
 	type args struct {
 		transaction *requestTransaction
@@ -279,8 +278,6 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 				transaction: &requestTransaction{},
 			},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
-				fields.log = testutils.ProduceTestingLogger(t)
-
 				completionFuture := NewMockCompletionFuture(t)
 				expect := completionFuture.EXPECT()
 				expect.Cancel(true, nil).Return()
@@ -300,7 +297,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 				currentTransactionId:       tt.fields.currentTransactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
-				log:                        tt.fields.log,
+				log:                        testutils.ProduceTestingLogger(t),
 			}
 			if err := r.failRequest(tt.args.transaction, tt.args.err); (err != nil) != tt.wantErr {
 				t.Errorf("failRequest() error = %v, wantErr %v", err, tt.wantErr)
diff --git a/plc4go/spi/transports/utils/TransportLogger_test.go b/plc4go/spi/transports/utils/TransportLogger_test.go
index a88f1aa988..fd341dd1ab 100644
--- a/plc4go/spi/transports/utils/TransportLogger_test.go
+++ b/plc4go/spi/transports/utils/TransportLogger_test.go
@@ -80,7 +80,6 @@ func TestTransportLogger_Close(t1 *testing.T) {
 func TestTransportLogger_Read(t1 *testing.T) {
 	type fields struct {
 		source io.ReadWriteCloser
-		log    zerolog.Logger
 	}
 	type args struct {
 		p []byte
@@ -98,7 +97,7 @@ func TestTransportLogger_Read(t1 *testing.T) {
 		t1.Run(tt.name, func(t1 *testing.T) {
 			t := &TransportLogger{
 				source: tt.fields.source,
-				log:    tt.fields.log,
+				log:    testutils.ProduceTestingLogger(t1),
 			}
 			got, err := t.Read(tt.args.p)
 			if (err != nil) != tt.wantErr {


[plc4x] 06/07: feat(plc4go/cbus): improved logging

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 04662cf6d5dc35daa389460cb51e76b32eea6aa7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:11:12 2023 +0200

    feat(plc4go/cbus): improved logging
    
    + reduced overhead logging
    + added timing information
    + add more traces
---
 plc4go/internal/cbus/Browser.go      |  35 ++++----
 plc4go/internal/cbus/Browser_test.go |  73 +++++++----------
 plc4go/internal/cbus/Connection.go   |   6 +-
 plc4go/internal/cbus/MessageCodec.go |  18 +++--
 plc4go/internal/cbus/Reader.go       | 152 +++++++++++++++++++----------------
 5 files changed, 143 insertions(+), 141 deletions(-)

diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index e9b84afcd1..ee202dec64 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -78,6 +78,8 @@ func (m *Browser) browseUnitInfo(ctx context.Context, interceptor func(result ap
 
 	if allUnits {
 		m.log.Info().Msg("Querying all (available) units")
+	} else {
+		m.log.Debug().Msgf("Querying units\n%s", units)
 	}
 unitLoop:
 	for _, unit := range units {
@@ -110,10 +112,10 @@ unitLoop:
 			readRequest, _ := m.connection.ReadRequestBuilder().
 				AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
 				Build()
-			timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
+			timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 5*time.Second)
 			m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
 			requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
-			m.log.Trace().Msg("got a response")
+			m.log.Trace().Msgf("got a response\n%s", requestResult)
 			timeoutCancel()
 			if err := requestResult.GetErr(); err != nil {
 				if allUnits || allAttributes {
@@ -152,22 +154,22 @@ unitLoop:
 func (m *Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
 	if unitAddress := query.unitAddress; unitAddress != nil {
 		return []readWriteModel.UnitAddress{unitAddress}, false, nil
-	} else {
-		// TODO: check if we still want the option to brute force all addresses
-		installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
-		if err != nil {
-			return nil, false, errors.New("Unable to get installed uints")
-		}
+	}
 
-		var units []readWriteModel.UnitAddress
-		for i := 0; i <= 0xFF; i++ {
-			unitAddressByte := byte(i)
-			if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
-				units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
-			}
+	// TODO: check if we still want the option to brute force all addresses
+	installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
+	if err != nil {
+		return nil, false, errors.New("Unable to get installed uints")
+	}
+
+	var units []readWriteModel.UnitAddress
+	for i := 0; i <= 0xFF; i++ {
+		unitAddressByte := byte(i)
+		if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
+			units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
 		}
-		return units, true, nil
 	}
+	return units, true, nil
 }
 
 func (m *Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
@@ -292,7 +294,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
 		AddTagAddress("installationMMI", "status/binary/0xFF").
 		Build()
 	if err != nil {
-		return nil, errors.Wrap(err, "Error getting the installation MMI")
+		return nil, errors.Wrap(err, "Error building the installation MMI")
 	}
 	readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
 	defer readCtxCancel()
@@ -306,6 +308,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
 			}
 		}()
 		defer readCtxCancel()
+		m.log.Debug().Msgf("sending read request\n%s", readRequest)
 		readRequestResult := <-readRequest.ExecuteWithContext(readCtx)
 		if err := readRequestResult.GetErr(); err != nil {
 			m.log.Warn().Err(err).Msg("Error reading the mmi")
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index a4fdfb4324..d9438a7659 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -21,8 +21,8 @@ package cbus
 
 import (
 	"context"
+	"encoding/hex"
 	"fmt"
-	"github.com/rs/zerolog"
 	"net/url"
 	"sync"
 	"sync/atomic"
@@ -103,13 +103,13 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 					INTERFACE_OPTIONS_3
 					INTERFACE_OPTIONS_1_PUN
 					INTERFACE_OPTIONS_1
-					MANUFACTURER
 					DONE
 				)
 				currentState := atomic.Value{}
 				currentState.Store(RESET)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -141,24 +141,17 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 						t.Log("Dispatching interface 1 echo and confirm???")
 						transportInstance.FillReadBuffer([]byte("@A3300079\r"))
 						transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
-						currentState.Store(MANUFACTURER)
-					case MANUFACTURER:
-						t.Log("Dispatching manufacturer")
-						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
-						dispatchWg := sync.WaitGroup{}
-						dispatchWg.Add(1)
-						t.Cleanup(dispatchWg.Wait)
-						go func() {
-							defer dispatchWg.Done()
-							time.Sleep(200 * time.Millisecond)
-							t.Log("Dispatching 3 MMI segments")
-							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
-						}()
+
+						t.Log("Dispatching 3 MMI segments")
+						transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+
+						t.Log("Dispatching manufacturer")
+						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
 					}
 				})
 				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
@@ -222,7 +215,6 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 		DefaultBrowser  _default.DefaultBrowser
 		connection      plc4go.PlcConnection
 		sequenceCounter uint8
-		log             zerolog.Logger
 	}
 	type args struct {
 		ctx         context.Context
@@ -267,13 +259,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 					INTERFACE_OPTIONS_3
 					INTERFACE_OPTIONS_1_PUN
 					INTERFACE_OPTIONS_1
-					MANUFACTURER
 					DONE
 				)
 				currentState := atomic.Value{}
 				currentState.Store(RESET)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -305,24 +297,17 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 						t.Log("Dispatching interface 1 echo and confirm???")
 						transportInstance.FillReadBuffer([]byte("@A3300079\r"))
 						transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
-						currentState.Store(MANUFACTURER)
-					case MANUFACTURER:
-						t.Log("Dispatching manufacturer")
-						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
-						dispatchWg := sync.WaitGroup{}
-						dispatchWg.Add(1)
-						t.Cleanup(dispatchWg.Wait)
-						go func() {
-							defer dispatchWg.Done()
-							time.Sleep(200 * time.Millisecond)
-							t.Log("Dispatching 3 MMI segments")
-							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
-						}()
+
+						t.Log("Dispatching 3 MMI segments")
+						transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+
+						t.Log("Dispatching manufacturer")
+						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
 					}
 				})
 				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
@@ -363,12 +348,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			if tt.setup != nil {
 				tt.setup(t, &tt.fields)
+				t.Log("Setup done")
 			}
 			m := &Browser{
 				DefaultBrowser:  tt.fields.DefaultBrowser,
 				connection:      tt.fields.connection,
 				sequenceCounter: tt.fields.sequenceCounter,
-				log:             tt.fields.log,
+				log:             testutils.ProduceTestingLogger(t),
 			}
 			gotResponseCode, gotQueryResults := m.browseUnitInfo(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
 			assert.Equalf(t, tt.wantResponseCode, gotResponseCode, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
@@ -552,6 +538,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
 				currentState.Store(RESET)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -586,17 +573,11 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
-						dispatchWg := sync.WaitGroup{}
-						dispatchWg.Add(1)
-						t.Cleanup(dispatchWg.Wait)
-						go func() {
-							defer dispatchWg.Done()
-							time.Sleep(200 * time.Millisecond)
-							t.Log("Dispatching 3 MMI segments")
-							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
-						}()
+
+						t.Log("Dispatching 3 MMI segments")
+						transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
 					}
 				})
 				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 5b4f767815..b23c76b986 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -285,16 +285,16 @@ func (c *Connection) startSubscriptionHandler() {
 		mmiLogger.Debug().Msg("default MMI started")
 		for c.IsConnected() {
 			for calReply := range c.messageCodec.monitoredMMIs {
-				mmiLogger.Trace().Msgf("got a MMI:\n%s", calReply)
+				mmiLogger.Trace().Msgf("got a MMI")
 				handled := false
 				for _, subscriber := range c.subscribers {
 					if ok := subscriber.handleMonitoredMMI(calReply); ok {
-						mmiLogger.Debug().Msgf("\n%v handled\n%s", subscriber, calReply)
+						mmiLogger.Debug().Msgf("\n%v handled", subscriber)
 						handled = true
 					}
 				}
 				if !handled {
-					mmiLogger.Debug().Msgf("MMI was not handled:\n%s", calReply)
+					mmiLogger.Debug().Msgf("MMI was not handled")
 				}
 			}
 		}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 831c13ef86..e2856f57c2 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -22,14 +22,14 @@ package cbus
 import (
 	"bufio"
 	"context"
-	"sync"
-	"sync/atomic"
-
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
+	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
@@ -101,7 +101,7 @@ func (m *MessageCodec) Disconnect() error {
 }
 
 func (m *MessageCodec) Send(message spi.Message) error {
-	m.log.Trace().Msg("Sending message")
+	m.log.Trace().Msgf("Sending message\n%s", message)
 	// Cast the message to the correct type of struct
 	cbusMessage, ok := message.(readWriteModel.CBusMessage)
 	if !ok {
@@ -329,25 +329,29 @@ lookingForTheEnd:
 	}
 	m.log.Debug().Msgf("Parsing %q", sanitizedInput)
 	ctxForModel := options.GetLoggerContextForModel(context.TODO(), m.log, options.WithPassLoggerToModel(m.passLogToModel))
+	start := time.Now()
 	cBusMessage, err := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, pciResponse, m.requestContext, m.cbusOptions)
+	m.log.Trace().Msgf("Parsing took %s", time.Since(start))
 	if err != nil {
 		m.log.Debug().Err(err).Msg("First Parse Failed")
 		{ // Try SAL
+			m.log.Trace().Msg("try SAL")
 			requestContext := readWriteModel.NewRequestContext(false)
 			cBusMessage, secondErr := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, pciResponse, requestContext, m.cbusOptions)
 			if secondErr == nil {
-				m.log.Trace().Msgf("Parsed message as SAL:\n%s", cBusMessage)
+				m.log.Trace().Msgf("Parsed message as SAL")
 				return cBusMessage, nil
 			} else {
 				m.log.Debug().Err(secondErr).Msg("SAL parse failed too")
 			}
 		}
 		{ // Try MMI
+			m.log.Trace().Msg("try MMI")
 			requestContext := readWriteModel.NewRequestContext(false)
 			cbusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
 			cBusMessage, secondErr := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, true, requestContext, cbusOptions)
 			if secondErr == nil {
-				m.log.Trace().Msgf("Parsed message as MMI:\n%s", cBusMessage)
+				m.log.Trace().Msg("Parsed message as MMI")
 				return cBusMessage, nil
 			} else {
 				m.log.Debug().Err(secondErr).Msg("CAL parse failed too")
@@ -358,13 +362,11 @@ lookingForTheEnd:
 		return nil, nil
 	}
 
-	m.log.Trace().Msgf("Parsed message:\n%s", cBusMessage)
 	return cBusMessage, nil
 }
 
 func extractMMIAndSAL(log zerolog.Logger) _default.CustomMessageHandler {
 	return func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
-		log.Trace().Msgf("Custom handling message:\n%s", message)
 		switch message := message.(type) {
 		case readWriteModel.CBusMessageToClientExactly:
 			switch reply := message.GetReply().(type) {
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 6949369e86..3a7063cb8f 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -127,87 +127,103 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
 	// Start a new request-transaction (Is ended in the response-handler)
 	transaction := m.tm.StartTransaction()
 	transaction.Submit(func(transaction transactions.RequestTransaction) {
+		m.log.Trace().Msgf("Transaction getting handled:\n%s", transaction)
 		m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
 	})
 	if err := transaction.AwaitCompletion(ctx); err != nil {
 		m.log.Warn().Err(err).Msg("Error while awaiting completion")
 	}
-	m.log.Trace().Msg("Finished waiting")
+	m.log.Trace().Msg("Finished waiting for transaction to end")
 }
 
 func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
-	// Send the  over the wire
-	m.log.Trace().Msg("Send ")
-	if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool {
-		messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
-		if !ok {
-			return false
-		}
-		// Check if this errored
-		if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-			// This means we must handle this below
-			return true
-		}
+	// Send the message over the wire
+	m.log.Trace().Msg("send over the wire")
+	if deadline, ok := ctx.Deadline(); ok {
+		m.log.Debug().Msgf("Message expires in %s", deadline.Sub(time.Now()))
+	}
+	if err := m.messageCodec.SendRequest(
+		ctx,
+		messageToSend,
+		func(cbusMessage spi.Message) bool {
+			m.log.Trace().Msgf("Checking\n%T", cbusMessage)
+			messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+			if !ok {
+				m.log.Trace().Msg("Not a message to client")
+				return false
+			}
+			// Check if this errored
+			if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+				// This means we must handle this below
+				m.log.Trace().Msg("It is a error, we will handle it")
+				return true
+			}
 
-		confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-		if !ok {
-			return false
-		}
-		actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
-		// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
-		expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
-		return actualAlpha == expectedAlpha
-	}, func(receivedMessage spi.Message) error {
-		// Convert the response into an
-		m.log.Trace().Msg("convert response to ")
-		messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
-		if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-			m.log.Trace().Msg("We got a server failure")
-			addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
-			return transaction.EndRequest()
-		}
-		replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-		if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
-			var responseCode apiModel.PlcResponseCode
-			switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
-			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
-				responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
-			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
-				responseCode = apiModel.PlcResponseCode_INVALID_DATA
-			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
-				responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
-			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
-				responseCode = apiModel.PlcResponseCode_INVALID_DATA
-			default:
-				return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
+			confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+			if !ok {
+				m.log.Trace().Msg("it is not a confirmation")
+				return false
+			}
+			actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
+			// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
+			expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+			m.log.Trace().Msgf("Comparing expected alpha '%c' to actual alpha '%c'", expectedAlpha, actualAlpha)
+			return actualAlpha == expectedAlpha
+		},
+		func(receivedMessage spi.Message) error {
+			// Convert the received message into a response code or PLC value
+			m.log.Trace().Msgf("convert message: %T", receivedMessage)
+			messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
+			if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+				m.log.Trace().Msg("We got a server failure")
+				addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
+				return transaction.EndRequest()
+			}
+			replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+			if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+				var responseCode apiModel.PlcResponseCode
+				switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+					responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+					responseCode = apiModel.PlcResponseCode_INVALID_DATA
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+					responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+					responseCode = apiModel.PlcResponseCode_INVALID_DATA
+				default:
+					return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
+				}
+				m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+				addResponseCode(tagName, responseCode)
+				return transaction.EndRequest()
 			}
-			m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
-			addResponseCode(tagName, responseCode)
-			return transaction.EndRequest()
-		}
 
-		alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
-		// TODO: it could be double confirmed but this is not implemented yet
-		embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-		if !ok {
-			m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
-			addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
-			return transaction.EndRequest()
-		}
+			alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+			// TODO: it could be double confirmed but this is not implemented yet
+			embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+			if !ok {
+				m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+				addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
+				return transaction.EndRequest()
+			}
 
-		m.log.Trace().Msg("Handling confirmed data")
-		// TODO: check if we can use a plcValueSerializer
-		encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
-		if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
-			log.Error().Err(err).Msg("error encoding reply")
-			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+			m.log.Trace().Msg("Handling confirmed data")
+			// TODO: check if we can use a plcValueSerializer
+			encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
+			if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+				log.Error().Err(err).Msg("error encoding reply")
+				addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+				return transaction.EndRequest()
+			}
 			return transaction.EndRequest()
-		}
-		return transaction.EndRequest()
-	}, func(err error) error {
-		addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
-		return transaction.FailRequest(err)
-	}, time.Second*1); err != nil {
+		},
+		func(err error) error {
+			m.log.Trace().Err(err).Msg("got and error")
+			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+			return transaction.FailRequest(err)
+		},
+		time.Second*5); err != nil {
 		m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
 		addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
 		if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {


[plc4x] 04/07: feat(plc4go/spi): improve logging for request transaction

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 65796b03e9dbba26ab533ecafb3703e727f15d04
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:04:20 2023 +0200

    feat(plc4go/spi): improve logging for request transaction
---
 plc4go/spi/transactions/RequestTransaction.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/plc4go/spi/transactions/RequestTransaction.go b/plc4go/spi/transactions/RequestTransaction.go
index 027df85cc1..18f88c7293 100644
--- a/plc4go/spi/transactions/RequestTransaction.go
+++ b/plc4go/spi/transactions/RequestTransaction.go
@@ -105,16 +105,17 @@ func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
 	if t.operation != nil {
 		t.transactionLog.Warn().Msg("Operation already set")
 	}
-	t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
+	t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Submission")
 	t.operation = func() {
-		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
+		t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Start operation")
 		operation(t)
-		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+		t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Completed operation")
 	}
 	t.parent.submitTransaction(t)
 }
 
 func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
+	t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Awaiting completion")
 	timeout, cancelFunc := context.WithTimeout(ctx, time.Minute*30) // This is intentionally set very high
 	defer cancelFunc()
 	for t.completionFuture == nil {
@@ -125,6 +126,7 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
 		}
 	}
 	if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
+		t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Errored")
 		return err
 	}
 	stillActive := true
@@ -139,6 +141,7 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
 		}
 		t.parent.runningRequestMutex.RUnlock()
 	}
+	t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Completed")
 	return nil
 }