You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/08/29 19:03:19 UTC
[mynewt-newtmgr] 03/03: nmxact - More thread-safety fixes.
This is an automated email from the ASF dual-hosted git repository.
ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git
commit 9c626bdf56d271f88eae5bb6d9e83f3c2791eacd
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Tue Aug 29 12:02:19 2017 -0700
nmxact - More thread-safety fixes.
---
nmxact/nmble/ble_scanner.go | 118 ++++++++++++++++++++++++++------------------
nmxact/nmble/conn.go | 3 ++
nmxact/nmble/discover.go | 27 +++++++---
nmxact/nmble/master.go | 18 ++++---
4 files changed, 105 insertions(+), 61 deletions(-)
diff --git a/nmxact/nmble/ble_scanner.go b/nmxact/nmble/ble_scanner.go
index f99d533..ba187a3 100644
--- a/nmxact/nmble/ble_scanner.go
+++ b/nmxact/nmble/ble_scanner.go
@@ -41,16 +41,17 @@ type BleScanner struct {
cfg scan.Cfg
bx *BleXport
- discoverer *Discoverer
- reportedDevs map[BleDev]string
- failedDevs map[BleDev]struct{}
- ses *BleSesn
- enabled bool
scanBlocker nmxutil.Blocker
suspendBlocker nmxutil.Blocker
- // Protects accesses to the reported devices map.
mtx sync.Mutex
+
+ // Protected by the mutex.
+ discoverer *Discoverer
+ reportedDevs map[BleDev]string
+ failedDevs map[BleDev]struct{}
+ ses *BleSesn
+ enabled bool
}
func NewBleScanner(bx *BleXport) *BleScanner {
@@ -61,6 +62,48 @@ func NewBleScanner(bx *BleXport) *BleScanner {
}
}
+func (s *BleScanner) isEnabled() bool {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ return s.enabled
+}
+
+// Performs a compare-and-swap of the enabled state.
+func (s *BleScanner) toggleEnabled(to bool) bool {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ if s.enabled == to {
+ return false
+ }
+
+ s.enabled = to
+ return true
+}
+
+func (s *BleScanner) setSession(ses *BleSesn) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ nmxutil.Assert(s.ses == nil || ses == nil)
+ s.ses = ses
+}
+
+func (s *BleScanner) addReportedDev(dev BleDev, hwid string) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ s.reportedDevs[dev] = hwid
+}
+
+func (s *BleScanner) addFailedDev(dev BleDev) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ s.failedDevs[dev] = struct{}{}
+}
+
func (s *BleScanner) discover() (*BleDev, error) {
s.mtx.Lock()
s.discoverer = NewDiscoverer(DiscovererParams{
@@ -75,15 +118,12 @@ func (s *BleScanner) discover() (*BleDev, error) {
var dev *BleDev
advRptCb := func(r BleAdvReport) {
- if dev == nil {
- if s.cfg.Ble.ScanPred(r) {
- s.mtx.Lock()
-
- dev = &r.Sender
- s.discoverer.Stop()
+ if s.cfg.Ble.ScanPred(r) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
- s.mtx.Unlock()
- }
+ dev = &r.Sender
+ s.discoverer.Stop()
}
}
if err := s.discoverer.Start(advRptCb); err != nil {
@@ -95,15 +135,12 @@ func (s *BleScanner) discover() (*BleDev, error) {
func (s *BleScanner) connect(dev BleDev) error {
s.cfg.SesnCfg.PeerSpec.Ble = dev
- bs, err := NewBleSesn(s.bx, s.cfg.SesnCfg, MASTER_PRIO_SCAN)
+ ses, err := NewBleSesn(s.bx, s.cfg.SesnCfg, MASTER_PRIO_SCAN)
if err != nil {
return err
}
- s.mtx.Lock()
- s.ses = bs
- s.mtx.Unlock()
-
+ s.setSession(ses)
if err := s.ses.Open(); err != nil {
return err
}
@@ -164,7 +201,10 @@ func (s *BleScanner) scan() (*scan.ScanPeer, error) {
if err := s.connect(*dev); err != nil {
return nil, err
}
- defer s.ses.Close()
+ defer func() {
+ s.ses.Close()
+ s.setSession(nil)
+ }()
// Now that we have successfully connected to this device, we will report
// it regardless of success or failure.
@@ -184,7 +224,11 @@ func (s *BleScanner) scan() (*scan.ScanPeer, error) {
return &peer, nil
}
+// Acquires the mutex itself; the caller must not already hold it.
func (s *BleScanner) seenDevice(dev BleDev) bool {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
if _, ok := s.reportedDevs[dev]; ok {
return true
}
@@ -197,7 +241,7 @@ func (s *BleScanner) seenDevice(dev BleDev) bool {
}
func (s *BleScanner) Start(cfg scan.Cfg) error {
- if s.enabled {
+ if !s.toggleEnabled(true) {
return nmxutil.NewAlreadyError("Attempt to start BLE scanner twice")
}
@@ -205,18 +249,12 @@ func (s *BleScanner) Start(cfg scan.Cfg) error {
innerPred := cfg.Ble.ScanPred
cfg.Ble.ScanPred = func(adv BleAdvReport) bool {
// Filter devices that have already been reported.
- s.mtx.Lock()
- seen := s.seenDevice(adv.Sender)
- s.mtx.Unlock()
-
- if seen {
+ if s.seenDevice(adv.Sender) {
return false
} else {
return innerPred(adv)
}
}
-
- s.enabled = true
s.cfg = cfg
// Start background scanning.
@@ -225,7 +263,7 @@ func (s *BleScanner) Start(cfg scan.Cfg) error {
// Wait for suspend-in-progress to complete, if any.
s.suspendBlocker.Wait(nmxutil.DURATION_FOREVER, nil)
- if !s.enabled {
+ if !s.isEnabled() {
break
}
@@ -233,14 +271,11 @@ func (s *BleScanner) Start(cfg scan.Cfg) error {
if err != nil {
log.Debugf("Scan error: %s", err.Error())
if p != nil {
- s.failedDevs[p.PeerSpec.Ble] = struct{}{}
+ s.addFailedDev(p.PeerSpec.Ble)
}
time.Sleep(scanRetryRate)
} else if p != nil {
- s.mtx.Lock()
- s.reportedDevs[p.PeerSpec.Ble] = p.HwId
- s.mtx.Unlock()
-
+ s.addReportedDev(p.PeerSpec.Ble, p.HwId)
s.cfg.ScanCb(*p)
}
}
@@ -283,21 +318,10 @@ func (s *BleScanner) Preempt() error {
// Stops the scanner. Scanning won't resume unless Start() gets called.
func (s *BleScanner) Stop() error {
- initiate := func() error {
- s.mtx.Lock()
- defer s.mtx.Unlock()
-
- if !s.enabled {
- return nmxutil.NewAlreadyError("Attempt to stop BLE scanner twice")
- }
- s.enabled = false
-
- return nil
+ if !s.toggleEnabled(false) {
+ return nmxutil.NewAlreadyError("Attempt to stop BLE scanner twice")
}
- if err := initiate(); err != nil {
- return err
- }
return s.suspend()
}
diff --git a/nmxact/nmble/conn.go b/nmxact/nmble/conn.go
index f76907a..063b48f 100644
--- a/nmxact/nmble/conn.go
+++ b/nmxact/nmble/conn.go
@@ -271,6 +271,9 @@ func (c *Conn) startConnecting() error {
return nmxutil.NewSesnAlreadyOpenError(
"BLE connection already being established")
}
+ if c.stopped {
+ return fmt.Errorf("Attempt to re-use Conn object")
+ }
c.connecting = true
return nil
diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go
index 1b191f9..4431497 100644
--- a/nmxact/nmble/discover.go
+++ b/nmxact/nmble/discover.go
@@ -36,10 +36,11 @@ type DiscovererParams struct {
}
// Listens for advertisements; reports the ones that match the specified
-// predicate.
+// predicate. This type is not thread-safe.
type Discoverer struct {
params DiscovererParams
abortChan chan struct{}
+ blocker nmxutil.Blocker
}
func NewDiscoverer(params DiscovererParams) *Discoverer {
@@ -51,8 +52,7 @@ func NewDiscoverer(params DiscovererParams) *Discoverer {
func (d *Discoverer) scanCancel() error {
r := NewBleScanCancelReq()
- key := SeqKey(r.Seq)
- bl, err := d.params.Bx.AddListener(key)
+ bl, err := d.params.Bx.AddListener(SeqKey(r.Seq))
if err != nil {
return err
}
@@ -84,17 +84,26 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error {
r.Passive = d.params.Passive
r.FilterDuplicates = true
- key := SeqKey(r.Seq)
- bl, err := d.params.Bx.AddListener(key)
+ bl, err := d.params.Bx.AddListener(SeqKey(r.Seq))
if err != nil {
return err
}
defer d.params.Bx.RemoveListener(bl)
+ // Set up the abort channel to allow discovery to be cancelled.
d.abortChan = make(chan struct{}, 1)
defer func() { d.abortChan = nil }()
- err = actScan(d.params.Bx, bl, r, d.abortChan, advRptCb)
+ // Ensure subsequent calls to Stop() block until discovery is fully
+ // stopped.
+ d.blocker.Start()
+ defer d.blocker.Unblock(nil)
+
+ // Report devices in a separate Goroutine. This is done to prevent
+ // deadlock in case the callback tries to stop the discoverer.
+ cb := func(r BleAdvReport) { go advRptCb(r) }
+
+ err = actScan(d.params.Bx, bl, r, d.abortChan, cb)
if !nmxutil.IsXport(err) {
// The transport did not restart; always attempt to cancel the scan
// operation. In some cases, the host has already stopped scanning
@@ -108,14 +117,18 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error {
return err
}
+// Ensures the discoverer is stopped. Errors can typically be ignored.
func (d *Discoverer) Stop() error {
ch := d.abortChan
if ch == nil {
return nmxutil.NewAlreadyError("Attempt to stop inactive discoverer")
}
-
close(ch)
+
+ // Don't return until discovery is fully stopped.
+ d.blocker.Wait(nmxutil.DURATION_FOREVER, nil)
+
return nil
}
diff --git a/nmxact/nmble/master.go b/nmxact/nmble/master.go
index 350a827..16c6d85 100644
--- a/nmxact/nmble/master.go
+++ b/nmxact/nmble/master.go
@@ -38,11 +38,8 @@ func NewMaster(x *BleXport, s *BleScanner) Master {
}
}
-// Unblocks a waiting scanner.
+// Unblocks a waiting scanner. The caller must lock the mutex.
func (m *Master) unblockScanner(err error) bool {
- m.mtx.Lock()
- defer m.mtx.Unlock()
-
if m.scanWait == nil {
return false
}
@@ -54,11 +51,9 @@ func (m *Master) unblockScanner(err error) bool {
}
func (m *Master) AcquireConnect(token interface{}) error {
- m.mtx.Lock()
-
// Append the connector to the wait queue.
+ m.mtx.Lock()
ch := m.res.Acquire(token)
-
m.mtx.Unlock()
// Stop the scanner in case it is active; connections take priority. We do
@@ -125,6 +120,9 @@ func (m *Master) AcquireScan(token interface{}) error {
}
func (m *Master) Release() {
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+
if m.res.Release() {
// Next waiting connector acquired the resource.
return
@@ -147,11 +145,17 @@ func (m *Master) StopWaitingConnect(token interface{}, err error) {
// Removes the specified scanner from the wait queue.
func (m *Master) StopWaitingScan(token interface{}, err error) {
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+
m.unblockScanner(err)
}
// Releases the resource and clears the wait queue.
func (m *Master) Abort(err error) {
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+
m.unblockScanner(err)
m.res.Abort(err)
}
--
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.