You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/09/15 12:22:49 UTC
[plc4x] branch develop updated: fix(plc4go/connection-cache): guard against returning broken connections
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new fa18004c8 fix(plc4go/connection-cache): guard against returning broken connections
fa18004c8 is described below
commit fa18004c81d22a08b725c6974c2310072a43b2f3
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Sep 15 14:22:38 2022 +0200
fix(plc4go/connection-cache): guard against returning broken connections
---
plc4go/pkg/api/cache/plc_connection_cache.go | 10 +-
plc4go/pkg/api/cache/plc_connection_cache_test.go | 6 +-
plc4go/pkg/api/cache/plc_connection_container.go | 25 +-
.../pkg/api/cache/plc_connection_container_test.go | 289 +++++++++++++++++++++
4 files changed, 316 insertions(+), 14 deletions(-)
diff --git a/plc4go/pkg/api/cache/plc_connection_cache.go b/plc4go/pkg/api/cache/plc_connection_cache.go
index 71c77e307..925a42365 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache.go
@@ -108,15 +108,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
}
log.Debug().Str("connectionString", connectionString).Msg("Create new cached connection")
// Create a new connection container.
- cc := &connectionContainer{
- driverManager: t.driverManager,
- connectionString: connectionString,
- lock: lock.NewCASMutex(),
- leaseCounter: 0,
- closed: false,
- state: StateInitialized,
- queue: []chan plc4go.PlcConnectionConnectResult{},
- }
+ cc := newConnectionContainer(t.driverManager, connectionString)
// Register for connection events (Like connection closed or error).
cc.addListener(t)
// Store the new connection container in the cache of connections.
diff --git a/plc4go/pkg/api/cache/plc_connection_cache_test.go b/plc4go/pkg/api/cache/plc_connection_cache_test.go
index 4eb436d83..d2bcd34de 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache_test.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache_test.go
@@ -46,7 +46,8 @@ func TestPlcConnectionCache_GetConnection(t *testing.T) {
wantErr bool
wantTimeout bool
}{
- {name: "simple",
+ {
+ name: "simple",
fields: fields{
driverManager: func() plc4go.PlcDriverManager {
driverManager := plc4go.NewPlcDriverManager()
@@ -59,7 +60,8 @@ func TestPlcConnectionCache_GetConnection(t *testing.T) {
wantErr: false,
wantTimeout: false,
},
- {name: "simpleWithTimeout",
+ {
+ name: "simpleWithTimeout",
fields: fields{
driverManager: func() plc4go.PlcDriverManager {
driverManager := plc4go.NewPlcDriverManager()
diff --git a/plc4go/pkg/api/cache/plc_connection_container.go b/plc4go/pkg/api/cache/plc_connection_container.go
index 621a8e2ed..82bc09e4c 100644
--- a/plc4go/pkg/api/cache/plc_connection_container.go
+++ b/plc4go/pkg/api/cache/plc_connection_container.go
@@ -24,6 +24,7 @@ import (
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/viney-shih/go-lock"
)
@@ -44,6 +45,18 @@ type connectionContainer struct {
listeners []connectionListener
}
+// newConnectionContainer builds the container for connectionString that uses
+// driverManager. The container starts with a lease counter of zero, not
+// closed, in StateInitialized, with an empty queue and a fresh CAS mutex; no
+// connection is attached to it yet.
+func newConnectionContainer(driverManager plc4go.PlcDriverManager, connectionString string) *connectionContainer {
+	container := new(connectionContainer)
+	container.driverManager = driverManager
+	container.connectionString = connectionString
+	container.lock = lock.NewCASMutex()
+	container.leaseCounter = 0
+	container.closed = false
+	container.state = StateInitialized
+	container.queue = []chan plc4go.PlcConnectionConnectResult{}
+	return container
+}
+
func (t *connectionContainer) connect() {
log.Debug().Str("connectionString", t.connectionString).Msg("Connecting new cached connection ...")
// Initialize the new connection.
@@ -151,20 +164,26 @@ func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
return ch
}
-func (t *connectionContainer) returnConnection(state cachedPlcConnectionState) error {
+func (t *connectionContainer) returnConnection(newState cachedPlcConnectionState) error {
// Intentionally not locking anything, as there are two cases, where the connection is returned:
// 1) The connection failed to get established (No connection has a lock anyway)
// 2) The connection is returned, then the one returning it already has a lock on it.
// If the connection is marked as "invalid", destroy it and remove it from the cache.
- switch state {
+ switch newState {
case StateInitialized, StateInvalid:
// TODO: Perhaps do a maximum number of retries and then call failConnection()
log.Debug().Str("connectionString", t.connectionString).
- Msgf("Client returned a %s connection, reconnecting.", state)
+ Msgf("Client returned a %s connection, reconnecting.", newState)
t.connect()
default:
log.Debug().Str("connectionString", t.connectionString).Msg("Client returned valid connection.")
}
+ t.lock.Lock()
+ defer t.lock.Unlock()
+ if t.connection == nil {
+ t.state = StateInvalid
+ return errors.New("Can't return a broken connection")
+ }
// Check how many others are waiting for this connection.
if len(t.queue) > 0 {
diff --git a/plc4go/pkg/api/cache/plc_connection_container_test.go b/plc4go/pkg/api/cache/plc_connection_container_test.go
new file mode 100644
index 000000000..94f784260
--- /dev/null
+++ b/plc4go/pkg/api/cache/plc_connection_container_test.go
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cache
+
+import (
+ "fmt"
+ "github.com/apache/plc4x/plc4go/internal/simulated"
+ plc4go "github.com/apache/plc4x/plc4go/pkg/api"
+ "github.com/apache/plc4x/plc4go/spi"
+ "github.com/stretchr/testify/assert"
+ "github.com/viney-shih/go-lock"
+ "testing"
+)
+
+// Test_connectionContainer_String is a table-driven check of
+// connectionContainer.String(). The table is still empty, so no subtest runs
+// until cases are added.
+func Test_connectionContainer_String(t1 *testing.T) {
+	// fields lists the connectionContainer members a test case can preset.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		want   string
+	}
+	testCases := []testCase{
+		// TODO: Add test cases.
+	}
+	for _, tc := range testCases {
+		t1.Run(tc.name, func(st *testing.T) {
+			f := tc.fields
+			container := &connectionContainer{
+				lock:             f.lock,
+				connectionString: f.connectionString,
+				driverManager:    f.driverManager,
+				tracerEnabled:    f.tracerEnabled,
+				connection:       f.connection,
+				leaseCounter:     f.leaseCounter,
+				closed:           f.closed,
+				state:            f.state,
+				queue:            f.queue,
+				listeners:        f.listeners,
+			}
+			got := container.String()
+			assert.Equalf(st, tc.want, got, "String()")
+		})
+	}
+}
+
+// Test_connectionContainer_addListener is a table-driven driver for
+// connectionContainer.addListener(). The table is still empty, so no subtest
+// runs until cases are added; the call itself is not followed by assertions.
+func Test_connectionContainer_addListener(t1 *testing.T) {
+	// fields lists the connectionContainer members a test case can preset.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	// args holds the listener handed to addListener().
+	type args struct {
+		listener connectionListener
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		args   args
+	}
+	testCases := []testCase{
+		// TODO: Add test cases.
+	}
+	for _, tc := range testCases {
+		t1.Run(tc.name, func(st *testing.T) {
+			f := tc.fields
+			container := &connectionContainer{
+				lock:             f.lock,
+				connectionString: f.connectionString,
+				driverManager:    f.driverManager,
+				tracerEnabled:    f.tracerEnabled,
+				connection:       f.connection,
+				leaseCounter:     f.leaseCounter,
+				closed:           f.closed,
+				state:            f.state,
+				queue:            f.queue,
+				listeners:        f.listeners,
+			}
+			container.addListener(tc.args.listener)
+		})
+	}
+}
+
+// Test_connectionContainer_connect calls connect() on a container that is
+// wired to a driver manager with the simulated driver registered. The call is
+// not followed by any assertion, so the test only exercises the code path.
+func Test_connectionContainer_connect(t1 *testing.T) {
+	// fields lists the connectionContainer members a test case can preset.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	type testCase struct {
+		name   string
+		fields fields
+	}
+	// simulatedDriverManager returns a new driver manager that knows the
+	// simulated driver.
+	simulatedDriverManager := func() plc4go.PlcDriverManager {
+		dm := plc4go.NewPlcDriverManager()
+		dm.RegisterDriver(simulated.NewDriver())
+		return dm
+	}
+	testCases := []testCase{
+		{
+			name: "connect fresh",
+			fields: fields{
+				driverManager:    simulatedDriverManager(),
+				connectionString: "simulated://1.2.3.4:42",
+				lock:             lock.NewCASMutex(),
+				queue:            []chan plc4go.PlcConnectionConnectResult{},
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t1.Run(tc.name, func(st *testing.T) {
+			f := tc.fields
+			container := &connectionContainer{
+				lock:             f.lock,
+				connectionString: f.connectionString,
+				driverManager:    f.driverManager,
+				tracerEnabled:    f.tracerEnabled,
+				connection:       f.connection,
+				leaseCounter:     f.leaseCounter,
+				closed:           f.closed,
+				state:            f.state,
+				queue:            f.queue,
+				listeners:        f.listeners,
+			}
+			container.connect()
+		})
+	}
+}
+
+// Test_connectionContainer_lease checks whether lease() hands back a non-nil
+// result channel for a container built from the table's preset fields.
+func Test_connectionContainer_lease(t1 *testing.T) {
+	// fields lists the connectionContainer members a test case can preset.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	tests := []struct {
+		name       string
+		fields     fields
+		wantNotNil bool // whether lease() is expected to return a non-nil channel
+	}{
+		{
+			name: "lease fresh",
+			fields: fields{
+				driverManager: func() plc4go.PlcDriverManager {
+					driverManager := plc4go.NewPlcDriverManager()
+					driverManager.RegisterDriver(simulated.NewDriver())
+					return driverManager
+				}(),
+				connectionString: "simulated://1.2.3.4:42",
+				lock:             lock.NewCASMutex(),
+				queue:            []chan plc4go.PlcConnectionConnectResult{},
+			},
+			wantNotNil: true,
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &connectionContainer{
+				lock:             tt.fields.lock,
+				connectionString: tt.fields.connectionString,
+				driverManager:    tt.fields.driverManager,
+				tracerEnabled:    tt.fields.tracerEnabled,
+				connection:       tt.fields.connection,
+				leaseCounter:     tt.fields.leaseCounter,
+				closed:           tt.fields.closed,
+				state:            tt.fields.state,
+				queue:            tt.fields.queue,
+				listeners:        tt.fields.listeners,
+			}
+			// Compare the expectation with the actual result. The previous
+			// assert.True(t1, tt.wantNotNil, t.lease(), ...) only asserted the
+			// expectation flag and passed the channel as a message argument,
+			// so the return value of lease() was never checked.
+			assert.Equalf(t1, tt.wantNotNil, t.lease() != nil, "lease()")
+		})
+	}
+}
+
+// Test_connectionContainer_returnConnection checks the error result of
+// returnConnection() for containers that start without a connection:
+// returning with StateInitialized is expected to succeed, returning with
+// StateInUse is expected to fail.
+func Test_connectionContainer_returnConnection(t1 *testing.T) {
+	// fields lists the connectionContainer members a test case can preset.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	// args holds the state passed to returnConnection().
+	type args struct {
+		state cachedPlcConnectionState
+	}
+	type testCase struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr assert.ErrorAssertionFunc
+	}
+	// simulatedDriverManager returns a new driver manager that knows the
+	// simulated driver; every case gets its own instance.
+	simulatedDriverManager := func() plc4go.PlcDriverManager {
+		dm := plc4go.NewPlcDriverManager()
+		dm.RegisterDriver(simulated.NewDriver())
+		return dm
+	}
+	testCases := []testCase{
+		{
+			name: "return connection fresh",
+			fields: fields{
+				driverManager:    simulatedDriverManager(),
+				connectionString: "simulated://1.2.3.4:42",
+				lock:             lock.NewCASMutex(),
+				queue:            []chan plc4go.PlcConnectionConnectResult{},
+			},
+			args:    args{state: StateInitialized},
+			wantErr: assert.NoError,
+		},
+		{
+			name: "return unconnected connection",
+			fields: fields{
+				driverManager:    simulatedDriverManager(),
+				connectionString: "simulated://1.2.3.4:42",
+				lock:             lock.NewCASMutex(),
+				queue:            []chan plc4go.PlcConnectionConnectResult{},
+			},
+			args:    args{state: StateInUse},
+			wantErr: assert.Error,
+		},
+	}
+	for _, tc := range testCases {
+		t1.Run(tc.name, func(st *testing.T) {
+			f := tc.fields
+			container := &connectionContainer{
+				lock:             f.lock,
+				connectionString: f.connectionString,
+				driverManager:    f.driverManager,
+				tracerEnabled:    f.tracerEnabled,
+				connection:       f.connection,
+				leaseCounter:     f.leaseCounter,
+				closed:           f.closed,
+				state:            f.state,
+				queue:            f.queue,
+				listeners:        f.listeners,
+			}
+			err := container.returnConnection(tc.args.state)
+			tc.wantErr(st, err, fmt.Sprintf("returnConnection(%v)", tc.args.state))
+		})
+	}
+}