You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 16:46:23 UTC

[plc4x] 06/07: feat(plc4go/cbus): improved logging

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 04662cf6d5dc35daa389460cb51e76b32eea6aa7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:11:12 2023 +0200

    feat(plc4go/cbus): improved logging
    
    + reduced overhead logging
    + added timing information
    + add more traces
---
 plc4go/internal/cbus/Browser.go      |  35 ++++----
 plc4go/internal/cbus/Browser_test.go |  73 +++++++----------
 plc4go/internal/cbus/Connection.go   |   6 +-
 plc4go/internal/cbus/MessageCodec.go |  18 +++--
 plc4go/internal/cbus/Reader.go       | 152 +++++++++++++++++++----------------
 5 files changed, 143 insertions(+), 141 deletions(-)

diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index e9b84afcd1..ee202dec64 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -78,6 +78,8 @@ func (m *Browser) browseUnitInfo(ctx context.Context, interceptor func(result ap
 
 	if allUnits {
 		m.log.Info().Msg("Querying all (available) units")
+	} else {
+		m.log.Debug().Msgf("Querying units\n%s", units)
 	}
 unitLoop:
 	for _, unit := range units {
@@ -110,10 +112,10 @@ unitLoop:
 			readRequest, _ := m.connection.ReadRequestBuilder().
 				AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
 				Build()
-			timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
+			timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 5*time.Second)
 			m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
 			requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
-			m.log.Trace().Msg("got a response")
+			m.log.Trace().Msgf("got a response\n%s", requestResult)
 			timeoutCancel()
 			if err := requestResult.GetErr(); err != nil {
 				if allUnits || allAttributes {
@@ -152,22 +154,22 @@ unitLoop:
 func (m *Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
 	if unitAddress := query.unitAddress; unitAddress != nil {
 		return []readWriteModel.UnitAddress{unitAddress}, false, nil
-	} else {
-		// TODO: check if we still want the option to brute force all addresses
-		installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
-		if err != nil {
-			return nil, false, errors.New("Unable to get installed uints")
-		}
+	}
 
-		var units []readWriteModel.UnitAddress
-		for i := 0; i <= 0xFF; i++ {
-			unitAddressByte := byte(i)
-			if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
-				units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
-			}
+	// TODO: check if we still want the option to brute force all addresses
+	installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
+	if err != nil {
+		return nil, false, errors.New("Unable to get installed uints")
+	}
+
+	var units []readWriteModel.UnitAddress
+	for i := 0; i <= 0xFF; i++ {
+		unitAddressByte := byte(i)
+		if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
+			units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
 		}
-		return units, true, nil
 	}
+	return units, true, nil
 }
 
 func (m *Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
@@ -292,7 +294,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
 		AddTagAddress("installationMMI", "status/binary/0xFF").
 		Build()
 	if err != nil {
-		return nil, errors.Wrap(err, "Error getting the installation MMI")
+		return nil, errors.Wrap(err, "Error building the installation MMI")
 	}
 	readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
 	defer readCtxCancel()
@@ -306,6 +308,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
 			}
 		}()
 		defer readCtxCancel()
+		m.log.Debug().Msgf("sending read request\n%s", readRequest)
 		readRequestResult := <-readRequest.ExecuteWithContext(readCtx)
 		if err := readRequestResult.GetErr(); err != nil {
 			m.log.Warn().Err(err).Msg("Error reading the mmi")
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index a4fdfb4324..d9438a7659 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -21,8 +21,8 @@ package cbus
 
 import (
 	"context"
+	"encoding/hex"
 	"fmt"
-	"github.com/rs/zerolog"
 	"net/url"
 	"sync"
 	"sync/atomic"
@@ -103,13 +103,13 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 					INTERFACE_OPTIONS_3
 					INTERFACE_OPTIONS_1_PUN
 					INTERFACE_OPTIONS_1
-					MANUFACTURER
 					DONE
 				)
 				currentState := atomic.Value{}
 				currentState.Store(RESET)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -141,24 +141,17 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 						t.Log("Dispatching interface 1 echo and confirm???")
 						transportInstance.FillReadBuffer([]byte("@A3300079\r"))
 						transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
-						currentState.Store(MANUFACTURER)
-					case MANUFACTURER:
-						t.Log("Dispatching manufacturer")
-						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
-						dispatchWg := sync.WaitGroup{}
-						dispatchWg.Add(1)
-						t.Cleanup(dispatchWg.Wait)
-						go func() {
-							defer dispatchWg.Done()
-							time.Sleep(200 * time.Millisecond)
-							t.Log("Dispatching 3 MMI segments")
-							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
-						}()
+
+						t.Log("Dispatching 3 MMI segments")
+						transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+
+						t.Log("Dispatching manufacturer")
+						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
 					}
 				})
 				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
@@ -222,7 +215,6 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 		DefaultBrowser  _default.DefaultBrowser
 		connection      plc4go.PlcConnection
 		sequenceCounter uint8
-		log             zerolog.Logger
 	}
 	type args struct {
 		ctx         context.Context
@@ -267,13 +259,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 					INTERFACE_OPTIONS_3
 					INTERFACE_OPTIONS_1_PUN
 					INTERFACE_OPTIONS_1
-					MANUFACTURER
 					DONE
 				)
 				currentState := atomic.Value{}
 				currentState.Store(RESET)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -305,24 +297,17 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 						t.Log("Dispatching interface 1 echo and confirm???")
 						transportInstance.FillReadBuffer([]byte("@A3300079\r"))
 						transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
-						currentState.Store(MANUFACTURER)
-					case MANUFACTURER:
-						t.Log("Dispatching manufacturer")
-						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
-						dispatchWg := sync.WaitGroup{}
-						dispatchWg.Add(1)
-						t.Cleanup(dispatchWg.Wait)
-						go func() {
-							defer dispatchWg.Done()
-							time.Sleep(200 * time.Millisecond)
-							t.Log("Dispatching 3 MMI segments")
-							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
-						}()
+
+						t.Log("Dispatching 3 MMI segments")
+						transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+
+						t.Log("Dispatching manufacturer")
+						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
 					}
 				})
 				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
