You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 12:08:18 UTC

[plc4x] branch develop updated: fix(plc4go): pass loggers where missing

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2ff14f33c0 fix(plc4go): pass loggers where missing
2ff14f33c0 is described below

commit 2ff14f33c0a5fd2351d23128253f98804b38e3f1
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 14:08:11 2023 +0200

    fix(plc4go): pass loggers where missing
---
 plc4go/internal/ads/Driver.go                      |  18 +-
 plc4go/internal/ads/Subscriber.go                  |   2 +-
 plc4go/internal/ads/model/AdsSubscriptionHandle.go |  11 +-
 plc4go/internal/cbus/Connection_test.go            | 223 ++++++----------
 plc4go/internal/cbus/Reader_test.go                | 286 ++++++++-------------
 plc4go/internal/eip/EipDriver.go                   |  24 +-
 plc4go/internal/knxnetip/Discoverer.go             |   2 +-
 plc4go/internal/knxnetip/Driver.go                 |   8 +-
 plc4go/internal/knxnetip/Subscriber.go             |   2 +-
 plc4go/internal/modbus/ModbusAsciiDriver.go        |   2 +-
 plc4go/internal/modbus/ModbusRtuDriver.go          |   2 +-
 plc4go/internal/modbus/ModbusTcpDriver.go          |   2 +-
 plc4go/internal/s7/Driver.go                       |   2 +-
 plc4go/internal/simulated/Driver.go                |   2 +-
 plc4go/internal/simulated/Driver_test.go           |  13 +-
 plc4go/internal/simulated/Reader.go                |  10 +-
 plc4go/internal/simulated/Reader_test.go           |   4 +-
 plc4go/internal/simulated/Writer.go                |  10 +-
 plc4go/internal/simulated/Writer_test.go           |   4 +-
 plc4go/spi/pool/WorkerPool_test.go                 |   7 +
 .../transactions/RequestTransactionManager_test.go |  10 +-
 plc4go/spi/transports/test/Transport.go            |   2 +-
 22 files changed, 269 insertions(+), 377 deletions(-)

diff --git a/plc4go/internal/ads/Driver.go b/plc4go/internal/ads/Driver.go
index 91681eebd8..cfc2a33f3f 100644
--- a/plc4go/internal/ads/Driver.go
+++ b/plc4go/internal/ads/Driver.go
@@ -49,8 +49,8 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
 	return driver
 }
 
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
-	m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+	m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
 	// Get the transport specified in the url
 	transport, ok := transports[transportUrl.Scheme]
 	if !ok {
@@ -60,30 +60,30 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 		return ch
 	}
 	// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
-	options["defaultTcpPort"] = []string{strconv.Itoa(int(adsModel.AdsConstants_ADSTCPDEFAULTPORT))}
+	driverOptions["defaultTcpPort"] = []string{strconv.Itoa(int(adsModel.AdsConstants_ADSTCPDEFAULTPORT))}
 	// Have the transport create a new transport-instance.
-	transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+	transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions)
 	if err != nil {
-		m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+		m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
 		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
 		return ch
 	}
 
 	// Create a new codec for taking care of encoding/decoding of messages
-	codec := NewMessageCodec(transportInstance)
+	codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
 	m.log.Debug().Msgf("working with codec %#v", codec)
 
-	configuration, err := model.ParseFromOptions(m.log, options)
+	configuration, err := model.ParseFromOptions(m.log, driverOptions)
 	if err != nil {
-		m.log.Error().Err(err).Msgf("Invalid options")
+		m.log.Error().Err(err).Msgf("Invalid driverOptions")
 		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "invalid configuration"))
 		return ch
 	}
 
 	// Create the new connection
-	connection, err := NewConnection(codec, configuration, options)
+	connection, err := NewConnection(codec, configuration, driverOptions)
 	if err != nil {
 		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "couldn't create connection"))
diff --git a/plc4go/internal/ads/Subscriber.go b/plc4go/internal/ads/Subscriber.go
index cf0ced6ed1..5442b57184 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -147,7 +147,7 @@ func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel
 			)
 		}
 		// Create a new subscription handle.
