You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/31 13:48:03 UTC
[plc4x] branch develop updated: fix(plc4go/cbus): remove some remaining global logs
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 0025f9fd73 fix(plc4go/cbus): remove some remaining global logs
0025f9fd73 is described below
commit 0025f9fd730858585a6535bd35363e887e4abfc6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed May 31 15:47:55 2023 +0200
fix(plc4go/cbus): remove some remaining global logs
---
plc4go/internal/cbus/Browser_test.go | 33 +-
plc4go/internal/cbus/Discoverer.go | 10 +-
plc4go/internal/cbus/Discoverer_test.go | 99 +++--
plc4go/internal/cbus/Driver_test.go | 6 +
plc4go/internal/cbus/MessageCodec_test.go | 265 +++++++++---
plc4go/internal/cbus/Reader_test.go | 660 ++++++++++++++++--------------
plc4go/internal/cbus/noGlobalLog_test.go | 4 +-
plc4go/spi/default/DefaultCodec.go | 4 +
plc4go/spi/transports/tcp/Transport.go | 6 +-
plc4go/spi/utils/Net.go | 12 +-
10 files changed, 680 insertions(+), 419 deletions(-)
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index f14cba80ac..8fc96fc76f 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -22,9 +22,11 @@ package cbus
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"net/url"
"sync/atomic"
"testing"
+ "time"
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -101,6 +103,9 @@ func TestBrowser_BrowseQuery(t *testing.T) {
t.Error(err)
t.FailNow()
}
+ t.Cleanup(func() {
+ assert.NoError(t, transportInstance.Close())
+ })
type MockState uint8
const (
RESET MockState = iota
@@ -161,12 +166,24 @@ func TestBrowser_BrowseQuery(t *testing.T) {
t.Error(err)
t.FailNow()
}
- connectionConnectResult := <-NewDriver(loggerOption).GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
+ driver := NewDriver(loggerOption)
+ connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
if err := connectionConnectResult.GetErr(); err != nil {
t.Error(err)
t.FailNow()
}
fields.connection = connectionConnectResult.GetConnection()
+ t.Cleanup(func() {
+ timer := time.NewTimer(1 * time.Second)
+ t.Cleanup(func() {
+ utils.CleanupTimer(timer)
+ })
+ select {
+ case <-fields.connection.Close():
+ case <-timer.C:
+ t.Error("timeout")
+ }
+ })
},
want: apiModel.PlcResponseCode_OK,
want1: []apiModel.PlcBrowseItem{
@@ -367,6 +384,9 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
t.Error(err)
t.FailNow()
}
+ t.Cleanup(func() {
+ assert.NoError(t, transportInstance.Close())
+ })
type MockState uint8
const (
RESET MockState = iota
@@ -428,6 +448,17 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
t.FailNow()
}
fields.connection = connectionConnectResult.GetConnection()
+ t.Cleanup(func() {
+ timer := time.NewTimer(1 * time.Second)
+ t.Cleanup(func() {
+ utils.CleanupTimer(timer)
+ })
+ select {
+ case <-fields.connection.Close():
+ case <-timer.C:
+ t.Error("timeout")
+ }
+ })
},
want: map[byte]any{
1: true,
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 139ffda597..2032b0bae5 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -38,8 +38,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/utils"
-
- "github.com/rs/zerolog/log"
)
type Discoverer struct {
@@ -70,7 +68,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
if err != nil {
return errors.Wrap(err, "error getting addresses")
}
- if log.Debug().Enabled() {
+ if d.log.Debug().Enabled() {
for _, provider := range interfaces {
d.log.Debug().Msgf("Discover on %s", provider)
d.log.Trace().Msgf("Discover on %#v", provider.containedInterface())
@@ -82,7 +80,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
tcpTransport := tcp.NewTransport()
// Iterate over all network devices of this system.
for _, netInterface := range interfaces {
- interfaceLog := log.With().Stringer("interface", netInterface).Logger()
+ interfaceLog := d.log.With().Stringer("interface", netInterface).Logger()
interfaceLog.Debug().Msg("Scanning")
addrs, err := netInterface.Addrs()
if err != nil {
@@ -117,7 +115,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
if ipv4Addr == nil || ipv4Addr.IsLoopback() {
continue
}
- addresses, err := utils.GetIPAddresses(ctx, netInterface.containedInterface(), false)
+ addresses, err := utils.GetIPAddresses(d.log, ctx, netInterface.containedInterface(), false)
if err != nil {
addressLogger.Warn().Err(err).Msgf("Can't get addresses for %v", netInterface)
continue
@@ -206,7 +204,7 @@ func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *
func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) pool.Runnable {
return func() {
- transportInstanceLogger := log.With().Stringer("transportInstance", tcpTransportInstance).Logger()
+ transportInstanceLogger := d.log.With().Stringer("transportInstance", tcpTransportInstance).Logger()
transportInstanceLogger.Debug().Msgf("Scanning %v", tcpTransportInstance)
// Create a codec for sending and receiving messages.
codec := NewMessageCodec(tcpTransportInstance, options.WithCustomLogger(d.log))
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 2e11686608..f005d926dd 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -22,22 +22,25 @@ package cbus
import (
"context"
"fmt"
+ "net"
+ "net/url"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/tcp"
"github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"
- "net"
- "net/url"
- "strconv"
- "sync"
- "testing"
- "time"
)
func TestNewDiscoverer(t *testing.T) {
@@ -151,55 +154,67 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
}{
{
name: "create a dispatcher",
args: args{
- tcpTransportInstance: func() *tcp.TransportInstance {
- listen, err := net.Listen("tcp", "127.0.0.1:0")
+ callback: func(t *testing.T, event apiModel.PlcDiscoveryItem) {
+ assert.NotNil(t, event)
+ },
+ },
+ setup: func(t *testing.T, fields *fields, args *args) {
+ listen, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ go func() {
+ conn, err := listen.Accept()
if err != nil {
t.Error(err)
- t.FailNow()
+ return
}
- go func() {
- conn, err := listen.Accept()
- if err != nil {
- t.Error(err)
- return
- }
- write, err := conn.Write([]byte("x.890050435F434E49454422\r\n"))
- if err != nil {
- t.Error(err)
- return
- }
- t.Logf("%d written", write)
- }()
- t.Cleanup(func() {
- if err := listen.Close(); err != nil {
- t.Error(err)
- }
- })
- transport := tcp.NewTransport()
- parse, err := url.Parse("tcp://" + listen.Addr().String())
+ write, err := conn.Write([]byte("x.890050435F434E49454422\r\n"))
if err != nil {
t.Error(err)
- t.FailNow()
+ return
}
- instance, err := transport.CreateTransportInstance(*parse, nil)
- if err != nil {
+ t.Logf("%d written", write)
+ }()
+ t.Cleanup(func() {
+ if err := listen.Close(); err != nil {
t.Error(err)
- t.FailNow()
}
- return instance.(*tcp.TransportInstance)
- }(),
- callback: func(t *testing.T, event apiModel.PlcDiscoveryItem) {
- assert.NotNil(t, event)
- },
+ })
+
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ loggerOption := options.WithCustomLogger(logger)
+ transport := tcp.NewTransport(loggerOption)
+ parse, err := url.Parse("tcp://" + listen.Addr().String())
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ instance, err := transport.CreateTransportInstance(*parse, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ args.tcpTransportInstance = instance.(*tcp.TransportInstance)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
d := &Discoverer{
transportInstanceCreationQueue: tt.fields.transportInstanceCreationQueue,
deviceScanningQueue: tt.fields.deviceScanningQueue,
@@ -232,6 +247,7 @@ func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
}{
{
name: "create a dispatcher",
@@ -242,7 +258,6 @@ func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
return &wg
}(),
ip: net.IPv4(127, 0, 0, 1),
- tcpTransport: tcp.NewTransport(),
transportInstances: make(chan transports.TransportInstance, 1),
cBusPort: func() uint16 {
listen, err := net.Listen("tcp", "127.0.0.1:0")
@@ -278,10 +293,16 @@ func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
return uint16(port)
}(),
},
+ setup: func(t *testing.T, fields *fields, args *args) {
+ args.tcpTransport = tcp.NewTransport(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ },
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
d := &Discoverer{
transportInstanceCreationQueue: tt.fields.transportInstanceCreationQueue,
deviceScanningQueue: tt.fields.deviceScanningQueue,
diff --git a/plc4go/internal/cbus/Driver_test.go b/plc4go/internal/cbus/Driver_test.go
index 7b75e79cc2..878eef1dc3 100644
--- a/plc4go/internal/cbus/Driver_test.go
+++ b/plc4go/internal/cbus/Driver_test.go
@@ -26,6 +26,7 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
@@ -53,6 +54,7 @@ func TestDriver_DiscoverWithContext(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
wantErr assert.ErrorAssertionFunc
}{
{
@@ -69,12 +71,16 @@ func TestDriver_DiscoverWithContext(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
m := &Driver{
DefaultDriver: tt.fields.DefaultDriver,
tm: tt.fields.tm,
awaitSetupComplete: tt.fields.awaitSetupComplete,
awaitDisconnectComplete: tt.fields.awaitDisconnectComplete,
}
+ m.log = testutils.ProduceTestingLogger(t)
tt.wantErr(t, m.DiscoverWithContext(tt.args.ctx, tt.args.callback, tt.args.discoveryOptions...), fmt.Sprintf("DiscoverWithContext(%v, func()*, %v)", tt.args.ctx, tt.args.discoveryOptions))
})
}
diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go
index 130d8957d0..9f9e9dcf82 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -24,6 +24,8 @@ import (
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
"github.com/stretchr/testify/assert"
@@ -48,6 +50,7 @@ func TestMessageCodec_Send(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
wantErr assert.ErrorAssertionFunc
}{
{
@@ -56,19 +59,36 @@ func TestMessageCodec_Send(t *testing.T) {
},
{
name: "a cbus message",
- fields: fields{
- DefaultCodec: NewMessageCodec(test.NewTransportInstance(test.NewTransport())),
- },
args: args{message: readWriteModel.NewCBusMessageToClient(
readWriteModel.NewReplyOrConfirmationConfirmation(
readWriteModel.NewConfirmation(readWriteModel.NewAlpha('!'), nil, readWriteModel.ConfirmationType_CHECKSUM_FAILURE), nil, 0x00, nil, nil,
), nil, nil,
)},
+ setup: func(t *testing.T, fields *fields, args *args) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
m := &MessageCodec{
DefaultCodec: tt.fields.DefaultCodec,
requestContext: tt.fields.requestContext,
@@ -101,17 +121,13 @@ func TestMessageCodec_Receive(t *testing.T) {
tests := []struct {
name string
fields fields
+ setup func(t *testing.T, fields *fields)
want spi.Message
wantErr assert.ErrorAssertionFunc
}{
{
name: "No data",
fields: fields{
- DefaultCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- transportInstance := test.NewTransportInstance(transport)
- return transportInstance
- }()),
requestContext: requestContext,
cbusOptions: cbusOptions,
monitoredMMIs: nil,
@@ -120,17 +136,28 @@ func TestMessageCodec_Receive(t *testing.T) {
hashEncountered: 0,
currentlyReportedServerErrors: 0,
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
wantErr: assert.NoError,
},
{
name: "checksum error",
fields: fields{
- DefaultCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- transportInstance := test.NewTransportInstance(transport)
- transportInstance.FillReadBuffer([]byte("!"))
- return transportInstance
- }()),
requestContext: requestContext,
cbusOptions: cbusOptions,
monitoredMMIs: nil,
@@ -145,17 +172,29 @@ func TestMessageCodec_Receive(t *testing.T) {
),
requestContext, cbusOptions,
),
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ instance.FillReadBuffer([]byte("!"))
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
wantErr: assert.NoError,
},
{
name: "A21 echo",
fields: fields{
- DefaultCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- transportInstance := test.NewTransportInstance(transport)
- transportInstance.FillReadBuffer([]byte("@A62120\r@A62120\r"))
- return transportInstance
- }()),
requestContext: requestContext,
cbusOptions: cbusOptions,
monitoredMMIs: nil,
@@ -164,17 +203,29 @@ func TestMessageCodec_Receive(t *testing.T) {
hashEncountered: 0,
currentlyReportedServerErrors: 0,
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ instance.FillReadBuffer([]byte("@A62120\r@A62120\r"))
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
wantErr: assert.NoError,
},
{
name: "garbage",
fields: fields{
- DefaultCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- transportInstance := test.NewTransportInstance(transport)
- transportInstance.FillReadBuffer([]byte("what on earth\n\r"))
- return transportInstance
- }()),
requestContext: requestContext,
cbusOptions: cbusOptions,
monitoredMMIs: nil,
@@ -183,17 +234,29 @@ func TestMessageCodec_Receive(t *testing.T) {
hashEncountered: 0,
currentlyReportedServerErrors: 0,
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ instance.FillReadBuffer([]byte("what on earth\n\r"))
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
wantErr: assert.NoError,
},
{
name: "error encountered multiple time",
fields: fields{
- DefaultCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- transportInstance := test.NewTransportInstance(transport)
- transportInstance.FillReadBuffer([]byte("AFFE!!!\r"))
- return transportInstance
- }()),
requestContext: requestContext,
cbusOptions: cbusOptions,
monitoredMMIs: nil,
@@ -202,6 +265,24 @@ func TestMessageCodec_Receive(t *testing.T) {
hashEncountered: 9999,
currentlyReportedServerErrors: 0,
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ instance.FillReadBuffer([]byte("AFFE!!!\r"))
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
want: readWriteModel.NewCBusMessageToClient(
readWriteModel.NewServerErrorReply(
33, cbusOptions, requestContext,
@@ -213,12 +294,6 @@ func TestMessageCodec_Receive(t *testing.T) {
{
name: "error encountered and reported multiple time",
fields: fields{
- DefaultCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- transportInstance := test.NewTransportInstance(transport)
- transportInstance.FillReadBuffer([]byte("@1A2001!!!\r"))
- return transportInstance
- }()),
requestContext: requestContext,
cbusOptions: cbusOptions,
monitoredMMIs: nil,
@@ -246,17 +321,29 @@ func TestMessageCodec_Receive(t *testing.T) {
),
requestContext, cbusOptions,
),
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ instance.FillReadBuffer([]byte("@1A2001!!!\r"))
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
wantErr: assert.NoError,
},
{
name: "mmi",
fields: fields{
- DefaultCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- transportInstance := test.NewTransportInstance(transport)
- transportInstance.FillReadBuffer([]byte("86040200F940380001000000000000000008000000000000000000000000FA\r\n"))
- return transportInstance
- }()),
requestContext: requestContext,
cbusOptions: cbusOptions,
monitoredMMIs: nil,
@@ -265,6 +352,24 @@ func TestMessageCodec_Receive(t *testing.T) {
hashEncountered: 9999,
currentlyReportedServerErrors: 9999,
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ instance.FillReadBuffer([]byte("86040200F940380001000000000000000008000000000000000000000000FA\r\n"))
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
want: readWriteModel.NewCBusMessageToClient(
readWriteModel.NewReplyOrConfirmationReply(
readWriteModel.NewReplyEncodedReply(
@@ -447,12 +552,6 @@ func TestMessageCodec_Receive(t *testing.T) {
{
name: "sal",
fields: fields{
- DefaultCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- transportInstance := test.NewTransportInstance(transport)
- transportInstance.FillReadBuffer([]byte("0531AC0079042F0401430316000011\r\n"))
- return transportInstance
- }()),
requestContext: requestContext,
cbusOptions: cbusOptions,
monitoredMMIs: nil,
@@ -461,6 +560,24 @@ func TestMessageCodec_Receive(t *testing.T) {
hashEncountered: 9999,
currentlyReportedServerErrors: 9999,
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ instance := test.NewTransportInstance(transport, loggerOption)
+ instance.FillReadBuffer([]byte("0531AC0079042F0401430316000011\r\n"))
+ codec := NewMessageCodec(instance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.DefaultCodec = codec
+ },
want: readWriteModel.NewCBusMessageToClient(
readWriteModel.NewReplyOrConfirmationReply(
readWriteModel.NewReplyEncodedReply(
@@ -518,6 +635,9 @@ func TestMessageCodec_Receive(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
m := &MessageCodec{
DefaultCodec: tt.fields.DefaultCodec,
requestContext: tt.fields.requestContext,
@@ -539,9 +659,20 @@ func TestMessageCodec_Receive(t *testing.T) {
func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
t.Run("instant data", func(t *testing.T) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
transport := test.NewTransport()
transportInstance := test.NewTransportInstance(transport)
- codec := NewMessageCodec(transportInstance)
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
codec.requestContext = readWriteModel.NewRequestContext(true)
var msg spi.Message
@@ -570,9 +701,20 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
assert.True(t, msg.(readWriteModel.CBusMessageToClient).GetReply().GetIsAlpha())
})
t.Run("data after 6 times", func(t *testing.T) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
transport := test.NewTransport()
transportInstance := test.NewTransportInstance(transport)
- codec := NewMessageCodec(transportInstance)
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
codec.requestContext = readWriteModel.NewRequestContext(true)
var msg spi.Message
@@ -604,9 +746,20 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
assert.True(t, msg.(readWriteModel.CBusMessageToClient).GetReply().GetIsAlpha())
})
t.Run("data after 16 times", func(t *testing.T) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
transport := test.NewTransport()
transportInstance := test.NewTransportInstance(transport)
- codec := NewMessageCodec(transportInstance)
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
codec.requestContext = readWriteModel.NewRequestContext(true)
var msg spi.Message
@@ -663,7 +816,11 @@ func TestNewMessageCodec(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- assert.NotNilf(t, NewMessageCodec(tt.args.transportInstance), "NewMessageCodec(%v)", tt.args.transportInstance)
+ codec := NewMessageCodec(tt.args.transportInstance)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ assert.NotNilf(t, codec, "NewMessageCodec(%v)", tt.args.transportInstance)
})
}
}
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 1ead699a20..c9a5cb4903 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -238,41 +238,6 @@ func TestReader_readSync(t *testing.T) {
name: "read identify type",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- INITIAL MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(INITIAL)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case INITIAL:
- t.Log("Dispatching read response")
- transportInstance.FillReadBuffer([]byte("g.890150435F434E49454421\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -293,7 +258,46 @@ func TestReader_readSync(t *testing.T) {
result: make(chan apiModel.PlcReadRequestResult, 1),
},
setup: func(t *testing.T, fields *fields) {
- fields.tm = transactions.NewRequestTransactionManager(10, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ fields.tm = transactions.NewRequestTransactionManager(10, loggerOption)
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g.890150435F434E49454421\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.messageCodec = codec
},
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
timer := time.NewTimer(2 * time.Second)
@@ -316,24 +320,6 @@ func TestReader_readSync(t *testing.T) {
name: "read identify type aborted",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- codec := NewMessageCodec(transportInstance)
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -355,6 +341,30 @@ func TestReader_readSync(t *testing.T) {
},
setup: func(t *testing.T, fields *fields) {
fields.tm = transactions.NewRequestTransactionManager(10, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+
+ fields.messageCodec = codec
},
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
timer := time.NewTimer(2 * time.Second)
@@ -400,11 +410,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
addPlcValue func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue)
}
tests := []struct {
- name string
- fields fields
- args args
- mockSetup func(t *testing.T, fields *fields, args *args)
- wg *sync.WaitGroup
+ name string
+ fields fields
+ args args
+ setup func(t *testing.T, fields *fields, args *args)
+ wg *sync.WaitGroup
}{
{
name: "Send message empty message",
@@ -457,7 +467,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.FailRequest(mock.Anything).Return(errors.New("no I say"))
@@ -548,7 +558,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.FailRequest(mock.Anything).Return(errors.New("Nope"))
@@ -639,7 +649,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.EndRequest().Return(nil)
@@ -651,46 +661,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
name: "Send message which responds with too many retransmissions",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- INITIAL MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(INITIAL)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case INITIAL:
- t.Log("Dispatching read response")
- transportInstance.FillReadBuffer([]byte("g#\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
- })
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -733,11 +703,56 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.EndRequest().Return(nil)
args.transaction = transaction
+
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g#\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.messageCodec = codec
},
wg: &sync.WaitGroup{},
},
@@ -745,46 +760,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
name: "Send message which responds with corruption",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- INITIAL MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(INITIAL)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case INITIAL:
- t.Log("Dispatching read response")
- transportInstance.FillReadBuffer([]byte("g$\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
- })
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -827,11 +802,56 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.EndRequest().Return(nil)
args.transaction = transaction
+
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g$\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.messageCodec = codec
},
wg: &sync.WaitGroup{},
},
@@ -839,46 +859,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
name: "Send message which responds with sync loss",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- INITIAL MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(INITIAL)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case INITIAL:
- t.Log("Dispatching read response")
- transportInstance.FillReadBuffer([]byte("g%\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
- })
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -921,11 +901,56 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.EndRequest().Return(nil)
args.transaction = transaction
+
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g%\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.messageCodec = codec
},
wg: &sync.WaitGroup{},
},
@@ -933,46 +958,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
name: "Send message which responds with too long",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- INITIAL MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(INITIAL)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case INITIAL:
- t.Log("Dispatching read response")
- transportInstance.FillReadBuffer([]byte("g'\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
- })
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -1015,11 +1000,56 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.EndRequest().Return(nil)
args.transaction = transaction
+
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g'\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.messageCodec = codec
},
wg: &sync.WaitGroup{},
},
@@ -1027,46 +1057,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
name: "Send message which responds with confirm only",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- INITIAL MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(INITIAL)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case INITIAL:
- t.Log("Dispatching read response")
- transportInstance.FillReadBuffer([]byte("g.\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
- })
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -1109,11 +1099,56 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.EndRequest().Return(nil)
args.transaction = transaction
+
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g.\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.messageCodec = codec
},
wg: &sync.WaitGroup{},
},
@@ -1121,46 +1156,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
name: "Send message which responds with ok",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- INITIAL MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(INITIAL)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case INITIAL:
- t.Log("Dispatching read response")
- transportInstance.FillReadBuffer([]byte("g.890150435F434E49454421\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
- })
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -1203,11 +1198,56 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.EndRequest().Return(nil)
args.transaction = transaction
+
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("g.890150435F434E49454421\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ t.Cleanup(func() {
+ if err := codec.Disconnect(); err != nil {
+ t.Error(err)
+ }
+ })
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.messageCodec = codec
},
wg: func() *sync.WaitGroup {
wg := &sync.WaitGroup{}
@@ -1218,8 +1258,8 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if tt.mockSetup != nil {
- tt.mockSetup(t, &tt.fields, &tt.args)
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
}
m := &Reader{
alphaGenerator: tt.fields.alphaGenerator,
diff --git a/plc4go/internal/cbus/noGlobalLog_test.go b/plc4go/internal/cbus/noGlobalLog_test.go
index 7a3475947d..defdd82756 100644
--- a/plc4go/internal/cbus/noGlobalLog_test.go
+++ b/plc4go/internal/cbus/noGlobalLog_test.go
@@ -19,7 +19,9 @@
package cbus
+import "github.com/apache/plc4x/plc4go/spi/testutils"
+
// This ensures that we don't global log
func init() {
- //testutils.ExplodingGlobalLogger(true)
+ testutils.ExplodingGlobalLogger(true)
}
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index f5be14b852..53ec5c6bae 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -172,6 +172,10 @@ func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
func (m *defaultCodec) Disconnect() error {
m.log.Trace().Msg("Disconnecting")
m.running = false
+ if m.transportInstance == nil {
+ // TODO: check if we move that case to the constructor
+ return nil
+ }
return m.transportInstance.Close()
}
diff --git a/plc4go/spi/transports/tcp/Transport.go b/plc4go/spi/transports/tcp/Transport.go
index 09c0fd2ba1..26f84c2782 100644
--- a/plc4go/spi/transports/tcp/Transport.go
+++ b/plc4go/spi/transports/tcp/Transport.go
@@ -40,8 +40,10 @@ type Transport struct {
log zerolog.Logger
}
-func NewTransport() *Transport {
- return &Transport{}
+func NewTransport(_options ...options.WithOption) *Transport {
+ return &Transport{
+ log: options.ExtractCustomLogger(_options...),
+ }
}
func (m Transport) GetTransportCode() string {
diff --git a/plc4go/spi/utils/Net.go b/plc4go/spi/utils/Net.go
index a2a6e539be..d48cc6b45f 100644
--- a/plc4go/spi/utils/Net.go
+++ b/plc4go/spi/utils/Net.go
@@ -22,6 +22,7 @@ package utils
import (
"bytes"
"context"
+ "github.com/rs/zerolog"
"net"
"sync"
"time"
@@ -30,10 +31,9 @@ import (
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/pcap"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
-func GetIPAddresses(ctx context.Context, netInterface net.Interface, useArpBasedScan bool) (foundIps chan net.IP, err error) {
+func GetIPAddresses(log zerolog.Logger, ctx context.Context, netInterface net.Interface, useArpBasedScan bool) (foundIps chan net.IP, err error) {
foundIps = make(chan net.IP, 65536)
addrs, err := netInterface.Addrs()
if err != nil {
@@ -71,13 +71,13 @@ func GetIPAddresses(ctx context.Context, netInterface net.Interface, useArpBased
log.Debug().Stringer("IP", ipnet.IP).Stringer("Mask", ipnet.Mask).Msg("Expanding local subnet")
if useArpBasedScan {
- if err := lockupIpsUsingArp(ctx, netInterface, ipnet, foundIps, wg); err != nil {
+ if err := lockupIpsUsingArp(log, ctx, netInterface, ipnet, foundIps, wg); err != nil {
log.Error().Err(err).Msg("failing to resolve using arp scan. Falling back to ip based scan")
useArpBasedScan = false
}
}
if !useArpBasedScan {
- if err := lookupIps(ctx, ipnet, foundIps, wg); err != nil {
+ if err := lookupIps(log, ctx, ipnet, foundIps, wg); err != nil {
log.Error().Err(err).Msg("error looking up ips")
}
}
@@ -92,7 +92,7 @@ func GetIPAddresses(ctx context.Context, netInterface net.Interface, useArpBased
// As PING operations might be blocked by a firewall, responding to ARP packets is mandatory for IP based
// systems. So we are using an ARP scan to resolve the ethernet hardware addresses of each possible ip in range
// Only for devices that respond will we schedule a discovery.
-func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *net.IPNet, foundIps chan net.IP, wg *sync.WaitGroup) error {
+func lockupIpsUsingArp(log zerolog.Logger, ctx context.Context, netInterface net.Interface, ipNet *net.IPNet, foundIps chan net.IP, wg *sync.WaitGroup) error {
// We add on signal for error handling
wg.Add(1)
go func() { wg.Done() }()
@@ -225,7 +225,7 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
}
// Simply takes the IP address and the netmask and schedules one discovery task for every possible IP
-func lookupIps(ctx context.Context, ipnet *net.IPNet, foundIps chan net.IP, wg *sync.WaitGroup) error {
+func lookupIps(log zerolog.Logger, ctx context.Context, ipnet *net.IPNet, foundIps chan net.IP, wg *sync.WaitGroup) error {
log.Debug().Msgf("Scanning all IP addresses for network: %s", ipnet)
// expand CIDR-block into one target for each IP
// Remark: The last IP address a network contains is a special broadcast address. We don't want to check that one.