You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/03/22 09:56:59 UTC

[plc4x] branch develop updated: fix(plc4go/knx): use queues for discovery to not overwhelm small devices

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 55066a78fe fix(plc4go/knx): use queues for discovery to not overwhelm small devices
55066a78fe is described below

commit 55066a78fe4f2dad5fce76fd54da0afa9457144c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Mar 22 10:56:51 2023 +0100

    fix(plc4go/knx): use queues for discovery to not overwhelm small devices
---
 plc4go/internal/knxnetip/Discoverer.go | 206 +++++++++++++++++++--------------
 1 file changed, 117 insertions(+), 89 deletions(-)

diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index 8a31da2de8..44aea6a154 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -23,31 +23,38 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"github.com/apache/plc4x/plc4go/spi/utils"
+	"github.com/rs/zerolog/log"
 	"net"
 	"net/url"
+	"sync"
 	"time"
 
-	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/pkg/errors"
-
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
-	"github.com/apache/plc4x/plc4go/spi"
 	internalModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/udp"
 )
 
 type Discoverer struct {
-	messageCodec spi.MessageCodec
+	transportInstanceCreationQueue utils.Executor
+	deviceScanningQueue            utils.Executor
 }
 
 func NewDiscoverer() *Discoverer {
-	return &Discoverer{}
+	return &Discoverer{
+		// TODO: maybe a dynamic executor would be better to not waste cycles when not in use
+		transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
+		deviceScanningQueue:            utils.NewFixedSizeExecutor(50, 100),
+	}
 }
 
 func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
-	// TODO: handle ctx
+	d.transportInstanceCreationQueue.Start()
+	d.deviceScanningQueue.Start()
+
 	udpTransport := udp.NewTransport()
 
 	// Create a connection string for the KNX broadcast discovery address.
@@ -79,66 +86,87 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 		interfaces = allInterfaces
 	}
 
-	var transportInstances []transports.TransportInstance
+	transportInstances := make(chan transports.TransportInstance)
+	wg := &sync.WaitGroup{}
 	// Iterate over all network devices of this system.