-		subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(m, tagName, directTag)
+		subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(m, tagName, directTag, options.WithCustomLogger(m.log))
 		responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(
 			subscriptionRequest,
 			spiModel.NewDefaultPlcSubscriptionResponse(
diff --git a/plc4go/internal/ads/model/AdsSubscriptionHandle.go b/plc4go/internal/ads/model/AdsSubscriptionHandle.go
index 111b0d576c..81bf0254c9 100644
--- a/plc4go/internal/ads/model/AdsSubscriptionHandle.go
+++ b/plc4go/internal/ads/model/AdsSubscriptionHandle.go
@@ -20,6 +20,8 @@
 package model
 
 import (
+	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/rs/zerolog"
 	"time"
 
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -34,15 +36,16 @@ type AdsSubscriptionHandle struct {
 	directTag  DirectPlcTag
 	consumers  []apiModel.PlcSubscriptionEventConsumer
 
-	apiModel.PlcSubscriptionHandle
+	log zerolog.Logger
 }
 
-func NewAdsSubscriptionHandle(subscriber spi.PlcSubscriber, tagName string, directTag DirectPlcTag) *AdsSubscriptionHandle {
+func NewAdsSubscriptionHandle(subscriber spi.PlcSubscriber, tagName string, directTag DirectPlcTag, _options ...options.WithOption) *AdsSubscriptionHandle {
 	return &AdsSubscriptionHandle{
 		subscriber: subscriber,
 		tagName:    tagName,
 		directTag:  directTag,
 		consumers:  []apiModel.PlcSubscriptionEventConsumer{},
+		log:        options.ExtractCustomLogger(_options...),
 	}
 }
 
@@ -65,7 +68,9 @@ func (t *AdsSubscriptionHandle) PublishPlcValue(value apiValues.PlcValue) {
 		map[string]spiModel.SubscriptionType{t.tagName: spiModel.SubscriptionChangeOfState},
 		map[string]time.Duration{t.tagName: time.Second},
 		map[string]apiModel.PlcResponseCode{t.tagName: apiModel.PlcResponseCode_OK},
-		map[string]apiValues.PlcValue{t.tagName: value})
+		map[string]apiValues.PlcValue{t.tagName: value},
+		options.WithCustomLogger(t.log),
+	)
 	for _, consumer := range t.consumers {
 		consumer(&event)
 	}
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 657d30a0f8..8feae638e5 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -30,11 +30,11 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/apache/plc4x/plc4go/spi/tracer"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
-	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"net/url"
 	"sync/atomic"
 	"testing"
@@ -187,15 +187,10 @@ func TestConnection_ConnectWithContext(t *testing.T) {
 
 				// Build the default connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport(loggerOption)
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -850,15 +845,10 @@ func TestConnection_fireConnectionError(t *testing.T) {
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
 
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -885,15 +875,10 @@ func TestConnection_fireConnectionError(t *testing.T) {
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
 
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -982,15 +967,10 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
 				loggerOption := options.WithCustomLogger(logger)
 
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport(loggerOption)
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}())
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1071,15 +1051,10 @@ func TestConnection_sendReset(t *testing.T) {
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
 
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1160,15 +1135,10 @@ func TestConnection_setApplicationFilter(t *testing.T) {
 
 				// Setup connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport(loggerOption)
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1249,15 +1219,10 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
 
 				// Setup connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport(loggerOption)
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1338,15 +1303,10 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
 
 				// Setup connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport(loggerOption)
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1427,15 +1387,10 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
 
 				// Setup connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport(loggerOption)
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1505,15 +1460,10 @@ func TestConnection_setupConnection(t *testing.T) {
 
 				// Setup connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport(loggerOption)
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1542,11 +1492,8 @@ func TestConnection_setupConnection(t *testing.T) {
 				// Build the message codec
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
-				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					RESET MockState = iota
@@ -1554,7 +1501,7 @@ func TestConnection_setupConnection(t *testing.T) {
 				)
 				currentState := atomic.Value{}
 				currentState.Store(RESET)
-				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+				ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
 					switch currentState.Load().(MockState) {
 					case RESET:
 						t.Log("Dispatching reset echo")
@@ -1564,12 +1511,9 @@ func TestConnection_setupConnection(t *testing.T) {
 						t.Log("Done")
 					}
 				})
-				codec := NewMessageCodec(transportInstance, loggerOption)
+				codec := NewMessageCodec(ti, loggerOption)
 				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1599,11 +1543,8 @@ func TestConnection_setupConnection(t *testing.T) {
 				// Build the message codec
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
-				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					RESET MockState = iota
@@ -1613,7 +1554,7 @@ func TestConnection_setupConnection(t *testing.T) {
 				)
 				currentState := atomic.Value{}
 				currentState.Store(RESET)
-				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+				ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
 					switch currentState.Load().(MockState) {
 					case RESET:
 						t.Log("Dispatching reset echo")
@@ -1633,12 +1574,9 @@ func TestConnection_setupConnection(t *testing.T) {
 						t.Log("Done")
 					}
 				})
-				codec := NewMessageCodec(transportInstance, loggerOption)
+				codec := NewMessageCodec(ti, loggerOption)
 				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1668,11 +1606,8 @@ func TestConnection_setupConnection(t *testing.T) {
 				// Build the message codec
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
-				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					RESET MockState = iota
@@ -1683,7 +1618,7 @@ func TestConnection_setupConnection(t *testing.T) {
 				)
 				currentState := atomic.Value{}
 				currentState.Store(RESET)
-				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+				ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
 					switch currentState.Load().(MockState) {
 					case RESET:
 						t.Log("Dispatching reset echo")
@@ -1708,12 +1643,9 @@ func TestConnection_setupConnection(t *testing.T) {
 						t.Log("Done")
 					}
 				})
-				codec := NewMessageCodec(transportInstance, loggerOption)
+				codec := NewMessageCodec(ti, loggerOption)
 				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1743,11 +1675,8 @@ func TestConnection_setupConnection(t *testing.T) {
 				// Build the message codec
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
-				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					RESET MockState = iota
@@ -1759,7 +1688,7 @@ func TestConnection_setupConnection(t *testing.T) {
 				)
 				currentState := atomic.Value{}
 				currentState.Store(RESET)
-				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+				ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
 					switch currentState.Load().(MockState) {
 					case RESET:
 						t.Log("Dispatching reset echo")
@@ -1789,12 +1718,9 @@ func TestConnection_setupConnection(t *testing.T) {
 						t.Log("Done")
 					}
 				})
-				codec := NewMessageCodec(transportInstance, loggerOption)
+				codec := NewMessageCodec(ti, loggerOption)
 				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
@@ -1807,9 +1733,17 @@ func TestConnection_setupConnection(t *testing.T) {
 			fields: fields{
 				DefaultConnection: _default.NewDefaultConnection(nil),
 				messageCodec: func() *MessageCodec {
+					// Setup logger
+					logger := testutils.ProduceTestingLogger(t)
+
+					testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+					// Custom option for that
+					loggerOption := options.WithCustomLogger(logger)
+
 					transport := test.NewTransport()
 					transportUrl := url.URL{Scheme: "test"}
-					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					ti, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
 					if err != nil {
 						t.Error(err)
 						t.FailNow()
@@ -1827,7 +1761,7 @@ func TestConnection_setupConnection(t *testing.T) {
 					)
 					currentState := atomic.Value{}
 					currentState.Store(RESET)
-					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
 						switch currentState.Load().(MockState) {
 						case RESET:
 							t.Log("Dispatching reset echo")
@@ -1862,7 +1796,7 @@ func TestConnection_setupConnection(t *testing.T) {
 							t.Log("Done")
 						}
 					})
-					codec := NewMessageCodec(transportInstance)
+					codec := NewMessageCodec(ti, loggerOption)
 					if err = codec.Connect(); err != nil {
 						t.Error(err)
 						t.FailNow()
@@ -1893,15 +1827,10 @@ func TestConnection_setupConnection(t *testing.T) {
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
 
 				// Build the message codec
-				codec := NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}(), loggerOption)
+				transport := test.NewTransport(loggerOption)
+				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+				require.NoError(t, err)
+				codec := NewMessageCodec(ti, loggerOption)
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index b5baf829e0..0d7f54a647 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
 	"net/url"
 	"strings"
 	"sync/atomic"
@@ -160,26 +161,6 @@ func TestReader_readSync(t *testing.T) {
 		},
 		{
 			name: "unmapped tag",
-			fields: fields{
-				messageCodec: func() *MessageCodec {
-					transport := test.NewTransport()
-					transportUrl := url.URL{Scheme: "test"}
-					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-						return nil
-					}
-					codec := NewMessageCodec(transportInstance)
-					err = codec.Connect()
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-						return nil
-					}
-					return codec
-				}(),
-			},
 			args: args{
 				ctx: context.Background(),
 				readRequest: spiModel.NewDefaultPlcReadRequest(
@@ -195,6 +176,25 @@ func TestReader_readSync(t *testing.T) {
 				result: make(chan apiModel.PlcReadRequestResult, 1),
 			},
 			setup: func(t *testing.T, fields *fields) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				// Custom option for that
+				loggerOption := options.WithCustomLogger(logger)
+
+				transport := test.NewTransport()
+				transportUrl := url.URL{Scheme: "test"}
+				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+				require.NoError(t, err)
+				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 				fields.tm = transactions.NewRequestTransactionManager(10, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
@@ -270,10 +270,7 @@ func TestReader_readSync(t *testing.T) {
 				transport := test.NewTransport()
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					INITIAL MockState = iota
@@ -293,10 +290,10 @@ func TestReader_readSync(t *testing.T) {
 				})
 				codec := NewMessageCodec(transportInstance, loggerOption)
 				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
 				fields.messageCodec = codec
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
@@ -353,17 +350,13 @@ func TestReader_readSync(t *testing.T) {
 				transport := test.NewTransport()
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				codec := NewMessageCodec(transportInstance, loggerOption)
 				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
-
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
 				fields.messageCodec = codec
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
@@ -449,21 +442,13 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
 				t.Cleanup(func() {
-					if err := codec.Disconnect(); err != nil {
-						t.Error(err)
-					}
+					assert.NoError(t, codec.Disconnect())
 				})
-				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
 				fields.messageCodec = codec
 
 				transaction := NewMockRequestTransaction(t)
@@ -478,46 +463,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 			name: "Send message which responds with message to client",
 			fields: fields{
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
-				messageCodec: func() *MessageCodec {
-					transport := test.NewTransport()
-					transportUrl := url.URL{Scheme: "test"}
-					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-						return nil
-					}
-					type MockState uint8
-					const (
-						INITIAL MockState = iota
-						DONE
-					)
-					currentState := atomic.Value{}
-					currentState.Store(INITIAL)
-					transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
-						switch currentState.Load().(MockState) {
-						case INITIAL:
-							t.Log("Dispatching read response")
-							transportInstance.FillReadBuffer([]byte("@1A2001\r@"))
-							currentState.Store(DONE)
-						case DONE:
-							t.Log("Done")
-						}
-					})
-					codec := NewMessageCodec(transportInstance)
-					t.Cleanup(func() {
-						if err := codec.Disconnect(); err != nil {
-							t.Error(err)
-						}
-					})
-					err = codec.Connect()
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-						return nil
-					}
-					return codec
-				}(),
 			},
 			args: args{
 				ctx: func() context.Context {
@@ -556,14 +501,49 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
 
+				// Custom option for that
+				loggerOption := options.WithCustomLogger(logger)
+
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
 				expect.FailRequest(mock.Anything).Return(errors.New("Nope")).Run(func(_ error) {
 					close(ch)
 				})
 				args.transaction = transaction
+
+				transport := test.NewTransport(loggerOption)
+				transportUrl := url.URL{Scheme: "test"}
+				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+				require.NoError(t, err)
+				type MockState uint8
+				const (
+					INITIAL MockState = iota
+					DONE
+				)
+				currentState := atomic.Value{}
+				currentState.Store(INITIAL)
+				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					switch currentState.Load().(MockState) {
+					case INITIAL:
+						t.Log("Dispatching read response")
+						transportInstance.FillReadBuffer([]byte("@1A2001\r@"))
+						currentState.Store(DONE)
+					case DONE:
+						t.Log("Done")
+					}
+				})
+				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 		},
 		{
@@ -571,14 +551,18 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 			fields: fields{
 				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
 				messageCodec: func() *MessageCodec {
-					transport := test.NewTransport()
+					// Setup logger
+					logger := testutils.ProduceTestingLogger(t)
+
+					testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+					// Custom option for that
+					loggerOption := options.WithCustomLogger(logger)
+
+					transport := test.NewTransport(loggerOption)
 					transportUrl := url.URL{Scheme: "test"}
-					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-						return nil
-					}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+					require.NoError(t, err)
 					type MockState uint8
 					const (
 						INITIAL MockState = iota
@@ -596,18 +580,12 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 							t.Log("Done")
 						}
 					})
-					codec := NewMessageCodec(transportInstance)
+					codec := NewMessageCodec(transportInstance, loggerOption)
+					err = codec.Connect()
+					require.NoError(t, err)
 					t.Cleanup(func() {
-						if err := codec.Disconnect(); err != nil {
-							t.Error(err)
-						}
+						assert.NoError(t, codec.Disconnect())
 					})
-					err = codec.Connect()
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-						return nil
-					}
 					return codec
 				}(),
 			},
@@ -721,10 +699,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					INITIAL MockState = iota
@@ -743,16 +718,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					}
 				})
 				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
 				t.Cleanup(func() {
-					if err := codec.Disconnect(); err != nil {
-						t.Error(err)
-					}
+					assert.NoError(t, codec.Disconnect())
 				})
-				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
 				fields.messageCodec = codec
 			},
 		},
