You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/09/15 12:22:49 UTC

[plc4x] branch develop updated: fix(plc4go/connection-cache): guard against returning broken connections

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new fa18004c8 fix(plc4go/connection-cache): guard against returning broken connections
fa18004c8 is described below

commit fa18004c81d22a08b725c6974c2310072a43b2f3
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Sep 15 14:22:38 2022 +0200

    fix(plc4go/connection-cache): guard against returning broken connections
---
 plc4go/pkg/api/cache/plc_connection_cache.go       |  10 +-
 plc4go/pkg/api/cache/plc_connection_cache_test.go  |   6 +-
 plc4go/pkg/api/cache/plc_connection_container.go   |  25 +-
 .../pkg/api/cache/plc_connection_container_test.go | 289 +++++++++++++++++++++
 4 files changed, 316 insertions(+), 14 deletions(-)

diff --git a/plc4go/pkg/api/cache/plc_connection_cache.go b/plc4go/pkg/api/cache/plc_connection_cache.go
index 71c77e307..925a42365 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache.go
@@ -108,15 +108,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 			}
 			log.Debug().Str("connectionString", connectionString).Msg("Create new cached connection")
 			// Create a new connection container.
-			cc := &connectionContainer{
-				driverManager:    t.driverManager,
-				connectionString: connectionString,
-				lock:             lock.NewCASMutex(),
-				leaseCounter:     0,
-				closed:           false,
-				state:            StateInitialized,
-				queue:            []chan plc4go.PlcConnectionConnectResult{},
-			}
+			cc := newConnectionContainer(t.driverManager, connectionString)
 			// Register for connection events (Like connection closed or error).
 			cc.addListener(t)
 			// Store the new connection container in the cache of connections.
