You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2021/08/21 13:29:09 UTC
[plc4x] branch develop updated: fixing timer leaks (#267)
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new d059709 fixing timer leaks (#267)
d059709 is described below
commit d05970961a4dd92cd52e547a15ca5fdb3030a2ff
Author: Shaun <36...@users.noreply.github.com>
AuthorDate: Sat Aug 21 06:29:02 2021 -0700
fixing timer leaks (#267)
LGTM
---
plc4go/internal/plc4go/knxnetip/Browser.go | 21 ++++++--
plc4go/internal/plc4go/knxnetip/Connection.go | 30 +++++++++---
.../knxnetip/ConnectionDriverSpecificOperations.go | 56 ++++++++++++++++++----
.../internal/plc4go/knxnetip/ConnectionHelper.go | 18 ++++---
.../knxnetip/ConnectionInternalOperations.go | 15 ++++--
plc4go/internal/plc4go/knxnetip/Discoverer.go | 16 +++++--
plc4go/internal/plc4go/knxnetip/Reader.go | 21 ++++++--
plc4go/internal/plc4go/spi/default/DefaultCodec.go | 10 +++-
.../plc4go/spi/default/DefaultConnection.go | 10 +++-
.../plc4go/spi/transports/tcp/Transport.go | 14 ++++--
.../plc4go/spi/transports/udp/Transport.go | 14 ++++--
11 files changed, 173 insertions(+), 52 deletions(-)
diff --git a/plc4go/internal/plc4go/knxnetip/Browser.go b/plc4go/internal/plc4go/knxnetip/Browser.go
index 2aa8d3d..d13ab99 100644
--- a/plc4go/internal/plc4go/knxnetip/Browser.go
+++ b/plc4go/internal/plc4go/knxnetip/Browser.go
@@ -22,6 +22,10 @@ package knxnetip
import (
"encoding/hex"
"fmt"
+ "strconv"
+ "strings"
+ "time"
+
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
@@ -30,9 +34,6 @@ import (
"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
- "strconv"
- "strings"
- "time"
)
type Browser struct {
@@ -116,9 +117,13 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
// Parse each of these expanded addresses and handle them accordingly.
for _, knxAddress := range knxAddresses {
// Send a connection request to the device
+ connectTtlTimer := time.NewTimer(m.connection.defaultTtl)
deviceConnections := m.connection.DeviceConnect(knxAddress)
select {
case deviceConnection := <-deviceConnections:
+ if !connectTtlTimer.Stop() {
+ <-connectTtlTimer.C
+ }
// If the request returned a connection, process it,
// otherwise just ignore it.
if deviceConnection.connection != nil {
@@ -147,14 +152,20 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
queryResults = append(queryResults, queryResult)
}
+ disconnectTtlTimer := time.NewTimer(m.connection.defaultTtl * 10)
deviceDisconnections := m.connection.DeviceDisconnect(knxAddress)
select {
case _ = <-deviceDisconnections:
- case <-time.After(m.connection.defaultTtl * 10):
+ if !disconnectTtlTimer.Stop() {
+ <-disconnectTtlTimer.C
+ }
+ case <-disconnectTtlTimer.C:
+ disconnectTtlTimer.Stop()
// Just ignore this case ...
}
}
- case <-time.After(m.connection.defaultTtl):
+ case <-connectTtlTimer.C:
+ connectTtlTimer.Stop()
// In this case the remote was just not responding.
}
// Just to slow things down a bit (This way we can't exceed the max number of requests per minute)
diff --git a/plc4go/internal/plc4go/knxnetip/Connection.go b/plc4go/internal/plc4go/knxnetip/Connection.go
index 5a12897..387117b 100644
--- a/plc4go/internal/plc4go/knxnetip/Connection.go
+++ b/plc4go/internal/plc4go/knxnetip/Connection.go
@@ -23,6 +23,11 @@ import (
"bytes"
"encoding/hex"
"fmt"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors"
@@ -34,10 +39,6 @@ import (
"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
- "strconv"
- "strings"
- "sync"
- "time"
)
type ConnectionMetadata struct {
@@ -334,11 +335,16 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
}
func (m *Connection) BlockingClose() {
+ ttlTimer := time.NewTimer(m.defaultTtl)
closeResults := m.Close()
select {
case <-closeResults:
+ if !ttlTimer.Stop() {
+ <-ttlTimer.C
+ }
return
- case <-time.After(m.defaultTtl):
+ case <-ttlTimer.C:
+ ttlTimer.Stop()
return
}
}
@@ -354,10 +360,15 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
// Disconnect from all knx devices we are still connected to.
for targetAddress := range m.DeviceConnections {
+ ttlTimer := time.NewTimer(m.defaultTtl)
disconnects := m.DeviceDisconnect(targetAddress)
select {
case _ = <-disconnects:
- case <-time.After(m.defaultTtl):
+ if !ttlTimer.Stop() {
+ <-ttlTimer.C
+ }
+ case <-ttlTimer.C:
+ ttlTimer.Stop()
// If we got a timeout here, well just continue the device will just auto disconnect.
log.Debug().Msgf("Timeout disconnecting from device %s.", KnxAddressToString(&targetAddress))
}
@@ -377,11 +388,16 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
func (m *Connection) IsConnected() bool {
if m.messageCodec != nil {
+ ttlTimer := time.NewTimer(m.defaultTtl)
pingChannel := m.Ping()
select {
case pingResponse := <-pingChannel:
+ if !ttlTimer.Stop() {
+ <-ttlTimer.C
+ }
return pingResponse.Err == nil
- case <-time.After(m.defaultTtl):
+ case <-ttlTimer.C:
+ ttlTimer.Stop()
m.handleTimeout()
return false
}
diff --git a/plc4go/internal/plc4go/knxnetip/ConnectionDriverSpecificOperations.go b/plc4go/internal/plc4go/knxnetip/ConnectionDriverSpecificOperations.go
index 42e3c1e..126c5c4 100644
--- a/plc4go/internal/plc4go/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/plc4go/knxnetip/ConnectionDriverSpecificOperations.go
@@ -20,15 +20,16 @@
package knxnetip
import (
+ "math"
+ "strconv"
+ "time"
+
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
values2 "github.com/apache/plc4x/plc4go/internal/plc4go/spi/values"
"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
- "math"
- "strconv"
- "time"
)
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -46,13 +47,18 @@ func (m *Connection) ReadGroupAddress(groupAddress []int8, datapointType *driver
result := make(chan KnxReadResult)
sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case result <- KnxReadResult{
value: value,
numItems: numItems,
err: err,
}:
- case <-time.After(time.Millisecond * 10):
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ timeout.Stop()
}
}
@@ -96,12 +102,17 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
result := make(chan KnxDeviceConnectResult)
sendResponse := func(connection *KnxDeviceConnection, err error) {
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case result <- KnxDeviceConnectResult{
connection: connection,
err: err,
}:
- case <-time.After(time.Millisecond * 10):
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ timeout.Stop()
}
}
@@ -182,12 +193,17 @@ func (m *Connection) DeviceDisconnect(targetAddress driverModel.KnxAddress) <-ch
result := make(chan KnxDeviceDisconnectResult)
sendResponse := func(connection *KnxDeviceConnection, err error) {
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case result <- KnxDeviceDisconnectResult{
connection: connection,
err: err,
}:
- case <-time.After(time.Millisecond * 10):
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ timeout.Stop()
}
}
@@ -211,11 +227,16 @@ func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, bu
result := make(chan KnxDeviceAuthenticateResult)
sendResponse := func(err error) {
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case result <- KnxDeviceAuthenticateResult{
err: err,
}:
- case <-time.After(time.Millisecond * 10):
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ timeout.Stop()
}
}
@@ -259,13 +280,18 @@ func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, ob
result := make(chan KnxReadResult)
sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case result <- KnxReadResult{
value: value,
numItems: numItems,
err: err,
}:
- case <-time.After(time.Millisecond * 10):
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ timeout.Stop()
}
}
@@ -334,13 +360,18 @@ func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxA
result := make(chan KnxReadResult)
sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case result <- KnxReadResult{
value: value,
numItems: numItems,
err: err,
}:
- case <-time.After(time.Millisecond * 10):
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ timeout.Stop()
}
}
@@ -389,13 +420,18 @@ func (m *Connection) DeviceReadMemory(targetAddress driverModel.KnxAddress, addr
result := make(chan KnxReadResult)
sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case result <- KnxReadResult{
value: value,
numItems: numItems,
err: err,
}:
- case <-time.After(time.Millisecond * 10):
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ timeout.Stop()
}
}
diff --git a/plc4go/internal/plc4go/knxnetip/ConnectionHelper.go b/plc4go/internal/plc4go/knxnetip/ConnectionHelper.go
index 788d4e3..49e0fc3 100644
--- a/plc4go/internal/plc4go/knxnetip/ConnectionHelper.go
+++ b/plc4go/internal/plc4go/knxnetip/ConnectionHelper.go
@@ -21,17 +21,18 @@ package knxnetip
import (
"fmt"
+ "math"
+ "net"
+ "strconv"
+ "sync/atomic"
+ "time"
+
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports/udp"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
- "math"
- "net"
- "strconv"
- "sync/atomic"
- "time"
)
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -136,7 +137,12 @@ func (m *Connection) handleTimeout() {
func (m *Connection) resetTimeout() {
if m.connectionTimeoutTimer != nil {
- m.connectionTimeoutTimer.Stop()
+ if !m.connectionTimeoutTimer.Stop() {
+ select {
+ case <-m.connectionTimeoutTimer.C:
+ default:
+ }
+ }
m.connectionTimeoutTimer = nil
}
}
diff --git a/plc4go/internal/plc4go/knxnetip/ConnectionInternalOperations.go b/plc4go/internal/plc4go/knxnetip/ConnectionInternalOperations.go
index dea9c78..96de836 100644
--- a/plc4go/internal/plc4go/knxnetip/ConnectionInternalOperations.go
+++ b/plc4go/internal/plc4go/knxnetip/ConnectionInternalOperations.go
@@ -20,12 +20,13 @@
package knxnetip
import (
+ "reflect"
+ "time"
+
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/plcerrors"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
"github.com/pkg/errors"
- "reflect"
- "time"
)
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -78,13 +79,21 @@ func (m *Connection) sendGatewaySearchRequest() (*driverModel.SearchResponse, er
return nil, errors.Wrap(err, "got error sending search request")
}
+ ttlTimer := time.NewTimer(m.defaultTtl)
select {
case response := <-result:
+ if !ttlTimer.Stop() {
+ <-ttlTimer.C
+ }
return response, nil
case errorResponse := <-errorResult:
+ if !ttlTimer.Stop() {
+ <-ttlTimer.C
+ }
return nil, errorResponse
// For search requests there is no timeout handler running, so we have to do it manually.
- case <-time.After(m.defaultTtl):
+ case <-ttlTimer.C:
+ ttlTimer.Stop()
return nil, errors.New("timeout")
}
}
diff --git a/plc4go/internal/plc4go/knxnetip/Discoverer.go b/plc4go/internal/plc4go/knxnetip/Discoverer.go
index 9f5666c..0e5d552 100644
--- a/plc4go/internal/plc4go/knxnetip/Discoverer.go
+++ b/plc4go/internal/plc4go/knxnetip/Discoverer.go
@@ -23,15 +23,16 @@ import (
"bytes"
"errors"
"fmt"
+ "net"
+ "net/url"
+ "time"
+
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports/udp"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
- "net"
- "net/url"
- "time"
)
type Discoverer struct {
@@ -123,10 +124,16 @@ func (d *Discoverer) Discover(callback func(event model.PlcDiscoveryEvent)) erro
go func() {
// Keep on reading responses till the timeout is done.
// TODO: Make this configurable
+ timeout := time.NewTimer(time.Second * 1)
+ timeout.Stop()
for start := time.Now(); time.Since(start) < time.Second*5; {
+ timeout.Reset(time.Second * 1)
select {
case message := <-codec.GetDefaultIncomingMessageChannel():
{
+ if !timeout.Stop() {
+ <-timeout.C
+ }
searchResponse := driverModel.CastSearchResponse(message)
if searchResponse != nil {
addr := searchResponse.HpaiControlEndpoint.IpAddress.Addr
@@ -144,8 +151,9 @@ func (d *Discoverer) Discover(callback func(event model.PlcDiscoveryEvent)) erro
}
continue
}
- case <-time.After(time.Second * 1):
+ case <-timeout.C:
{
+ timeout.Stop()
continue
}
}
diff --git a/plc4go/internal/plc4go/knxnetip/Reader.go b/plc4go/internal/plc4go/knxnetip/Reader.go
index 1968432..94525b1 100644
--- a/plc4go/internal/plc4go/knxnetip/Reader.go
+++ b/plc4go/internal/plc4go/knxnetip/Reader.go
@@ -21,15 +21,16 @@ package knxnetip
import (
"errors"
+ "strconv"
+ "strings"
+ "time"
+
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
internalValues "github.com/apache/plc4x/plc4go/internal/plc4go/spi/values"
apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/plc4go/values"
- "strconv"
- "strings"
- "time"
)
type Reader struct {
@@ -104,9 +105,13 @@ func (m Reader) Read(readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcRea
case DevicePropertyAddressPlcField:
propertyField := field.(DevicePropertyAddressPlcField)
+ timeout := time.NewTimer(m.connection.defaultTtl)
results := m.connection.DeviceReadProperty(deviceAddress, propertyField.ObjectId, propertyField.PropertyId, propertyField.PropertyIndex, propertyField.NumElements)
select {
case result := <-results:
+ if !timeout.Stop() {
+ <-timeout.C
+ }
if result.err == nil {
responseCodes[fieldName] = apiModel.PlcResponseCode_OK
plcValues[fieldName] = *result.value
@@ -114,15 +119,20 @@ func (m Reader) Read(readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcRea
responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
plcValues[fieldName] = nil
}
- case <-time.After(m.connection.defaultTtl):
+ case <-timeout.C:
+ timeout.Stop()
responseCodes[fieldName] = apiModel.PlcResponseCode_REMOTE_BUSY
plcValues[fieldName] = nil
}
case DeviceMemoryAddressPlcField:
+ timeout := time.NewTimer(m.connection.defaultTtl)
memoryField := field.(DeviceMemoryAddressPlcField)
results := m.connection.DeviceReadMemory(deviceAddress, memoryField.Address, memoryField.NumElements, memoryField.FieldType)
select {
case result := <-results:
+ if !timeout.Stop() {
+ <-timeout.C
+ }
if result.err == nil {
responseCodes[fieldName] = apiModel.PlcResponseCode_OK
plcValues[fieldName] = *result.value
@@ -130,7 +140,8 @@ func (m Reader) Read(readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcRea
responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
plcValues[fieldName] = nil
}
- case <-time.After(m.connection.defaultTtl):
+ case <-timeout.C:
+ timeout.Stop()
responseCodes[fieldName] = apiModel.PlcResponseCode_REMOTE_BUSY
plcValues[fieldName] = nil
}
diff --git a/plc4go/internal/plc4go/spi/default/DefaultCodec.go b/plc4go/internal/plc4go/spi/default/DefaultCodec.go
index 1f7310c..8e2bd44 100644
--- a/plc4go/internal/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/internal/plc4go/spi/default/DefaultCodec.go
@@ -21,6 +21,8 @@ package _default
import (
"fmt"
+ "time"
+
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/plcerrors"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
@@ -28,7 +30,6 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
- "time"
)
// DefaultCodecRequirements adds required methods to MessageCodec that are needed when using DefaultCodec
@@ -288,9 +289,14 @@ mainLoop:
// If the message has not been handled and a default handler is provided, call this ...
if !messageHandled {
workerLog.Trace().Msg("Message was not handled")
+ timeout := time.NewTimer(time.Millisecond * 40)
select {
case m.defaultIncomingMessageChannel <- message:
- case <-time.After(time.Millisecond * 40):
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ timeout.Stop()
workerLog.Warn().Msgf("Message discarded %s", message)
}
}
diff --git a/plc4go/internal/plc4go/spi/default/DefaultConnection.go b/plc4go/internal/plc4go/spi/default/DefaultConnection.go
index 069cce8..3651984 100644
--- a/plc4go/internal/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/internal/plc4go/spi/default/DefaultConnection.go
@@ -20,13 +20,14 @@
package _default
import (
+ "time"
+
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/pkg/plc4go"
"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
- "time"
)
// DefaultConnectionRequirements defines the required at a implementing connection when using DefaultConnection
@@ -159,11 +160,16 @@ func (d *defaultConnection) Connect() <-chan plc4go.PlcConnectionConnectResult {
func (d *defaultConnection) BlockingClose() {
log.Trace().Msg("blocking close connection")
closeResults := d.GetConnection().Close()
+ timeout := time.NewTimer(d.GetTtl())
d.SetConnected(false)
select {
case <-closeResults:
+ if !timeout.Stop() {
+ <-timeout.C
+ }
return
- case <-time.After(d.GetTtl()):
+ case <-timeout.C:
+ timeout.Stop()
return
}
}
diff --git a/plc4go/internal/plc4go/spi/transports/tcp/Transport.go b/plc4go/internal/plc4go/spi/transports/tcp/Transport.go
index 52b150d..7262369 100644
--- a/plc4go/internal/plc4go/spi/transports/tcp/Transport.go
+++ b/plc4go/internal/plc4go/spi/transports/tcp/Transport.go
@@ -21,14 +21,15 @@ package tcp
import (
"bufio"
- "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
- "github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
- "github.com/pkg/errors"
"net"
"net/url"
"regexp"
"strconv"
"time"
+
+ "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
+ "github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
+ "github.com/pkg/errors"
)
type Transport struct {
@@ -157,10 +158,15 @@ func (m *TransportInstance) GetNumReadableBytes() (uint32, error) {
_, _ = m.reader.Peek(1)
peekChan <- true
}()
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case <-peekChan:
+ if !timeout.Stop() {
+ <-timeout.C
+ }
return uint32(m.reader.Buffered()), nil
- case <-time.After(10 * time.Millisecond):
+ case <-timeout.C:
+ timeout.Stop()
return 0, nil
}
}
diff --git a/plc4go/internal/plc4go/spi/transports/udp/Transport.go b/plc4go/internal/plc4go/spi/transports/udp/Transport.go
index 25322e9..ae46473 100644
--- a/plc4go/internal/plc4go/spi/transports/udp/Transport.go
+++ b/plc4go/internal/plc4go/spi/transports/udp/Transport.go
@@ -21,14 +21,15 @@ package udp
import (
"bufio"
- "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
- "github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
- "github.com/pkg/errors"
"net"
"net/url"
"regexp"
"strconv"
"time"
+
+ "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
+ "github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
+ "github.com/pkg/errors"
)
type Transport struct {
@@ -186,10 +187,15 @@ func (m *TransportInstance) GetNumReadableBytes() (uint32, error) {
_, _ = m.reader.Peek(1)
peekChan <- true
}()
+ timeout := time.NewTimer(time.Millisecond * 10)
select {
case <-peekChan:
+ if !timeout.Stop() {
+ <-timeout.C
+ }
return uint32(m.reader.Buffered()), nil
- case <-time.After(10 * time.Millisecond):
+ case <-timeout.C:
+ timeout.Stop()
return 0, nil
}
}