@@ -819,10 +789,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					INITIAL MockState = iota
@@ -841,16 +808,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					}
 				})
 				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
 				t.Cleanup(func() {
-					if err := codec.Disconnect(); err != nil {
-						t.Error(err)
-					}
+					assert.NoError(t, codec.Disconnect())
 				})
-				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
 				fields.messageCodec = codec
 			},
 		},
@@ -917,10 +879,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					INITIAL MockState = iota
@@ -939,16 +898,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					}
 				})
 				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
 				t.Cleanup(func() {
-					if err := codec.Disconnect(); err != nil {
-						t.Error(err)
-					}
+					assert.NoError(t, codec.Disconnect())
 				})
-				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
 				fields.messageCodec = codec
 			},
 		},
@@ -1015,10 +969,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					INITIAL MockState = iota
@@ -1037,16 +988,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					}
 				})
 				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
 				t.Cleanup(func() {
-					if err := codec.Disconnect(); err != nil {
-						t.Error(err)
-					}
+					assert.NoError(t, codec.Disconnect())
 				})
-				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
 				fields.messageCodec = codec
 			},
 		},
@@ -1113,10 +1059,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					INITIAL MockState = iota
@@ -1135,16 +1078,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					}
 				})
 				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
 				t.Cleanup(func() {
-					if err := codec.Disconnect(); err != nil {
-						t.Error(err)
-					}
+					assert.NoError(t, codec.Disconnect())
 				})