-	for _, interf := range interfaces {
-		addrs, err := interf.Addrs()
+	for _, netInterface := range interfaces {
+		addrs, err := netInterface.Addrs()
 		if err != nil {
 			return err
 		}
-		// Iterate over all addresses the current interface has configured
-		// For KNX we're only interested in IPv4 addresses, as it doesn't
-		// seem to work with IPv6.
-		for _, addr := range addrs {
-			var ipv4Addr net.IP
-			switch addr.(type) {
-			// If the device is configured to communicate with a subnet
-			case *net.IPNet:
-				ipv4Addr = addr.(*net.IPNet).IP.To4()
-
-			// If the device is configured for a point-to-point connection
-			case *net.IPAddr:
-				ipv4Addr = addr.(*net.IPAddr).IP.To4()
-			}
-
-			// If we found an IPv4 address and this is not a loopback address,
-			// add it to the list of devices we will open ports and send discovery
-			// messages from.
-			if ipv4Addr != nil && !ipv4Addr.IsLoopback() {
-				// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
-				transportInstance, err :=
-					udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil,
-						&net.UDPAddr{IP: ipv4Addr, Port: 0})
-				if err != nil {
-					return err
+		wg.Add(1)
+		go func(netInterface net.Interface) {
+			defer func() { wg.Done() }()
+			// Iterate over all addresses the current interface has configured
+			// For KNX we're only interested in IPv4 addresses, as it doesn't
+			// seem to work with IPv6.
+			for _, addr := range addrs {
+				var ipv4Addr net.IP
+				switch addr.(type) {
+				// If the device is configured to communicate with a subnet
+				case *net.IPNet:
+					ipv4Addr = addr.(*net.IPNet).IP.To4()
+
+				// If the device is configured for a point-to-point connection
+				case *net.IPAddr:
+					ipv4Addr = addr.(*net.IPAddr).IP.To4()
 				}
-				err = transportInstance.ConnectWithContext(ctx)
-				if err != nil {
+
+				// If we found an IPv4 address and this is not a loopback address,
+				// add it to the list of devices we will open ports and send discovery
+				// messages from.
+				if ipv4Addr == nil || ipv4Addr.IsLoopback() {
 					continue
 				}
-
-				transportInstances = append(transportInstances, transportInstance)
+				d.transportInstanceCreationQueue.Submit(ctx, 0, d.createTransportInstanceDispatcher(ctx, wg, connectionUrl, ipv4Addr, udpTransport, transportInstances))
 			}
-		}
+		}(netInterface)
 	}
+	go func() {
+		wg.Wait()
+		log.Trace().Msg("Closing transport instance channel")
+		close(transportInstances)
+	}()
+
+	go func() {
+		for transportInstance := range transportInstances {
+			d.deviceScanningQueue.Submit(ctx, 0, d.createDeviceScanDispatcher(transportInstance.(*udp.TransportInstance), callback))
+		}
+	}()
+	return nil
+}
 
-	if len(transportInstances) <= 0 {
-		return nil
+func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) utils.Runnable {
+	wg.Add(1)
+	return func() {
+		defer wg.Done()
+		// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
+		transportInstance, err :=
+			udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil,
+				&net.UDPAddr{IP: ipv4Addr, Port: 0})
+		if err != nil {
+			log.Error().Err(err).Msg("error creating transport instance")
+			return
+		}
+		err = transportInstance.ConnectWithContext(ctx)
+		if err != nil {
+			log.Debug().Err(err).Msg("Error Connecting")
+			return
+		}
+		transportInstances <- transportInstance
 	}
+}
 
-	for _, transportInstance := range transportInstances {
+func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable {
+	return func() {
 		// Create a codec for sending and receiving messages.
-		codec := NewMessageCodec(transportInstance, nil)
+		codec := NewMessageCodec(udpTransportInstance, nil)
 		// Explicitly start the worker
 		if err := codec.Connect(); err != nil {
-			return errors.Wrap(err, "Error connecting")
+			log.Error().Err(err).Msg("Error connecting")
+			return
 		}
 
-		// Cast to the UDP transport instance, so we can access information on the local port.
-		udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
-		if !ok {
-			return errors.New("couldn't cast transport instance to UDP transport instance")
-		}
 		localAddress := udpTransportInstance.LocalAddress
 		localAddr := driverModel.NewIPAddress(localAddress.IP)
 
@@ -147,49 +175,49 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 			driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
 		searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
 		// Send the search request.
-		err = codec.Send(searchRequestMessage)
-		go func() {
-			// Keep on reading responses till the timeout is done.
-			// TODO: Make this configurable
-			timeout := time.NewTimer(time.Second * 1)
-			timeout.Stop()
-			for start := time.Now(); time.Since(start) < time.Second*5; {
-				timeout.Reset(time.Second * 1)
-				select {
-				case message := <-codec.GetDefaultIncomingMessageChannel():
-					{
-						if !timeout.Stop() {
-							<-timeout.C
+		if err := codec.Send(searchRequestMessage); err != nil {
+			log.Debug().Err(err).Msgf("Error sending message:\n%s", searchRequestMessage)
+			return
+		}
+		// Keep on reading responses till the timeout is done.
+		// TODO: Make this configurable
+		timeout := time.NewTimer(time.Second * 1)
+		timeout.Stop()
+		for start := time.Now(); time.Since(start) < time.Second*5; {
+			timeout.Reset(time.Second * 1)
+			select {
+			case message := <-codec.GetDefaultIncomingMessageChannel():
+				{
+					if !timeout.Stop() {
+						<-timeout.C
+					}
+					searchResponse := message.(driverModel.SearchResponse)
+					if searchResponse != nil {
+						addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
+						remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
+							addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort()))
+						if err != nil {
+							continue
 						}
-						searchResponse := message.(driverModel.SearchResponse)
-						if searchResponse != nil {
-							addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
-							remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
-								addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort()))
-							if err != nil {
-								continue
-							}
-							deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
-							discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
-								ProtocolCode:  "knxnet-ip",
-								TransportCode: "udp",
-								TransportUrl:  *remoteUrl,
-								Options:       nil,
-								Name:          deviceName,
-							}
-							// Pass the event back to the callback
-							callback(discoveryEvent)
+						deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
+						discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
+							ProtocolCode:  "knxnet-ip",
+							TransportCode: "udp",
+							TransportUrl:  *remoteUrl,
+							Options:       nil,
+							Name:          deviceName,
 						}
-						continue
-					}
-				case <-timeout.C:
-					{
-						timeout.Stop()
-						continue
+						// Pass the event back to the callback
+						callback(discoveryEvent)
 					}
+					continue
+				}
+			case <-timeout.C:
+				{
+					timeout.Stop()
+					continue
 				}
 			}
-		}()
+		}
 	}
-	return nil
 }