You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/08/29 19:03:19 UTC

[mynewt-newtmgr] 03/03: nxmact - More thread-safety fixes.

This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit 9c626bdf56d271f88eae5bb6d9e83f3c2791eacd
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Tue Aug 29 12:02:19 2017 -0700

    nxmact - More thread-safety fixes.
---
 nmxact/nmble/ble_scanner.go | 118 ++++++++++++++++++++++++++------------------
 nmxact/nmble/conn.go        |   3 ++
 nmxact/nmble/discover.go    |  27 +++++++---
 nmxact/nmble/master.go      |  18 ++++---
 4 files changed, 105 insertions(+), 61 deletions(-)

diff --git a/nmxact/nmble/ble_scanner.go b/nmxact/nmble/ble_scanner.go
index f99d533..ba187a3 100644
--- a/nmxact/nmble/ble_scanner.go
+++ b/nmxact/nmble/ble_scanner.go
@@ -41,16 +41,17 @@ type BleScanner struct {
 	cfg scan.Cfg
 
 	bx             *BleXport
-	discoverer     *Discoverer
-	reportedDevs   map[BleDev]string
-	failedDevs     map[BleDev]struct{}
-	ses            *BleSesn
-	enabled        bool
 	scanBlocker    nmxutil.Blocker
 	suspendBlocker nmxutil.Blocker
 
-	// Protects accesses to the reported devices map.
 	mtx sync.Mutex
+
+	// Protected by the mutex.
+	discoverer   *Discoverer
+	reportedDevs map[BleDev]string
+	failedDevs   map[BleDev]struct{}
+	ses          *BleSesn
+	enabled      bool
 }
 
 func NewBleScanner(bx *BleXport) *BleScanner {
@@ -61,6 +62,48 @@ func NewBleScanner(bx *BleXport) *BleScanner {
 	}
 }
 
