You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/03/21 14:11:47 UTC

[plc4x] branch develop updated: fix(plc4go/cbus): use queues for discovery to not overwhelm small devices

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7701247420 fix(plc4go/cbus): use queues for discovery to not overwhelm small devices
7701247420 is described below

commit 7701247420650ea179cf305630efb6960f5018bc
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Mar 21 15:11:39 2023 +0100

    fix(plc4go/cbus): use queues for discovery to not overwhelm small devices
---
 plc4go/internal/cbus/Discoverer.go | 242 +++++++++++++++++++------------------
 1 file changed, 124 insertions(+), 118 deletions(-)

diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 786fa5930f..1523cfa17f 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -23,7 +23,6 @@ import (
 	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/transports/tcp"
-	"golang.org/x/sync/semaphore"
 	"net"
 	"net/url"
 	"sync"
@@ -40,16 +39,22 @@ import (
 )
 
 type Discoverer struct {
-	maxConcurrency int64
+	transportInstanceCreationQueue utils.Executor
+	deviceScanningQueue            utils.Executor
 }
 
 func NewDiscoverer() *Discoverer {
 	return &Discoverer{
-		maxConcurrency: 50,
+		// TODO: maybe a dynamic executor would be better to not waste cycles when not in use
+		transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
+		deviceScanningQueue:            utils.NewFixedSizeExecutor(50, 100),
 	}
 }
 
 func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
+	d.transportInstanceCreationQueue.Start()
+	d.deviceScanningQueue.Start()
+
 	tcpTransport := tcp.NewTransport()
 
 	allInterfaces, err := net.Interfaces()
@@ -76,7 +81,6 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 	}
 
 	transportInstances := make(chan transports.TransportInstance)
