You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 11:23:30 UTC

[plc4x] 03/04: feat(plc4go/knxnetip): use context for discovery

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 04fa66d532cedf17c6a9dc518c5ef2b4bd1a38f9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 13:22:49 2022 +0200

    feat(plc4go/knxnetip): use context for discovery
---
 plc4go/internal/knxnetip/Discoverer.go | 124 +++++++++++++++++----------------
 1 file changed, 63 insertions(+), 61 deletions(-)

diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index 4e7f776cc..7c435bca2 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -111,7 +111,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 				if err != nil {
 					return err
 				}
-				err = transportInstance.Connect()
+				err = transportInstance.ConnectWithContext(ctx)
 				if err != nil {
 					continue
 				}
@@ -121,72 +121,74 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 		}
 	}
 
-	if len(tranportInstances) > 0 {
-		for _, transportInstance := range tranportInstances {
-			// Create a codec for sending and receiving messages.
-			codec := NewMessageCodec(transportInstance, nil)
-			// Explicitly start the worker
-			if err := codec.Connect(); err != nil {
-				return errors.Wrap(err, "Error connecting")
-			}
+	if len(tranportInstances) <= 0 {
+		return nil
+	}
 
-			// Cast to the UDP transport instance so we can access information on the local port.
-			udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
-			if !ok {
-				return errors.New("couldn't cast transport instance to UDP transport instance")
-			}
-			localAddress := udpTransportInstance.LocalAddress
-			localAddr := driverModel.NewIPAddress(localAddress.IP)
-
-			// Prepare the discovery packet data
-			discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
-				driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
-			searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
-			// Send the search request.
-			err = codec.Send(searchRequestMessage)
-			go func() {
-				// Keep on reading responses till the timeout is done.
-				// TODO: Make this configurable
-				timeout := time.NewTimer(time.Second * 1)
-				timeout.Stop()
-				for start := time.Now(); time.Since(start) < time.Second*5; {
-					timeout.Reset(time.Second * 1)
-					select {
-					case message := <-codec.GetDefaultIncomingMessageChannel():
-						{
-							if !timeout.Stop() {
-								<-timeout.C
+	for _, transportInstance := range tranportInstances {
+		// Create a codec for sending and receiving messages.
+		codec := NewMessageCodec(transportInstance, nil)
+		// Explicitly start the worker
+		if err := codec.Connect(); err != nil {
+			return errors.Wrap(err, "Error connecting")
+		}
+
+		// Cast to the UDP transport instance so we can access information on the local port.
+		udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
+		if !ok {
+			return errors.New("couldn't cast transport instance to UDP transport instance")
+		}
+		localAddress := udpTransportInstance.LocalAddress
+		localAddr := driverModel.NewIPAddress(localAddress.IP)
+
+		// Prepare the discovery packet data
+		discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
+			driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
+		searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
+		// Send the search request.
+		err = codec.Send(searchRequestMessage)
+		go func() {
+			// Keep on reading responses till the timeout is done.
+			// TODO: Make this configurable
+			timeout := time.NewTimer(time.Second * 1)
+			timeout.Stop()
+			for start := time.Now(); time.Since(start) < time.Second*5; {
+				timeout.Reset(time.Second * 1)
+				select {
+				case message := <-codec.GetDefaultIncomingMessageChannel():
+					{
+						if !timeout.Stop() {
+							<-timeout.C
+						}
+						searchResponse := message.(driverModel.SearchResponse)
+						if searchResponse != nil {
+							addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
+							remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
+								uint8(addr[0]), uint8(addr[1]), uint8(addr[2]), uint8(addr[3]), searchResponse.GetHpaiControlEndpoint().GetIpPort()))
+							if err != nil {
+								continue
 							}
-							searchResponse := message.(driverModel.SearchResponse)
-							if searchResponse != nil {
-								addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
-								remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
-									uint8(addr[0]), uint8(addr[1]), uint8(addr[2]), uint8(addr[3]), searchResponse.GetHpaiControlEndpoint().GetIpPort()))
-								if err != nil {
-									continue
-								}
-								deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
-								discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
-									ProtocolCode:  "knxnet-ip",
-									TransportCode: "udp",
-									TransportUrl:  *remoteUrl,
-									Options:       nil,
-									Name:          deviceName,
-								}
-								// Pass the event back to the callback
-								callback(discoveryEvent)
+							deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
+							discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
+								ProtocolCode:  "knxnet-ip",
+								TransportCode: "udp",
+								TransportUrl:  *remoteUrl,
+								Options:       nil,
+								Name:          deviceName,
 							}
-							continue
-						}
-					case <-timeout.C:
-						{
-							timeout.Stop()
-							continue
+							// Pass the event back to the callback
+							callback(discoveryEvent)
 						}
+						continue
+					}
+				case <-timeout.C:
+					{
+						timeout.Stop()
+						continue
 					}
 				}
-			}()
-		}
+			}
+		}()
 	}
 	return nil
 }