You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 16:46:23 UTC
[plc4x] 06/07: feat(plc4go/cbus): improved logging
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 04662cf6d5dc35daa389460cb51e76b32eea6aa7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:11:12 2023 +0200
feat(plc4go/cbus): improved logging
+ reduced overhead logging
+ added timing information
+ add more traces
---
plc4go/internal/cbus/Browser.go | 35 ++++----
plc4go/internal/cbus/Browser_test.go | 73 +++++++----------
plc4go/internal/cbus/Connection.go | 6 +-
plc4go/internal/cbus/MessageCodec.go | 18 +++--
plc4go/internal/cbus/Reader.go | 152 +++++++++++++++++++----------------
5 files changed, 143 insertions(+), 141 deletions(-)
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index e9b84afcd1..ee202dec64 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -78,6 +78,8 @@ func (m *Browser) browseUnitInfo(ctx context.Context, interceptor func(result ap
if allUnits {
m.log.Info().Msg("Querying all (available) units")
+ } else {
+ m.log.Debug().Msgf("Querying units\n%s", units)
}
unitLoop:
for _, unit := range units {
@@ -110,10 +112,10 @@ unitLoop:
readRequest, _ := m.connection.ReadRequestBuilder().
AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
Build()
- timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
+ timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 5*time.Second)
m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
- m.log.Trace().Msg("got a response")
+ m.log.Trace().Msgf("got a response\n%s", requestResult)
timeoutCancel()
if err := requestResult.GetErr(); err != nil {
if allUnits || allAttributes {
@@ -152,22 +154,22 @@ unitLoop:
func (m *Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
if unitAddress := query.unitAddress; unitAddress != nil {
return []readWriteModel.UnitAddress{unitAddress}, false, nil
- } else {
- // TODO: check if we still want the option to brute force all addresses
- installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
- if err != nil {
- return nil, false, errors.New("Unable to get installed uints")
- }
+ }
- var units []readWriteModel.UnitAddress
- for i := 0; i <= 0xFF; i++ {
- unitAddressByte := byte(i)
- if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
- units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
- }
+ // TODO: check if we still want the option to brute force all addresses
+ installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
+ if err != nil {
+ return nil, false, errors.New("Unable to get installed uints")
+ }
+
+ var units []readWriteModel.UnitAddress
+ for i := 0; i <= 0xFF; i++ {
+ unitAddressByte := byte(i)
+ if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
+ units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
}
- return units, true, nil
}
+ return units, true, nil
}
func (m *Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
@@ -292,7 +294,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
AddTagAddress("installationMMI", "status/binary/0xFF").
Build()
if err != nil {
- return nil, errors.Wrap(err, "Error getting the installation MMI")
+ return nil, errors.Wrap(err, "Error building the installation MMI")
}
readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
defer readCtxCancel()
@@ -306,6 +308,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
}
}()
defer readCtxCancel()
+ m.log.Debug().Msgf("sending read request\n%s", readRequest)
readRequestResult := <-readRequest.ExecuteWithContext(readCtx)
if err := readRequestResult.GetErr(); err != nil {
m.log.Warn().Err(err).Msg("Error reading the mmi")
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index a4fdfb4324..d9438a7659 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -21,8 +21,8 @@ package cbus
import (
"context"
+ "encoding/hex"
"fmt"
- "github.com/rs/zerolog"
"net/url"
"sync"
"sync/atomic"
@@ -103,13 +103,13 @@ func TestBrowser_BrowseQuery(t *testing.T) {
INTERFACE_OPTIONS_3
INTERFACE_OPTIONS_1_PUN
INTERFACE_OPTIONS_1
- MANUFACTURER
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -141,24 +141,17 @@ func TestBrowser_BrowseQuery(t *testing.T) {
t.Log("Dispatching interface 1 echo and confirm???")
transportInstance.FillReadBuffer([]byte("@A3300079\r"))
transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
- currentState.Store(MANUFACTURER)
- case MANUFACTURER:
- t.Log("Dispatching manufacturer")
- transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
- dispatchWg := sync.WaitGroup{}
- dispatchWg.Add(1)
- t.Cleanup(dispatchWg.Wait)
- go func() {
- defer dispatchWg.Done()
- time.Sleep(200 * time.Millisecond)
- t.Log("Dispatching 3 MMI segments")
- transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
- }()
+
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+
+ t.Log("Dispatching manufacturer")
+ transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
}
})
err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
@@ -222,7 +215,6 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
DefaultBrowser _default.DefaultBrowser
connection plc4go.PlcConnection
sequenceCounter uint8
- log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -267,13 +259,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
INTERFACE_OPTIONS_3
INTERFACE_OPTIONS_1_PUN
INTERFACE_OPTIONS_1
- MANUFACTURER
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -305,24 +297,17 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
t.Log("Dispatching interface 1 echo and confirm???")
transportInstance.FillReadBuffer([]byte("@A3300079\r"))
transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
- currentState.Store(MANUFACTURER)
- case MANUFACTURER:
- t.Log("Dispatching manufacturer")
- transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
- dispatchWg := sync.WaitGroup{}
- dispatchWg.Add(1)
- t.Cleanup(dispatchWg.Wait)
- go func() {
- defer dispatchWg.Done()
- time.Sleep(200 * time.Millisecond)
- t.Log("Dispatching 3 MMI segments")
- transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
- }()
+
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+
+ t.Log("Dispatching manufacturer")
+ transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
}
})
err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
@@ -363,12 +348,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
+ t.Log("Setup done")
}
m := &Browser{
DefaultBrowser: tt.fields.DefaultBrowser,
connection: tt.fields.connection,
sequenceCounter: tt.fields.sequenceCounter,
- log: tt.fields.log,
+ log: testutils.ProduceTestingLogger(t),
}
gotResponseCode, gotQueryResults := m.browseUnitInfo(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
assert.Equalf(t, tt.wantResponseCode, gotResponseCode, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
@@ -552,6 +538,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -586,17 +573,11 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
- dispatchWg := sync.WaitGroup{}
- dispatchWg.Add(1)
- t.Cleanup(dispatchWg.Wait)
- go func() {
- defer dispatchWg.Done()
- time.Sleep(200 * time.Millisecond)
- t.Log("Dispatching 3 MMI segments")
- transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
- }()
+
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
}
})
err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 5b4f767815..b23c76b986 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -285,16 +285,16 @@ func (c *Connection) startSubscriptionHandler() {
mmiLogger.Debug().Msg("default MMI started")
for c.IsConnected() {
for calReply := range c.messageCodec.monitoredMMIs {
- mmiLogger.Trace().Msgf("got a MMI:\n%s", calReply)
+ mmiLogger.Trace().Msgf("got a MMI")
handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredMMI(calReply); ok {
- mmiLogger.Debug().Msgf("\n%v handled\n%s", subscriber, calReply)
+ mmiLogger.Debug().Msgf("\n%v handled", subscriber)
handled = true
}
}
if !handled {
- mmiLogger.Debug().Msgf("MMI was not handled:\n%s", calReply)
+ mmiLogger.Debug().Msgf("MMI was not handled")
}
}
}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 831c13ef86..e2856f57c2 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -22,14 +22,14 @@ package cbus
import (
"bufio"
"context"
- "sync"
- "sync/atomic"
-
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
+ "sync"
+ "sync/atomic"
+ "time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
@@ -101,7 +101,7 @@ func (m *MessageCodec) Disconnect() error {
}
func (m *MessageCodec) Send(message spi.Message) error {
- m.log.Trace().Msg("Sending message")
+ m.log.Trace().Msgf("Sending message\n%s", message)
// Cast the message to the correct type of struct
cbusMessage, ok := message.(readWriteModel.CBusMessage)
if !ok {
@@ -329,25 +329,29 @@ lookingForTheEnd:
}
m.log.Debug().Msgf("Parsing %q", sanitizedInput)
ctxForModel := options.GetLoggerContextForModel(context.TODO(), m.log, options.WithPassLoggerToModel(m.passLogToModel))
+ start := time.Now()
cBusMessage, err := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, pciResponse, m.requestContext, m.cbusOptions)
+ m.log.Trace().Msgf("Parsing took %s", time.Since(start))
if err != nil {
m.log.Debug().Err(err).Msg("First Parse Failed")
{ // Try SAL
+ m.log.Trace().Msg("try SAL")
requestContext := readWriteModel.NewRequestContext(false)
cBusMessage, secondErr := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, pciResponse, requestContext, m.cbusOptions)
if secondErr == nil {
- m.log.Trace().Msgf("Parsed message as SAL:\n%s", cBusMessage)
+ m.log.Trace().Msgf("Parsed message as SAL")
return cBusMessage, nil
} else {
m.log.Debug().Err(secondErr).Msg("SAL parse failed too")
}
}
{ // Try MMI
+ m.log.Trace().Msg("try MMI")
requestContext := readWriteModel.NewRequestContext(false)
cbusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
cBusMessage, secondErr := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, true, requestContext, cbusOptions)
if secondErr == nil {
- m.log.Trace().Msgf("Parsed message as MMI:\n%s", cBusMessage)
+ m.log.Trace().Msg("Parsed message as MMI")
return cBusMessage, nil
} else {
m.log.Debug().Err(secondErr).Msg("CAL parse failed too")
@@ -358,13 +362,11 @@ lookingForTheEnd:
return nil, nil
}
- m.log.Trace().Msgf("Parsed message:\n%s", cBusMessage)
return cBusMessage, nil
}
func extractMMIAndSAL(log zerolog.Logger) _default.CustomMessageHandler {
return func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
- log.Trace().Msgf("Custom handling message:\n%s", message)
switch message := message.(type) {
case readWriteModel.CBusMessageToClientExactly:
switch reply := message.GetReply().(type) {
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 6949369e86..3a7063cb8f 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -127,87 +127,103 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func(transaction transactions.RequestTransaction) {
+ m.log.Trace().Msgf("Transaction getting handled:\n%s", transaction)
m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
})
if err := transaction.AwaitCompletion(ctx); err != nil {
m.log.Warn().Err(err).Msg("Error while awaiting completion")
}
- m.log.Trace().Msg("Finished waiting")
+ m.log.Trace().Msg("Finished waiting for transaction to end")
}
func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
- // Send the over the wire
- m.log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool {
- messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
- if !ok {
- return false
- }
- // Check if this errored
- if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- // This means we must handle this below
- return true
- }
+ // Send the over the wire
+ m.log.Trace().Msg("send over the wire")
+ if deadline, ok := ctx.Deadline(); ok {
+ m.log.Debug().Msgf("Message expires in %s", deadline.Sub(time.Now()))
+ }
+ if err := m.messageCodec.SendRequest(
+ ctx,
+ messageToSend,
+ func(cbusMessage spi.Message) bool {
+ m.log.Trace().Msgf("Checking\n%T", cbusMessage)
+ messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+ if !ok {
+ m.log.Trace().Msg("Not a message to client")
+ return false
+ }
+ // Check if this errored
+ if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+ // This means we must handle this below
+ m.log.Trace().Msg("It is a error, we will handle it")
+ return true
+ }
- confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !ok {
- return false
- }
- actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
- // TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
- expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
- return actualAlpha == expectedAlpha
- }, func(receivedMessage spi.Message) error {
- // Convert the response into an
- m.log.Trace().Msg("convert response to ")
- messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
- if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- m.log.Trace().Msg("We got a server failure")
- addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
- return transaction.EndRequest()
- }
- replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
- var responseCode apiModel.PlcResponseCode
- switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
- responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
- responseCode = apiModel.PlcResponseCode_INVALID_DATA
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
- responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
- responseCode = apiModel.PlcResponseCode_INVALID_DATA
- default:
- return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
+ confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !ok {
+ m.log.Trace().Msg("it is not a confirmation")
+ return false
+ }
+ actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
+ // TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
+ expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+ m.log.Trace().Msgf("Comparing expected alpha '%c' to actual alpha '%c'", expectedAlpha, actualAlpha)
+ return actualAlpha == expectedAlpha
+ },
+ func(receivedMessage spi.Message) error {
+ // Convert the response into an
+ m.log.Trace().Msgf("convert message: %T", receivedMessage)
+ messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
+ if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+ m.log.Trace().Msg("We got a server failure")
+ addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
+ return transaction.EndRequest()
+ }
+ replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+ var responseCode apiModel.PlcResponseCode
+ switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+ responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+ responseCode = apiModel.PlcResponseCode_INVALID_DATA
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+ responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+ responseCode = apiModel.PlcResponseCode_INVALID_DATA
+ default:
+ return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
+ }
+ m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+ addResponseCode(tagName, responseCode)
+ return transaction.EndRequest()
}
- m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
- addResponseCode(tagName, responseCode)
- return transaction.EndRequest()
- }
- alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
- // TODO: it could be double confirmed but this is not implemented yet
- embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
- if !ok {
- m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
- addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
- return transaction.EndRequest()
- }
+ alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+ // TODO: it could be double confirmed but this is not implemented yet
+ embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+ if !ok {
+ m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+ addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
+ return transaction.EndRequest()
+ }
- m.log.Trace().Msg("Handling confirmed data")
- // TODO: check if we can use a plcValueSerializer
- encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
- if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
- log.Error().Err(err).Msg("error encoding reply")
- addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+ m.log.Trace().Msg("Handling confirmed data")
+ // TODO: check if we can use a plcValueSerializer
+ encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
+ if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+ log.Error().Err(err).Msg("error encoding reply")
+ addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+ return transaction.EndRequest()
+ }
return transaction.EndRequest()
- }
- return transaction.EndRequest()
- }, func(err error) error {
- addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
- return transaction.FailRequest(err)
- }, time.Second*1); err != nil {
+ },
+ func(err error) error {
+ m.log.Trace().Err(err).Msg("got and error")
+ addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+ return transaction.FailRequest(err)
+ },
+ time.Second*5); err != nil {
m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {