You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/12 16:41:05 UTC

[plc4x] 04/06: feat(plc4go/cbus): properly implemented Discoverer

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 87763de69b0ca9272a3b9c0dd34bc712d657ac9c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Aug 12 18:37:57 2022 +0200

    feat(plc4go/cbus): properly implemented Discoverer
---
 plc4go/internal/cbus/Discoverer.go | 266 ++++++++++++++++++++-----------------
 1 file changed, 141 insertions(+), 125 deletions(-)

diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index fdce0b5b7..0713ff658 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -35,7 +35,6 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 
-	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 )
 
@@ -73,147 +72,164 @@ func (d *Discoverer) Discover(callback func(event apiModel.PlcDiscoveryEvent), d
 		interfaces = allInterfaces
 	}
 
-	var tranportInstances []transports.TransportInstance
+	transportInstances := make(chan transports.TransportInstance)
 	// Iterate over all network devices of this system.
-	for _, interf := range interfaces {
-		addrs, err := interf.Addrs()
+	for _, netInterface := range interfaces {
+		addrs, err := netInterface.Addrs()
 		if err != nil {
 			return err
 		}
-		// Iterate over all addresses the current interface has configured
-		// For KNX we're only interested in IPv4 addresses, as it doesn't
-		// seem to work with IPv6.
-		for _, addr := range addrs {
-			var ipv4Addr net.IP
-			switch addr.(type) {
-			// If the device is configured to communicate with a subnet
-			case *net.IPNet:
-				ipv4Addr = addr.(*net.IPNet).IP.To4()
+		go func(netInterface net.Interface) {
+			// Iterate over all addresses the current interface has configured
+			// For KNX we're only interested in IPv4 addresses, as it doesn't
+			// seem to work with IPv6.
+			for _, addr := range addrs {
+				var ipv4Addr net.IP
+				switch addr.(type) {
+				// If the device is configured to communicate with a subnet
+				case *net.IPNet:
+					ipv4Addr = addr.(*net.IPNet).IP.To4()
 
-			// If the device is configured for a point-to-point connection
-			case *net.IPAddr:
-				ipv4Addr = addr.(*net.IPAddr).IP.To4()
-			}
+				// If the device is configured for a point-to-point connection
+				case *net.IPAddr:
+					ipv4Addr = addr.(*net.IPAddr).IP.To4()
+				}
 
-			// If we found an IPv4 address and this is not a loopback address,
-			// add it to the list of devices we will open ports and send discovery
-			// messages from.
-			if ipv4Addr != nil && !ipv4Addr.IsLoopback() {
-				addresses, err := utils.GetIPAddresses(context.TODO(), interf, false)
+				// If we found an IPv4 address and this is not a loopback address,
+				// add it to the list of devices we will open ports and send discovery
+				// messages from.
+				if ipv4Addr == nil || ipv4Addr.IsLoopback() {
+					continue
+				}
+				addresses, err := utils.GetIPAddresses(context.TODO(), netInterface, false)
 				if err != nil {
-					log.Warn().Err(err).Msgf("Can't get addresses for %s", interf)
+					log.Warn().Err(err).Msgf("Can't get addresses for %s", netInterface)
 					continue
 				}
-				for ip := range addresses {
-					// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
-					connectionUrl, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
-					if err != nil {
-						log.Error().Err(err).Msgf("Error parsing url for lookup")
-						continue
-					}
-					transportInstance, err := tcpTransport.CreateTransportInstance(*connectionUrl, nil)
-					if err != nil {
-						return err
-					}
-					err = transportInstance.Connect()
-					if err != nil {
-						continue
-					}
+				go func() {
+					for ip := range addresses {
+						go func(ip net.IP) {
+							// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
+							connectionUrl, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
+							if err != nil {
+								log.Error().Err(err).Msgf("Error parsing url for lookup")
+								return
+							}
+							transportInstance, err := tcpTransport.CreateTransportInstance(*connectionUrl, nil)
+							if err != nil {
+								log.Error().Err(err).Msgf("Error creating transport instance")
+								return
+							}
+							log.Trace().Msgf("trying %s", connectionUrl)
+							err = transportInstance.Connect()
+							if err != nil {
+								secondErr := transportInstance.Connect()
+								if secondErr != nil {
+									log.Trace().Err(err).Msgf("Error connecting transport instance")
+									return
+								}
+							}
 
-					tranportInstances = append(tranportInstances, transportInstance)
-				}
+							transportInstances <- transportInstance
+						}(utils.DuplicateIP(ip))
+					}
+				}()
 			}
-		}
+		}(netInterface)
 	}
 
-	if len(tranportInstances) <= 0 {
-		return nil
-	}
-	for _, transportInstance := range tranportInstances {
-		tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
-		// Create a codec for sending and receiving messages.
-		codec := NewMessageCodec(transportInstance)
-		// Explicitly start the worker
-		if err := codec.Connect(); err != nil {
-			return errors.Wrap(err, "Error connecting")
-		}
+	go func() {
+		for transportInstance := range transportInstances {
+			tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
+			// Create a codec for sending and receiving messages.
+			codec := NewMessageCodec(transportInstance)
+			// Explicitly start the worker
+			if err := codec.Connect(); err != nil {
+				log.Debug().Err(err).Msg("Error connecting")
+				continue
+			}
 
-		// Prepare the discovery packet data
-		cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
-		requestContext := readWriteModel.NewRequestContext(false)
-		calData := readWriteModel.NewCALDataIdentify(readWriteModel.Attribute_Manufacturer, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, requestContext)
-		alpha := readWriteModel.NewAlpha('x')
-		request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
-		cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
-		// Send the search request.
-		err = codec.Send(cBusMessageToServer)
-		go func() {
-			// Keep on reading responses till the timeout is done.
-			// TODO: Make this configurable
-			timeout := time.NewTimer(time.Second * 1)
-			timeout.Stop()
-			for start := time.Now(); time.Since(start) < time.Second*5; {
-				timeout.Reset(time.Second * 1)
-				select {
-				case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
-					if !timeout.Stop() {
-						<-timeout.C
-					}
-					cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
-					if !ok {
-						continue
-					}
-					messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
-					if !ok {
+			// Prepare the discovery packet data
+			cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
+			requestContext := readWriteModel.NewRequestContext(false)
+			calData := readWriteModel.NewCALDataIdentify(readWriteModel.Attribute_Manufacturer, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, requestContext)
+			alpha := readWriteModel.NewAlpha('x')
+			request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
+			cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
+			// Send the search request.
+			err = codec.Send(cBusMessageToServer)
+			go func() {
+				// Keep on reading responses till the timeout is done.
+				// TODO: Make this configurable
+				timeout := time.NewTimer(time.Second * 1)
+				timeout.Stop()
+				for start := time.Now(); time.Since(start) < time.Second*5; {
+					timeout.Reset(time.Second * 1)
+					select {
+					case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
+						if !timeout.Stop() {
+							<-timeout.C
+						}
+						cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
+						if !ok {
+							continue
+						}
+						messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
+						if !ok {
+							continue
+						}
+						replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+						if !ok {
+							continue
+						}
+						if receivedAlpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha(); receivedAlpha != nil && alpha.GetCharacter() != receivedAlpha.GetCharacter() {
+							continue
+						}
+						embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+						if !ok {
+							continue
+						}
+						encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
+						if !ok {
+							continue
+						}
+						encodedReplyCALReply, ok := encodedReply.GetEncodedReply().(readWriteModel.EncodedReplyCALReplyExactly)
+						if !ok {
+							continue
+						}
+						calDataIdentifyReply, ok := encodedReplyCALReply.GetCalReply().GetCalData().(readWriteModel.CALDataIdentifyReplyExactly)
+						if !ok {
+							continue
+						}
+						identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
+						if !ok {
+							continue
+						}
+						// TODO: we could check for the exact reponse
+						remoteUrl, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
+						if err != nil {
+							log.Error().Err(err).Msg("Error creating url")
+							continue
+						}
+						// TODO: manufaturer + type would be good but this means two requests then
+						deviceName := identifyReplyCommand.GetManufacturerName()
+						discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
+							ProtocolCode:  "c-bus",
+							TransportCode: "tcp",
+							TransportUrl:  *remoteUrl,
+							Options:       nil,
+							Name:          deviceName,
+						}
+						// Pass the event back to the callback
+						callback(discoveryEvent)
 						continue
-					}
-					replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-					if !ok {
+					case <-timeout.C:
+						timeout.Stop()
 						continue
 					}
-					if alpha != replyOrConfirmationConfirmation.GetConfirmation().GetAlpha() {
-						continue
-					}
-					embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-					if !ok {
-						continue
-					}
-					encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
-					if !ok {
-						continue
-					}
-					calDataIdentifyReply, ok := encodedReply.GetEncodedReply().(readWriteModel.CALDataIdentifyReplyExactly)
-					if !ok {
-						continue
-					}
-					identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
-					if !ok {
-						continue
-					}
-					// TODO: we could check for the exact reponse
-					remoteUrl, err := url.Parse(fmt.Sprintf("tcp://%ds", tcpTransportInstance.RemoteAddress))
-					if err != nil {
-						log.Error().Err(err).Msg("Error creating url")
-						continue
-					}
-					// TODO: manufaturer + type would be good but this means two requests then
-					deviceName := identifyReplyCommand.GetManufacturerName()
-					discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
-						ProtocolCode:  "c-bus",
-						TransportCode: "tcp",
-						TransportUrl:  *remoteUrl,
-						Options:       nil,
-						Name:          deviceName,
-					}
-					// Pass the event back to the callback
-					callback(discoveryEvent)
-					continue
-				case <-timeout.C:
-					timeout.Stop()
-					continue
 				}
-			}
-		}()
-	}
+			}()
+		}
+	}()
 	return nil
 }