You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 16:46:17 UTC
[plc4x] branch develop updated (72432316d4 -> bf275e23d1)
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
from 72432316d4 test(plc4go/cbus): fix failing new driver context test
new 40cd416bca test(plc4go/spi): only set timeformat to nano when flag is set
new ade41801b7 test(plc4go/spi): ensure no global logger is set
new 9db3034ea5 fix(plc4go/spi): fix timeout output using the wrong duration
new 65796b03e9 feat(plc4go/spi): improve logging for request transaction
new 8e0f219335 feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained
new 04662cf6d5 feat(plc4go/cbus): improved logging
new bf275e23d1 fix(plc4go/cbus): fix error when reader doesn't get a alpha capable response
The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
plc4go/internal/cbus/Browser.go | 35 ++---
plc4go/internal/cbus/Browser_test.go | 73 ++++------
plc4go/internal/cbus/Connection.go | 6 +-
plc4go/internal/cbus/MessageCodec.go | 18 +--
plc4go/internal/cbus/Reader.go | 159 ++++++++++++---------
plc4go/internal/cbus/Reader_test.go | 67 ++-------
plc4go/spi/MessageCodec.go | 1 +
plc4go/spi/default/DefaultCodec.go | 51 ++-----
plc4go/spi/default/DefaultCodec_test.go | 82 +++++------
plc4go/spi/default/defaultExpectation.go | 65 +++++++++
plc4go/spi/default/mock_Expectation_test.go | 41 ++++++
plc4go/spi/mock_Expectation_test.go | 41 ++++++
plc4go/spi/pool/executor_test.go | 6 +-
plc4go/spi/testutils/TestUtils.go | 4 +-
plc4go/spi/transactions/RequestTransaction.go | 9 +-
.../transactions/RequestTransactionManager_test.go | 5 +-
.../spi/transports/utils/TransportLogger_test.go | 3 +-
plc4go/spi/utils/Errors.go | 2 +-
18 files changed, 377 insertions(+), 291 deletions(-)
create mode 100644 plc4go/spi/default/defaultExpectation.go
[plc4x] 07/07: fix(plc4go/cbus): fix error when reader doesn't get a alpha capable response
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit bf275e23d1f043f70ff8328d320eb40d1ebd0180
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:43:53 2023 +0200
fix(plc4go/cbus): fix error when reader doesn't get a alpha capable response
---
plc4go/internal/cbus/Reader.go | 15 ++++++---
plc4go/internal/cbus/Reader_test.go | 67 ++++++++-----------------------------
2 files changed, 25 insertions(+), 57 deletions(-)
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 3a7063cb8f..55015da183 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -139,14 +139,16 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Send the over the wire
m.log.Trace().Msg("send over the wire")
+ ttl := time.Second * 5
if deadline, ok := ctx.Deadline(); ok {
- m.log.Debug().Msgf("Message expires in %s", deadline.Sub(time.Now()))
+ ttl = -time.Since(deadline)
+ m.log.Debug().Msgf("setting ttl to %s", ttl)
}
if err := m.messageCodec.SendRequest(
ctx,
messageToSend,
func(cbusMessage spi.Message) bool {
- m.log.Trace().Msgf("Checking\n%T", cbusMessage)
+ m.log.Trace().Msgf("Checking %T", cbusMessage)
messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
if !ok {
m.log.Trace().Msg("Not a message to client")
@@ -166,7 +168,12 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
}
actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
- expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+ alphaRetriever, ok := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha })
+ if !ok {
+ m.log.Trace().Msg("no alpha there")
+ return false
+ }
+ expectedAlpha := alphaRetriever.GetAlpha().GetCharacter()
m.log.Trace().Msgf("Comparing expected alpha '%c' to actual alpha '%c'", expectedAlpha, actualAlpha)
return actualAlpha == expectedAlpha
},
@@ -223,7 +230,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
return transaction.FailRequest(err)
},
- time.Second*5); err != nil {
+ ttl); err != nil {
m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index cc017ac3a0..e66c1d7534 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -234,11 +234,7 @@ func TestReader_readSync(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
readRequest: spiModel.NewDefaultPlcReadRequest(
map[string]apiModel.PlcTag{
"blub": NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Type, 1),
@@ -397,11 +393,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: nil,
addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
@@ -440,16 +432,12 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
},
},
{
- name: "Send message which responds with message to client",
+ name: "Send message which responds with message to server",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestReset(
readWriteModel.RequestType_RESET,
@@ -470,7 +458,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
- assert.Equal(t, apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
+ assert.Equal(t, apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
}
},
tagName: "horst",
@@ -527,11 +515,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestReset(
readWriteModel.RequestType_RESET,
@@ -610,11 +594,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
@@ -695,11 +675,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
@@ -780,11 +756,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
@@ -865,11 +837,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
@@ -950,11 +918,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
@@ -1035,11 +999,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
- ctx: func() context.Context {
- timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- t.Cleanup(cancel)
- return timeout
- }(),
+ ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
@@ -1125,10 +1085,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: tt.fields.alphaGenerator,
messageCodec: tt.fields.messageCodec,
tm: tt.fields.tm,
+ log: testutils.ProduceTestingLogger(t),
}
m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t), tt.args.tagName, tt.args.addPlcValue(t))
t.Log("Waiting now")
- timer := time.NewTimer(3 * time.Second)
+ timer := time.NewTimer(10 * time.Second)
defer utils.CleanupTimer(timer)
select {
case <-ch:
[plc4x] 03/07: fix(plc4go/spi): fix timeout output using the wrong duration
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 9db3034ea5d4d1e27819accac73aab4b48afcedf
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 16:51:57 2023 +0200
fix(plc4go/spi): fix timeout output using the wrong duration
---
plc4go/spi/MessageCodec.go | 1 +
plc4go/spi/default/DefaultCodec.go | 38 ++-----------
plc4go/spi/default/DefaultCodec_test.go | 82 ++++++++++++++---------------
plc4go/spi/default/defaultExpectation.go | 65 +++++++++++++++++++++++
plc4go/spi/default/mock_Expectation_test.go | 41 +++++++++++++++
plc4go/spi/mock_Expectation_test.go | 41 +++++++++++++++
plc4go/spi/utils/Errors.go | 2 +-
7 files changed, 193 insertions(+), 77 deletions(-)
diff --git a/plc4go/spi/MessageCodec.go b/plc4go/spi/MessageCodec.go
index 94826da578..4127c4c688 100644
--- a/plc4go/spi/MessageCodec.go
+++ b/plc4go/spi/MessageCodec.go
@@ -28,6 +28,7 @@ import (
type Expectation interface {
fmt.Stringer
GetContext() context.Context
+ GetCreationTime() time.Time
GetExpiration() time.Time
GetAcceptsMessage() AcceptsMessage
GetHandleMessage() HandleMessage
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index e365c5b095..91c48d0ce9 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -21,7 +21,6 @@ package _default
import (
"context"
- "fmt"
"runtime/debug"
"sync"
"sync/atomic"
@@ -56,14 +55,6 @@ func NewDefaultCodec(requirements DefaultCodecRequirements, transportInstance tr
return buildDefaultCodec(requirements, transportInstance, options...)
}
-type DefaultExpectation struct {
- Context context.Context
- Expiration time.Time
- AcceptsMessage spi.AcceptsMessage
- HandleMessage spi.HandleMessage
- HandleError spi.HandleError
-}
-
type CustomMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
func WithCustomMessageHandler(customMessageHandler CustomMessageHandler) options.WithOption {
@@ -126,30 +117,6 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
///////////////////////////////////////
///////////////////////////////////////
-func (d *DefaultExpectation) GetContext() context.Context {
- return d.Context
-}
-
-func (d *DefaultExpectation) GetExpiration() time.Time {
- return d.Expiration
-}
-
-func (d *DefaultExpectation) GetAcceptsMessage() spi.AcceptsMessage {
- return d.AcceptsMessage
-}
-
-func (d *DefaultExpectation) GetHandleMessage() spi.HandleMessage {
- return d.HandleMessage
-}
-
-func (d *DefaultExpectation) GetHandleError() spi.HandleError {
- return d.HandleError
-}
-
-func (d *DefaultExpectation) String() string {
- return fmt.Sprintf("Expectation(expires at %v)", d.Expiration)
-}
-
func (m *defaultCodec) GetTransportInstance() transports.TransportInstance {
return m.transportInstance
}
@@ -211,8 +178,9 @@ func (m *defaultCodec) IsRunning() bool {
func (m *defaultCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
m.expectationsChangeMutex.Lock()
defer m.expectationsChangeMutex.Unlock()
- expectation := &DefaultExpectation{
+ expectation := &defaultExpectation{
Context: ctx,
+ CreationTime: time.Now(),
Expiration: time.Now().Add(ttl),
AcceptsMessage: acceptsMessage,
HandleMessage: handleMessage,
@@ -247,7 +215,7 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) {
i--
// Call the error handler.
go func(expectation spi.Expectation) {
- if err := expectation.GetHandleError()(utils.NewTimeoutError(now.Sub(expectation.GetExpiration()))); err != nil {
+ if err := expectation.GetHandleError()(utils.NewTimeoutError(expectation.GetExpiration().Sub(expectation.GetCreationTime()))); err != nil {
m.log.Error().Err(err).Msg("Got an error handling error on expectation")
}
}(expectation)
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index c6829c64a5..19bde057ac 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -59,7 +59,7 @@ func TestDefaultExpectation_GetAcceptsMessage(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- m := &DefaultExpectation{
+ m := &defaultExpectation{
Context: tt.fields.Context,
Expiration: tt.fields.Expiration,
AcceptsMessage: tt.fields.AcceptsMessage,
@@ -90,7 +90,7 @@ func TestDefaultExpectation_GetContext(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- m := &DefaultExpectation{
+ m := &defaultExpectation{
Context: tt.fields.Context,
Expiration: tt.fields.Expiration,
AcceptsMessage: tt.fields.AcceptsMessage,
@@ -121,7 +121,7 @@ func TestDefaultExpectation_GetExpiration(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- m := &DefaultExpectation{
+ m := &defaultExpectation{
Context: tt.fields.Context,
Expiration: tt.fields.Expiration,
AcceptsMessage: tt.fields.AcceptsMessage,
@@ -156,7 +156,7 @@ func TestDefaultExpectation_GetHandleError(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- m := &DefaultExpectation{
+ m := &defaultExpectation{
Context: tt.fields.Context,
Expiration: tt.fields.Expiration,
AcceptsMessage: tt.fields.AcceptsMessage,
@@ -191,7 +191,7 @@ func TestDefaultExpectation_GetHandleMessage(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- m := &DefaultExpectation{
+ m := &defaultExpectation{
Context: tt.fields.Context,
Expiration: tt.fields.Expiration,
AcceptsMessage: tt.fields.AcceptsMessage,
@@ -223,7 +223,7 @@ func TestDefaultExpectation_String(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- m := &DefaultExpectation{
+ m := &defaultExpectation{
Context: tt.fields.Context,
Expiration: tt.fields.Expiration,
AcceptsMessage: tt.fields.AcceptsMessage,
@@ -617,12 +617,12 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
name: "handle some",
fields: fields{
expectations: []spi.Expectation{
- &DefaultExpectation{ // doesn't accept
+ &defaultExpectation{ // doesn't accept
AcceptsMessage: func(_ spi.Message) bool {
return false
},
},
- &DefaultExpectation{ // accepts but fails
+ &defaultExpectation{ // accepts but fails
AcceptsMessage: func(_ spi.Message) bool {
return true
},
@@ -633,7 +633,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
return nil
},
},
- &DefaultExpectation{ // accepts but fails and fails to handle the error
+ &defaultExpectation{ // accepts but fails and fails to handle the error
AcceptsMessage: func(_ spi.Message) bool {
return true
},
@@ -644,7 +644,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
return errors.New("I failed completely")
},
},
- &DefaultExpectation{ // accepts
+ &defaultExpectation{ // accepts
AcceptsMessage: func(_ spi.Message) bool {
return true
},
@@ -652,7 +652,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
return nil
},
},
- &DefaultExpectation{ // accepts
+ &defaultExpectation{ // accepts
AcceptsMessage: func(_ spi.Message) bool {
return true
},
@@ -660,7 +660,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
return nil
},
},
- &DefaultExpectation{ // accepts
+ &defaultExpectation{ // accepts
AcceptsMessage: func(_ spi.Message) bool {
return true
},
@@ -823,26 +823,26 @@ func Test_defaultCodec_TimeoutExpectations(t *testing.T) {
name: "timeout some",
fields: fields{
expectations: []spi.Expectation{
- &DefaultExpectation{ // Expired
+ &defaultExpectation{ // Expired
Context: context.Background(),
HandleError: func(err error) error {
return nil
},
},
- &DefaultExpectation{ // Expired errors
+ &defaultExpectation{ // Expired errors
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
},
- &DefaultExpectation{ // Fine
+ &defaultExpectation{ // Fine
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
Expiration: time.Time{}.Add(3 * time.Hour),
},
- &DefaultExpectation{ // Context error
+ &defaultExpectation{ // Context error
Context: func() context.Context {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc() // Cancel it instantly
@@ -902,26 +902,26 @@ func Test_defaultCodec_Work(t *testing.T) {
name: "work hard (panics everywhere)",
fields: fields{
expectations: []spi.Expectation{
- &DefaultExpectation{ // Expired
+ &defaultExpectation{ // Expired
Context: context.Background(),
HandleError: func(err error) error {
return nil
},
},
- &DefaultExpectation{ // Expired errors
+ &defaultExpectation{ // Expired errors
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
},
- &DefaultExpectation{ // Fine
+ &defaultExpectation{ // Fine
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
Expiration: time.Time{}.Add(3 * time.Hour),
},
- &DefaultExpectation{ // Context error
+ &defaultExpectation{ // Context error
Context: func() context.Context {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc() // Cancel it instantly
@@ -943,26 +943,26 @@ func Test_defaultCodec_Work(t *testing.T) {
name: "work harder (nil message)",
fields: fields{
expectations: []spi.Expectation{
- &DefaultExpectation{ // Expired
+ &defaultExpectation{ // Expired
Context: context.Background(),
HandleError: func(err error) error {
return nil
},
},
- &DefaultExpectation{ // Expired errors
+ &defaultExpectation{ // Expired errors
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
},
- &DefaultExpectation{ // Fine
+ &defaultExpectation{ // Fine
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
Expiration: time.Time{}.Add(3 * time.Hour),
},
- &DefaultExpectation{ // Context error
+ &defaultExpectation{ // Context error
Context: func() context.Context {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc() // Cancel it instantly
@@ -989,26 +989,26 @@ func Test_defaultCodec_Work(t *testing.T) {
name: "work harder (message)",
fields: fields{
expectations: []spi.Expectation{
- &DefaultExpectation{ // Expired
+ &defaultExpectation{ // Expired
Context: context.Background(),
HandleError: func(err error) error {
return nil
},
},
- &DefaultExpectation{ // Expired errors
+ &defaultExpectation{ // Expired errors
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
},
- &DefaultExpectation{ // Fine
+ &defaultExpectation{ // Fine
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
Expiration: time.Time{}.Add(3 * time.Hour),
},
- &DefaultExpectation{ // Context error
+ &defaultExpectation{ // Context error
Context: func() context.Context {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc() // Cancel it instantly
@@ -1036,7 +1036,7 @@ func Test_defaultCodec_Work(t *testing.T) {
fields: fields{
defaultIncomingMessageChannel: make(chan spi.Message, 1),
expectations: []spi.Expectation{
- &DefaultExpectation{ // Fine
+ &defaultExpectation{ // Fine
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
@@ -1059,26 +1059,26 @@ func Test_defaultCodec_Work(t *testing.T) {
name: "work harder (message receive error)",
fields: fields{
expectations: []spi.Expectation{
- &DefaultExpectation{ // Expired
+ &defaultExpectation{ // Expired
Context: context.Background(),
HandleError: func(err error) error {
return nil
},
},
- &DefaultExpectation{ // Expired errors
+ &defaultExpectation{ // Expired errors
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
},
- &DefaultExpectation{ // Fine
+ &defaultExpectation{ // Fine
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
Expiration: time.Time{}.Add(3 * time.Hour),
},
- &DefaultExpectation{ // Context error
+ &defaultExpectation{ // Context error
Context: func() context.Context {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc() // Cancel it instantly
@@ -1108,26 +1108,26 @@ func Test_defaultCodec_Work(t *testing.T) {
return false
},
expectations: []spi.Expectation{
- &DefaultExpectation{ // Expired
+ &defaultExpectation{ // Expired
Context: context.Background(),
HandleError: func(err error) error {
return nil
},
},
- &DefaultExpectation{ // Expired errors
+ &defaultExpectation{ // Expired errors
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
},
- &DefaultExpectation{ // Fine
+ &defaultExpectation{ // Fine
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
Expiration: time.Time{}.Add(3 * time.Hour),
},
- &DefaultExpectation{ // Context error
+ &defaultExpectation{ // Context error
Context: func() context.Context {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc() // Cancel it instantly
@@ -1157,26 +1157,26 @@ func Test_defaultCodec_Work(t *testing.T) {
return true
},
expectations: []spi.Expectation{
- &DefaultExpectation{ // Expired
+ &defaultExpectation{ // Expired
Context: context.Background(),
HandleError: func(err error) error {
return nil
},
},
- &DefaultExpectation{ // Expired errors
+ &defaultExpectation{ // Expired errors
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
},
- &DefaultExpectation{ // Fine
+ &defaultExpectation{ // Fine
Context: context.Background(),
HandleError: func(err error) error {
return errors.New("yep")
},
Expiration: time.Time{}.Add(3 * time.Hour),
},
- &DefaultExpectation{ // Context error
+ &defaultExpectation{ // Context error
Context: func() context.Context {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc() // Cancel it instantly
diff --git a/plc4go/spi/default/defaultExpectation.go b/plc4go/spi/default/defaultExpectation.go
new file mode 100644
index 0000000000..fe11e0e081
--- /dev/null
+++ b/plc4go/spi/default/defaultExpectation.go
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package _default
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/apache/plc4x/plc4go/spi"
+)
+
+type defaultExpectation struct {
+ Context context.Context
+ CreationTime time.Time
+ Expiration time.Time
+ AcceptsMessage spi.AcceptsMessage
+ HandleMessage spi.HandleMessage
+ HandleError spi.HandleError
+}
+
+func (d *defaultExpectation) GetContext() context.Context {
+ return d.Context
+}
+
+func (d *defaultExpectation) GetCreationTime() time.Time {
+ return d.CreationTime
+}
+
+func (d *defaultExpectation) GetExpiration() time.Time {
+ return d.Expiration
+}
+
+func (d *defaultExpectation) GetAcceptsMessage() spi.AcceptsMessage {
+ return d.AcceptsMessage
+}
+
+func (d *defaultExpectation) GetHandleMessage() spi.HandleMessage {
+ return d.HandleMessage
+}
+
+func (d *defaultExpectation) GetHandleError() spi.HandleError {
+ return d.HandleError
+}
+
+func (d *defaultExpectation) String() string {
+ return fmt.Sprintf("Expectation(expires at %v)", d.Expiration)
+}
diff --git a/plc4go/spi/default/mock_Expectation_test.go b/plc4go/spi/default/mock_Expectation_test.go
index 0ffc04e950..638d330a84 100644
--- a/plc4go/spi/default/mock_Expectation_test.go
+++ b/plc4go/spi/default/mock_Expectation_test.go
@@ -129,6 +129,47 @@ func (_c *MockExpectation_GetContext_Call) RunAndReturn(run func() context.Conte
return _c
}
+// GetCreationTime provides a mock function with given fields:
+func (_m *MockExpectation) GetCreationTime() time.Time {
+ ret := _m.Called()
+
+ var r0 time.Time
+ if rf, ok := ret.Get(0).(func() time.Time); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(time.Time)
+ }
+
+ return r0
+}
+
+// MockExpectation_GetCreationTime_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCreationTime'
+type MockExpectation_GetCreationTime_Call struct {
+ *mock.Call
+}
+
+// GetCreationTime is a helper method to define mock.On call
+func (_e *MockExpectation_Expecter) GetCreationTime() *MockExpectation_GetCreationTime_Call {
+ return &MockExpectation_GetCreationTime_Call{Call: _e.mock.On("GetCreationTime")}
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) Run(run func()) *MockExpectation_GetCreationTime_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) Return(_a0 time.Time) *MockExpectation_GetCreationTime_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) RunAndReturn(run func() time.Time) *MockExpectation_GetCreationTime_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
// GetExpiration provides a mock function with given fields:
func (_m *MockExpectation) GetExpiration() time.Time {
ret := _m.Called()
diff --git a/plc4go/spi/mock_Expectation_test.go b/plc4go/spi/mock_Expectation_test.go
index c2f5369299..ede1ebbe16 100644
--- a/plc4go/spi/mock_Expectation_test.go
+++ b/plc4go/spi/mock_Expectation_test.go
@@ -127,6 +127,47 @@ func (_c *MockExpectation_GetContext_Call) RunAndReturn(run func() context.Conte
return _c
}
+// GetCreationTime provides a mock function with given fields:
+func (_m *MockExpectation) GetCreationTime() time.Time {
+ ret := _m.Called()
+
+ var r0 time.Time
+ if rf, ok := ret.Get(0).(func() time.Time); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(time.Time)
+ }
+
+ return r0
+}
+
+// MockExpectation_GetCreationTime_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCreationTime'
+type MockExpectation_GetCreationTime_Call struct {
+ *mock.Call
+}
+
+// GetCreationTime is a helper method to define mock.On call
+func (_e *MockExpectation_Expecter) GetCreationTime() *MockExpectation_GetCreationTime_Call {
+ return &MockExpectation_GetCreationTime_Call{Call: _e.mock.On("GetCreationTime")}
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) Run(run func()) *MockExpectation_GetCreationTime_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) Return(_a0 time.Time) *MockExpectation_GetCreationTime_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockExpectation_GetCreationTime_Call) RunAndReturn(run func() time.Time) *MockExpectation_GetCreationTime_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
// GetExpiration provides a mock function with given fields:
func (_m *MockExpectation) GetExpiration() time.Time {
ret := _m.Called()
diff --git a/plc4go/spi/utils/Errors.go b/plc4go/spi/utils/Errors.go
index 339fb4b3c8..697dffaeee 100644
--- a/plc4go/spi/utils/Errors.go
+++ b/plc4go/spi/utils/Errors.go
@@ -91,7 +91,7 @@ func NewTimeoutError(timeout time.Duration) TimeoutError {
}
func (t TimeoutError) Error() string {
- return fmt.Sprintf("got timeout after %v", t.timeout)
+ return fmt.Sprintf("got timeout after %s", t.timeout)
}
func (t TimeoutError) Is(target error) bool {
[plc4x] 05/07: feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 8e0f2193356967a11df6e3e0b160b458aa8152b9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:05:29 2023 +0200
feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained
+ set buffer to 100 to allow for a bit latency
---
plc4go/spi/default/DefaultCodec.go | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 91c48d0ce9..78c3742f50 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -104,7 +104,7 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
return &defaultCodec{
DefaultCodecRequirements: defaultCodecRequirements,
transportInstance: transportInstance,
- defaultIncomingMessageChannel: make(chan spi.Message),
+ defaultIncomingMessageChannel: make(chan spi.Message, 100),
expectations: []spi.Expectation{},
customMessageHandling: customMessageHandler,
log: logger,
@@ -329,10 +329,14 @@ mainLoop:
time.Sleep(time.Millisecond * 10)
continue mainLoop
}
+ workerLog.Trace().Msgf("got message:\n%s", message)
if m.customMessageHandling != nil {
workerLog.Trace().Msg("Executing custom handling")
- if m.customMessageHandling(codec, message) {
+ start := time.Now()
+ handled := m.customMessageHandling(codec, message)
+ workerLog.Trace().Msgf("custom handling took %s", time.Since(start))
+ if handled {
workerLog.Trace().Msg("Custom handling handled the message")
continue mainLoop
}
@@ -352,12 +356,9 @@ mainLoop:
}
func (m *defaultCodec) passToDefaultIncomingMessageChannel(workerLog zerolog.Logger, message spi.Message) {
- timeout := time.NewTimer(time.Millisecond * 40)
- defer utils.CleanupTimer(timeout)
select {
case m.defaultIncomingMessageChannel <- message:
- case <-timeout.C:
- timeout.Stop()
+ default:
workerLog.Warn().Msgf("Message discarded\n%s", message)
}
}
[plc4x] 01/07: test(plc4go/spi): only set timeformat to nano when flag is set
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 40cd416bca8f9436d348cd8805db0dbcf424ec67
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 15:21:11 2023 +0200
test(plc4go/spi): only set timeformat to nano when flag is set
---
plc4go/spi/testutils/TestUtils.go | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go
index 3829067a1c..e3937a7ff3 100644
--- a/plc4go/spi/testutils/TestUtils.go
+++ b/plc4go/spi/testutils/TestUtils.go
@@ -140,7 +140,9 @@ func ProduceTestingLogger(t *testing.T) zerolog.Logger {
},
func(w *zerolog.ConsoleWriter) {
- w.TimeFormat = time.StampNano
+ if highLogPrecision {
+ w.TimeFormat = time.StampNano
+ }
},
),
)
[plc4x] 02/07: test(plc4go/spi): ensure no global logger is set
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit ade41801b72fc437fe3925f39102b30ae4162748
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 16:08:40 2023 +0200
test(plc4go/spi): ensure no global logger is set
---
plc4go/spi/pool/executor_test.go | 6 ++----
plc4go/spi/transactions/RequestTransactionManager_test.go | 5 +----
plc4go/spi/transports/utils/TransportLogger_test.go | 3 +--
3 files changed, 4 insertions(+), 10 deletions(-)
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 81dd7485f1..ee126bb989 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -74,7 +74,6 @@ func Test_executor_IsRunning(t *testing.T) {
queueDepth int
workItems chan workItem
traceWorkers bool
- log zerolog.Logger
}
tests := []struct {
name string
@@ -94,7 +93,7 @@ func Test_executor_IsRunning(t *testing.T) {
queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
- log: tt.fields.log,
+ log: produceTestingLogger(t),
}
assert.Equalf(t, tt.want, e.IsRunning(), "IsRunning()")
})
@@ -419,7 +418,6 @@ func Test_executor_String(t *testing.T) {
queueDepth int
workItems chan workItem
traceWorkers bool
- log zerolog.Logger
}
tests := []struct {
name string
@@ -472,7 +470,7 @@ func Test_executor_String(t *testing.T) {
queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
- log: tt.fields.log,
+ log: produceTestingLogger(t),
}
assert.Equalf(t, tt.want, e.String(), "String()")
})
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 3ebe3d2367..4113411419 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -260,7 +260,6 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
currentTransactionId int32
workLog list.List
executor pool.Executor
- log zerolog.Logger
}
type args struct {
transaction *requestTransaction
@@ -279,8 +278,6 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
transaction: &requestTransaction{},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
- fields.log = testutils.ProduceTestingLogger(t)
-
completionFuture := NewMockCompletionFuture(t)
expect := completionFuture.EXPECT()
expect.Cancel(true, nil).Return()
@@ -300,7 +297,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
- log: tt.fields.log,
+ log: testutils.ProduceTestingLogger(t),
}
if err := r.failRequest(tt.args.transaction, tt.args.err); (err != nil) != tt.wantErr {
t.Errorf("failRequest() error = %v, wantErr %v", err, tt.wantErr)
diff --git a/plc4go/spi/transports/utils/TransportLogger_test.go b/plc4go/spi/transports/utils/TransportLogger_test.go
index a88f1aa988..fd341dd1ab 100644
--- a/plc4go/spi/transports/utils/TransportLogger_test.go
+++ b/plc4go/spi/transports/utils/TransportLogger_test.go
@@ -80,7 +80,6 @@ func TestTransportLogger_Close(t1 *testing.T) {
func TestTransportLogger_Read(t1 *testing.T) {
type fields struct {
source io.ReadWriteCloser
- log zerolog.Logger
}
type args struct {
p []byte
@@ -98,7 +97,7 @@ func TestTransportLogger_Read(t1 *testing.T) {
t1.Run(tt.name, func(t1 *testing.T) {
t := &TransportLogger{
source: tt.fields.source,
- log: tt.fields.log,
+ log: testutils.ProduceTestingLogger(t1),
}
got, err := t.Read(tt.args.p)
if (err != nil) != tt.wantErr {
[plc4x] 06/07: feat(plc4go/cbus): improved logging
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 04662cf6d5dc35daa389460cb51e76b32eea6aa7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:11:12 2023 +0200
feat(plc4go/cbus): improved logging
+ reduced overhead logging
+ added timing information
+ add more traces
---
plc4go/internal/cbus/Browser.go | 35 ++++----
plc4go/internal/cbus/Browser_test.go | 73 +++++++----------
plc4go/internal/cbus/Connection.go | 6 +-
plc4go/internal/cbus/MessageCodec.go | 18 +++--
plc4go/internal/cbus/Reader.go | 152 +++++++++++++++++++----------------
5 files changed, 143 insertions(+), 141 deletions(-)
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index e9b84afcd1..ee202dec64 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -78,6 +78,8 @@ func (m *Browser) browseUnitInfo(ctx context.Context, interceptor func(result ap
if allUnits {
m.log.Info().Msg("Querying all (available) units")
+ } else {
+ m.log.Debug().Msgf("Querying units\n%s", units)
}
unitLoop:
for _, unit := range units {
@@ -110,10 +112,10 @@ unitLoop:
readRequest, _ := m.connection.ReadRequestBuilder().
AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
Build()
- timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
+ timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 5*time.Second)
m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
- m.log.Trace().Msg("got a response")
+ m.log.Trace().Msgf("got a response\n%s", requestResult)
timeoutCancel()
if err := requestResult.GetErr(); err != nil {
if allUnits || allAttributes {
@@ -152,22 +154,22 @@ unitLoop:
func (m *Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
if unitAddress := query.unitAddress; unitAddress != nil {
return []readWriteModel.UnitAddress{unitAddress}, false, nil
- } else {
- // TODO: check if we still want the option to brute force all addresses
- installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
- if err != nil {
- return nil, false, errors.New("Unable to get installed uints")
- }
+ }
- var units []readWriteModel.UnitAddress
- for i := 0; i <= 0xFF; i++ {
- unitAddressByte := byte(i)
- if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
- units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
- }
+ // TODO: check if we still want the option to brute force all addresses
+ installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
+ if err != nil {
+ return nil, false, errors.New("Unable to get installed uints")
+ }
+
+ var units []readWriteModel.UnitAddress
+ for i := 0; i <= 0xFF; i++ {
+ unitAddressByte := byte(i)
+ if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
+ units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
}
- return units, true, nil
}
+ return units, true, nil
}
func (m *Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
@@ -292,7 +294,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
AddTagAddress("installationMMI", "status/binary/0xFF").
Build()
if err != nil {
- return nil, errors.Wrap(err, "Error getting the installation MMI")
+ return nil, errors.Wrap(err, "Error building the installation MMI")
}
readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
defer readCtxCancel()
@@ -306,6 +308,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
}
}()
defer readCtxCancel()
+ m.log.Debug().Msgf("sending read request\n%s", readRequest)
readRequestResult := <-readRequest.ExecuteWithContext(readCtx)
if err := readRequestResult.GetErr(); err != nil {
m.log.Warn().Err(err).Msg("Error reading the mmi")
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index a4fdfb4324..d9438a7659 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -21,8 +21,8 @@ package cbus
import (
"context"
+ "encoding/hex"
"fmt"
- "github.com/rs/zerolog"
"net/url"
"sync"
"sync/atomic"
@@ -103,13 +103,13 @@ func TestBrowser_BrowseQuery(t *testing.T) {
INTERFACE_OPTIONS_3
INTERFACE_OPTIONS_1_PUN
INTERFACE_OPTIONS_1
- MANUFACTURER
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -141,24 +141,17 @@ func TestBrowser_BrowseQuery(t *testing.T) {
t.Log("Dispatching interface 1 echo and confirm???")
transportInstance.FillReadBuffer([]byte("@A3300079\r"))
transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
- currentState.Store(MANUFACTURER)
- case MANUFACTURER:
- t.Log("Dispatching manufacturer")
- transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
- dispatchWg := sync.WaitGroup{}
- dispatchWg.Add(1)
- t.Cleanup(dispatchWg.Wait)
- go func() {
- defer dispatchWg.Done()
- time.Sleep(200 * time.Millisecond)
- t.Log("Dispatching 3 MMI segments")
- transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
- }()
+
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+
+ t.Log("Dispatching manufacturer")
+ transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
}
})
err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
@@ -222,7 +215,6 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
DefaultBrowser _default.DefaultBrowser
connection plc4go.PlcConnection
sequenceCounter uint8
- log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -267,13 +259,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
INTERFACE_OPTIONS_3
INTERFACE_OPTIONS_1_PUN
INTERFACE_OPTIONS_1
- MANUFACTURER
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -305,24 +297,17 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
t.Log("Dispatching interface 1 echo and confirm???")
transportInstance.FillReadBuffer([]byte("@A3300079\r"))
transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
- currentState.Store(MANUFACTURER)
- case MANUFACTURER:
- t.Log("Dispatching manufacturer")
- transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
- dispatchWg := sync.WaitGroup{}
- dispatchWg.Add(1)
- t.Cleanup(dispatchWg.Wait)
- go func() {
- defer dispatchWg.Done()
- time.Sleep(200 * time.Millisecond)
- t.Log("Dispatching 3 MMI segments")
- transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
- }()
+
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+
+ t.Log("Dispatching manufacturer")
+ transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
}
})
err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
@@ -363,12 +348,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
+ t.Log("Setup done")
}
m := &Browser{
DefaultBrowser: tt.fields.DefaultBrowser,
connection: tt.fields.connection,
sequenceCounter: tt.fields.sequenceCounter,
- log: tt.fields.log,
+ log: testutils.ProduceTestingLogger(t),
}
gotResponseCode, gotQueryResults := m.browseUnitInfo(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
assert.Equalf(t, tt.wantResponseCode, gotResponseCode, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
@@ -552,6 +538,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -586,17 +573,11 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
- dispatchWg := sync.WaitGroup{}
- dispatchWg.Add(1)
- t.Cleanup(dispatchWg.Wait)
- go func() {
- defer dispatchWg.Done()
- time.Sleep(200 * time.Millisecond)
- t.Log("Dispatching 3 MMI segments")
- transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
- }()
+
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
}
})
err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 5b4f767815..b23c76b986 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -285,16 +285,16 @@ func (c *Connection) startSubscriptionHandler() {
mmiLogger.Debug().Msg("default MMI started")
for c.IsConnected() {
for calReply := range c.messageCodec.monitoredMMIs {
- mmiLogger.Trace().Msgf("got a MMI:\n%s", calReply)
+ mmiLogger.Trace().Msgf("got a MMI")
handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredMMI(calReply); ok {
- mmiLogger.Debug().Msgf("\n%v handled\n%s", subscriber, calReply)
+ mmiLogger.Debug().Msgf("\n%v handled", subscriber)
handled = true
}
}
if !handled {
- mmiLogger.Debug().Msgf("MMI was not handled:\n%s", calReply)
+ mmiLogger.Debug().Msgf("MMI was not handled")
}
}
}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 831c13ef86..e2856f57c2 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -22,14 +22,14 @@ package cbus
import (
"bufio"
"context"
- "sync"
- "sync/atomic"
-
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
+ "sync"
+ "sync/atomic"
+ "time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
@@ -101,7 +101,7 @@ func (m *MessageCodec) Disconnect() error {
}
func (m *MessageCodec) Send(message spi.Message) error {
- m.log.Trace().Msg("Sending message")
+ m.log.Trace().Msgf("Sending message\n%s", message)
// Cast the message to the correct type of struct
cbusMessage, ok := message.(readWriteModel.CBusMessage)
if !ok {
@@ -329,25 +329,29 @@ lookingForTheEnd:
}
m.log.Debug().Msgf("Parsing %q", sanitizedInput)
ctxForModel := options.GetLoggerContextForModel(context.TODO(), m.log, options.WithPassLoggerToModel(m.passLogToModel))
+ start := time.Now()
cBusMessage, err := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, pciResponse, m.requestContext, m.cbusOptions)
+ m.log.Trace().Msgf("Parsing took %s", time.Since(start))
if err != nil {
m.log.Debug().Err(err).Msg("First Parse Failed")
{ // Try SAL
+ m.log.Trace().Msg("try SAL")
requestContext := readWriteModel.NewRequestContext(false)
cBusMessage, secondErr := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, pciResponse, requestContext, m.cbusOptions)
if secondErr == nil {
- m.log.Trace().Msgf("Parsed message as SAL:\n%s", cBusMessage)
+ m.log.Trace().Msgf("Parsed message as SAL")
return cBusMessage, nil
} else {
m.log.Debug().Err(secondErr).Msg("SAL parse failed too")
}
}
{ // Try MMI
+ m.log.Trace().Msg("try MMI")
requestContext := readWriteModel.NewRequestContext(false)
cbusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
cBusMessage, secondErr := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, true, requestContext, cbusOptions)
if secondErr == nil {
- m.log.Trace().Msgf("Parsed message as MMI:\n%s", cBusMessage)
+ m.log.Trace().Msg("Parsed message as MMI")
return cBusMessage, nil
} else {
m.log.Debug().Err(secondErr).Msg("CAL parse failed too")
@@ -358,13 +362,11 @@ lookingForTheEnd:
return nil, nil
}
- m.log.Trace().Msgf("Parsed message:\n%s", cBusMessage)
return cBusMessage, nil
}
func extractMMIAndSAL(log zerolog.Logger) _default.CustomMessageHandler {
return func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
- log.Trace().Msgf("Custom handling message:\n%s", message)
switch message := message.(type) {
case readWriteModel.CBusMessageToClientExactly:
switch reply := message.GetReply().(type) {
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 6949369e86..3a7063cb8f 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -127,87 +127,103 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func(transaction transactions.RequestTransaction) {
+ m.log.Trace().Msgf("Transaction getting handled:\n%s", transaction)
m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
})
if err := transaction.AwaitCompletion(ctx); err != nil {
m.log.Warn().Err(err).Msg("Error while awaiting completion")
}
- m.log.Trace().Msg("Finished waiting")
+ m.log.Trace().Msg("Finished waiting for transaction to end")
}
func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
- // Send the over the wire
- m.log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool {
- messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
- if !ok {
- return false
- }
- // Check if this errored
- if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- // This means we must handle this below
- return true
- }
+ // Send the over the wire
+ m.log.Trace().Msg("send over the wire")
+ if deadline, ok := ctx.Deadline(); ok {
+ m.log.Debug().Msgf("Message expires in %s", deadline.Sub(time.Now()))
+ }
+ if err := m.messageCodec.SendRequest(
+ ctx,
+ messageToSend,
+ func(cbusMessage spi.Message) bool {
+ m.log.Trace().Msgf("Checking\n%T", cbusMessage)
+ messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+ if !ok {
+ m.log.Trace().Msg("Not a message to client")
+ return false
+ }
+ // Check if this errored
+ if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+ // This means we must handle this below
+ m.log.Trace().Msg("It is a error, we will handle it")
+ return true
+ }
- confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !ok {
- return false
- }
- actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
- // TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
- expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
- return actualAlpha == expectedAlpha
- }, func(receivedMessage spi.Message) error {
- // Convert the response into an
- m.log.Trace().Msg("convert response to ")
- messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
- if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- m.log.Trace().Msg("We got a server failure")
- addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
- return transaction.EndRequest()
- }
- replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
- var responseCode apiModel.PlcResponseCode
- switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
- responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
- responseCode = apiModel.PlcResponseCode_INVALID_DATA
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
- responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
- responseCode = apiModel.PlcResponseCode_INVALID_DATA
- default:
- return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
+ confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !ok {
+ m.log.Trace().Msg("it is not a confirmation")
+ return false
+ }
+ actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
+ // TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
+ expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+ m.log.Trace().Msgf("Comparing expected alpha '%c' to actual alpha '%c'", expectedAlpha, actualAlpha)
+ return actualAlpha == expectedAlpha
+ },
+ func(receivedMessage spi.Message) error {
+ // Convert the response into an
+ m.log.Trace().Msgf("convert message: %T", receivedMessage)
+ messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
+ if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+ m.log.Trace().Msg("We got a server failure")
+ addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
+ return transaction.EndRequest()
+ }
+ replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+ var responseCode apiModel.PlcResponseCode
+ switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+ responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+ responseCode = apiModel.PlcResponseCode_INVALID_DATA
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+ responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+ responseCode = apiModel.PlcResponseCode_INVALID_DATA
+ default:
+ return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
+ }
+ m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+ addResponseCode(tagName, responseCode)
+ return transaction.EndRequest()
}
- m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
- addResponseCode(tagName, responseCode)
- return transaction.EndRequest()
- }
- alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
- // TODO: it could be double confirmed but this is not implemented yet
- embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
- if !ok {
- m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
- addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
- return transaction.EndRequest()
- }
+ alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+ // TODO: it could be double confirmed but this is not implemented yet
+ embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+ if !ok {
+ m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+ addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
+ return transaction.EndRequest()
+ }
- m.log.Trace().Msg("Handling confirmed data")
- // TODO: check if we can use a plcValueSerializer
- encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
- if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
- log.Error().Err(err).Msg("error encoding reply")
- addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+ m.log.Trace().Msg("Handling confirmed data")
+ // TODO: check if we can use a plcValueSerializer
+ encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
+ if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+ log.Error().Err(err).Msg("error encoding reply")
+ addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+ return transaction.EndRequest()
+ }
return transaction.EndRequest()
- }
- return transaction.EndRequest()
- }, func(err error) error {
- addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
- return transaction.FailRequest(err)
- }, time.Second*1); err != nil {
+ },
+ func(err error) error {
+ m.log.Trace().Err(err).Msg("got and error")
+ addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+ return transaction.FailRequest(err)
+ },
+ time.Second*5); err != nil {
m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
[plc4x] 04/07: feat(plc4go/spi): improve logging for request transaction
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 65796b03e9dbba26ab533ecafb3703e727f15d04
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:04:20 2023 +0200
feat(plc4go/spi): improve logging for request transaction
---
plc4go/spi/transactions/RequestTransaction.go | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/plc4go/spi/transactions/RequestTransaction.go b/plc4go/spi/transactions/RequestTransaction.go
index 027df85cc1..18f88c7293 100644
--- a/plc4go/spi/transactions/RequestTransaction.go
+++ b/plc4go/spi/transactions/RequestTransaction.go
@@ -105,16 +105,17 @@ func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
if t.operation != nil {
t.transactionLog.Warn().Msg("Operation already set")
}
- t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
+ t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Submission")
t.operation = func() {
- t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
+ t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Start operation")
operation(t)
- t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+ t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Completed operation")
}
t.parent.submitTransaction(t)
}
func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
+ t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Awaiting completion")
timeout, cancelFunc := context.WithTimeout(ctx, time.Minute*30) // This is intentionally set very high
defer cancelFunc()
for t.completionFuture == nil {
@@ -125,6 +126,7 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
}
}
if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
+ t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Errored")
return err
}
stillActive := true
@@ -139,6 +141,7 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
}
t.parent.runningRequestMutex.RUnlock()
}
+ t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Completed")
return nil
}