You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/09/15 16:41:30 UTC
[plc4x] branch develop updated: feat(plc4go/connection-cache): introduce connection cache options.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 5751ff358 feat(plc4go/connection-cache): introduce connection cache options.
5751ff358 is described below
commit 5751ff358c2b30425530e04edac6a789b7ee4327
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Sep 15 18:41:22 2022 +0200
feat(plc4go/connection-cache): introduce connection cache options.
+ reworked the way TraceConnectionCache works. Now if controls the default behavior of the connection_cache. Nevertheless each cache can override its own log behavior
---
plc4go/pkg/api/cache/plc_connection_cache.go | 74 +++++++++++++++++-------
plc4go/pkg/api/cache/plc_connection_common.go | 18 ------
plc4go/pkg/api/cache/plc_connection_container.go | 28 ++++-----
plc4go/pkg/api/config/config.go | 2 +-
4 files changed, 69 insertions(+), 53 deletions(-)
diff --git a/plc4go/pkg/api/cache/plc_connection_cache.go b/plc4go/pkg/api/cache/plc_connection_cache.go
index ecfb4f0cf..503302863 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache.go
@@ -22,10 +22,13 @@ package cache
import (
"fmt"
"github.com/apache/plc4x/plc4go/pkg/api"
+ "github.com/apache/plc4x/plc4go/pkg/api/config"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
+ "github.com/rs/zerolog/log"
"github.com/viney-shih/go-lock"
"time"
)
@@ -35,12 +38,14 @@ type PlcConnectionCache interface {
Close() <-chan PlcConnectionCacheCloseResult
}
-func NewPlcConnectionCache(driverManager plc4go.PlcDriverManager) PlcConnectionCache {
- return NewPlcConnectionCacheWithMaxLeaseTime(driverManager, time.Second*5)
-}
-
-func NewPlcConnectionCacheWithMaxLeaseTime(driverManager plc4go.PlcDriverManager, maxLeaseTime time.Duration) PlcConnectionCache {
- return &plcConnectionCache{
+func NewPlcConnectionCache(driverManager plc4go.PlcDriverManager, withConnectionCacheOptions ...WithConnectionCacheOption) PlcConnectionCache {
+ cacheLog := log.Logger
+ if !config.TraceConnectionCache {
+ cacheLog = zerolog.Nop()
+ }
+ maxLeaseTime := time.Second * 5
+ cc := &plcConnectionCache{
+ cacheLog: cacheLog,
driverManager: driverManager,
maxLeaseTime: maxLeaseTime,
maxWaitTime: maxLeaseTime * 5,
@@ -48,6 +53,36 @@ func NewPlcConnectionCacheWithMaxLeaseTime(driverManager plc4go.PlcDriverManager
connections: make(map[string]*connectionContainer),
tracer: nil,
}
+ for _, option := range withConnectionCacheOptions {
+ option(cc)
+ }
+ return cc
+}
+
+type WithConnectionCacheOption func(plcConnectionCache *plcConnectionCache)
+
+func WithMaxLeaseTime(duration time.Duration) WithConnectionCacheOption {
+ return func(plcConnectionCache *plcConnectionCache) {
+ plcConnectionCache.maxLeaseTime = duration
+ }
+}
+
+func WithMaxWaitTime(duration time.Duration) WithConnectionCacheOption {
+ return func(plcConnectionCache *plcConnectionCache) {
+ plcConnectionCache.maxLeaseTime = duration
+ }
+}
+
+func WithTracer() WithConnectionCacheOption {
+ return func(plcConnectionCache *plcConnectionCache) {
+ plcConnectionCache.EnableTracer()
+ }
+}
+
+func WithLogger(logger zerolog.Logger) WithConnectionCacheOption {
+ return func(plcConnectionCache *plcConnectionCache) {
+ plcConnectionCache.cacheLog = logger
+ }
}
///////////////////////////////////////
@@ -57,6 +92,8 @@ func NewPlcConnectionCacheWithMaxLeaseTime(driverManager plc4go.PlcDriverManager
//
type plcConnectionCache struct {
+ cacheLog zerolog.Logger
+
driverManager plc4go.PlcDriverManager
// Maximum duration a connection can be used per lease.
@@ -70,13 +107,12 @@ type plcConnectionCache struct {
}
func (t *plcConnectionCache) onConnectionEvent(event connectionEvent) {
- setCacheLog()
connectionContainerInstance := event.getConnectionContainer()
if errorEvent, ok := event.(connectionErrorEvent); ok {
if t.tracer != nil {
t.tracer.AddTrace("destroy-connection", errorEvent.getError().Error())
}
- cacheLog.Debug().Str("connectionString", connectionContainerInstance.connectionString)
+ t.cacheLog.Debug().Str("connectionString", connectionContainerInstance.connectionString)
}
}
@@ -95,7 +131,6 @@ func (t *plcConnectionCache) GetTracer() *spi.Tracer {
}
func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4go.PlcConnectionConnectResult {
- setCacheLog()
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
@@ -107,9 +142,9 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
if t.tracer != nil {
t.tracer.AddTrace("get-connection", "create new cached connection")
}
- cacheLog.Debug().Str("connectionString", connectionString).Msg("Create new cached connection")
+ t.cacheLog.Debug().Str("connectionString", connectionString).Msg("Create new cached connection")
// Create a new connection container.
- cc := newConnectionContainer(t.driverManager, connectionString)
+ cc := newConnectionContainer(&t.cacheLog, t.driverManager, connectionString)
// Register for connection events (Like connection closed or error).
cc.addListener(t)
// Store the new connection container in the cache of connections.
@@ -137,7 +172,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
select {
// Wait till we get a lease.
case connectionResponse := <-leaseChan:
- cacheLog.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
+ t.cacheLog.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
responseTimeout := time.NewTimer(10 * time.Millisecond)
defer utils.CleanupTimer(responseTimeout)
select {
@@ -151,7 +186,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
t.tracer.AddTransactionalTrace(txId, "get-connection", "client given up")
}
close(ch)
- cacheLog.Debug().Str("connectionString", connectionString).Msg("Client not available returning connection to cache.")
+ t.cacheLog.Debug().Str("connectionString", connectionString).Msg("Client not available returning connection to cache.")
// Return the connection to give another connection the chance to use it.
if connectionResponse.GetConnection() != nil {
connectionResponse.GetConnection().Close()
@@ -168,7 +203,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
if t.tracer != nil {
t.tracer.AddTransactionalTrace(txId, "get-connection", "timeout")
}
- cacheLog.Debug().Str("connectionString", connectionString).Msg("Timeout while waiting for connection.")
+ t.cacheLog.Debug().Str("connectionString", connectionString).Msg("Timeout while waiting for connection.")
ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.New("timeout while waiting for connection"))
}
}()
@@ -177,8 +212,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
}
func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
- setCacheLog()
- cacheLog.Debug().Msg("Closing connection cache started.")
+ t.cacheLog.Debug().Msg("Closing connection cache started.")
ch := make(chan PlcConnectionCacheCloseResult)
go func() {
@@ -192,7 +226,7 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
case <-responseDeliveryTimeout.C:
}
- cacheLog.Debug().Msg("Closing connection cache finished.")
+ t.cacheLog.Debug().Msg("Closing connection cache finished.")
return
}
@@ -210,14 +244,14 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
// We also really don't care if it worked, or not ... it's just an attempt of being
// nice.
case _ = <-leaseResults:
- cacheLog.Debug().Str("connectionString", container.connectionString).Msg("Gracefully closing connection ...")
+ t.cacheLog.Debug().Str("connectionString", container.connectionString).Msg("Gracefully closing connection ...")
// Give back the connection.
if container.connection != nil {
container.connection.Close()
}
// If we're timing out brutally kill the connection.
case <-closeTimeout.C:
- cacheLog.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
+ t.cacheLog.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
// Forcefully close this connection.
if container.connection != nil {
container.connection.Close()
@@ -230,7 +264,7 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
case <-responseDeliveryTimeout.C:
}
- cacheLog.Debug().Msg("Closing connection cache finished.")
+ t.cacheLog.Debug().Msg("Closing connection cache finished.")
}(cc)
}
}()
diff --git a/plc4go/pkg/api/cache/plc_connection_common.go b/plc4go/pkg/api/cache/plc_connection_common.go
index ea966830e..906f15778 100644
--- a/plc4go/pkg/api/cache/plc_connection_common.go
+++ b/plc4go/pkg/api/cache/plc_connection_common.go
@@ -19,24 +19,6 @@
package cache
-import (
- "github.com/apache/plc4x/plc4go/pkg/api/config"
- "github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
-)
-
-var cacheLog = &log.Logger
-
-// Deprecated: ugly solution for switching logs... The Trace variables should be replaced by methods and listeners.
-func setCacheLog() {
- if config.TraceConnectionCache {
- cacheLog = &log.Logger
- } else {
- nopper := zerolog.Nop()
- cacheLog = &nopper
- }
-}
-
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// cachedPlcConnectionState
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/plc4go/pkg/api/cache/plc_connection_container.go b/plc4go/pkg/api/cache/plc_connection_container.go
index c4f1faf04..bb3bd9d55 100644
--- a/plc4go/pkg/api/cache/plc_connection_container.go
+++ b/plc4go/pkg/api/cache/plc_connection_container.go
@@ -25,10 +25,12 @@ import (
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"github.com/viney-shih/go-lock"
)
type connectionContainer struct {
+ cacheLog *zerolog.Logger
lock lock.RWMutex
connectionString string
driverManager plc4go.PlcDriverManager
@@ -44,8 +46,9 @@ type connectionContainer struct {
listeners []connectionListener
}
-func newConnectionContainer(driverManager plc4go.PlcDriverManager, connectionString string) *connectionContainer {
+func newConnectionContainer(cacheLog *zerolog.Logger, driverManager plc4go.PlcDriverManager, connectionString string) *connectionContainer {
return &connectionContainer{
+ cacheLog: cacheLog,
driverManager: driverManager,
connectionString: connectionString,
lock: lock.NewCASMutex(),
@@ -57,8 +60,7 @@ func newConnectionContainer(driverManager plc4go.PlcDriverManager, connectionStr
}
func (t *connectionContainer) connect() {
- setCacheLog()
- cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Connecting new cached connection ...")
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Connecting new cached connection ...")
// Initialize the new connection.
connectionResultChan := t.driverManager.GetConnection(t.connectionString)
@@ -74,7 +76,7 @@ func (t *connectionContainer) connect() {
// If the connection was successful, pass the active connection into the container.
// If something went wrong, we have to remove the connection from the cache and return the error.
if err := connectionResult.GetErr(); err != nil {
- cacheLog.Debug().Str("connectionString", t.connectionString).
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).
Err(err).
Msg("Error connecting new cached connection.")
// Tell the connection cache that the connection is no longer available.
@@ -98,7 +100,7 @@ func (t *connectionContainer) connect() {
return
}
- cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Successfully connected new cached connection.")
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Successfully connected new cached connection.")
// Inject the real connection into the container.
if connection, ok := connectionResult.GetConnection().(spi.PlcConnection); !ok {
panic("Return connection doesn't implement the spi.PlcConnection interface")
@@ -133,7 +135,6 @@ func (t *connectionContainer) addListener(listener connectionListener) {
}
func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
- setCacheLog()
t.lock.Lock()
defer t.lock.Unlock()
@@ -147,7 +148,7 @@ func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
// In this case we don't need to check for blocks
// as the getConnection function of the connection cache
// is definitely eagerly waiting for input.
- cacheLog.Debug().Str("connectionString", t.connectionString).
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).
Msg("Got lease instantly as connection was idle.")
go func() {
ch <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
@@ -156,17 +157,16 @@ func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
// If the connection is currently busy or not finished initializing,
// add the new channel to the queue for this connection.
t.queue = append(t.queue, ch)
- cacheLog.Debug().Str("connectionString", t.connectionString).
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).
Int("waiting-queue-size", len(t.queue)).
Msg("Added lease-request to queue.")
case StateInvalid:
- cacheLog.Debug().Str("connectionString", t.connectionString).Msg("No lease because invalid")
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).Msg("No lease because invalid")
}
return ch
}
func (t *connectionContainer) returnConnection(newState cachedPlcConnectionState) error {
- setCacheLog()
// Intentionally not locking anything, as there are two cases, where the connection is returned:
// 1) The connection failed to get established (No connection has a lock anyway)
// 2) The connection is returned, then the one returning it already has a lock on it.
@@ -174,11 +174,11 @@ func (t *connectionContainer) returnConnection(newState cachedPlcConnectionState
switch newState {
case StateInitialized, StateInvalid:
// TODO: Perhaps do a maximum number of retries and then call failConnection()
- cacheLog.Debug().Str("connectionString", t.connectionString).
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).
Msgf("Client returned a %s connection, reconnecting.", newState)
t.connect()
default:
- cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Client returned valid connection.")
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).Msg("Client returned valid connection.")
}
t.lock.Lock()
defer t.lock.Unlock()
@@ -202,13 +202,13 @@ func (t *connectionContainer) returnConnection(newState cachedPlcConnectionState
// as the getConnection function of the connection cache
// is definitely eagerly waiting for input.
next <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
- cacheLog.Debug().Str("connectionString", t.connectionString).
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).
Int("waiting-queue-size", len(t.queue)).
Msg("Returned connection to the next client waiting.")
}()
} else {
// Otherwise, just mark the connection as idle.
- cacheLog.Debug().Str("connectionString", t.connectionString).
+ t.cacheLog.Debug().Str("connectionString", t.connectionString).
Msg("Connection set to 'idle'.")
t.state = StateIdle
}
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index c36d5d4a8..3800755c2 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -26,7 +26,7 @@ var (
TraceDefaultMessageCodecWorker bool
)
-// TraceConnectionCache when set to true the connection cache outputs debug logs
+// TraceConnectionCache when set to true the connection cache outputs logs by default
var (
TraceConnectionCache bool
)