You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/03/21 14:11:47 UTC
[plc4x] branch develop updated: fix(plc4go/cbus): use queues for discovery to not overwhelm small devices
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 7701247420 fix(plc4go/cbus): use queues for discovery to not overwhelm small devices
7701247420 is described below
commit 7701247420650ea179cf305630efb6960f5018bc
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Mar 21 15:11:39 2023 +0100
fix(plc4go/cbus): use queues for discovery to not overwhelm small devices
---
plc4go/internal/cbus/Discoverer.go | 242 +++++++++++++++++++------------------
1 file changed, 124 insertions(+), 118 deletions(-)
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 786fa5930f..1523cfa17f 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -23,7 +23,6 @@ import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/transports/tcp"
- "golang.org/x/sync/semaphore"
"net"
"net/url"
"sync"
@@ -40,16 +39,22 @@ import (
)
type Discoverer struct {
- maxConcurrency int64
+ transportInstanceCreationQueue utils.Executor
+ deviceScanningQueue utils.Executor
}
func NewDiscoverer() *Discoverer {
return &Discoverer{
- maxConcurrency: 50,
+ // TODO: maybe a dynamic executor would be better to not waste cycles when not in use
+ transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
+ deviceScanningQueue: utils.NewFixedSizeExecutor(50, 100),
}
}
func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
+ d.transportInstanceCreationQueue.Start()
+ d.deviceScanningQueue.Start()
+
tcpTransport := tcp.NewTransport()
allInterfaces, err := net.Interfaces()
@@ -76,7 +81,6 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
transportInstances := make(chan transports.TransportInstance)
- sem := semaphore.NewWeighted(d.maxConcurrency)
wg := &sync.WaitGroup{}
// Iterate over all network devices of this system.
for _, netInterface := range interfaces {
@@ -88,8 +92,6 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
go func(netInterface net.Interface) {
defer func() { wg.Done() }()
// Iterate over all addresses the current interface has configured
- // For KNX we're only interested in IPv4 addresses, as it doesn't
- // seem to work with IPv6.
for _, addr := range addrs {
var ipv4Addr net.IP
switch addr.(type) {
@@ -118,42 +120,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
defer func() { wg.Done() }()
for ip := range addresses {
log.Trace().Msgf("Handling found ip %v", ip)
- wg.Add(1)
- go func(ip net.IP) {
- if err := sem.Acquire(ctx, 1); err != nil {
- log.Debug().Err(err).Msg("Error acquiring")
- return
- }
- defer sem.Release(1)
- defer wg.Done()
- // Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
- var connectionUrl url.URL
- {
- connectionUrlParsed, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
- if err != nil {
- log.Error().Err(err).Msgf("Error parsing url for lookup")
- return
- }
- connectionUrl = *connectionUrlParsed
- }
-
- transportInstance, err := tcpTransport.CreateTransportInstance(connectionUrl, nil)
- if err != nil {
- log.Error().Err(err).Msgf("Error creating transport instance")
- return
- }
- log.Trace().Msgf("trying %v", connectionUrl)
- err = transportInstance.ConnectWithContext(ctx)
- if err != nil {
- secondErr := transportInstance.ConnectWithContext(ctx)
- if secondErr != nil {
- log.Trace().Err(err).Msgf("Error connecting transport instance")
- return
- }
- }
-
- transportInstances <- transportInstance
- }(utils.DuplicateIP(ip))
+ d.transportInstanceCreationQueue.Submit(ctx, 0, d.createTransportInstanceDispatcher(ctx, wg, ip, tcpTransport, transportInstances))
}
}()
}
@@ -166,13 +133,52 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}()
for transportInstance := range transportInstances {
- tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
+ d.deviceScanningQueue.Submit(ctx, 0, d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
+ }
+ return nil
+}
+
+func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, ip net.IP, tcpTransport *tcp.Transport, transportInstances chan transports.TransportInstance) utils.Runnable {
+ wg.Add(1)
+ return func() {
+ defer wg.Done()
+ // Create a new "connection" (actually a TCP transport instance targeting the default C-Bus port on that address)
+ var connectionUrl url.URL
+ {
+ connectionUrlParsed, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
+ if err != nil {
+ log.Error().Err(err).Msgf("Error parsing url for lookup")
+ return
+ }
+ connectionUrl = *connectionUrlParsed
+ }
+
+ transportInstance, err := tcpTransport.CreateTransportInstance(connectionUrl, nil)
+ if err != nil {
+ log.Error().Err(err).Msgf("Error creating transport instance")
+ return
+ }
+ log.Trace().Msgf("trying %v", connectionUrl)
+ err = transportInstance.ConnectWithContext(ctx)
+ if err != nil {
+ secondErr := transportInstance.ConnectWithContext(ctx)
+ if secondErr != nil {
+ log.Trace().Err(err).Msgf("Error connecting transport instance")
+ return
+ }
+ }
+ transportInstances <- transportInstance
+ }
+}
+
+func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable {
+ return func() {
// Create a codec for sending and receiving messages.
- codec := NewMessageCodec(transportInstance)
+ codec := NewMessageCodec(tcpTransportInstance)
// Explicitly start the worker
if err := codec.Connect(); err != nil {
log.Debug().Err(err).Msg("Error connecting")
- continue
+ return
}
// Prepare the discovery packet data
@@ -183,82 +189,82 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
// Send the search request.
- err = codec.Send(cBusMessageToServer)
- go func() {
- // Keep on reading responses till the timeout is done.
- // TODO: Make this configurable
- timeout := time.NewTimer(time.Second * 1)
- timeout.Stop()
- for start := time.Now(); time.Since(start) < time.Second*5; {
- timeout.Reset(time.Second * 1)
- select {
- case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
- if !timeout.Stop() {
- <-timeout.C
- }
- cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
- if !ok {
- continue
- }
- messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
- if !ok {
- continue
- }
- replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !ok {
- continue
- }
- if receivedAlpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha(); receivedAlpha != nil && alpha.GetCharacter() != receivedAlpha.GetCharacter() {
- continue
- }
- embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
- if !ok {
- continue
- }
- encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
- if !ok {
- continue
- }
- encodedReplyCALReply, ok := encodedReply.GetEncodedReply().(readWriteModel.EncodedReplyCALReplyExactly)
- if !ok {
- continue
- }
- calDataIdentifyReply, ok := encodedReplyCALReply.GetCalReply().GetCalData().(readWriteModel.CALDataIdentifyReplyExactly)
- if !ok {
- continue
- }
- identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
- if !ok {
- continue
- }
- var remoteUrl url.URL
- {
- // TODO: we could check for the exact response
- remoteUrlParse, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
- if err != nil {
- log.Error().Err(err).Msg("Error creating url")
- continue
- }
- remoteUrl = *remoteUrlParse
- }
- // TODO: manufacturer + type would be good but this means two requests then
- deviceName := identifyReplyCommand.GetManufacturerName()
- discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
- ProtocolCode: "c-bus",
- TransportCode: "tcp",
- TransportUrl: remoteUrl,
- Options: nil,
- Name: deviceName,
- }
- // Pass the event back to the callback
- callback(discoveryEvent)
+ if err := codec.Send(cBusMessageToServer); err != nil {
+ log.Debug().Err(err).Msgf("Error sending message:\n%s", cBusMessageToServer)
+ return
+ }
+ // Keep on reading responses till the timeout is done.
+ // TODO: Make this configurable
+ timeout := time.NewTimer(time.Second * 1)
+ timeout.Stop()
+ for start := time.Now(); time.Since(start) < time.Second*5; {
+ timeout.Reset(time.Second * 1)
+ select {
+ case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
+ if !ok {
+ continue
+ }
+ messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
+ if !ok {
continue
- case <-timeout.C:
- timeout.Stop()
+ }
+ replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !ok {
+ continue
+ }
+ if receivedAlpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha(); receivedAlpha != nil && alpha.GetCharacter() != receivedAlpha.GetCharacter() {
continue
}
+ embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+ if !ok {
+ continue
+ }
+ encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
+ if !ok {
+ continue
+ }
+ encodedReplyCALReply, ok := encodedReply.GetEncodedReply().(readWriteModel.EncodedReplyCALReplyExactly)
+ if !ok {
+ continue
+ }
+ calDataIdentifyReply, ok := encodedReplyCALReply.GetCalReply().GetCalData().(readWriteModel.CALDataIdentifyReplyExactly)
+ if !ok {
+ continue
+ }
+ identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
+ if !ok {
+ continue
+ }
+ var remoteUrl url.URL
+ {
+ // TODO: we could check for the exact response
+ remoteUrlParse, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
+ if err != nil {
+ log.Error().Err(err).Msg("Error creating url")
+ continue
+ }
+ remoteUrl = *remoteUrlParse
+ }
+ // TODO: manufacturer + type would be good but this means two requests then
+ deviceName := identifyReplyCommand.GetManufacturerName()
+ discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
+ ProtocolCode: "c-bus",
+ TransportCode: "tcp",
+ TransportUrl: remoteUrl,
+ Options: nil,
+ Name: deviceName,
+ }
+ // Pass the event back to the callback
+ callback(discoveryEvent)
+ continue
+ case <-timeout.C:
+ timeout.Stop()
+ continue
}
- }()
+ }
}
- return nil
}