You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/18 14:51:10 UTC

[plc4x] 02/02: fix(plc4go): fixed some timer leaks

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 4d22dd159756d97a75592b30adca8f63dff84083
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Aug 18 16:51:01 2022 +0200

    fix(plc4go): fixed some timer leaks
---
 plc4go/internal/bacnetip/Connection.go             |  5 +++-
 plc4go/internal/cbus/Connection.go                 |  9 ++++--
 plc4go/internal/simulated/Connection_test.go       | 17 ++++++++---
 plc4go/internal/simulated/Driver_test.go           |  5 +++-
 plc4go/internal/simulated/Reader_test.go           |  5 +++-
 plc4go/internal/simulated/Writer_test.go           |  5 +++-
 plc4go/pkg/api/cache/plc_connection_cache.go       | 25 ++++++++++++----
 plc4go/spi/utils/{MultiError.go => Errors.go}      | 26 ++++++++++++++++
 plc4go/spi/utils/{Utils.go => Misc.go}             | 35 +++++++---------------
 plc4go/spi/utils/{net.go => Net.go}                | 10 +++++--
 .../tests/drivers/tests/manual_cbus_driver_test.go |  6 ++--
 11 files changed, 104 insertions(+), 44 deletions(-)

diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 33af188a1..d5c583ca9 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	internalModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/rs/zerolog/log"
 	"sync"
 	"time"
@@ -81,11 +82,13 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 			for c.IsConnected() {
 				log.Debug().Msg("Polling data")
 				incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
+				timeout := time.NewTimer(20 * time.Millisecond)
 				select {
 				case message := <-incomingMessageChannel:
 					// TODO: implement mapping to subscribers
 					log.Info().Msgf("Received \n%v", message)
-				case <-time.After(20 * time.Millisecond):
+					utils.CleanupTimer(timeout)
+				case <-timeout.C:
 				}
 			}
 			log.Info().Msg("Ending incoming message transfer")
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index f339eed62..567a689fa 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	internalModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"sync"
@@ -294,6 +295,8 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 	}
 
 	startTime := time.Now()
+	timeout := time.NewTimer(time.Millisecond * 500)
+	defer utils.CleanupTimer(timeout)
 	select {
 	case <-receivedResetEchoChan:
 		log.Debug().Msgf("We received the echo")
@@ -304,7 +307,7 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 			log.Trace().Err(err).Msg("connect failed")
 		}
 		return false
-	case timeout := <-time.After(time.Millisecond * 500):
+	case timeout := <-timeout.C:
 		if sendOutErrorNotification {
 			c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
 		} else {
@@ -430,13 +433,15 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
 	}
 
 	startTime := time.Now()
+	timeout := time.NewTimer(time.Second * 2)
+	defer utils.CleanupTimer(timeout)
 	select {
 	case <-directCommandAckChan:
 		log.Debug().Msgf("We received the ack")
 	case err := <-directCommandAckErrorChan:
 		c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
 		return false
-	case timeout := <-time.After(time.Second * 2):
+	case timeout := <-timeout.C:
 		c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
 		return false
 	}
diff --git a/plc4go/internal/simulated/Connection_test.go b/plc4go/internal/simulated/Connection_test.go
index 57cbf2d26..ff45723f2 100644
--- a/plc4go/internal/simulated/Connection_test.go
+++ b/plc4go/internal/simulated/Connection_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
 	internalModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"reflect"
 	"testing"
 	"time"
@@ -122,6 +123,8 @@ func TestConnection_Connect(t *testing.T) {
 			}
 			timeBeforeConnect := time.Now()
 			connectionChan := c.Connect()
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case connectResult := <-connectionChan:
 				timeAfterConnect := time.Now()
@@ -142,7 +145,7 @@ func TestConnection_Connect(t *testing.T) {
 						t.Errorf("TestConnection.Connect() = %v, want %v", connectResult, tt.want)
 					}
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("TestConnection.Connect() got timeout")
 			}
 		})
@@ -237,6 +240,8 @@ func TestConnection_Close(t *testing.T) {
 			}
 			timeBeforeClose := time.Now()
 			closeChan := c.Close()
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case closeResult := <-closeChan:
 				timeAfterClose := time.Now()
@@ -256,7 +261,7 @@ func TestConnection_Close(t *testing.T) {
 						t.Errorf("TestConnection.Close() = %v, want %v", closeResult, tt.want)
 					}
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("TestConnection.Close() got timeout")
 			}
 		})
