You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2021/08/21 13:29:09 UTC

[plc4x] branch develop updated: fixing timer leaks (#267)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new d059709  fixing timer leaks (#267)
d059709 is described below

commit d05970961a4dd92cd52e547a15ca5fdb3030a2ff
Author: Shaun <36...@users.noreply.github.com>
AuthorDate: Sat Aug 21 06:29:02 2021 -0700

    fixing timer leaks (#267)
    
    LGTM
---
 plc4go/internal/plc4go/knxnetip/Browser.go         | 21 ++++++--
 plc4go/internal/plc4go/knxnetip/Connection.go      | 30 +++++++++---
 .../knxnetip/ConnectionDriverSpecificOperations.go | 56 ++++++++++++++++++----
 .../internal/plc4go/knxnetip/ConnectionHelper.go   | 18 ++++---
 .../knxnetip/ConnectionInternalOperations.go       | 15 ++++--
 plc4go/internal/plc4go/knxnetip/Discoverer.go      | 16 +++++--
 plc4go/internal/plc4go/knxnetip/Reader.go          | 21 ++++++--
 plc4go/internal/plc4go/spi/default/DefaultCodec.go | 10 +++-
 .../plc4go/spi/default/DefaultConnection.go        | 10 +++-
 .../plc4go/spi/transports/tcp/Transport.go         | 14 ++++--
 .../plc4go/spi/transports/udp/Transport.go         | 14 ++++--
 11 files changed, 173 insertions(+), 52 deletions(-)

diff --git a/plc4go/internal/plc4go/knxnetip/Browser.go b/plc4go/internal/plc4go/knxnetip/Browser.go
index 2aa8d3d..d13ab99 100644
--- a/plc4go/internal/plc4go/knxnetip/Browser.go
+++ b/plc4go/internal/plc4go/knxnetip/Browser.go
@@ -22,6 +22,10 @@ package knxnetip
 import (
 	"encoding/hex"
 	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
 	driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
@@ -30,9 +34,6 @@ import (
 	"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
-	"strconv"
-	"strings"
-	"time"
 )
 
 type Browser struct {
@@ -116,9 +117,13 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
 	// Parse each of these expanded addresses and handle them accordingly.
 	for _, knxAddress := range knxAddresses {
 		// Send a connection request to the device
+		connectTtlTimer := time.NewTimer(m.connection.defaultTtl)
 		deviceConnections := m.connection.DeviceConnect(knxAddress)
 		select {
 		case deviceConnection := <-deviceConnections:
+			if !connectTtlTimer.Stop() {
+				<-connectTtlTimer.C
+			}
 			// If the request returned a connection, process it,
 			// otherwise just ignore it.
 			if deviceConnection.connection != nil {
@@ -147,14 +152,20 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
 					queryResults = append(queryResults, queryResult)
 				}
 
+				disconnectTtlTimer := time.NewTimer(m.connection.defaultTtl * 10)
 				deviceDisconnections := m.connection.DeviceDisconnect(knxAddress)
 				select {
 				case _ = <-deviceDisconnections:
-				case <-time.After(m.connection.defaultTtl * 10):
+					if !disconnectTtlTimer.Stop() {
+						<-disconnectTtlTimer.C
+					}
+				case <-disconnectTtlTimer.C:
+					disconnectTtlTimer.Stop()
 					// Just ignore this case ...
 				}
 			}
-		case <-time.After(m.connection.defaultTtl):
+		case <-connectTtlTimer.C:
+			connectTtlTimer.Stop()
 			// In this case the remote was just not responding.
 		}
 		// Just to slow things down a bit (This way we can't exceed the max number of requests per minute)
diff --git a/plc4go/internal/plc4go/knxnetip/Connection.go b/plc4go/internal/plc4go/knxnetip/Connection.go
index 5a12897..387117b 100644
--- a/plc4go/internal/plc4go/knxnetip/Connection.go
+++ b/plc4go/internal/plc4go/knxnetip/Connection.go
@@ -23,6 +23,11 @@ import (
 	"bytes"
 	"encoding/hex"
 	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
 	driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors"
@@ -34,10 +39,6 @@ import (
 	"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
-	"strconv"
-	"strings"
-	"sync"
-	"time"
 )
 
 type ConnectionMetadata struct {
@@ -334,11 +335,16 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 }
 
 func (m *Connection) BlockingClose() {
+	ttlTimer := time.NewTimer(m.defaultTtl)
 	closeResults := m.Close()
 	select {
 	case <-closeResults:
+		if !ttlTimer.Stop() {
+			<-ttlTimer.C
+		}
 		return
-	case <-time.After(m.defaultTtl):
+	case <-ttlTimer.C:
+		ttlTimer.Stop()
 		return
 	}
 }
@@ -354,10 +360,15 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
 
 		// Disconnect from all knx devices we are still connected to.
 		for targetAddress := range m.DeviceConnections {
+			ttlTimer := time.NewTimer(m.defaultTtl)
 			disconnects := m.DeviceDisconnect(targetAddress)
 			select {
 			case _ = <-disconnects:
-			case <-time.After(m.defaultTtl):
+				if !ttlTimer.Stop() {
+					<-ttlTimer.C
+				}
+			case <-ttlTimer.C:
+				ttlTimer.Stop()
 				// If we got a timeout here, well just continue the device will just auto disconnect.
 				log.Debug().Msgf("Timeout disconnecting from device %s.", KnxAddressToString(&targetAddress))
 			}
@@ -377,11 +388,16 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
 
 func (m *Connection) IsConnected() bool {
 	if m.messageCodec != nil {
+		ttlTimer := time.NewTimer(m.defaultTtl)
 		pingChannel := m.Ping()
 		select {
 		case pingResponse := <-pingChannel:
+			if !ttlTimer.Stop() {
+				<-ttlTimer.C
+			}
 			return pingResponse.Err == nil
-		case <-time.After(m.defaultTtl):
+		case <-ttlTimer.C:
+			ttlTimer.Stop()
 			m.handleTimeout()
 			return false
 		}
diff --git a/plc4go/internal/plc4go/knxnetip/ConnectionDriverSpecificOperations.go b/plc4go/internal/plc4go/knxnetip/ConnectionDriverSpecificOperations.go
index 42e3c1e..126c5c4 100644
--- a/plc4go/internal/plc4go/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/plc4go/knxnetip/ConnectionDriverSpecificOperations.go
@@ -20,15 +20,16 @@
 package knxnetip
 
 import (
+	"math"
+	"strconv"
+	"time"
+
 	driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
 	values2 "github.com/apache/plc4x/plc4go/internal/plc4go/spi/values"
 	"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
-	"math"
-	"strconv"
-	"time"
 )
 
 ///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -46,13 +47,18 @@ func (m *Connection) ReadGroupAddress(groupAddress []int8, datapointType *driver
 	result := make(chan KnxReadResult)
 
 	sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
+		timeout := time.NewTimer(time.Millisecond * 10)
 		select {
 		case result <- KnxReadResult{
 			value:    value,
 			numItems: numItems,
 			err:      err,
 		}:
-		case <-time.After(time.Millisecond * 10):
+			if !timeout.Stop() {
+				<-timeout.C
+			}
+		case <-timeout.C:
+			timeout.Stop()
 		}
 	}
 
@@ -96,12 +102,17 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
 	result := make(chan KnxDeviceConnectResult)
 
 	sendResponse := func(connection *KnxDeviceConnection, err error) {
+		timeout := time.NewTimer(time.Millisecond * 10)
 		select {
 		case result <- KnxDeviceConnectResult{
 			connection: connection,
 			err:        err,
 		}:
-		case <-time.After(time.Millisecond * 10):
+			if !timeout.Stop() {
+				<-timeout.C
+			}
+		case <-timeout.C:
+			timeout.Stop()
 		}
 	}
 
@@ -182,12 +193,17 @@ func (m *Connection) DeviceDisconnect(targetAddress driverModel.KnxAddress) <-ch
 	result := make(chan KnxDeviceDisconnectResult)
 
 	sendResponse := func(connection *KnxDeviceConnection, err error) {
+		timeout := time.NewTimer(time.Millisecond * 10)
 		select {
 		case result <- KnxDeviceDisconnectResult{
 			connection: connection,
 			err:        err,
 		}:
-		case <-time.After(time.Millisecond * 10):
+			if !timeout.Stop() {
+				<-timeout.C
+			}
+		case <-timeout.C:
+			timeout.Stop()
 		}
 	}
 
@@ -211,11 +227,16 @@ func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, bu
 	result := make(chan KnxDeviceAuthenticateResult)
 
 	sendResponse := func(err error) {
+		timeout := time.NewTimer(time.Millisecond * 10)
 		select {
 		case result <- KnxDeviceAuthenticateResult{
 			err: err,
 		}:
-		case <-time.After(time.Millisecond * 10):
+			if !timeout.Stop() {
+				<-timeout.C
+			}
+		case <-timeout.C:
+			timeout.Stop()
 		}
 	}
 
@@ -259,13 +280,18 @@ func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, ob
 	result := make(chan KnxReadResult)
 
 	sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
+		timeout := time.NewTimer(time.Millisecond * 10)
 		select {
 		case result <- KnxReadResult{
 			value:    value,
 			numItems: numItems,
 			err:      err,
 		}:
-		case <-time.After(time.Millisecond * 10):
+			if !timeout.Stop() {
+				<-timeout.C
+			}
+		case <-timeout.C:
+			timeout.Stop()
 		}
 	}
 
@@ -334,13 +360,18 @@ func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxA
 	result := make(chan KnxReadResult)
 
 	sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
+		timeout := time.NewTimer(time.Millisecond * 10)
 		select {
 		case result <- KnxReadResult{
 			value:    value,
 			numItems: numItems,
 			err:      err,
 		}:
-		case <-time.After(time.Millisecond * 10):
+			if !timeout.Stop() {
+				<-timeout.C
+			}
+		case <-timeout.C:
+			timeout.Stop()
 		}
 	}
 
@@ -389,13 +420,18 @@ func (m *Connection) DeviceReadMemory(targetAddress driverModel.KnxAddress, addr
 	result := make(chan KnxReadResult)
 
 	sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
+		timeout := time.NewTimer(time.Millisecond * 10)
 		select {
 		case result <- KnxReadResult{
 			value:    value,
 			numItems: numItems,
 			err:      err,
 		}:
-		case <-time.After(time.Millisecond * 10):
+			if !timeout.Stop() {
+				<-timeout.C
+			}
+		case <-timeout.C:
+			timeout.Stop()
 		}
 	}
 
diff --git a/plc4go/internal/plc4go/knxnetip/ConnectionHelper.go b/plc4go/internal/plc4go/knxnetip/ConnectionHelper.go
index 788d4e3..49e0fc3 100644
--- a/plc4go/internal/plc4go/knxnetip/ConnectionHelper.go
+++ b/plc4go/internal/plc4go/knxnetip/ConnectionHelper.go
@@ -21,17 +21,18 @@ package knxnetip
 
 import (
 	"fmt"
+	"math"
+	"net"
+	"strconv"
+	"sync/atomic"
+	"time"
+
 	driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports/udp"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
-	"math"
-	"net"
-	"strconv"
-	"sync/atomic"
-	"time"
 )
 
 ///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -136,7 +137,12 @@ func (m *Connection) handleTimeout() {
 
 func (m *Connection) resetTimeout() {
 	if m.connectionTimeoutTimer != nil {
-		m.connectionTimeoutTimer.Stop()
+		if !m.connectionTimeoutTimer.Stop() {
+			select {
+			case <-m.connectionTimeoutTimer.C:
+			default:
+			}
+		}
 		m.connectionTimeoutTimer = nil
 	}
 }
diff --git a/plc4go/internal/plc4go/knxnetip/ConnectionInternalOperations.go b/plc4go/internal/plc4go/knxnetip/ConnectionInternalOperations.go
index dea9c78..96de836 100644
--- a/plc4go/internal/plc4go/knxnetip/ConnectionInternalOperations.go
+++ b/plc4go/internal/plc4go/knxnetip/ConnectionInternalOperations.go
@@ -20,12 +20,13 @@
 package knxnetip
 
 import (
+	"reflect"
+	"time"
+
 	driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/plcerrors"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
 	"github.com/pkg/errors"
-	"reflect"
-	"time"
 )
 
 ///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -78,13 +79,21 @@ func (m *Connection) sendGatewaySearchRequest() (*driverModel.SearchResponse, er
 		return nil, errors.Wrap(err, "got error sending search request")
 	}
 
+	ttlTimer := time.NewTimer(m.defaultTtl)
 	select {
 	case response := <-result:
+		if !ttlTimer.Stop() {
+			<-ttlTimer.C
+		}
 		return response, nil
 	case errorResponse := <-errorResult:
+		if !ttlTimer.Stop() {
+			<-ttlTimer.C
+		}
 		return nil, errorResponse
 		// For search requests there is no timeout handler running, so we have to do it manually.
-	case <-time.After(m.defaultTtl):
+	case <-ttlTimer.C:
+		ttlTimer.Stop()
 		return nil, errors.New("timeout")
 	}
 }
diff --git a/plc4go/internal/plc4go/knxnetip/Discoverer.go b/plc4go/internal/plc4go/knxnetip/Discoverer.go
index 9f5666c..0e5d552 100644
--- a/plc4go/internal/plc4go/knxnetip/Discoverer.go
+++ b/plc4go/internal/plc4go/knxnetip/Discoverer.go
@@ -23,15 +23,16 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
+	"net"
+	"net/url"
+	"time"
+
 	driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports/udp"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
 	"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
-	"net"
-	"net/url"
-	"time"
 )
 
 type Discoverer struct {
@@ -123,10 +124,16 @@ func (d *Discoverer) Discover(callback func(event model.PlcDiscoveryEvent)) erro
 			go func() {
 				// Keep on reading responses till the timeout is done.
 				// TODO: Make this configurable
+				timeout := time.NewTimer(time.Second * 1)
+				timeout.Stop()
 				for start := time.Now(); time.Since(start) < time.Second*5; {
+					timeout.Reset(time.Second * 1)
 					select {
 					case message := <-codec.GetDefaultIncomingMessageChannel():
 						{
+							if !timeout.Stop() {
+								<-timeout.C
+							}
 							searchResponse := driverModel.CastSearchResponse(message)
 							if searchResponse != nil {
 								addr := searchResponse.HpaiControlEndpoint.IpAddress.Addr
@@ -144,8 +151,9 @@ func (d *Discoverer) Discover(callback func(event model.PlcDiscoveryEvent)) erro
 							}
 							continue
 						}
-					case <-time.After(time.Second * 1):
+					case <-timeout.C:
 						{
+							timeout.Stop()
 							continue
 						}
 					}
diff --git a/plc4go/internal/plc4go/knxnetip/Reader.go b/plc4go/internal/plc4go/knxnetip/Reader.go
index 1968432..94525b1 100644
--- a/plc4go/internal/plc4go/knxnetip/Reader.go
+++ b/plc4go/internal/plc4go/knxnetip/Reader.go
@@ -21,15 +21,16 @@ package knxnetip
 
 import (
 	"errors"
+	"strconv"
+	"strings"
+	"time"
+
 	driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
 	internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
 	internalValues "github.com/apache/plc4x/plc4go/internal/plc4go/spi/values"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
 	apiValues "github.com/apache/plc4x/plc4go/pkg/plc4go/values"
-	"strconv"
-	"strings"
-	"time"
 )
 
 type Reader struct {
@@ -104,9 +105,13 @@ func (m Reader) Read(readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcRea
 				case DevicePropertyAddressPlcField:
 					propertyField := field.(DevicePropertyAddressPlcField)
 
+					timeout := time.NewTimer(m.connection.defaultTtl)
 					results := m.connection.DeviceReadProperty(deviceAddress, propertyField.ObjectId, propertyField.PropertyId, propertyField.PropertyIndex, propertyField.NumElements)
 					select {
 					case result := <-results:
+						if !timeout.Stop() {
+							<-timeout.C
+						}
 						if result.err == nil {
 							responseCodes[fieldName] = apiModel.PlcResponseCode_OK
 							plcValues[fieldName] = *result.value
@@ -114,15 +119,20 @@ func (m Reader) Read(readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcRea
 							responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
 							plcValues[fieldName] = nil
 						}
-					case <-time.After(m.connection.defaultTtl):
+					case <-timeout.C:
+						timeout.Stop()
 						responseCodes[fieldName] = apiModel.PlcResponseCode_REMOTE_BUSY
 						plcValues[fieldName] = nil
 					}
 				case DeviceMemoryAddressPlcField:
+					timeout := time.NewTimer(m.connection.defaultTtl)
 					memoryField := field.(DeviceMemoryAddressPlcField)
 					results := m.connection.DeviceReadMemory(deviceAddress, memoryField.Address, memoryField.NumElements, memoryField.FieldType)
 					select {
 					case result := <-results:
+						if !timeout.Stop() {
+							<-timeout.C
+						}
 						if result.err == nil {
 							responseCodes[fieldName] = apiModel.PlcResponseCode_OK
 							plcValues[fieldName] = *result.value
@@ -130,7 +140,8 @@ func (m Reader) Read(readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcRea
 							responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
 							plcValues[fieldName] = nil
 						}
-					case <-time.After(m.connection.defaultTtl):
+					case <-timeout.C:
+						timeout.Stop()
 						responseCodes[fieldName] = apiModel.PlcResponseCode_REMOTE_BUSY
 						plcValues[fieldName] = nil
 					}
diff --git a/plc4go/internal/plc4go/spi/default/DefaultCodec.go b/plc4go/internal/plc4go/spi/default/DefaultCodec.go
index 1f7310c..8e2bd44 100644
--- a/plc4go/internal/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/internal/plc4go/spi/default/DefaultCodec.go
@@ -21,6 +21,8 @@ package _default
 
 import (
 	"fmt"
+	"time"
+
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/plcerrors"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
@@ -28,7 +30,6 @@ import (
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
-	"time"
 )
 
 // DefaultCodecRequirements adds required methods to MessageCodec that are needed when using DefaultCodec
@@ -288,9 +289,14 @@ mainLoop:
 		// If the message has not been handled and a default handler is provided, call this ...
 		if !messageHandled {
 			workerLog.Trace().Msg("Message was not handled")
+			timeout := time.NewTimer(time.Millisecond * 40)
 			select {
 			case m.defaultIncomingMessageChannel <- message:
-			case <-time.After(time.Millisecond * 40):
+				if !timeout.Stop() {
+					<-timeout.C
+				}
+			case <-timeout.C:
+				timeout.Stop()
 				workerLog.Warn().Msgf("Message discarded %s", message)
 			}
 		}
diff --git a/plc4go/internal/plc4go/spi/default/DefaultConnection.go b/plc4go/internal/plc4go/spi/default/DefaultConnection.go
index 069cce8..3651984 100644
--- a/plc4go/internal/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/internal/plc4go/spi/default/DefaultConnection.go
@@ -20,13 +20,14 @@
 package _default
 
 import (
+	"time"
+
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/pkg/plc4go"
 	"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
-	"time"
 )
 
 // DefaultConnectionRequirements defines the required at a implementing connection when using DefaultConnection
@@ -159,11 +160,16 @@ func (d *defaultConnection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 func (d *defaultConnection) BlockingClose() {
 	log.Trace().Msg("blocking close connection")
 	closeResults := d.GetConnection().Close()
+	timeout := time.NewTimer(d.GetTtl())
 	d.SetConnected(false)
 	select {
 	case <-closeResults:
+		if !timeout.Stop() {
+			<-timeout.C
+		}
 		return
-	case <-time.After(d.GetTtl()):
+	case <-timeout.C:
+		timeout.Stop()
 		return
 	}
 }
diff --git a/plc4go/internal/plc4go/spi/transports/tcp/Transport.go b/plc4go/internal/plc4go/spi/transports/tcp/Transport.go
index 52b150d..7262369 100644
--- a/plc4go/internal/plc4go/spi/transports/tcp/Transport.go
+++ b/plc4go/internal/plc4go/spi/transports/tcp/Transport.go
@@ -21,14 +21,15 @@ package tcp
 
 import (
 	"bufio"
-	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
-	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
-	"github.com/pkg/errors"
 	"net"
 	"net/url"
 	"regexp"
 	"strconv"
 	"time"
+
+	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
+	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
+	"github.com/pkg/errors"
 )
 
 type Transport struct {
@@ -157,10 +158,15 @@ func (m *TransportInstance) GetNumReadableBytes() (uint32, error) {
 		_, _ = m.reader.Peek(1)
 		peekChan <- true
 	}()
+	timeout := time.NewTimer(time.Millisecond * 10)
 	select {
 	case <-peekChan:
+		if !timeout.Stop() {
+			<-timeout.C
+		}
 		return uint32(m.reader.Buffered()), nil
-	case <-time.After(10 * time.Millisecond):
+	case <-timeout.C:
+		timeout.Stop()
 		return 0, nil
 	}
 }
diff --git a/plc4go/internal/plc4go/spi/transports/udp/Transport.go b/plc4go/internal/plc4go/spi/transports/udp/Transport.go
index 25322e9..ae46473 100644
--- a/plc4go/internal/plc4go/spi/transports/udp/Transport.go
+++ b/plc4go/internal/plc4go/spi/transports/udp/Transport.go
@@ -21,14 +21,15 @@ package udp
 
 import (
 	"bufio"
-	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
-	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
-	"github.com/pkg/errors"
 	"net"
 	"net/url"
 	"regexp"
 	"strconv"
 	"time"
+
+	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
+	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
+	"github.com/pkg/errors"
 )
 
 type Transport struct {
@@ -186,10 +187,15 @@ func (m *TransportInstance) GetNumReadableBytes() (uint32, error) {
 		_, _ = m.reader.Peek(1)
 		peekChan <- true
 	}()
+	timeout := time.NewTimer(time.Millisecond * 10)
 	select {
 	case <-peekChan:
+		if !timeout.Stop() {
+			<-timeout.C
+		}
 		return uint32(m.reader.Buffered()), nil
-	case <-time.After(10 * time.Millisecond):
+	case <-timeout.C:
+		timeout.Stop()
 		return 0, nil
 	}
 }