You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/09/15 16:41:30 UTC

[plc4x] branch develop updated: feat(plc4go/connection-cache): introduce connection cache options.

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5751ff358 feat(plc4go/connection-cache): introduce connection cache options.
5751ff358 is described below

commit 5751ff358c2b30425530e04edac6a789b7ee4327
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Sep 15 18:41:22 2022 +0200

    feat(plc4go/connection-cache): introduce connection cache options.
    
    + reworked the way TraceConnectionCache works. Now it controls the default behavior of the connection_cache. Nevertheless, each cache can override its own log behavior.
---
 plc4go/pkg/api/cache/plc_connection_cache.go     | 74 +++++++++++++++++-------
 plc4go/pkg/api/cache/plc_connection_common.go    | 18 ------
 plc4go/pkg/api/cache/plc_connection_container.go | 28 ++++-----
 plc4go/pkg/api/config/config.go                  |  2 +-
 4 files changed, 69 insertions(+), 53 deletions(-)

diff --git a/plc4go/pkg/api/cache/plc_connection_cache.go b/plc4go/pkg/api/cache/plc_connection_cache.go
index ecfb4f0cf..503302863 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache.go
@@ -22,10 +22,13 @@ package cache
 import (
 	"fmt"
 	"github.com/apache/plc4x/plc4go/pkg/api"
+	"github.com/apache/plc4x/plc4go/pkg/api/config"
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
 	"github.com/viney-shih/go-lock"
 	"time"
 )
@@ -35,12 +38,14 @@ type PlcConnectionCache interface {
 	Close() <-chan PlcConnectionCacheCloseResult
 }
 
