You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/05 11:00:32 UTC

[plc4x] branch develop updated (150c2e1072 -> 8511fdc88a)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 150c2e1072 build(deps): bump protobuf-java from 3.22.3 to 3.22.4 (#928)
     new 922c7218ed fix(plc4go/cbus): decrease wait time if it is a confirmed request to server
     new 6b8da79620 fix(plc4go/spi): avoid test transport getting stuck on a endless loop when filling
     new 8511fdc88a test(plc4go/cbus): add test for Reader sendMessageOverTheWire

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/MessageCodec.go      |   7 +-
 plc4go/internal/cbus/MessageCodec_test.go |  14 +-
 plc4go/internal/cbus/Reader.go            |  16 +-
 plc4go/internal/cbus/Reader_test.go       | 825 ++++++++++++++++++++++++++++++
 plc4go/spi/transports/test/Transport.go   |   4 +-
 5 files changed, 847 insertions(+), 19 deletions(-)


[plc4x] 03/03: test(plc4go/cbus): add test for Reader sendMessageOverTheWire

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 8511fdc88adfedad0fb55e5eab4be09527bd9d55
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri May 5 13:00:18 2023 +0200

    test(plc4go/cbus): add test for Reader sendMessageOverTheWire
---
 plc4go/internal/cbus/Reader.go      |  16 +-
 plc4go/internal/cbus/Reader_test.go | 825 ++++++++++++++++++++++++++++++++++++
 2 files changed, 832 insertions(+), 9 deletions(-)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 967ec3ae13..24c1502e16 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -124,11 +124,7 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
 func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
 	// Send the  over the wire
 	log.Trace().Msg("Send ")
-	if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
-		cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
-		if !ok {
-			return false
-		}
+	if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool {
 		messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
 		if !ok {
 			return false
@@ -143,7 +139,10 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
 		if !ok {
 			return false
 		}
-		return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+		actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
+		// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
+		expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
+		return actualAlpha == expectedAlpha
 	}, func(receivedMessage spi.Message) error {
 		defer func(transaction spi.RequestTransaction) {
 			// This is just to make sure we don't forget to close the transaction here
@@ -151,8 +150,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
 		}(transaction)
 		// Convert the response into an
 		log.Trace().Msg("convert response to ")
-		cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
-		messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+		messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
 		if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
 			log.Trace().Msg("We got a server failure")
 			addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
@@ -200,7 +198,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
 	}, time.Second*1); err != nil {
 		log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
 		addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
-		if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
+		if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
 			log.Debug().Err(err).Msg("Error failing request")
 		}
 	}
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 7bd2932a08..855a0c3240 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -22,6 +22,7 @@ package cbus
 import (
 	"context"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+	apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	spiModel "github.com/apache/plc4x/plc4go/spi/model"
@@ -29,6 +30,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"net/url"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -365,3 +367,826 @@ func TestReader_readSync(t *testing.T) {
 		})
 	}
 }
+
+type _TestReader_sendMessageOverTheWire_Transaction struct {
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) String() string {
+	return "_TestReader_sendMessageOverTheWire_Transaction"
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) FailRequest(err error) error {
+	return err
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) EndRequest() error {
+	return nil
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) Submit(operation spi.RequestTransactionRunnable) {
+}
+
+func (_ _TestReader_sendMessageOverTheWire_Transaction) AwaitCompletion(ctx context.Context) error {
+	return nil
+}
+
+func TestReader_sendMessageOverTheWire(t *testing.T) {
+	type fields struct {
+		alphaGenerator *AlphaGenerator
+		messageCodec   *MessageCodec
+		tm             spi.RequestTransactionManager
+	}
+	type args struct {
+		ctx             context.Context
+		transaction     spi.RequestTransaction
+		messageToSend   readWriteModel.CBusMessage
+		addResponseCode func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode)
+		tagName         string
+		addPlcValue     func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue)
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+		wg     *sync.WaitGroup
+	}{
+		{
+			name: "Send message empty message",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction:   _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: nil,
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with message to client",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("@1A2001\r@"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestReset(
+						readWriteModel.RequestType_RESET,
+						nil,
+						0,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with server error",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("!"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestReset(
+						readWriteModel.RequestType_RESET,
+						nil,
+						0,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with too many retransmissions",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g#\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_REMOTE_ERROR, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with corruption",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g$\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with sync loss",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g%\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_REMOTE_BUSY, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with too long",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g'\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with confirm only",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g.\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_NOT_FOUND, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: &sync.WaitGroup{},
+		},
+		{
+			name: "Send message which responds with ok",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					type MockState uint8
+					const (
+						INITIAL MockState = iota
+						DONE
+					)
+					currentState := atomic.Value{}
+					currentState.Store(INITIAL)
+					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+						switch currentState.Load().(MockState) {
+						case INITIAL:
+							t.Log("Dispatching read response")
+							transportInstance.FillReadBuffer([]byte("g.890150435F434E49454421\r\n"))
+							currentState.Store(DONE)
+						case DONE:
+							t.Log("Done")
+						}
+					})
+					codec := NewMessageCodec(transportInstance)
+					t.Cleanup(func() {
+						if err := codec.Disconnect(); err != nil {
+							t.Error(err)
+						}
+					})
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+					t.Cleanup(cancel)
+					return timeout
+				}(),
+				transaction: _TestReader_sendMessageOverTheWire_Transaction{},
+				messageToSend: readWriteModel.NewCBusMessageToServer(
+					readWriteModel.NewRequestDirectCommandAccess(
+						readWriteModel.NewCALDataIdentify(
+							readWriteModel.Attribute_CurrentSenseLevels,
+							readWriteModel.CALCommandTypeContainer_CALCommandIdentify,
+							nil,
+							nil,
+						),
+						readWriteModel.NewAlpha('g'),
+						readWriteModel.RequestType_DIRECT_COMMAND,
+						nil,
+						nil,
+						readWriteModel.RequestType_EMPTY,
+						readWriteModel.NewRequestTermination(),
+						nil,
+					),
+					nil,
+					nil,
+				),
+				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+					return func(name string, responseCode apiModel.PlcResponseCode) {
+						t.Logf("Got response code %s for %s", responseCode, name)
+						assert.Equal(t, "horst", name)
+						assert.Equal(t, apiModel.PlcResponseCode_OK, responseCode)
+						wg.Done()
+					}
+				},
+				tagName: "horst",
+				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+					return func(name string, plcValue apiValues.PlcValue) {
+						t.Logf("Got response %s for %s", plcValue, name)
+						wg.Done()
+					}
+				},
+			},
+			wg: func() *sync.WaitGroup {
+				wg := &sync.WaitGroup{}
+				wg.Add(1) // We are getting a response and a value
+				return wg
+			}(),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m := &Reader{
+				alphaGenerator: tt.fields.alphaGenerator,
+				messageCodec:   tt.fields.messageCodec,
+				tm:             tt.fields.tm,
+			}
+			tt.wg.Add(1)
+			m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t, tt.wg), tt.args.tagName, tt.args.addPlcValue(t, tt.wg))
+			t.Log("Waiting now")
+			tt.wg.Wait() // TODO: this wait needs a timeout too
+			t.Log("Done waiting")
+		})
+	}
+}


[plc4x] 02/03: fix(plc4go/spi): avoid test transport getting stuck on a endless loop when filling

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 6b8da79620d81a54dff7531c5ceddb14e6e219b3
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri May 5 12:20:17 2023 +0200

    fix(plc4go/spi): avoid test transport getting stuck on a endless loop when filling
