You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 11:02:14 UTC
[plc4x] 05/05: test(plc4go/cbus): shutdown workers of discoverer once done
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 6dac19d75e08e8955fdc49c636aa119d5bd77f97
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 13:01:57 2023 +0200
test(plc4go/cbus): shutdown workers of discoverer once done
---
plc4go/internal/cbus/Discoverer.go | 21 ++++++++++++++++++++-
1 file changed, 20 insertions(+), 1 deletion(-)
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 8c3606b64d..f65850f413 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -161,9 +161,22 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
d.log.Error().Msgf("panic-ed %v. Stack: %s; ", err, debug.Stack())
}
}()
+ deviceScanWg := sync.WaitGroup{}
for transportInstance := range transportInstances {
d.log.Debug().Stringer("transportInstance", transportInstance).Msg("submitting device scan")
- d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
+ completionFuture := d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
+ deviceScanWg.Add(1)
+ go func() {
+ defer deviceScanWg.Done()
+ if err := completionFuture.AwaitCompletion(context.TODO()); err != nil {
+ d.log.Debug().Err(err).Msg("error waiting for completion")
+ }
+ }()
+ deviceScanWg.Wait()
+ d.log.Info().Msg("Discovery done")
+ d.transportInstanceCreationQueue.Stop()
+ d.deviceScanningQueue.Stop()
+ // TODO: do we maybe want a callback for that? As option for example
}
}()
return nil
@@ -320,6 +333,12 @@ func (d *Discoverer) extractDeviceNames(discoveryOptions ...options.WithDiscover
return deviceNames
}
+func (d *Discoverer) Close() error { // Close stops both worker queues of the Discoverer and always returns nil.
+ d.transportInstanceCreationQueue.Stop() // the same two Stop calls are issued inside Discover after "Discovery done" is logged
+ d.deviceScanningQueue.Stop()
+ return nil // no failure path exists in this method; the error return is never non-nil here
+}
+
// addressProvider is used to make discover testable
type addressProvider interface {
fmt.Stringer