You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/18 14:51:10 UTC
[plc4x] 02/02: fix(plc4go): fixed some timer leaks
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 4d22dd159756d97a75592b30adca8f63dff84083
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Aug 18 16:51:01 2022 +0200
fix(plc4go): fixed some timer leaks
---
plc4go/internal/bacnetip/Connection.go | 5 +++-
plc4go/internal/cbus/Connection.go | 9 ++++--
plc4go/internal/simulated/Connection_test.go | 17 ++++++++---
plc4go/internal/simulated/Driver_test.go | 5 +++-
plc4go/internal/simulated/Reader_test.go | 5 +++-
plc4go/internal/simulated/Writer_test.go | 5 +++-
plc4go/pkg/api/cache/plc_connection_cache.go | 25 ++++++++++++----
plc4go/spi/utils/{MultiError.go => Errors.go} | 26 ++++++++++++++++
plc4go/spi/utils/{Utils.go => Misc.go} | 35 +++++++---------------
plc4go/spi/utils/{net.go => Net.go} | 10 +++++--
.../tests/drivers/tests/manual_cbus_driver_test.go | 6 ++--
11 files changed, 104 insertions(+), 44 deletions(-)
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 33af188a1..d5c583ca9 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"github.com/rs/zerolog/log"
"sync"
"time"
@@ -81,11 +82,13 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
for c.IsConnected() {
log.Debug().Msg("Polling data")
incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
+ timeout := time.NewTimer(20 * time.Millisecond)
select {
case message := <-incomingMessageChannel:
// TODO: implement mapping to subscribers
log.Info().Msgf("Received \n%v", message)
- case <-time.After(20 * time.Millisecond):
+ utils.CleanupTimer(timeout)
+ case <-timeout.C:
}
}
log.Info().Msg("Ending incoming message transfer")
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index f339eed62..567a689fa 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"sync"
@@ -294,6 +295,8 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
}
startTime := time.Now()
+ timeout := time.NewTimer(time.Millisecond * 500)
+ defer utils.CleanupTimer(timeout)
select {
case <-receivedResetEchoChan:
log.Debug().Msgf("We received the echo")
@@ -304,7 +307,7 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
log.Trace().Err(err).Msg("connect failed")
}
return false
- case timeout := <-time.After(time.Millisecond * 500):
+ case timeout := <-timeout.C:
if sendOutErrorNotification {
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
} else {
@@ -430,13 +433,15 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
}
startTime := time.Now()
+ timeout := time.NewTimer(time.Second * 2)
+ defer utils.CleanupTimer(timeout)
select {
case <-directCommandAckChan:
log.Debug().Msgf("We received the ack")
case err := <-directCommandAckErrorChan:
c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
return false
- case timeout := <-time.After(time.Second * 2):
+ case timeout := <-timeout.C:
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
return false
}
diff --git a/plc4go/internal/simulated/Connection_test.go b/plc4go/internal/simulated/Connection_test.go
index 57cbf2d26..ff45723f2 100644
--- a/plc4go/internal/simulated/Connection_test.go
+++ b/plc4go/internal/simulated/Connection_test.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"reflect"
"testing"
"time"
@@ -122,6 +123,8 @@ func TestConnection_Connect(t *testing.T) {
}
timeBeforeConnect := time.Now()
connectionChan := c.Connect()
+ timeout := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timeout)
select {
case connectResult := <-connectionChan:
timeAfterConnect := time.Now()
@@ -142,7 +145,7 @@ func TestConnection_Connect(t *testing.T) {
t.Errorf("TestConnection.Connect() = %v, want %v", connectResult, tt.want)
}
}
- case <-time.After(3 * time.Second):
+ case <-timeout.C:
t.Errorf("TestConnection.Connect() got timeout")
}
})
@@ -237,6 +240,8 @@ func TestConnection_Close(t *testing.T) {
}
timeBeforeClose := time.Now()
closeChan := c.Close()
+ timeout := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timeout)
select {
case closeResult := <-closeChan:
timeAfterClose := time.Now()
@@ -256,7 +261,7 @@ func TestConnection_Close(t *testing.T) {
t.Errorf("TestConnection.Close() = %v, want %v", closeResult, tt.want)
}
}
- case <-time.After(3 * time.Second):
+ case <-timeout.C:
t.Errorf("TestConnection.Close() got timeout")
}
})
@@ -330,6 +335,8 @@ func TestConnection_BlockingClose(t *testing.T) {
}()
return ch
}
+ timeout := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timeout)
select {
case <-executor():
timeAfterClose := time.Now()
@@ -341,7 +348,7 @@ func TestConnection_BlockingClose(t *testing.T) {
t.Errorf("TestConnection.Close() connected too fast. Expected at least %v but connected after %v", tt.delayAtLeast, connectionTime)
}
}
- case <-time.After(3 * time.Second):
+ case <-timeout.C:
t.Errorf("TestConnection.Close() got timeout")
}
})
@@ -499,6 +506,8 @@ func TestConnection_Ping(t *testing.T) {
}
timeBeforePing := time.Now()
pingChan := c.Ping()
+ timeout := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timeout)
select {
case pingResult := <-pingChan:
timeAfterPing := time.Now()
@@ -513,7 +522,7 @@ func TestConnection_Ping(t *testing.T) {
if !reflect.DeepEqual(pingResult, tt.want) {
t.Errorf("TestConnection.Ping() = %v, want %v", pingResult, tt.want)
}
- case <-time.After(3 * time.Second):
+ case <-timeout.C:
t.Errorf("TestConnection.Ping() got timeout")
}
})
diff --git a/plc4go/internal/simulated/Driver_test.go b/plc4go/internal/simulated/Driver_test.go
index ad34db3ec..92b0b5ae2 100644
--- a/plc4go/internal/simulated/Driver_test.go
+++ b/plc4go/internal/simulated/Driver_test.go
@@ -21,6 +21,7 @@ package simulated
import (
"github.com/apache/plc4x/plc4go/spi/transports"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"net/url"
"testing"
"time"
@@ -91,6 +92,8 @@ func TestDriver_GetConnection(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
d := NewDriver()
connectionChan := d.GetConnection(tt.args.in0, tt.args.in1, tt.args.options)
+ timeout := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timeout)
select {
case connectResult := <-connectionChan:
if tt.wantErr && (connectResult.GetErr() == nil) {
@@ -98,7 +101,7 @@ func TestDriver_GetConnection(t *testing.T) {
} else if connectResult.GetErr() != nil {
t.Errorf("PlcConnectionPool.GetConnection() error = %v, wantErr %v", connectResult.GetErr(), tt.wantErr)
}
- case <-time.After(3 * time.Second):
+ case <-timeout.C:
t.Errorf("PlcConnectionPool.GetConnection() got timeout")
}
})
diff --git a/plc4go/internal/simulated/Reader_test.go b/plc4go/internal/simulated/Reader_test.go
index f5331b627..f8fab660c 100644
--- a/plc4go/internal/simulated/Reader_test.go
+++ b/plc4go/internal/simulated/Reader_test.go
@@ -27,6 +27,7 @@ import (
model4 "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
model2 "github.com/apache/plc4x/plc4go/protocols/simulated/readwrite/model"
model3 "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/utils"
values2 "github.com/apache/plc4x/plc4go/spi/values"
"reflect"
"testing"
@@ -163,6 +164,8 @@ func TestReader_Read(t *testing.T) {
readRequest := model3.NewDefaultPlcReadRequest(tt.args.fields, tt.args.fieldNames, r, nil)
timeBeforeReadRequest := time.Now()
readResponseChannel := r.Read(context.TODO(), readRequest)
+ timeout := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timeout)
select {
case readResponse := <-readResponseChannel:
timeAfterReadRequest := time.Now()
@@ -187,7 +190,7 @@ func TestReader_Read(t *testing.T) {
readResponse.GetResponse().GetValue(fieldName), tt.want.GetValue(fieldName))
}
}
- case <-time.After(3 * time.Second):
+ case <-timeout.C:
t.Errorf("Reader.Read() got timeout")
}
})
diff --git a/plc4go/internal/simulated/Writer_test.go b/plc4go/internal/simulated/Writer_test.go
index 9291b801c..88c9caf43 100644
--- a/plc4go/internal/simulated/Writer_test.go
+++ b/plc4go/internal/simulated/Writer_test.go
@@ -27,6 +27,7 @@ import (
model4 "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
model2 "github.com/apache/plc4x/plc4go/protocols/simulated/readwrite/model"
model3 "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/utils"
values2 "github.com/apache/plc4x/plc4go/spi/values"
"reflect"
"testing"
@@ -175,6 +176,8 @@ func TestWriter_Write(t *testing.T) {
writeRequest := model3.NewDefaultPlcWriteRequest(tt.args.fields, tt.args.fieldNames, tt.args.values, w, nil)
timeBeforeWriteRequest := time.Now()
writeResponseChannel := w.Write(context.TODO(), writeRequest)
+ timeout := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timeout)
select {
case writeResponse := <-writeResponseChannel:
timeAfterWriteRequest := time.Now()
@@ -199,7 +202,7 @@ func TestWriter_Write(t *testing.T) {
t.Errorf("Writer.Write() Device State = %v, want %v",
tt.fields.device.State, tt.newState)
}
- case <-time.After(3 * time.Second):
+ case <-timeout.C:
t.Errorf("Reader.Read() got timeout")
}
})
diff --git a/plc4go/pkg/api/cache/plc_connection_cache.go b/plc4go/pkg/api/cache/plc_connection_cache.go
index a5bb8edbd..fbaf48aad 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/viney-shih/go-lock"
@@ -117,16 +118,20 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
txId = t.tracer.AddTransactionalStartTrace("get-connection", "lease")
}
leaseChan := connection.lease()
+ maximumWaitTimeout := time.NewTimer(t.maxWaitTime)
+ defer utils.CleanupTimer(maximumWaitTimeout)
select {
// Wait till we get a lease.
case connectionResponse := <-leaseChan:
log.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
+ responseTimeout := time.NewTimer(10 * time.Millisecond)
+ defer utils.CleanupTimer(responseTimeout)
select {
case ch <- connectionResponse:
if t.tracer != nil {
t.tracer.AddTransactionalTrace(txId, "get-connection", "success")
}
- case <-time.After(10 * time.Millisecond):
+ case <-responseTimeout.C:
// Log a message, that the client has given up
if t.tracer != nil {
t.tracer.AddTransactionalTrace(txId, "get-connection", "client given up")
@@ -140,7 +145,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
}
// Timeout after the maximum waiting time.
- case <-time.After(t.maxWaitTime):
+ case <-maximumWaitTimeout.C:
if t.tracer != nil {
t.tracer.AddTransactionalTrace(txId, "get-connection", "timeout")
}
@@ -161,9 +166,11 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
defer t.cacheLock.Unlock()
if len(t.connections) == 0 {
+ responseDeliveryTimeout := time.NewTimer(10 * time.Millisecond)
+ defer utils.CleanupTimer(responseDeliveryTimeout)
select {
case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
- case <-time.After(time.Millisecond * 10):
+ case <-responseDeliveryTimeout.C:
}
log.Debug().Msg("Closing connection cache finished.")
return
@@ -176,6 +183,8 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
// while some go func is still using it.
go func(container *connectionContainer) {
leaseResults := container.lease()
+ closeTimeout := time.NewTimer(t.maxWaitTime)
+ defer utils.CleanupTimer(closeTimeout)
select {
// We're just getting the lease as this way we can be sure nobody else is using it.
// We also really don't care if it worked, or not ... it's just an attempt of being
@@ -185,15 +194,17 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
// Give back the connection.
container.connection.Close()
// If we're timing out brutally kill the connection.
- case <-time.After(t.maxWaitTime):
+ case <-closeTimeout.C:
log.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
// Forcefully close this connection.
container.connection.Close()
}
+ responseDeliveryTimeout := time.NewTimer(10 * time.Millisecond)
+ defer utils.CleanupTimer(responseDeliveryTimeout)
select {
case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
- case <-time.After(time.Millisecond * 10):
+ case <-responseDeliveryTimeout.C:
}
log.Debug().Msg("Closing connection cache finished.")
}(cc)
@@ -485,9 +496,11 @@ func (t *plcConnectionLease) Close() <-chan plc4go.PlcConnectionCloseResult {
err := t.connectionContainer.returnConnection(newState)
// Finish closing the connection.
+ timeout := time.NewTimer(10 * time.Millisecond)
+ defer utils.CleanupTimer(timeout)
select {
case result <- _default.NewDefaultPlcConnectionCloseResultWithTraces(t, err, traces):
- case <-time.After(time.Millisecond * 10):
+ case <-timeout.C:
}
// Detach the connection from this lease, so it can no longer be used by the client.
diff --git a/plc4go/spi/utils/MultiError.go b/plc4go/spi/utils/Errors.go
similarity index 76%
rename from plc4go/spi/utils/MultiError.go
rename to plc4go/spi/utils/Errors.go
index ba8c1d9e0..03c6e333b 100644
--- a/plc4go/spi/utils/MultiError.go
+++ b/plc4go/spi/utils/Errors.go
@@ -45,3 +45,29 @@ func (m MultiError) Error() string {
return result
}(m.Errors), "\n")
}
+
+type ParseAssertError struct {
+ Message string
+}
+
+func (e ParseAssertError) Error() string {
+ return e.Message
+}
+
+func (e ParseAssertError) Is(target error) bool {
+ _, ok := target.(ParseAssertError)
+ return ok
+}
+
+type ParseValidationError struct {
+ Message string
+}
+
+func (e ParseValidationError) Error() string {
+ return e.Message
+}
+
+func (e ParseValidationError) Is(target error) bool {
+ _, ok := target.(ParseValidationError)
+ return ok
+}
diff --git a/plc4go/spi/utils/Utils.go b/plc4go/spi/utils/Misc.go
similarity index 70%
rename from plc4go/spi/utils/Utils.go
rename to plc4go/spi/utils/Misc.go
index 06985728c..55046851e 100644
--- a/plc4go/spi/utils/Utils.go
+++ b/plc4go/spi/utils/Misc.go
@@ -19,6 +19,8 @@
package utils
+import "time"
+
// InlineIf is basically a inline if like construct for golang
func InlineIf(test bool, a func() interface{}, b func() interface{}) interface{} {
if test {
@@ -28,28 +30,13 @@ func InlineIf(test bool, a func() interface{}, b func() interface{}) interface{}
}
}
-type ParseAssertError struct {
- Message string
-}
-
-func (e ParseAssertError) Error() string {
- return e.Message
-}
-
-func (e ParseAssertError) Is(target error) bool {
- _, ok := target.(ParseAssertError)
- return ok
-}
-
-type ParseValidationError struct {
- Message string
-}
-
-func (e ParseValidationError) Error() string {
- return e.Message
-}
-
-func (e ParseValidationError) Is(target error) bool {
- _, ok := target.(ParseValidationError)
- return ok
+// CleanupTimer stops a timer and purges anything left in the channel
+// and is safe to call even if the channel has already been received
+func CleanupTimer(timer *time.Timer) {
+ if !timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
}
diff --git a/plc4go/spi/utils/net.go b/plc4go/spi/utils/Net.go
similarity index 97%
rename from plc4go/spi/utils/net.go
rename to plc4go/spi/utils/Net.go
index 476d08bb4..712a1709e 100644
--- a/plc4go/spi/utils/net.go
+++ b/plc4go/spi/utils/Net.go
@@ -143,11 +143,14 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
// Schedule a discovery operation for this ip.
ip := net.IP(arp.SourceProtAddress)
log.Trace().Msgf("Scheduling discovery for IP %s", ip)
+ timeout := time.NewTimer(2 * time.Second)
go func(ip net.IP) {
select {
case <-ctx.Done():
+ CleanupTimer(timeout)
case foundIps <- DuplicateIP(ip):
- case <-time.After(2 * time.Second):
+ CleanupTimer(timeout)
+ case <-timeout.C:
}
}(DuplicateIP(ip))
}
@@ -226,12 +229,15 @@ func lookupIps(ctx context.Context, ipnet *net.IPNet, foundIps chan net.IP, wg *
}
wg.Add(1)
+ timeout := time.NewTimer(2 * time.Second)
go func(ip net.IP) {
defer func() { wg.Done() }()
select {
case <-ctx.Done():
+ CleanupTimer(timeout)
case foundIps <- ip:
- case <-time.After(2 * time.Second):
+ CleanupTimer(timeout)
+ case <-timeout.C:
}
}(DuplicateIP(ip))
log.Trace().Stringer("IP", ip).Msg("Expanded CIDR")
diff --git a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
index 798111bf7..20d25c5e1 100644
--- a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
+++ b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/transports"
"github.com/apache/plc4x/plc4go/spi/testutils"
+ "github.com/apache/plc4x/plc4go/spi/utils"
_ "github.com/apache/plc4x/plc4go/tests/initializetest"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
@@ -73,13 +74,14 @@ func TestManualCBusDriver(t *testing.T) {
Build()
require.NoError(t, err)
subscriptionRequest.Execute()
- timeout := time.After(30 * time.Second)
+ timeout := time.NewTimer(30 * time.Second)
+ defer utils.CleanupTimer(timeout)
// We expect couple monitors
monitorCount := 0
waitingForMonitors:
for {
select {
- case at := <-timeout:
+ case at := <-timeout.C:
t.Errorf("timeout at %s", at)
break waitingForMonitors
case <-gotMonitor: