Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/30 13:51:55 UTC

[plc4x] branch develop updated (dd12ce4e2 -> 24e8bf91e)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from dd12ce4e2 chore: duplicate profiles for x86_64
     new b7db778f4 fix(plc-simulator/cbus): avoid sending out the inner message
     new 24e8bf91e refactor(plc4go/connection-cache): cleanup

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/pkg/api/cache/plc_connection_cache.go       | 466 +----------
 plc4go/pkg/api/cache/plc_connection_cache_test.go  | 861 +++------------------
 plc4go/pkg/api/cache/plc_connection_common.go      | 103 +++
 plc4go/pkg/api/cache/plc_connection_container.go   | 193 +++++
 plc4go/pkg/api/cache/plc_connection_lease.go       | 204 +++++
 plc4go/pkg/api/cache/plc_connection_lease_test.go  | 650 ++++++++++++++++
 .../server/cbus/protocol/CBusServerAdapter.java    |   8 +-
 7 files changed, 1316 insertions(+), 1169 deletions(-)
 create mode 100644 plc4go/pkg/api/cache/plc_connection_common.go
 create mode 100644 plc4go/pkg/api/cache/plc_connection_container.go
 create mode 100644 plc4go/pkg/api/cache/plc_connection_lease.go
 create mode 100644 plc4go/pkg/api/cache/plc_connection_lease_test.go


[plc4x] 01/02: fix(plc-simulator/cbus): avoid sending out the inner message

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit b7db778f4506c34cdd3639d9f2c0d1251016fef6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Aug 30 15:15:48 2022 +0200

    fix(plc-simulator/cbus): avoid sending out the inner message
    
    + The inner message is now contained, in a "decoded" form, in a virtual field
---
 .../plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/plc4j/utils/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java b/plc4j/utils/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
index cb9378dff..2943d6b13 100644
--- a/plc4j/utils/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
+++ b/plc4j/utils/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
@@ -122,7 +122,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
         if (request instanceof RequestDirectCommandAccess) {
             RequestDirectCommandAccess requestDirectCommandAccess = (RequestDirectCommandAccess) request;
             CALData calData = requestDirectCommandAccess.getCalData();
-            LOGGER.info("Handling RequestDirectCommandAccess\n{}\n{}", requestDirectCommandAccess, calData);
+            LOGGER.info("Handling RequestDirectCommandAccess\n{}", requestDirectCommandAccess);
 
             // TODO: handle other cal data types
             if (calData instanceof CALDataWrite) {
@@ -137,7 +137,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                         replyOrConfirmation = new ReplyOrConfirmationConfirmation((byte) 0x0, new Confirmation(requestDirectCommandAccess.getAlpha(), null, ConfirmationType.CONFIRMATION_SUCCESSFUL), replyOrConfirmation, cBusOptions, requestContext);
                     }
                     CBusMessageToClient cBusMessageToClient = new CBusMessageToClient(replyOrConfirmation, requestContext, cBusOptions);
-                    LOGGER.info("Sending ack\n{}\n{}", cBusMessageToClient, encodedReply);
+                    LOGGER.info("Sending ack\n{}", cBusMessageToClient);
                     ctx.writeAndFlush(cBusMessageToClient);
                 };
                 switch (calDataWrite.getParamNo().getParameterType()) {
@@ -547,7 +547,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                 Reply reply = new ReplyEncodedReply((byte) 0x0, encodedReply, null, cBusOptions, requestContext);
                 ReplyOrConfirmation replyOrConfirmation = new ReplyOrConfirmationReply((byte) 0x00, reply, new ResponseTermination(), cBusOptions, requestContext);
                 CBusMessage message = new CBusMessageToClient(replyOrConfirmation, requestContext, cBusOptions);
-                LOGGER.info("[SAL Monitor] Sending out\n{}\n{}", message, encodedReply);
+                LOGGER.info("[SAL Monitor] Sending out\n{}", message);
                 ctx.writeAndFlush(message);
             } finally {
                 outputLock.unlock();
@@ -595,7 +595,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                 Reply reply = new ReplyEncodedReply((byte) 0x0, encodedReply, null, cBusOptions, requestContext);
                 ReplyOrConfirmation replyOrConfirmation = new ReplyOrConfirmationReply((byte) 0x00, reply, new ResponseTermination(), cBusOptions, requestContext);
                 CBusMessage message = new CBusMessageToClient(replyOrConfirmation, requestContext, cBusOptions);
-                LOGGER.info("[MMI Monitor] Sending out\n{}\n{}", message, encodedReply);
+                LOGGER.info("[MMI Monitor] Sending out\n{}", message);
                 ctx.writeAndFlush(message);
             } finally {
                 outputLock.unlock();


[plc4x] 02/02: refactor(plc4go/connection-cache): cleanup

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 24e8bf91ea4d39ad187553436979a6f7e5b45e64
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Aug 30 15:51:45 2022 +0200

    refactor(plc4go/connection-cache): cleanup
    
    + split up connection cache elements into multiple files
    + added tests
    + added String() methods for better output
---
 plc4go/pkg/api/cache/plc_connection_cache.go      | 466 +-----------
 plc4go/pkg/api/cache/plc_connection_cache_test.go | 861 ++++------------------
 plc4go/pkg/api/cache/plc_connection_common.go     | 103 +++
 plc4go/pkg/api/cache/plc_connection_container.go  | 193 +++++
 plc4go/pkg/api/cache/plc_connection_lease.go      | 204 +++++
 plc4go/pkg/api/cache/plc_connection_lease_test.go | 650 ++++++++++++++++
 6 files changed, 1312 insertions(+), 1165 deletions(-)
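
For orientation, here is a minimal usage sketch of the refactored cache API, pieced together from the constructors and the test code in this patch. The simulated driver's import path is an assumption, and error handling is abbreviated.

package main

import (
	"log"

	"github.com/apache/plc4x/plc4go/internal/simulated" // assumed location of the simulated driver
	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
	"github.com/apache/plc4x/plc4go/pkg/api/cache"
)

func main() {
	// Register the simulated driver, as the tests in this patch do.
	driverManager := plc4go.NewPlcDriverManager()
	driverManager.RegisterDriver(simulated.NewDriver())

	// Default constructor; uses a maximum lease time of 5 seconds.
	connectionCache := cache.NewPlcConnectionCache(driverManager)

	// GetConnection hands out a lease on a pooled connection.
	connectResult := <-connectionCache.GetConnection("simulated://1.2.3.4:42")
	if connectResult.GetErr() != nil {
		log.Fatalf("connect failed: %v", connectResult.GetErr())
	}
	connection := connectResult.GetConnection()

	// ... run read/write requests against the leased connection here ...

	// Close() on the lease returns the connection to the cache instead of
	// actually closing it.
	connection.Close()

	// Closing the cache itself tears down all pooled connections.
	<-connectionCache.Close()
}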

diff --git a/plc4go/pkg/api/cache/plc_connection_cache.go b/plc4go/pkg/api/cache/plc_connection_cache.go
index fbaf48aad..4cdac2077 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache.go
@@ -22,7 +22,6 @@ package cache
 import (
 	"fmt"
 	"github.com/apache/plc4x/plc4go/pkg/api"
-	"github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/utils"
@@ -37,6 +36,27 @@ type PlcConnectionCache interface {
 	Close() <-chan PlcConnectionCacheCloseResult
 }
 
+func NewPlcConnectionCache(driverManager plc4go.PlcDriverManager) PlcConnectionCache {
+	return NewPlcConnectionCacheWithMaxLeaseTime(driverManager, time.Second*5)
+}
+
+func NewPlcConnectionCacheWithMaxLeaseTime(driverManager plc4go.PlcDriverManager, maxLeaseTime time.Duration) PlcConnectionCache {
+	return &plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  maxLeaseTime,
+		maxWaitTime:   maxLeaseTime * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+}
+
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
 type plcConnectionCache struct {
 	driverManager plc4go.PlcDriverManager
 
@@ -50,21 +70,22 @@ type plcConnectionCache struct {
 	tracer      *spi.Tracer
 }
 
-func NewPlcConnectionCache(driverManager plc4go.PlcDriverManager) PlcConnectionCache {
-	return NewPlcConnectionCacheWithMaxLeaseTime(driverManager, time.Second*5)
-}
-
-func NewPlcConnectionCacheWithMaxLeaseTime(driverManager plc4go.PlcDriverManager, maxLeaseTime time.Duration) PlcConnectionCache {
-	return &plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  maxLeaseTime,
-		maxWaitTime:   maxLeaseTime * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
+func (t *plcConnectionCache) onConnectionEvent(event connectionEvent) {
+	connectionContainerInstance := event.getConnectionContainer()
+	if errorEvent, ok := event.(connectionErrorEvent); ok {
+		if t.tracer != nil {
+			t.tracer.AddTrace("destroy-connection", errorEvent.getError().Error())
+		}
+		log.Debug().Str("connectionString", connectionContainerInstance.connectionString)
 	}
 }
 
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
+
 func (t *plcConnectionCache) EnableTracer() {
 	t.tracer = spi.NewTracer("cache")
 }
@@ -214,421 +235,6 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 	return ch
 }
 
-// onConnectionEvent: Callback called by the connection container to signal connection events
-// that have an impact on the cache itself (Like connections being permanently closed).
-func (t *plcConnectionCache) onConnectionEvent(event connectionEvent) {
-	connectionContainer := event.getConnectionContainer()
-	if errorEvent, ok := event.(connectionErrorEvent); ok {
-		if t.tracer != nil {
-			t.tracer.AddTrace("destroy-connection", errorEvent.getError().Error())
-		}
-		log.Debug().Str("connectionString", event.getConnectionContainer().connectionString).
-			Msg("Got connection-error event ...")
-		delete(t.connections, connectionContainer.connectionString)
-	}
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// connectionContainer
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-type connectionContainer struct {
-	lock             lock.RWMutex
-	connectionString string
-	driverManager    plc4go.PlcDriverManager
-	tracerEnabled    bool
-	connection       spi.PlcConnection
-	leaseCounter     uint32
-	closed           bool
-	// The current state of this connection.
-	state cachedPlcConnectionState
-	// Queue of waiting clients.
-	queue []chan plc4go.PlcConnectionConnectResult
-	// Listeners for connection events.
-	listeners []connectionListener
-}
-
-func (t *connectionContainer) connect() {
-	log.Debug().Str("connectionString", t.connectionString).Msg("Connecting new cached connection ...")
-	// Initialize the new connection.
-	connectionResultChan := t.driverManager.GetConnection(t.connectionString)
-
-	// Allow us to finish this function and return the lock quickly
-	// Wait for the connection to be established.
-	// TODO: Add some timeout handling.
-	connectionResult := <-connectionResultChan
-
-	// Get the lock.
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	// If the connection was successful, pass the active connection into the container.
-	// If something went wrong, we have to remove the connection from the cache and return the error.
-	if connectionResult.GetErr() == nil {
-		log.Debug().Str("connectionString", t.connectionString).Msg("Successfully connected new cached connection.")
-		// Inject the real connection into the container.
-		if _, ok := connectionResult.GetConnection().(spi.PlcConnection); !ok {
-			panic("Return connection doesn't implement the spi.PlcConnection interface")
-		}
-		t.connection = connectionResult.GetConnection().(spi.PlcConnection)
-		t.tracerEnabled = t.connection.IsTraceEnabled()
-		// Mark the connection as idle for now.
-		t.state = StateIdle
-		// If there is a request in the queue, hand out the connection to that.
-		if len(t.queue) > 0 {
-			// Get the first in the queue.
-			queueHead := t.queue[0]
-			t.queue = t.queue[1:]
-			// Mark the connection as being used.
-			t.state = StateInUse
-			// Return the lease to the caller.
-			connection := newPlcConnectionLease(t, t.leaseCounter, t.connection)
-			// In this case we don't need to check for blocks
-			// as the getConnection function of the connection cache
-			// is definitely eagerly waiting for input.
-			queueHead <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
-		}
-	} else {
-		log.Debug().Str("connectionString", t.connectionString).
-			Err(connectionResult.GetErr()).
-			Msg("Error connecting new cached connection.")
-		// Tell the connection cache that the connection is no longer available.
-		if t.listeners != nil {
-			event := connectionErrorEvent{
-				conn: *t,
-				err:  connectionResult.GetErr(),
-			}
-			for _, listener := range t.listeners {
-				listener.onConnectionEvent(event)
-			}
-		}
-
-		// Send a failure to all waiting clients.
-		if len(t.queue) > 0 {
-			for _, waitingClient := range t.queue {
-				waitingClient <- _default.NewDefaultPlcConnectionConnectResult(nil, connectionResult.GetErr())
-			}
-		}
-	}
-}
-
-func (t *connectionContainer) addListener(listener connectionListener) {
-	// Get the lock.
-	t.lock.Lock()
-	defer t.lock.Unlock()
-	// Add the listener to the queue
-	t.listeners = append(t.listeners, listener)
-}
-
-func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	ch := make(chan plc4go.PlcConnectionConnectResult)
-	// Check if the connection is available.
-	if t.state == StateIdle {
-		t.leaseCounter++
-		connection := newPlcConnectionLease(t, t.leaseCounter, t.connection)
-		t.state = StateInUse
-		// In this case we don't need to check for blocks
-		// as the getConnection function of the connection cache
-		// is definitely eagerly waiting for input.
-		log.Debug().Str("connectionString", t.connectionString).
-			Msg("Got lease instantly as connection was idle.")
-		go func() {
-			ch <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
-		}()
-	} else if t.state == StateInUse || t.state == StateInitialized {
-		// If the connection is currently busy or not finished initializing,
-		// add the new channel to the queue for this connection.
-		t.queue = append(t.queue, ch)
-		log.Debug().Str("connectionString", t.connectionString).
-			Int("waiting-queue-size", len(t.queue)).
-			Msg("Added lease-request to queue.")
-	}
-	return ch
-}
-
-func (t *connectionContainer) returnConnection(state cachedPlcConnectionState) error {
-	// Intentionally not locking anything, as there are two cases, where the connection is returned:
-	// 1) The connection failed to get established (No connection has a lock anyway)
-	// 2) The connection is returned, then the one returning it already has a lock on it.
-	// If the connection is marked as "invalid", destroy it and remove it from the cache.
-	if state == StateInvalid {
-		// TODO: Perhaps do a maximum number of retries and then call failConnection()
-		log.Debug().Str("connectionString", t.connectionString).
-			Msg("Client returned invalid connection, reconnecting.")
-		t.connect()
-	} else {
-		log.Debug().Str("connectionString", t.connectionString).
-			Msg("Client returned valid connection.")
-	}
-
-	// Check how many others are waiting for this connection.
-	if len(t.queue) > 0 {
-		// There are waiting clients, give the connection to the next client in the line.
-		next := t.queue[0]
-		t.queue = t.queue[1:]
-		t.leaseCounter++
-		connection := newPlcConnectionLease(t, t.leaseCounter, t.connection)
-		// Send asynchronously as the receiver might have given up waiting,
-		// and we don't want anything to block here. 1ms should be enough for
-		// the calling process to reach the blocking read.
-		go func() {
-			// In this case we don't need to check for blocks
-			// as the getConnection function of the connection cache
-			// is definitely eagerly waiting for input.
-			next <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
-			log.Debug().Str("connectionString", t.connectionString).
-				Int("waiting-queue-size", len(t.queue)).
-				Msg("Returned connection to the next client waiting.")
-		}()
-	} else {
-		// Otherwise, just mark the connection as idle.
-		log.Debug().Str("connectionString", t.connectionString).
-			Msg("Connection set to 'idle'.")
-		t.state = StateIdle
-	}
-	return nil
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// plcConnectionLease
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-type plcConnectionLease struct {
-	// Reference back to the container, so we can give the connection back.
-	connectionContainer *connectionContainer
-	// Counter for the number of times this connection has been used before.
-	leaseId uint32
-	// The actual connection being cached.
-	connection spi.PlcConnection
-}
-
-func newPlcConnectionLease(connectionContainer *connectionContainer, leaseId uint32, connection spi.PlcConnection) *plcConnectionLease {
-	p := &plcConnectionLease{
-		connectionContainer: connectionContainer,
-		leaseId:             leaseId,
-		connection:          connection,
-	}
-	if connection.IsTraceEnabled() {
-		connection.GetTracer().SetConnectionId(p.GetConnectionId())
-	}
-	return p
-}
-
-func (t *plcConnectionLease) IsTraceEnabled() bool {
-	if t.connection == nil {
-		panic("Called 'IsTraceEnabled' on a closed cached connection")
-	}
-	return t.connection.IsTraceEnabled()
-}
-
-func (t *plcConnectionLease) GetTracer() *spi.Tracer {
-	if t.connection == nil {
-		panic("Called 'GetTracer' on a closed cached connection")
-	}
-	return t.connection.GetTracer()
-}
-
-func (t *plcConnectionLease) GetConnectionId() string {
-	if t.connection == nil {
-		panic("Called 'GetConnectionId' on a closed cached connection")
-	}
-	return fmt.Sprintf("%s-%d", t.connection.GetConnectionId(), t.leaseId)
-}
-
-func (t *plcConnectionLease) Connect() <-chan plc4go.PlcConnectionConnectResult {
-	panic("Called 'Connect' on a cached connection")
-}
-
-func (t *plcConnectionLease) BlockingClose() {
-	if t.connection == nil {
-		panic("Called 'BlockingClose' on a closed cached connection")
-	}
-	// Call close and wait for the operation to finish.
-	<-t.Close()
-}
-
-func (t *plcConnectionLease) Close() <-chan plc4go.PlcConnectionCloseResult {
-	if t.connection == nil {
-		panic("Called 'Close' on a closed cached connection")
-	}
-
-	result := make(chan plc4go.PlcConnectionCloseResult)
-
-	go func() {
-		// Check if the connection is still alive, if it is, put it back into the cache
-		pingResults := t.Ping()
-		pingTimeout := time.NewTimer(time.Second * 5)
-		newState := StateIdle
-		select {
-		case pingResult := <-pingResults:
-			{
-				if pingResult.GetErr() != nil {
-					newState = StateInvalid
-				}
-			}
-		case <-pingTimeout.C:
-			{
-				// Add some trace information
-				if t.connection.IsTraceEnabled() {
-					t.connection.GetTracer().AddTrace("ping", "timeout")
-				}
-				// Mark the connection as broken ...
-				newState = StateInvalid
-			}
-		}
-
-		// Extract the trace entries from the connection.
-		var traces []spi.TraceEntry
-		if t.IsTraceEnabled() {
-			tracer := t.GetTracer()
-			// Save all traces.
-			traces = tracer.GetTraces()
-			// Clear the log.
-			tracer.ResetTraces()
-			// Reset the connection id back to the one without the lease-id.
-			tracer.SetConnectionId(t.connection.GetConnectionId())
-		}
-
-		// Return the connection to the connection container and don't actually close it.
-		err := t.connectionContainer.returnConnection(newState)
-
-		// Finish closing the connection.
-		timeout := time.NewTimer(10 * time.Millisecond)
-		defer utils.CleanupTimer(timeout)
-		select {
-		case result <- _default.NewDefaultPlcConnectionCloseResultWithTraces(t, err, traces):
-		case <-timeout.C:
-		}
-
-		// Detach the connection from this lease, so it can no longer be used by the client.
-		t.connection = nil
-	}()
-
-	return result
-}
-
-func (t *plcConnectionLease) IsConnected() bool {
-	if t.connection == nil {
-		return false
-	}
-	return t.connection.IsConnected()
-}
-
-func (t *plcConnectionLease) Ping() <-chan plc4go.PlcConnectionPingResult {
-	if t.connection == nil {
-		panic("Called 'Ping' on a closed cached connection")
-	}
-	return t.connection.Ping()
-}
-
-func (t *plcConnectionLease) GetMetadata() model.PlcConnectionMetadata {
-	if t.connection == nil {
-		panic("Called 'GetMetadata' on a closed cached connection")
-	}
-	return t.connection.GetMetadata()
-}
-
-func (t *plcConnectionLease) ReadRequestBuilder() model.PlcReadRequestBuilder {
-	if t.connection == nil {
-		panic("Called 'ReadRequestBuilder' on a closed cached connection")
-	}
-	return t.connection.ReadRequestBuilder()
-}
-
-func (t *plcConnectionLease) WriteRequestBuilder() model.PlcWriteRequestBuilder {
-	if t.connection == nil {
-		panic("Called 'WriteRequestBuilder' on a closed cached connection")
-	}
-	return t.connection.WriteRequestBuilder()
-}
-
-func (t *plcConnectionLease) SubscriptionRequestBuilder() model.PlcSubscriptionRequestBuilder {
-	if t.connection == nil {
-		panic("Called 'SubscriptionRequestBuilder' on a closed cached connection")
-	}
-	return t.connection.SubscriptionRequestBuilder()
-}
-
-func (t *plcConnectionLease) UnsubscriptionRequestBuilder() model.PlcUnsubscriptionRequestBuilder {
-	if t.connection == nil {
-		panic("Called 'UnsubscriptionRequestBuilder' on a closed cached connection")
-	}
-	return t.connection.UnsubscriptionRequestBuilder()
-}
-
-func (t *plcConnectionLease) BrowseRequestBuilder() model.PlcBrowseRequestBuilder {
-	if t.connection == nil {
-		panic("Called 'BrowseRequestBuilder' on a closed cached connection")
-	}
-	return t.connection.BrowseRequestBuilder()
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// cachedPlcConnectionState
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-type cachedPlcConnectionState int32
-
-const (
-	StateInitialized cachedPlcConnectionState = iota
-	StateIdle
-	StateInUse
-	StateInvalid
-)
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// Events
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-type connectionListener interface {
-	onConnectionEvent(event connectionEvent)
-}
-
-type connectionEvent interface {
-	getConnectionContainer() connectionContainer
-}
-
-type connectionErrorEvent struct {
-	conn connectionContainer
-	err  error
-}
-
-func (c connectionErrorEvent) getConnectionContainer() connectionContainer {
-	return c.conn
-}
-
-func (c connectionErrorEvent) getError() error {
-	return c.err
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// PlcConnectionCacheCloseResult / plcConnectionCacheCloseResult
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-type PlcConnectionCacheCloseResult interface {
-	GetConnectionCache() PlcConnectionCache
-	GetErr() error
-}
-
-type plcConnectionCacheCloseResult struct {
-	connectionCache PlcConnectionCache
-	err             error
-}
-
-func newDefaultPlcConnectionCacheCloseResult(connectionCache PlcConnectionCache, err error) PlcConnectionCacheCloseResult {
-	return &plcConnectionCacheCloseResult{
-		connectionCache: connectionCache,
-		err:             err,
-	}
-}
-
-func (p plcConnectionCacheCloseResult) GetConnectionCache() PlcConnectionCache {
-	return p.connectionCache
-}
-
-func (p plcConnectionCacheCloseResult) GetErr() error {
-	return p.err
+func (t *plcConnectionCache) String() string {
+	return fmt.Sprintf("plcConnectionCache{driverManager: %s, maxLeaseTime: %s, maxWaitTime: %s, connections: %s, tracer: %s}", t.driverManager, t.maxLeaseTime, t.maxWaitTime, t.connections, t.tracer)
 }
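
A side note on the String() hunk above: the cache now satisfies fmt.Stringer, which is the "better output" the commit message refers to. A small fragment, assuming a driverManager set up as in the sketch further up (the 1-second lease time is only an illustrative value):

	// Per the constructor above, maxWaitTime is derived as maxLeaseTime * 5,
	// so a 1s lease time means clients wait at most 5s for a connection.
	connectionCache := cache.NewPlcConnectionCacheWithMaxLeaseTime(driverManager, time.Second*1)

	// %s now routes through the new String() method and prints the driver
	// manager, lease/wait times and the connection map instead of a pointer.
	fmt.Printf("connection cache: %s\n", connectionCache)
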
diff --git a/plc4go/pkg/api/cache/plc_connection_cache_test.go b/plc4go/pkg/api/cache/plc_connection_cache_test.go
index 24f385b95..4eb436d83 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache_test.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache_test.go
@@ -32,7 +32,7 @@ import (
 
 var debugTimeout = 1
 
-func TestPlcConnectionCache_GetConnection(t1 *testing.T) {
+func TestPlcConnectionCache_GetConnection(t *testing.T) {
 	type fields struct {
 		driverManager plc4go.PlcDriverManager
 	}
@@ -74,26 +74,26 @@ func TestPlcConnectionCache_GetConnection(t1 *testing.T) {
 		},
 	}
 	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
+		t.Run(tt.name, func(t *testing.T) {
 			cc := NewPlcConnectionCache(tt.fields.driverManager)
 			got := cc.GetConnection(tt.args.connectionString)
 			select {
 			case connectResult := <-got:
 				if tt.wantErr && (connectResult.GetErr() == nil) {
-					t1.Errorf("PlcConnectionCache.GetConnection() = %v, wantErr %v", connectResult.GetErr(), tt.wantErr)
+					t.Errorf("PlcConnectionCache.GetConnection() = %v, wantErr %v", connectResult.GetErr(), tt.wantErr)
 				} else if connectResult.GetErr() != nil {
-					t1.Errorf("PlcConnectionCache.GetConnection() error = %v, wantErr %v", connectResult.GetErr(), tt.wantErr)
+					t.Errorf("PlcConnectionCache.GetConnection() error = %v, wantErr %v", connectResult.GetErr(), tt.wantErr)
 				}
 			case <-time.After(10 * time.Second):
 				if !tt.wantTimeout {
-					t1.Errorf("PlcConnectionCache.GetConnection() got timeout")
+					t.Errorf("PlcConnectionCache.GetConnection() got timeout")
 				}
 			}
 		})
 	}
 }
 
-func TestPlcConnectionCache_Close(t1 *testing.T) {
+func TestPlcConnectionCache_Close(t *testing.T) {
 	type fields struct {
 		driverManager plc4go.PlcDriverManager
 	}
@@ -107,14 +107,16 @@ func TestPlcConnectionCache_Close(t1 *testing.T) {
 		wantErr     bool
 		wantTimeout bool
 	}{
-		{name: "simple",
+		{
+			name: "simple",
 			fields: fields{
 				driverManager: func() plc4go.PlcDriverManager {
 					driverManager := plc4go.NewPlcDriverManager()
 					driverManager.RegisterDriver(simulated.NewDriver())
 					return driverManager
 				}(),
-			}, args: args{
+			},
+			args: args{
 				connectionStrings: []string{
 					"simulated://1.2.3.4:42",
 					"simulated://4.3.2.1:23",
@@ -124,9 +126,21 @@ func TestPlcConnectionCache_Close(t1 *testing.T) {
 			wantErr:     false,
 			wantTimeout: false,
 		},
+		{
+			name: "empty close",
+			fields: fields{
+				driverManager: func() plc4go.PlcDriverManager {
+					driverManager := plc4go.NewPlcDriverManager()
+					driverManager.RegisterDriver(simulated.NewDriver())
+					return driverManager
+				}(),
+			},
+			wantErr:     false,
+			wantTimeout: false,
+		},
 	}
 	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
+		t.Run(tt.name, func(t *testing.T) {
 			cc := NewPlcConnectionCache(tt.fields.driverManager)
 			// Connect to all sources first
 			for _, connectionString := range tt.args.connectionStrings {
@@ -134,14 +148,14 @@ func TestPlcConnectionCache_Close(t1 *testing.T) {
 				select {
 				case connectResult := <-got:
 					if connectResult.GetErr() != nil {
-						t1.Errorf("PlcConnectionCache.GetConnection() error = %v", connectResult.GetErr())
+						t.Errorf("PlcConnectionCache.GetConnection() error = %v", connectResult.GetErr())
 					} else {
 						// Give the connection back.
 						connectResult.GetConnection().Close()
 					}
 				case <-time.After(10 * time.Second):
 					if !tt.wantTimeout {
-						t1.Errorf("PlcConnectionCache.GetConnection() got timeout")
+						t.Errorf("PlcConnectionCache.GetConnection() got timeout")
 					}
 				}
 			}
@@ -151,13 +165,13 @@ func TestPlcConnectionCache_Close(t1 *testing.T) {
 			select {
 			case cacheCloseResult := <-cacheCloseResults:
 				if tt.wantErr && (cacheCloseResult.GetErr() == nil) {
-					t1.Errorf("PlcConnectionCache.Close() = %v, wantErr %v", cacheCloseResult.GetErr(), tt.wantErr)
+					t.Errorf("PlcConnectionCache.Close() = %v, wantErr %v", cacheCloseResult.GetErr(), tt.wantErr)
 				} else if cacheCloseResult.GetErr() != nil {
-					t1.Errorf("PlcConnectionCache.Close() error = %v, wantErr %v", cacheCloseResult.GetErr(), tt.wantErr)
+					t.Errorf("PlcConnectionCache.Close() error = %v, wantErr %v", cacheCloseResult.GetErr(), tt.wantErr)
 				}
 			case <-time.After(10 * time.Second):
 				if !tt.wantTimeout {
-					t1.Errorf("PlcConnectionCache.Close() got timeout")
+					t.Errorf("PlcConnectionCache.Close() got timeout")
 				}
 			}
 
@@ -165,7 +179,7 @@ func TestPlcConnectionCache_Close(t1 *testing.T) {
 	}
 }
 
-func readFromPlc(t1 *testing.T, cache plcConnectionCache, connectionString string, resourceString string) <-chan []spi.TraceEntry {
+func readFromPlc(t *testing.T, cache plcConnectionCache, connectionString string, resourceString string) <-chan []spi.TraceEntry {
 	ch := make(chan []spi.TraceEntry)
 
 	// Get a connection
@@ -173,7 +187,7 @@ func readFromPlc(t1 *testing.T, cache plcConnectionCache, connectionString strin
 	select {
 	case connectResult := <-connectionResultChan:
 		if connectResult.GetErr() != nil {
-			t1.Errorf("PlcConnectionCache.GetConnection() error = %v", connectResult.GetErr())
+			t.Errorf("PlcConnectionCache.GetConnection() error = %v", connectResult.GetErr())
 			return nil
 		}
 		connection := connectResult.GetConnection()
@@ -189,7 +203,7 @@ func readFromPlc(t1 *testing.T, cache plcConnectionCache, connectionString strin
 		// Prepare a read request.
 		readRequest, err := connection.ReadRequestBuilder().AddQuery("test", resourceString).Build()
 		if err != nil {
-			t1.Errorf("PlcConnectionCache.ReadRequest.Build() error = %v", err)
+			t.Errorf("PlcConnectionCache.ReadRequest.Build() error = %v", err)
 			return ch
 		}
 
@@ -199,45 +213,45 @@ func readFromPlc(t1 *testing.T, cache plcConnectionCache, connectionString strin
 		case readRequestResult := <-execution:
 			err := readRequestResult.GetErr()
 			if err != nil {
-				t1.Errorf("PlcConnectionCache.ReadRequest.Read() error = %v", err)
+				t.Errorf("PlcConnectionCache.ReadRequest.Read() error = %v", err)
 			}
 		case <-time.After(1 * time.Second):
-			t1.Errorf("PlcConnectionCache.ReadRequest.Read() timeout")
+			t.Errorf("PlcConnectionCache.ReadRequest.Read() timeout")
 		}
 		return ch
 	case <-time.After(20 * time.Second):
-		t1.Errorf("PlcConnectionCache.GetConnection() got timeout")
+		t.Errorf("PlcConnectionCache.GetConnection() got timeout")
 	}
 	return ch
 }
 
-func executeAndTestReadFromPlc(t1 *testing.T, cache plcConnectionCache, connectionString string, resourceString string, expectedTraceEntries []string, expectedNumTotalConnections int) <-chan bool {
+func executeAndTestReadFromPlc(t *testing.T, cache plcConnectionCache, connectionString string, resourceString string, expectedTraceEntries []string, expectedNumTotalConnections int) <-chan bool {
 	ch := make(chan bool)
 	go func() {
 		// Read once from the cache.
-		tracesChannel := readFromPlc(t1, cache, connectionString, resourceString)
+		tracesChannel := readFromPlc(t, cache, connectionString, resourceString)
 		traces := <-tracesChannel
 
 		// In the log we should see one "Successfully connected" entry.
 		if len(traces) != len(expectedTraceEntries) {
-			t1.Errorf("Expected %d 'Successfully connected' entries in the log but got %d", len(expectedTraceEntries), len(traces))
+			t.Errorf("Expected %d 'Successfully connected' entries in the log but got %d", len(expectedTraceEntries), len(traces))
 		}
 		for i, expectedTraceEntry := range expectedTraceEntries {
 			currentTraceEntry := traces[i].Operation + "-" + traces[i].Message
 			if expectedTraceEntry != currentTraceEntry {
-				t1.Errorf("Expected %s as trace entry but got %s", expectedTraceEntry, currentTraceEntry)
+				t.Errorf("Expected %s as trace entry but got %s", expectedTraceEntry, currentTraceEntry)
 			}
 		}
 		// Now there should be one connection in the cache.
 		if len(cache.connections) != expectedNumTotalConnections {
-			t1.Errorf("Expected %d connections in the cache but got %d", expectedNumTotalConnections, len(cache.connections))
+			t.Errorf("Expected %d connections in the cache but got %d", expectedNumTotalConnections, len(cache.connections))
 		}
 		ch <- true
 	}()
 	return ch
 }
 
-func TestPlcConnectionCache_ReusingAnExistingConnection(t1 *testing.T) {
+func TestPlcConnectionCache_ReusingAnExistingConnection(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	cache := plcConnectionCache{
@@ -252,11 +266,11 @@ func TestPlcConnectionCache_ReusingAnExistingConnection(t1 *testing.T) {
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	// Read once from the cache.
-	finishedChan := executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?traceEnabled=true", "RANDOM/test_random:BOOL",
+	finishedChan := executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"connect-started",
 			"connect-success",
@@ -268,11 +282,11 @@ func TestPlcConnectionCache_ReusingAnExistingConnection(t1 *testing.T) {
 	select {
 	case _ = <-finishedChan:
 	case <-time.After(500 * time.Millisecond * time.Duration(debugTimeout)):
-		t1.Errorf("Timeout")
+		t.Errorf("Timeout")
 	}
 
 	// Request the same connection for a second time.
-	finishedChan = executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?traceEnabled=true", "RANDOM/test_random:BOOL",
+	finishedChan = executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"read-started",
 			"read-success",
@@ -282,23 +296,23 @@ func TestPlcConnectionCache_ReusingAnExistingConnection(t1 *testing.T) {
 	select {
 	case _ = <-finishedChan:
 	case <-time.After(500 * time.Millisecond * time.Duration(debugTimeout)):
-		t1.Errorf("Timeout")
+		t.Errorf("Timeout")
 	}
 
-	assert.NotNil(t1, cache.GetTracer(), "Tracer should be available")
+	assert.NotNil(t, cache.GetTracer(), "Tracer should be available")
 	traces := cache.GetTracer().GetTraces()
-	assert.Equal(t1, 5, len(traces), "Unexpected number of trace entries")
+	assert.Equal(t, 5, len(traces), "Unexpected number of trace entries")
 	// First it needs to create a new container for this connection
-	assert.Equal(t1, "create new cached connection", traces[0].Message, "Unexpected message")
+	assert.Equal(t, "create new cached connection", traces[0].Message, "Unexpected message")
 	// Then it gets a lease for the connection
-	assert.Equal(t1, "lease", traces[1].Message, "Unexpected message")
-	assert.Equal(t1, "success", traces[2].Message, "Unexpected message")
+	assert.Equal(t, "lease", traces[1].Message, "Unexpected message")
+	assert.Equal(t, "success", traces[2].Message, "Unexpected message")
 	// And a second time
-	assert.Equal(t1, "lease", traces[3].Message, "Unexpected message")
-	assert.Equal(t1, "success", traces[4].Message, "Unexpected message")
+	assert.Equal(t, "lease", traces[3].Message, "Unexpected message")
+	assert.Equal(t, "success", traces[4].Message, "Unexpected message")
 }
 
-func TestPlcConnectionCache_MultipleConcurrentConnectionRequests(t1 *testing.T) {
+func TestPlcConnectionCache_MultipleConcurrentConnectionRequests(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	cache := plcConnectionCache{
@@ -313,11 +327,11 @@ func TestPlcConnectionCache_MultipleConcurrentConnectionRequests(t1 *testing.T)
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	// Read once from the cache.
-	firstRun := executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
+	firstRun := executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"connect-started",
 			"connect-success",
@@ -333,7 +347,7 @@ func TestPlcConnectionCache_MultipleConcurrentConnectionRequests(t1 *testing.T)
 	// As the connection takes 100ms, the second connection request will come
 	// in while the first is still not finished. So in theory it would have
 	// to wait for the first operation to be finished first.
-	secondRun := executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
+	secondRun := executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"read-started",
 			"read-success",
@@ -345,30 +359,30 @@ func TestPlcConnectionCache_MultipleConcurrentConnectionRequests(t1 *testing.T)
 		select {
 		case _ = <-secondRun:
 		case <-time.After(500 * time.Millisecond * time.Duration(debugTimeout)):
-			t1.Errorf("Timeout")
+			t.Errorf("Timeout")
 		}
 		break
 	case <-time.After(1 * time.Second * time.Duration(debugTimeout)):
-		t1.Errorf("Timeout")
+		t.Errorf("Timeout")
 	}
 
 	// This should be quite equal to the serial case as the connections are requested serially.
-	assert.NotNil(t1, cache.GetTracer(), "Tracer should be available")
+	assert.NotNil(t, cache.GetTracer(), "Tracer should be available")
 	traces := cache.GetTracer().GetTraces()
-	assert.Equal(t1, 5, len(traces), "Unexpected number of trace entries")
+	assert.Equal(t, 5, len(traces), "Unexpected number of trace entries")
 	// First it needs to create a new container for this connection
-	assert.Equal(t1, "create new cached connection", traces[0].Message, "Unexpected message")
+	assert.Equal(t, "create new cached connection", traces[0].Message, "Unexpected message")
 	// Then it gets a lease for the connection
-	assert.Equal(t1, "lease", traces[1].Message, "Unexpected message")
+	assert.Equal(t, "lease", traces[1].Message, "Unexpected message")
 	// And a second time
-	assert.Equal(t1, "lease", traces[2].Message, "Unexpected message")
+	assert.Equal(t, "lease", traces[2].Message, "Unexpected message")
 	// Now the delay of 100ms is over, and we should see the first success
-	assert.Equal(t1, "success", traces[3].Message, "Unexpected message")
+	assert.Equal(t, "success", traces[3].Message, "Unexpected message")
 	// Now the first operation is finished, and we should see the second success
-	assert.Equal(t1, "success", traces[4].Message, "Unexpected message")
+	assert.Equal(t, "success", traces[4].Message, "Unexpected message")
 }
 
-func TestPlcConnectionCache_ConnectWithError(t1 *testing.T) {
+func TestPlcConnectionCache_ConnectWithError(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	cache := plcConnectionCache{
@@ -383,28 +397,28 @@ func TestPlcConnectionCache_ConnectWithError(t1 *testing.T) {
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	connectionResultChan := cache.GetConnection("simulated://1.2.3.4:42?connectionError=hurz&traceEnabled=true")
 	select {
 	case connectResult := <-connectionResultChan:
 		if connectResult.GetErr() == nil {
-			t1.Error("An error was expected")
+			t.Error("An error was expected")
 			return
 		}
 		if connectResult.GetErr().Error() != "hurz" {
-			t1.Errorf("An error '%s' was expected, but got '%s'", "hurz", connectResult.GetErr().Error())
+			t.Errorf("An error '%s' was expected, but got '%s'", "hurz", connectResult.GetErr().Error())
 		}
 	case <-time.After(20 * time.Second):
-		t1.Errorf("PlcConnectionCache.GetConnection() got timeout")
+		t.Errorf("PlcConnectionCache.GetConnection() got timeout")
 	}
 }
 
 // In this test, the ping operation used to test the connection before
 // putting it back into the cache will return an error, hereby marking
 // the connection as invalid
-func TestPlcConnectionCache_ReturningConnectionWithPingError(t1 *testing.T) {
+func TestPlcConnectionCache_ReturningConnectionWithPingError(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	cache := plcConnectionCache{
@@ -419,14 +433,14 @@ func TestPlcConnectionCache_ReturningConnectionWithPingError(t1 *testing.T) {
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	connectionResultChan := cache.GetConnection("simulated://1.2.3.4:42?pingError=hurz&traceEnabled=true")
 	select {
 	case connectResult := <-connectionResultChan:
 		if connectResult.GetErr() != nil {
-			t1.Errorf("PlcConnectionCache.GetConnection() error = %v", connectResult.GetErr())
+			t.Errorf("PlcConnectionCache.GetConnection() error = %v", connectResult.GetErr())
 		}
 		connection := connectResult.GetConnection().(*plcConnectionLease)
 		if connection != nil {
@@ -436,30 +450,30 @@ func TestPlcConnectionCache_ReturningConnectionWithPingError(t1 *testing.T) {
 				traces := (closeResult.(_default.DefaultPlcConnectionCloseResult)).GetTraces()
 				// We expect 4 traces (connect start & success, and ping start and error).
 				if len(traces) != 4 {
-					t1.Errorf("Expected %d trace entries but got %d", 4, len(traces))
+					t.Errorf("Expected %d trace entries but got %d", 4, len(traces))
 				}
 				if traces[0].Operation+"-"+traces[0].Message != "connect-started" {
-					t1.Errorf("Expected '%s' as first trace message, but got '%s'", "connect-started", traces[0])
+					t.Errorf("Expected '%s' as first trace message, but got '%s'", "connect-started", traces[0])
 				}
 				if traces[1].Operation+"-"+traces[1].Message != "connect-success" {
-					t1.Errorf("Expected '%s' as second trace message, but got '%s'", "connect-success", traces[1])
+					t.Errorf("Expected '%s' as second trace message, but got '%s'", "connect-success", traces[1])
 				}
 				if traces[2].Operation+"-"+traces[2].Message != "ping-started" {
-					t1.Errorf("Expected '%s' as third trace message, but got '%s'", "ping-started", traces[2])
+					t.Errorf("Expected '%s' as third trace message, but got '%s'", "ping-started", traces[2])
 				}
 				if traces[3].Operation+"-"+traces[3].Message != "ping-error: hurz" {
-					t1.Errorf("Expected '%s' as fourth trace message, but got '%s'", "ping-error: hurz", traces[3])
+					t.Errorf("Expected '%s' as fourth trace message, but got '%s'", "ping-error: hurz", traces[3])
 				}
 			}
 		}
 	case <-time.After(20 * time.Second):
-		t1.Errorf("PlcConnectionCache.GetConnection() got timeout")
+		t.Errorf("PlcConnectionCache.GetConnection() got timeout")
 	}
 }
 
 // In this test, we'll make the ping operation take longer than the timeout in the connection cache
 // Therefore the error handling should kick in.
-func TestPlcConnectionCache_PingTimeout(t1 *testing.T) {
+func TestPlcConnectionCache_PingTimeout(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	cache := plcConnectionCache{
@@ -474,11 +488,11 @@ func TestPlcConnectionCache_PingTimeout(t1 *testing.T) {
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	// Read once from the cache.
-	firstRun := executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?pingDelay=10000&traceEnabled=true", "RANDOM/test_random:BOOL",
+	firstRun := executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?pingDelay=10000&traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"connect-started",
 			"connect-success",
@@ -492,7 +506,7 @@ func TestPlcConnectionCache_PingTimeout(t1 *testing.T) {
 	case _ = <-firstRun:
 		break
 	case <-time.After(20 * time.Second * time.Duration(debugTimeout)):
-		t1.Errorf("Timeout")
+		t.Errorf("Timeout")
 	}
 
 }
@@ -500,7 +514,7 @@ func TestPlcConnectionCache_PingTimeout(t1 *testing.T) {
 // In this test there are multiple requests for the same connection but the first operation fails at returning
 // the connection due to a timeout in the ping operation. The second call should get a new connection in this
 // case.
-func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t1 *testing.T) {
+func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	cache := plcConnectionCache{
@@ -515,11 +529,11 @@ func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t1 *testi
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	// Read once from the cache.
-	firstRun := executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?pingDelay=10000&connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
+	firstRun := executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?pingDelay=10000&connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"connect-started",
 			"connect-success",
@@ -535,7 +549,7 @@ func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t1 *testi
 	// As the connection takes 100ms, the second connection request will come
 	// in while the first is still not finished. So in theory it would have
 	// to wait for the first operation to be finished first.
-	secondRun := executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?pingDelay=10000&connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
+	secondRun := executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?pingDelay=10000&connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"connect-started",
 			"connect-success",
@@ -549,32 +563,32 @@ func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t1 *testi
 		select {
 		case _ = <-secondRun:
 		case <-time.After(20 * time.Second * time.Duration(debugTimeout)):
-			t1.Errorf("Timeout")
+			t.Errorf("Timeout")
 		}
 		break
 	case <-time.After(30 * time.Second * time.Duration(debugTimeout)):
-		t1.Errorf("Timeout")
+		t.Errorf("Timeout")
 	}
 
 	// This should be quite equal to the serial case as the connections are requested serially.
-	assert.NotNil(t1, cache.GetTracer(), "Tracer should be available")
+	assert.NotNil(t, cache.GetTracer(), "Tracer should be available")
 	traces := cache.GetTracer().GetTraces()
-	assert.Equal(t1, 5, len(traces), "Unexpected number of trace entries")
+	assert.Equal(t, 5, len(traces), "Unexpected number of trace entries")
 	// First it needs to create a new container for this connection
-	assert.Equal(t1, "create new cached connection", traces[0].Message, "Unexpected message")
+	assert.Equal(t, "create new cached connection", traces[0].Message, "Unexpected message")
 	// Then it gets a lease for the connection
-	assert.Equal(t1, "lease", traces[1].Message, "Unexpected message")
+	assert.Equal(t, "lease", traces[1].Message, "Unexpected message")
 	// And a second time
-	assert.Equal(t1, "lease", traces[2].Message, "Unexpected message")
+	assert.Equal(t, "lease", traces[2].Message, "Unexpected message")
 	// Now the delay of 100ms is over, and we should see the first success
-	assert.Equal(t1, "success", traces[3].Message, "Unexpected message")
+	assert.Equal(t, "success", traces[3].Message, "Unexpected message")
 	// Now the first operation is finished, and we should see the second success
-	assert.Equal(t1, "success", traces[4].Message, "Unexpected message")
+	assert.Equal(t, "success", traces[4].Message, "Unexpected message")
 }
 
 // In this test the first client requests a connection, but doesn't listen on the response-channel
 // This shouldn't block the connection cache.
-func TestPlcConnectionCache_FistReadGivesUpBeforeItGetsTheConnectionSoSecondOneTakesOver(t1 *testing.T) {
+func TestPlcConnectionCache_FistReadGivesUpBeforeItGetsTheConnectionSoSecondOneTakesOver(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	cache := plcConnectionCache{
@@ -589,7 +603,7 @@ func TestPlcConnectionCache_FistReadGivesUpBeforeItGetsTheConnectionSoSecondOneT
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	// Intentionally just ignore the response.
@@ -599,7 +613,7 @@ func TestPlcConnectionCache_FistReadGivesUpBeforeItGetsTheConnectionSoSecondOneT
 
 	// Read once from the cache.
 	// NOTE: It doesn't contain the connect-part, as the previous connection handled that.
-	firstRun := executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
+	firstRun := executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"read-started",
 			"read-success",
@@ -611,11 +625,11 @@ func TestPlcConnectionCache_FistReadGivesUpBeforeItGetsTheConnectionSoSecondOneT
 	case _ = <-firstRun:
 		break
 	case <-time.After(30 * time.Second * time.Duration(debugTimeout)):
-		t1.Errorf("Timeout")
+		t.Errorf("Timeout")
 	}
 }
 
-func TestPlcConnectionCache_SecondConnectionGivenUpWaiting(t1 *testing.T) {
+func TestPlcConnectionCache_SecondConnectionGivenUpWaiting(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	cache := plcConnectionCache{
@@ -630,11 +644,11 @@ func TestPlcConnectionCache_SecondConnectionGivenUpWaiting(t1 *testing.T) {
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	// Read once from the cache.
-	firstRun := executeAndTestReadFromPlc(t1, cache, "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
+	firstRun := executeAndTestReadFromPlc(t, cache, "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL",
 		[]string{
 			"connect-started",
 			"connect-success",
@@ -653,38 +667,38 @@ func TestPlcConnectionCache_SecondConnectionGivenUpWaiting(t1 *testing.T) {
 	select {
 	case _ = <-firstRun:
 	case <-time.After(30 * time.Second * time.Duration(debugTimeout)):
-		t1.Errorf("Timeout")
+		t.Errorf("Timeout")
 	}
 
 	// Wait for 1s to let the connection cache time out (10ms) the lease, as nobody's listening.
 	time.Sleep(time.Millisecond * 1000)
 
 	// This should be quite equal to the serial case as the connections are requested serially.
-	assert.NotNil(t1, cache.GetTracer(), "Tracer should be available")
+	assert.NotNil(t, cache.GetTracer(), "Tracer should be available")
 	traces := cache.GetTracer().GetTraces()
-	if assert.Equal(t1, 5, len(traces), "Unexpected number of trace entries") {
+	if assert.Equal(t, 5, len(traces), "Unexpected number of trace entries") {
 		// First it needs to create a new container for this connection
-		assert.Equal(t1, "create new cached connection", traces[0].Message, "Unexpected message")
+		assert.Equal(t, "create new cached connection", traces[0].Message, "Unexpected message")
 		// Then it gets a lease for the connection
-		assert.Equal(t1, "lease", traces[1].Message, "Unexpected message")
+		assert.Equal(t, "lease", traces[1].Message, "Unexpected message")
 		// And a second time
-		assert.Equal(t1, "lease", traces[2].Message, "Unexpected message")
+		assert.Equal(t, "lease", traces[2].Message, "Unexpected message")
 		// Now the delay of 100ms is over, and we should see the first success
-		assert.Equal(t1, "success", traces[3].Message, "Unexpected message")
+		assert.Equal(t, "success", traces[3].Message, "Unexpected message")
 		// Now the first operation is finished, and we should see the second give up
-		assert.Equal(t1, "client given up", traces[4].Message, "Unexpected message")
+		assert.Equal(t, "client given up", traces[4].Message, "Unexpected message")
 	} else if len(traces) > 0 {
 		var values string
 		for _, traceEntry := range traces {
 			values = values + traceEntry.Operation + "-" + traceEntry.Message + ", "
 		}
-		t1.Errorf("Got traces: %s", values)
+		t.Errorf("Got traces: %s", values)
 	} else {
-		t1.Error("No traces")
+		t.Error("No traces")
 	}
 }
 
-func TestPlcConnectionCache_MaximumWaitTimeReached(t1 *testing.T) {
+func TestPlcConnectionCache_MaximumWaitTimeReached(t *testing.T) {
 	driverManager := plc4go.NewPlcDriverManager()
 	driverManager.RegisterDriver(simulated.NewDriver())
 	// Reduce the max lease time as this way we also reduce the max wait time.
@@ -700,7 +714,7 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t1 *testing.T) {
 
 	// Initially there should be no connection in the cache.
 	if len(cache.connections) != 0 {
-		t1.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
+		t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections))
 	}
 
 	// The first and second connection should work fine
@@ -719,663 +733,40 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t1 *testing.T) {
 	go func() {
 		select {
 		case connectionResult := <-firstConnectionResults:
-			if assert.NotNil(t1, connectionResult) {
-				if assert.Nil(t1, connectionResult.GetErr()) {
+			if assert.NotNil(t, connectionResult) {
+				if assert.Nil(t, connectionResult.GetErr()) {
 					// Give back the connection.
 					connectionResult.GetConnection().Close()
 				}
 			}
 		case <-time.After(5 * time.Second):
-			t1.Errorf("Timeout")
+			t.Errorf("Timeout")
 		}
 	}()
 	go func() {
 		select {
 		case connectionResult := <-secondConnectionResults:
-			if assert.NotNil(t1, connectionResult) {
-				if assert.Nil(t1, connectionResult.GetErr()) {
+			if assert.NotNil(t, connectionResult) {
+				if assert.Nil(t, connectionResult.GetErr()) {
 					// Give back the connection.
 					connectionResult.GetConnection().Close()
 				}
 			}
 		case <-time.After(5 * time.Second):
-			t1.Errorf("Timeout")
+			t.Errorf("Timeout")
 		}
 	}()
 
 	// Now wait for the last connection to be timed out by the cache
 	select {
 	case connectionResult := <-thirdConnectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetConnection())
-			if assert.NotNil(t1, connectionResult.GetErr()) {
-				assert.Equal(t1, "timeout while waiting for connection", connectionResult.GetErr().Error())
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetConnection())
+			if assert.NotNil(t, connectionResult.GetErr()) {
+				assert.Equal(t, "timeout while waiting for connection", connectionResult.GetErr().Error())
 			}
 		}
 	case <-time.After(15 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-///////////////////////////////////////////////////////////////////////////////////////////////////////////
-// LeasedPlcConnection Tests
-
-func TestLeasedPlcConnection_IsTraceEnabled(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection().(spi.PlcConnection)
-				assert.True(t1, connection.IsTraceEnabled())
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'IsTraceEnabled' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.IsTraceEnabled()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-
-	// The first and second connection should work fine
-	connectionResults = cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection().(spi.PlcConnection)
-				assert.False(t1, connection.IsTraceEnabled())
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'IsTraceEnabled' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.IsTraceEnabled()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_GetTracer(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection().(spi.PlcConnection)
-				assert.NotNil(t1, connection.GetTracer())
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'GetTracer' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.GetTracer()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_GetConnectionId(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection().(spi.PlcConnection)
-				assert.Greater(t1, len(connection.GetConnectionId()), 0)
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'GetConnectionId' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.GetConnectionId()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_Connect(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection().(spi.PlcConnection)
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'Connect' on a cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.Connect()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_BlockingClose(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'BlockingClose' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.BlockingClose()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_Close(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'Close' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.Close()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_IsConnected(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				assert.True(t1, connection.IsConnected())
-				connection.BlockingClose()
-				assert.False(t1, connection.IsConnected())
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_Ping(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				connection.Ping()
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'Ping' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.Ping()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_GetMetadata(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				metadata := connection.GetMetadata()
-				if assert.NotNil(t1, metadata) {
-					attributes := metadata.GetConnectionAttributes()
-					assert.NotNil(t1, attributes)
-				}
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'GetMetadata' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.GetMetadata()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_ReadRequestBuilder(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				builder := connection.ReadRequestBuilder()
-				assert.NotNil(t1, builder)
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'ReadRequestBuilder' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.ReadRequestBuilder()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_WriteRequestBuilder(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				builder := connection.WriteRequestBuilder()
-				assert.NotNil(t1, builder)
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'WriteRequestBuilder' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.WriteRequestBuilder()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_SubscriptionRequestBuilder(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "not implemented")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.SubscriptionRequestBuilder()
-				}()
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'SubscriptionRequestBuilder' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.SubscriptionRequestBuilder()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_UnsubscriptionRequestBuilder(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "not implemented")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.UnsubscriptionRequestBuilder()
-				}()
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'UnsubscriptionRequestBuilder' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.UnsubscriptionRequestBuilder()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
-	}
-}
-
-func TestLeasedPlcConnection_BrowseRequestBuilder(t1 *testing.T) {
-	driverManager := plc4go.NewPlcDriverManager()
-	driverManager.RegisterDriver(simulated.NewDriver())
-	// Reduce the max lease time as this way we also reduce the max wait time.
-	cache := plcConnectionCache{
-		driverManager: driverManager,
-		maxLeaseTime:  time.Second * 1,
-		maxWaitTime:   time.Second * 5,
-		cacheLock:     lock.NewCASMutex(),
-		connections:   make(map[string]*connectionContainer),
-		tracer:        nil,
-	}
-	cache.EnableTracer()
-
-	// The first and second connection should work fine
-	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
-	select {
-	case connectionResult := <-connectionResults:
-		if assert.NotNil(t1, connectionResult) {
-			assert.Nil(t1, connectionResult.GetErr())
-			if assert.NotNil(t1, connectionResult.GetConnection()) {
-				connection := connectionResult.GetConnection()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "not implemented")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.BrowseRequestBuilder()
-				}()
-				connection.BlockingClose()
-				func() {
-					defer func() {
-						if r := recover(); r != nil {
-							assert.Equal(t1, r, "Called 'BrowseRequestBuilder' on a closed cached connection")
-						} else {
-							t1.Errorf("The code did not panic")
-						}
-					}()
-					connection.BrowseRequestBuilder()
-				}()
-			}
-		}
-	case <-time.After(1 * time.Second):
-		t1.Errorf("Timeout")
+		t.Errorf("Timeout")
 	}
 }
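
Note: the closed-lease tests in this file all assert panics via the same
recover-based pattern. A minimal sketch of how that pattern could be factored
into a shared helper - the helper name assertPanicsWith is purely
illustrative and not part of this change (testify's assert.PanicsWithValue
provides similar behavior out of the box):

	// assertPanicsWith fails the test unless fn panics with exactly the expected value.
	func assertPanicsWith(t *testing.T, expected interface{}, fn func()) {
		t.Helper()
		defer func() {
			if r := recover(); r != nil {
				assert.Equal(t, expected, r)
			} else {
				t.Errorf("The code did not panic")
			}
		}()
		fn()
	}

	// Usage, mirroring the tests above:
	// assertPanicsWith(t, "Called 'Ping' on a closed cached connection", func() { connection.Ping() })
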
diff --git a/plc4go/pkg/api/cache/plc_connection_common.go b/plc4go/pkg/api/cache/plc_connection_common.go
new file mode 100644
index 000000000..906f15778
--- /dev/null
+++ b/plc4go/pkg/api/cache/plc_connection_common.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cache
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// cachedPlcConnectionState
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+type cachedPlcConnectionState int32
+
+const (
+	StateInitialized cachedPlcConnectionState = iota
+	StateIdle
+	StateInUse
+	StateInvalid
+)
+
+func (c cachedPlcConnectionState) String() string {
+	switch c {
+	case StateInitialized:
+		return "StateInitialized"
+	case StateIdle:
+		return "StateIdle"
+	case StateInUse:
+		return "StateInUse"
+	case StateInvalid:
+		return "StateInvalid"
+	}
+	return "Unknown"
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// Events
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+type connectionListener interface {
+	// onConnectionEvent: callback used by the connection container to signal connection events
+	// that have an impact on the cache itself (like connections being permanently closed).
+	onConnectionEvent(event connectionEvent)
+}
+
+type connectionEvent interface {
+	getConnectionContainer() connectionContainer
+}
+
+type connectionErrorEvent struct {
+	conn connectionContainer
+	err  error
+}
+
+func (c connectionErrorEvent) getConnectionContainer() connectionContainer {
+	return c.conn
+}
+
+func (c connectionErrorEvent) getError() error {
+	return c.err
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// PlcConnectionCacheCloseResult / plcConnectionCacheCloseResult
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+type PlcConnectionCacheCloseResult interface {
+	GetConnectionCache() PlcConnectionCache
+	GetErr() error
+}
+
+type plcConnectionCacheCloseResult struct {
+	connectionCache PlcConnectionCache
+	err             error
+}
+
+func newDefaultPlcConnectionCacheCloseResult(connectionCache PlcConnectionCache, err error) PlcConnectionCacheCloseResult {
+	return &plcConnectionCacheCloseResult{
+		connectionCache: connectionCache,
+		err:             err,
+	}
+}
+
+func (p plcConnectionCacheCloseResult) GetConnectionCache() PlcConnectionCache {
+	return p.connectionCache
+}
+
+func (p plcConnectionCacheCloseResult) GetErr() error {
+	return p.err
+}
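
Note: the connectionListener/connectionEvent pair above is the hook through
which a connection container reports failures back to whoever registered a
listener (in practice presumably the cache itself). A minimal, purely
illustrative listener, assuming the types from this file and the zerolog
log package used elsewhere in this change:

	type loggingConnectionListener struct{}

	func (l loggingConnectionListener) onConnectionEvent(event connectionEvent) {
		// Only error events carry an error; other event types are ignored here.
		if errorEvent, ok := event.(connectionErrorEvent); ok {
			log.Debug().Err(errorEvent.getError()).
				Msgf("connection container %v reported an error", errorEvent.getConnectionContainer())
		}
	}
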
diff --git a/plc4go/pkg/api/cache/plc_connection_container.go b/plc4go/pkg/api/cache/plc_connection_container.go
new file mode 100644
index 000000000..e61eda54c
--- /dev/null
+++ b/plc4go/pkg/api/cache/plc_connection_container.go
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cache
+
+import (
+	"fmt"
+	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
+	"github.com/apache/plc4x/plc4go/spi"
+	_default "github.com/apache/plc4x/plc4go/spi/default"
+	"github.com/rs/zerolog/log"
+	"github.com/viney-shih/go-lock"
+)
+
+type connectionContainer struct {
+	lock             lock.RWMutex
+	connectionString string
+	driverManager    plc4go.PlcDriverManager
+	tracerEnabled    bool
+	connection       spi.PlcConnection
+	leaseCounter     uint32
+	closed           bool
+	// The current state of this connection.
+	state cachedPlcConnectionState
+	// Queue of waiting clients.
+	queue []chan plc4go.PlcConnectionConnectResult
+	// Listeners for connection events.
+	listeners []connectionListener
+}
+
+func (t *connectionContainer) connect() {
+	log.Debug().Str("connectionString", t.connectionString).Msg("Connecting new cached connection ...")
+	// Initialize the new connection.
+	connectionResultChan := t.driverManager.GetConnection(t.connectionString)
+
+	// Wait for the connection to be established before acquiring the lock,
+	// so the lock isn't held for the duration of the connection attempt.
+	// TODO: Add some timeout handling.
+	connectionResult := <-connectionResultChan
+
+	// Get the lock.
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	// If the connection was successful, pass the active connection into the container.
+	// If something went wrong, we have to remove the connection from the cache and return the error.
+	if connectionResult.GetErr() == nil {
+		log.Debug().Str("connectionString", t.connectionString).Msg("Successfully connected new cached connection.")
+		// Inject the real connection into the container.
+		if _, ok := connectionResult.GetConnection().(spi.PlcConnection); !ok {
+			panic("Return connection doesn't implement the spi.PlcConnection interface")
+		}
+		t.connection = connectionResult.GetConnection().(spi.PlcConnection)
+		t.tracerEnabled = t.connection.IsTraceEnabled()
+		// Mark the connection as idle for now.
+		t.state = StateIdle
+		// If there is a request in the queue, hand out the connection to that.
+		if len(t.queue) > 0 {
+			// Get the first in the queue.
+			queueHead := t.queue[0]
+			t.queue = t.queue[1:]
+			// Mark the connection as being used.
+			t.state = StateInUse
+			// Return the lease to the caller.
+			connection := newPlcConnectionLease(t, t.leaseCounter, t.connection)
+			// In this case we don't need to check for blocks
+			// as the getConnection function of the connection cache
+			// is definitely eagerly waiting for input.
+			queueHead <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
+		}
+	} else {
+		log.Debug().Str("connectionString", t.connectionString).
+			Err(connectionResult.GetErr()).
+			Msg("Error connecting new cached connection.")
+		// Tell the connection cache that the connection is no longer available.
+		if t.listeners != nil {
+			event := connectionErrorEvent{
+				conn: *t,
+				err:  connectionResult.GetErr(),
+			}
+			for _, listener := range t.listeners {
+				listener.onConnectionEvent(event)
+			}
+		}
+
+		// Send a failure to all waiting clients.
+		if len(t.queue) > 0 {
+			for _, waitingClient := range t.queue {
+				waitingClient <- _default.NewDefaultPlcConnectionConnectResult(nil, connectionResult.GetErr())
+			}
+		}
+	}
+}
+
+func (t *connectionContainer) addListener(listener connectionListener) {
+	// Get the lock.
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	// Add the listener to the queue
+	t.listeners = append(t.listeners, listener)
+}
+
+func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	ch := make(chan plc4go.PlcConnectionConnectResult)
+	// Check if the connection is available.
+	if t.state == StateIdle {
+		t.leaseCounter++
+		connection := newPlcConnectionLease(t, t.leaseCounter, t.connection)
+		t.state = StateInUse
+		// In this case we don't need to check for blocks
+		// as the getConnection function of the connection cache
+		// is definitely eagerly waiting for input.
+		log.Debug().Str("connectionString", t.connectionString).
+			Msg("Got lease instantly as connection was idle.")
+		go func() {
+			ch <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
+		}()
+	} else if t.state == StateInUse || t.state == StateInitialized {
+		// If the connection is currently busy or not finished initializing,
+		// add the new channel to the queue for this connection.
+		t.queue = append(t.queue, ch)
+		log.Debug().Str("connectionString", t.connectionString).
+			Int("waiting-queue-size", len(t.queue)).
+			Msg("Added lease-request to queue.")
+	}
+	return ch
+}
+
+func (t *connectionContainer) returnConnection(state cachedPlcConnectionState) error {
+	// Intentionally not locking anything, as there are only two cases where the connection is returned:
+	// 1) The connection failed to get established (no client holds a lock on it anyway)
+	// 2) The connection is being returned by a client, which then already holds a lock on it.
+	// If the connection is marked as "invalid", destroy it and remove it from the cache.
+	if state == StateInvalid {
+		// TODO: Perhaps do a maximum number of retries and then call failConnection()
+		log.Debug().Str("connectionString", t.connectionString).
+			Msg("Client returned invalid connection, reconnecting.")
+		t.connect()
+	} else {
+		log.Debug().Str("connectionString", t.connectionString).
+			Msg("Client returned valid connection.")
+	}
+
+	// Check how many others are waiting for this connection.
+	if len(t.queue) > 0 {
+		// There are waiting clients, give the connection to the next client in the line.
+		next := t.queue[0]
+		t.queue = t.queue[1:]
+		t.leaseCounter++
+		connection := newPlcConnectionLease(t, t.leaseCounter, t.connection)
+		// Send asynchronously, as the receiver might have given up waiting,
+		// and we don't want the hand-over to block here while the calling
+		// process reaches its blocking read.
+		go func() {
+			// In this case we don't need to check for blocks
+			// as the getConnection function of the connection cache
+			// is definitely eagerly waiting for input.
+			next <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
+			log.Debug().Str("connectionString", t.connectionString).
+				Int("waiting-queue-size", len(t.queue)).
+				Msg("Returned connection to the next client waiting.")
+		}()
+	} else {
+		// Otherwise, just mark the connection as idle.
+		log.Debug().Str("connectionString", t.connectionString).
+			Msg("Connection set to 'idle'.")
+		t.state = StateIdle
+	}
+	return nil
+}
+
+func (t *connectionContainer) String() string {
+	return fmt.Sprintf("connectionContainer{%s:%s, leaseCounter: %d, closed: %t, state: %s}", t.connectionString, t.connection, t.leaseCounter, t.closed, t.state)
+}
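
Note: taken together, connect(), lease() and returnConnection() implement a
simple hand-over protocol: a lease request is either served immediately
(StateIdle) or queued, and a returned connection is handed to the next queued
client, otherwise the container flips back to StateIdle. A hedged sketch of a
direct caller, assuming container is an already-connected
*connectionContainer (normally only the cache's GetConnection drives this
path):

	leaseChan := container.lease()
	select {
	case result := <-leaseChan:
		if result.GetErr() == nil {
			connection := result.GetConnection()
			// ... execute requests against the leased connection ...
			connection.Close() // returns the lease to the container, the transport stays open
		}
	case <-time.After(5 * time.Second):
		// Give up waiting; the container may still hand the lease over later.
	}
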
diff --git a/plc4go/pkg/api/cache/plc_connection_lease.go b/plc4go/pkg/api/cache/plc_connection_lease.go
new file mode 100644
index 000000000..12e85c3bf
--- /dev/null
+++ b/plc4go/pkg/api/cache/plc_connection_lease.go
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cache
+
+import (
+	"fmt"
+	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
+	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+	"github.com/apache/plc4x/plc4go/spi"
+	_default "github.com/apache/plc4x/plc4go/spi/default"
+	"github.com/apache/plc4x/plc4go/spi/utils"
+	"time"
+)
+
+type plcConnectionLease struct {
+	// Reference back to the container, so we can give the connection back.
+	connectionContainer *connectionContainer
+	// Counter for the number of times this connection has been used before.
+	leaseId uint32
+	// The actual connection being cached.
+	connection spi.PlcConnection
+}
+
+func newPlcConnectionLease(connectionContainer *connectionContainer, leaseId uint32, connection spi.PlcConnection) *plcConnectionLease {
+	p := &plcConnectionLease{
+		connectionContainer: connectionContainer,
+		leaseId:             leaseId,
+		connection:          connection,
+	}
+	if connection.IsTraceEnabled() {
+		connection.GetTracer().SetConnectionId(p.GetConnectionId())
+	}
+	return p
+}
+
+func (t *plcConnectionLease) IsTraceEnabled() bool {
+	if t.connection == nil {
+		panic("Called 'IsTraceEnabled' on a closed cached connection")
+	}
+	return t.connection.IsTraceEnabled()
+}
+
+func (t *plcConnectionLease) GetTracer() *spi.Tracer {
+	if t.connection == nil {
+		panic("Called 'GetTracer' on a closed cached connection")
+	}
+	return t.connection.GetTracer()
+}
+
+func (t *plcConnectionLease) GetConnectionId() string {
+	if t.connection == nil {
+		panic("Called 'GetConnectionId' on a closed cached connection")
+	}
+	return fmt.Sprintf("%s-%d", t.connection.GetConnectionId(), t.leaseId)
+}
+
+func (t *plcConnectionLease) Connect() <-chan plc4go.PlcConnectionConnectResult {
+	panic("Called 'Connect' on a cached connection")
+}
+
+func (t *plcConnectionLease) BlockingClose() {
+	if t.connection == nil {
+		panic("Called 'BlockingClose' on a closed cached connection")
+	}
+	// Call close and wait for the operation to finish.
+	<-t.Close()
+}
+
+func (t *plcConnectionLease) Close() <-chan plc4go.PlcConnectionCloseResult {
+	if t.connection == nil {
+		panic("Called 'Close' on a closed cached connection")
+	}
+
+	result := make(chan plc4go.PlcConnectionCloseResult)
+
+	go func() {
+		// Check if the connection is still alive, if it is, put it back into the cache
+		pingResults := t.Ping()
+		pingTimeout := time.NewTimer(time.Second * 5)
+		newState := StateIdle
+		select {
+		case pingResult := <-pingResults:
+			{
+				if pingResult.GetErr() != nil {
+					newState = StateInvalid
+				}
+			}
+		case <-pingTimeout.C:
+			{
+				// Add some trace information
+				if t.connection.IsTraceEnabled() {
+					t.connection.GetTracer().AddTrace("ping", "timeout")
+				}
+				// Mark the connection as broken ...
+				newState = StateInvalid
+			}
+		}
+
+		// Extract the trace entries from the connection.
+		var traces []spi.TraceEntry
+		if t.IsTraceEnabled() {
+			tracer := t.GetTracer()
+			// Save all traces.
+			traces = tracer.GetTraces()
+			// Clear the log.
+			tracer.ResetTraces()
+			// Reset the connection id back to the one without the lease-id.
+			tracer.SetConnectionId(t.connection.GetConnectionId())
+		}
+
+		// Return the connection to the connection container and don't actually close it.
+		err := t.connectionContainer.returnConnection(newState)
+
+		// Finish closing the connection.
+		timeout := time.NewTimer(10 * time.Millisecond)
+		defer utils.CleanupTimer(timeout)
+		select {
+		case result <- _default.NewDefaultPlcConnectionCloseResultWithTraces(t, err, traces):
+		case <-timeout.C:
+		}
+
+		// Detach the connection from this lease, so it can no longer be used by the client.
+		t.connection = nil
+	}()
+
+	return result
+}
+
+func (t *plcConnectionLease) IsConnected() bool {
+	if t.connection == nil {
+		return false
+	}
+	return t.connection.IsConnected()
+}
+
+func (t *plcConnectionLease) Ping() <-chan plc4go.PlcConnectionPingResult {
+	if t.connection == nil {
+		panic("Called 'Ping' on a closed cached connection")
+	}
+	return t.connection.Ping()
+}
+
+func (t *plcConnectionLease) GetMetadata() apiModel.PlcConnectionMetadata {
+	if t.connection == nil {
+		panic("Called 'GetMetadata' on a closed cached connection")
+	}
+	return t.connection.GetMetadata()
+}
+
+func (t *plcConnectionLease) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
+	if t.connection == nil {
+		panic("Called 'ReadRequestBuilder' on a closed cached connection")
+	}
+	return t.connection.ReadRequestBuilder()
+}
+
+func (t *plcConnectionLease) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
+	if t.connection == nil {
+		panic("Called 'WriteRequestBuilder' on a closed cached connection")
+	}
+	return t.connection.WriteRequestBuilder()
+}
+
+func (t *plcConnectionLease) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
+	if t.connection == nil {
+		panic("Called 'SubscriptionRequestBuilder' on a closed cached connection")
+	}
+	return t.connection.SubscriptionRequestBuilder()
+}
+
+func (t *plcConnectionLease) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
+	if t.connection == nil {
+		panic("Called 'UnsubscriptionRequestBuilder' on a closed cached connection")
+	}
+	return t.connection.UnsubscriptionRequestBuilder()
+}
+
+func (t *plcConnectionLease) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
+	if t.connection == nil {
+		panic("Called 'BrowseRequestBuilder' on a closed cached connection")
+	}
+	return t.connection.BrowseRequestBuilder()
+}
+
+func (t *plcConnectionLease) String() string {
+	return fmt.Sprintf("plcConnectionLease{connectionContainer: %s, leaseId: %d, connection: %s}", t.connectionContainer, t.leaseId, t.connection)
+}
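
Note: from a client's point of view the lease behaves like a regular
connection, except that Close()/BlockingClose() health-check the underlying
connection via Ping() and hand it back to the cache instead of tearing it
down, and any call after that panics. A sketch of typical usage through the
public cache API, mirroring the tests below (NewPlcConnectionCache is an
assumed constructor name, not defined in this diff):

	cache := NewPlcConnectionCache(driverManager)
	connectionResults := cache.GetConnection("simulated://1.2.3.4:42")
	select {
	case connectionResult := <-connectionResults:
		if connectionResult.GetErr() == nil {
			connection := connectionResult.GetConnection()
			// ... execute requests ...
			connection.BlockingClose() // hands the connection back to the cache
		}
	case <-time.After(5 * time.Second):
		// Timed out waiting for a cached connection.
	}
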
diff --git a/plc4go/pkg/api/cache/plc_connection_lease_test.go b/plc4go/pkg/api/cache/plc_connection_lease_test.go
new file mode 100644
index 000000000..b46e23bad
--- /dev/null
+++ b/plc4go/pkg/api/cache/plc_connection_lease_test.go
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cache
+
+import (
+	"github.com/apache/plc4x/plc4go/internal/simulated"
+	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
+	"github.com/apache/plc4x/plc4go/spi"
+	"github.com/stretchr/testify/assert"
+	"github.com/viney-shih/go-lock"
+	"testing"
+	"time"
+)
+
+func TestLeasedPlcConnection_IsTraceEnabled(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// The first connection (with tracing enabled) should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection().(spi.PlcConnection)
+				assert.True(t, connection.IsTraceEnabled())
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'IsTraceEnabled' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.IsTraceEnabled()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+
+	// The second connection (without tracing) should work fine
+	connectionResults = cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection().(spi.PlcConnection)
+				assert.False(t, connection.IsTraceEnabled())
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'IsTraceEnabled' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.IsTraceEnabled()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_GetTracer(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection().(spi.PlcConnection)
+				assert.NotNil(t, connection.GetTracer())
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'GetTracer' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.GetTracer()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_GetConnectionId(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection().(spi.PlcConnection)
+				assert.Greater(t, len(connection.GetConnectionId()), 0)
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'GetConnectionId' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.GetConnectionId()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_Connect(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection().(spi.PlcConnection)
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'Connect' on a cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.Connect()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_BlockingClose(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'BlockingClose' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.BlockingClose()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_Close(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'Close' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.Close()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_IsConnected(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
+				assert.True(t, connection.IsConnected())
+				connection.BlockingClose()
+				assert.False(t, connection.IsConnected())
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_Ping(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
+				connection.Ping()
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'Ping' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.Ping()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_GetMetadata(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
+				metadata := connection.GetMetadata()
+				if assert.NotNil(t, metadata) {
+					attributes := metadata.GetConnectionAttributes()
+					assert.NotNil(t, attributes)
+				}
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'GetMetadata' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.GetMetadata()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_ReadRequestBuilder(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
+				builder := connection.ReadRequestBuilder()
+				assert.NotNil(t, builder)
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'ReadRequestBuilder' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.ReadRequestBuilder()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_WriteRequestBuilder(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
+				builder := connection.WriteRequestBuilder()
+				assert.NotNil(t, builder)
+				connection.BlockingClose()
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'WriteRequestBuilder' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.WriteRequestBuilder()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_SubscriptionRequestBuilder(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time as this way we also reduce the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
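+				// Subscription requests are not supported here, so the call is expected to panic with "not implemented".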
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "not implemented", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.SubscriptionRequestBuilder()
+				}()
+				connection.BlockingClose()
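+				// Once the lease has been closed, calling SubscriptionRequestBuilder again is expected to panic.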
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'SubscriptionRequestBuilder' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.SubscriptionRequestBuilder()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_UnsubscriptionRequestBuilder(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time, as this also reduces the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
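+				// Unsubscription requests are not supported here, so the call is expected to panic with "not implemented".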
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "not implemented", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.UnsubscriptionRequestBuilder()
+				}()
+				connection.BlockingClose()
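+				// Once the lease has been closed, calling UnsubscriptionRequestBuilder again is expected to panic.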
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'UnsubscriptionRequestBuilder' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.UnsubscriptionRequestBuilder()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}
+
+func TestLeasedPlcConnection_BrowseRequestBuilder(t *testing.T) {
+	driverManager := plc4go.NewPlcDriverManager()
+	driverManager.RegisterDriver(simulated.NewDriver())
+	// Reduce the max lease time, as this also reduces the max wait time.
+	cache := plcConnectionCache{
+		driverManager: driverManager,
+		maxLeaseTime:  time.Second * 1,
+		maxWaitTime:   time.Second * 5,
+		cacheLock:     lock.NewCASMutex(),
+		connections:   make(map[string]*connectionContainer),
+		tracer:        nil,
+	}
+	cache.EnableTracer()
+
+	// Getting a connection from the cache should work fine
+	connectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true")
+	select {
+	case connectionResult := <-connectionResults:
+		if assert.NotNil(t, connectionResult) {
+			assert.Nil(t, connectionResult.GetErr())
+			if assert.NotNil(t, connectionResult.GetConnection()) {
+				connection := connectionResult.GetConnection()
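+				// Browse requests are not supported here, so the call is expected to panic with "not implemented".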
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "not implemented", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.BrowseRequestBuilder()
+				}()
+				connection.BlockingClose()
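+				// Once the lease has been closed, calling BrowseRequestBuilder again is expected to panic.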
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							assert.Equal(t, "Called 'BrowseRequestBuilder' on a closed cached connection", r)
+						} else {
+							t.Errorf("The code did not panic")
+						}
+					}()
+					connection.BrowseRequestBuilder()
+				}()
+			}
+		}
+	case <-time.After(1 * time.Second):
+		t.Errorf("Timeout")
+	}
+}