@@ -363,12 +348,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			if tt.setup != nil {
 				tt.setup(t, &tt.fields)
+				t.Log("Setup done")
 			}
 			m := &Browser{
 				DefaultBrowser:  tt.fields.DefaultBrowser,
 				connection:      tt.fields.connection,
 				sequenceCounter: tt.fields.sequenceCounter,
-				log:             tt.fields.log,
+				log:             testutils.ProduceTestingLogger(t),
 			}
 			gotResponseCode, gotQueryResults := m.browseUnitInfo(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
 			assert.Equalf(t, tt.wantResponseCode, gotResponseCode, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
@@ -552,6 +538,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
 				currentState.Store(RESET)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -586,17 +573,11 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
-						dispatchWg := sync.WaitGroup{}
-						dispatchWg.Add(1)
-						t.Cleanup(dispatchWg.Wait)
-						go func() {
-							defer dispatchWg.Done()
-							time.Sleep(200 * time.Millisecond)
-							t.Log("Dispatching 3 MMI segments")
-							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
-							transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
-						}()
+
+						t.Log("Dispatching 3 MMI segments")
+						transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+						transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
 					}
 				})
 				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 5b4f767815..b23c76b986 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -285,16 +285,16 @@ func (c *Connection) startSubscriptionHandler() {
 		mmiLogger.Debug().Msg("default MMI started")
 		for c.IsConnected() {
 			for calReply := range c.messageCodec.monitoredMMIs {
-				mmiLogger.Trace().Msgf("got a MMI:\n%s", calReply)
+				mmiLogger.Trace().Msgf("got a MMI")
 				handled := false
 				for _, subscriber := range c.subscribers {
 					if ok := subscriber.handleMonitoredMMI(calReply); ok {
-						mmiLogger.Debug().Msgf("\n%v handled\n%s", subscriber, calReply)
+						mmiLogger.Debug().Msgf("\n%v handled", subscriber)
 						handled = true
 					}
 				}
 				if !handled {
-					mmiLogger.Debug().Msgf("MMI was not handled:\n%s", calReply)
+					mmiLogger.Debug().Msgf("MMI was not handled")
 				}
 			}
 		}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 831c13ef86..e2856f57c2 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -22,14 +22,14 @@ package cbus
 import (
 	"bufio"
 	"context"
-	"sync"
-	"sync/atomic"
-
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
+	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
@@ -101,7 +101,7 @@ func (m *MessageCodec) Disconnect() error {
 }
 
 func (m *MessageCodec) Send(message spi.Message) error {
-	m.log.Trace().Msg("Sending message")
+	m.log.Trace().Msgf("Sending message\n%s", message)
 	// Cast the message to the correct type of struct
 	cbusMessage, ok := message.(readWriteModel.CBusMessage)
 	if !ok {
@@ -329,25 +329,29 @@ lookingForTheEnd:
 	}
 	m.log.Debug().Msgf("Parsing %q", sanitizedInput)
 	ctxForModel := options.GetLoggerContextForModel(context.TODO(), m.log, options.WithPassLoggerToModel(m.passLogToModel))
+	start := time.Now()
 	cBusMessage, err := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, pciResponse, m.requestContext, m.cbusOptions)
+	m.log.Trace().Msgf("Parsing took %s", time.Since(start))
 	if err != nil {
 		m.log.Debug().Err(err).Msg("First Parse Failed")
 		{ // Try SAL
+			m.log.Trace().Msg("try SAL")
 			requestContext := readWriteModel.NewRequestContext(false)
 			cBusMessage, secondErr := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, pciResponse, requestContext, m.cbusOptions)
 			if secondErr == nil {
-				m.log.Trace().Msgf("Parsed message as SAL:\n%s", cBusMessage)
+				m.log.Trace().Msgf("Parsed message as SAL")
 				return cBusMessage, nil
 			} else {
 				m.log.Debug().Err(secondErr).Msg("SAL parse failed too")
 			}
 		}
 		{ // Try MMI
+			m.log.Trace().Msg("try MMI")
 			requestContext := readWriteModel.NewRequestContext(false)
 			cbusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
 			cBusMessage, secondErr := readWriteModel.CBusMessageParse(ctxForModel, sanitizedInput, true, requestContext, cbusOptions)
 			if secondErr == nil {
-				m.log.Trace().Msgf("Parsed message as MMI:\n%s", cBusMessage)
+				m.log.Trace().Msg("Parsed message as MMI")
 				return cBusMessage, nil
 			} else {
 				m.log.Debug().Err(secondErr).Msg("CAL parse failed too")
@@ -358,13 +362,11 @@ lookingForTheEnd:
 		return nil, nil
 	}
 
-	m.log.Trace().Msgf("Parsed message:\n%s", cBusMessage)
 	return cBusMessage, nil
 }
 
 func extractMMIAndSAL(log zerolog.Logger) _default.CustomMessageHandler {
 	return func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
-		log.Trace().Msgf("Custom handling message:\n%s", message)
 		switch message := message.(type) {
 		case readWriteModel.CBusMessageToClientExactly:
 			switch reply := message.GetReply().(type) {
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 6949369e86..3a7063cb8f 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -127,87 +127,103 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
 	// Start a new request-transaction (Is ended in the response-handler)
 	transaction := m.tm.StartTransaction()
 	transaction.Submit(func(transaction transactions.RequestTransaction) {
+		m.log.Trace().Msgf("Transaction getting handled:\n%s", transaction)
 		m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
 	})
 	if err := transaction.AwaitCompletion(ctx); err != nil {
 		m.log.Warn().Err(err).Msg("Error while awaiting completion")
 	}
-	m.log.Trace().Msg("Finished waiting")
+	m.log.Trace().Msg("Finished waiting for transaction to end")
 }
 
 func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
-	// Send the  over the wire
-	m.log.Trace().Msg("Send ")
-	if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool {
-		messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
-		if !ok {
-			return false
-		}
-		// Check if this errored
-		if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-			// This means we must handle this below
-			return true
-		}
+	// Send the message over the wire
+	m.log.Trace().Msg("send over the wire")
+	if deadline, ok := ctx.Deadline(); ok {
+		m.log.Debug().Msgf("Message expires in %s", deadline.Sub(time.Now()))
+	}
+	if err := m.messageCodec.SendRequest(
+		ctx,
+		messageToSend,
+		func(cbusMessage spi.Message) bool {
+			m.log.Trace().Msgf("Checking\n%T", cbusMessage)
+			messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+			if !ok {
+				m.log.Trace().Msg("Not a message to client")
+				return false
+			}
+			// Check if this errored
+			if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+				// This means we must handle this below
+				m.log.Trace().Msg("It is a error, we will handle it")
+				return true
+			}
 
-		confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-		if !ok {
-			return false
-		}
-		actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
-		// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
-		expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
-		return actualAlpha == expectedAlpha
-	}, func(receivedMessage spi.Message) error {
-		// Convert the response into an
-		m.log.Trace().Msg("convert response to ")
-		messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
-		if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-			m.log.Trace().Msg("We got a server failure")
-			addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
-			return transaction.EndRequest()
-		}
-		replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-		if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
-			var responseCode apiModel.PlcResponseCode
-			switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
-			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
-				responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
-			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
-				responseCode = apiModel.PlcResponseCode_INVALID_DATA
-			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
-				responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
-			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
-				responseCode = apiModel.PlcResponseCode_INVALID_DATA
-			default:
-				return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
+			confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+			if !ok {
+				m.log.Trace().Msg("it is not a confirmation")
+				return false
+			}
+			actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
+			// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
+			expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+			m.log.Trace().Msgf("Comparing expected alpha '%c' to actual alpha '%c'", expectedAlpha, actualAlpha)
+			return actualAlpha == expectedAlpha
+		},
+		func(receivedMessage spi.Message) error {
+			// Convert the received message into a response code (and a PLC value, if it carries data)
+			m.log.Trace().Msgf("convert message: %T", receivedMessage)
+			messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
+			if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+				m.log.Trace().Msg("We got a server failure")
+				addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
+				return transaction.EndRequest()
+			}
+			replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+			if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+				var responseCode apiModel.PlcResponseCode
+				switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+					responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+					responseCode = apiModel.PlcResponseCode_INVALID_DATA
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+					responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+					responseCode = apiModel.PlcResponseCode_INVALID_DATA
+				default:
+					return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
+				}
+				m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+				addResponseCode(tagName, responseCode)
+				return transaction.EndRequest()
 			}
-			m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
-			addResponseCode(tagName, responseCode)
-			return transaction.EndRequest()
-		}
 
-		alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
-		// TODO: it could be double confirmed but this is not implemented yet
-		embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-		if !ok {
-			m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
-			addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
-			return transaction.EndRequest()
-		}
+			alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+			// TODO: it could be double confirmed but this is not implemented yet
+			embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+			if !ok {
+				m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+				addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
+				return transaction.EndRequest()
+			}
 
-		m.log.Trace().Msg("Handling confirmed data")
-		// TODO: check if we can use a plcValueSerializer
-		encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
-		if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
-			log.Error().Err(err).Msg("error encoding reply")
-			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+			m.log.Trace().Msg("Handling confirmed data")
+			// TODO: check if we can use a plcValueSerializer
+			encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
+			if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+				log.Error().Err(err).Msg("error encoding reply")
+				addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+				return transaction.EndRequest()
+			}
 			return transaction.EndRequest()
-		}
-		return transaction.EndRequest()
-	}, func(err error) error {
-		addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
-		return transaction.FailRequest(err)
-	}, time.Second*1); err != nil {
+		},
+		func(err error) error {
+			m.log.Trace().Err(err).Msg("got and error")
+			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+			return transaction.FailRequest(err)
+		},
+		time.Second*5); err != nil {
 		m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
 		addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
 		if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {