You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 12:08:18 UTC
[plc4x] branch develop updated: fix(plc4go): pass loggers where missing
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 2ff14f33c0 fix(plc4go): pass loggers where missing
2ff14f33c0 is described below
commit 2ff14f33c0a5fd2351d23128253f98804b38e3f1
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 14:08:11 2023 +0200
fix(plc4go): pass loggers where missing
---
plc4go/internal/ads/Driver.go | 18 +-
plc4go/internal/ads/Subscriber.go | 2 +-
plc4go/internal/ads/model/AdsSubscriptionHandle.go | 11 +-
plc4go/internal/cbus/Connection_test.go | 223 ++++++----------
plc4go/internal/cbus/Reader_test.go | 286 ++++++++-------------
plc4go/internal/eip/EipDriver.go | 24 +-
plc4go/internal/knxnetip/Discoverer.go | 2 +-
plc4go/internal/knxnetip/Driver.go | 8 +-
plc4go/internal/knxnetip/Subscriber.go | 2 +-
plc4go/internal/modbus/ModbusAsciiDriver.go | 2 +-
plc4go/internal/modbus/ModbusRtuDriver.go | 2 +-
plc4go/internal/modbus/ModbusTcpDriver.go | 2 +-
plc4go/internal/s7/Driver.go | 2 +-
plc4go/internal/simulated/Driver.go | 2 +-
plc4go/internal/simulated/Driver_test.go | 13 +-
plc4go/internal/simulated/Reader.go | 10 +-
plc4go/internal/simulated/Reader_test.go | 4 +-
plc4go/internal/simulated/Writer.go | 10 +-
plc4go/internal/simulated/Writer_test.go | 4 +-
plc4go/spi/pool/WorkerPool_test.go | 7 +
.../transactions/RequestTransactionManager_test.go | 10 +-
plc4go/spi/transports/test/Transport.go | 2 +-
22 files changed, 269 insertions(+), 377 deletions(-)
diff --git a/plc4go/internal/ads/Driver.go b/plc4go/internal/ads/Driver.go
index 91681eebd8..cfc2a33f3f 100644
--- a/plc4go/internal/ads/Driver.go
+++ b/plc4go/internal/ads/Driver.go
@@ -49,8 +49,8 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
return driver
}
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
@@ -60,30 +60,30 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
return ch
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
- options["defaultTcpPort"] = []string{strconv.Itoa(int(adsModel.AdsConstants_ADSTCPDEFAULTPORT))}
+ driverOptions["defaultTcpPort"] = []string{strconv.Itoa(int(adsModel.AdsConstants_ADSTCPDEFAULTPORT))}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions)
if err != nil {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
// Create a new codec for taking care of encoding/decoding of messages
- codec := NewMessageCodec(transportInstance)
+ codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
m.log.Debug().Msgf("working with codec %#v", codec)
- configuration, err := model.ParseFromOptions(m.log, options)
+ configuration, err := model.ParseFromOptions(m.log, driverOptions)
if err != nil {
- m.log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "invalid configuration"))
return ch
}
// Create the new connection
- connection, err := NewConnection(codec, configuration, options)
+ connection, err := NewConnection(codec, configuration, driverOptions)
if err != nil {
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "couldn't create connection"))
diff --git a/plc4go/internal/ads/Subscriber.go b/plc4go/internal/ads/Subscriber.go
index cf0ced6ed1..5442b57184 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -147,7 +147,7 @@ func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel
)
}
// Create a new subscription handle.
- subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(m, tagName, directTag)
+ subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(m, tagName, directTag, options.WithCustomLogger(m.log))
responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
spiModel.NewDefaultPlcSubscriptionResponse(
diff --git a/plc4go/internal/ads/model/AdsSubscriptionHandle.go b/plc4go/internal/ads/model/AdsSubscriptionHandle.go
index 111b0d576c..81bf0254c9 100644
--- a/plc4go/internal/ads/model/AdsSubscriptionHandle.go
+++ b/plc4go/internal/ads/model/AdsSubscriptionHandle.go
@@ -20,6 +20,8 @@
package model
import (
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -34,15 +36,16 @@ type AdsSubscriptionHandle struct {
directTag DirectPlcTag
consumers []apiModel.PlcSubscriptionEventConsumer
- apiModel.PlcSubscriptionHandle
+ log zerolog.Logger
}
-func NewAdsSubscriptionHandle(subscriber spi.PlcSubscriber, tagName string, directTag DirectPlcTag) *AdsSubscriptionHandle {
+func NewAdsSubscriptionHandle(subscriber spi.PlcSubscriber, tagName string, directTag DirectPlcTag, _options ...options.WithOption) *AdsSubscriptionHandle {
return &AdsSubscriptionHandle{
subscriber: subscriber,
tagName: tagName,
directTag: directTag,
consumers: []apiModel.PlcSubscriptionEventConsumer{},
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -65,7 +68,9 @@ func (t *AdsSubscriptionHandle) PublishPlcValue(value apiValues.PlcValue) {
map[string]spiModel.SubscriptionType{t.tagName: spiModel.SubscriptionChangeOfState},
map[string]time.Duration{t.tagName: time.Second},
map[string]apiModel.PlcResponseCode{t.tagName: apiModel.PlcResponseCode_OK},
- map[string]apiValues.PlcValue{t.tagName: value})
+ map[string]apiValues.PlcValue{t.tagName: value},
+ options.WithCustomLogger(t.log),
+ )
for _, consumer := range t.consumers {
consumer(&event)
}
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 657d30a0f8..8feae638e5 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -30,11 +30,11 @@ import (
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/apache/plc4x/plc4go/spi/transactions"
- "github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"net/url"
"sync/atomic"
"testing"
@@ -187,15 +187,10 @@ func TestConnection_ConnectWithContext(t *testing.T) {
// Build the default connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport(loggerOption)
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -850,15 +845,10 @@ func TestConnection_fireConnectionError(t *testing.T) {
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -885,15 +875,10 @@ func TestConnection_fireConnectionError(t *testing.T) {
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -982,15 +967,10 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
loggerOption := options.WithCustomLogger(logger)
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport(loggerOption)
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }())
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1071,15 +1051,10 @@ func TestConnection_sendReset(t *testing.T) {
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1160,15 +1135,10 @@ func TestConnection_setApplicationFilter(t *testing.T) {
// Setup connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport(loggerOption)
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1249,15 +1219,10 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
// Setup connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport(loggerOption)
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1338,15 +1303,10 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
// Setup connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport(loggerOption)
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1427,15 +1387,10 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
// Setup connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport(loggerOption)
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1505,15 +1460,10 @@ func TestConnection_setupConnection(t *testing.T) {
// Setup connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport(loggerOption)
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1542,11 +1492,8 @@ func TestConnection_setupConnection(t *testing.T) {
// Build the message codec
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ require.NoError(t, err)
type MockState uint8
const (
RESET MockState = iota
@@ -1554,7 +1501,7 @@ func TestConnection_setupConnection(t *testing.T) {
)
currentState := atomic.Value{}
currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
@@ -1564,12 +1511,9 @@ func TestConnection_setupConnection(t *testing.T) {
t.Log("Done")
}
})
- codec := NewMessageCodec(transportInstance, loggerOption)
+ codec := NewMessageCodec(ti, loggerOption)
err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1599,11 +1543,8 @@ func TestConnection_setupConnection(t *testing.T) {
// Build the message codec
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ require.NoError(t, err)
type MockState uint8
const (
RESET MockState = iota
@@ -1613,7 +1554,7 @@ func TestConnection_setupConnection(t *testing.T) {
)
currentState := atomic.Value{}
currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
@@ -1633,12 +1574,9 @@ func TestConnection_setupConnection(t *testing.T) {
t.Log("Done")
}
})
- codec := NewMessageCodec(transportInstance, loggerOption)
+ codec := NewMessageCodec(ti, loggerOption)
err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1668,11 +1606,8 @@ func TestConnection_setupConnection(t *testing.T) {
// Build the message codec
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ require.NoError(t, err)
type MockState uint8
const (
RESET MockState = iota
@@ -1683,7 +1618,7 @@ func TestConnection_setupConnection(t *testing.T) {
)
currentState := atomic.Value{}
currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
@@ -1708,12 +1643,9 @@ func TestConnection_setupConnection(t *testing.T) {
t.Log("Done")
}
})
- codec := NewMessageCodec(transportInstance, loggerOption)
+ codec := NewMessageCodec(ti, loggerOption)
err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1743,11 +1675,8 @@ func TestConnection_setupConnection(t *testing.T) {
// Build the message codec
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ require.NoError(t, err)
type MockState uint8
const (
RESET MockState = iota
@@ -1759,7 +1688,7 @@ func TestConnection_setupConnection(t *testing.T) {
)
currentState := atomic.Value{}
currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
@@ -1789,12 +1718,9 @@ func TestConnection_setupConnection(t *testing.T) {
t.Log("Done")
}
})
- codec := NewMessageCodec(transportInstance, loggerOption)
+ codec := NewMessageCodec(ti, loggerOption)
err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1807,9 +1733,17 @@ func TestConnection_setupConnection(t *testing.T) {
fields: fields{
DefaultConnection: _default.NewDefaultConnection(nil),
messageCodec: func() *MessageCodec {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
transport := test.NewTransport()
transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
if err != nil {
t.Error(err)
t.FailNow()
@@ -1827,7 +1761,7 @@ func TestConnection_setupConnection(t *testing.T) {
)
currentState := atomic.Value{}
currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
@@ -1862,7 +1796,7 @@ func TestConnection_setupConnection(t *testing.T) {
t.Log("Done")
}
})
- codec := NewMessageCodec(transportInstance)
+ codec := NewMessageCodec(ti, loggerOption)
if err = codec.Connect(); err != nil {
t.Error(err)
t.FailNow()
@@ -1893,15 +1827,10 @@ func TestConnection_setupConnection(t *testing.T) {
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
// Build the message codec
- codec := NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }(), loggerOption)
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ require.NoError(t, err)
+ codec := NewMessageCodec(ti, loggerOption)
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index b5baf829e0..0d7f54a647 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -33,6 +33,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
"net/url"
"strings"
"sync/atomic"
@@ -160,26 +161,6 @@ func TestReader_readSync(t *testing.T) {
},
{
name: "unmapped tag",
- fields: fields{
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- codec := NewMessageCodec(transportInstance)
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
- },
args: args{
ctx: context.Background(),
readRequest: spiModel.NewDefaultPlcReadRequest(
@@ -195,6 +176,25 @@ func TestReader_readSync(t *testing.T) {
result: make(chan apiModel.PlcReadRequestResult, 1),
},
setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ transport := test.NewTransport()
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+ require.NoError(t, err)
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
fields.tm = transactions.NewRequestTransactionManager(10, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
},
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
@@ -270,10 +270,7 @@ func TestReader_readSync(t *testing.T) {
transport := test.NewTransport()
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
type MockState uint8
const (
INITIAL MockState = iota
@@ -293,10 +290,10 @@ func TestReader_readSync(t *testing.T) {
})
codec := NewMessageCodec(transportInstance, loggerOption)
err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
fields.messageCodec = codec
},
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
@@ -353,17 +350,13 @@ func TestReader_readSync(t *testing.T) {
transport := test.NewTransport()
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
codec := NewMessageCodec(transportInstance, loggerOption)
err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
-
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
fields.messageCodec = codec
},
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
@@ -449,21 +442,13 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
+ assert.NoError(t, codec.Disconnect())
})
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
fields.messageCodec = codec
transaction := NewMockRequestTransaction(t)
@@ -478,46 +463,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
name: "Send message which responds with message to client",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- INITIAL MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(INITIAL)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case INITIAL:
- t.Log("Dispatching read response")
- transportInstance.FillReadBuffer([]byte("@1A2001\r@"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
- })
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
},
args: args{
ctx: func() context.Context {
@@ -556,14 +501,49 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
},
},
setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
expect.FailRequest(mock.Anything).Return(errors.New("Nope")).Run(func(_ error) {
close(ch)
})
args.transaction = transaction
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ require.NoError(t, err)
+ type MockState uint8
+ const (
+ INITIAL MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(INITIAL)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case INITIAL:
+ t.Log("Dispatching read response")
+ transportInstance.FillReadBuffer([]byte("@1A2001\r@"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
},
{
@@ -571,14 +551,18 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ require.NoError(t, err)
type MockState uint8
const (
INITIAL MockState = iota
@@ -596,18 +580,12 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
t.Log("Done")
}
})
- codec := NewMessageCodec(transportInstance)
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
+ assert.NoError(t, codec.Disconnect())
})
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
return codec
}(),
},
@@ -721,10 +699,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
type MockState uint8
const (
INITIAL MockState = iota
@@ -743,16 +718,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
})
codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
+ assert.NoError(t, codec.Disconnect())
})
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
fields.messageCodec = codec
},
},
@@ -819,10 +789,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
type MockState uint8
const (
INITIAL MockState = iota
@@ -841,16 +808,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
})
codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
+ assert.NoError(t, codec.Disconnect())
})
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
fields.messageCodec = codec
},
},
@@ -917,10 +879,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
type MockState uint8
const (
INITIAL MockState = iota
@@ -939,16 +898,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
})
codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
+ assert.NoError(t, codec.Disconnect())
})
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
fields.messageCodec = codec
},
},
@@ -1015,10 +969,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
type MockState uint8
const (
INITIAL MockState = iota
@@ -1037,16 +988,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
})
codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
+ assert.NoError(t, codec.Disconnect())
})
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
fields.messageCodec = codec
},
},
@@ -1113,10 +1059,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
type MockState uint8
const (
INITIAL MockState = iota
@@ -1135,16 +1078,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
})
codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
+ assert.NoError(t, codec.Disconnect())
})
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
fields.messageCodec = codec
},
},
@@ -1211,10 +1149,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
type MockState uint8
const (
INITIAL MockState = iota
@@ -1233,16 +1168,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
})
codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ require.NoError(t, err)
t.Cleanup(func() {
- if err := codec.Disconnect(); err != nil {
- t.Error(err)
- }
+ assert.NoError(t, codec.Disconnect())
})
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
fields.messageCodec = codec
},
},
diff --git a/plc4go/internal/eip/EipDriver.go b/plc4go/internal/eip/EipDriver.go
index 6dc0e4f9a3..5370638d9d 100644
--- a/plc4go/internal/eip/EipDriver.go
+++ b/plc4go/internal/eip/EipDriver.go
@@ -53,8 +53,8 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
return driver
}
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get an the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
@@ -64,39 +64,39 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
return ch
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
- options["defaultTcpPort"] = []string{"44818"}
+ driverOptions["defaultTcpPort"] = []string{"44818"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions)
if err != nil {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
- codec := NewMessageCodec(transportInstance)
+ codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
m.log.Debug().Msgf("working with codec %#v", codec)
- configuration, err := ParseFromOptions(m.log, options)
+ configuration, err := ParseFromOptions(m.log, driverOptions)
if err != nil {
- m.log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
return ch
}
driverContext, err := NewDriverContext(configuration)
if err != nil {
- m.log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
return ch
}
driverContext.awaitSetupComplete = m.awaitSetupComplete
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, options)
+ connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
m.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index 89b7bc0353..092b4fb3b2 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -178,7 +178,7 @@ func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.Transp
return func() {
d.log.Debug().Msgf("Scanning %v", udpTransportInstance)
// Create a codec for sending and receiving messages.
- codec := NewMessageCodec(udpTransportInstance, nil)
+ codec := NewMessageCodec(udpTransportInstance, nil, options.WithCustomLogger(d.log))
// Explicitly start the worker
if err := codec.Connect(); err != nil {
d.log.Error().Err(err).Msg("Error connecting")
diff --git a/plc4go/internal/knxnetip/Driver.go b/plc4go/internal/knxnetip/Driver.go
index 494a9c1ed9..2d230e4b2e 100644
--- a/plc4go/internal/knxnetip/Driver.go
+++ b/plc4go/internal/knxnetip/Driver.go
@@ -51,7 +51,7 @@ func (m *Driver) CheckQuery(query string) error {
return err
}
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
// Get an the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
@@ -60,9 +60,9 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
return ch
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
- options["defaultUdpPort"] = []string{"3671"}
+ driverOptions["defaultUdpPort"] = []string{"3671"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
if err != nil {
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %#v", transportUrl))
@@ -70,7 +70,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
}
// Create the new connection
- connection := NewConnection(transportInstance, options, m.GetPlcTagHandler())
+ connection := NewConnection(transportInstance, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
m.log.Trace().Str("transport", transportUrl.String()).Stringer("connection", connection).Msg("created new connection instance, trying to connect now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/knxnetip/Subscriber.go b/plc4go/internal/knxnetip/Subscriber.go
index e6be1eb835..3c6f8b0a4f 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -177,7 +177,7 @@ func (m *Subscriber) handleValueChange(destinationAddress []byte, payload []byte
plcValues[tagName] = spiValues.NewPlcList(plcValueList)
}
}
- event := NewSubscriptionEvent(tags, types, intervals, responseCodes, addresses, plcValues)
+ event := NewSubscriptionEvent(tags, types, intervals, responseCodes, addresses, plcValues, options.WithCustomLogger(m.log))
consumer(&event)
}
}
diff --git a/plc4go/internal/modbus/ModbusAsciiDriver.go b/plc4go/internal/modbus/ModbusAsciiDriver.go
index 73e224cd98..becf7180cb 100644
--- a/plc4go/internal/modbus/ModbusAsciiDriver.go
+++ b/plc4go/internal/modbus/ModbusAsciiDriver.go
@@ -103,7 +103,7 @@ func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transpo
m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, connectionOptions, m.GetPlcTagHandler())
+ connection := NewConnection(unitIdentifier, codec, connectionOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/ModbusRtuDriver.go b/plc4go/internal/modbus/ModbusRtuDriver.go
index a2c0f5b6fc..710c4c3a4a 100644
--- a/plc4go/internal/modbus/ModbusRtuDriver.go
+++ b/plc4go/internal/modbus/ModbusRtuDriver.go
@@ -103,7 +103,7 @@ func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transport
m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler())
+ connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/ModbusTcpDriver.go b/plc4go/internal/modbus/ModbusTcpDriver.go
index e0d6069dc8..da5e0494f2 100644
--- a/plc4go/internal/modbus/ModbusTcpDriver.go
+++ b/plc4go/internal/modbus/ModbusTcpDriver.go
@@ -103,7 +103,7 @@ func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transport
m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler())
+ connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index ae8ffc382a..2fea12f057 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -96,7 +96,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions)
+ connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
m.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/simulated/Driver.go b/plc4go/internal/simulated/Driver.go
index 847b8cb396..69b66c87fc 100644
--- a/plc4go/internal/simulated/Driver.go
+++ b/plc4go/internal/simulated/Driver.go
@@ -48,7 +48,7 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
}
func (d *Driver) GetConnectionWithContext(ctx context.Context, _ url.URL, _ map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- connection := NewConnection(NewDevice("test", options.WithCustomLogger(d.log)), d.GetPlcTagHandler(), d.valueHandler, driverOptions)
+ connection := NewConnection(NewDevice("test", options.WithCustomLogger(d.log)), d.GetPlcTagHandler(), d.valueHandler, driverOptions, options.WithCustomLogger(d.log))
d.log.Debug().Msgf("Connecting and returning connection %v", connection)
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/simulated/Driver_test.go b/plc4go/internal/simulated/Driver_test.go
index b9847c43fe..5d89c0bb44 100644
--- a/plc4go/internal/simulated/Driver_test.go
+++ b/plc4go/internal/simulated/Driver_test.go
@@ -20,6 +20,7 @@
package simulated
import (
+ "github.com/apache/plc4x/plc4go/spi/testutils"
"net/url"
"testing"
"time"
@@ -49,7 +50,7 @@ func TestDriver_CheckQuery(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := NewDriver()
+ d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
if err := d.CheckTagAddress(tt.args.query); (err != nil) != tt.wantErr {
t.Errorf("CheckTagAddress() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -86,7 +87,7 @@ func TestDriver_Discover(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := NewDriver()
+ d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
if err := d.Discover(tt.args.callback, tt.args.discoveryOptions...); (err != nil) != tt.wantErr {
t.Errorf("Discover() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -130,7 +131,7 @@ func TestDriver_GetConnection(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := NewDriver()
+ d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
connectionChan := d.GetConnection(tt.args.in0, tt.args.in1, tt.args.options)
timeout := time.NewTimer(3 * time.Second)
defer utils.CleanupTimer(timeout)
@@ -160,7 +161,7 @@ func TestDriver_GetDefaultTransport(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := NewDriver()
+ d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
if got := d.GetDefaultTransport(); got != tt.want {
t.Errorf("GetDefaultTransport() = %v, want %v", got, tt.want)
}
@@ -180,7 +181,7 @@ func TestDriver_GetProtocolCode(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := NewDriver()
+ d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
if got := d.GetProtocolCode(); got != tt.want {
t.Errorf("GetProtocolCode() = %v, want %v", got, tt.want)
}
@@ -200,7 +201,7 @@ func TestDriver_GetProtocolName(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := NewDriver()
+ d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
if got := d.GetProtocolName(); got != tt.want {
t.Errorf("GetProtocolName() = %v, want %v", got, tt.want)
}
diff --git a/plc4go/internal/simulated/Reader.go b/plc4go/internal/simulated/Reader.go
index 3958fc2ebc..4470c3c1b5 100644
--- a/plc4go/internal/simulated/Reader.go
+++ b/plc4go/internal/simulated/Reader.go
@@ -21,8 +21,10 @@ package simulated
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"strconv"
"time"
@@ -35,13 +37,17 @@ type Reader struct {
device *Device
options map[string][]string
tracer *tracer.Tracer
+
+ log zerolog.Logger
}
-func NewReader(device *Device, options map[string][]string, tracer *tracer.Tracer) *Reader {
+func NewReader(device *Device, readerOptions map[string][]string, tracer *tracer.Tracer, _options ...options.WithOption) *Reader {
return &Reader{
device: device,
- options: options,
+ options: readerOptions,
tracer: tracer,
+
+ log: options.ExtractCustomLogger(_options...),
}
}
diff --git a/plc4go/internal/simulated/Reader_test.go b/plc4go/internal/simulated/Reader_test.go
index e2df890bb6..8bf5b1b9b7 100644
--- a/plc4go/internal/simulated/Reader_test.go
+++ b/plc4go/internal/simulated/Reader_test.go
@@ -21,6 +21,8 @@ package simulated
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/stretchr/testify/assert"
"testing"
"time"
@@ -161,7 +163,7 @@ func TestReader_Read(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- r := NewReader(tt.fields.device, tt.fields.options, nil)
+ r := NewReader(tt.fields.device, tt.fields.options, nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
readRequest := spiModel.NewDefaultPlcReadRequest(tt.args.fields, tt.args.fieldNames, r, nil)
timeBeforeReadRequest := time.Now()
readResponseChannel := r.Read(context.TODO(), readRequest)
diff --git a/plc4go/internal/simulated/Writer.go b/plc4go/internal/simulated/Writer.go
index 44f39a86cb..f2fc95cfa0 100644
--- a/plc4go/internal/simulated/Writer.go
+++ b/plc4go/internal/simulated/Writer.go
@@ -21,8 +21,10 @@ package simulated
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"strconv"
"time"
@@ -34,13 +36,17 @@ type Writer struct {
device *Device
options map[string][]string
tracer *tracer.Tracer
+
+ log zerolog.Logger
}
-func NewWriter(device *Device, options map[string][]string, tracer *tracer.Tracer) *Writer {
+func NewWriter(device *Device, writerOptions map[string][]string, tracer *tracer.Tracer, _options ...options.WithOption) *Writer {
return &Writer{
device: device,
- options: options,
+ options: writerOptions,
tracer: tracer,
+
+ log: options.ExtractCustomLogger(_options...),
}
}
diff --git a/plc4go/internal/simulated/Writer_test.go b/plc4go/internal/simulated/Writer_test.go
index e823a28344..4f6adea84b 100644
--- a/plc4go/internal/simulated/Writer_test.go
+++ b/plc4go/internal/simulated/Writer_test.go
@@ -21,6 +21,8 @@ package simulated
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/stretchr/testify/assert"
"testing"
"time"
@@ -173,7 +175,7 @@ func TestWriter_Write(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- w := NewWriter(tt.fields.device, tt.fields.options, nil)
+ w := NewWriter(tt.fields.device, tt.fields.options, nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
writeRequest := spiModel.NewDefaultPlcWriteRequest(tt.args.fields, tt.args.fieldNames, tt.args.values, w, nil)
timeBeforeWriteRequest := time.Now()
writeResponseChannel := w.Write(context.TODO(), writeRequest)
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index 6c798eb391..b084e61d56 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -247,6 +247,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
tests := []struct {
name string
args args
+ setup func(t *testing.T, args *args)
executorValidator func(*testing.T, *executor) bool
}{
{
@@ -256,6 +257,9 @@ func TestNewFixedSizeExecutor(t *testing.T) {
queueDepth: 14,
options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
},
+ setup: func(t *testing.T, args *args) {
+ args.options = append(args.options, options.WithCustomLogger(produceTestLogger(t)))
+ },
executorValidator: func(t *testing.T, e *executor) bool {
return !e.running && !e.shutdown && len(e.worker) == 13 && cap(e.workItems) == 14
},
@@ -263,6 +267,9 @@ func TestNewFixedSizeExecutor(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.args)
+ }
fixedSizeExecutor := NewFixedSizeExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
defer fixedSizeExecutor.Stop()
assert.True(t, tt.executorValidator(t, fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index f3d355789b..ae71055f07 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -37,9 +37,10 @@ func TestNewRequestTransactionManager(t *testing.T) {
requestTransactionManagerOptions []options.WithOption
}
tests := []struct {
- name string
- args args
- want RequestTransactionManager
+ name string
+ args args
+ setup func(t *testing.T, args *args)
+ want RequestTransactionManager
}{
{
name: "just create one",
@@ -65,6 +66,9 @@ func TestNewRequestTransactionManager(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.args)
+ }
if got := NewRequestTransactionManager(tt.args.numberOfConcurrentRequests, tt.args.requestTransactionManagerOptions...); !assert.Equal(t, tt.want, got) {
t.Errorf("NewRequestTransactionManager() = %v, want %v", got, tt.want)
}
diff --git a/plc4go/spi/transports/test/Transport.go b/plc4go/spi/transports/test/Transport.go
index 53c3f9383f..5f11bd346d 100644
--- a/plc4go/spi/transports/test/Transport.go
+++ b/plc4go/spi/transports/test/Transport.go
@@ -63,7 +63,7 @@ func (m *Transport) CreateTransportInstance(transportUrl url.URL, options map[st
return preregisteredInstance, nil
}
m.log.Trace().Msg("create transport instance")
- return NewTransportInstance(m), nil
+ return NewTransportInstance(m, _options...), nil
}
func (m *Transport) AddPreregisteredInstances(transportUrl url.URL, preregisteredInstance transports.TransportInstance) error {