You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/05 11:00:35 UTC

[plc4x] 03/03: test(plc4go/cbus): add test for Reader sendMessageOverTheWire

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 8511fdc88adfedad0fb55e5eab4be09527bd9d55
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri May 5 13:00:18 2023 +0200

    test(plc4go/cbus): add test for Reader sendMessageOverTheWire
---
 plc4go/internal/cbus/Reader.go      |  16 +-
 plc4go/internal/cbus/Reader_test.go | 825 ++++++++++++++++++++++++++++++++++++
 2 files changed, 832 insertions(+), 9 deletions(-)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 967ec3ae13..24c1502e16 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -124,11 +124,7 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
 func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
 	// Send the  over the wire
 	log.Trace().Msg("Send ")
-	if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
-		cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
-		if !ok {
-			return false
-		}
+	if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool {
 		messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
 		if !ok {
 			return false
@@ -143,7 +139,10 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
 		if !ok {
 			return false
 		}
-		return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+		actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
+		// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
+		expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+		return actualAlpha == expectedAlpha
 	}, func(receivedMessage spi.Message) error {
 		defer func(transaction spi.RequestTransaction) {
 			// This is just to make sure we don't forget to close the transaction here
@@ -151,8 +150,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
 		}(transaction)
 		// Convert the response into an
 		log.Trace().Msg("convert response to ")
-		cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
-		messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+		messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
 		if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
 			log.Trace().Msg("We got a server failure")
 			addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
@@ -200,7 +198,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
 	}, time.Second*1); err != nil {
 		log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
 		addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
-		if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
+		if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
 			log.Debug().Err(err).Msg("Error failing request")
 		}
 	}
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 7bd2932a08..855a0c3240 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -22,6 +22,7 @@ package cbus
 import (
 	"context"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+	apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	spiModel "github.com/apache/plc4x/plc4go/spi/model"
@@ -29,6 +30,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"net/url"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -365,3 +367,826 @@ func TestReader_readSync(t *testing.T) {
 		})
 	}
 }
+
+type _TestReader_sendMessageOverTheWire_Transaction struct {
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) String() string {
+	return "_TestReader_sendMessageOverTheWire_Transaction"
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) FailRequest(err error) error {
+	return err
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) EndRequest() error {
+	return nil
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) Submit(operation spi.RequestTransactionRunnable) {
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) AwaitCompletion(ctx context.Context) error {
+	return nil
+}
+
+func TestReader_sendMessageOverTheWire(t *testing.T) {
+	type fields struct {
+		alphaGenerator *AlphaGenerator
+		messageCodec   *MessageCodec
+		tm             spi.RequestTransactionManager
+	}
+	type args struct {
+		ctx             context.Context
+		transaction     spi.RequestTransaction
+		messageToSend   readWriteModel.CBusMessage
+		addResponseCode func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode)
+		tagName         string
+		addPlcValue     func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue)
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+		wg     *sync.WaitGroup
+	}{
+		{
+			name: "Send message empty message",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction:   _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: nil,
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with message to client",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("@1A2001\r@"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestReset(
+						readWriteModel.RequestType_RESET,
+						nil,
+						0,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with server error",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("!"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestReset(
+						readWriteModel.RequestType_RESET,
+						nil,
+						0,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with too many retransmissions",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g#\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_REMOTE_ERROR, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with corruption",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g$\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with sync loss",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g%\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_REMOTE_BUSY, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with too long",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g'\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with confirm only",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g.\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_NOT_FOUND, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with ok",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g.890150435F434E49454421\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_OK, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: func() *sync.WaitGroup {
+				wg := &sync.WaitGroup{}
+				wg.Add(1) // We getting an response and a value
+				return wg
+			}(),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m := &Reader{
+				alphaGenerator: tt.fields.alphaGenerator,
+				messageCodec:   tt.fields.messageCodec,
+				tm:             tt.fields.tm,
+			}
+			tt.wg.Add(1)
+			m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t, tt.wg), tt.args.tagName, tt.args.addPlcValue(t, tt.wg))
+			t.Log("Waiting now")
+			tt.wg.Wait() // TODO: we need to timeout this too
+			t.Log("Done waiting")
+		})
+	}
+}