You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/07/15 00:44:00 UTC

[mynewt-newtmgr] 01/02: nmxact - Clarify channel owners.

This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit 05156e0c75ec6e710c7ea0bf89db7e31df3cbbd1
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Fri Jul 14 17:41:17 2017 -0700

    nmxact - Clarify channel owners.
---
 nmxact/nmble/ble_fsm.go        |  6 +++---
 nmxact/nmble/ble_oic_sesn.go   |  7 ++++---
 nmxact/nmble/ble_plain_sesn.go | 33 ++++++++++++++-------------------
 nmxact/nmble/ble_util.go       | 20 ++++++++++----------
 nmxact/nmble/ble_xport.go      | 27 +++++++++++++++++----------
 nmxact/nmble/discover.go       |  8 ++++----
 nmxact/nmble/dispatch.go       |  2 ++
 nmxact/nmble/receiver.go       | 28 ++++++++++++++--------------
 nmxact/nmp/dispatch.go         | 10 ++++++++--
 nmxact/oic/dispatch.go         | 17 ++++++++++++++++-
 nmxact/omp/dispatch.go         |  8 ++++++++
 11 files changed, 100 insertions(+), 66 deletions(-)

diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 4bf20b3..6721eb0 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -169,7 +169,7 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
 
 func (bf *BleFsm) shutdown(err error) {
 	bf.params.Bx.StopWaitingForMaster(bf, err)
-	bf.rxer.ErrorAll(err)
+	bf.rxer.RemoveAll("shutdown")
 
 	bf.state = SESN_STATE_UNCONNECTED
 
@@ -202,8 +202,8 @@ func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
 	bf.wg.Add(1)
 
 	go func() {
-		defer bf.rxer.RemoveSeqListener("connect", seq)
 		defer bf.wg.Done()
+		defer bf.rxer.RemoveSeqListener("connect", seq)
 
 		for {
 			select {
@@ -269,8 +269,8 @@ func (bf *BleFsm) nmpRspListen() error {
 	bf.wg.Add(1)
 
 	go func() {
-		defer bf.rxer.RemoveBaseListener("nmp-rsp", base)
 		defer bf.wg.Done()
+		defer bf.rxer.RemoveBaseListener("nmp-rsp", base)
 
 		for {
 			select {
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 8964865..bbdafb1 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -80,8 +80,12 @@ func (bos *BleOicSesn) Open() error {
 	// Listen for disconnect in the background.
 	bos.wg.Add(1)
 	go func() {
+		// If the session is being closed, unblock the close() call.
+		defer close(bos.closeChan)
+
 		// Block until disconnect.
 		<-bos.bf.DisconnectChan()
+		nmxutil.Assert(!bos.IsOpen())
 		pd := bos.bf.PrevDisconnect()
 
 		// Signal error to all listeners.
@@ -90,9 +94,6 @@ func (bos *BleOicSesn) Open() error {
 		bos.wg.Done()
 		bos.wg.Wait()
 
-		// If the session is being closed, unblock the close() call.
-		close(bos.closeChan)
-
 		// Only execute the client's disconnect callback if the disconnect was
 		// unsolicited.
 		if pd.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index 0d77d63..9268c3d 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -2,11 +2,13 @@ package nmble
 
 import (
 	"fmt"
+	"sync"
 	"time"
 
 	"mynewt.apache.org/newt/util"
 	. "mynewt.apache.org/newtmgr/nmxact/bledefs"
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
+	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 )
 
@@ -15,6 +17,7 @@ type BlePlainSesn struct {
 	d            *nmp.Dispatcher
 	closeTimeout time.Duration
 	onCloseCb    sesn.OnCloseFn
+	wg           sync.WaitGroup
 
 	closeChan chan struct{}
 }
@@ -61,16 +64,21 @@ func (bps *BlePlainSesn) Open() error {
 	bps.d = nmp.NewDispatcher(3)
 
 	// Listen for disconnect in the background.
+	bps.wg.Add(1)
 	go func() {
+		// If the session is being closed, unblock the close() call.
+		defer close(bps.closeChan)
+
 		// Block until disconnect.
 		<-bps.bf.DisconnectChan()
+		nmxutil.Assert(!bps.IsOpen())
+
 		pd := bps.bf.PrevDisconnect()
 
 		// Signal error to all listeners.
 		bps.d.ErrorAll(pd.Err)
-
-		// If the session is being closed, unblock the close() call.
-		close(bps.closeChan)
+		bps.wg.Done()
+		bps.wg.Wait()
 
 		// Only execute the client's disconnect callback if the disconnect was
 		// unsolicited.
@@ -80,7 +88,10 @@ func (bps *BlePlainSesn) Open() error {
 	}()
 
 	// Listen for NMP responses in the background.
+	bps.wg.Add(1)
 	go func() {
+		defer bps.wg.Done()
+
 		for {
 			data, ok := <-bps.bf.RxNmpChan()
 			if !ok {
@@ -110,22 +121,6 @@ func (bps *BlePlainSesn) IsOpen() bool {
 	return bps.bf.IsOpen()
 }
 
-// Called by the FSM when a blehostd disconnect event is received.
-func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
-	err error) {
-
-	bps.d.ErrorAll(err)
-
-	// If the session is being closed, unblock the close() call.
-	close(bps.closeChan)
-
-	// Only execute client's disconnect callback if the disconnect was
-	// unsolicited and the session was fully open.
-	if dt == FSM_DISCONNECT_TYPE_OPENED && bps.onCloseCb != nil {
-		bps.onCloseCb(bps, err)
-	}
-}
-
 func (bps *BlePlainSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
 	return nmp.EncodeNmpPlain(m)
 }
diff --git a/nmxact/nmble/ble_util.go b/nmxact/nmble/ble_util.go
index b6d2eea..3550e33 100644
--- a/nmxact/nmble/ble_util.go
+++ b/nmxact/nmble/ble_util.go
@@ -305,10 +305,10 @@ func ConnFindXact(x *BleXport, connHandle uint16) (BleConnDesc, error) {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return BleConnDesc{}, err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return connFind(x, bl, r)
 }
@@ -323,10 +323,10 @@ func GenRandAddrXact(x *BleXport) (BleAddr, error) {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return BleAddr{}, err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return genRandAddr(x, bl, r)
 }
@@ -343,10 +343,10 @@ func SetRandAddrXact(x *BleXport, addr BleAddr) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return setRandAddr(x, bl, r)
 }
@@ -363,10 +363,10 @@ func SetPreferredMtuXact(x *BleXport, mtu uint16) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return setPreferredMtu(x, bl, r)
 }
@@ -382,10 +382,10 @@ func ResetXact(x *BleXport) error {
 	}
 
 	bl := NewListener()
-	if err := x.Bd.AddListener(base, bl); err != nil {
+	if err := x.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer x.Bd.RemoveListener(base)
+	defer x.RemoveListener(base)
 
 	return reset(x, bl, r)
 }
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index 1a8f45e..188a78f 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -84,7 +84,8 @@ const (
 
 // Implements xport.Xport.
 type BleXport struct {
-	Bd           *Dispatcher
+	cfg          XportCfg
+	d            *Dispatcher
 	client       *unixchild.Client
 	state        BleXportState
 	stopChan     chan struct{}
@@ -95,13 +96,11 @@ type BleXport struct {
 	randAddr     *BleAddr
 	stateMtx     sync.Mutex
 	scanner      *BleScanner
-
-	cfg XportCfg
 }
 
 func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	bx := &BleXport{
-		Bd:           NewDispatcher(),
+		d:            NewDispatcher(),
 		shutdownChan: make(chan bool),
 		readyBcast:   nmxutil.Bcaster{},
 		master:       nmxutil.NewSingleResource(),
@@ -171,7 +170,7 @@ func (bx *BleXport) addSyncListener() (*Listener, error) {
 		Seq:        BLE_SEQ_NONE,
 		ConnHandle: -1,
 	}
-	if err := bx.Bd.AddListener(base, bl); err != nil {
+	if err := bx.d.AddListener(base, bl); err != nil {
 		return nil, err
 	}
 
@@ -185,7 +184,7 @@ func (bx *BleXport) removeSyncListener() {
 		Seq:        BLE_SEQ_NONE,
 		ConnHandle: -1,
 	}
-	bx.Bd.RemoveListener(base)
+	bx.d.RemoveListener(base)
 }
 
 func (bx *BleXport) querySyncStatus() (bool, error) {
@@ -207,10 +206,10 @@ func (bx *BleXport) querySyncStatus() (bool, error) {
 		Seq:        req.Seq,
 		ConnHandle: -1,
 	}
-	if err := bx.Bd.AddListener(base, bl); err != nil {
+	if err := bx.d.AddListener(base, bl); err != nil {
 		return false, err
 	}
-	defer bx.Bd.RemoveListener(base)
+	defer bx.d.RemoveListener(base)
 
 	if err := bx.txNoSync(j); err != nil {
 		return false, err
@@ -286,7 +285,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 	// Indicate an error to all of this transport's listeners.  This prevents
 	// them from blocking endlessly while awaiting a BLE message.
 	log.Debugf("Stopping BLE dispatcher")
-	bx.Bd.ErrorAll(err)
+	bx.d.ErrorAll(err)
 
 	synced, err := bx.querySyncStatus()
 	if err == nil && synced {
@@ -386,7 +385,7 @@ func (bx *BleXport) startOnce() error {
 			case buf := <-bx.client.FromChild:
 				if len(buf) != 0 {
 					log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
-					bx.Bd.Dispatch(buf)
+					bx.d.Dispatch(buf)
 				}
 
 			case <-bx.stopChan:
@@ -542,6 +541,14 @@ func (bx *BleXport) Tx(data []byte) error {
 	return bx.txNoSync(data)
 }
 
+func (bx *BleXport) AddListener(base MsgBase, listener *Listener) error {
+	return bx.d.AddListener(base, listener)
+}
+
+func (bx *BleXport) RemoveListener(base MsgBase) *Listener {
+	return bx.d.RemoveListener(base)
+}
+
 func (bx *BleXport) RspTimeout() time.Duration {
 	return bx.cfg.BlehostdRspTimeout
 }
diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go
index 2ac19ba..c8d4a3f 100644
--- a/nmxact/nmble/discover.go
+++ b/nmxact/nmble/discover.go
@@ -40,10 +40,10 @@ func (d *Discoverer) scanCancel() error {
 	}
 
 	bl := NewListener()
-	if err := d.params.Bx.Bd.AddListener(base, bl); err != nil {
+	if err := d.params.Bx.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer d.params.Bx.Bd.RemoveListener(base)
+	defer d.params.Bx.RemoveListener(base)
 
 	if err := scanCancel(d.params.Bx, bl, r); err != nil {
 		return err
@@ -79,10 +79,10 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error {
 	}
 
 	bl := NewListener()
-	if err := d.params.Bx.Bd.AddListener(base, bl); err != nil {
+	if err := d.params.Bx.AddListener(base, bl); err != nil {
 		return err
 	}
-	defer d.params.Bx.Bd.RemoveListener(base)
+	defer d.params.Bx.RemoveListener(base)
 
 	d.abortChan = make(chan struct{}, 1)
 	defer func() { d.abortChan = nil }()
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index 435b123..76bd18c 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -80,6 +80,8 @@ func (bl *Listener) Close() {
 	close(bl.TmoChan)
 }
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	seqMap  map[BleSeq]*Listener
 	baseMap map[MsgBase]*Listener
diff --git a/nmxact/nmble/receiver.go b/nmxact/nmble/receiver.go
index 8660393..231ad5e 100644
--- a/nmxact/nmble/receiver.go
+++ b/nmxact/nmble/receiver.go
@@ -6,11 +6,15 @@ import (
 	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 )
 
+// The receiver never writes to any of its listeners.  It only maintains a set
+// of listeners so that their lifetimes can be tracked and to facilitate their
+// removal from the BLE transport.
+
 type Receiver struct {
 	id       uint32
 	bx       *BleXport
 	logDepth int
-	bls      map[*Listener]struct{}
+	bls      map[*Listener]MsgBase
 	mtx      sync.Mutex
 	wg       sync.WaitGroup
 }
@@ -20,7 +24,7 @@ func NewReceiver(id uint32, bx *BleXport, logDepth int) *Receiver {
 		id:       id,
 		bx:       bx,
 		logDepth: logDepth + 3,
-		bls:      map[*Listener]struct{}{},
+		bls:      map[*Listener]MsgBase{},
 	}
 }
 
@@ -34,11 +38,11 @@ func (r *Receiver) addListener(name string, base MsgBase) (
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
-	if err := r.bx.Bd.AddListener(base, bl); err != nil {
+	if err := r.bx.AddListener(base, bl); err != nil {
 		return nil, err
 	}
 
-	r.bls[bl] = struct{}{}
+	r.bls[bl] = base
 	r.wg.Add(1)
 
 	return bl, nil
@@ -68,7 +72,7 @@ func (r *Receiver) removeListener(name string, base MsgBase) *Listener {
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
-	bl := r.bx.Bd.RemoveListener(base)
+	bl := r.bx.RemoveListener(base)
 	delete(r.bls, bl)
 
 	if bl != nil {
@@ -93,18 +97,14 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) {
 	r.removeListener(name, base)
 }
 
-func (r *Receiver) ErrorAll(err error) {
-	if err == nil {
-		panic("NIL ERROR")
-	}
+func (r *Receiver) RemoveAll(name string) {
 	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
 	bls := r.bls
-	r.bls = map[*Listener]struct{}{}
+	r.bls = map[*Listener]MsgBase{}
+	r.mtx.Unlock()
 
-	for bl, _ := range bls {
-		bl.ErrChan <- err
+	for _, base := range bls {
+		r.removeListener(name, base)
 	}
 }
 
diff --git a/nmxact/nmp/dispatch.go b/nmxact/nmp/dispatch.go
index fee372e..9051b5b 100644
--- a/nmxact/nmp/dispatch.go
+++ b/nmxact/nmp/dispatch.go
@@ -53,12 +53,18 @@ func (nl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return nl.tmoChan
 }
 
-func (nl *Listener) Stop() {
+func (nl *Listener) Close() {
 	if nl.timer != nil {
 		nl.timer.Stop()
 	}
+
+	close(nl.RspChan)
+	close(nl.ErrChan)
+	close(nl.tmoChan)
 }
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	seqListenerMap map[uint8]*Listener
 	reassembler    *Reassembler
@@ -97,7 +103,7 @@ func (d *Dispatcher) RemoveListener(seq uint8) *Listener {
 
 	nl := d.seqListenerMap[seq]
 	if nl != nil {
-		nl.Stop()
+		nl.Close()
 		delete(d.seqListenerMap, seq)
 	}
 	return nl
diff --git a/nmxact/oic/dispatch.go b/nmxact/oic/dispatch.go
index aacc277..810abe7 100644
--- a/nmxact/oic/dispatch.go
+++ b/nmxact/oic/dispatch.go
@@ -72,6 +72,18 @@ func (ol *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
 	return ol.tmoChan
 }
 
+func (ol *Listener) Close() {
+	if ol.timer != nil {
+		ol.timer.Stop()
+	}
+
+	close(ol.RspChan)
+	close(ol.ErrChan)
+	close(ol.tmoChan)
+}
+
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	tokenListenerMap map[Token]*Listener
 	reassembler      *Reassembler
@@ -124,7 +136,10 @@ func (d *Dispatcher) RemoveListener(token []byte) *Listener {
 	}
 
 	ol := d.tokenListenerMap[ot]
-	delete(d.tokenListenerMap, ot)
+	if ol != nil {
+		ol.Close()
+		delete(d.tokenListenerMap, ot)
+	}
 
 	return ol
 }
diff --git a/nmxact/omp/dispatch.go b/nmxact/omp/dispatch.go
index d0ba7d4..7647048 100644
--- a/nmxact/omp/dispatch.go
+++ b/nmxact/omp/dispatch.go
@@ -20,16 +20,21 @@
 package omp
 
 import (
+	"sync"
+
 	log "github.com/Sirupsen/logrus"
 
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
 	"mynewt.apache.org/newtmgr/nmxact/oic"
 )
 
+// The dispatcher is the owner of the listeners it points to.  Only the
+// dispatcher writes to these listeners.
 type Dispatcher struct {
 	nmpd   *nmp.Dispatcher
 	oicd   *oic.Dispatcher
 	stopCh chan struct{}
+	wg     sync.WaitGroup
 }
 
 func NewDispatcher(isTcp bool, logDepth int) (*Dispatcher, error) {
@@ -56,8 +61,10 @@ func (r *Dispatcher) addOmpListener() error {
 		return err
 	}
 
+	r.wg.Add(1)
 	go func() {
 		defer r.RemoveOicListener(nil)
+		defer r.wg.Done()
 
 		for {
 			select {
@@ -83,6 +90,7 @@ func (r *Dispatcher) addOmpListener() error {
 
 func (r *Dispatcher) Stop() {
 	r.stopCh <- struct{}{}
+	r.wg.Wait()
 }
 
 func (r *Dispatcher) Dispatch(data []byte) bool {

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.