You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/07/14 02:27:43 UTC

[mynewt-newtmgr] 01/02: nmxact - Remove some mutexes

This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit cf276ea9920ba3429e84648445981402acbd9abd
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Wed Jul 12 18:33:41 2017 -0700

    nmxact - Remove some mutexes
---
 nmxact/example/ble_loop/ble_loop.go |  12 ++-
 nmxact/example/ble_scan/ble_scan.go |   7 +-
 nmxact/nmble/ble_act.go             |   4 +-
 nmxact/nmble/ble_fsm.go             |  93 ++++++++++++++---------
 nmxact/nmble/ble_oic_sesn.go        | 145 +++++++++++-------------------------
 nmxact/nmble/ble_plain_sesn.go      | 109 +++++++++++++--------------
 nmxact/nmble/ble_xport.go           | 110 +++++++++++++--------------
 nmxact/nmble/discover.go            |   2 +-
 nmxact/nmble/dispatch.go            |  20 +++--
 nmxact/nmble/receiver.go            |   3 +
 nmxact/nmxutil/bcast.go             |  43 +++++++++++
 nmxact/nmxutil/err_funnel.go        |  55 ++------------
 12 files changed, 284 insertions(+), 319 deletions(-)

diff --git a/nmxact/example/ble_loop/ble_loop.go b/nmxact/example/ble_loop/ble_loop.go
index 7a930a9..9e79d29 100644
--- a/nmxact/example/ble_loop/ble_loop.go
+++ b/nmxact/example/ble_loop/ble_loop.go
@@ -26,8 +26,11 @@ import (
 	"syscall"
 	"time"
 
+	log "github.com/Sirupsen/logrus"
+
 	"mynewt.apache.org/newtmgr/nmxact/bledefs"
 	"mynewt.apache.org/newtmgr/nmxact/nmble"
+	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 	"mynewt.apache.org/newtmgr/nmxact/xact"
 	"mynewt.apache.org/newtmgr/nmxact/xport"
@@ -58,11 +61,14 @@ func configExitHandler(x xport.Xport, s sesn.Sesn) {
 }
 
 func main() {
+	//nmxutil.SetLogLevel(log.DebugLevel)
+	nmxutil.SetLogLevel(log.InfoLevel)
+
 	// Initialize the BLE transport.
 	params := nmble.NewXportCfg()
 	params.SockPath = "/tmp/blehostd-uds"
-	params.BlehostdPath = "blehostd.elf"
-	params.DevPath = "/dev/cu.usbmodem142111"
+	params.BlehostdPath = "blehostd"
+	params.DevPath = "/dev/cu.usbmodem142121"
 
 	x, err := nmble.NewBleXport(params)
 	if err != nil {
@@ -83,7 +89,7 @@ func main() {
 	//     * Peer has name "nimble-bleprph"
 	//     * We use a random address.
 	dev, err := nmble.DiscoverDeviceWithName(
-		x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "nimble-bleprph")
+		x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "c4")
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "error discovering device: %s\n", err.Error())
 		os.Exit(1)
diff --git a/nmxact/example/ble_scan/ble_scan.go b/nmxact/example/ble_scan/ble_scan.go
index 73ca24f..3a01146 100644
--- a/nmxact/example/ble_scan/ble_scan.go
+++ b/nmxact/example/ble_scan/ble_scan.go
@@ -70,8 +70,8 @@ func main() {
 	// Initialize the BLE transport.
 	params := nmble.NewXportCfg()
 	params.SockPath = "/tmp/blehostd-uds"
-	params.BlehostdPath = "blehostd.elf"
-	params.DevPath = "/dev/cu.usbmodem14221"
+	params.BlehostdPath = "blehostd"
+	params.DevPath = "/dev/cu.usbmodem142121"
 
 	x, err := nmble.NewBleXport(params)
 	if err != nil {
@@ -105,9 +105,6 @@ func main() {
 
 	for {
 		sc := scan.BleOmpScanCfg(scanCb)
-		sc.Ble.ScanPred = func(adv bledefs.BleAdvReport) bool {
-			return adv.Fields.Name != nil && *adv.Fields.Name == "c5"
-		}
 		if err := scanner.Start(sc); err != nil {
 			fmt.Fprintf(os.Stderr, "error starting scan: %s\n", err.Error())
 			os.Exit(1)
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 2c4b71d..e51b8e9 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -324,8 +324,8 @@ func exchangeMtu(x *BleXport, bl *Listener, r *BleExchangeMtuReq) (
 	}
 }
 
-func actScan(x *BleXport, bl *Listener, r *BleScanReq,
-	abortChan chan struct{}, advRptCb BleAdvRptFn) error {
+func actScan(x *BleXport, bl *Listener, r *BleScanReq, abortChan chan struct{},
+	advRptCb BleAdvRptFn) error {
 
 	const rspType = MSG_TYPE_SCAN
 
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 52b163e..44a7908 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -48,8 +48,11 @@ const (
 	FSM_DISCONNECT_TYPE_REQUESTED
 )
 
-type BleRxDataFn func(data []byte)
-type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
+type BleDisconnectEntry struct {
+	Dt   BleFsmDisconnectType
+	Peer BleDev
+	Err  error
+}
 
 type BleFsmParamsCentral struct {
 	PeerDev     BleDev
@@ -58,15 +61,13 @@ type BleFsmParamsCentral struct {
 }
 
 type BleFsmParams struct {
-	Bx           *BleXport
-	OwnAddrType  BleAddrType
-	EncryptWhen  BleEncryptWhen
-	Central      BleFsmParamsCentral
-	SvcUuids     []BleUuid
-	ReqChrUuid   BleUuid
-	RspChrUuid   BleUuid
-	RxDataCb     BleRxDataFn
-	DisconnectCb BleDisconnectFn
+	Bx          *BleXport
+	OwnAddrType BleAddrType
+	EncryptWhen BleEncryptWhen
+	Central     BleFsmParamsCentral
+	SvcUuids    []BleUuid
+	ReqChrUuid  BleUuid
+	RspChrUuid  BleUuid
 }
 
 type BleFsm struct {
@@ -83,8 +84,9 @@ type BleFsm struct {
 	errFunnel  nmxutil.ErrFunnel
 	id         uint32
 
-	// Conveys changes in encrypted state.
-	encChan chan error
+	encBcast       nmxutil.Bcaster
+	disconnectChan chan BleDisconnectEntry
+	rxNmpChan      chan []byte
 }
 
 func NewBleFsm(p BleFsmParams) *BleFsm {
@@ -99,7 +101,6 @@ func NewBleFsm(p BleFsmParams) *BleFsm {
 
 	bf.errFunnel.AccumDelay = 250 * time.Millisecond
 	bf.errFunnel.LessCb = fsmErrorLess
-	bf.errFunnel.ProcCb = func(err error) { bf.processErr(err) }
 
 	return bf
 }
@@ -163,7 +164,14 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
 	}
 }
 
-func (bf *BleFsm) processErr(err error) {
+// Listens for an error in the state machine.  On error, the session is
+// considered disconnected and the error is reported to the client.
+func (bf *BleFsm) listenForError() {
+	err := <-bf.errFunnel.Wait()
+
+	// Stop listening for NMP responses.
+	close(bf.rxNmpChan)
+
 	// Remember some fields before we clear them.
 	dt := calcDisconnectType(bf.state)
 
@@ -175,8 +183,10 @@ func (bf *BleFsm) processErr(err error) {
 	// Wait for all listeners to get removed.
 	bf.rxer.WaitUntilNoListeners()
 
-	bf.errFunnel.Reset()
-	bf.params.DisconnectCb(dt, bf.peerDev, err)
+	bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err}
+	close(bf.disconnectChan)
+
+	bf.disconnectChan = make(chan BleDisconnectEntry)
 }
 
 // Listens for events in the background.
@@ -214,9 +224,9 @@ func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
 						log.Debugf("Connection encrypted; conn_handle=%d",
 							msg.ConnHandle)
 					}
-					if bf.encChan != nil {
-						bf.encChan <- err
-					}
+
+					// Notify any listeners of the encryption change event.
+					bf.encBcast.SendAndClear(err)
 
 				case *BleDisconnectEvt:
 					err := bf.disconnectError(msg.Reason)
@@ -259,7 +269,7 @@ func (bf *BleFsm) nmpRspListen() error {
 					if bf.nmpRspChr != nil &&
 						msg.AttrHandle == bf.nmpRspChr.ValHandle {
 
-						bf.params.RxDataCb(msg.Data.Bytes)
+						bf.rxNmpChan <- msg.Data.Bytes
 					}
 				}
 			}
@@ -437,16 +447,14 @@ func (bf *BleFsm) encInitiate() error {
 	}
 	defer bf.rxer.RemoveSeqListener("enc-initiate", r.Seq)
 
-	bf.encChan = make(chan error)
-	defer func() { bf.encChan = nil }()
-
 	// Initiate the encryption procedure.
 	if err := encInitiate(bf.params.Bx, bl, r); err != nil {
 		return err
 	}
 
 	// Block until the procedure completes.
-	return <-bf.encChan
+	itf := <-bf.encBcast.Listen()
+	return itf.(error)
 }
 
 func (bf *BleFsm) discAllChrs() error {
@@ -621,6 +629,14 @@ func (bf *BleFsm) executeState() (bool, error) {
 	return false, nil
 }
 
+func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry {
+	return bf.disconnectChan
+}
+
+func (bf *BleFsm) RxNmpChan() <-chan []byte {
+	return bf.rxNmpChan
+}
+
 func (bf *BleFsm) startOnce() (bool, error) {
 	if !bf.IsClosed() {
 		return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -628,15 +644,16 @@ func (bf *BleFsm) startOnce() (bool, error) {
 			bf.state))
 	}
 
-	bf.errFunnel.Start()
-
 	for {
 		retry, err := bf.executeState()
 		if err != nil {
 			bf.errFunnel.Insert(err)
-			err = bf.errFunnel.Wait()
+			err = <-bf.errFunnel.Wait()
 			return retry, err
 		} else if bf.state == SESN_STATE_DONE {
+			// We are fully connected.  Listen for errors in the background.
+			go bf.listenForError()
+
 			return false, nil
 		}
 	}
@@ -648,6 +665,9 @@ func (bf *BleFsm) startOnce() (bool, error) {
 func (bf *BleFsm) Start() error {
 	var err error
 
+	bf.disconnectChan = make(chan BleDisconnectEntry)
+	bf.rxNmpChan = make(chan []byte)
+
 	for i := 0; i < bf.params.Central.ConnTries; i++ {
 		var retry bool
 		retry, err = bf.startOnce()
@@ -656,12 +676,16 @@ func (bf *BleFsm) Start() error {
 		}
 	}
 
-	return err
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
 
 // @return bool                 true if stop complete;
 //                              false if disconnect is now pending.
-func (bf *BleFsm) Stop() (bool, error) {
+func (bf *BleFsm) Stop() error {
 	state := bf.state
 
 	switch state {
@@ -669,19 +693,18 @@ func (bf *BleFsm) Stop() (bool, error) {
 		SESN_STATE_TERMINATING,
 		SESN_STATE_CONN_CANCELLING:
 
-		return false,
-			bf.closedError("Attempt to close an unopened BLE session")
+		return bf.closedError("Attempt to close an unopened BLE session")
 
 	case SESN_STATE_CONNECTING:
 		bf.connCancel()
 		bf.errFunnel.Insert(fmt.Errorf("Connection attempt cancelled"))
-		return false, nil
+		return nil
 
 	default:
 		if err := bf.terminate(); err != nil {
-			return false, err
+			return err
 		}
-		return false, nil
+		return nil
 	}
 }
 
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 9db1c84..e1bf28b 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -2,7 +2,6 @@ package nmble
 
 import (
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/runtimeco/go-coap"
@@ -22,8 +21,7 @@ type BleOicSesn struct {
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
 
-	closeChan chan error
-	mtx       sync.Mutex
+	closeChan chan struct{}
 }
 
 func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
@@ -52,136 +50,81 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
 		ReqChrUuid:  reqChrUuid,
 		RspChrUuid:  rspChrUuid,
 		EncryptWhen: cfg.Ble.EncryptWhen,
-		RxDataCb:    func(d []byte) { bos.onRxNmp(d) },
-		DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
-			bos.onDisconnect(dt, p, e)
-		},
 	})
 
 	return bos
 }
 
-// Returns true if a new channel was assigned.
-func (bos *BleOicSesn) setCloseChan() error {
-	bos.mtx.Lock()
-	defer bos.mtx.Unlock()
-
-	if bos.closeChan != nil {
-		return fmt.Errorf("Multiple listeners waiting for session to close")
-	}
-
-	bos.closeChan = make(chan error, 1)
-	return nil
-}
-
-func (bos *BleOicSesn) clearCloseChan() {
-	bos.mtx.Lock()
-	defer bos.mtx.Unlock()
-
-	bos.closeChan = nil
+func (bos *BleOicSesn) AbortRx(seq uint8) error {
+	return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
 }
 
-func (bos *BleOicSesn) listenForClose(timeout time.Duration) error {
-	select {
-	case <-bos.closeChan:
-		return nil
-	case <-time.After(timeout):
-		// Session never closed.
-		return fmt.Errorf("Timeout while waiting for session to close")
-	}
-}
+func (bos *BleOicSesn) Open() error {
+	// This channel gets closed when the session closes.
+	bos.closeChan = make(chan struct{})
 
-func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error {
-	if err := bos.setCloseChan(); err != nil {
+	if err := bos.bf.Start(); err != nil {
+		close(bos.closeChan)
 		return err
 	}
-	defer bos.clearCloseChan()
 
-	// If the session is already closed, we're done.
-	if bos.bf.IsClosed() {
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bos.listenForClose(timeout)
-}
-
-func (bos *BleOicSesn) AbortRx(seq uint8) error {
-	return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
-}
-
-func (bos *BleOicSesn) Open() error {
 	d, err := omp.NewDispatcher(true, 3)
 	if err != nil {
+		close(bos.closeChan)
 		return err
 	}
 	bos.d = d
 
-	if err := bos.bf.Start(); err != nil {
+	// Listen for disconnect in the background.
+	go func() {
+		// Block until disconnect.
+		entry := <-bos.bf.DisconnectChan()
+
+		// Signal error to all listeners.
+		bos.d.ErrorAll(entry.Err)
 		bos.d.Stop()
-		return err
-	}
+
+		// If the session is being closed, unblock the close() call.
+		close(bos.closeChan)
+
+		// Only execute the client's disconnect callback if the disconnect was
+		// unsolicited.
+		if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
+			bos.onCloseCb(bos, entry.Err)
+		}
+	}()
+
+	// Listen for NMP responses in the background.
+	go func() {
+		for {
+			data, ok := <-bos.bf.RxNmpChan()
+			if !ok {
+				// Disconnected.
+				return
+			} else {
+				bos.d.Dispatch(data)
+			}
+		}
+	}()
+
 	return nil
 }
 
 func (bos *BleOicSesn) Close() error {
-	if err := bos.setCloseChan(); err != nil {
-		return err
-	}
-	defer bos.clearCloseChan()
-
-	done, err := bos.bf.Stop()
+	err := bos.bf.Stop()
 	if err != nil {
 		return err
 	}
 
-	if done {
-		// Close complete.
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bos.listenForClose(bos.closeTimeout)
+	// Block until close completes.
+	<-bos.closeChan
+	return nil
 }
 
 func (bos *BleOicSesn) IsOpen() bool {
 	return bos.bf.IsOpen()
 }
 
-func (bos *BleOicSesn) onRxNmp(data []byte) {
-	bos.d.Dispatch(data)
-}
-
-// Called by the FSM when a blehostd disconnect event is received.
-func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
-	err error) {
-
-	bos.d.ErrorAll(err)
-
-	bos.mtx.Lock()
-
-	// If the session is being closed, unblock the close() call.
-	if bos.closeChan != nil {
-		bos.closeChan <- err
-	}
-
-	bos.mtx.Unlock()
-
-	// Only stop the dispatcher and execute client's disconnect callback if the
-	// disconnect was unsolicited and the session was fully open.  If the
-	// session wasn't fully open, the dispatcher will get stopped when the fsm
-	// start function returns an error (right after this function returns).
-	if dt == FSM_DISCONNECT_TYPE_OPENED || dt == FSM_DISCONNECT_TYPE_REQUESTED {
-		bos.d.Stop()
-	}
-
-	if dt == FSM_DISCONNECT_TYPE_OPENED {
-		if bos.onCloseCb != nil {
-			bos.onCloseCb(bos, err)
-		}
-	}
-}
-
 func (bos *BleOicSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
 	return omp.EncodeOmpTcp(m)
 }
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index b69d4f7..782fce0 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -16,25 +16,17 @@ type BlePlainSesn struct {
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
 
-	closeChan chan error
+	closeChan chan struct{}
 }
 
 func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
 	bps := &BlePlainSesn{
-		d:            nmp.NewDispatcher(1),
 		closeTimeout: cfg.Ble.CloseTimeout,
 		onCloseCb:    cfg.OnCloseCb,
 	}
 
-	svcUuid, err := ParseUuid(NmpPlainSvcUuid)
-	if err != nil {
-		panic(err.Error())
-	}
-
-	chrUuid, err := ParseUuid(NmpPlainChrUuid)
-	if err != nil {
-		panic(err.Error())
-	}
+	svcUuid, _ := ParseUuid(NmpPlainSvcUuid)
+	chrUuid, _ := ParseUuid(NmpPlainChrUuid)
 
 	bps.bf = NewBleFsm(BleFsmParams{
 		Bx:          bx,
@@ -48,84 +40,83 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
 		ReqChrUuid:  chrUuid,
 		RspChrUuid:  chrUuid,
 		EncryptWhen: cfg.Ble.EncryptWhen,
-		RxDataCb:    func(d []byte) { bps.onRxNmp(d) },
-		DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
-			bps.onDisconnect(dt, p, e)
-		},
 	})
 
 	return bps
 }
 
-func (bps *BlePlainSesn) setCloseChan() error {
-	if bps.closeChan != nil {
-		return fmt.Errorf("Multiple listeners waiting for session to close")
-	}
-
-	bps.closeChan = make(chan error, 1)
-	return nil
-}
-
-func (bps *BlePlainSesn) clearCloseChan() {
-	bps.closeChan = nil
-}
-
-func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
-	select {
-	case <-bps.closeChan:
-		return nil
-	case <-time.After(timeout):
-		// Session never closed.
-		return fmt.Errorf("Timeout while waiting for session to close")
-	}
-}
-
 func (bps *BlePlainSesn) AbortRx(seq uint8) error {
 	return bps.d.ErrorOne(seq, fmt.Errorf("Rx aborted"))
 }
 
 func (bps *BlePlainSesn) Open() error {
-	return bps.bf.Start()
-}
+	// This channel gets closed when the session closes.
+	bps.closeChan = make(chan struct{})
 
-func (bps *BlePlainSesn) Close() error {
-	if err := bps.setCloseChan(); err != nil {
+	if err := bps.bf.Start(); err != nil {
+		close(bps.closeChan)
 		return err
 	}
-	defer bps.clearCloseChan()
 
-	done, err := bps.bf.Stop()
+	bps.d = nmp.NewDispatcher(3)
+
+	// Listen for disconnect in the background.
+	go func() {
+		// Block until disconnect.
+		entry := <-bps.bf.DisconnectChan()
+
+		// Signal error to all listeners.
+		bps.d.ErrorAll(entry.Err)
+
+		// If the session is being closed, unblock the close() call.
+		close(bps.closeChan)
+
+		// Only execute the client's disconnect callback if the disconnect was
+		// unsolicited.
+		if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
+			bps.onCloseCb(bps, entry.Err)
+		}
+	}()
+
+	// Listen for NMP responses in the background.
+	go func() {
+		for {
+			data, ok := <-bps.bf.RxNmpChan()
+			if !ok {
+				// Disconnected.
+				return
+			} else {
+				bps.d.Dispatch(data)
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (bps *BlePlainSesn) Close() error {
+	err := bps.bf.Stop()
 	if err != nil {
 		return err
 	}
 
-	if done {
-		// Close complete.
-		return nil
-	}
-
-	// Block until close completes or times out.
-	return bps.listenForClose(bps.closeTimeout)
+	// Block until close completes.
+	<-bps.closeChan
+	return nil
 }
 
 func (bps *BlePlainSesn) IsOpen() bool {
 	return bps.bf.IsOpen()
 }
 
-func (bps *BlePlainSesn) onRxNmp(data []byte) {
-	bps.d.Dispatch(data)
-}
-
 // Called by the FSM when a blehostd disconnect event is received.
 func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
 	err error) {
 
 	bps.d.ErrorAll(err)
 
-	// If someone is waiting for the session to close, unblock them.
-	if bps.closeChan != nil {
-		bps.closeChan <- err
-	}
+	// If the session is being closed, unblock the close() call.
+	close(bps.closeChan)
 
 	// Only execute client's disconnect callback if the disconnect was
 	// unsolicited and the session was fully open.
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index db0a8e3..67b2d60 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -84,19 +84,17 @@ const (
 
 // Implements xport.Xport.
 type BleXport struct {
-	Bd                *Dispatcher
-	client            *unixchild.Client
-	state             BleXportState
-	stopChan          chan struct{}
-	numStopListeners  int
-	shutdownChan      chan bool
-	readyChan         chan error
-	numReadyListeners int
-	master            nmxutil.SingleResource
-	slave             nmxutil.SingleResource
-	randAddr          *BleAddr
-	mtx               sync.Mutex
-	scanner           *BleScanner
+	Bd           *Dispatcher
+	client       *unixchild.Client
+	state        BleXportState
+	stopChan     chan struct{}
+	shutdownChan chan bool
+	readyBcast   nmxutil.Bcaster
+	master       nmxutil.SingleResource
+	slave        nmxutil.SingleResource
+	randAddr     *BleAddr
+	stateMtx     sync.Mutex
+	scanner      *BleScanner
 
 	cfg XportCfg
 }
@@ -105,7 +103,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	bx := &BleXport{
 		Bd:           NewDispatcher(),
 		shutdownChan: make(chan bool),
-		readyChan:    make(chan error),
+		readyBcast:   nmxutil.Bcaster{},
 		master:       nmxutil.NewSingleResource(),
 		slave:        nmxutil.NewSingleResource(),
 		cfg:          cfg,
@@ -114,7 +112,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	return bx, nil
 }
 
-func (bx *BleXport) createUnixChild() {
+func (bx *BleXport) startUnixChild() error {
 	config := unixchild.Config{
 		SockPath:      bx.cfg.SockPath,
 		ChildPath:     bx.cfg.BlehostdPath,
@@ -125,6 +123,20 @@ func (bx *BleXport) createUnixChild() {
 	}
 
 	bx.client = unixchild.New(config)
+
+	if err := bx.client.Start(); err != nil {
+		if unixchild.IsUcAcceptError(err) {
+			err = nmxutil.NewXportError(
+				"blehostd did not connect to socket; " +
+					"controller not attached?")
+		} else {
+			err = nmxutil.NewXportError(
+				"Failed to start child process: " + err.Error())
+		}
+		return err
+	}
+
+	return nil
 }
 
 func (bx *BleXport) BuildScanner() (scan.Scanner, error) {
@@ -240,7 +252,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 
 	log.Debugf("Shutting down BLE transport")
 
-	bx.mtx.Lock()
+	bx.stateMtx.Lock()
 
 	var fullyStarted bool
 	var already bool
@@ -260,7 +272,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 		bx.state = BLE_XPORT_STATE_STOPPING
 	}
 
-	bx.mtx.Unlock()
+	bx.stateMtx.Unlock()
 
 	if already {
 		// Shutdown already in progress.
@@ -283,10 +295,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 	}
 
 	// Stop all of this transport's go routines.
-	log.Debugf("Waiting for BLE transport goroutines to complete")
-	for i := 0; i < bx.numStopListeners; i++ {
-		bx.stopChan <- struct{}{}
-	}
+	close(bx.stopChan)
 
 	// Stop the unixchild instance (blehostd + socket).
 	if bx.client != nil {
@@ -304,37 +313,32 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 }
 
 func (bx *BleXport) blockUntilReady() error {
-	bx.mtx.Lock()
+	var ch chan interface{}
+
+	bx.stateMtx.Lock()
 	switch bx.state {
 	case BLE_XPORT_STATE_STARTED:
 		// Already started; don't block.
-		bx.mtx.Unlock()
+		bx.stateMtx.Unlock()
 		return nil
 
 	case BLE_XPORT_STATE_DORMANT:
 		// Not in the process of starting; the user will be waiting forever.
-		bx.mtx.Unlock()
+		bx.stateMtx.Unlock()
 		return fmt.Errorf("Attempt to use BLE transport without starting it")
 
 	default:
+		ch = bx.readyBcast.Listen()
 	}
+	bx.stateMtx.Unlock()
 
-	bx.numReadyListeners++
-	bx.mtx.Unlock()
-
-	return <-bx.readyChan
-}
-
-func (bx *BleXport) notifyReadyListeners(err error) {
-	for i := 0; i < bx.numReadyListeners; i++ {
-		bx.readyChan <- err
-	}
-	bx.numReadyListeners = 0
+	itf := <-ch
+	return itf.(error)
 }
 
 func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
-	bx.mtx.Lock()
-	defer bx.mtx.Unlock()
+	bx.stateMtx.Lock()
+	defer bx.stateMtx.Unlock()
 
 	if bx.state != from {
 		return false
@@ -343,9 +347,10 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
 	bx.state = to
 	switch bx.state {
 	case BLE_XPORT_STATE_STARTED:
-		bx.notifyReadyListeners(nil)
+		bx.readyBcast.SendAndClear(nil)
 	case BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_DORMANT:
-		bx.notifyReadyListeners(nmxutil.NewXportError("BLE transport stopped"))
+		bx.readyBcast.SendAndClear(
+			nmxutil.NewXportError("BLE transport stopped"))
 	default:
 	}
 
@@ -362,32 +367,21 @@ func (bx *BleXport) startOnce() error {
 		return nmxutil.NewXportError("BLE xport started twice")
 	}
 
-	bx.stopChan = make(chan struct{})
-	bx.numStopListeners = 0
-
-	bx.createUnixChild()
-	if err := bx.client.Start(); err != nil {
-		if unixchild.IsUcAcceptError(err) {
-			err = nmxutil.NewXportError(
-				"blehostd did not connect to socket; " +
-					"controller not attached?")
-		} else {
-			err = nmxutil.NewXportError(
-				"Failed to start child process: " + err.Error())
-		}
+	if err := bx.startUnixChild(); err != nil {
 		bx.shutdown(true, err)
 		return err
 	}
 
+	bx.stopChan = make(chan struct{})
+
 	// Listen for errors and data from the blehostd process.
 	go func() {
-		bx.numStopListeners++
 		for {
 			select {
 			case err := <-bx.client.ErrChild:
 				err = nmxutil.NewXportError("BLE transport error: " +
 					err.Error())
-				go bx.shutdown(true, err)
+				bx.shutdown(true, err)
 
 			case buf := <-bx.client.FromChild:
 				if len(buf) != 0 {
@@ -433,16 +427,16 @@ func (bx *BleXport) startOnce() error {
 
 	// Host and controller are synced.  Listen for sync loss in the background.
 	go func() {
-		bx.numStopListeners++
 		for {
 			select {
 			case err := <-bl.ErrChan:
-				go bx.shutdown(true, err)
+				bx.shutdown(true, err)
+				return
 			case bm := <-bl.MsgChan:
 				switch msg := bm.(type) {
 				case *BleSyncEvt:
 					if !msg.Synced {
-						go bx.shutdown(true, nmxutil.NewXportError(
+						bx.shutdown(true, nmxutil.NewXportError(
 							"BLE host <-> controller sync lost"))
 					}
 				}
@@ -452,7 +446,7 @@ func (bx *BleXport) startOnce() error {
 		}
 	}()
 
-	// Generate a new random address is none was specified.
+	// Generate a new random address if none was specified.
 	if bx.randAddr == nil {
 		addr, err := GenRandAddrXact(bx)
 		if err != nil {
diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go
index 3858e75..2ac19ba 100644
--- a/nmxact/nmble/discover.go
+++ b/nmxact/nmble/discover.go
@@ -108,7 +108,7 @@ func (d *Discoverer) Stop() error {
 		return nmxutil.NewAlreadyError("Attempt to stop inactive discoverer")
 	}
 
-	ch <- struct{}{}
+	close(ch)
 	return nil
 }
 
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index ba59f81..435b123 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -55,7 +55,7 @@ type Listener struct {
 func NewListener() *Listener {
 	return &Listener{
 		MsgChan: make(chan Msg, 16),
-		ErrChan: make(chan error, 4),
+		ErrChan: make(chan error, 1),
 		TmoChan: make(chan time.Time, 1),
 	}
 }
@@ -70,10 +70,14 @@ func (bl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return bl.TmoChan
 }
 
-func (bl *Listener) Stop() {
+func (bl *Listener) Close() {
 	if bl.timer != nil {
 		bl.timer.Stop()
 	}
+
+	close(bl.MsgChan)
+	close(bl.ErrChan)
+	close(bl.TmoChan)
 }
 
 type Dispatcher struct {
@@ -234,7 +238,7 @@ func (d *Dispatcher) RemoveListener(base MsgBase) *Listener {
 
 	base, bl := d.findListener(base)
 	if bl != nil {
-		bl.Stop()
+		bl.Close()
 		if base.Seq != BLE_SEQ_NONE {
 			delete(d.seqMap, base.Seq)
 		} else {
@@ -264,8 +268,8 @@ func decodeMsg(data []byte) (MsgBase, Msg, error) {
 	cb := msgCtorMap[opTypePair]
 	if cb == nil {
 		return base, nil, fmt.Errorf(
-			"Unrecognized op+type pair:") // %s, %s",
-		//MsgOpToString(base.Op), MsgTypeToString(base.Type))
+			"Unrecognized op+type pair: %s, %s",
+			MsgOpToString(base.Op), MsgTypeToString(base.Type))
 	}
 
 	msg := cb()
@@ -298,6 +302,10 @@ func (d *Dispatcher) Dispatch(data []byte) {
 }
 
 func (d *Dispatcher) ErrorAll(err error) {
+	if err == nil {
+		panic("NIL ERROR")
+	}
+
 	d.mtx.Lock()
 
 	m1 := d.seqMap
@@ -310,8 +318,10 @@ func (d *Dispatcher) ErrorAll(err error) {
 
 	for _, bl := range m1 {
 		bl.ErrChan <- err
+		bl.Close()
 	}
 	for _, bl := range m2 {
 		bl.ErrChan <- err
+		bl.Close()
 	}
 }
diff --git a/nmxact/nmble/receiver.go b/nmxact/nmble/receiver.go
index d089ba8..8660393 100644
--- a/nmxact/nmble/receiver.go
+++ b/nmxact/nmble/receiver.go
@@ -94,6 +94,9 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) {
 }
 
 func (r *Receiver) ErrorAll(err error) {
+	if err == nil {
+		panic("NIL ERROR")
+	}
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
diff --git a/nmxact/nmxutil/bcast.go b/nmxact/nmxutil/bcast.go
new file mode 100644
index 0000000..a7d68c0
--- /dev/null
+++ b/nmxact/nmxutil/bcast.go
@@ -0,0 +1,43 @@
+package nmxutil
+
+import (
+	"sync"
+)
+
+type Bcaster struct {
+	chs [](chan interface{})
+	mtx sync.Mutex
+}
+
+func (b *Bcaster) Listen() chan interface{} {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	ch := make(chan interface{})
+	b.chs = append(b.chs, ch)
+
+	return ch
+}
+
+func (b *Bcaster) Send(val interface{}) {
+	b.mtx.Lock()
+	chs := b.chs
+	b.mtx.Unlock()
+
+	for _, ch := range chs {
+		ch <- val
+		close(ch)
+	}
+}
+
+func (b *Bcaster) Clear() {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	b.chs = nil
+}
+
+func (b *Bcaster) SendAndClear(val interface{}) {
+	b.Send(val)
+	b.Clear()
+}
diff --git a/nmxact/nmxutil/err_funnel.go b/nmxact/nmxutil/err_funnel.go
index 58b0ca2..6d6b5a3 100644
--- a/nmxact/nmxutil/err_funnel.go
+++ b/nmxact/nmxutil/err_funnel.go
@@ -1,7 +1,6 @@
 package nmxutil
 
 import (
-	"fmt"
 	"sync"
 	"time"
 )
@@ -13,26 +12,15 @@ type ErrProcFn func(err error)
 // reported.
 type ErrFunnel struct {
 	LessCb     ErrLessFn
-	ProcCb     ErrProcFn
 	AccumDelay time.Duration
 
 	mtx      sync.Mutex
 	resetMtx sync.Mutex
 	curErr   error
 	errTimer *time.Timer
-	started  bool
 	waiters  [](chan error)
 }
 
-func (f *ErrFunnel) Start() {
-	f.resetMtx.Lock()
-
-	f.mtx.Lock()
-	defer f.mtx.Unlock()
-
-	f.started = true
-}
-
 func (f *ErrFunnel) Insert(err error) {
 	if err == nil {
 		panic("ErrFunnel nil insert")
@@ -41,10 +29,6 @@ func (f *ErrFunnel) Insert(err error) {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()
 
-	if !f.started {
-		panic("ErrFunnel insert without start")
-	}
-
 	if f.curErr == nil {
 		f.curErr = err
 		f.errTimer = time.AfterFunc(f.AccumDelay, func() {
@@ -61,18 +45,6 @@ func (f *ErrFunnel) Insert(err error) {
 	}
 }
 
-func (f *ErrFunnel) Reset() {
-	f.mtx.Lock()
-	defer f.mtx.Unlock()
-
-	if f.started {
-		f.started = false
-		f.curErr = nil
-		f.errTimer.Stop()
-		f.resetMtx.Unlock()
-	}
-}
-
 func (f *ErrFunnel) timerExp() {
 	f.mtx.Lock()
 
@@ -88,35 +60,18 @@ func (f *ErrFunnel) timerExp() {
 		panic("ErrFunnel timer expired but no error")
 	}
 
-	f.ProcCb(err)
-
 	for _, w := range waiters {
 		w <- err
+		close(w)
 	}
 }
 
-func (f *ErrFunnel) Wait() error {
-	var err error
-	var c chan error
+func (f *ErrFunnel) Wait() chan error {
+	c := make(chan error)
 
 	f.mtx.Lock()
-
-	if !f.started {
-		if f.curErr == nil {
-			err = fmt.Errorf("Wait on unstarted ErrFunnel")
-		} else {
-			err = f.curErr
-		}
-	} else {
-		c = make(chan error)
-		f.waiters = append(f.waiters, c)
-	}
-
+	f.waiters = append(f.waiters, c)
 	f.mtx.Unlock()
 
-	if err != nil {
-		return err
-	} else {
-		return <-c
-	}
+	return c
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.