-				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
 				fields.messageCodec = codec
 			},
 		},
@@ -1211,10 +1149,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				type MockState uint8
 				const (
 					INITIAL MockState = iota
@@ -1233,16 +1168,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					}
 				})
 				codec := NewMessageCodec(transportInstance, loggerOption)
+				err = codec.Connect()
+				require.NoError(t, err)
 				t.Cleanup(func() {
-					if err := codec.Disconnect(); err != nil {
-						t.Error(err)
-					}
+					assert.NoError(t, codec.Disconnect())
 				})
-				err = codec.Connect()
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
 				fields.messageCodec = codec
 			},
 		},
diff --git a/plc4go/internal/eip/EipDriver.go b/plc4go/internal/eip/EipDriver.go
index 6dc0e4f9a3..5370638d9d 100644
--- a/plc4go/internal/eip/EipDriver.go
+++ b/plc4go/internal/eip/EipDriver.go
@@ -53,8 +53,8 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
 	return driver
 }
 
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
-	m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+	m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
 	// Get an the transport specified in the url
 	transport, ok := transports[transportUrl.Scheme]
 	if !ok {
@@ -64,39 +64,39 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 		return ch
 	}
 	// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
-	options["defaultTcpPort"] = []string{"44818"}
+	driverOptions["defaultTcpPort"] = []string{"44818"}
 	// Have the transport create a new transport-instance.