+func (s *BleScanner) isEnabled() bool {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	return s.enabled
+}
+
+// Performs a compare-and-swap of the enabled state.
+func (s *BleScanner) toggleEnabled(to bool) bool {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	if s.enabled == to {
+		return false
+	}
+
+	s.enabled = to
+	return true
+}
+
+func (s *BleScanner) setSession(ses *BleSesn) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	nmxutil.Assert(s.ses == nil || ses == nil)
+	s.ses = ses
+}
+
+func (s *BleScanner) addReportedDev(dev BleDev, hwid string) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	s.reportedDevs[dev] = hwid
+}
+
+func (s *BleScanner) addFailedDev(dev BleDev) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	s.failedDevs[dev] = struct{}{}
+}
+
 func (s *BleScanner) discover() (*BleDev, error) {
 	s.mtx.Lock()
 	s.discoverer = NewDiscoverer(DiscovererParams{
@@ -75,15 +118,12 @@ func (s *BleScanner) discover() (*BleDev, error) {
 
 	var dev *BleDev
 	advRptCb := func(r BleAdvReport) {
-		if dev == nil {
-			if s.cfg.Ble.ScanPred(r) {
-				s.mtx.Lock()
-
-				dev = &r.Sender
-				s.discoverer.Stop()
+		if s.cfg.Ble.ScanPred(r) {
+			s.mtx.Lock()
+			defer s.mtx.Unlock()
 
-				s.mtx.Unlock()
-			}
+			dev = &r.Sender
+			s.discoverer.Stop()
 		}
 	}
 	if err := s.discoverer.Start(advRptCb); err != nil {
@@ -95,15 +135,12 @@ func (s *BleScanner) discover() (*BleDev, error) {
 
 func (s *BleScanner) connect(dev BleDev) error {
 	s.cfg.SesnCfg.PeerSpec.Ble = dev
-	bs, err := NewBleSesn(s.bx, s.cfg.SesnCfg, MASTER_PRIO_SCAN)
+	ses, err := NewBleSesn(s.bx, s.cfg.SesnCfg, MASTER_PRIO_SCAN)
 	if err != nil {
 		return err
 	}
 
-	s.mtx.Lock()
-	s.ses = bs
-	s.mtx.Unlock()
-
+	s.setSession(ses)
 	if err := s.ses.Open(); err != nil {
 		return err
 	}
@@ -164,7 +201,10 @@ func (s *BleScanner) scan() (*scan.ScanPeer, error) {
 	if err := s.connect(*dev); err != nil {
 		return nil, err
 	}
-	defer s.ses.Close()
+	defer func() {
+		s.ses.Close()
+		s.setSession(nil)
+	}()
 
 	// Now that we have successfully connected to this device, we will report
 	// it regardless of success or failure.
@@ -184,7 +224,11 @@ func (s *BleScanner) scan() (*scan.ScanPeer, error) {
 	return &peer, nil
 }
 
+// Caller must lock mutex.
 func (s *BleScanner) seenDevice(dev BleDev) bool {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
 	if _, ok := s.reportedDevs[dev]; ok {
 		return true
 	}
@@ -197,7 +241,7 @@ func (s *BleScanner) seenDevice(dev BleDev) bool {
 }
 
 func (s *BleScanner) Start(cfg scan.Cfg) error {
-	if s.enabled {
+	if !s.toggleEnabled(true) {
 		return nmxutil.NewAlreadyError("Attempt to start BLE scanner twice")
 	}
 
@@ -205,18 +249,12 @@ func (s *BleScanner) Start(cfg scan.Cfg) error {
 	innerPred := cfg.Ble.ScanPred
 	cfg.Ble.ScanPred = func(adv BleAdvReport) bool {
 		// Filter devices that have already been reported.
-		s.mtx.Lock()
-		seen := s.seenDevice(adv.Sender)
-		s.mtx.Unlock()
-
-		if seen {
+		if s.seenDevice(adv.Sender) {
 			return false
 		} else {
 			return innerPred(adv)
 		}
 	}
-
-	s.enabled = true
 	s.cfg = cfg
 
 	// Start background scanning.
@@ -225,7 +263,7 @@ func (s *BleScanner) Start(cfg scan.Cfg) error {
 			// Wait for suspend-in-progress to complete, if any.
 			s.suspendBlocker.Wait(nmxutil.DURATION_FOREVER, nil)
 
-			if !s.enabled {
+			if !s.isEnabled() {
 				break
 			}
 
@@ -233,14 +271,11 @@ func (s *BleScanner) Start(cfg scan.Cfg) error {
 			if err != nil {
 				log.Debugf("Scan error: %s", err.Error())
 				if p != nil {
-					s.failedDevs[p.PeerSpec.Ble] = struct{}{}
+					s.addFailedDev(p.PeerSpec.Ble)
 				}
 				time.Sleep(scanRetryRate)
 			} else if p != nil {
-				s.mtx.Lock()
-				s.reportedDevs[p.PeerSpec.Ble] = p.HwId
-				s.mtx.Unlock()
-
+				s.addReportedDev(p.PeerSpec.Ble, p.HwId)
 				s.cfg.ScanCb(*p)
 			}
 		}
@@ -283,21 +318,10 @@ func (s *BleScanner) Preempt() error {
 
 // Stops the scanner.  Scanning won't resume unless Start() gets called.
 func (s *BleScanner) Stop() error {
-	initiate := func() error {
-		s.mtx.Lock()
-		defer s.mtx.Unlock()
-
-		if !s.enabled {
-			return nmxutil.NewAlreadyError("Attempt to stop BLE scanner twice")
-		}
-		s.enabled = false
-
-		return nil
+	if !s.toggleEnabled(false) {
+		return nmxutil.NewAlreadyError("Attempt to stop BLE scanner twice")
 	}
 
-	if err := initiate(); err != nil {
-		return err
-	}
 	return s.suspend()
 }
 
diff --git a/nmxact/nmble/conn.go b/nmxact/nmble/conn.go
index f76907a..063b48f 100644
--- a/nmxact/nmble/conn.go
+++ b/nmxact/nmble/conn.go
@@ -271,6 +271,9 @@ func (c *Conn) startConnecting() error {
 		return nmxutil.NewSesnAlreadyOpenError(
 			"BLE connection already being established")
 	}
+	if c.stopped {
+		return fmt.Errorf("Attempt to re-use Conn object")
+	}
 
 	c.connecting = true
 	return nil
diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go
index 1b191f9..4431497 100644
--- a/nmxact/nmble/discover.go
+++ b/nmxact/nmble/discover.go
@@ -36,10 +36,11 @@ type DiscovererParams struct {
 }
 
 // Listens for advertisements; reports the ones that match the specified
-// predicate.
+// predicate.  This type is not thread-safe.
 type Discoverer struct {
 	params    DiscovererParams
 	abortChan chan struct{}
+	blocker   nmxutil.Blocker
 }
 
 func NewDiscoverer(params DiscovererParams) *Discoverer {
@@ -51,8 +52,7 @@ func NewDiscoverer(params DiscovererParams) *Discoverer {
 func (d *Discoverer) scanCancel() error {
 	r := NewBleScanCancelReq()
 
-	key := SeqKey(r.Seq)
-	bl, err := d.params.Bx.AddListener(key)
+	bl, err := d.params.Bx.AddListener(SeqKey(r.Seq))
 	if err != nil {
 		return err
 	}
@@ -84,17 +84,26 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error {
 	r.Passive = d.params.Passive
 	r.FilterDuplicates = true
 
-	key := SeqKey(r.Seq)
-	bl, err := d.params.Bx.AddListener(key)
+	bl, err := d.params.Bx.AddListener(SeqKey(r.Seq))
 	if err != nil {
 		return err
 	}
 	defer d.params.Bx.RemoveListener(bl)
 
+	// Set up the abort channel to allow discovery to be cancelled.
 	d.abortChan = make(chan struct{}, 1)
 	defer func() { d.abortChan = nil }()
 
-	err = actScan(d.params.Bx, bl, r, d.abortChan, advRptCb)
+	// Ensure subsequent calls to Stop() block until discovery is fully
+	// stopped.
+	d.blocker.Start()
+	defer d.blocker.Unblock(nil)
+
+	// Report devices in a separate Goroutine.  This is done to prevent
+	// deadlock in case the callback tries to stop the discoverer.
+	cb := func(r BleAdvReport) { go advRptCb(r) }
+
+	err = actScan(d.params.Bx, bl, r, d.abortChan, cb)
 	if !nmxutil.IsXport(err) {
 		// The transport did not restart; always attempt to cancel the scan
 		// operation.  In some cases, the host has already stopped scanning
@@ -108,14 +117,18 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error {
 	return err
 }
 
+// Ensures the discoverer is stopped.  Errors can typically be ignored.
 func (d *Discoverer) Stop() error {
 	ch := d.abortChan
 
 	if ch == nil {
 		return nmxutil.NewAlreadyError("Attempt to stop inactive discoverer")
 	}
-
 	close(ch)
+
+	// Don't return until discovery is fully stopped.
+	d.blocker.Wait(nmxutil.DURATION_FOREVER, nil)
+
 	return nil
 }
 
diff --git a/nmxact/nmble/master.go b/nmxact/nmble/master.go
index 350a827..16c6d85 100644
--- a/nmxact/nmble/master.go
+++ b/nmxact/nmble/master.go
@@ -38,11 +38,8 @@ func NewMaster(x *BleXport, s *BleScanner) Master {
 	}
 }
 
-// Unblocks a waiting scanner.
+// Unblocks a waiting scanner.  The caller must lock the mutex.
 func (m *Master) unblockScanner(err error) bool {
-	m.mtx.Lock()
-	defer m.mtx.Unlock()
-
 	if m.scanWait == nil {
 		return false
 	}
@@ -54,11 +51,9 @@ func (m *Master) unblockScanner(err error) bool {
 }
 
 func (m *Master) AcquireConnect(token interface{}) error {
-	m.mtx.Lock()
-
 	// Append the connector to the wait queue.
+	m.mtx.Lock()
 	ch := m.res.Acquire(token)
-
 	m.mtx.Unlock()
 
 	// Stop the scanner in case it is active; connections take priority.  We do
@@ -125,6 +120,9 @@ func (m *Master) AcquireScan(token interface{}) error {
 }
 
 func (m *Master) Release() {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
 	if m.res.Release() {
 		// Next waiting connector acquired the resource.
 		return
@@ -147,11 +145,17 @@ func (m *Master) StopWaitingConnect(token interface{}, err error) {
 
 // Removes the specified scanner from the wait queue.
 func (m *Master) StopWaitingScan(token interface{}, err error) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
 	m.unblockScanner(err)
 }
 
 // Releases the resource and clears the wait queue.
 func (m *Master) Abort(err error) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
 	m.unblockScanner(err)
 	m.res.Abort(err)
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.