-func NewPlcConnectionCache(driverManager plc4go.PlcDriverManager) PlcConnectionCache {
-	return NewPlcConnectionCacheWithMaxLeaseTime(driverManager, time.Second*5)
-}
-
-func NewPlcConnectionCacheWithMaxLeaseTime(driverManager plc4go.PlcDriverManager, maxLeaseTime time.Duration) PlcConnectionCache {
-	return &plcConnectionCache{
+func NewPlcConnectionCache(driverManager plc4go.PlcDriverManager, withConnectionCacheOptions ...WithConnectionCacheOption) PlcConnectionCache {
+	cacheLog := log.Logger
+	if !config.TraceConnectionCache {
+		cacheLog = zerolog.Nop()
+	}
+	maxLeaseTime := time.Second * 5
+	cc := &plcConnectionCache{
+		cacheLog:      cacheLog,
 		driverManager: driverManager,
 		maxLeaseTime:  maxLeaseTime,
 		maxWaitTime:   maxLeaseTime * 5,
@@ -48,6 +53,36 @@ func NewPlcConnectionCacheWithMaxLeaseTime(driverManager plc4go.PlcDriverManager
 		connections:   make(map[string]*connectionContainer),
 		tracer:        nil,
 	}
+	for _, option := range withConnectionCacheOptions {
+		option(cc)
+	}
+	return cc
+}
+
+type WithConnectionCacheOption func(plcConnectionCache *plcConnectionCache)
+
+func WithMaxLeaseTime(duration time.Duration) WithConnectionCacheOption {
+	return func(plcConnectionCache *plcConnectionCache) {
+		plcConnectionCache.maxLeaseTime = duration
+	}
+}
+
+func WithMaxWaitTime(duration time.Duration) WithConnectionCacheOption {
+	return func(plcConnectionCache *plcConnectionCache) {
+		plcConnectionCache.maxWaitTime = duration // must target maxWaitTime; maxLeaseTime is handled by WithMaxLeaseTime
+	}
+}
+
+func WithTracer() WithConnectionCacheOption {
+	return func(plcConnectionCache *plcConnectionCache) {
+		plcConnectionCache.EnableTracer()
+	}
+}
+
+func WithLogger(logger zerolog.Logger) WithConnectionCacheOption {
+	return func(plcConnectionCache *plcConnectionCache) {
+		plcConnectionCache.cacheLog = logger
+	}
 }
 
 ///////////////////////////////////////
@@ -57,6 +92,8 @@ func NewPlcConnectionCacheWithMaxLeaseTime(driverManager plc4go.PlcDriverManager
 //
 
 type plcConnectionCache struct {
+	cacheLog zerolog.Logger
+
 	driverManager plc4go.PlcDriverManager
 
 	// Maximum duration a connection can be used per lease.
@@ -70,13 +107,12 @@ type plcConnectionCache struct {
 }
 
 func (t *plcConnectionCache) onConnectionEvent(event connectionEvent) {
-	setCacheLog()
 	connectionContainerInstance := event.getConnectionContainer()
 	if errorEvent, ok := event.(connectionErrorEvent); ok {
 		if t.tracer != nil {
 			t.tracer.AddTrace("destroy-connection", errorEvent.getError().Error())
 		}
-		cacheLog.Debug().Str("connectionString", connectionContainerInstance.connectionString)
+		t.cacheLog.Debug().Str("connectionString", connectionContainerInstance.connectionString).Err(errorEvent.getError()).Msg("Connection error event received.") // Msg is required: a zerolog event that is never terminated is never written
 	}
 }
 
@@ -95,7 +131,6 @@ func (t *plcConnectionCache) GetTracer() *spi.Tracer {
 }
 
 func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4go.PlcConnectionConnectResult {
-	setCacheLog()
 	ch := make(chan plc4go.PlcConnectionConnectResult)
 
 	go func() {
@@ -107,9 +142,9 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 			if t.tracer != nil {
 				t.tracer.AddTrace("get-connection", "create new cached connection")
 			}
-			cacheLog.Debug().Str("connectionString", connectionString).Msg("Create new cached connection")
+			t.cacheLog.Debug().Str("connectionString", connectionString).Msg("Create new cached connection")
 			// Create a new connection container.
-			cc := newConnectionContainer(t.driverManager, connectionString)
+			cc := newConnectionContainer(&t.cacheLog, t.driverManager, connectionString)
 			// Register for connection events (Like connection closed or error).
 			cc.addListener(t)
 			// Store the new connection container in the cache of connections.
@@ -137,7 +172,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 		select {
 		// Wait till we get a lease.
 		case connectionResponse := <-leaseChan:
-			cacheLog.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
+			t.cacheLog.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
 			responseTimeout := time.NewTimer(10 * time.Millisecond)
 			defer utils.CleanupTimer(responseTimeout)
 			select {
@@ -151,7 +186,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 					t.tracer.AddTransactionalTrace(txId, "get-connection", "client given up")
 				}
 				close(ch)
-				cacheLog.Debug().Str("connectionString", connectionString).Msg("Client not available returning connection to cache.")
+				t.cacheLog.Debug().Str("connectionString", connectionString).Msg("Client not available returning connection to cache.")
 				// Return the connection to give another connection the chance to use it.
 				if connectionResponse.GetConnection() != nil {
 					connectionResponse.GetConnection().Close()
@@ -168,7 +203,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 			if t.tracer != nil {
 				t.tracer.AddTransactionalTrace(txId, "get-connection", "timeout")
 			}
-			cacheLog.Debug().Str("connectionString", connectionString).Msg("Timeout while waiting for connection.")
+			t.cacheLog.Debug().Str("connectionString", connectionString).Msg("Timeout while waiting for connection.")
 			ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.New("timeout while waiting for connection"))
 		}
 	}()
@@ -177,8 +212,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 }
 
 func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
-	setCacheLog()
-	cacheLog.Debug().Msg("Closing connection cache started.")
+	t.cacheLog.Debug().Msg("Closing connection cache started.")
 	ch := make(chan PlcConnectionCacheCloseResult)
 
 	go func() {
@@ -192,7 +226,7 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 			case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
 			case <-responseDeliveryTimeout.C:
 			}
-			cacheLog.Debug().Msg("Closing connection cache finished.")
+			t.cacheLog.Debug().Msg("Closing connection cache finished.")
 			return
 		}
 
@@ -210,14 +244,14 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 				// We also really don't care if it worked, or not ... it's just an attempt of being
 				// nice.
 				case _ = <-leaseResults:
-					cacheLog.Debug().Str("connectionString", container.connectionString).Msg("Gracefully closing connection ...")
+					t.cacheLog.Debug().Str("connectionString", container.connectionString).Msg("Gracefully closing connection ...")
 					// Give back the connection.
 					if container.connection != nil {
 						container.connection.Close()
 					}
 				// If we're timing out brutally kill the connection.
 				case <-closeTimeout.C:
-					cacheLog.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
+					t.cacheLog.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
 					// Forcefully close this connection.
 					if container.connection != nil {
 						container.connection.Close()
@@ -230,7 +264,7 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 				case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
 				case <-responseDeliveryTimeout.C:
 				}
-				cacheLog.Debug().Msg("Closing connection cache finished.")
+				t.cacheLog.Debug().Msg("Closing connection cache finished.")
 			}(cc)
 		}
 	}()
diff --git a/plc4go/pkg/api/cache/plc_connection_common.go b/plc4go/pkg/api/cache/plc_connection_common.go
index ea966830e..906f15778 100644
--- a/plc4go/pkg/api/cache/plc_connection_common.go
+++ b/plc4go/pkg/api/cache/plc_connection_common.go
@@ -19,24 +19,6 @@
 
 package cache
 
-import (
-	"github.com/apache/plc4x/plc4go/pkg/api/config"
-	"github.com/rs/zerolog"
-	"github.com/rs/zerolog/log"
-)
-
-var cacheLog = &log.Logger
-
-// Deprecated: ugly solution for switching logs... The Trace variables should be replaced by methods and listeners.
-func setCacheLog() {
-	if config.TraceConnectionCache {
-		cacheLog = &log.Logger
-	} else {
-		nopper := zerolog.Nop()
-		cacheLog = &nopper
-	}
-}
-
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 // cachedPlcConnectionState
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/plc4go/pkg/api/cache/plc_connection_container.go b/plc4go/pkg/api/cache/plc_connection_container.go
index c4f1faf04..bb3bd9d55 100644
--- a/plc4go/pkg/api/cache/plc_connection_container.go
+++ b/plc4go/pkg/api/cache/plc_connection_container.go
@@ -25,10 +25,12 @@ import (
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
 	"github.com/viney-shih/go-lock"
 )
 
 type connectionContainer struct {
+	cacheLog         *zerolog.Logger
 	lock             lock.RWMutex
 	connectionString string
 	driverManager    plc4go.PlcDriverManager
@@ -44,8 +46,9 @@ type connectionContainer struct {
 	listeners []connectionListener
 }
 
-func newConnectionContainer(driverManager plc4go.PlcDriverManager, connectionString string) *connectionContainer {
+func newConnectionContainer(cacheLog *zerolog.Logger, driverManager plc4go.PlcDriverManager, connectionString string) *connectionContainer {
 	return &connectionContainer{
+		cacheLog:         cacheLog,
 		driverManager:    driverManager,
 		connectionString: connectionString,
 		lock:             lock.NewCASMutex(),
@@ -57,8 +60,7 @@ func newConnectionContainer(driverManager plc4go.PlcDriverManager, connectionStr
 }
 
 func (t *connectionContainer) connect() {
-	setCacheLog()
-	cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Connecting new cached connection ...")
+	t.cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Connecting new cached connection ...")
 	// Initialize the new connection.
 	connectionResultChan := t.driverManager.GetConnection(t.connectionString)
 
@@ -74,7 +76,7 @@ func (t *connectionContainer) connect() {
 	// If the connection was successful, pass the active connection into the container.
 	// If something went wrong, we have to remove the connection from the cache and return the error.
 	if err := connectionResult.GetErr(); err != nil {
-		cacheLog.Debug().Str("connectionString", t.connectionString).
+		t.cacheLog.Debug().Str("connectionString", t.connectionString).
 			Err(err).
 			Msg("Error connecting new cached connection.")
 		// Tell the connection cache that the connection is no longer available.
@@ -98,7 +100,7 @@ func (t *connectionContainer) connect() {
 		return
 	}
 
-	cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Successfully connected new cached connection.")
+	t.cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Successfully connected new cached connection.")
 	// Inject the real connection into the container.
 	if connection, ok := connectionResult.GetConnection().(spi.PlcConnection); !ok {
 		panic("Return connection doesn't implement the spi.PlcConnection interface")
@@ -133,7 +135,6 @@ func (t *connectionContainer) addListener(listener connectionListener) {
 }
 
 func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
-	setCacheLog()
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
@@ -147,7 +148,7 @@ func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
 		// In this case we don't need to check for blocks
 		// as the getConnection function of the connection cache
 		// is definitely eagerly waiting for input.
-		cacheLog.Debug().Str("connectionString", t.connectionString).
+		t.cacheLog.Debug().Str("connectionString", t.connectionString).
 			Msg("Got lease instantly as connection was idle.")
 		go func() {
 			ch <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
@@ -156,17 +157,16 @@ func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
 		// If the connection is currently busy or not finished initializing,
 		// add the new channel to the queue for this connection.
 		t.queue = append(t.queue, ch)
-		cacheLog.Debug().Str("connectionString", t.connectionString).
+		t.cacheLog.Debug().Str("connectionString", t.connectionString).
 			Int("waiting-queue-size", len(t.queue)).
 			Msg("Added lease-request to queue.")
 	case StateInvalid:
-		cacheLog.Debug().Str("connectionString", t.connectionString).Msg("No lease because invalid")
+		t.cacheLog.Debug().Str("connectionString", t.connectionString).Msg("No lease because invalid")
 	}
 	return ch
 }
 
 func (t *connectionContainer) returnConnection(newState cachedPlcConnectionState) error {
-	setCacheLog()
 	// Intentionally not locking anything, as there are two cases, where the connection is returned:
 	// 1) The connection failed to get established (No connection has a lock anyway)
 	// 2) The connection is returned, then the one returning it already has a lock on it.
@@ -174,11 +174,11 @@ func (t *connectionContainer) returnConnection(newState cachedPlcConnectionState
 	switch newState {
 	case StateInitialized, StateInvalid:
 		// TODO: Perhaps do a maximum number of retries and then call failConnection()
-		cacheLog.Debug().Str("connectionString", t.connectionString).
+		t.cacheLog.Debug().Str("connectionString", t.connectionString).
 			Msgf("Client returned a %s connection, reconnecting.", newState)
 		t.connect()
 	default:
-		cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Client returned valid connection.")
+		t.cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Client returned valid connection.")
 	}
 	t.lock.Lock()
 	defer t.lock.Unlock()
@@ -202,13 +202,13 @@ func (t *connectionContainer) returnConnection(newState cachedPlcConnectionState
 			// as the getConnection function of the connection cache
 			// is definitely eagerly waiting for input.
 			next <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
-			cacheLog.Debug().Str("connectionString", t.connectionString).
+			t.cacheLog.Debug().Str("connectionString", t.connectionString).
 				Int("waiting-queue-size", len(t.queue)).
 				Msg("Returned connection to the next client waiting.")
 		}()
 	} else {
 		// Otherwise, just mark the connection as idle.
-		cacheLog.Debug().Str("connectionString", t.connectionString).
+		t.cacheLog.Debug().Str("connectionString", t.connectionString).
 			Msg("Connection set to 'idle'.")
 		t.state = StateIdle
 	}
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index c36d5d4a8..3800755c2 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -26,7 +26,7 @@ var (
 	TraceDefaultMessageCodecWorker      bool
 )
 
-// TraceConnectionCache when set to true the connection cache outputs debug logs
+// TraceConnectionCache when set to true the connection cache outputs logs by default
 var (
 	TraceConnectionCache bool
 )