You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 11:23:30 UTC
[plc4x] 03/04: feat(plc4go/knxnetip): use context for discovery
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 04fa66d532cedf17c6a9dc518c5ef2b4bd1a38f9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 13:22:49 2022 +0200
feat(plc4go/knxnetip): use context for discovery
---
plc4go/internal/knxnetip/Discoverer.go | 124 +++++++++++++++++----------------
1 file changed, 63 insertions(+), 61 deletions(-)
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index 4e7f776cc..7c435bca2 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -111,7 +111,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
if err != nil {
return err
}
- err = transportInstance.Connect()
+ err = transportInstance.ConnectWithContext(ctx)
if err != nil {
continue
}
@@ -121,72 +121,74 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
}
- if len(tranportInstances) > 0 {
- for _, transportInstance := range tranportInstances {
- // Create a codec for sending and receiving messages.
- codec := NewMessageCodec(transportInstance, nil)
- // Explicitly start the worker
- if err := codec.Connect(); err != nil {
- return errors.Wrap(err, "Error connecting")
- }
+ if len(tranportInstances) <= 0 {
+ return nil
+ }
- // Cast to the UDP transport instance so we can access information on the local port.
- udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
- if !ok {
- return errors.New("couldn't cast transport instance to UDP transport instance")
- }
- localAddress := udpTransportInstance.LocalAddress
- localAddr := driverModel.NewIPAddress(localAddress.IP)
-
- // Prepare the discovery packet data
- discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
- driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
- searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
- // Send the search request.
- err = codec.Send(searchRequestMessage)
- go func() {
- // Keep on reading responses till the timeout is done.
- // TODO: Make this configurable
- timeout := time.NewTimer(time.Second * 1)
- timeout.Stop()
- for start := time.Now(); time.Since(start) < time.Second*5; {
- timeout.Reset(time.Second * 1)
- select {
- case message := <-codec.GetDefaultIncomingMessageChannel():
- {
- if !timeout.Stop() {
- <-timeout.C
+ for _, transportInstance := range tranportInstances {
+ // Create a codec for sending and receiving messages.
+ codec := NewMessageCodec(transportInstance, nil)
+ // Explicitly start the worker
+ if err := codec.Connect(); err != nil {
+ return errors.Wrap(err, "Error connecting")
+ }
+
+ // Cast to the UDP transport instance so we can access information on the local port.
+ udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
+ if !ok {
+ return errors.New("couldn't cast transport instance to UDP transport instance")
+ }
+ localAddress := udpTransportInstance.LocalAddress
+ localAddr := driverModel.NewIPAddress(localAddress.IP)
+
+ // Prepare the discovery packet data
+ discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
+ driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
+ searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
+ // Send the search request.
+ err = codec.Send(searchRequestMessage)
+ go func() {
+ // Keep on reading responses till the timeout is done.
+ // TODO: Make this configurable
+ timeout := time.NewTimer(time.Second * 1)
+ timeout.Stop()
+ for start := time.Now(); time.Since(start) < time.Second*5; {
+ timeout.Reset(time.Second * 1)
+ select {
+ case message := <-codec.GetDefaultIncomingMessageChannel():
+ {
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ searchResponse := message.(driverModel.SearchResponse)
+ if searchResponse != nil {
+ addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
+ remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
+ uint8(addr[0]), uint8(addr[1]), uint8(addr[2]), uint8(addr[3]), searchResponse.GetHpaiControlEndpoint().GetIpPort()))
+ if err != nil {
+ continue
}
- searchResponse := message.(driverModel.SearchResponse)
- if searchResponse != nil {
- addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
- remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
- uint8(addr[0]), uint8(addr[1]), uint8(addr[2]), uint8(addr[3]), searchResponse.GetHpaiControlEndpoint().GetIpPort()))
- if err != nil {
- continue
- }
- deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
- discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
- ProtocolCode: "knxnet-ip",
- TransportCode: "udp",
- TransportUrl: *remoteUrl,
- Options: nil,
- Name: deviceName,
- }
- // Pass the event back to the callback
- callback(discoveryEvent)
+ deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
+ discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
+ ProtocolCode: "knxnet-ip",
+ TransportCode: "udp",
+ TransportUrl: *remoteUrl,
+ Options: nil,
+ Name: deviceName,
}
- continue
- }
- case <-timeout.C:
- {
- timeout.Stop()
- continue
+ // Pass the event back to the callback
+ callback(discoveryEvent)
}
+ continue
+ }
+ case <-timeout.C:
+ {
+ timeout.Stop()
+ continue
}
}
- }()
- }
+ }
+ }()
}
return nil
}