You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/03/22 09:56:59 UTC
[plc4x] branch develop updated: fix(plc4go/knx): use queues for discovery to not overwhelm small devices
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 55066a78fe fix(plc4go/knx): use queues for discovery to not overwhelm small devices
55066a78fe is described below
commit 55066a78fe4f2dad5fce76fd54da0afa9457144c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Mar 22 10:56:51 2023 +0100
fix(plc4go/knx): use queues for discovery to not overwhelm small devices
---
plc4go/internal/knxnetip/Discoverer.go | 206 +++++++++++++++++++--------------
1 file changed, 117 insertions(+), 89 deletions(-)
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index 8a31da2de8..44aea6a154 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -23,31 +23,38 @@ import (
"bytes"
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/rs/zerolog/log"
"net"
"net/url"
+ "sync"
"time"
- "github.com/apache/plc4x/plc4go/spi/options"
- "github.com/pkg/errors"
-
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi"
internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/udp"
)
type Discoverer struct {
- messageCodec spi.MessageCodec
+ transportInstanceCreationQueue utils.Executor
+ deviceScanningQueue utils.Executor
}
func NewDiscoverer() *Discoverer {
- return &Discoverer{}
+ return &Discoverer{
+ // TODO: maybe a dynamic executor would be better to not waste cycles when not in use
+ transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
+ deviceScanningQueue: utils.NewFixedSizeExecutor(50, 100),
+ }
}
func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
- // TODO: handle ctx
+ d.transportInstanceCreationQueue.Start()
+ d.deviceScanningQueue.Start()
+
udpTransport := udp.NewTransport()
// Create a connection string for the KNX broadcast discovery address.
@@ -79,66 +86,87 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
interfaces = allInterfaces
}
- var transportInstances []transports.TransportInstance
+ transportInstances := make(chan transports.TransportInstance)
+ wg := &sync.WaitGroup{}
// Iterate over all network devices of this system.
- for _, interf := range interfaces {
- addrs, err := interf.Addrs()
+ for _, netInterface := range interfaces {
+ addrs, err := netInterface.Addrs()
if err != nil {
return err
}
- // Iterate over all addresses the current interface has configured
- // For KNX we're only interested in IPv4 addresses, as it doesn't
- // seem to work with IPv6.
- for _, addr := range addrs {
- var ipv4Addr net.IP
- switch addr.(type) {
- // If the device is configured to communicate with a subnet
- case *net.IPNet:
- ipv4Addr = addr.(*net.IPNet).IP.To4()
-
- // If the device is configured for a point-to-point connection
- case *net.IPAddr:
- ipv4Addr = addr.(*net.IPAddr).IP.To4()
- }
-
- // If we found an IPv4 address and this is not a loopback address,
- // add it to the list of devices we will open ports and send discovery
- // messages from.
- if ipv4Addr != nil && !ipv4Addr.IsLoopback() {
- // Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
- transportInstance, err :=
- udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil,
- &net.UDPAddr{IP: ipv4Addr, Port: 0})
- if err != nil {
- return err
+ wg.Add(1)
+ go func(netInterface net.Interface) {
+ defer func() { wg.Done() }()
+ // Iterate over all addresses the current interface has configured
+ // For KNX we're only interested in IPv4 addresses, as it doesn't
+ // seem to work with IPv6.
+ for _, addr := range addrs {
+ var ipv4Addr net.IP
+ switch addr.(type) {
+ // If the device is configured to communicate with a subnet
+ case *net.IPNet:
+ ipv4Addr = addr.(*net.IPNet).IP.To4()
+
+ // If the device is configured for a point-to-point connection
+ case *net.IPAddr:
+ ipv4Addr = addr.(*net.IPAddr).IP.To4()
}
- err = transportInstance.ConnectWithContext(ctx)
- if err != nil {
+
+ // If we found an IPv4 address and this is not a loopback address,
+ // add it to the list of devices we will open ports and send discovery
+ // messages from.
+ if ipv4Addr == nil || ipv4Addr.IsLoopback() {
continue
}
-
- transportInstances = append(transportInstances, transportInstance)
+ d.transportInstanceCreationQueue.Submit(ctx, 0, d.createTransportInstanceDispatcher(ctx, wg, connectionUrl, ipv4Addr, udpTransport, transportInstances))
}
- }
+ }(netInterface)
}
+ go func() {
+ wg.Wait()
+ log.Trace().Msg("Closing transport instance channel")
+ close(transportInstances)
+ }()
+
+ go func() {
+ for transportInstance := range transportInstances {
+ d.deviceScanningQueue.Submit(ctx, 0, d.createDeviceScanDispatcher(transportInstance.(*udp.TransportInstance), callback))
+ }
+ }()
+ return nil
+}
- if len(transportInstances) <= 0 {
- return nil
+func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) utils.Runnable {
+ wg.Add(1)
+ return func() {
+ defer wg.Done()
+ // Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
+ transportInstance, err :=
+ udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil,
+ &net.UDPAddr{IP: ipv4Addr, Port: 0})
+ if err != nil {
+ log.Error().Err(err).Msg("error creating transport instance")
+ return
+ }
+ err = transportInstance.ConnectWithContext(ctx)
+ if err != nil {
+ log.Debug().Err(err).Msg("Error Connecting")
+ return
+ }
+ transportInstances <- transportInstance
}
+}
- for _, transportInstance := range transportInstances {
+func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable {
+ return func() {
// Create a codec for sending and receiving messages.
- codec := NewMessageCodec(transportInstance, nil)
+ codec := NewMessageCodec(udpTransportInstance, nil)
// Explicitly start the worker
if err := codec.Connect(); err != nil {
- return errors.Wrap(err, "Error connecting")
+ log.Error().Err(err).Msg("Error connecting")
+ return
}
- // Cast to the UDP transport instance, so we can access information on the local port.
- udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
- if !ok {
- return errors.New("couldn't cast transport instance to UDP transport instance")
- }
localAddress := udpTransportInstance.LocalAddress
localAddr := driverModel.NewIPAddress(localAddress.IP)
@@ -147,49 +175,49 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
// Send the search request.
- err = codec.Send(searchRequestMessage)
- go func() {
- // Keep on reading responses till the timeout is done.
- // TODO: Make this configurable
- timeout := time.NewTimer(time.Second * 1)
- timeout.Stop()
- for start := time.Now(); time.Since(start) < time.Second*5; {
- timeout.Reset(time.Second * 1)
- select {
- case message := <-codec.GetDefaultIncomingMessageChannel():
- {
- if !timeout.Stop() {
- <-timeout.C
+ if err := codec.Send(searchRequestMessage); err != nil {
+ log.Debug().Err(err).Msgf("Error sending message:\n%s", searchRequestMessage)
+ return
+ }
+ // Keep on reading responses till the timeout is done.
+ // TODO: Make this configurable
+ timeout := time.NewTimer(time.Second * 1)
+ timeout.Stop()
+ for start := time.Now(); time.Since(start) < time.Second*5; {
+ timeout.Reset(time.Second * 1)
+ select {
+ case message := <-codec.GetDefaultIncomingMessageChannel():
+ {
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ searchResponse := message.(driverModel.SearchResponse)
+ if searchResponse != nil {
+ addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
+ remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
+ addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort()))
+ if err != nil {
+ continue
}
- searchResponse := message.(driverModel.SearchResponse)
- if searchResponse != nil {
- addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
- remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
- addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort()))
- if err != nil {
- continue
- }
- deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
- discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
- ProtocolCode: "knxnet-ip",
- TransportCode: "udp",
- TransportUrl: *remoteUrl,
- Options: nil,
- Name: deviceName,
- }
- // Pass the event back to the callback
- callback(discoveryEvent)
+ deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
+ discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
+ ProtocolCode: "knxnet-ip",
+ TransportCode: "udp",
+ TransportUrl: *remoteUrl,
+ Options: nil,
+ Name: deviceName,
}
- continue
- }
- case <-timeout.C:
- {
- timeout.Stop()
- continue
+ // Pass the event back to the callback
+ callback(discoveryEvent)
}
+ continue
+ }
+ case <-timeout.C:
+ {
+ timeout.Stop()
+ continue
}
}
- }()
+ }
}
- return nil
}