-	transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+	transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions)
 	if err != nil {
-		m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+		m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
 		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
 		return ch
 	}
 
-	codec := NewMessageCodec(transportInstance)
+	codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
 	m.log.Debug().Msgf("working with codec %#v", codec)
 
-	configuration, err := ParseFromOptions(m.log, options)
+	configuration, err := ParseFromOptions(m.log, driverOptions)
 	if err != nil {
-		m.log.Error().Err(err).Msgf("Invalid options")
+		m.log.Error().Err(err).Msgf("Invalid driverOptions")
 		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
+		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
 		return ch
 	}
 
 	driverContext, err := NewDriverContext(configuration)
 	if err != nil {
-		m.log.Error().Err(err).Msgf("Invalid options")
+		m.log.Error().Err(err).Msgf("Invalid driverOptions")
 		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
+		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
 		return ch
 	}
 	driverContext.awaitSetupComplete = m.awaitSetupComplete
 	driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
 
 	// Create the new connection
-	connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, options)
+	connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
 	m.log.Debug().Msg("created connection, connecting now")
 	return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index 89b7bc0353..092b4fb3b2 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -178,7 +178,7 @@ func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.Transp
 	return func() {
 		d.log.Debug().Msgf("Scanning %v", udpTransportInstance)
 		// Create a codec for sending and receiving messages.
-		codec := NewMessageCodec(udpTransportInstance, nil)
+		codec := NewMessageCodec(udpTransportInstance, nil, options.WithCustomLogger(d.log))
 		// Explicitly start the worker
 		if err := codec.Connect(); err != nil {
 			d.log.Error().Err(err).Msg("Error connecting")
diff --git a/plc4go/internal/knxnetip/Driver.go b/plc4go/internal/knxnetip/Driver.go
index 494a9c1ed9..2d230e4b2e 100644
--- a/plc4go/internal/knxnetip/Driver.go
+++ b/plc4go/internal/knxnetip/Driver.go
@@ -51,7 +51,7 @@ func (m *Driver) CheckQuery(query string) error {
 	return err
 }
 
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
 	// Get an the transport specified in the url
 	transport, ok := transports[transportUrl.Scheme]
 	if !ok {
@@ -60,9 +60,9 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 		return ch
 	}
 	// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
-	options["defaultUdpPort"] = []string{"3671"}
+	driverOptions["defaultUdpPort"] = []string{"3671"}
 	// Have the transport create a new transport-instance.
-	transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+	transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
 	if err != nil {
 		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %#v", transportUrl))
@@ -70,7 +70,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 	}
 
 	// Create the new connection
-	connection := NewConnection(transportInstance, options, m.GetPlcTagHandler())
+	connection := NewConnection(transportInstance, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
 	m.log.Trace().Str("transport", transportUrl.String()).Stringer("connection", connection).Msg("created new connection instance, trying to connect now")
 	return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/knxnetip/Subscriber.go b/plc4go/internal/knxnetip/Subscriber.go
index e6be1eb835..3c6f8b0a4f 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -177,7 +177,7 @@ func (m *Subscriber) handleValueChange(destinationAddress []byte, payload []byte
 					plcValues[tagName] = spiValues.NewPlcList(plcValueList)
 				}
 			}
-			event := NewSubscriptionEvent(tags, types, intervals, responseCodes, addresses, plcValues)
+			event := NewSubscriptionEvent(tags, types, intervals, responseCodes, addresses, plcValues, options.WithCustomLogger(m.log))
 			consumer(&event)
 		}
 	}