-	sem := semaphore.NewWeighted(d.maxConcurrency)
 	wg := &sync.WaitGroup{}
 	// Iterate over all network devices of this system.
 	for _, netInterface := range interfaces {
@@ -88,8 +92,6 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 		go func(netInterface net.Interface) {
 			defer func() { wg.Done() }()
 			// Iterate over all addresses the current interface has configured
-			// For KNX we're only interested in IPv4 addresses, as it doesn't
-			// seem to work with IPv6.
 			for _, addr := range addrs {
 				var ipv4Addr net.IP
 				switch addr.(type) {
@@ -118,42 +120,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 					defer func() { wg.Done() }()
 					for ip := range addresses {
 						log.Trace().Msgf("Handling found ip %v", ip)
-						wg.Add(1)
-						go func(ip net.IP) {
-							if err := sem.Acquire(ctx, 1); err != nil {
-								log.Debug().Err(err).Msg("Error acquiring")
-								return
-							}
-							defer sem.Release(1)
-							defer wg.Done()
-							// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
-							var connectionUrl url.URL
-							{
-								connectionUrlParsed, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
-								if err != nil {
-									log.Error().Err(err).Msgf("Error parsing url for lookup")
-									return
-								}
-								connectionUrl = *connectionUrlParsed
-							}
-
-							transportInstance, err := tcpTransport.CreateTransportInstance(connectionUrl, nil)
-							if err != nil {
-								log.Error().Err(err).Msgf("Error creating transport instance")
-								return
-							}
-							log.Trace().Msgf("trying %v", connectionUrl)
-							err = transportInstance.ConnectWithContext(ctx)
-							if err != nil {
-								secondErr := transportInstance.ConnectWithContext(ctx)
-								if secondErr != nil {
-									log.Trace().Err(err).Msgf("Error connecting transport instance")
-									return
-								}
-							}
-
-							transportInstances <- transportInstance
-						}(utils.DuplicateIP(ip))
+						d.transportInstanceCreationQueue.Submit(ctx, 0, d.createTransportInstanceDispatcher(ctx, wg, ip, tcpTransport, transportInstances))
 					}
 				}()
 			}
@@ -166,13 +133,52 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 	}()
 
 	for transportInstance := range transportInstances {
-		tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
+		d.deviceScanningQueue.Submit(ctx, 0, d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
+	}
+	return nil
+}
+
+func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, ip net.IP, tcpTransport *tcp.Transport, transportInstances chan transports.TransportInstance) utils.Runnable {
+	wg.Add(1)
+	return func() {
+		defer wg.Done()
+		// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
+		var connectionUrl url.URL
+		{
+			connectionUrlParsed, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
+			if err != nil {
+				log.Error().Err(err).Msgf("Error parsing url for lookup")
+				return
+			}
+			connectionUrl = *connectionUrlParsed
+		}
+
+		transportInstance, err := tcpTransport.CreateTransportInstance(connectionUrl, nil)
+		if err != nil {
+			log.Error().Err(err).Msgf("Error creating transport instance")
+			return
+		}
+		log.Trace().Msgf("trying %v", connectionUrl)
+		err = transportInstance.ConnectWithContext(ctx)
+		if err != nil {
+			secondErr := transportInstance.ConnectWithContext(ctx)
+			if secondErr != nil {
+				log.Trace().Err(err).Msgf("Error connecting transport instance")
+				return
+			}
+		}
+		transportInstances <- transportInstance
+	}
+}
+
+func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable {
+	return func() {
 		// Create a codec for sending and receiving messages.
-		codec := NewMessageCodec(transportInstance)
+		codec := NewMessageCodec(tcpTransportInstance)
 		// Explicitly start the worker
 		if err := codec.Connect(); err != nil {
 			log.Debug().Err(err).Msg("Error connecting")
-			continue
+			return
 		}
 
 		// Prepare the discovery packet data
@@ -183,82 +189,82 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 		request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
 		cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
 		// Send the search request.
-		err = codec.Send(cBusMessageToServer)
-		go func() {
-			// Keep on reading responses till the timeout is done.
-			// TODO: Make this configurable
-			timeout := time.NewTimer(time.Second * 1)
-			timeout.Stop()
-			for start := time.Now(); time.Since(start) < time.Second*5; {
-				timeout.Reset(time.Second * 1)
-				select {
-				case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
-					if !timeout.Stop() {
-						<-timeout.C
-					}
-					cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
-					if !ok {
-						continue
-					}
-					messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
-					if !ok {
-						continue
-					}
-					replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-					if !ok {
-						continue
-					}
-					if receivedAlpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha(); receivedAlpha != nil && alpha.GetCharacter() != receivedAlpha.GetCharacter() {
-						continue
-					}
-					embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-					if !ok {
-						continue
-					}
-					encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
-					if !ok {
-						continue
-					}
-					encodedReplyCALReply, ok := encodedReply.GetEncodedReply().(readWriteModel.EncodedReplyCALReplyExactly)
-					if !ok {
-						continue
-					}
-					calDataIdentifyReply, ok := encodedReplyCALReply.GetCalReply().GetCalData().(readWriteModel.CALDataIdentifyReplyExactly)
-					if !ok {
-						continue
-					}
-					identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
-					if !ok {
-						continue
-					}
-					var remoteUrl url.URL
-					{
-						// TODO: we could check for the exact response
-						remoteUrlParse, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
-						if err != nil {
-							log.Error().Err(err).Msg("Error creating url")
-							continue
-						}
-						remoteUrl = *remoteUrlParse
-					}
-					// TODO: manufacturer + type would be good but this means two requests then
-					deviceName := identifyReplyCommand.GetManufacturerName()
-					discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
-						ProtocolCode:  "c-bus",
-						TransportCode: "tcp",
-						TransportUrl:  remoteUrl,
-						Options:       nil,
-						Name:          deviceName,
-					}
-					// Pass the event back to the callback
-					callback(discoveryEvent)
+		if err := codec.Send(cBusMessageToServer); err != nil {
+			log.Debug().Err(err).Msgf("Error sending message:\n%s", cBusMessageToServer)
+			return
+		}
+		// Keep on reading responses till the timeout is done.
+		// TODO: Make this configurable
+		timeout := time.NewTimer(time.Second * 1)
+		timeout.Stop()
+		for start := time.Now(); time.Since(start) < time.Second*5; {
+			timeout.Reset(time.Second * 1)
+			select {
+			case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
+				if !timeout.Stop() {
+					<-timeout.C
+				}
+				cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
+				if !ok {
+					continue
+				}
+				messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
+				if !ok {
 					continue
-				case <-timeout.C:
-					timeout.Stop()
+				}
+				replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+				if !ok {
+					continue
+				}
+				if receivedAlpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha(); receivedAlpha != nil && alpha.GetCharacter() != receivedAlpha.GetCharacter() {
 					continue
 				}
+				embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+				if !ok {
+					continue
+				}
+				encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
+				if !ok {
+					continue
+				}
+				encodedReplyCALReply, ok := encodedReply.GetEncodedReply().(readWriteModel.EncodedReplyCALReplyExactly)
+				if !ok {
+					continue
+				}
+				calDataIdentifyReply, ok := encodedReplyCALReply.GetCalReply().GetCalData().(readWriteModel.CALDataIdentifyReplyExactly)
+				if !ok {
+					continue
+				}
+				identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
+				if !ok {
+					continue
+				}
+				var remoteUrl url.URL
+				{
+					// TODO: we could check for the exact response
+					remoteUrlParse, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
+					if err != nil {
+						log.Error().Err(err).Msg("Error creating url")
+						continue
+					}
+					remoteUrl = *remoteUrlParse
+				}
+				// TODO: manufacturer + type would be good but this means two requests then
+				deviceName := identifyReplyCommand.GetManufacturerName()
+				discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
+					ProtocolCode:  "c-bus",
+					TransportCode: "tcp",
+					TransportUrl:  remoteUrl,
+					Options:       nil,
+					Name:          deviceName,
+				}
+				// Pass the event back to the callback
+				callback(discoveryEvent)
+				continue
+			case <-timeout.C:
+				timeout.Stop()
+				continue
 			}
-		}()
+		}
 	}
-	return nil
 }