You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/12 13:44:41 UTC

[plc4x] 03/05: feat(plc4go/cbus): implemented Discovery

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 840b324701e63da8a0c38674c98ac6a47b518014
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Aug 12 15:25:07 2022 +0200

    feat(plc4go/cbus): implemented Discovery
---
 plc4go/internal/cbus/Discoverer.go | 219 +++++++++++++++++++++++++++++++++++++
 plc4go/internal/cbus/Driver.go     |  10 ++
 plc4go/internal/cbus/Reader.go     |   6 +-
 3 files changed, 233 insertions(+), 2 deletions(-)

diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
new file mode 100644
index 000000000..fdce0b5b7
--- /dev/null
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cbus
+
+import (
+	"context"
+	"fmt"
+	"github.com/apache/plc4x/plc4go/spi/transports/tcp"
+	"net"
+	"net/url"
+	"time"
+
+	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
+	"github.com/apache/plc4x/plc4go/spi"
+	internalModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
+	"github.com/apache/plc4x/plc4go/spi/utils"
+
+	"github.com/pkg/errors"
+	"github.com/rs/zerolog/log"
+)
+
+type Discoverer struct {
+	messageCodec spi.MessageCodec
+}
+
+func NewDiscoverer() *Discoverer {
+	return &Discoverer{}
+}
+
+func (d *Discoverer) Discover(callback func(event apiModel.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error {
+	tcpTransport := tcp.NewTransport()
+
+	allInterfaces, err := net.Interfaces()
+	if err != nil {
+		return err
+	}
+
+	// If no device is explicitly selected via option, simply use all of them
+	// However if a discovery option is present to select a device by name, only
+	// add those devices matching any of the given names.
+	var interfaces []net.Interface
+	deviceNames := options.FilterDiscoveryOptionsDeviceName(discoveryOptions)
+	if len(deviceNames) > 0 {
+		for _, curInterface := range allInterfaces {
+			for _, deviceNameOption := range deviceNames {
+				if curInterface.Name == deviceNameOption.GetDeviceName() {
+					interfaces = append(interfaces, curInterface)
+					break
+				}
+			}
+		}
+	} else {
+		interfaces = allInterfaces
+	}
+
+	var tranportInstances []transports.TransportInstance
+	// Iterate over all network devices of this system.
+	for _, interf := range interfaces {
+		addrs, err := interf.Addrs()
+		if err != nil {
+			return err
+		}
+		// Iterate over all addresses the current interface has configured
+		// For KNX we're only interested in IPv4 addresses, as it doesn't
+		// seem to work with IPv6.
+		for _, addr := range addrs {
+			var ipv4Addr net.IP
+			switch addr.(type) {
+			// If the device is configured to communicate with a subnet
+			case *net.IPNet:
+				ipv4Addr = addr.(*net.IPNet).IP.To4()
+
+			// If the device is configured for a point-to-point connection
+			case *net.IPAddr:
+				ipv4Addr = addr.(*net.IPAddr).IP.To4()
+			}
+
+			// If we found an IPv4 address and this is not a loopback address,
+			// add it to the list of devices we will open ports and send discovery
+			// messages from.
+			if ipv4Addr != nil && !ipv4Addr.IsLoopback() {
+				addresses, err := utils.GetIPAddresses(context.TODO(), interf, false)
+				if err != nil {
+					log.Warn().Err(err).Msgf("Can't get addresses for %s", interf)
+					continue
+				}
+				for ip := range addresses {
+					// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
+					connectionUrl, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
+					if err != nil {
+						log.Error().Err(err).Msgf("Error parsing url for lookup")
+						continue
+					}
+					transportInstance, err := tcpTransport.CreateTransportInstance(*connectionUrl, nil)
+					if err != nil {
+						return err
+					}
+					err = transportInstance.Connect()
+					if err != nil {
+						continue
+					}
+
+					tranportInstances = append(tranportInstances, transportInstance)
+				}
+			}
+		}
+	}
+
+	if len(tranportInstances) <= 0 {
+		return nil
+	}
+	for _, transportInstance := range tranportInstances {
+		tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
+		// Create a codec for sending and receiving messages.
+		codec := NewMessageCodec(transportInstance)
+		// Explicitly start the worker
+		if err := codec.Connect(); err != nil {
+			return errors.Wrap(err, "Error connecting")
+		}
+
+		// Prepare the discovery packet data
+		cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
+		requestContext := readWriteModel.NewRequestContext(false)
+		calData := readWriteModel.NewCALDataIdentify(readWriteModel.Attribute_Manufacturer, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, requestContext)
+		alpha := readWriteModel.NewAlpha('x')
+		request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
+		cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
+		// Send the search request.
+		err = codec.Send(cBusMessageToServer)
+		go func() {
+			// Keep on reading responses till the timeout is done.
+			// TODO: Make this configurable
+			timeout := time.NewTimer(time.Second * 1)
+			timeout.Stop()
+			for start := time.Now(); time.Since(start) < time.Second*5; {
+				timeout.Reset(time.Second * 1)
+				select {
+				case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
+					if !timeout.Stop() {
+						<-timeout.C
+					}
+					cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
+					if !ok {
+						continue
+					}
+					messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
+					if !ok {
+						continue
+					}
+					replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+					if !ok {
+						continue
+					}
+					if alpha != replyOrConfirmationConfirmation.GetConfirmation().GetAlpha() {
+						continue
+					}
+					embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+					if !ok {
+						continue
+					}
+					encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
+					if !ok {
+						continue
+					}
+					calDataIdentifyReply, ok := encodedReply.GetEncodedReply().(readWriteModel.CALDataIdentifyReplyExactly)
+					if !ok {
+						continue
+					}
+					identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
+					if !ok {
+						continue
+					}
+					// TODO: we could check for the exact reponse
+					remoteUrl, err := url.Parse(fmt.Sprintf("tcp://%ds", tcpTransportInstance.RemoteAddress))
+					if err != nil {
+						log.Error().Err(err).Msg("Error creating url")
+						continue
+					}
+					// TODO: manufaturer + type would be good but this means two requests then
+					deviceName := identifyReplyCommand.GetManufacturerName()
+					discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
+						ProtocolCode:  "c-bus",
+						TransportCode: "tcp",
+						TransportUrl:  *remoteUrl,
+						Options:       nil,
+						Name:          deviceName,
+					}
+					// Pass the event back to the callback
+					callback(discoveryEvent)
+					continue
+				case <-timeout.C:
+					timeout.Stop()
+					continue
+				}
+			}
+		}()
+	}
+	return nil
+}
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index 220e8016d..fbdd11bb2 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -21,9 +21,11 @@ package cbus
 
 import (
 	"github.com/apache/plc4x/plc4go/pkg/api"
+	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
@@ -110,3 +112,11 @@ func (m *Driver) SetAwaitSetupComplete(awaitComplete bool) {
 func (m *Driver) SetAwaitDisconnectComplete(awaitComplete bool) {
 	m.awaitDisconnectComplete = awaitComplete
 }
+
+func (m *Driver) SupportsDiscovery() bool {
+	return true
+}
+
+func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error {
+	return NewDiscoverer().Discover(callback, discoveryOptions...)
+}
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 09d799f2a..b34382df9 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -21,16 +21,18 @@ package cbus
 
 import (
 	"fmt"
+	"sync"
+	"time"
+
 	"github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/pkg/api/values"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	spiModel "github.com/apache/plc4x/plc4go/spi/model"
 	spiValues "github.com/apache/plc4x/plc4go/spi/values"
+
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
-	"sync"
-	"time"
 )
 
 type Reader struct {