You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/07/14 02:27:42 UTC

[mynewt-newtmgr] branch master updated (40b9104 -> 854a8d9)

This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git.


    from 40b9104  newtmgr revendor
     new cf276ea  nmxact - Remove some mutexes
     new 854a8d9  newtmgr - revendor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 newtmgr/Godeps/Godeps.json                         |  48 +++----
 .../newtmgr/nmxact/nmble/ble_act.go                |   4 +-
 .../newtmgr/nmxact/nmble/ble_fsm.go                |  93 ++++++++-----
 .../newtmgr/nmxact/nmble/ble_oic_sesn.go           | 145 +++++++--------------
 .../newtmgr/nmxact/nmble/ble_plain_sesn.go         | 109 +++++++---------
 .../newtmgr/nmxact/nmble/ble_xport.go              | 110 ++++++++--------
 .../newtmgr/nmxact/nmble/discover.go               |   2 +-
 .../newtmgr/nmxact/nmble/dispatch.go               |  20 ++-
 .../newtmgr/nmxact/nmble/receiver.go               |   3 +
 .../newtmgr/nmxact/nmxutil/bcast.go                |  43 ++++++
 .../newtmgr/nmxact/nmxutil/err_funnel.go           |  55 +-------
 nmxact/example/ble_loop/ble_loop.go                |  12 +-
 nmxact/example/ble_scan/ble_scan.go                |   7 +-
 nmxact/nmble/ble_act.go                            |   4 +-
 nmxact/nmble/ble_fsm.go                            |  93 ++++++++-----
 nmxact/nmble/ble_oic_sesn.go                       | 145 +++++++--------------
 nmxact/nmble/ble_plain_sesn.go                     | 109 +++++++---------
 nmxact/nmble/ble_xport.go                          | 110 ++++++++--------
 nmxact/nmble/discover.go                           |   2 +-
 nmxact/nmble/dispatch.go                           |  20 ++-
 nmxact/nmble/receiver.go                           |   3 +
 nmxact/nmxutil/bcast.go                            |  43 ++++++
 nmxact/nmxutil/err_funnel.go                       |  55 +-------
 23 files changed, 581 insertions(+), 654 deletions(-)
 create mode 100644 newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/bcast.go
 create mode 100644 nmxact/nmxutil/bcast.go

-- 
To stop receiving notification emails like this one, please contact
['"commits@mynewt.apache.org" <co...@mynewt.apache.org>'].

[mynewt-newtmgr] 02/02: newtmgr - revendor

Posted by cc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit 854a8d93054ab8aa43bbbd43b3bbf477d1d90b5f
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Thu Jul 13 19:25:28 2017 -0700

    newtmgr - revendor
---
 newtmgr/Godeps/Godeps.json                         |  48 +++----
 .../newtmgr/nmxact/nmble/ble_act.go                |   4 +-
 .../newtmgr/nmxact/nmble/ble_fsm.go                |  93 ++++++++-----
 .../newtmgr/nmxact/nmble/ble_oic_sesn.go           | 145 +++++++--------------
 .../newtmgr/nmxact/nmble/ble_plain_sesn.go         | 109 +++++++---------
 .../newtmgr/nmxact/nmble/ble_xport.go              | 110 ++++++++--------
 .../newtmgr/nmxact/nmble/discover.go               |   2 +-
 .../newtmgr/nmxact/nmble/dispatch.go               |  20 ++-
 .../newtmgr/nmxact/nmble/receiver.go               |   3 +
 .../newtmgr/nmxact/nmxutil/bcast.go                |  43 ++++++
 .../newtmgr/nmxact/nmxutil/err_funnel.go           |  55 +-------
 11 files changed, 297 insertions(+), 335 deletions(-)

diff --git a/newtmgr/Godeps/Godeps.json b/newtmgr/Godeps/Godeps.json
index a30f60f..a89fd26 100644
--- a/newtmgr/Godeps/Godeps.json
+++ b/newtmgr/Godeps/Godeps.json
@@ -126,63 +126,63 @@
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/bledefs",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmble",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmp",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmserial",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmxutil",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/oic",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/omp",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/scan",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/sesn",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/udp",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/xact",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		},
 		{
 			"ImportPath": "mynewt.apache.org/newtmgr/nmxact/xport",
-			"Comment": "mynewt_0_9_0_tag-547-gf24b504",
-			"Rev": "f24b504146dabab8247096e523561a21f3330e13"
+			"Comment": "mynewt_0_9_0_tag-549-g46f62fa",
+			"Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
 		}
 	]
 }
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_act.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_act.go
index 2c4b71d..e51b8e9 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_act.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_act.go
@@ -324,8 +324,8 @@ func exchangeMtu(x *BleXport, bl *Listener, r *BleExchangeMtuReq) (
 	}
 }
 
