You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/05 11:00:35 UTC
[plc4x] 03/03: test(plc4go/cbus): add test for Reader sendMessageOverTheWire
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 8511fdc88adfedad0fb55e5eab4be09527bd9d55
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri May 5 13:00:18 2023 +0200
test(plc4go/cbus): add test for Reader sendMessageOverTheWire
---
plc4go/internal/cbus/Reader.go | 16 +-
plc4go/internal/cbus/Reader_test.go | 825 ++++++++++++++++++++++++++++++++++++
2 files changed, 832 insertions(+), 9 deletions(-)
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 967ec3ae13..24c1502e16 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -124,11 +124,7 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Send the over the wire
log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
- cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
- if !ok {
- return false
- }
+ if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool {
messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
if !ok {
return false
@@ -143,7 +139,10 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
if !ok {
return false
}
- return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+ actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
+ // TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
+ expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+ return actualAlpha == expectedAlpha
}, func(receivedMessage spi.Message) error {
defer func(transaction spi.RequestTransaction) {
// This is just to make sure we don't forget to close the transaction here
@@ -151,8 +150,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
}(transaction)
// Convert the response into an
log.Trace().Msg("convert response to ")
- cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
- messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+ messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
log.Trace().Msg("We got a server failure")
addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
@@ -200,7 +198,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
}, time.Second*1); err != nil {
log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
- if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
+ if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
log.Debug().Err(err).Msg("Error failing request")
}
}
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 7bd2932a08..855a0c3240 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -22,6 +22,7 @@ package cbus
import (
"context"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+ apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
@@ -29,6 +30,7 @@ import (
"github.com/stretchr/testify/assert"
"net/url"
"strings"
+ "sync"
"sync/atomic"
"testing"
"time"
@@ -365,3 +367,826 @@ func TestReader_readSync(t *testing.T) {
})
}
}
+
+type _TestReader_sendMessageOverTheWire_Transaction struct {
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) String() string {
+ return "_TestReader_sendMessageOverTheWire_Transaction"
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) FailRequest(err error) error {
+ return err
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) EndRequest() error {
+ return nil
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) Submit(operation spi.RequestTransactionRunnable) {
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) AwaitCompletion(ctx context.Context) error {
+ return nil
+}
+
+func TestReader_sendMessageOverTheWire(t *testing.T) {
+ type fields struct {
+ alphaGenerator *AlphaGenerator
+ messageCodec *MessageCodec
+ tm spi.RequestTransactionManager
+ }
+ type args struct {
+ ctx context.Context
+ transaction spi.RequestTransaction
+ messageToSend readWriteModel.CBusMessage
+ addResponseCode func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode)
+ tagName string
+ addPlcValue func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue)
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ wg *sync.WaitGroup
+ }{
+ {
+ name: "Send message empty message",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: nil,
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: &sync.WaitGroup{},
+ },
+ {
+ name: "Send message which responds with message to client",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("@1A2001\r@"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: readWriteModel.NewCBusMessageToServer(
+ readWriteModel.NewRequestReset(
+ readWriteModel.RequestType_RESET,
+ nil,
+ 0,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ nil,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ readWriteModel.NewRequestTermination(),
+ nil,
+ ),
+ nil,
+ nil,
+ ),
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: &sync.WaitGroup{},
+ },
+ {
+ name: "Send message which responds with server error",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("!"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: readWriteModel.NewCBusMessageToServer(
+ readWriteModel.NewRequestReset(
+ readWriteModel.RequestType_RESET,
+ nil,
+ 0,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ nil,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ readWriteModel.NewRequestTermination(),
+ nil,
+ ),
+ nil,
+ nil,
+ ),
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: &sync.WaitGroup{},
+ },
+ {
+ name: "Send message which responds with too many retransmissions",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g#\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: readWriteModel.NewCBusMessageToServer(
+ readWriteModel.NewRequestDirectCommandAccess(
+ readWriteModel.NewCALDataIdentify(
+ readWriteModel.Attribute_CurrentSenseLevels,
+ readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+ nil,
+ nil,
+ ),
+ readWriteModel.NewAlpha('g'),
+ readWriteModel.RequestType_DIRECT_COMMAND,
+ nil,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ readWriteModel.NewRequestTermination(),
+ nil,
+ ),
+ nil,
+ nil,
+ ),
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_REMOTE_ERROR, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: &sync.WaitGroup{},
+ },
+ {
+ name: "Send message which responds with corruption",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g$\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: readWriteModel.NewCBusMessageToServer(
+ readWriteModel.NewRequestDirectCommandAccess(
+ readWriteModel.NewCALDataIdentify(
+ readWriteModel.Attribute_CurrentSenseLevels,
+ readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+ nil,
+ nil,
+ ),
+ readWriteModel.NewAlpha('g'),
+ readWriteModel.RequestType_DIRECT_COMMAND,
+ nil,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ readWriteModel.NewRequestTermination(),
+ nil,
+ ),
+ nil,
+ nil,
+ ),
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: &sync.WaitGroup{},
+ },
+ {
+ name: "Send message which responds with sync loss",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g%\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: readWriteModel.NewCBusMessageToServer(
+ readWriteModel.NewRequestDirectCommandAccess(
+ readWriteModel.NewCALDataIdentify(
+ readWriteModel.Attribute_CurrentSenseLevels,
+ readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+ nil,
+ nil,
+ ),
+ readWriteModel.NewAlpha('g'),
+ readWriteModel.RequestType_DIRECT_COMMAND,
+ nil,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ readWriteModel.NewRequestTermination(),
+ nil,
+ ),
+ nil,
+ nil,
+ ),
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_REMOTE_BUSY, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: &sync.WaitGroup{},
+ },
+ {
+ name: "Send message which responds with too long",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g'\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: readWriteModel.NewCBusMessageToServer(
+ readWriteModel.NewRequestDirectCommandAccess(
+ readWriteModel.NewCALDataIdentify(
+ readWriteModel.Attribute_CurrentSenseLevels,
+ readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+ nil,
+ nil,
+ ),
+ readWriteModel.NewAlpha('g'),
+ readWriteModel.RequestType_DIRECT_COMMAND,
+ nil,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ readWriteModel.NewRequestTermination(),
+ nil,
+ ),
+ nil,
+ nil,
+ ),
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: &sync.WaitGroup{},
+ },
+ {
+ name: "Send message which responds with confirm only",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g.\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: readWriteModel.NewCBusMessageToServer(
+ readWriteModel.NewRequestDirectCommandAccess(
+ readWriteModel.NewCALDataIdentify(
+ readWriteModel.Attribute_CurrentSenseLevels,
+ readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+ nil,
+ nil,
+ ),
+ readWriteModel.NewAlpha('g'),
+ readWriteModel.RequestType_DIRECT_COMMAND,
+ nil,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ readWriteModel.NewRequestTermination(),
+ nil,
+ ),
+ nil,
+ nil,
+ ),
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_NOT_FOUND, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: &sync.WaitGroup{},
+ },
+ {
+ name: "Send message which responds with ok",
+ fields: fields{
+ alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+ messageCodec: func() *MessageCodec {
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g.890150435F434E49454421\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ return nil
+ }
+ return codec
+ }(),
+ },
+ args: args{
+ ctx: func() context.Context {
+ timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ t.Cleanup(cancel)
+ return timeout
+ }(),
+ transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+ messageToSend: readWriteModel.NewCBusMessageToServer(
+ readWriteModel.NewRequestDirectCommandAccess(
+ readWriteModel.NewCALDataIdentify(
+ readWriteModel.Attribute_CurrentSenseLevels,
+ readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+ nil,
+ nil,
+ ),
+ readWriteModel.NewAlpha('g'),
+ readWriteModel.RequestType_DIRECT_COMMAND,
+ nil,
+ nil,
+ readWriteModel.RequestType_EMPTY,
+ readWriteModel.NewRequestTermination(),
+ nil,
+ ),
+ nil,
+ nil,
+ ),
+ addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ return func(name string, responseCode apiModel.PlcResponseCode) {
+ t.Logf("Got response code %s for %s", responseCode, name)
+ assert.Equal(t, "horst", name)
+ assert.Equal(t, apiModel.PlcResponseCode_OK, responseCode)
+ wg.Done()
+ }
+ },
+ tagName: "horst",
+ addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ return func(name string, plcValue apiValues.PlcValue) {
+ t.Logf("Got response %s for %s", plcValue, name)
+ wg.Done()
+ }
+ },
+ },
+ wg: func() *sync.WaitGroup {
+ wg := &sync.WaitGroup{}
+ wg.Add(1) // We getting an response and a value
+ return wg
+ }(),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := &Reader{
+ alphaGenerator: tt.fields.alphaGenerator,
+ messageCodec: tt.fields.messageCodec,
+ tm: tt.fields.tm,
+ }
+ tt.wg.Add(1)
+ m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t, tt.wg), tt.args.tagName, tt.args.addPlcValue(t, tt.wg))
+ t.Log("Waiting now")
+ tt.wg.Wait() // TODO: we need to timeout this too
+ t.Log("Done waiting")
+ })
+ }
+}