You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 11:02:14 UTC

[plc4x] 05/05: test(plc4go/cbus): shutdown workers of discoverer once done

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 6dac19d75e08e8955fdc49c636aa119d5bd77f97
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 13:01:57 2023 +0200

    test(plc4go/cbus): shutdown workers of discoverer once done
---
 plc4go/internal/cbus/Discoverer.go | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 8c3606b64d..f65850f413 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -161,9 +161,22 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 				d.log.Error().Msgf("panic-ed %v. Stack: %s; ", err, debug.Stack())
 			}
 		}()
+		deviceScanWg := sync.WaitGroup{}
 		for transportInstance := range transportInstances {
 			d.log.Debug().Stringer("transportInstance", transportInstance).Msg("submitting device scan")
-			d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
+			completionFuture := d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
+			deviceScanWg.Add(1)
+			go func() {
+				defer deviceScanWg.Done()
+				if err := completionFuture.AwaitCompletion(context.TODO()); err != nil {
+					d.log.Debug().Err(err).Msg("error waiting for completion")
+				}
+			}()
+			deviceScanWg.Wait()
+			d.log.Info().Msg("Discovery done")
+			d.transportInstanceCreationQueue.Stop()
+			d.deviceScanningQueue.Stop()
+			// TODO: should completion be reported through a callback, e.g. supplied as a discovery option?
 		}
 	}()
 	return nil
@@ -320,6 +333,12 @@ func (d *Discoverer) extractDeviceNames(discoveryOptions ...options.WithDiscover
 	return deviceNames
 }
 
+func (d *Discoverer) Close() error {
+	d.transportInstanceCreationQueue.Stop()
+	d.deviceScanningQueue.Stop()
+	return nil
+}
+
 // addressProvider is used to make discover testable
 type addressProvider interface {
 	fmt.Stringer