---
 plc4go/internal/cbus/MessageCodec.go      |  2 +-
 plc4go/internal/cbus/MessageCodec_test.go | 14 +++++++-------
 plc4go/spi/transports/test/Transport.go   |  4 ++--
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 53535522c6..1085eb8ca1 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -116,7 +116,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 				return true
 			}
 		}); err != nil {
-			return nil, errors.Wrap(err, "error filling buffer")
+			log.Debug().Err(err).Msg("Error filling buffer")
 		}
 	}
 	log.Trace().Msg("Buffer filled")
diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go
index 3d403b9172..130d8957d0 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -120,7 +120,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 				hashEncountered:               0,
 				currentlyReportedServerErrors: 0,
 			},
-			wantErr: assert.Error,
+			wantErr: assert.NoError,
 		},
 		{
 			name: "checksum error",
@@ -547,8 +547,8 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
 		var msg spi.Message
 		var err error
 		msg, err = codec.Receive()
-		// No data yet so this should error
-		assert.Error(t, err)
+		// No data yet so this should return no error and no data
+		assert.NoError(t, err)
 		assert.Nil(t, msg)
 		// Now we add a confirmation
 		transportInstance.FillReadBuffer([]byte("i."))
@@ -578,8 +578,8 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
 		var msg spi.Message
 		var err error
 		msg, err = codec.Receive()
-		// No data yet so this should error
-		assert.Error(t, err)
+		// No data yet so this should return no error and no data
+		assert.NoError(t, err)
 		assert.Nil(t, msg)
 		// Now we add a confirmation
 		transportInstance.FillReadBuffer([]byte("i."))
@@ -612,8 +612,8 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
 		var msg spi.Message
 		var err error
 		msg, err = codec.Receive()
-		// No data yet so this should error
-		assert.Error(t, err)
+		// No data yet so this should return no error and no data
+		assert.NoError(t, err)
 		assert.Nil(t, msg)
 		// Now we add a confirmation
 		transportInstance.FillReadBuffer([]byte("i."))
diff --git a/plc4go/spi/transports/test/Transport.go b/plc4go/spi/transports/test/Transport.go
index 63c776e6d8..995bd969dc 100644
--- a/plc4go/spi/transports/test/Transport.go
+++ b/plc4go/spi/transports/test/Transport.go
@@ -128,8 +128,8 @@ func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, re
 }
 
 func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
-	log.Trace().Msgf("Peek %d readable bytes", numBytes)
 	availableBytes := uint32(math.Min(float64(numBytes), float64(len(m.readBuffer))))
+	log.Trace().Msgf("Peek %d readable bytes (%d available bytes)", numBytes, availableBytes)
 	var err error
 	if availableBytes != numBytes {
 		err = errors.New("not enough bytes available")
@@ -137,7 +137,7 @@ func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
 	if availableBytes == 0 {
 		return nil, err
 	}
-	return m.readBuffer[0:availableBytes], nil
+	return m.readBuffer[0:availableBytes], err
 }
 
 func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {


[plc4x] 01/03: fix(plc4go/cbus): decrease wait time if it is a confirmed request to server

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 922c7218ed49183aad83fbf2debdfa6090ae4121
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri May 5 11:28:48 2023 +0200

    fix(plc4go/cbus): decrease wait time if it is a confirmed request to server
---
 plc4go/internal/cbus/MessageCodec.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index cf67bc4acc..53535522c6 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -184,8 +184,13 @@ lookingForTheEnd:
 		return nil, err
 	}
 	if indexOfCR+1 == indexOfLF {
+		log.Trace().Msg("pci response for sure")
 		// This means a <cr> is directly followed by a <lf> which means that we know for sure this is a response
 		pciResponse = true
+	} else if indexOfCR >= 0 && int(readableBytes) >= indexOfCR+2 && peekedBytes[+indexOfCR+1] != '\n' {
+		log.Trace().Msg("pci request for sure")
+		// We got a request to pci for sure because the cr is followed by something else than \n
+		requestToPci = true
 	}
 	const numberOfCyclesToWait = 15
 	const estimatedElapsedTime = numberOfCyclesToWait * 10