diff --git a/plc4go/internal/modbus/ModbusAsciiDriver.go b/plc4go/internal/modbus/ModbusAsciiDriver.go
index 73e224cd98..becf7180cb 100644
--- a/plc4go/internal/modbus/ModbusAsciiDriver.go
+++ b/plc4go/internal/modbus/ModbusAsciiDriver.go
@@ -103,7 +103,7 @@ func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transpo
 	m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
 
 	// Create the new connection
-	connection := NewConnection(unitIdentifier, codec, connectionOptions, m.GetPlcTagHandler())
+	connection := NewConnection(unitIdentifier, codec, connectionOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
 	m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
 	return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/modbus/ModbusRtuDriver.go b/plc4go/internal/modbus/ModbusRtuDriver.go
index a2c0f5b6fc..710c4c3a4a 100644
--- a/plc4go/internal/modbus/ModbusRtuDriver.go
+++ b/plc4go/internal/modbus/ModbusRtuDriver.go
@@ -103,7 +103,7 @@ func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transport
 	m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
 
 	// Create the new connection
-	connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler())
+	connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
 	m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
 	return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/modbus/ModbusTcpDriver.go b/plc4go/internal/modbus/ModbusTcpDriver.go
index e0d6069dc8..da5e0494f2 100644
--- a/plc4go/internal/modbus/ModbusTcpDriver.go
+++ b/plc4go/internal/modbus/ModbusTcpDriver.go
@@ -103,7 +103,7 @@ func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transport
 	m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
 
 	// Create the new connection
-	connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler())
+	connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
 	m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
 	return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index ae8ffc382a..2fea12f057 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -96,7 +96,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 	driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
 
 	// Create the new connection
-	connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions)
+	connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
 	m.log.Debug().Msg("created connection, connecting now")
 	return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/simulated/Driver.go b/plc4go/internal/simulated/Driver.go