-func actScan(x *BleXport, bl *Listener, r *BleScanReq,
-	abortChan chan struct{}, advRptCb BleAdvRptFn) error {
+func actScan(x *BleXport, bl *Listener, r *BleScanReq, abortChan chan struct{},
+	advRptCb BleAdvRptFn) error {
 
 	const rspType = MSG_TYPE_SCAN
 
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
index 52b163e..44a7908 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
@@ -48,8 +48,11 @@ const (
 	FSM_DISCONNECT_TYPE_REQUESTED
 )
 
-type BleRxDataFn func(data []byte)
-type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
+type BleDisconnectEntry struct {
+	Dt   BleFsmDisconnectType
+	Peer BleDev
+	Err  error
+}
 
 type BleFsmParamsCentral struct {
 	PeerDev     BleDev
@@ -58,15 +61,13 @@ type BleFsmParamsCentral struct {
 }
 
 type BleFsmParams struct {
-	Bx           *BleXport
-	OwnAddrType  BleAddrType
-	EncryptWhen  BleEncryptWhen
-	Central      BleFsmParamsCentral
-	SvcUuids     []BleUuid
-	ReqChrUuid   BleUuid
-	RspChrUuid   BleUuid
-	RxDataCb     BleRxDataFn
-	DisconnectCb BleDisconnectFn
+	Bx          *BleXport
+	OwnAddrType BleAddrType
+	EncryptWhen BleEncryptWhen
+	Central     BleFsmParamsCentral
+	SvcUuids    []BleUuid
+	ReqChrUuid  BleUuid
+	RspChrUuid  BleUuid
 }
 
 type BleFsm struct {
@@ -83,8 +84,9 @@ type BleFsm struct {
 	errFunnel  nmxutil.ErrFunnel
 	id         uint32
 
-	// Conveys changes in encrypted state.
-	encChan chan error
+	encBcast       nmxutil.Bcaster
+	disconnectChan chan BleDisconnectEntry
+	rxNmpChan      chan []byte
 }
 
 func NewBleFsm(p BleFsmParams) *BleFsm {
@@ -99,7 +101,6 @@ func NewBleFsm(p BleFsmParams) *BleFsm {
 
 	bf.errFunnel.AccumDelay = 250 * time.Millisecond
 	bf.errFunnel.LessCb = fsmErrorLess
-	bf.errFunnel.ProcCb = func(err error) { bf.processErr(err) }
 
 	return bf
 }
@@ -163,7 +164,14 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
 	}
 }
 
-func (bf *BleFsm) processErr(err error) {
+// Listens for an error in the state machine.  On error, the session is
+// considered disconnected and the error is reported to the client.
+func (bf *BleFsm) listenForError() {
+	err := <-bf.errFunnel.Wait()
+
+	// Stop listening for NMP responses.
+	close(bf.rxNmpChan)
+
 	// Remember some fields before we clear them.
 	dt := calcDisconnectType(bf.state)
 
@@ -175,8 +183,10 @@ func (bf *BleFsm) processErr(err error) {
 	// Wait for all listeners to get removed.
 	bf.rxer.WaitUntilNoListeners()
 
-	bf.errFunnel.Reset()
-	bf.params.DisconnectCb(dt, bf.peerDev, err)
+	bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err}
+	close(bf.disconnectChan)
+
+	bf.disconnectChan = make(chan BleDisconnectEntry)
 }
 
 // Listens for events in the background.
@@ -214,9 +224,9 @@ func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
 						log.Debugf("Connection encrypted; conn_handle=%d",
 							msg.ConnHandle)
 					}
-					if bf.encChan != nil {
-						bf.encChan <- err
-					}
+
+					// Notify any listeners of the encryption change event.
+					bf.encBcast.SendAndClear(err)
 
 				case *BleDisconnectEvt:
 					err := bf.disconnectError(msg.Reason)
@@ -259,7 +269,7 @@ func (bf *BleFsm) nmpRspListen() error {
 					if bf.nmpRspChr != nil &&
 						msg.AttrHandle == bf.nmpRspChr.ValHandle {
 
-						bf.params.RxDataCb(msg.Data.Bytes)
+						bf.rxNmpChan <- msg.Data.Bytes
 					}
 				}
 			}
@@ -437,16 +447,14 @@ func (bf *BleFsm) encInitiate() error {
 	}
 	defer bf.rxer.RemoveSeqListener("enc-initiate", r.Seq)
 
-	bf.encChan = make(chan error)
-	defer func() { bf.encChan = nil }()
-
 	// Initiate the encryption procedure.
 	if err := encInitiate(bf.params.Bx, bl, r); err != nil {
 		return err
 	}
 
 	// Block until the procedure completes.
-	return <-bf.encChan
+	itf := <-bf.encBcast.Listen()
+	return itf.(error)
 }
 
 func (bf *BleFsm) discAllChrs() error {
@@ -621,6 +629,14 @@ func (bf *BleFsm) executeState() (bool, error) {
 	return false, nil
 }
 
+func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry {
+	return bf.disconnectChan
+}
+
+func (bf *BleFsm) RxNmpChan() <-chan []byte {
+	return bf.rxNmpChan
+}
+
 func (bf *BleFsm) startOnce() (bool, error) {
 	if !bf.IsClosed() {
 		return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -628,15 +644,16 @@ func (bf *BleFsm) startOnce() (bool, error) {
 			bf.state))
 	}
 
-	bf.errFunnel.Start()
-
 	for {
 		retry, err := bf.executeState()
 		if err != nil {
 			bf.errFunnel.Insert(err)
-			err = bf.errFunnel.Wait()
+			err = <-bf.errFunnel.Wait()
 			return retry, err
 		} else if bf.state == SESN_STATE_DONE {
+			// We are fully connected.  Listen for errors in the background.
+			go bf.listenForError()
+
 			return false, nil
 		}
 	}
@@ -648,6 +665,9 @@ func (bf *BleFsm) startOnce() (bool, error) {
 func (bf *BleFsm) Start() error {
 	var err error
 
+	bf.disconnectChan = make(chan BleDisconnectEntry)
+	bf.rxNmpChan = make(chan []byte)
+
 	for i := 0; i < bf.params.Central.ConnTries; i++ {
 		var retry bool
 		retry, err = bf.startOnce()
@@ -656,12 +676,16 @@ func (bf *BleFsm) Start() error {
 		}
 	}
 
-	return err
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
 
 // @return bool                 true if stop complete;
 //                              false if disconnect is now pending.
-func (bf *BleFsm) Stop() (bool, error) {
+func (bf *BleFsm) Stop() error {
 	state := bf.state
 
 	switch state {
@@ -669,19 +693,18 @@ func (bf *BleFsm) Stop() (bool, error) {
 		SESN_STATE_TERMINATING,
 		SESN_STATE_CONN_CANCELLING:
 
-		return false,
-			bf.closedError("Attempt to close an unopened BLE session")
+		return bf.closedError("Attempt to close an unopened BLE session")
 
 	case SESN_STATE_CONNECTING:
 		bf.connCancel()
 		bf.errFunnel.Insert(fmt.Errorf("Connection attempt cancelled"))
-		return false, nil
+		return nil
 
 	default:
 		if err := bf.terminate(); err != nil {
-			return false, err
+			return err
 		}
-		return false, nil
+		return nil
 	}
 }
 
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
index 9db1c84..e1bf28b 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
@@ -2,7 +2,6 @@ package nmble
 
 import (
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/runtimeco/go-coap"
@@ -22,8 +21,7 @@ type BleOicSesn struct {
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
 
-	closeChan chan error
-	mtx       sync.Mutex
+	closeChan chan struct{}
 }
 
 func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
@@ -52,136 +50,81 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
 		ReqChrUuid:  reqChrUuid,
 		RspChrUuid:  rspChrUuid,
 		EncryptWhen: cfg.Ble.EncryptWhen,
-		RxDataCb:    func(d []byte) { bos.onRxNmp(d) },
-		DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
-			bos.onDisconnect(dt, p, e)
-		},
 	})
 
 	return bos
 }
 
-// Returns true if a new channel was assigned.
-func (bos *BleOicSesn) setCloseChan() error {
-	bos.mtx.Lock()
-	defer bos.mtx.Unlock()
-
-	if bos.closeChan != nil {
-		return fmt.Errorf("Multiple listeners waiting for session to close")
-	}
-
-	bos.closeChan = make(chan error, 1)
-	return nil
-}
-
-func (bos *BleOicSesn) clearCloseChan() {
-	bos.mtx.Lock()
-	defer bos.mtx.Unlock()
-
-	bos.closeChan = nil
+func (bos *BleOicSesn) AbortRx(seq uint8) error {
+	return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
 }
 
-func (bos *BleOicSesn) listenForClose(timeout time.Duration) error {
-	select {
-	case <-bos.closeChan:
-		return nil
-	case <-time.After(timeout):
-		// Session never closed.
-		return fmt.Errorf("Timeout while waiting for session to close")
-	}
-}
+func (bos *BleOicSesn) Open() error {
+	// This channel gets closed when the session closes.
+	bos.closeChan = make(chan struct{})
 
-func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error {
-	if err := bos.setCloseChan(); err != nil {
+	if err := bos.bf.Start(); err != nil {
+		close(bos.closeChan)
 		return err
 	}
-	defer bos.clearCloseChan()
 
-	// If the session is already closed, we're done.
-	if bos.bf.IsClosed() {
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bos.listenForClose(timeout)
-}
-
-func (bos *BleOicSesn) AbortRx(seq uint8) error {
-	return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
-}
-
-func (bos *BleOicSesn) Open() error {
 	d, err := omp.NewDispatcher(true, 3)
 	if err != nil {
+		close(bos.closeChan)
 		return err
 	}
 	bos.d = d
 
-	if err := bos.bf.Start(); err != nil {
+	// Listen for disconnect in the background.
+	go func() {
+		// Block until disconnect.
+		entry := <-bos.bf.DisconnectChan()
+
+		// Signal error to all listeners.
+		bos.d.ErrorAll(entry.Err)
 		bos.d.Stop()
-		return err
-	}
+
+		// If the session is being closed, unblock the close() call.
+		close(bos.closeChan)
+
+		// Only execute the client's disconnect callback if the disconnect was
+		// unsolicited.
+		if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
+			bos.onCloseCb(bos, entry.Err)
+		}
+	}()
+
+	// Listen for NMP responses in the background.
+	go func() {
+		for {
+			data, ok := <-bos.bf.RxNmpChan()
+			if !ok {
+				// Disconnected.
+				return
+			} else {
+				bos.d.Dispatch(data)
+			}
+		}
+	}()
+
 	return nil
 }
 
 func (bos *BleOicSesn) Close() error {
-	if err := bos.setCloseChan(); err != nil {
-		return err
-	}
-	defer bos.clearCloseChan()
-
-	done, err := bos.bf.Stop()
+	err := bos.bf.Stop()
 	if err != nil {
 		return err
 	}
 
-	if done {
-		// Close complete.
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bos.listenForClose(bos.closeTimeout)
+	// Block until close completes.
+	<-bos.closeChan
+	return nil
 }
 
 func (bos *BleOicSesn) IsOpen() bool {
 	return bos.bf.IsOpen()
 }
 
-func (bos *BleOicSesn) onRxNmp(data []byte) {
-	bos.d.Dispatch(data)
-}
-
-// Called by the FSM when a blehostd disconnect event is received.
-func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
-	err error) {
-
-	bos.d.ErrorAll(err)
-
-	bos.mtx.Lock()
-
-	// If the session is being closed, unblock the close() call.
-	if bos.closeChan != nil {
-		bos.closeChan <- err
-	}
-
-	bos.mtx.Unlock()
-
-	// Only stop the dispatcher and execute client's disconnect callback if the
-	// disconnect was unsolicited and the session was fully open.  If the
-	// session wasn't fully open, the dispatcher will get stopped when the fsm
-	// start function returns an error (right after this function returns).
-	if dt == FSM_DISCONNECT_TYPE_OPENED || dt == FSM_DISCONNECT_TYPE_REQUESTED {
-		bos.d.Stop()
-	}
-
-	if dt == FSM_DISCONNECT_TYPE_OPENED {
-		if bos.onCloseCb != nil {
-			bos.onCloseCb(bos, err)
-		}
-	}
-}
-
 func (bos *BleOicSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
 	return omp.EncodeOmpTcp(m)
 }
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
index b69d4f7..782fce0 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
@@ -16,25 +16,17 @@ type BlePlainSesn struct {
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
 
-	closeChan chan error
+	closeChan chan struct{}
 }
 
 func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
 	bps := &BlePlainSesn{
-		d:            nmp.NewDispatcher(1),
 		closeTimeout: cfg.Ble.CloseTimeout,
 		onCloseCb:    cfg.OnCloseCb,
 	}
 
-	svcUuid, err := ParseUuid(NmpPlainSvcUuid)
-	if err != nil {
-		panic(err.Error())
-	}
-
-	chrUuid, err := ParseUuid(NmpPlainChrUuid)
-	if err != nil {
-		panic(err.Error())
-	}
+	svcUuid, _ := ParseUuid(NmpPlainSvcUuid)
+	chrUuid, _ := ParseUuid(NmpPlainChrUuid)
 
 	bps.bf = NewBleFsm(BleFsmParams{
 		Bx:          bx,
@@ -48,84 +40,83 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
 		ReqChrUuid:  chrUuid,
 		RspChrUuid:  chrUuid,
 		EncryptWhen: cfg.Ble.EncryptWhen,
-		RxDataCb:    func(d []byte) { bps.onRxNmp(d) },
-		DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
-			bps.onDisconnect(dt, p, e)
-		},
 	})
 
 	return bps
 }
 
-func (bps *BlePlainSesn) setCloseChan() error {
-	if bps.closeChan != nil {
-		return fmt.Errorf("Multiple listeners waiting for session to close")
-	}
-
-	bps.closeChan = make(chan error, 1)
-	return nil
-}
-
-func (bps *BlePlainSesn) clearCloseChan() {
-	bps.closeChan = nil
-}
-
-func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
-	select {
-	case <-bps.closeChan:
-		return nil
-	case <-time.After(timeout):
-		// Session never closed.
-		return fmt.Errorf("Timeout while waiting for session to close")
-	}
-}
-
 func (bps *BlePlainSesn) AbortRx(seq uint8) error {
 	return bps.d.ErrorOne(seq, fmt.Errorf("Rx aborted"))
 }
 
 func (bps *BlePlainSesn) Open() error {
-	return bps.bf.Start()
-}
+	// This channel gets closed when the session closes.
+	bps.closeChan = make(chan struct{})
 
-func (bps *BlePlainSesn) Close() error {
-	if err := bps.setCloseChan(); err != nil {
+	if err := bps.bf.Start(); err != nil {
+		close(bps.closeChan)
 		return err
 	}
-	defer bps.clearCloseChan()
 
-	done, err := bps.bf.Stop()
+	bps.d = nmp.NewDispatcher(3)
+
+	// Listen for disconnect in the background.
+	go func() {
+		// Block until disconnect.
+		entry := <-bps.bf.DisconnectChan()
+
+		// Signal error to all listeners.
+		bps.d.ErrorAll(entry.Err)
+
+		// If the session is being closed, unblock the close() call.
+		close(bps.closeChan)
+
+		// Only execute the client's disconnect callback if the disconnect was
+		// unsolicited.
+		if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
+			bps.onCloseCb(bps, entry.Err)
+		}
+	}()
+
+	// Listen for NMP responses in the background.
+	go func() {
+		for {
+			data, ok := <-bps.bf.RxNmpChan()
+			if !ok {
+				// Disconnected.
+				return
+			} else {
+				bps.d.Dispatch(data)
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (bps *BlePlainSesn) Close() error {
+	err := bps.bf.Stop()
 	if err != nil {
 		return err
 	}
 
-	if done {
-		// Close complete.
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bps.listenForClose(bps.closeTimeout)
+	// Block until close completes.
+	<-bps.closeChan
+	return nil
 }
 
 func (bps *BlePlainSesn) IsOpen() bool {
 	return bps.bf.IsOpen()
 }
 
-func (bps *BlePlainSesn) onRxNmp(data []byte) {
-	bps.d.Dispatch(data)
-}
-
 // Called by the FSM when a blehostd disconnect event is received.
 func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
 	err error) {
 
 	bps.d.ErrorAll(err)
 
-	// If someone is waiting for the session to close, unblock them.
-	if bps.closeChan != nil {
-		bps.closeChan <- err
-	}
+	// If the session is being closed, unblock the close() call.
+	close(bps.closeChan)
 
 	// Only execute client's disconnect callback if the disconnect was
 	// unsolicited and the session was fully open.
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
index db0a8e3..67b2d60 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
@@ -84,19 +84,17 @@ const (
 
 // Implements xport.Xport.
 type BleXport struct {
-	Bd                *Dispatcher
-	client            *unixchild.Client
-	state             BleXportState
-	stopChan          chan struct{}
-	numStopListeners  int
-	shutdownChan      chan bool
-	readyChan         chan error
-	numReadyListeners int
-	master            nmxutil.SingleResource
-	slave             nmxutil.SingleResource
-	randAddr          *BleAddr
-	mtx               sync.Mutex
-	scanner           *BleScanner
+	Bd           *Dispatcher
+	client       *unixchild.Client
+	state        BleXportState
+	stopChan     chan struct{}
+	shutdownChan chan bool
+	readyBcast   nmxutil.Bcaster
+	master       nmxutil.SingleResource
+	slave        nmxutil.SingleResource
+	randAddr     *BleAddr
+	stateMtx     sync.Mutex
+	scanner      *BleScanner
 
 	cfg XportCfg
 }
@@ -105,7 +103,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	bx := &BleXport{
 		Bd:           NewDispatcher(),
 		shutdownChan: make(chan bool),
-		readyChan:    make(chan error),
+		readyBcast:   nmxutil.Bcaster{},
 		master:       nmxutil.NewSingleResource(),
 		slave:        nmxutil.NewSingleResource(),
 		cfg:          cfg,
@@ -114,7 +112,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	return bx, nil
 }
 
-func (bx *BleXport) createUnixChild() {
+func (bx *BleXport) startUnixChild() error {
 	config := unixchild.Config{
 		SockPath:      bx.cfg.SockPath,
 		ChildPath:     bx.cfg.BlehostdPath,
@@ -125,6 +123,20 @@ func (bx *BleXport) createUnixChild() {
 	}
 
 	bx.client = unixchild.New(config)
+
+	if err := bx.client.Start(); err != nil {
+		if unixchild.IsUcAcceptError(err) {
+			err = nmxutil.NewXportError(
+				"blehostd did not connect to socket; " +
+					"controller not attached?")
+		} else {
+			err = nmxutil.NewXportError(
+				"Failed to start child process: " + err.Error())
+		}
+		return err
+	}
+
+	return nil
 }
 
 func (bx *BleXport) BuildScanner() (scan.Scanner, error) {
@@ -240,7 +252,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 
 	log.Debugf("Shutting down BLE transport")
 
-	bx.mtx.Lock()
+	bx.stateMtx.Lock()
 
 	var fullyStarted bool
 	var already bool
@@ -260,7 +272,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 		bx.state = BLE_XPORT_STATE_STOPPING
 	}
 
-	bx.mtx.Unlock()
+	bx.stateMtx.Unlock()
 
 	if already {
 		// Shutdown already in progress.
@@ -283,10 +295,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 	}
 
 	// Stop all of this transport's go routines.
-	log.Debugf("Waiting for BLE transport goroutines to complete")
-	for i := 0; i < bx.numStopListeners; i++ {
-		bx.stopChan <- struct{}{}
-	}
+	close(bx.stopChan)
 
 	// Stop the unixchild instance (blehostd + socket).
 	if bx.client != nil {
@@ -304,37 +313,32 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 }
 
 func (bx *BleXport) blockUntilReady() error {
-	bx.mtx.Lock()
+	var ch chan interface{}
+
+	bx.stateMtx.Lock()
 	switch bx.state {
 	case BLE_XPORT_STATE_STARTED:
 		// Already started; don't block.
-		bx.mtx.Unlock()
+		bx.stateMtx.Unlock()
 		return nil
 
 	case BLE_XPORT_STATE_DORMANT:
 		// Not in the process of starting; the user will be waiting forever.
-		bx.mtx.Unlock()
+		bx.stateMtx.Unlock()
 		return fmt.Errorf("Attempt to use BLE transport without starting it")
 
 	default:
+		ch = bx.readyBcast.Listen()
 	}
+	bx.stateMtx.Unlock()
 
-	bx.numReadyListeners++
-	bx.mtx.Unlock()
-
-	return <-bx.readyChan
-}
-
-func (bx *BleXport) notifyReadyListeners(err error) {
-	for i := 0; i < bx.numReadyListeners; i++ {
-		bx.readyChan <- err
-	}
-	bx.numReadyListeners = 0
+	itf := <-ch
+	return itf.(error)
 }
 
 func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
-	bx.mtx.Lock()
-	defer bx.mtx.Unlock()
+	bx.stateMtx.Lock()
+	defer bx.stateMtx.Unlock()
 
 	if bx.state != from {
 		return false
@@ -343,9 +347,10 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
 	bx.state = to
 	switch bx.state {
 	case BLE_XPORT_STATE_STARTED:
-		bx.notifyReadyListeners(nil)
+		bx.readyBcast.SendAndClear(nil)
 	case BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_DORMANT:
-		bx.notifyReadyListeners(nmxutil.NewXportError("BLE transport stopped"))
+		bx.readyBcast.SendAndClear(
+			nmxutil.NewXportError("BLE transport stopped"))
 	default:
 	}
 
@@ -362,32 +367,21 @@ func (bx *BleXport) startOnce() error {
 		return nmxutil.NewXportError("BLE xport started twice")
 	}
 
-	bx.stopChan = make(chan struct{})
-	bx.numStopListeners = 0
-
-	bx.createUnixChild()
-	if err := bx.client.Start(); err != nil {
-		if unixchild.IsUcAcceptError(err) {
-			err = nmxutil.NewXportError(
-				"blehostd did not connect to socket; " +
-					"controller not attached?")
-		} else {
-			err = nmxutil.NewXportError(
-				"Failed to start child process: " + err.Error())
-		}
+	if err := bx.startUnixChild(); err != nil {
 		bx.shutdown(true, err)
 		return err
 	}
 
+	bx.stopChan = make(chan struct{})
+
 	// Listen for errors and data from the blehostd process.
 	go func() {
-		bx.numStopListeners++
 		for {
 			select {
 			case err := <-bx.client.ErrChild:
 				err = nmxutil.NewXportError("BLE transport error: " +
 					err.Error())
-				go bx.shutdown(true, err)
+				bx.shutdown(true, err)
 
 			case buf := <-bx.client.FromChild:
 				if len(buf) != 0 {
@@ -433,16 +427,16 @@ func (bx *BleXport) startOnce() error {
 
 	// Host and controller are synced.  Listen for sync loss in the background.
 	go func() {
-		bx.numStopListeners++
 		for {
 			select {
 			case err := <-bl.ErrChan:
-				go bx.shutdown(true, err)
+				bx.shutdown(true, err)
+				return
 			case bm := <-bl.MsgChan:
 				switch msg := bm.(type) {
 				case *BleSyncEvt:
 					if !msg.Synced {
-						go bx.shutdown(true, nmxutil.NewXportError(
+						bx.shutdown(true, nmxutil.NewXportError(
 							"BLE host <-> controller sync lost"))
 					}
 				}
@@ -452,7 +446,7 @@ func (bx *BleXport) startOnce() error {
 		}
 	}()
 
-	// Generate a new random address is none was specified.
+	// Generate a new random address if none was specified.
 	if bx.randAddr == nil {
 		addr, err := GenRandAddrXact(bx)
 		if err != nil {
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
index 3858e75..2ac19ba 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
@@ -108,7 +108,7 @@ func (d *Discoverer) Stop() error {
 		return nmxutil.NewAlreadyError("Attempt to stop inactive discoverer")
 	}
 
-	ch <- struct{}{}
+	close(ch)
 	return nil
 }
 
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
index ba59f81..435b123 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
@@ -55,7 +55,7 @@ type Listener struct {
 func NewListener() *Listener {
 	return &Listener{
 		MsgChan: make(chan Msg, 16),
-		ErrChan: make(chan error, 4),
+		ErrChan: make(chan error, 1),
 		TmoChan: make(chan time.Time, 1),
 	}
 }
@@ -70,10 +70,14 @@ func (bl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return bl.TmoChan
 }
 
-func (bl *Listener) Stop() {
+func (bl *Listener) Close() {
 	if bl.timer != nil {
 		bl.timer.Stop()
 	}
+
+	close(bl.MsgChan)
+	close(bl.ErrChan)
+	close(bl.TmoChan)
 }
 
 type Dispatcher struct {
@@ -234,7 +238,7 @@ func (d *Dispatcher) RemoveListener(base MsgBase) *Listener {
 
 	base, bl := d.findListener(base)
 	if bl != nil {
-		bl.Stop()
+		bl.Close()
 		if base.Seq != BLE_SEQ_NONE {
 			delete(d.seqMap, base.Seq)
 		} else {
@@ -264,8 +268,8 @@ func decodeMsg(data []byte) (MsgBase, Msg, error) {
 	cb := msgCtorMap[opTypePair]
 	if cb == nil {
 		return base, nil, fmt.Errorf(
-			"Unrecognized op+type pair:") // %s, %s",
-		//MsgOpToString(base.Op), MsgTypeToString(base.Type))
+			"Unrecognized op+type pair: %s, %s",
+			MsgOpToString(base.Op), MsgTypeToString(base.Type))
 	}
 
 	msg := cb()
@@ -298,6 +302,10 @@ func (d *Dispatcher) Dispatch(data []byte) {
 }
 
 func (d *Dispatcher) ErrorAll(err error) {
+	if err == nil {
+		panic("NIL ERROR")
+	}
+
 	d.mtx.Lock()
 
 	m1 := d.seqMap
@@ -310,8 +318,10 @@ func (d *Dispatcher) ErrorAll(err error) {
 
 	for _, bl := range m1 {
 		bl.ErrChan <- err
+		bl.Close()
 	}
 	for _, bl := range m2 {
 		bl.ErrChan <- err
+		bl.Close()
 	}
 }
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
index d089ba8..8660393 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
@@ -94,6 +94,9 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) {
 }
 
 func (r *Receiver) ErrorAll(err error) {
+	if err == nil {
+		panic("NIL ERROR")
+	}
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/bcast.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/bcast.go
new file mode 100644
index 0000000..a7d68c0
--- /dev/null
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/bcast.go
@@ -0,0 +1,43 @@
+package nmxutil
+
+import (
+	"sync"
+)
+
+type Bcaster struct {
+	chs [](chan interface{})
+	mtx sync.Mutex
+}
+
+func (b *Bcaster) Listen() chan interface{} {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	ch := make(chan interface{})
+	b.chs = append(b.chs, ch)
+
+	return ch
+}
+
+func (b *Bcaster) Send(val interface{}) {
+	b.mtx.Lock()
+	chs := b.chs
+	b.mtx.Unlock()
+
+	for _, ch := range chs {
+		ch <- val
+		close(ch)
+	}
+}
+
+func (b *Bcaster) Clear() {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	b.chs = nil
+}
+
+func (b *Bcaster) SendAndClear(val interface{}) {
+	b.Send(val)
+	b.Clear()
+}
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/err_funnel.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/err_funnel.go
index 58b0ca2..6d6b5a3 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/err_funnel.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/err_funnel.go
@@ -1,7 +1,6 @@
 package nmxutil
 
 import (
-	"fmt"
 	"sync"
 	"time"
 )
@@ -13,26 +12,15 @@ type ErrProcFn func(err error)
 // reported.
 type ErrFunnel struct {
 	LessCb     ErrLessFn
-	ProcCb     ErrProcFn
 	AccumDelay time.Duration
 
 	mtx      sync.Mutex
 	resetMtx sync.Mutex
 	curErr   error
 	errTimer *time.Timer
-	started  bool
 	waiters  [](chan error)
 }
 
-func (f *ErrFunnel) Start() {
-	f.resetMtx.Lock()
-
-	f.mtx.Lock()
-	defer f.mtx.Unlock()
-
-	f.started = true
-}
-
 func (f *ErrFunnel) Insert(err error) {
 	if err == nil {
 		panic("ErrFunnel nil insert")
@@ -41,10 +29,6 @@ func (f *ErrFunnel) Insert(err error) {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()
 
-	if !f.started {
-		panic("ErrFunnel insert without start")
-	}
-
 	if f.curErr == nil {
 		f.curErr = err
 		f.errTimer = time.AfterFunc(f.AccumDelay, func() {
@@ -61,18 +45,6 @@ func (f *ErrFunnel) Insert(err error) {
 	}
 }
 
-func (f *ErrFunnel) Reset() {
-	f.mtx.Lock()
-	defer f.mtx.Unlock()
-
-	if f.started {
-		f.started = false
-		f.curErr = nil
-		f.errTimer.Stop()
-		f.resetMtx.Unlock()
-	}
-}
-
 func (f *ErrFunnel) timerExp() {
 	f.mtx.Lock()
 
@@ -88,35 +60,18 @@ func (f *ErrFunnel) timerExp() {
 		panic("ErrFunnel timer expired but no error")
 	}
 
-	f.ProcCb(err)
-
 	for _, w := range waiters {
 		w <- err
+		close(w)
 	}
 }
 
-func (f *ErrFunnel) Wait() error {
-	var err error
-	var c chan error
+func (f *ErrFunnel) Wait() chan error {
+	c := make(chan error)
 
 	f.mtx.Lock()
-
-	if !f.started {
-		if f.curErr == nil {
-			err = fmt.Errorf("Wait on unstarted ErrFunnel")
-		} else {
-			err = f.curErr
-		}
-	} else {
-		c = make(chan error)
-		f.waiters = append(f.waiters, c)
-	}
-
+	f.waiters = append(f.waiters, c)
 	f.mtx.Unlock()
 
-	if err != nil {
-		return err
-	} else {
-		return <-c
-	}
+	return c
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.

[mynewt-newtmgr] 01/02: nmxact - Remove some mutexes

Posted by cc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit cf276ea9920ba3429e84648445981402acbd9abd
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Wed Jul 12 18:33:41 2017 -0700

    nmxact - Remove some mutexes
---
 nmxact/example/ble_loop/ble_loop.go |  12 ++-
 nmxact/example/ble_scan/ble_scan.go |   7 +-
 nmxact/nmble/ble_act.go             |   4 +-
 nmxact/nmble/ble_fsm.go             |  93 ++++++++++++++---------
 nmxact/nmble/ble_oic_sesn.go        | 145 +++++++++++-------------------------
 nmxact/nmble/ble_plain_sesn.go      | 109 +++++++++++++--------------
 nmxact/nmble/ble_xport.go           | 110 +++++++++++++--------------
 nmxact/nmble/discover.go            |   2 +-
 nmxact/nmble/dispatch.go            |  20 +++--
 nmxact/nmble/receiver.go            |   3 +
 nmxact/nmxutil/bcast.go             |  43 +++++++++++
 nmxact/nmxutil/err_funnel.go        |  55 ++------------
 12 files changed, 284 insertions(+), 319 deletions(-)

diff --git a/nmxact/example/ble_loop/ble_loop.go b/nmxact/example/ble_loop/ble_loop.go
index 7a930a9..9e79d29 100644
--- a/nmxact/example/ble_loop/ble_loop.go
+++ b/nmxact/example/ble_loop/ble_loop.go
@@ -26,8 +26,11 @@ import (
 	"syscall"
 	"time"
 
+	log "github.com/Sirupsen/logrus"
+
 	"mynewt.apache.org/newtmgr/nmxact/bledefs"
 	"mynewt.apache.org/newtmgr/nmxact/nmble"
+	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 	"mynewt.apache.org/newtmgr/nmxact/xact"
 	"mynewt.apache.org/newtmgr/nmxact/xport"
@@ -58,11 +61,14 @@ func configExitHandler(x xport.Xport, s sesn.Sesn) {
 }
 
 func main() {
+	//nmxutil.SetLogLevel(log.DebugLevel)
+	nmxutil.SetLogLevel(log.InfoLevel)
+
 	// Initialize the BLE transport.
 	params := nmble.NewXportCfg()
 	params.SockPath = "/tmp/blehostd-uds"
-	params.BlehostdPath = "blehostd.elf"
-	params.DevPath = "/dev/cu.usbmodem142111"
+	params.BlehostdPath = "blehostd"
+	params.DevPath = "/dev/cu.usbmodem142121"
 
 	x, err := nmble.NewBleXport(params)
 	if err != nil {
@@ -83,7 +89,7 @@ func main() {
 	//     * Peer has name "nimble-bleprph"
 	//     * We use a random address.
 	dev, err := nmble.DiscoverDeviceWithName(
-		x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "nimble-bleprph")
+		x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "c4")
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "error discovering device: %s\n", err.Error())
 		os.Exit(1)
diff --git a/nmxact/example/ble_scan/ble_scan.go b/nmxact/example/ble_scan/ble_scan.go
index 73ca24f..3a01146 100644
--- a/nmxact/example/ble_scan/ble_scan.go
+++ b/nmxact/example/ble_scan/ble_scan.go
@@ -70,8 +70,8 @@ func main() {
 	// Initialize the BLE transport.
 	params := nmble.NewXportCfg()
 	params.SockPath = "/tmp/blehostd-uds"
-	params.BlehostdPath = "blehostd.elf"
-	params.DevPath = "/dev/cu.usbmodem14221"
+	params.BlehostdPath = "blehostd"
+	params.DevPath = "/dev/cu.usbmodem142121"
 
 	x, err := nmble.NewBleXport(params)
 	if err != nil {
@@ -105,9 +105,6 @@ func main() {
 
 	for {
 		sc := scan.BleOmpScanCfg(scanCb)
-		sc.Ble.ScanPred = func(adv bledefs.BleAdvReport) bool {
-			return adv.Fields.Name != nil && *adv.Fields.Name == "c5"
-		}
 		if err := scanner.Start(sc); err != nil {
 			fmt.Fprintf(os.Stderr, "error starting scan: %s\n", err.Error())
 			os.Exit(1)
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 2c4b71d..e51b8e9 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -324,8 +324,8 @@ func exchangeMtu(x *BleXport, bl *Listener, r *BleExchangeMtuReq) (
 	}
 }
 
-func actScan(x *BleXport, bl *Listener, r *BleScanReq,
-	abortChan chan struct{}, advRptCb BleAdvRptFn) error {
+func actScan(x *BleXport, bl *Listener, r *BleScanReq, abortChan chan struct{},
+	advRptCb BleAdvRptFn) error {
 
 	const rspType = MSG_TYPE_SCAN
 
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 52b163e..44a7908 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -48,8 +48,11 @@ const (
 	FSM_DISCONNECT_TYPE_REQUESTED
 )
 
-type BleRxDataFn func(data []byte)
-type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
+type BleDisconnectEntry struct {
+	Dt   BleFsmDisconnectType
+	Peer BleDev
+	Err  error
+}
 
 type BleFsmParamsCentral struct {
 	PeerDev     BleDev
@@ -58,15 +61,13 @@ type BleFsmParamsCentral struct {
 }
 
 type BleFsmParams struct {
-	Bx           *BleXport
-	OwnAddrType  BleAddrType
-	EncryptWhen  BleEncryptWhen
-	Central      BleFsmParamsCentral
-	SvcUuids     []BleUuid
-	ReqChrUuid   BleUuid
-	RspChrUuid   BleUuid
-	RxDataCb     BleRxDataFn
-	DisconnectCb BleDisconnectFn
+	Bx          *BleXport
+	OwnAddrType BleAddrType
+	EncryptWhen BleEncryptWhen
+	Central     BleFsmParamsCentral
+	SvcUuids    []BleUuid
+	ReqChrUuid  BleUuid
+	RspChrUuid  BleUuid
 }
 
 type BleFsm struct {
@@ -83,8 +84,9 @@ type BleFsm struct {
 	errFunnel  nmxutil.ErrFunnel
 	id         uint32
 
-	// Conveys changes in encrypted state.
-	encChan chan error
+	encBcast       nmxutil.Bcaster
+	disconnectChan chan BleDisconnectEntry
+	rxNmpChan      chan []byte
 }
 
 func NewBleFsm(p BleFsmParams) *BleFsm {
@@ -99,7 +101,6 @@ func NewBleFsm(p BleFsmParams) *BleFsm {
 
 	bf.errFunnel.AccumDelay = 250 * time.Millisecond
 	bf.errFunnel.LessCb = fsmErrorLess
-	bf.errFunnel.ProcCb = func(err error) { bf.processErr(err) }
 
 	return bf
 }
@@ -163,7 +164,14 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
 	}
 }
 
-func (bf *BleFsm) processErr(err error) {
+// Listens for an error in the state machine.  On error, the session is
+// considered disconnected and the error is reported to the client.
+func (bf *BleFsm) listenForError() {
+	err := <-bf.errFunnel.Wait()
+
+	// Stop listening for NMP responses.
+	close(bf.rxNmpChan)
+
 	// Remember some fields before we clear them.
 	dt := calcDisconnectType(bf.state)
 
@@ -175,8 +183,10 @@ func (bf *BleFsm) processErr(err error) {
 	// Wait for all listeners to get removed.
 	bf.rxer.WaitUntilNoListeners()
 
-	bf.errFunnel.Reset()
-	bf.params.DisconnectCb(dt, bf.peerDev, err)
+	bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err}
+	close(bf.disconnectChan)
+
+	bf.disconnectChan = make(chan BleDisconnectEntry)
 }
 
 // Listens for events in the background.
@@ -214,9 +224,9 @@ func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
 						log.Debugf("Connection encrypted; conn_handle=%d",
 							msg.ConnHandle)
 					}
-					if bf.encChan != nil {
-						bf.encChan <- err
-					}
+
+					// Notify any listeners of the encryption change event.
+					bf.encBcast.SendAndClear(err)
 
 				case *BleDisconnectEvt:
 					err := bf.disconnectError(msg.Reason)
@@ -259,7 +269,7 @@ func (bf *BleFsm) nmpRspListen() error {
 					if bf.nmpRspChr != nil &&
 						msg.AttrHandle == bf.nmpRspChr.ValHandle {
 
-						bf.params.RxDataCb(msg.Data.Bytes)
+						bf.rxNmpChan <- msg.Data.Bytes
 					}
 				}
 			}
@@ -437,16 +447,14 @@ func (bf *BleFsm) encInitiate() error {
 	}
 	defer bf.rxer.RemoveSeqListener("enc-initiate", r.Seq)
 
-	bf.encChan = make(chan error)
-	defer func() { bf.encChan = nil }()
-
 	// Initiate the encryption procedure.
 	if err := encInitiate(bf.params.Bx, bl, r); err != nil {
 		return err
 	}
 
 	// Block until the procedure completes.
-	return <-bf.encChan
+	itf := <-bf.encBcast.Listen()
+	return itf.(error)
 }
 
 func (bf *BleFsm) discAllChrs() error {
@@ -621,6 +629,14 @@ func (bf *BleFsm) executeState() (bool, error) {
 	return false, nil
 }
 
+func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry {
+	return bf.disconnectChan
+}
+
+func (bf *BleFsm) RxNmpChan() <-chan []byte {
+	return bf.rxNmpChan
+}
+
 func (bf *BleFsm) startOnce() (bool, error) {
 	if !bf.IsClosed() {
 		return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -628,15 +644,16 @@ func (bf *BleFsm) startOnce() (bool, error) {
 			bf.state))
 	}
 
-	bf.errFunnel.Start()
-
 	for {
 		retry, err := bf.executeState()
 		if err != nil {
 			bf.errFunnel.Insert(err)
-			err = bf.errFunnel.Wait()
+			err = <-bf.errFunnel.Wait()
 			return retry, err
 		} else if bf.state == SESN_STATE_DONE {
+			// We are fully connected.  Listen for errors in the background.
+			go bf.listenForError()
+
 			return false, nil
 		}
 	}
@@ -648,6 +665,9 @@ func (bf *BleFsm) startOnce() (bool, error) {
 func (bf *BleFsm) Start() error {
 	var err error
 
+	bf.disconnectChan = make(chan BleDisconnectEntry)
+	bf.rxNmpChan = make(chan []byte)
+
 	for i := 0; i < bf.params.Central.ConnTries; i++ {
 		var retry bool
 		retry, err = bf.startOnce()
@@ -656,12 +676,16 @@ func (bf *BleFsm) Start() error {
 		}
 	}
 
-	return err
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
 
 // @return bool                 true if stop complete;
 //                              false if disconnect is now pending.
-func (bf *BleFsm) Stop() (bool, error) {
+func (bf *BleFsm) Stop() error {
 	state := bf.state
 
 	switch state {
@@ -669,19 +693,18 @@ func (bf *BleFsm) Stop() (bool, error) {
 		SESN_STATE_TERMINATING,
 		SESN_STATE_CONN_CANCELLING:
 
-		return false,
-			bf.closedError("Attempt to close an unopened BLE session")
+		return bf.closedError("Attempt to close an unopened BLE session")
 
 	case SESN_STATE_CONNECTING:
 		bf.connCancel()
 		bf.errFunnel.Insert(fmt.Errorf("Connection attempt cancelled"))
-		return false, nil
+		return nil
 
 	default:
 		if err := bf.terminate(); err != nil {
-			return false, err
+			return err
 		}
-		return false, nil
+		return nil
 	}
 }
 
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 9db1c84..e1bf28b 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -2,7 +2,6 @@ package nmble
 
 import (
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/runtimeco/go-coap"
@@ -22,8 +21,7 @@ type BleOicSesn struct {
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
 
-	closeChan chan error
-	mtx       sync.Mutex
+	closeChan chan struct{}
 }
 
 func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
@@ -52,136 +50,81 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
 		ReqChrUuid:  reqChrUuid,
 		RspChrUuid:  rspChrUuid,
 		EncryptWhen: cfg.Ble.EncryptWhen,
-		RxDataCb:    func(d []byte) { bos.onRxNmp(d) },
-		DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
-			bos.onDisconnect(dt, p, e)
-		},
 	})
 
 	return bos
 }
 
-// Returns true if a new channel was assigned.
-func (bos *BleOicSesn) setCloseChan() error {
-	bos.mtx.Lock()
-	defer bos.mtx.Unlock()
-
-	if bos.closeChan != nil {
-		return fmt.Errorf("Multiple listeners waiting for session to close")
-	}
-
-	bos.closeChan = make(chan error, 1)
-	return nil
-}
-
-func (bos *BleOicSesn) clearCloseChan() {
-	bos.mtx.Lock()
-	defer bos.mtx.Unlock()
-
-	bos.closeChan = nil
+func (bos *BleOicSesn) AbortRx(seq uint8) error {
+	return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
 }
 
-func (bos *BleOicSesn) listenForClose(timeout time.Duration) error {
-	select {
-	case <-bos.closeChan:
-		return nil
-	case <-time.After(timeout):
-		// Session never closed.
-		return fmt.Errorf("Timeout while waiting for session to close")
-	}
-}
+func (bos *BleOicSesn) Open() error {
+	// This channel gets closed when the session closes.
+	bos.closeChan = make(chan struct{})
 
-func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error {
-	if err := bos.setCloseChan(); err != nil {
+	if err := bos.bf.Start(); err != nil {
+		close(bos.closeChan)
 		return err
 	}
-	defer bos.clearCloseChan()
 
-	// If the session is already closed, we're done.
-	if bos.bf.IsClosed() {
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bos.listenForClose(timeout)
-}
-
-func (bos *BleOicSesn) AbortRx(seq uint8) error {
-	return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
-}
-
-func (bos *BleOicSesn) Open() error {
 	d, err := omp.NewDispatcher(true, 3)
 	if err != nil {
+		close(bos.closeChan)
 		return err
 	}
 	bos.d = d
 
-	if err := bos.bf.Start(); err != nil {
+	// Listen for disconnect in the background.
+	go func() {
+		// Block until disconnect.
+		entry := <-bos.bf.DisconnectChan()
+
+		// Signal error to all listeners.
+		bos.d.ErrorAll(entry.Err)
 		bos.d.Stop()
-		return err
-	}
+
+		// If the session is being closed, unblock the close() call.
+		close(bos.closeChan)
+
+		// Only execute the client's disconnect callback if the disconnect was
+		// unsolicited.
+		if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
+			bos.onCloseCb(bos, entry.Err)
+		}
+	}()
+
+	// Listen for NMP responses in the background.
+	go func() {
+		for {
+			data, ok := <-bos.bf.RxNmpChan()
+			if !ok {
+				// Disconnected.
+				return
+			} else {
+				bos.d.Dispatch(data)
+			}
+		}
+	}()
+
 	return nil
 }
 
 func (bos *BleOicSesn) Close() error {
-	if err := bos.setCloseChan(); err != nil {
-		return err
-	}
-	defer bos.clearCloseChan()
-
-	done, err := bos.bf.Stop()
+	err := bos.bf.Stop()
 	if err != nil {
 		return err
 	}
 
-	if done {
-		// Close complete.
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bos.listenForClose(bos.closeTimeout)
+	// Block until close completes.
+	<-bos.closeChan
+	return nil
 }
 
 func (bos *BleOicSesn) IsOpen() bool {
 	return bos.bf.IsOpen()
 }
 
-func (bos *BleOicSesn) onRxNmp(data []byte) {
-	bos.d.Dispatch(data)
-}
-
-// Called by the FSM when a blehostd disconnect event is received.
-func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
-	err error) {
-
-	bos.d.ErrorAll(err)
-
-	bos.mtx.Lock()
-
-	// If the session is being closed, unblock the close() call.
-	if bos.closeChan != nil {
-		bos.closeChan <- err
-	}
-
-	bos.mtx.Unlock()
-
-	// Only stop the dispatcher and execute client's disconnect callback if the
-	// disconnect was unsolicited and the session was fully open.  If the
-	// session wasn't fully open, the dispatcher will get stopped when the fsm
-	// start function returns an error (right after this function returns).
-	if dt == FSM_DISCONNECT_TYPE_OPENED || dt == FSM_DISCONNECT_TYPE_REQUESTED {
-		bos.d.Stop()
-	}
-
-	if dt == FSM_DISCONNECT_TYPE_OPENED {
-		if bos.onCloseCb != nil {
-			bos.onCloseCb(bos, err)
-		}
-	}
-}
-
 func (bos *BleOicSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
 	return omp.EncodeOmpTcp(m)
 }
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index b69d4f7..782fce0 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -16,25 +16,17 @@ type BlePlainSesn struct {
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
 
-	closeChan chan error
+	closeChan chan struct{}
 }
 
 func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
 	bps := &BlePlainSesn{
-		d:            nmp.NewDispatcher(1),
 		closeTimeout: cfg.Ble.CloseTimeout,
 		onCloseCb:    cfg.OnCloseCb,
 	}
 
-	svcUuid, err := ParseUuid(NmpPlainSvcUuid)
-	if err != nil {
-		panic(err.Error())
-	}
-
-	chrUuid, err := ParseUuid(NmpPlainChrUuid)
-	if err != nil {
-		panic(err.Error())
-	}
+	svcUuid, _ := ParseUuid(NmpPlainSvcUuid)
+	chrUuid, _ := ParseUuid(NmpPlainChrUuid)
 
 	bps.bf = NewBleFsm(BleFsmParams{
 		Bx:          bx,
@@ -48,84 +40,83 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
 		ReqChrUuid:  chrUuid,
 		RspChrUuid:  chrUuid,
 		EncryptWhen: cfg.Ble.EncryptWhen,
-		RxDataCb:    func(d []byte) { bps.onRxNmp(d) },
-		DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
-			bps.onDisconnect(dt, p, e)
-		},
 	})
 
 	return bps
 }
 
-func (bps *BlePlainSesn) setCloseChan() error {
-	if bps.closeChan != nil {
-		return fmt.Errorf("Multiple listeners waiting for session to close")
-	}
-
-	bps.closeChan = make(chan error, 1)
-	return nil
-}
-
-func (bps *BlePlainSesn) clearCloseChan() {
-	bps.closeChan = nil
-}
-
-func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
-	select {
-	case <-bps.closeChan:
-		return nil
-	case <-time.After(timeout):
-		// Session never closed.
-		return fmt.Errorf("Timeout while waiting for session to close")
-	}
-}
-
 func (bps *BlePlainSesn) AbortRx(seq uint8) error {
 	return bps.d.ErrorOne(seq, fmt.Errorf("Rx aborted"))
 }
 
 func (bps *BlePlainSesn) Open() error {
-	return bps.bf.Start()
-}
+	// This channel gets closed when the session closes.
+	bps.closeChan = make(chan struct{})
 
-func (bps *BlePlainSesn) Close() error {
-	if err := bps.setCloseChan(); err != nil {
+	if err := bps.bf.Start(); err != nil {
+		close(bps.closeChan)
 		return err
 	}
-	defer bps.clearCloseChan()
 
-	done, err := bps.bf.Stop()
+	bps.d = nmp.NewDispatcher(3)
+
+	// Listen for disconnect in the background.
+	go func() {
+		// Block until disconnect.
+		entry := <-bps.bf.DisconnectChan()
+
+		// Signal error to all listeners.
+		bps.d.ErrorAll(entry.Err)
+
+		// If the session is being closed, unblock the close() call.
+		close(bps.closeChan)
+
+		// Only execute the client's disconnect callback if the disconnect was
+		// unsolicited.
+		if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
+			bps.onCloseCb(bps, entry.Err)
+		}
+	}()
+
+	// Listen for NMP responses in the background.
+	go func() {
+		for {
+			data, ok := <-bps.bf.RxNmpChan()
+			if !ok {
+				// Disconnected.
+				return
+			} else {
+				bps.d.Dispatch(data)
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (bps *BlePlainSesn) Close() error {
+	err := bps.bf.Stop()
 	if err != nil {
 		return err
 	}
 
-	if done {
-		// Close complete.
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bps.listenForClose(bps.closeTimeout)
+	// Block until close completes.
+	<-bps.closeChan
+	return nil
 }
 
 func (bps *BlePlainSesn) IsOpen() bool {
 	return bps.bf.IsOpen()
 }
 
-func (bps *BlePlainSesn) onRxNmp(data []byte) {
-	bps.d.Dispatch(data)
-}
-
 // Called by the FSM when a blehostd disconnect event is received.
 func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
 	err error) {
 
 	bps.d.ErrorAll(err)
 
-	// If someone is waiting for the session to close, unblock them.
-	if bps.closeChan != nil {
-		bps.closeChan <- err
-	}
+	// If the session is being closed, unblock the close() call.
+	close(bps.closeChan)
 
 	// Only execute client's disconnect callback if the disconnect was
 	// unsolicited and the session was fully open.
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index db0a8e3..67b2d60 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -84,19 +84,17 @@ const (
 
 // Implements xport.Xport.
 type BleXport struct {
-	Bd                *Dispatcher
-	client            *unixchild.Client
-	state             BleXportState
-	stopChan          chan struct{}
-	numStopListeners  int
-	shutdownChan      chan bool
-	readyChan         chan error
-	numReadyListeners int
-	master            nmxutil.SingleResource
-	slave             nmxutil.SingleResource
-	randAddr          *BleAddr
-	mtx               sync.Mutex
-	scanner           *BleScanner
+	Bd           *Dispatcher
+	client       *unixchild.Client
+	state        BleXportState
+	stopChan     chan struct{}
+	shutdownChan chan bool
+	readyBcast   nmxutil.Bcaster
+	master       nmxutil.SingleResource
+	slave        nmxutil.SingleResource
+	randAddr     *BleAddr
+	stateMtx     sync.Mutex
+	scanner      *BleScanner
 
 	cfg XportCfg
 }
@@ -105,7 +103,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	bx := &BleXport{
 		Bd:           NewDispatcher(),
 		shutdownChan: make(chan bool),
-		readyChan:    make(chan error),
+		readyBcast:   nmxutil.Bcaster{},
 		master:       nmxutil.NewSingleResource(),
 		slave:        nmxutil.NewSingleResource(),
 		cfg:          cfg,
@@ -114,7 +112,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	return bx, nil
 }
 
-func (bx *BleXport) createUnixChild() {
+func (bx *BleXport) startUnixChild() error {
 	config := unixchild.Config{
 		SockPath:      bx.cfg.SockPath,
 		ChildPath:     bx.cfg.BlehostdPath,
@@ -125,6 +123,20 @@ func (bx *BleXport) createUnixChild() {
 	}
 
 	bx.client = unixchild.New(config)
+
+	if err := bx.client.Start(); err != nil {
+		if unixchild.IsUcAcceptError(err) {
+			err = nmxutil.NewXportError(
+				"blehostd did not connect to socket; " +
+					"controller not attached?")
+		} else {
+			err = nmxutil.NewXportError(
+				"Failed to start child process: " + err.Error())
+		}
+		return err
+	}
+
+	return nil
 }
 
 func (bx *BleXport) BuildScanner() (scan.Scanner, error) {
@@ -240,7 +252,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 
 	log.Debugf("Shutting down BLE transport")
 
-	bx.mtx.Lock()
+	bx.stateMtx.Lock()
 
 	var fullyStarted bool
 	var already bool
@@ -260,7 +272,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 		bx.state = BLE_XPORT_STATE_STOPPING
 	}
 
-	bx.mtx.Unlock()
+	bx.stateMtx.Unlock()
 
 	if already {
 		// Shutdown already in progress.
@@ -283,10 +295,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 	}
 
 	// Stop all of this transport's go routines.
-	log.Debugf("Waiting for BLE transport goroutines to complete")
-	for i := 0; i < bx.numStopListeners; i++ {
-		bx.stopChan <- struct{}{}
-	}
+	close(bx.stopChan)
 
 	// Stop the unixchild instance (blehostd + socket).
 	if bx.client != nil {
@@ -304,37 +313,32 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 }
 
 func (bx *BleXport) blockUntilReady() error {
-	bx.mtx.Lock()
+	var ch chan interface{}
+
+	bx.stateMtx.Lock()
 	switch bx.state {
 	case BLE_XPORT_STATE_STARTED:
 		// Already started; don't block.
-		bx.mtx.Unlock()
+		bx.stateMtx.Unlock()
 		return nil
 
 	case BLE_XPORT_STATE_DORMANT:
 		// Not in the process of starting; the user will be waiting forever.
-		bx.mtx.Unlock()
+		bx.stateMtx.Unlock()
 		return fmt.Errorf("Attempt to use BLE transport without starting it")
 
 	default:
+		ch = bx.readyBcast.Listen()
 	}
+	bx.stateMtx.Unlock()
 
-	bx.numReadyListeners++
-	bx.mtx.Unlock()
-
-	return <-bx.readyChan
-}
-
-func (bx *BleXport) notifyReadyListeners(err error) {
-	for i := 0; i < bx.numReadyListeners; i++ {
-		bx.readyChan <- err
-	}
-	bx.numReadyListeners = 0
+	itf := <-ch
+	return itf.(error)
 }
 
 func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
-	bx.mtx.Lock()
-	defer bx.mtx.Unlock()
+	bx.stateMtx.Lock()
+	defer bx.stateMtx.Unlock()
 
 	if bx.state != from {
 		return false
@@ -343,9 +347,10 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
 	bx.state = to
 	switch bx.state {
 	case BLE_XPORT_STATE_STARTED:
-		bx.notifyReadyListeners(nil)
+		bx.readyBcast.SendAndClear(nil)
 	case BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_DORMANT:
-		bx.notifyReadyListeners(nmxutil.NewXportError("BLE transport stopped"))
+		bx.readyBcast.SendAndClear(
+			nmxutil.NewXportError("BLE transport stopped"))
 	default:
 	}
 
@@ -362,32 +367,21 @@ func (bx *BleXport) startOnce() error {
 		return nmxutil.NewXportError("BLE xport started twice")
 	}
 
-	bx.stopChan = make(chan struct{})
-	bx.numStopListeners = 0
-
-	bx.createUnixChild()
-	if err := bx.client.Start(); err != nil {
-		if unixchild.IsUcAcceptError(err) {
-			err = nmxutil.NewXportError(
-				"blehostd did not connect to socket; " +
-					"controller not attached?")
-		} else {
-			err = nmxutil.NewXportError(
-				"Failed to start child process: " + err.Error())
-		}
+	if err := bx.startUnixChild(); err != nil {
 		bx.shutdown(true, err)
 		return err
 	}
 
+	bx.stopChan = make(chan struct{})
+
 	// Listen for errors and data from the blehostd process.
 	go func() {
-		bx.numStopListeners++
 		for {
 			select {
 			case err := <-bx.client.ErrChild:
 				err = nmxutil.NewXportError("BLE transport error: " +
 					err.Error())
-				go bx.shutdown(true, err)
+				bx.shutdown(true, err)
 
 			case buf := <-bx.client.FromChild:
 				if len(buf) != 0 {
@@ -433,16 +427,16 @@ func (bx *BleXport) startOnce() error {
 
 	// Host and controller are synced.  Listen for sync loss in the background.
 	go func() {
-		bx.numStopListeners++
 		for {
 			select {
 			case err := <-bl.ErrChan:
-				go bx.shutdown(true, err)
+				bx.shutdown(true, err)
+				return
 			case bm := <-bl.MsgChan:
 				switch msg := bm.(type) {
 				case *BleSyncEvt:
 					if !msg.Synced {
-						go bx.shutdown(true, nmxutil.NewXportError(
+						bx.shutdown(true, nmxutil.NewXportError(
 							"BLE host <-> controller sync lost"))
 					}
 				}
@@ -452,7 +446,7 @@ func (bx *BleXport) startOnce() error {
 		}
 	}()
 
-	// Generate a new random address is none was specified.
+	// Generate a new random address if none was specified.
 	if bx.randAddr == nil {
 		addr, err := GenRandAddrXact(bx)
 		if err != nil {
diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go
index 3858e75..2ac19ba 100644
--- a/nmxact/nmble/discover.go
+++ b/nmxact/nmble/discover.go
@@ -108,7 +108,7 @@ func (d *Discoverer) Stop() error {
 		return nmxutil.NewAlreadyError("Attempt to stop inactive discoverer")
 	}
 
-	ch <- struct{}{}
+	close(ch)
 	return nil
 }
 
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index ba59f81..435b123 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -55,7 +55,7 @@ type Listener struct {
 func NewListener() *Listener {
 	return &Listener{
 		MsgChan: make(chan Msg, 16),
-		ErrChan: make(chan error, 4),
+		ErrChan: make(chan error, 1),
 		TmoChan: make(chan time.Time, 1),
 	}
 }
@@ -70,10 +70,14 @@ func (bl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return bl.TmoChan
 }
 
-func (bl *Listener) Stop() {
+func (bl *Listener) Close() {
 	if bl.timer != nil {
 		bl.timer.Stop()
 	}
+
+	close(bl.MsgChan)
+	close(bl.ErrChan)
+	close(bl.TmoChan)
 }
 
 type Dispatcher struct {
@@ -234,7 +238,7 @@ func (d *Dispatcher) RemoveListener(base MsgBase) *Listener {
 
 	base, bl := d.findListener(base)
 	if bl != nil {
-		bl.Stop()
+		bl.Close()
 		if base.Seq != BLE_SEQ_NONE {
 			delete(d.seqMap, base.Seq)
 		} else {
@@ -264,8 +268,8 @@ func decodeMsg(data []byte) (MsgBase, Msg, error) {
 	cb := msgCtorMap[opTypePair]
 	if cb == nil {
 		return base, nil, fmt.Errorf(
-			"Unrecognized op+type pair:") // %s, %s",
-		//MsgOpToString(base.Op), MsgTypeToString(base.Type))
+			"Unrecognized op+type pair: %s, %s",
+			MsgOpToString(base.Op), MsgTypeToString(base.Type))
 	}
 
 	msg := cb()
@@ -298,6 +302,10 @@ func (d *Dispatcher) Dispatch(data []byte) {
 }
 
 func (d *Dispatcher) ErrorAll(err error) {
+	if err == nil {
+		panic("NIL ERROR")
+	}
+
 	d.mtx.Lock()
 
 	m1 := d.seqMap
@@ -310,8 +318,10 @@ func (d *Dispatcher) ErrorAll(err error) {
 
 	for _, bl := range m1 {
 		bl.ErrChan <- err
+		bl.Close()
 	}
 	for _, bl := range m2 {
 		bl.ErrChan <- err
+		bl.Close()
 	}
 }
diff --git a/nmxact/nmble/receiver.go b/nmxact/nmble/receiver.go
index d089ba8..8660393 100644
--- a/nmxact/nmble/receiver.go
+++ b/nmxact/nmble/receiver.go
@@ -94,6 +94,9 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) {
 }
 
 func (r *Receiver) ErrorAll(err error) {
+	if err == nil {
+		panic("NIL ERROR")
+	}
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
diff --git a/nmxact/nmxutil/bcast.go b/nmxact/nmxutil/bcast.go
new file mode 100644
index 0000000..a7d68c0
--- /dev/null
+++ b/nmxact/nmxutil/bcast.go
@@ -0,0 +1,43 @@
+package nmxutil
+
+import (
+	"sync"
+)
+
+type Bcaster struct {
+	chs [](chan interface{})
+	mtx sync.Mutex
+}
+
+func (b *Bcaster) Listen() chan interface{} {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	ch := make(chan interface{})
+	b.chs = append(b.chs, ch)
+
+	return ch
+}
+
+func (b *Bcaster) Send(val interface{}) {
+	b.mtx.Lock()
+	chs := b.chs
+	b.mtx.Unlock()
+
+	for _, ch := range chs {
+		ch <- val
+		close(ch)
+	}
+}
+
+func (b *Bcaster) Clear() {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	b.chs = nil
+}
+
+func (b *Bcaster) SendAndClear(val interface{}) {
+	b.Send(val)
+	b.Clear()
+}
diff --git a/nmxact/nmxutil/err_funnel.go b/nmxact/nmxutil/err_funnel.go
index 58b0ca2..6d6b5a3 100644
--- a/nmxact/nmxutil/err_funnel.go
+++ b/nmxact/nmxutil/err_funnel.go
@@ -1,7 +1,6 @@
 package nmxutil
 
 import (
-	"fmt"
 	"sync"
 	"time"
 )
@@ -13,26 +12,15 @@ type ErrProcFn func(err error)
 // reported.
 type ErrFunnel struct {
 	LessCb     ErrLessFn
-	ProcCb     ErrProcFn
 	AccumDelay time.Duration
 
 	mtx      sync.Mutex
 	resetMtx sync.Mutex
 	curErr   error
 	errTimer *time.Timer
-	started  bool
 	waiters  [](chan error)
 }
 
-func (f *ErrFunnel) Start() {
-	f.resetMtx.Lock()
-
-	f.mtx.Lock()
-	defer f.mtx.Unlock()
-
-	f.started = true
-}
-
 func (f *ErrFunnel) Insert(err error) {
 	if err == nil {
 		panic("ErrFunnel nil insert")
@@ -41,10 +29,6 @@ func (f *ErrFunnel) Insert(err error) {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()
 
-	if !f.started {
-		panic("ErrFunnel insert without start")
-	}
-
 	if f.curErr == nil {
 		f.curErr = err
 		f.errTimer = time.AfterFunc(f.AccumDelay, func() {
@@ -61,18 +45,6 @@ func (f *ErrFunnel) Insert(err error) {
 	}
 }
 
-func (f *ErrFunnel) Reset() {
-	f.mtx.Lock()
-	defer f.mtx.Unlock()
-
-	if f.started {
-		f.started = false
-		f.curErr = nil
-		f.errTimer.Stop()
-		f.resetMtx.Unlock()
-	}
-}
-
 func (f *ErrFunnel) timerExp() {
 	f.mtx.Lock()
 
@@ -88,35 +60,18 @@ func (f *ErrFunnel) timerExp() {
 		panic("ErrFunnel timer expired but no error")
 	}
 
-	f.ProcCb(err)
-
 	for _, w := range waiters {
 		w <- err
+		close(w)
 	}
 }
 
-func (f *ErrFunnel) Wait() error {
-	var err error
-	var c chan error
+func (f *ErrFunnel) Wait() chan error {
+	c := make(chan error)
 
 	f.mtx.Lock()
-
-	if !f.started {
-		if f.curErr == nil {
-			err = fmt.Errorf("Wait on unstarted ErrFunnel")
-		} else {
-			err = f.curErr
-		}
-	} else {
-		c = make(chan error)
-		f.waiters = append(f.waiters, c)
-	}
-
+	f.waiters = append(f.waiters, c)
 	f.mtx.Unlock()
 
-	if err != nil {
-		return err
-	} else {
-		return <-c
-	}
+	return c
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.