You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/07/15 00:43:59 UTC

[mynewt-newtmgr] branch master updated (097bb15 -> 16457af)

This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git.


    from 097bb15  nmxact - Some more concurrency fixes.
     new 05156e0  nmxact - Clarify channel owners.
     new 16457af  newtmgr - Revendor.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 newtmgr/Godeps/Godeps.json                         | 48 +++++------
 .../newtmgr/nmxact/nmble/ble_fsm.go                | 92 ++++++++++++++--------
 .../newtmgr/nmxact/nmble/ble_oic_sesn.go           | 24 ++++--
 .../newtmgr/nmxact/nmble/ble_plain_sesn.go         | 42 +++++-----
 .../newtmgr/nmxact/nmble/ble_scanner.go            |  4 +-
 .../newtmgr/nmxact/nmble/ble_util.go               | 20 ++---
 .../newtmgr/nmxact/nmble/ble_xport.go              | 31 +++++---
 .../newtmgr/nmxact/nmble/discover.go               |  8 +-
 .../newtmgr/nmxact/nmble/dispatch.go               |  2 +
 .../newtmgr/nmxact/nmble/receiver.go               | 28 +++----
 .../newtmgr/nmxact/nmp/dispatch.go                 | 10 ++-
 .../newtmgr/nmxact/nmxutil/nmxutil.go              |  8 ++
 .../newtmgr/nmxact/oic/dispatch.go                 | 17 +++-
 .../newtmgr/nmxact/omp/dispatch.go                 |  8 ++
 .../mynewt.apache.org/newtmgr/nmxact/scan/scan.go  |  8 --
 nmxact/nmble/ble_fsm.go                            |  6 +-
 nmxact/nmble/ble_oic_sesn.go                       |  7 +-
 nmxact/nmble/ble_plain_sesn.go                     | 33 ++++----
 nmxact/nmble/ble_util.go                           | 20 ++---
 nmxact/nmble/ble_xport.go                          | 27 ++++---
 nmxact/nmble/discover.go                           |  8 +-
 nmxact/nmble/dispatch.go                           |  2 +
 nmxact/nmble/receiver.go                           | 28 +++----
 nmxact/nmp/dispatch.go                             | 10 ++-
 nmxact/oic/dispatch.go                             | 17 +++-
 nmxact/omp/dispatch.go                             |  8 ++
 26 files changed, 311 insertions(+), 205 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
['"commits@mynewt.apache.org" <co...@mynewt.apache.org>'].

[mynewt-newtmgr] 01/02: nmxact - Clarify channel owners.

Posted by cc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit 05156e0c75ec6e710c7ea0bf89db7e31df3cbbd1
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Fri Jul 14 17:41:17 2017 -0700

    nmxact - Clarify channel owners.
---
 nmxact/nmble/ble_fsm.go        |  6 +++---
 nmxact/nmble/ble_oic_sesn.go   |  7 ++++---
 nmxact/nmble/ble_plain_sesn.go | 33 ++++++++++++++-------------------
 nmxact/nmble/ble_util.go       | 20 ++++++++++----------
 nmxact/nmble/ble_xport.go      | 27 +++++++++++++++++----------
 nmxact/nmble/discover.go       |  8 ++++----
 nmxact/nmble/dispatch.go       |  2 ++
 nmxact/nmble/receiver.go       | 28 ++++++++++++++--------------
 nmxact/nmp/dispatch.go         | 10 ++++++++--
 nmxact/oic/dispatch.go         | 17 ++++++++++++++++-
 nmxact/omp/dispatch.go         |  8 ++++++++
 11 files changed, 100 insertions(+), 66 deletions(-)

diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 4bf20b3..6721eb0 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -169,7 +169,7 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
 
 func (bf *BleFsm) shutdown(err error) {
 	bf.params.Bx.StopWaitingForMaster(bf, err)
-	bf.rxer.ErrorAll(err)
+	bf.rxer.RemoveAll("shutdown")
 
 	bf.state = SESN_STATE_UNCONNECTED
 
@@ -202,8 +202,8 @@ func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
 	bf.wg.Add(1)
 
 	go func() {
-		defer bf.rxer.RemoveSeqListener("connect", seq)
 		defer bf.wg.Done()
+		defer bf.rxer.RemoveSeqListener("connect", seq)
 
 		for {
 			select {
@@ -269,8 +269,8 @@ func (bf *BleFsm) nmpRspListen() error {
 	bf.wg.Add(1)
 
 	go func() {
-		defer bf.rxer.RemoveBaseListener("nmp-rsp", base)
 		defer bf.wg.Done()
+		defer bf.rxer.RemoveBaseListener("nmp-rsp", base)
 
 		for {
 			select {
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 8964865..bbdafb1 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -80,8 +80,12 @@ func (bos *BleOicSesn) Open() error {
 	// Listen for disconnect in the background.
 	bos.wg.Add(1)
 	go func() {
+		// If the session is being closed, unblock the close() call.
+		defer close(bos.closeChan)
+
 		// Block until disconnect.
 		<-bos.bf.DisconnectChan()
+		nmxutil.Assert(!bos.IsOpen())
 		pd := bos.bf.PrevDisconnect()
 
 		// Signal error to all listeners.
@@ -90,9 +94,6 @@ func (bos *BleOicSesn) Open() error {
 		bos.wg.Done()
 		bos.wg.Wait()
 
-		// If the session is being closed, unblock the close() call.
-		close(bos.closeChan)
-
 		// Only execute the client's disconnect callback if the disconnect was
 		// unsolicited.
 		if pd.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index 0d77d63..9268c3d 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -2,11 +2,13 @@ package nmble
 
 import (
 	"fmt"
+	"sync"
 	"time"
 
 	"mynewt.apache.org/newt/util"
 	. "mynewt.apache.org/newtmgr/nmxact/bledefs"
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
+	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 )
 
@@ -15,6 +17,7 @@ type BlePlainSesn struct {
 	d            *nmp.Dispatcher
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
+	wg           sync.WaitGroup
 
 	closeChan chan struct{}
 }
@@ -61,16 +64,21 @@ func (bps *BlePlainSesn) Open() error {
 	bps.d = nmp.NewDispatcher(3)
 
 	// Listen for disconnect in the background.
+	bps.wg.Add(1)
 	go func() {
+		// If the session is being closed, unblock the close() call.
+		defer close(bps.closeChan)
+
 		// Block until disconnect.
 		<-bps.bf.DisconnectChan()
+		nmxutil.Assert(!bps.IsOpen())
+
 		pd := bps.bf.PrevDisconnect()
 
 		// Signal error to all listeners.
 		bps.d.ErrorAll(pd.Err)
-
-		// If the session is being closed, unblock the close() call.
-		close(bps.closeChan)
+		bps.wg.Done()
+		bps.wg.Wait()
 
 		// Only execute the client's disconnect callback if the disconnect was
 		// unsolicited.
@@ -80,7 +88,10 @@ func (bps *BlePlainSesn) Open() error {
 	}()
 
 	// Listen for NMP responses in the background.
+	bps.wg.Add(1)
 	go func() {
+		defer bps.wg.Done()
+
 		for {
 			data, ok := <-bps.bf.RxNmpChan()
 			if !ok {
@@ -110,22 +121,6 @@ func (bps *BlePlainSesn) IsOpen() bool {
 	return bps.bf.IsOpen()
 }
 
-// Called by the FSM when a blehostd disconnect event is received.
-func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
-	err error) {
-
-	bps.d.ErrorAll(err)
-
-	// If the session is being closed, unblock the close() call.
-	close(bps.closeChan)
-
-	// Only execute client's disconnect callback if the disconnect was
-	// unsolicited and the session was fully open.
-	if dt == FSM_DISCONNECT_TYPE_OPENED && bps.onCloseCb != nil {
-		bps.onCloseCb(bps, err)
-	}
-}
-
 func (bps *BlePlainSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
 	return nmp.EncodeNmpPlain(m)
 }
diff --git a/nmxact/nmble/ble_util.go b/nmxact/nmble/ble_util.go
index b6d2eea..3550e33 100644
--- a/nmxact/nmble/ble_util.go
+++ b/nmxact/nmble/ble_util.go
@@ -305,10 +305,10 @@ func ConnFindXact(x *BleXport, connHandle uint16) (BleConnDesc, error) {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return BleConnDesc{}, err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return connFind(x, bl, r)
 }
@@ -323,10 +323,10 @@ func GenRandAddrXact(x *BleXport) (BleAddr, error) {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return BleAddr{}, err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return genRandAddr(x, bl, r)
 }
@@ -343,10 +343,10 @@ func SetRandAddrXact(x *BleXport, addr BleAddr) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return setRandAddr(x, bl, r)
 }
@@ -363,10 +363,10 @@ func SetPreferredMtuXact(x *BleXport, mtu uint16) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return setPreferredMtu(x, bl, r)
 }
@@ -382,10 +382,10 @@ func ResetXact(x *BleXport) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return reset(x, bl, r)
 }
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index 1a8f45e..188a78f 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -84,7 +84,8 @@ const (
 
 // Implements xport.Xport.
 type BleXport struct {
-	Bd           *Dispatcher
+	cfg          XportCfg
+	d            *Dispatcher
 	client       *unixchild.Client
 	state        BleXportState
 	stopChan     chan struct{}
@@ -95,13 +96,11 @@ type BleXport struct {
 	randAddr     *BleAddr
 	stateMtx     sync.Mutex
 	scanner      *BleScanner
-
-	cfg XportCfg
 }
 
 func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	bx := &BleXport{
-		Bd:           NewDispatcher(),
+		d:            NewDispatcher(),
 		shutdownChan: make(chan bool),
 		readyBcast:   nmxutil.Bcaster{},
 		master:       nmxutil.NewSingleResource(),
@@ -171,7 +170,7 @@ func (bx *BleXport) addSyncListener() (*Listener, error) {
 		Seq:        BLE_SEQ_NONE,
 		ConnHandle: -1,
 	}
-	if err := bx.Bd.AddListener(base, bl); err != nil {
+	if err := bx.d.AddListener(base, bl); err != nil {
 		return nil, err
 	}
 
@@ -185,7 +184,7 @@ func (bx *BleXport) removeSyncListener() {
 		Seq:        BLE_SEQ_NONE,
 		ConnHandle: -1,
 	}
-	bx.Bd.RemoveListener(base)
+	bx.d.RemoveListener(base)
 }
 
 func (bx *BleXport) querySyncStatus() (bool, error) {
@@ -207,10 +206,10 @@ func (bx *BleXport) querySyncStatus() (bool, error) {
 		Seq:        req.Seq,
 		ConnHandle: -1,
 	}
-	if err := bx.Bd.AddListener(base, bl); err != nil {
+	if err := bx.d.AddListener(base, bl); err != nil {
 		return false, err
 	}
-	defer bx.Bd.RemoveListener(base)
+	defer bx.d.RemoveListener(base)
 
 	if err := bx.txNoSync(j); err != nil {
 		return false, err
@@ -286,7 +285,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 	// Indicate an error to all of this transport's listeners.  This prevents
 	// them from blocking endlessly while awaiting a BLE message.
 	log.Debugf("Stopping BLE dispatcher")
-	bx.Bd.ErrorAll(err)
+	bx.d.ErrorAll(err)
 
 	synced, err := bx.querySyncStatus()
 	if err == nil && synced {
@@ -386,7 +385,7 @@ func (bx *BleXport) startOnce() error {
 			case buf := <-bx.client.FromChild:
 				if len(buf) != 0 {
 					log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
-					bx.Bd.Dispatch(buf)
+					bx.d.Dispatch(buf)
 				}
 
 			case <-bx.stopChan:
@@ -542,6 +541,14 @@ func (bx *BleXport) Tx(data []byte) error {
 	return bx.txNoSync(data)
 }
 
+func (bx *BleXport) AddListener(base MsgBase, listener *Listener) error {
+	return bx.d.AddListener(base, listener)
+}
+
+func (bx *BleXport) RemoveListener(base MsgBase) *Listener {
+	return bx.d.RemoveListener(base)
+}
+
 func (bx *BleXport) RspTimeout() time.Duration {
 	return bx.cfg.BlehostdRspTimeout
 }
diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go
index 2ac19ba..c8d4a3f 100644
--- a/nmxact/nmble/discover.go
+++ b/nmxact/nmble/discover.go
@@ -40,10 +40,10 @@ func (d *Discoverer) scanCancel() error {
 	}
 
 	bl := NewListener()
-	if err := d.params.Bx.Bd.AddListener(base, bl); err != nil {
+	if err := d.params.Bx.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer d.params.Bx.Bd.RemoveListener(base)
+	defer d.params.Bx.RemoveListener(base)
 
 	if err := scanCancel(d.params.Bx, bl, r); err != nil {
 		return err
@@ -79,10 +79,10 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error {
 	}
 
 	bl := NewListener()
-	if err := d.params.Bx.Bd.AddListener(base, bl); err != nil {
+	if err := d.params.Bx.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer d.params.Bx.Bd.RemoveListener(base)
+	defer d.params.Bx.RemoveListener(base)
 
 	d.abortChan = make(chan struct{}, 1)
 	defer func() { d.abortChan = nil }()
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index 435b123..76bd18c 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -80,6 +80,8 @@ func (bl *Listener) Close() {
 	close(bl.TmoChan)
 }
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	seqMap  map[BleSeq]*Listener
 	baseMap map[MsgBase]*Listener
diff --git a/nmxact/nmble/receiver.go b/nmxact/nmble/receiver.go
index 8660393..231ad5e 100644
--- a/nmxact/nmble/receiver.go
+++ b/nmxact/nmble/receiver.go
@@ -6,11 +6,15 @@ import (
 	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 )
 
+// The receiver never writes to any of its listeners.  It only maintains a set
+// of listeners so that their lifetimes can be tracked and to facilitate their
+// removal from the BLE transport.
+
 type Receiver struct {
 	id       uint32
 	bx       *BleXport
 	logDepth int
-	bls      map[*Listener]struct{}
+	bls      map[*Listener]MsgBase
 	mtx      sync.Mutex
 	wg       sync.WaitGroup
 }
@@ -20,7 +24,7 @@ func NewReceiver(id uint32, bx *BleXport, logDepth int) *Receiver {
 		id:       id,
 		bx:       bx,
 		logDepth: logDepth + 3,
-		bls:      map[*Listener]struct{}{},
+		bls:      map[*Listener]MsgBase{},
 	}
 }
 
@@ -34,11 +38,11 @@ func (r *Receiver) addListener(name string, base MsgBase) (
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
-	if err := r.bx.Bd.AddListener(base, bl); err != nil {
+	if err := r.bx.AddListener(base, bl); err != nil {
 		return nil, err
 	}
 
-	r.bls[bl] = struct{}{}
+	r.bls[bl] = base
 	r.wg.Add(1)
 
 	return bl, nil
@@ -68,7 +72,7 @@ func (r *Receiver) removeListener(name string, base MsgBase) *Listener {
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
-	bl := r.bx.Bd.RemoveListener(base)
+	bl := r.bx.RemoveListener(base)
 	delete(r.bls, bl)
 
 	if bl != nil {
@@ -93,18 +97,14 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) {
 	r.removeListener(name, base)
 }
 
-func (r *Receiver) ErrorAll(err error) {
-	if err == nil {
-		panic("NIL ERROR")
-	}
+func (r *Receiver) RemoveAll(name string) {
 	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
 	bls := r.bls
-	r.bls = map[*Listener]struct{}{}
+	r.bls = map[*Listener]MsgBase{}
+	r.mtx.Unlock()
 
-	for bl, _ := range bls {
-		bl.ErrChan <- err
+	for _, base := range bls {
+		r.removeListener(name, base)
 	}
 }
 
diff --git a/nmxact/nmp/dispatch.go b/nmxact/nmp/dispatch.go
index fee372e..9051b5b 100644
--- a/nmxact/nmp/dispatch.go
+++ b/nmxact/nmp/dispatch.go
@@ -53,12 +53,18 @@ func (nl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return nl.tmoChan
 }
 
-func (nl *Listener) Stop() {
+func (nl *Listener) Close() {
 	if nl.timer != nil {
 		nl.timer.Stop()
 	}
+
+	close(nl.RspChan)
+	close(nl.ErrChan)
+	close(nl.tmoChan)
 }
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	seqListenerMap map[uint8]*Listener
 	reassembler    *Reassembler
@@ -97,7 +103,7 @@ func (d *Dispatcher) RemoveListener(seq uint8) *Listener {
 
 	nl := d.seqListenerMap[seq]
 	if nl != nil {
-		nl.Stop()
+		nl.Close()
 		delete(d.seqListenerMap, seq)
 	}
 	return nl
diff --git a/nmxact/oic/dispatch.go b/nmxact/oic/dispatch.go
index aacc277..810abe7 100644
--- a/nmxact/oic/dispatch.go
+++ b/nmxact/oic/dispatch.go
@@ -72,6 +72,18 @@ func (ol *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return ol.tmoChan
 }
 
+func (ol *Listener) Close() {
+	if ol.timer != nil {
+		ol.timer.Stop()
+	}
+
+	close(ol.RspChan)
+	close(ol.ErrChan)
+	close(ol.tmoChan)
+}
+
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	tokenListenerMap map[Token]*Listener
 	reassembler      *Reassembler
@@ -124,7 +136,10 @@ func (d *Dispatcher) RemoveListener(token []byte) *Listener {
 	}
 
 	ol := d.tokenListenerMap[ot]
-	delete(d.tokenListenerMap, ot)
+	if ol != nil {
+		ol.Close()
+		delete(d.tokenListenerMap, ot)
+	}
 
 	return ol
 }
diff --git a/nmxact/omp/dispatch.go b/nmxact/omp/dispatch.go
index d0ba7d4..7647048 100644
--- a/nmxact/omp/dispatch.go
+++ b/nmxact/omp/dispatch.go
@@ -20,16 +20,21 @@
 package omp
 
 import (
+	"sync"
+
 	log "github.com/Sirupsen/logrus"
 
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
 	"mynewt.apache.org/newtmgr/nmxact/oic"
 )
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	nmpd   *nmp.Dispatcher
 	oicd   *oic.Dispatcher
 	stopCh chan struct{}
+	wg     sync.WaitGroup
 }
 
 func NewDispatcher(isTcp bool, logDepth int) (*Dispatcher, error) {
@@ -56,8 +61,10 @@ func (r *Dispatcher) addOmpListener() error {
 		return err
 	}
 
+	r.wg.Add(1)
 	go func() {
 		defer r.RemoveOicListener(nil)
+		defer r.wg.Done()
 
 		for {
 			select {
@@ -83,6 +90,7 @@ func (r *Dispatcher) addOmpListener() error {
 
 func (r *Dispatcher) Stop() {
 	r.stopCh <- struct{}{}
+	r.wg.Wait()
 }
 
 func (r *Dispatcher) Dispatch(data []byte) bool {

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.

[mynewt-newtmgr] 02/02: newtmgr - Revendor.

Posted by cc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit 16457af3638bdf7a4aee40c5010d8509cfe2ce05
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Fri Jul 14 15:51:17 2017 -0700

    newtmgr - Revendor.
---
 newtmgr/Godeps/Godeps.json                         | 48 +++++------
 .../newtmgr/nmxact/nmble/ble_fsm.go                | 92 ++++++++++++++--------
 .../newtmgr/nmxact/nmble/ble_oic_sesn.go           | 24 ++++--
 .../newtmgr/nmxact/nmble/ble_plain_sesn.go         | 42 +++++-----
 .../newtmgr/nmxact/nmble/ble_scanner.go            |  4 +-
 .../newtmgr/nmxact/nmble/ble_util.go               | 20 ++---
 .../newtmgr/nmxact/nmble/ble_xport.go              | 31 +++++---
 .../newtmgr/nmxact/nmble/discover.go               |  8 +-
 .../newtmgr/nmxact/nmble/dispatch.go               |  2 +
 .../newtmgr/nmxact/nmble/receiver.go               | 28 +++----
 .../newtmgr/nmxact/nmp/dispatch.go                 | 10 ++-
 .../newtmgr/nmxact/nmxutil/nmxutil.go              |  8 ++
 .../newtmgr/nmxact/oic/dispatch.go                 | 17 +++-
 .../newtmgr/nmxact/omp/dispatch.go                 |  8 ++
 .../mynewt.apache.org/newtmgr/nmxact/scan/scan.go  |  8 --
 15 files changed, 211 insertions(+), 139 deletions(-)

diff --git a/newtmgr/Godeps/Godeps.json b/newtmgr/Godeps/Godeps.json
index a89fd26..d205eef 100644
--- a/newtmgr/Godeps/Godeps.json
+++ b/newtmgr/Godeps/Godeps.json
@@ -126,63 +126,63 @@
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/bledefs",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmble",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmp",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmserial",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmxutil",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/oic",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/omp",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/scan",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/sesn",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/udp",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/xact",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/xport",
-			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
-			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+			"Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+			"Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
 		}
 	]
 }
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
index 44a7908..6721eb0 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
@@ -4,6 +4,7 @@ import (
 	"encoding/hex"
 	"fmt"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -73,19 +74,21 @@ type BleFsmParams struct {
 type BleFsm struct {
 	params BleFsmParams
 
-	connHandle uint16
-	peerDev    BleDev
-	nmpSvc     *BleSvc
-	nmpReqChr  *BleChr
-	nmpRspChr  *BleChr
-	attMtu     int
-	state      BleSesnState
-	rxer       *Receiver
-	errFunnel  nmxutil.ErrFunnel
-	id         uint32
+	connHandle     uint16
+	peerDev        BleDev
+	nmpSvc         *BleSvc
+	nmpReqChr      *BleChr
+	nmpRspChr      *BleChr
+	prevDisconnect BleDisconnectEntry
+	attMtu         int
+	state          BleSesnState
+	rxer           *Receiver
+	errFunnel      nmxutil.ErrFunnel
+	id             uint32
+	wg             sync.WaitGroup
 
 	encBcast       nmxutil.Bcaster
-	disconnectChan chan BleDisconnectEntry
+	disconnectChan chan struct{}
 	rxNmpChan      chan []byte
 }
 
@@ -164,35 +167,44 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
 	}
 }
 
-// Listens for an error in the state machine.  On error, the session is
-// considered disconnected and the error is reported to the client.
-func (bf *BleFsm) listenForError() {
-	err := <-bf.errFunnel.Wait()
-
-	// Stop listening for NMP responses.
-	close(bf.rxNmpChan)
-
-	// Remember some fields before we clear them.
-	dt := calcDisconnectType(bf.state)
-
+func (bf *BleFsm) shutdown(err error) {
 	bf.params.Bx.StopWaitingForMaster(bf, err)
-	bf.rxer.ErrorAll(err)
+	bf.rxer.RemoveAll("shutdown")
 
 	bf.state = SESN_STATE_UNCONNECTED
 
 	// Wait for all listeners to get removed.
 	bf.rxer.WaitUntilNoListeners()
+	bf.wg.Wait()
 
-	bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err}
+	close(bf.rxNmpChan)
 	close(bf.disconnectChan)
+}
+
+// Listens for an error in the state machine.  On error, the session is
+// considered disconnected and the error is reported to the client.
+func (bf *BleFsm) listenForError() {
+	bf.wg.Add(1)
+
+	go func() {
+		err := <-bf.errFunnel.Wait()
+
+		dt := calcDisconnectType(bf.state)
+		bf.prevDisconnect = BleDisconnectEntry{dt, bf.peerDev, err}
 
-	bf.disconnectChan = make(chan BleDisconnectEntry)
+		bf.wg.Done()
+		bf.shutdown(err)
+	}()
 }
 
 // Listens for events in the background.
 func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
+	bf.wg.Add(1)
+
 	go func() {
+		defer bf.wg.Done()
 		defer bf.rxer.RemoveSeqListener("connect", seq)
+
 		for {
 			select {
 			case err := <-bl.ErrChan:
@@ -254,8 +266,12 @@ func (bf *BleFsm) nmpRspListen() error {
 		return err
 	}
 
+	bf.wg.Add(1)
+
 	go func() {
+		defer bf.wg.Done()
 		defer bf.rxer.RemoveBaseListener("nmp-rsp", base)
+
 		for {
 			select {
 			case <-bl.ErrChan:
@@ -629,7 +645,7 @@ func (bf *BleFsm) executeState() (bool, error) {
 	return false, nil
 }
 
-func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry {
+func (bf *BleFsm) DisconnectChan() <-chan struct{} {
 	return bf.disconnectChan
 }
 
@@ -637,6 +653,10 @@ func (bf *BleFsm) RxNmpChan() <-chan []byte {
 	return bf.rxNmpChan
 }
 
+func (bf *BleFsm) PrevDisconnect() BleDisconnectEntry {
+	return bf.prevDisconnect
+}
+
 func (bf *BleFsm) startOnce() (bool, error) {
 	if !bf.IsClosed() {
 		return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -644,16 +664,23 @@ func (bf *BleFsm) startOnce() (bool, error) {
 			bf.state))
 	}
 
+	bf.disconnectChan = make(chan struct{})
+	bf.rxNmpChan = make(chan []byte)
+
+	bf.listenForError()
+
 	for {
 		retry, err := bf.executeState()
 		if err != nil {
-			bf.errFunnel.Insert(err)
-			err = <-bf.errFunnel.Wait()
+			// If stop fails, assume the connection wasn't established and
+			// force an error.
+			if bf.Stop() != nil {
+				bf.errFunnel.Insert(err)
+			}
+			<-bf.disconnectChan
 			return retry, err
 		} else if bf.state == SESN_STATE_DONE {
 			// We are fully connected.  Listen for errors in the background.
-			go bf.listenForError()
-
 			return false, nil
 		}
 	}
@@ -665,9 +692,6 @@ func (bf *BleFsm) startOnce() (bool, error) {
 func (bf *BleFsm) Start() error {
 	var err error
 
-	bf.disconnectChan = make(chan BleDisconnectEntry)
-	bf.rxNmpChan = make(chan []byte)
-
 	for i := 0; i < bf.params.Central.ConnTries; i++ {
 		var retry bool
 		retry, err = bf.startOnce()
@@ -677,6 +701,8 @@ func (bf *BleFsm) Start() error {
 	}
 
 	if err != nil {
+		nmxutil.Assert(!bf.IsOpen())
+		nmxutil.Assert(bf.IsClosed())
 		return err
 	}
 
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
index e1bf28b..bbdafb1 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
@@ -2,6 +2,7 @@ package nmble
 
 import (
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/runtimeco/go-coap"
@@ -20,6 +21,7 @@ type BleOicSesn struct {
 	d            *omp.Dispatcher
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
+	wg           sync.WaitGroup
 
 	closeChan chan struct{}
 }
@@ -76,26 +78,34 @@ func (bos *BleOicSesn) Open() error {
 	bos.d = d
 
 	// Listen for disconnect in the background.
+	bos.wg.Add(1)
 	go func() {
+		// If the session is being closed, unblock the close() call.
+		defer close(bos.closeChan)
+
 		// Block until disconnect.
-		entry := <-bos.bf.DisconnectChan()
+		<-bos.bf.DisconnectChan()
+		nmxutil.Assert(!bos.IsOpen())
+		pd := bos.bf.PrevDisconnect()
 
 		// Signal error to all listeners.
-		bos.d.ErrorAll(entry.Err)
+		bos.d.ErrorAll(pd.Err)
 		bos.d.Stop()
-
-		// If the session is being closed, unblock the close() call.
-		close(bos.closeChan)
+		bos.wg.Done()
+		bos.wg.Wait()
 
 		// Only execute the client's disconnect callback if the disconnect was
 		// unsolicited.
-		if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
-			bos.onCloseCb(bos, entry.Err)
+		if pd.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
+			bos.onCloseCb(bos, pd.Err)
 		}
 	}()
 
 	// Listen for NMP responses in the background.
+	bos.wg.Add(1)
 	go func() {
+		defer bos.wg.Done()
+
 		for {
 			data, ok := <-bos.bf.RxNmpChan()
 			if !ok {
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
index 782fce0..9268c3d 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
@@ -2,11 +2,13 @@ package nmble
 
 import (
 	"fmt"
+	"sync"
 	"time"
 
 	"mynewt.apache.org/newt/util"
 	. "mynewt.apache.org/newtmgr/nmxact/bledefs"
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
+	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 )
 
@@ -15,6 +17,7 @@ type BlePlainSesn struct {
 	d            *nmp.Dispatcher
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
+	wg           sync.WaitGroup
 
 	closeChan chan struct{}
 }
@@ -61,25 +64,34 @@ func (bps *BlePlainSesn) Open() error {
 	bps.d = nmp.NewDispatcher(3)
 
 	// Listen for disconnect in the background.
+	bps.wg.Add(1)
 	go func() {
+		// If the session is being closed, unblock the close() call.
+		defer close(bps.closeChan)
+
 		// Block until disconnect.
-		entry := <-bps.bf.DisconnectChan()
+		<-bps.bf.DisconnectChan()
+		nmxutil.Assert(!bps.IsOpen())
 
-		// Signal error to all listeners.
-		bps.d.ErrorAll(entry.Err)
+		pd := bps.bf.PrevDisconnect()
 
-		// If the session is being closed, unblock the close() call.
-		close(bps.closeChan)
+		// Signal error to all listeners.
+		bps.d.ErrorAll(pd.Err)
+		bps.wg.Done()
+		bps.wg.Wait()
 
 		// Only execute the client's disconnect callback if the disconnect was
 		// unsolicited.
-		if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
-			bps.onCloseCb(bps, entry.Err)
+		if pd.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
+			bps.onCloseCb(bps, pd.Err)
 		}
 	}()
 
 	// Listen for NMP responses in the background.
+	bps.wg.Add(1)
 	go func() {
+		defer bps.wg.Done()
+
 		for {
 			data, ok := <-bps.bf.RxNmpChan()
 			if !ok {
@@ -109,22 +121,6 @@ func (bps *BlePlainSesn) IsOpen() bool {
 	return bps.bf.IsOpen()
 }
 
-// Called by the FSM when a blehostd disconnect event is received.
-func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
-	err error) {
-
-	bps.d.ErrorAll(err)
-
-	// If the session is being closed, unblock the close() call.
-	close(bps.closeChan)
-
-	// Only execute client's disconnect callback if the disconnect was
-	// unsolicited and the session was fully open.
-	if dt == FSM_DISCONNECT_TYPE_OPENED && bps.onCloseCb != nil {
-		bps.onCloseCb(bps, err)
-	}
-}
-
 func (bps *BlePlainSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
 	return nmp.EncodeNmpPlain(m)
 }
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go
index da8532c..6d4ee0f 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go
@@ -114,7 +114,9 @@ func (s *BleScanner) scan() (*scan.ScanPeer, error) {
 		return nil, nil
 	}
 
-	s.connect(*dev)
+	if err := s.connect(*dev); err != nil {
+		return nil, err
+	}
 	defer s.bos.Close()
 
 	// Now we are connected (and paired if required).  Read the peer's hardware
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go
index b6d2eea..3550e33 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go
@@ -305,10 +305,10 @@ func ConnFindXact(x *BleXport, connHandle uint16) (BleConnDesc, error) {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return BleConnDesc{}, err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return connFind(x, bl, r)
 }
@@ -323,10 +323,10 @@ func GenRandAddrXact(x *BleXport) (BleAddr, error) {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return BleAddr{}, err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return genRandAddr(x, bl, r)
 }
@@ -343,10 +343,10 @@ func SetRandAddrXact(x *BleXport, addr BleAddr) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return setRandAddr(x, bl, r)
 }
@@ -363,10 +363,10 @@ func SetPreferredMtuXact(x *BleXport, mtu uint16) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return setPreferredMtu(x, bl, r)
 }
@@ -382,10 +382,10 @@ func ResetXact(x *BleXport) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return reset(x, bl, r)
 }
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
index 67b2d60..188a78f 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
@@ -84,7 +84,8 @@ const (
 
 // Implements xport.Xport.
 type BleXport struct {
-	Bd           *Dispatcher
+	cfg          XportCfg
+	d            *Dispatcher
 	client       *unixchild.Client
 	state        BleXportState
 	stopChan     chan struct{}
@@ -95,13 +96,11 @@ type BleXport struct {
 	randAddr     *BleAddr
 	stateMtx     sync.Mutex
 	scanner      *BleScanner
-
-	cfg XportCfg
 }
 
 func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	bx := &BleXport{
-		Bd:           NewDispatcher(),
+		d:            NewDispatcher(),
 		shutdownChan: make(chan bool),
 		readyBcast:   nmxutil.Bcaster{},
 		master:       nmxutil.NewSingleResource(),
@@ -171,7 +170,7 @@ func (bx *BleXport) addSyncListener() (*Listener, error) {
 		Seq:        BLE_SEQ_NONE,
 		ConnHandle: -1,
 	}
-	if err := bx.Bd.AddListener(base, bl); err != nil {
+	if err := bx.d.AddListener(base, bl); err != nil {
 		return nil, err
 	}
 
@@ -185,7 +184,7 @@ func (bx *BleXport) removeSyncListener() {
 		Seq:        BLE_SEQ_NONE,
 		ConnHandle: -1,
 	}
-	bx.Bd.RemoveListener(base)
+	bx.d.RemoveListener(base)
 }
 
 func (bx *BleXport) querySyncStatus() (bool, error) {
@@ -207,10 +206,10 @@ func (bx *BleXport) querySyncStatus() (bool, error) {
 		Seq:        req.Seq,
 		ConnHandle: -1,
 	}
-	if err := bx.Bd.AddListener(base, bl); err != nil {
+	if err := bx.d.AddListener(base, bl); err != nil {
 		return false, err
 	}
-	defer bx.Bd.RemoveListener(base)
+	defer bx.d.RemoveListener(base)
 
 	if err := bx.txNoSync(j); err != nil {
 		return false, err
@@ -286,7 +285,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 	// Indicate an error to all of this transport's listeners.  This prevents
 	// them from blocking endlessly while awaiting a BLE message.
 	log.Debugf("Stopping BLE dispatcher")
-	bx.Bd.ErrorAll(err)
+	bx.d.ErrorAll(err)
 
 	synced, err := bx.querySyncStatus()
 	if err == nil && synced {
@@ -367,13 +366,13 @@ func (bx *BleXport) startOnce() error {
 		return nmxutil.NewXportError("BLE xport started twice")
 	}
 
+	bx.stopChan = make(chan struct{})
+
 	if err := bx.startUnixChild(); err != nil {
 		bx.shutdown(true, err)
 		return err
 	}
 
-	bx.stopChan = make(chan struct{})
-
 	// Listen for errors and data from the blehostd process.
 	go func() {
 		for {
@@ -386,7 +385,7 @@ func (bx *BleXport) startOnce() error {
 			case buf := <-bx.client.FromChild:
 				if len(buf) != 0 {
 					log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
-					bx.Bd.Dispatch(buf)
+					bx.d.Dispatch(buf)
 				}
 
 			case <-bx.stopChan:
@@ -542,6 +541,14 @@ func (bx *BleXport) Tx(data []byte) error {
 	return bx.txNoSync(data)
 }
 
+func (bx *BleXport) AddListener(base MsgBase, listener *Listener) error {
+	return bx.d.AddListener(base, listener)
+}
+
+func (bx *BleXport) RemoveListener(base MsgBase) *Listener {
+	return bx.d.RemoveListener(base)
+}
+
 func (bx *BleXport) RspTimeout() time.Duration {
 	return bx.cfg.BlehostdRspTimeout
 }
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
index 2ac19ba..c8d4a3f 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
@@ -40,10 +40,10 @@ func (d *Discoverer) scanCancel() error {
 	}
 
 	bl := NewListener()
-	if err := d.params.Bx.Bd.AddListener(base, bl); err != nil {
+	if err := d.params.Bx.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer d.params.Bx.Bd.RemoveListener(base)
+	defer d.params.Bx.RemoveListener(base)
 
 	if err := scanCancel(d.params.Bx, bl, r); err != nil {
 		return err
@@ -79,10 +79,10 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error {
 	}
 
 	bl := NewListener()
-	if err := d.params.Bx.Bd.AddListener(base, bl); err != nil {
+	if err := d.params.Bx.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer d.params.Bx.Bd.RemoveListener(base)
+	defer d.params.Bx.RemoveListener(base)
 
 	d.abortChan = make(chan struct{}, 1)
 	defer func() { d.abortChan = nil }()
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
index 435b123..76bd18c 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
@@ -80,6 +80,8 @@ func (bl *Listener) Close() {
 	close(bl.TmoChan)
 }
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	seqMap  map[BleSeq]*Listener
 	baseMap map[MsgBase]*Listener
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
index 8660393..231ad5e 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
@@ -6,11 +6,15 @@ import (
 	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 )
 
+// The receiver never writes to any of its listeners.  It only maintains a set
+// of listeners so that their lifetimes can be tracked and to facilitate their
+// removal from the BLE transport.
+
 type Receiver struct {
 	id       uint32
 	bx       *BleXport
 	logDepth int
-	bls      map[*Listener]struct{}
+	bls      map[*Listener]MsgBase
 	mtx      sync.Mutex
 	wg       sync.WaitGroup
 }
@@ -20,7 +24,7 @@ func NewReceiver(id uint32, bx *BleXport, logDepth int) *Receiver {
 		id:       id,
 		bx:       bx,
 		logDepth: logDepth + 3,
-		bls:      map[*Listener]struct{}{},
+		bls:      map[*Listener]MsgBase{},
 	}
 }
 
@@ -34,11 +38,11 @@ func (r *Receiver) addListener(name string, base MsgBase) (
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
-	if err := r.bx.Bd.AddListener(base, bl); err != nil {
+	if err := r.bx.AddListener(base, bl); err != nil {
 		return nil, err
 	}
 
-	r.bls[bl] = struct{}{}
+	r.bls[bl] = base
 	r.wg.Add(1)
 
 	return bl, nil
@@ -68,7 +72,7 @@ func (r *Receiver) removeListener(name string, base MsgBase) *Listener {
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
-	bl := r.bx.Bd.RemoveListener(base)
+	bl := r.bx.RemoveListener(base)
 	delete(r.bls, bl)
 
 	if bl != nil {
@@ -93,18 +97,14 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) {
 	r.removeListener(name, base)
 }
 
-func (r *Receiver) ErrorAll(err error) {
-	if err == nil {
-		panic("NIL ERROR")
-	}
+func (r *Receiver) RemoveAll(name string) {
 	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
 	bls := r.bls
-	r.bls = map[*Listener]struct{}{}
+	r.bls = map[*Listener]MsgBase{}
+	r.mtx.Unlock()
 
-	for bl, _ := range bls {
-		bl.ErrChan <- err
+	for _, base := range bls {
+		r.removeListener(name, base)
 	}
 }
 
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmp/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmp/dispatch.go
index fee372e..9051b5b 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmp/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmp/dispatch.go
@@ -53,12 +53,18 @@ func (nl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return nl.tmoChan
 }
 
-func (nl *Listener) Stop() {
+func (nl *Listener) Close() {
 	if nl.timer != nil {
 		nl.timer.Stop()
 	}
+
+	close(nl.RspChan)
+	close(nl.ErrChan)
+	close(nl.tmoChan)
 }
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	seqListenerMap map[uint8]*Listener
 	reassembler    *Reassembler
@@ -97,7 +103,7 @@ func (d *Dispatcher) RemoveListener(seq uint8) *Listener {
 
 	nl := d.seqListenerMap[seq]
 	if nl != nil {
-		nl.Stop()
+		nl.Close()
 		delete(d.seqListenerMap, seq)
 	}
 	return nl
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/nmxutil.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/nmxutil.go
index 00e28c4..078a06d 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/nmxutil.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/nmxutil.go
@@ -17,6 +17,8 @@ import (
 
 const DURATION_FOREVER time.Duration = math.MaxInt64
 
+var Debug bool
+
 var nextNmpSeq uint8
 var nmpSeqBeenRead bool
 var nextOicSeq uint8
@@ -41,6 +43,12 @@ func SetLogLevel(level log.Level) {
 	ListenLog.Level = level
 }
 
+func Assert(cond bool) {
+	if Debug && !cond {
+		panic("Failed assertion")
+	}
+}
+
 func NextNmpSeq() uint8 {
 	seqMutex.Lock()
 	defer seqMutex.Unlock()
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/oic/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/oic/dispatch.go
index aacc277..810abe7 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/oic/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/oic/dispatch.go
@@ -72,6 +72,18 @@ func (ol *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return ol.tmoChan
 }
 
+func (ol *Listener) Close() {
+	if ol.timer != nil {
+		ol.timer.Stop()
+	}
+
+	close(ol.RspChan)
+	close(ol.ErrChan)
+	close(ol.tmoChan)
+}
+
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	tokenListenerMap map[Token]*Listener
 	reassembler      *Reassembler
@@ -124,7 +136,10 @@ func (d *Dispatcher) RemoveListener(token []byte) *Listener {
 	}
 
 	ol := d.tokenListenerMap[ot]
-	delete(d.tokenListenerMap, ot)
+	if ol != nil {
+		ol.Close()
+		delete(d.tokenListenerMap, ot)
+	}
 
 	return ol
 }
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/omp/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/omp/dispatch.go
index d0ba7d4..7647048 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/omp/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/omp/dispatch.go
@@ -20,16 +20,21 @@
 package omp
 
 import (
+	"sync"
+
 	log "github.com/Sirupsen/logrus"
 
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
 	"mynewt.apache.org/newtmgr/nmxact/oic"
 )
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	nmpd   *nmp.Dispatcher
 	oicd   *oic.Dispatcher
 	stopCh chan struct{}
+	wg     sync.WaitGroup
 }
 
 func NewDispatcher(isTcp bool, logDepth int) (*Dispatcher, error) {
@@ -56,8 +61,10 @@ func (r *Dispatcher) addOmpListener() error {
 		return err
 	}
 
+	r.wg.Add(1)
 	go func() {
 		defer r.RemoveOicListener(nil)
+		defer r.wg.Done()
 
 		for {
 			select {
@@ -83,6 +90,7 @@ func (r *Dispatcher) addOmpListener() error {
 
 func (r *Dispatcher) Stop() {
 	r.stopCh <- struct{}{}
+	r.wg.Wait()
 }
 
 func (r *Dispatcher) Dispatch(data []byte) bool {
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go
index 318e777..ba1a081 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go
@@ -57,14 +57,6 @@ func BleOmpScanCfg(ScanCb ScanFn) Cfg {
 					}
 				}
 
-				iotUuid, _ := bledefs.ParseUuid(bledefs.OmpUnsecSvcUuid)
-				for _, u128 := range adv.Fields.Uuids128 {
-					u := bledefs.BleUuid{U128: u128}
-					if bledefs.CompareUuids(u, iotUuid) == 0 {
-						return true
-					}
-				}
-
 				return false
 			},
 		},

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.