index 847b8cb396..69b66c87fc 100644
--- a/plc4go/internal/simulated/Driver.go
+++ b/plc4go/internal/simulated/Driver.go
@@ -48,7 +48,7 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
 }
 
 func (d *Driver) GetConnectionWithContext(ctx context.Context, _ url.URL, _ map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
-	connection := NewConnection(NewDevice("test", options.WithCustomLogger(d.log)), d.GetPlcTagHandler(), d.valueHandler, driverOptions)
+	connection := NewConnection(NewDevice("test", options.WithCustomLogger(d.log)), d.GetPlcTagHandler(), d.valueHandler, driverOptions, options.WithCustomLogger(d.log))
 	d.log.Debug().Msgf("Connecting and returning connection %v", connection)
 	return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/simulated/Driver_test.go b/plc4go/internal/simulated/Driver_test.go
index b9847c43fe..5d89c0bb44 100644
--- a/plc4go/internal/simulated/Driver_test.go
+++ b/plc4go/internal/simulated/Driver_test.go
@@ -20,6 +20,7 @@
 package simulated
 
 import (
+	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"net/url"
 	"testing"
 	"time"
@@ -49,7 +50,7 @@ func TestDriver_CheckQuery(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			d := NewDriver()
+			d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			if err := d.CheckTagAddress(tt.args.query); (err != nil) != tt.wantErr {
 				t.Errorf("CheckTagAddress() error = %v, wantErr %v", err, tt.wantErr)
 			}
@@ -86,7 +87,7 @@ func TestDriver_Discover(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			d := NewDriver()
+			d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			if err := d.Discover(tt.args.callback, tt.args.discoveryOptions...); (err != nil) != tt.wantErr {
 				t.Errorf("Discover() error = %v, wantErr %v", err, tt.wantErr)
 			}
@@ -130,7 +131,7 @@ func TestDriver_GetConnection(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			d := NewDriver()
+			d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			connectionChan := d.GetConnection(tt.args.in0, tt.args.in1, tt.args.options)
 			timeout := time.NewTimer(3 * time.Second)
 			defer utils.CleanupTimer(timeout)
@@ -160,7 +161,7 @@ func TestDriver_GetDefaultTransport(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			d := NewDriver()
+			d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			if got := d.GetDefaultTransport(); got != tt.want {
 				t.Errorf("GetDefaultTransport() = %v, want %v", got, tt.want)
 			}
@@ -180,7 +181,7 @@ func TestDriver_GetProtocolCode(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			d := NewDriver()
+			d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			if got := d.GetProtocolCode(); got != tt.want {
 				t.Errorf("GetProtocolCode() = %v, want %v", got, tt.want)
 			}
@@ -200,7 +201,7 @@ func TestDriver_GetProtocolName(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			d := NewDriver()
+			d := NewDriver(options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			if got := d.GetProtocolName(); got != tt.want {
 				t.Errorf("GetProtocolName() = %v, want %v", got, tt.want)
 			}
diff --git a/plc4go/internal/simulated/Reader.go b/plc4go/internal/simulated/Reader.go
index 3958fc2ebc..4470c3c1b5 100644
--- a/plc4go/internal/simulated/Reader.go
+++ b/plc4go/internal/simulated/Reader.go
@@ -21,8 +21,10 @@ package simulated
 
 import (
 	"context"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/tracer"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
 	"strconv"
 	"time"
 
@@ -35,13 +37,17 @@ type Reader struct {
 	device  *Device
 	options map[string][]string
 	tracer  *tracer.Tracer
+
+	log zerolog.Logger
 }
 
-func NewReader(device *Device, options map[string][]string, tracer *tracer.Tracer) *Reader {
+func NewReader(device *Device, readerOptions map[string][]string, tracer *tracer.Tracer, _options ...options.WithOption) *Reader {
 	return &Reader{
 		device:  device,
-		options: options,
+		options: readerOptions,
 		tracer:  tracer,
+
+		log: options.ExtractCustomLogger(_options...),
 	}
 }
 
diff --git a/plc4go/internal/simulated/Reader_test.go b/plc4go/internal/simulated/Reader_test.go
index e2df890bb6..8bf5b1b9b7 100644
--- a/plc4go/internal/simulated/Reader_test.go
+++ b/plc4go/internal/simulated/Reader_test.go
@@ -21,6 +21,8 @@ package simulated
 
 import (
 	"context"
+	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/stretchr/testify/assert"
 	"testing"
 	"time"
@@ -161,7 +163,7 @@ func TestReader_Read(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			r := NewReader(tt.fields.device, tt.fields.options, nil)
+			r := NewReader(tt.fields.device, tt.fields.options, nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			readRequest := spiModel.NewDefaultPlcReadRequest(tt.args.fields, tt.args.fieldNames, r, nil)
 			timeBeforeReadRequest := time.Now()
 			readResponseChannel := r.Read(context.TODO(), readRequest)
diff --git a/plc4go/internal/simulated/Writer.go b/plc4go/internal/simulated/Writer.go
index 44f39a86cb..f2fc95cfa0 100644
--- a/plc4go/internal/simulated/Writer.go
+++ b/plc4go/internal/simulated/Writer.go
@@ -21,8 +21,10 @@ package simulated
 
 import (
 	"context"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/tracer"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
 	"strconv"
 	"time"
 
@@ -34,13 +36,17 @@ type Writer struct {
 	device  *Device
 	options map[string][]string
 	tracer  *tracer.Tracer
+
+	log zerolog.Logger
 }
 
-func NewWriter(device *Device, options map[string][]string, tracer *tracer.Tracer) *Writer {
+func NewWriter(device *Device, writerOptions map[string][]string, tracer *tracer.Tracer, _options ...options.WithOption) *Writer {
 	return &Writer{
 		device:  device,
-		options: options,
+		options: writerOptions,
 		tracer:  tracer,
+
+		log: options.ExtractCustomLogger(_options...),
 	}
 }
 
diff --git a/plc4go/internal/simulated/Writer_test.go b/plc4go/internal/simulated/Writer_test.go
index e823a28344..4f6adea84b 100644
--- a/plc4go/internal/simulated/Writer_test.go
+++ b/plc4go/internal/simulated/Writer_test.go
@@ -21,6 +21,8 @@ package simulated
 
 import (
 	"context"
+	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/stretchr/testify/assert"
 	"testing"
 	"time"
@@ -173,7 +175,7 @@ func TestWriter_Write(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := NewWriter(tt.fields.device, tt.fields.options, nil)
+			w := NewWriter(tt.fields.device, tt.fields.options, nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
 			writeRequest := spiModel.NewDefaultPlcWriteRequest(tt.args.fields, tt.args.fieldNames, tt.args.values, w, nil)
 			timeBeforeWriteRequest := time.Now()
 			writeResponseChannel := w.Write(context.TODO(), writeRequest)
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index 6c798eb391..b084e61d56 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -247,6 +247,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
 	tests := []struct {
 		name              string
 		args              args
+		setup             func(t *testing.T, args *args)
 		executorValidator func(*testing.T, *executor) bool
 	}{
 		{
@@ -256,6 +257,9 @@ func TestNewFixedSizeExecutor(t *testing.T) {
 				queueDepth:      14,
 				options:         []options.WithOption{WithExecutorOptionTracerWorkers(true)},
 			},
+			setup: func(t *testing.T, args *args) {
+				args.options = append(args.options, options.WithCustomLogger(produceTestLogger(t)))
+			},
 			executorValidator: func(t *testing.T, e *executor) bool {
 				return !e.running && !e.shutdown && len(e.worker) == 13 && cap(e.workItems) == 14
 			},
@@ -263,6 +267,9 @@ func TestNewFixedSizeExecutor(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.args)
+			}
 			fixedSizeExecutor := NewFixedSizeExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
 			defer fixedSizeExecutor.Stop()
 			assert.True(t, tt.executorValidator(t, fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index f3d355789b..ae71055f07 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -37,9 +37,10 @@ func TestNewRequestTransactionManager(t *testing.T) {
 		requestTransactionManagerOptions []options.WithOption
 	}
 	tests := []struct {
-		name string
-		args args
-		want RequestTransactionManager
+		name  string
+		args  args
+		setup func(t *testing.T, args *args)
+		want  RequestTransactionManager
 	}{
 		{
 			name: "just create one",
@@ -65,6 +66,9 @@ func TestNewRequestTransactionManager(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.args)
+			}
 			if got := NewRequestTransactionManager(tt.args.numberOfConcurrentRequests, tt.args.requestTransactionManagerOptions...); !assert.Equal(t, tt.want, got) {
 				t.Errorf("NewRequestTransactionManager() = %v, want %v", got, tt.want)
 			}
diff --git a/plc4go/spi/transports/test/Transport.go b/plc4go/spi/transports/test/Transport.go
index 53c3f9383f..5f11bd346d 100644
--- a/plc4go/spi/transports/test/Transport.go
+++ b/plc4go/spi/transports/test/Transport.go
@@ -63,7 +63,7 @@ func (m *Transport) CreateTransportInstance(transportUrl url.URL, options map[st
 		return preregisteredInstance, nil
 	}
 	m.log.Trace().Msg("create transport instance")
-	return NewTransportInstance(m), nil
+	return NewTransportInstance(m, _options...), nil
 }
 
 func (m *Transport) AddPreregisteredInstances(transportUrl url.URL, preregisteredInstance transports.TransportInstance) error {