You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/12 13:44:41 UTC
[plc4x] 03/05: feat(plc4go/cbus): implemented Discovery
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 840b324701e63da8a0c38674c98ac6a47b518014
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Aug 12 15:25:07 2022 +0200
feat(plc4go/cbus): implemented Discovery
---
plc4go/internal/cbus/Discoverer.go | 219 +++++++++++++++++++++++++++++++++++++
plc4go/internal/cbus/Driver.go | 10 ++
plc4go/internal/cbus/Reader.go | 6 +-
3 files changed, 233 insertions(+), 2 deletions(-)
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
new file mode 100644
index 000000000..fdce0b5b7
--- /dev/null
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cbus
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/plc4x/plc4go/spi/transports/tcp"
+ "net"
+ "net/url"
+ "time"
+
+ apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
+ "github.com/apache/plc4x/plc4go/spi"
+ internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/transports"
+ "github.com/apache/plc4x/plc4go/spi/utils"
+
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog/log"
+)
+
+// Discoverer locates C-Bus devices that are reachable via TCP (see Discover).
+type Discoverer struct {
+	// messageCodec is not referenced by any code in this file.
+	messageCodec spi.MessageCodec
+}
+
+// NewDiscoverer returns a pointer to a zero-valued Discoverer.
+func NewDiscoverer() *Discoverer {
+	return new(Discoverer)
+}
+
+// Discover probes hosts reachable from the local network interfaces for C-Bus
+// devices and invokes callback once for every device that answers.
+//
+// For every non-loopback IPv4 interface (optionally restricted to the interface
+// names passed via discoveryOptions) the candidate IP addresses reported by
+// utils.GetIPAddresses are contacted on the default C-Bus TCP port. Each host
+// that accepts the connection is sent an "identify manufacturer" request; the
+// replies are collected asynchronously for a few seconds after Discover has
+// already returned, so callback is invoked from background goroutines.
+//
+// An error is returned if the local interfaces can't be enumerated, a transport
+// instance can't be created or a codec can't be started. Hosts that merely
+// refuse the connection are skipped silently.
+func (d *Discoverer) Discover(callback func(event apiModel.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error {
+	// Character sent along with the request. It is compared with the alpha of
+	// received confirmations to correlate them with our request.
+	const requestAlpha byte = 'x'
+
+	tcpTransport := tcp.NewTransport()
+
+	allInterfaces, err := net.Interfaces()
+	if err != nil {
+		return err
+	}
+
+	// If no device is explicitly selected via option, simply use all of them.
+	// However, if a discovery option is present to select a device by name, only
+	// use those devices matching any of the given names.
+	var interfaces []net.Interface
+	deviceNames := options.FilterDiscoveryOptionsDeviceName(discoveryOptions)
+	if len(deviceNames) > 0 {
+		for _, curInterface := range allInterfaces {
+			for _, deviceNameOption := range deviceNames {
+				if curInterface.Name == deviceNameOption.GetDeviceName() {
+					interfaces = append(interfaces, curInterface)
+					break
+				}
+			}
+		}
+	} else {
+		interfaces = allInterfaces
+	}
+
+	var transportInstances []transports.TransportInstance
+	// Iterate over all selected network devices of this system.
+	for _, interf := range interfaces {
+		addrs, err := interf.Addrs()
+		if err != nil {
+			return err
+		}
+		// Iterate over all addresses the current interface has configured.
+		// Only IPv4 addresses are considered here.
+		for _, addr := range addrs {
+			var ipv4Addr net.IP
+			switch typedAddr := addr.(type) {
+			// If the device is configured to communicate with a subnet
+			case *net.IPNet:
+				ipv4Addr = typedAddr.IP.To4()
+			// If the device is configured for a point-to-point connection
+			case *net.IPAddr:
+				ipv4Addr = typedAddr.IP.To4()
+			}
+
+			// Skip everything that is not a non-loopback IPv4 address.
+			if ipv4Addr == nil || ipv4Addr.IsLoopback() {
+				continue
+			}
+			addresses, err := utils.GetIPAddresses(context.TODO(), interf, false)
+			if err != nil {
+				log.Warn().Err(err).Msgf("Can't get addresses for %s", interf.Name)
+				continue
+			}
+			for ip := range addresses {
+				// Try to open a TCP connection to the default C-Bus port of the candidate address.
+				connectionUrl, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
+				if err != nil {
+					log.Error().Err(err).Msgf("Error parsing url for lookup")
+					continue
+				}
+				transportInstance, err := tcpTransport.CreateTransportInstance(*connectionUrl, nil)
+				if err != nil {
+					return err
+				}
+				// A failing connect just means nothing is listening there, so it is not reported.
+				if err := transportInstance.Connect(); err != nil {
+					continue
+				}
+
+				transportInstances = append(transportInstances, transportInstance)
+			}
+		}
+	}
+
+	if len(transportInstances) == 0 {
+		return nil
+	}
+	for _, transportInstance := range transportInstances {
+		tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
+		// Create a codec for sending and receiving messages.
+		codec := NewMessageCodec(transportInstance)
+		// Explicitly start the worker
+		if err := codec.Connect(); err != nil {
+			return errors.Wrap(err, "Error connecting")
+		}
+
+		// The url reported in the discovery events is the same for every reply
+		// of this connection, so build it once up front.
+		remoteUrl, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
+		if err != nil {
+			log.Error().Err(err).Msg("Error creating url")
+			d.disconnectCodec(codec)
+			continue
+		}
+
+		// Prepare the discovery packet data
+		cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
+		requestContext := readWriteModel.NewRequestContext(false)
+		calData := readWriteModel.NewCALDataIdentify(readWriteModel.Attribute_Manufacturer, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, requestContext)
+		alpha := readWriteModel.NewAlpha(requestAlpha)
+		request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
+		cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
+		// Send the search request. Without a sent request there is nothing to wait for.
+		if err := codec.Send(cBusMessageToServer); err != nil {
+			log.Warn().Err(err).Msgf("Error sending identify request to %s", remoteUrl)
+			d.disconnectCodec(codec)
+			continue
+		}
+		go d.awaitIdentifyReplies(codec, *remoteUrl, requestAlpha, callback)
+	}
+	return nil
+}
+
+// awaitIdentifyReplies reads incoming messages from codec for about five
+// seconds and passes a discovery event for remoteUrl to callback for every
+// matching manufacturer identify reply. The codec is disconnected afterwards.
+func (d *Discoverer) awaitIdentifyReplies(codec spi.MessageCodec, remoteUrl url.URL, requestAlpha byte, callback func(event apiModel.PlcDiscoveryEvent)) {
+	defer d.disconnectCodec(codec)
+	// Keep on reading responses till the timeout is done.
+	// TODO: Make this configurable
+	timeout := time.NewTimer(time.Second * 1)
+	timeout.Stop()
+	for start := time.Now(); time.Since(start) < time.Second*5; {
+		timeout.Reset(time.Second * 1)
+		select {
+		case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
+			// Stop the timer and drain its channel if it fired in the meantime,
+			// so the Reset at the top of the loop starts from a clean state.
+			if !timeout.Stop() {
+				<-timeout.C
+			}
+			// TODO: manufacturer + type would be good but this means two requests then
+			deviceName, ok := d.manufacturerNameFromReply(receivedMessage, requestAlpha)
+			if !ok {
+				continue
+			}
+			// Pass the event back to the callback
+			callback(&internalModel.DefaultPlcDiscoveryEvent{
+				ProtocolCode:  "c-bus",
+				TransportCode: "tcp",
+				TransportUrl:  remoteUrl,
+				Options:       nil,
+				Name:          deviceName,
+			})
+		case <-timeout.C:
+			// Nothing received within this slice; re-check the overall deadline.
+		}
+	}
+}
+
+// manufacturerNameFromReply unwraps receivedMessage down to a manufacturer
+// identify reply. It returns the manufacturer name and true if the message is
+// a confirmation carrying requestAlpha with such a reply embedded, and
+// ("", false) for every other message.
+func (d *Discoverer) manufacturerNameFromReply(receivedMessage interface{}, requestAlpha byte) (string, bool) {
+	cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
+	if !ok {
+		return "", false
+	}
+	messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
+	if !ok {
+		return "", false
+	}
+	replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+	if !ok {
+		return "", false
+	}
+	// Compare the alpha characters, not the alpha objects: the received alpha
+	// is a separate object, so an identity comparison would never match.
+	confirmation := replyOrConfirmationConfirmation.GetConfirmation()
+	if confirmation == nil {
+		return "", false
+	}
+	receivedAlpha := confirmation.GetAlpha()
+	if receivedAlpha == nil || receivedAlpha.GetCharacter() != requestAlpha {
+		return "", false
+	}
+	embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+	if !ok {
+		return "", false
+	}
+	encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
+	if !ok {
+		return "", false
+	}
+	calDataIdentifyReply, ok := encodedReply.GetEncodedReply().(readWriteModel.CALDataIdentifyReplyExactly)
+	if !ok {
+		return "", false
+	}
+	// TODO: we could check for the exact response
+	identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
+	if !ok {
+		return "", false
+	}
+	return identifyReplyCommand.GetManufacturerName(), true
+}
+
+// disconnectCodec disconnects codec and logs a failure instead of returning
+// it, as there is nothing a caller could do about it during discovery.
+func (d *Discoverer) disconnectCodec(codec spi.MessageCodec) {
+	if err := codec.Disconnect(); err != nil {
+		log.Debug().Err(err).Msg("Error disconnecting discovery codec")
+	}
+}
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index 220e8016d..fbdd11bb2 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -21,9 +21,11 @@ package cbus
import (
"github.com/apache/plc4x/plc4go/pkg/api"
+ apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
@@ -110,3 +112,11 @@ func (m *Driver) SetAwaitSetupComplete(awaitComplete bool) {
func (m *Driver) SetAwaitDisconnectComplete(awaitComplete bool) {
m.awaitDisconnectComplete = awaitComplete
}
+
+// SupportsDiscovery always reports true; discovery is provided by Discover.
+func (m *Driver) SupportsDiscovery() bool {
+	return true
+}
+
+// Discover runs a discovery on a freshly created Discoverer, forwarding
+// callback and discoveryOptions unchanged, and returns its result.
+func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error {
+	discoverer := NewDiscoverer()
+	return discoverer.Discover(callback, discoveryOptions...)
+}
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 09d799f2a..b34382df9 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -21,16 +21,18 @@ package cbus
import (
"fmt"
+ "sync"
+ "time"
+
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
+
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
- "sync"
- "time"
)
type Reader struct {