You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/12 16:41:05 UTC
[plc4x] 04/06: feat(plc4go/cbus): properly implemented Discoverer
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 87763de69b0ca9272a3b9c0dd34bc712d657ac9c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Aug 12 18:37:57 2022 +0200
feat(plc4go/cbus): properly implemented Discoverer
---
plc4go/internal/cbus/Discoverer.go | 266 ++++++++++++++++++++-----------------
1 file changed, 141 insertions(+), 125 deletions(-)
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index fdce0b5b7..0713ff658 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -35,7 +35,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/utils"
- "github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -73,147 +72,164 @@ func (d *Discoverer) Discover(callback func(event apiModel.PlcDiscoveryEvent), d
interfaces = allInterfaces
}
- var tranportInstances []transports.TransportInstance
+ transportInstances := make(chan transports.TransportInstance)
// Iterate over all network devices of this system.
- for _, interf := range interfaces {
- addrs, err := interf.Addrs()
+ for _, netInterface := range interfaces {
+ addrs, err := netInterface.Addrs()
if err != nil {
return err
}
- // Iterate over all addresses the current interface has configured
- // For KNX we're only interested in IPv4 addresses, as it doesn't
- // seem to work with IPv6.
- for _, addr := range addrs {
- var ipv4Addr net.IP
- switch addr.(type) {
- // If the device is configured to communicate with a subnet
- case *net.IPNet:
- ipv4Addr = addr.(*net.IPNet).IP.To4()
+ go func(netInterface net.Interface) {
+ // Iterate over all addresses the current interface has configured
+ // For KNX we're only interested in IPv4 addresses, as it doesn't
+ // seem to work with IPv6.
+ for _, addr := range addrs {
+ var ipv4Addr net.IP
+ switch addr.(type) {
+ // If the device is configured to communicate with a subnet
+ case *net.IPNet:
+ ipv4Addr = addr.(*net.IPNet).IP.To4()
- // If the device is configured for a point-to-point connection
- case *net.IPAddr:
- ipv4Addr = addr.(*net.IPAddr).IP.To4()
- }
+ // If the device is configured for a point-to-point connection
+ case *net.IPAddr:
+ ipv4Addr = addr.(*net.IPAddr).IP.To4()
+ }
- // If we found an IPv4 address and this is not a loopback address,
- // add it to the list of devices we will open ports and send discovery
- // messages from.
- if ipv4Addr != nil && !ipv4Addr.IsLoopback() {
- addresses, err := utils.GetIPAddresses(context.TODO(), interf, false)
+ // If we found an IPv4 address and this is not a loopback address,
+ // add it to the list of devices we will open ports and send discovery
+ // messages from.
+ if ipv4Addr == nil || ipv4Addr.IsLoopback() {
+ continue
+ }
+ addresses, err := utils.GetIPAddresses(context.TODO(), netInterface, false)
if err != nil {
- log.Warn().Err(err).Msgf("Can't get addresses for %s", interf)
+ log.Warn().Err(err).Msgf("Can't get addresses for %s", netInterface)
continue
}
- for ip := range addresses {
- // Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
- connectionUrl, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
- if err != nil {
- log.Error().Err(err).Msgf("Error parsing url for lookup")
- continue
- }
- transportInstance, err := tcpTransport.CreateTransportInstance(*connectionUrl, nil)
- if err != nil {
- return err
- }
- err = transportInstance.Connect()
- if err != nil {
- continue
- }
+ go func() {
+ for ip := range addresses {
+ go func(ip net.IP) {
+ // Create a new "connection" (actually a TCP transport instance targeting that address)
+ connectionUrl, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
+ if err != nil {
+ log.Error().Err(err).Msgf("Error parsing url for lookup")
+ return
+ }
+ transportInstance, err := tcpTransport.CreateTransportInstance(*connectionUrl, nil)
+ if err != nil {
+ log.Error().Err(err).Msgf("Error creating transport instance")
+ return
+ }
+ log.Trace().Msgf("trying %s", connectionUrl)
+ err = transportInstance.Connect()
+ if err != nil {
+ secondErr := transportInstance.Connect()
+ if secondErr != nil {
+ log.Trace().Err(err).Msgf("Error connecting transport instance")
+ return
+ }
+ }
- tranportInstances = append(tranportInstances, transportInstance)
- }
+ transportInstances <- transportInstance
+ }(utils.DuplicateIP(ip))
+ }
+ }()
}
- }
+ }(netInterface)
}
- if len(tranportInstances) <= 0 {
- return nil
- }
- for _, transportInstance := range tranportInstances {
- tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
- // Create a codec for sending and receiving messages.
- codec := NewMessageCodec(transportInstance)
- // Explicitly start the worker
- if err := codec.Connect(); err != nil {
- return errors.Wrap(err, "Error connecting")
- }
+ go func() {
+ for transportInstance := range transportInstances {
+ tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
+ // Create a codec for sending and receiving messages.
+ codec := NewMessageCodec(transportInstance)
+ // Explicitly start the worker
+ if err := codec.Connect(); err != nil {
+ log.Debug().Err(err).Msg("Error connecting")
+ continue
+ }
- // Prepare the discovery packet data
- cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
- requestContext := readWriteModel.NewRequestContext(false)
- calData := readWriteModel.NewCALDataIdentify(readWriteModel.Attribute_Manufacturer, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, requestContext)
- alpha := readWriteModel.NewAlpha('x')
- request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
- cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
- // Send the search request.
- err = codec.Send(cBusMessageToServer)
- go func() {
- // Keep on reading responses till the timeout is done.
- // TODO: Make this configurable
- timeout := time.NewTimer(time.Second * 1)
- timeout.Stop()
- for start := time.Now(); time.Since(start) < time.Second*5; {
- timeout.Reset(time.Second * 1)
- select {
- case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
- if !timeout.Stop() {
- <-timeout.C
- }
- cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
- if !ok {
- continue
- }
- messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
- if !ok {
+ // Prepare the discovery packet data
+ cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
+ requestContext := readWriteModel.NewRequestContext(false)
+ calData := readWriteModel.NewCALDataIdentify(readWriteModel.Attribute_Manufacturer, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, requestContext)
+ alpha := readWriteModel.NewAlpha('x')
+ request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
+ cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
+ // Send the search request.
+ err = codec.Send(cBusMessageToServer)
+ go func() {
+ // Keep on reading responses till the timeout is done.
+ // TODO: Make this configurable
+ timeout := time.NewTimer(time.Second * 1)
+ timeout.Stop()
+ for start := time.Now(); time.Since(start) < time.Second*5; {
+ timeout.Reset(time.Second * 1)
+ select {
+ case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
+ if !ok {
+ continue
+ }
+ messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
+ if !ok {
+ continue
+ }
+ replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !ok {
+ continue
+ }
+ if receivedAlpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha(); receivedAlpha != nil && alpha.GetCharacter() != receivedAlpha.GetCharacter() {
+ continue
+ }
+ embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+ if !ok {
+ continue
+ }
+ encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
+ if !ok {
+ continue
+ }
+ encodedReplyCALReply, ok := encodedReply.GetEncodedReply().(readWriteModel.EncodedReplyCALReplyExactly)
+ if !ok {
+ continue
+ }
+ calDataIdentifyReply, ok := encodedReplyCALReply.GetCalReply().GetCalData().(readWriteModel.CALDataIdentifyReplyExactly)
+ if !ok {
+ continue
+ }
+ identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
+ if !ok {
+ continue
+ }
+ // TODO: we could check for the exact response
+ remoteUrl, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
+ if err != nil {
+ log.Error().Err(err).Msg("Error creating url")
+ continue
+ }
+ // TODO: manufacturer + type would be good but this means two requests then
+ deviceName := identifyReplyCommand.GetManufacturerName()
+ discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
+ ProtocolCode: "c-bus",
+ TransportCode: "tcp",
+ TransportUrl: *remoteUrl,
+ Options: nil,
+ Name: deviceName,
+ }
+ // Pass the event back to the callback
+ callback(discoveryEvent)
continue
- }
- replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !ok {
+ case <-timeout.C:
+ timeout.Stop()
continue
}
- if alpha != replyOrConfirmationConfirmation.GetConfirmation().GetAlpha() {
- continue
- }
- embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
- if !ok {
- continue
- }
- encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
- if !ok {
- continue
- }
- calDataIdentifyReply, ok := encodedReply.GetEncodedReply().(readWriteModel.CALDataIdentifyReplyExactly)
- if !ok {
- continue
- }
- identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
- if !ok {
- continue
- }
- // TODO: we could check for the exact reponse
- remoteUrl, err := url.Parse(fmt.Sprintf("tcp://%ds", tcpTransportInstance.RemoteAddress))
- if err != nil {
- log.Error().Err(err).Msg("Error creating url")
- continue
- }
- // TODO: manufaturer + type would be good but this means two requests then
- deviceName := identifyReplyCommand.GetManufacturerName()
- discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
- ProtocolCode: "c-bus",
- TransportCode: "tcp",
- TransportUrl: *remoteUrl,
- Options: nil,
- Name: deviceName,
- }
- // Pass the event back to the callback
- callback(discoveryEvent)
- continue
- case <-timeout.C:
- timeout.Stop()
- continue
}
- }
- }()
- }
+ }()
+ }
+ }()
return nil
}