@@ -330,6 +335,8 @@ func TestConnection_BlockingClose(t *testing.T) {
 				}()
 				return ch
 			}
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case <-executor():
 				timeAfterClose := time.Now()
@@ -341,7 +348,7 @@ func TestConnection_BlockingClose(t *testing.T) {
 						t.Errorf("TestConnection.Close() connected too fast. Expected at least %v but connected after %v", tt.delayAtLeast, connectionTime)
 					}
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("TestConnection.Close() got timeout")
 			}
 		})
@@ -499,6 +506,8 @@ func TestConnection_Ping(t *testing.T) {
 			}
 			timeBeforePing := time.Now()
 			pingChan := c.Ping()
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case pingResult := <-pingChan:
 				timeAfterPing := time.Now()
@@ -513,7 +522,7 @@ func TestConnection_Ping(t *testing.T) {
 				if !reflect.DeepEqual(pingResult, tt.want) {
 					t.Errorf("TestConnection.Ping() = %v, want %v", pingResult, tt.want)
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("TestConnection.Ping() got timeout")
 			}
 		})
diff --git a/plc4go/internal/simulated/Driver_test.go b/plc4go/internal/simulated/Driver_test.go
index ad34db3ec..92b0b5ae2 100644
--- a/plc4go/internal/simulated/Driver_test.go
+++ b/plc4go/internal/simulated/Driver_test.go
@@ -21,6 +21,7 @@ package simulated
 
 import (
 	"github.com/apache/plc4x/plc4go/spi/transports"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"net/url"
 	"testing"
 	"time"
@@ -91,6 +92,8 @@ func TestDriver_GetConnection(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			d := NewDriver()
 			connectionChan := d.GetConnection(tt.args.in0, tt.args.in1, tt.args.options)
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case connectResult := <-connectionChan:
 				if tt.wantErr && (connectResult.GetErr() == nil) {
@@ -98,7 +101,7 @@ func TestDriver_GetConnection(t *testing.T) {
 				} else if connectResult.GetErr() != nil {
 					t.Errorf("PlcConnectionPool.GetConnection() error = %v, wantErr %v", connectResult.GetErr(), tt.wantErr)
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("PlcConnectionPool.GetConnection() got timeout")
 			}
 		})
diff --git a/plc4go/internal/simulated/Reader_test.go b/plc4go/internal/simulated/Reader_test.go
index f5331b627..f8fab660c 100644
--- a/plc4go/internal/simulated/Reader_test.go
+++ b/plc4go/internal/simulated/Reader_test.go
@@ -27,6 +27,7 @@ import (
 	model4 "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
 	model2 "github.com/apache/plc4x/plc4go/protocols/simulated/readwrite/model"
 	model3 "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	values2 "github.com/apache/plc4x/plc4go/spi/values"
 	"reflect"
 	"testing"
@@ -163,6 +164,8 @@ func TestReader_Read(t *testing.T) {
 			readRequest := model3.NewDefaultPlcReadRequest(tt.args.fields, tt.args.fieldNames, r, nil)
 			timeBeforeReadRequest := time.Now()
 			readResponseChannel := r.Read(context.TODO(), readRequest)
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case readResponse := <-readResponseChannel:
 				timeAfterReadRequest := time.Now()
@@ -187,7 +190,7 @@ func TestReader_Read(t *testing.T) {
 							readResponse.GetResponse().GetValue(fieldName), tt.want.GetValue(fieldName))
 					}
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("Reader.Read() got timeout")
 			}
 		})
diff --git a/plc4go/internal/simulated/Writer_test.go b/plc4go/internal/simulated/Writer_test.go
index 9291b801c..88c9caf43 100644
--- a/plc4go/internal/simulated/Writer_test.go
+++ b/plc4go/internal/simulated/Writer_test.go
@@ -27,6 +27,7 @@ import (
 	model4 "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
 	model2 "github.com/apache/plc4x/plc4go/protocols/simulated/readwrite/model"
 	model3 "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	values2 "github.com/apache/plc4x/plc4go/spi/values"
 	"reflect"
 	"testing"
@@ -175,6 +176,8 @@ func TestWriter_Write(t *testing.T) {
 			writeRequest := model3.NewDefaultPlcWriteRequest(tt.args.fields, tt.args.fieldNames, tt.args.values, w, nil)
 			timeBeforeWriteRequest := time.Now()
 			writeResponseChannel := w.Write(context.TODO(), writeRequest)
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case writeResponse := <-writeResponseChannel:
 				timeAfterWriteRequest := time.Now()
@@ -199,7 +202,7 @@ func TestWriter_Write(t *testing.T) {
 					t.Errorf("Writer.Write() Device State = %v, want %v",
 						tt.fields.device.State, tt.newState)
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("Reader.Read() got timeout")
 			}
 		})