diff --git a/plc4go/pkg/api/cache/plc_connection_cache_test.go b/plc4go/pkg/api/cache/plc_connection_cache_test.go
index 4eb436d83..d2bcd34de 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache_test.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache_test.go
@@ -46,7 +46,8 @@ func TestPlcConnectionCache_GetConnection(t *testing.T) {
 		wantErr     bool
 		wantTimeout bool
 	}{
-		{name: "simple",
+		{
+			name: "simple",
 			fields: fields{
 				driverManager: func() plc4go.PlcDriverManager {
 					driverManager := plc4go.NewPlcDriverManager()
@@ -59,7 +60,8 @@ func TestPlcConnectionCache_GetConnection(t *testing.T) {
 			wantErr:     false,
 			wantTimeout: false,
 		},
-		{name: "simpleWithTimeout",
+		{
+			name: "simpleWithTimeout",
 			fields: fields{
 				driverManager: func() plc4go.PlcDriverManager {
 					driverManager := plc4go.NewPlcDriverManager()
diff --git a/plc4go/pkg/api/cache/plc_connection_container.go b/plc4go/pkg/api/cache/plc_connection_container.go
index 621a8e2ed..82bc09e4c 100644
--- a/plc4go/pkg/api/cache/plc_connection_container.go
+++ b/plc4go/pkg/api/cache/plc_connection_container.go
@@ -24,6 +24,7 @@ import (
 	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
+	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"github.com/viney-shih/go-lock"
 )
@@ -44,6 +45,18 @@ type connectionContainer struct {
 	listeners []connectionListener
 }
 
+// newConnectionContainer builds the container for a cached connection to connectionString.
+// The container starts in StateInitialized with a zero lease counter, an empty wait queue
+// and its own CAS mutex; no connection is attached yet. driverManager is stored on the
+// container as-is.
+func newConnectionContainer(driverManager plc4go.PlcDriverManager, connectionString string) *connectionContainer {
+	container := &connectionContainer{
+		lock:             lock.NewCASMutex(),
+		connectionString: connectionString,
+		driverManager:    driverManager,
+		queue:            []chan plc4go.PlcConnectionConnectResult{},
+		state:            StateInitialized,
+		leaseCounter:     0,
+		closed:           false,
+	}
+	return container
+}
+
 func (t *connectionContainer) connect() {
 	log.Debug().Str("connectionString", t.connectionString).Msg("Connecting new cached connection ...")
 	// Initialize the new connection.
@@ -151,20 +164,26 @@ func (t *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
 	return ch
 }
 
-func (t *connectionContainer) returnConnection(state cachedPlcConnectionState) error {
+func (t *connectionContainer) returnConnection(newState cachedPlcConnectionState) error {
 	// Intentionally not locking anything, as there are two cases, where the connection is returned:
 	// 1) The connection failed to get established (No connection has a lock anyway)
 	// 2) The connection is returned, then the one returning it already has a lock on it.
 	// If the connection is marked as "invalid", destroy it and remove it from the cache.
-	switch state {
+	switch newState {
 	case StateInitialized, StateInvalid:
 		// TODO: Perhaps do a maximum number of retries and then call failConnection()
 		log.Debug().Str("connectionString", t.connectionString).
-			Msgf("Client returned a %s connection, reconnecting.", state)
+			Msgf("Client returned a %s connection, reconnecting.", newState)
 		t.connect()
 	default:
 		log.Debug().Str("connectionString", t.connectionString).Msg("Client returned valid connection.")
 	}
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	if t.connection == nil {
+		t.state = StateInvalid
+		return errors.New("Can't return a broken connection")
+	}
 
 	// Check how many others are waiting for this connection.
 	if len(t.queue) > 0 {
diff --git a/plc4go/pkg/api/cache/plc_connection_container_test.go b/plc4go/pkg/api/cache/plc_connection_container_test.go
new file mode 100644
index 000000000..94f784260
--- /dev/null
+++ b/plc4go/pkg/api/cache/plc_connection_container_test.go
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cache
+
+import (
+	"fmt"
+	"github.com/apache/plc4x/plc4go/internal/simulated"
+	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
+	"github.com/apache/plc4x/plc4go/spi"
+	"github.com/stretchr/testify/assert"
+	"github.com/viney-shih/go-lock"
+	"testing"
+)
+
+// Test_connectionContainer_String is a table-driven check of connectionContainer.String().
+// The table is still empty, so no sub-test runs until cases are added.
+func Test_connectionContainer_String(t1 *testing.T) {
+	// fields mirrors the connectionContainer struct so a case can describe the full state.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		want   string
+	}
+	// TODO: Add test cases.
+	var cases []testCase
+	for _, tc := range cases {
+		t1.Run(tc.name, func(t1 *testing.T) {
+			f := tc.fields
+			container := &connectionContainer{
+				lock:             f.lock,
+				connectionString: f.connectionString,
+				driverManager:    f.driverManager,
+				tracerEnabled:    f.tracerEnabled,
+				connection:       f.connection,
+				leaseCounter:     f.leaseCounter,
+				closed:           f.closed,
+				state:            f.state,
+				queue:            f.queue,
+				listeners:        f.listeners,
+			}
+			got := container.String()
+			assert.Equalf(t1, tc.want, got, "String()")
+		})
+	}
+}
+
+// Test_connectionContainer_addListener is a table-driven smoke test that calls addListener
+// on a container built from each case. The table is still empty, so nothing runs yet.
+func Test_connectionContainer_addListener(t1 *testing.T) {
+	// fields mirrors the connectionContainer struct so a case can describe the full state.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	// args holds the parameters handed to addListener.
+	type args struct {
+		listener connectionListener
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		args   args
+	}
+	// TODO: Add test cases.
+	var cases []testCase
+	for _, tc := range cases {
+		t1.Run(tc.name, func(t1 *testing.T) {
+			f := tc.fields
+			container := &connectionContainer{
+				lock:             f.lock,
+				connectionString: f.connectionString,
+				driverManager:    f.driverManager,
+				tracerEnabled:    f.tracerEnabled,
+				connection:       f.connection,
+				leaseCounter:     f.leaseCounter,
+				closed:           f.closed,
+				state:            f.state,
+				queue:            f.queue,
+				listeners:        f.listeners,
+			}
+			container.addListener(tc.args.listener)
+		})
+	}
+}
+
+// Test_connectionContainer_connect is a table-driven smoke test: it builds a container from
+// each case and calls connect() on it. Nothing is asserted; the sub-test only fails if the
+// call itself fails the test run (e.g. by panicking).
+func Test_connectionContainer_connect(t1 *testing.T) {
+	// fields mirrors the connectionContainer struct so a case can describe the full state.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	type testCase struct {
+		name   string
+		fields fields
+	}
+	// simulatedDriverManager returns a driver manager with only the simulated driver registered.
+	simulatedDriverManager := func() plc4go.PlcDriverManager {
+		manager := plc4go.NewPlcDriverManager()
+		manager.RegisterDriver(simulated.NewDriver())
+		return manager
+	}
+	cases := []testCase{
+		{
+			name: "connect fresh",
+			fields: fields{
+				driverManager:    simulatedDriverManager(),
+				connectionString: "simulated://1.2.3.4:42",
+				lock:             lock.NewCASMutex(),
+				queue:            []chan plc4go.PlcConnectionConnectResult{},
+			},
+		},
+	}
+	for _, tc := range cases {
+		t1.Run(tc.name, func(t1 *testing.T) {
+			f := tc.fields
+			container := &connectionContainer{
+				lock:             f.lock,
+				connectionString: f.connectionString,
+				driverManager:    f.driverManager,
+				tracerEnabled:    f.tracerEnabled,
+				connection:       f.connection,
+				leaseCounter:     f.leaseCounter,
+				closed:           f.closed,
+				state:            f.state,
+				queue:            f.queue,
+				listeners:        f.listeners,
+			}
+			container.connect()
+		})
+	}
+}
+
+// Test_connectionContainer_lease is a table-driven test for lease(): it builds a container
+// from each case, calls lease() and checks whether the returned channel is non-nil as the
+// case expects.
+func Test_connectionContainer_lease(t1 *testing.T) {
+	// fields mirrors the connectionContainer struct so a case can describe the full state.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	tests := []struct {
+		name       string
+		fields     fields
+		wantNotNil bool // whether lease() is expected to return a non-nil channel
+	}{
+		{
+			name: "lease fresh",
+			fields: fields{
+				driverManager: func() plc4go.PlcDriverManager {
+					driverManager := plc4go.NewPlcDriverManager()
+					driverManager.RegisterDriver(simulated.NewDriver())
+					return driverManager
+				}(),
+				connectionString: "simulated://1.2.3.4:42",
+				lock:             lock.NewCASMutex(),
+				queue:            []chan plc4go.PlcConnectionConnectResult{},
+			},
+			wantNotNil: true,
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &connectionContainer{
+				lock:             tt.fields.lock,
+				connectionString: tt.fields.connectionString,
+				driverManager:    tt.fields.driverManager,
+				tracerEnabled:    tt.fields.tracerEnabled,
+				connection:       tt.fields.connection,
+				leaseCounter:     tt.fields.leaseCounter,
+				closed:           tt.fields.closed,
+				state:            tt.fields.state,
+				queue:            tt.fields.queue,
+				listeners:        tt.fields.listeners,
+			}
+			// Compare the actual result of lease() against the expectation. The previous
+			// assert.True call only asserted the table constant and passed the channel
+			// as a message argument, so the return value was never checked.
+			assert.Equalf(t1, tt.wantNotNil, t.lease() != nil, "lease()")
+		})
+	}
+}
+
+// Test_connectionContainer_returnConnection is a table-driven test for returnConnection().
+// Both cases start from a container without a connection: returning it as StateInitialized
+// is expected to succeed, returning it as StateInUse is expected to produce an error.
+func Test_connectionContainer_returnConnection(t1 *testing.T) {
+	// fields mirrors the connectionContainer struct so a case can describe the full state.
+	type fields struct {
+		lock             lock.RWMutex
+		connectionString string
+		driverManager    plc4go.PlcDriverManager
+		tracerEnabled    bool
+		connection       spi.PlcConnection
+		leaseCounter     uint32
+		closed           bool
+		state            cachedPlcConnectionState
+		queue            []chan plc4go.PlcConnectionConnectResult
+		listeners        []connectionListener
+	}
+	// args holds the parameters handed to returnConnection.
+	type args struct {
+		state cachedPlcConnectionState
+	}
+	type testCase struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr assert.ErrorAssertionFunc
+	}
+	// simulatedDriverManager returns a driver manager with only the simulated driver registered.
+	simulatedDriverManager := func() plc4go.PlcDriverManager {
+		manager := plc4go.NewPlcDriverManager()
+		manager.RegisterDriver(simulated.NewDriver())
+		return manager
+	}
+	cases := []testCase{
+		{
+			name: "return connection fresh",
+			fields: fields{
+				driverManager:    simulatedDriverManager(),
+				connectionString: "simulated://1.2.3.4:42",
+				lock:             lock.NewCASMutex(),
+				queue:            []chan plc4go.PlcConnectionConnectResult{},
+			},
+			args:    args{state: StateInitialized},
+			wantErr: assert.NoError,
+		},
+		{
+			name: "return unconnected connection",
+			fields: fields{
+				driverManager:    simulatedDriverManager(),
+				connectionString: "simulated://1.2.3.4:42",
+				lock:             lock.NewCASMutex(),
+				queue:            []chan plc4go.PlcConnectionConnectResult{},
+			},
+			args:    args{state: StateInUse},
+			wantErr: assert.Error,
+		},
+	}
+	for _, tc := range cases {
+		t1.Run(tc.name, func(t1 *testing.T) {
+			f := tc.fields
+			container := &connectionContainer{
+				lock:             f.lock,
+				connectionString: f.connectionString,
+				driverManager:    f.driverManager,
+				tracerEnabled:    f.tracerEnabled,
+				connection:       f.connection,
+				leaseCounter:     f.leaseCounter,
+				closed:           f.closed,
+				state:            f.state,
+				queue:            f.queue,
+				listeners:        f.listeners,
+			}
+			err := container.returnConnection(tc.args.state)
+			tc.wantErr(t1, err, fmt.Sprintf("returnConnection(%v)", tc.args.state))
+		})
+	}
+}