diff --git a/plc4go/pkg/api/cache/plc_connection_cache.go b/plc4go/pkg/api/cache/plc_connection_cache.go
index a5bb8edbd..fbaf48aad 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache.go
@@ -25,6 +25,7 @@ import (
 	"github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"github.com/viney-shih/go-lock"
@@ -117,16 +118,20 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 			txId = t.tracer.AddTransactionalStartTrace("get-connection", "lease")
 		}
 		leaseChan := connection.lease()
+		maximumWaitTimeout := time.NewTimer(t.maxWaitTime)
+		defer utils.CleanupTimer(maximumWaitTimeout)
 		select {
 		// Wait till we get a lease.
 		case connectionResponse := <-leaseChan:
 			log.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
+			responseTimeout := time.NewTimer(10 * time.Millisecond)
+			defer utils.CleanupTimer(responseTimeout)
 			select {
 			case ch <- connectionResponse:
 				if t.tracer != nil {
 					t.tracer.AddTransactionalTrace(txId, "get-connection", "success")
 				}
-			case <-time.After(10 * time.Millisecond):
+			case <-responseTimeout.C:
 				// Log a message, that the client has given up
 				if t.tracer != nil {
 					t.tracer.AddTransactionalTrace(txId, "get-connection", "client given up")
@@ -140,7 +145,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 			}
 
 		// Timeout after the maximum waiting time.
-		case <-time.After(t.maxWaitTime):
+		case <-maximumWaitTimeout.C:
 			if t.tracer != nil {
 				t.tracer.AddTransactionalTrace(txId, "get-connection", "timeout")
 			}
@@ -161,9 +166,11 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 		defer t.cacheLock.Unlock()
 
 		if len(t.connections) == 0 {
+			responseDeliveryTimeout := time.NewTimer(10 * time.Millisecond)
+			defer utils.CleanupTimer(responseDeliveryTimeout)
 			select {
 			case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
-			case <-time.After(time.Millisecond * 10):
+			case <-responseDeliveryTimeout.C:
 			}
 			log.Debug().Msg("Closing connection cache finished.")
 			return
@@ -176,6 +183,8 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 			// while some go func is still using it.
 			go func(container *connectionContainer) {
 				leaseResults := container.lease()
+				closeTimeout := time.NewTimer(t.maxWaitTime)
+				defer utils.CleanupTimer(closeTimeout)
 				select {
 				// We're just getting the lease as this way we can be sure nobody else is using it.
 				// We also really don't care if it worked, or not ... it's just an attempt of being
@@ -185,15 +194,17 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 					// Give back the connection.
 					container.connection.Close()
 				// If we're timing out brutally kill the connection.
-				case <-time.After(t.maxWaitTime):
+				case <-closeTimeout.C:
 					log.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
 					// Forcefully close this connection.
 					container.connection.Close()
 				}
 
+				responseDeliveryTimeout := time.NewTimer(10 * time.Millisecond)
+				defer utils.CleanupTimer(responseDeliveryTimeout)
 				select {
 				case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
-				case <-time.After(time.Millisecond * 10):
+				case <-responseDeliveryTimeout.C:
 				}
 				log.Debug().Msg("Closing connection cache finished.")
 			}(cc)
@@ -485,9 +496,11 @@ func (t *plcConnectionLease) Close() <-chan plc4go.PlcConnectionCloseResult {
 		err := t.connectionContainer.returnConnection(newState)
 
 		// Finish closing the connection.
+		timeout := time.NewTimer(10 * time.Millisecond)
+		defer utils.CleanupTimer(timeout)
 		select {
 		case result <- _default.NewDefaultPlcConnectionCloseResultWithTraces(t, err, traces):
-		case <-time.After(time.Millisecond * 10):
+		case <-timeout.C:
 		}
 
 		// Detach the connection from this lease, so it can no longer be used by the client.
diff --git a/plc4go/spi/utils/MultiError.go b/plc4go/spi/utils/Errors.go
similarity index 76%
rename from plc4go/spi/utils/MultiError.go
rename to plc4go/spi/utils/Errors.go
index ba8c1d9e0..03c6e333b 100644
--- a/plc4go/spi/utils/MultiError.go
+++ b/plc4go/spi/utils/Errors.go
@@ -45,3 +45,29 @@ func (m MultiError) Error() string {
 		return result
 	}(m.Errors), "\n")
 }
+
+type ParseAssertError struct {
+	Message string
+}
+
+func (e ParseAssertError) Error() string {
+	return e.Message
+}
+
+func (e ParseAssertError) Is(target error) bool {
+	_, ok := target.(ParseAssertError)
+	return ok
+}
+
+type ParseValidationError struct {
+	Message string
+}
+
+func (e ParseValidationError) Error() string {
+	return e.Message
+}
+
+func (e ParseValidationError) Is(target error) bool {
+	_, ok := target.(ParseValidationError)
+	return ok
+}
diff --git a/plc4go/spi/utils/Utils.go b/plc4go/spi/utils/Misc.go
similarity index 70%
rename from plc4go/spi/utils/Utils.go
rename to plc4go/spi/utils/Misc.go
index 06985728c..55046851e 100644
--- a/plc4go/spi/utils/Utils.go
+++ b/plc4go/spi/utils/Misc.go
@@ -19,6 +19,8 @@
 
 package utils
 
+import "time"
+
 // InlineIf is basically a inline if like construct for golang
 func InlineIf(test bool, a func() interface{}, b func() interface{}) interface{} {
 	if test {
@@ -28,28 +30,13 @@ func InlineIf(test bool, a func() interface{}, b func() interface{}) interface{}
 	}
 }
 
-type ParseAssertError struct {
-	Message string
-}
-
-func (e ParseAssertError) Error() string {
-	return e.Message
-}
-
-func (e ParseAssertError) Is(target error) bool {
-	_, ok := target.(ParseAssertError)
-	return ok
-}
-
-type ParseValidationError struct {
-	Message string
-}
-
-func (e ParseValidationError) Error() string {
-	return e.Message
-}
-
-func (e ParseValidationError) Is(target error) bool {
-	_, ok := target.(ParseValidationError)
-	return ok
+// CleanupTimer stops the timer and, if Stop reports it was no longer active, drains any value
+// still pending in its channel without blocking, so it is safe to call even after the channel was read.
+func CleanupTimer(timer *time.Timer) {
+	if !timer.Stop() {
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
 }
diff --git a/plc4go/spi/utils/net.go b/plc4go/spi/utils/Net.go
similarity index 97%
rename from plc4go/spi/utils/net.go
rename to plc4go/spi/utils/Net.go
index 476d08bb4..712a1709e 100644
--- a/plc4go/spi/utils/net.go
+++ b/plc4go/spi/utils/Net.go
@@ -143,11 +143,14 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
 				// Schedule a discovery operation for this ip.
 				ip := net.IP(arp.SourceProtAddress)
 				log.Trace().Msgf("Scheduling discovery for IP %s", ip)
+				timeout := time.NewTimer(2 * time.Second)
 				go func(ip net.IP) {
 					select {
 					case <-ctx.Done():
+						CleanupTimer(timeout)
 					case foundIps <- DuplicateIP(ip):
-					case <-time.After(2 * time.Second):
+						CleanupTimer(timeout)
+					case <-timeout.C:
 					}
 				}(DuplicateIP(ip))
 			}
@@ -226,12 +229,15 @@ func lookupIps(ctx context.Context, ipnet *net.IPNet, foundIps chan net.IP, wg *
 		}
 
 		wg.Add(1)
+		timeout := time.NewTimer(2 * time.Second)
 		go func(ip net.IP) {
 			defer func() { wg.Done() }()
 			select {
 			case <-ctx.Done():
+				CleanupTimer(timeout)
 			case foundIps <- ip:
-			case <-time.After(2 * time.Second):
+				CleanupTimer(timeout)
+			case <-timeout.C:
 			}
 		}(DuplicateIP(ip))
 		log.Trace().Stringer("IP", ip).Msg("Expanded CIDR")
diff --git a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
index 798111bf7..20d25c5e1 100644
--- a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
+++ b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/pkg/api/transports"
 	"github.com/apache/plc4x/plc4go/spi/testutils"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	_ "github.com/apache/plc4x/plc4go/tests/initializetest"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
@@ -73,13 +74,14 @@ func TestManualCBusDriver(t *testing.T) {
 			Build()
 		require.NoError(t, err)
 		subscriptionRequest.Execute()
-		timeout := time.After(30 * time.Second)
+		timeout := time.NewTimer(30 * time.Second)
+		defer utils.CleanupTimer(timeout)
 		// We expect couple monitors
 		monitorCount := 0
 	waitingForMonitors:
 		for {
 			select {
-			case at := <-timeout:
+			case at := <-timeout.C:
 				t.Errorf("timeout at %s", at)
 				break waitingForMonitors
 			case <-gotMonitor: