You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/03/30 01:51:39 UTC

[2/3] incubator-mynewt-newtmgr git commit: nmxact - Reconnect on immediate spvtmo

nmxact - Reconnect on immediate spvtmo

A supervision timeout is much more likely to happen immediately after a
connection is established.  This change hides these spurious failures
from the user by automatically retrying the attempt to open the session
(default: 3 tries).


Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/4a3a72aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/4a3a72aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/4a3a72aa

Branch: refs/heads/master
Commit: 4a3a72aaf11a7346a4e0ca5537af122759f7f741
Parents: f02ca2a
Author: Christopher Collins <cc...@apache.org>
Authored: Wed Mar 29 18:49:04 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Wed Mar 29 18:51:13 2017 -0700

----------------------------------------------------------------------
 nmxact/nmble/ble_fsm.go        | 70 ++++++++++++++++++++-------
 nmxact/nmble/ble_oic_sesn.go   | 85 +++++++++++++++++++++++----------
 nmxact/nmble/ble_plain_sesn.go | 94 +++++++++++++++++++++++++++----------
 nmxact/sesn/sesn_cfg.go        |  2 +
 4 files changed, 185 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/4a3a72aa/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index cfdb760..cf12bdc 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -14,10 +14,10 @@ import (
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 )
 
-type BleSesnState int32
-
 const DFLT_ATT_MTU = 23
 
+type BleSesnState int32
+
 const (
 	SESN_STATE_UNCONNECTED     BleSesnState = 0
 	SESN_STATE_SCANNING                     = 1
@@ -33,8 +33,17 @@ const (
 	SESN_STATE_CONN_CANCELLING              = 11
 )
 
+type BleFsmDisconnectType int
+
+const (
+	FSM_DISCONNECT_TYPE_UNOPENED BleFsmDisconnectType = iota
+	FSM_DISCONNECT_TYPE_IMMEDIATE_TIMEOUT
+	FSM_DISCONNECT_TYPE_OPENED
+	FSM_DISCONNECT_TYPE_REQUESTED
+)
+
 type BleRxNmpFn func(data []byte)
-type BleDisconnectFn func(peer BleDev, err error)
+type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
 
 type BleFsmParams struct {
 	Bx           *BleXport
@@ -191,6 +200,22 @@ func (bf *BleFsm) action(
 	return nil
 }
 
+func (bf *BleFsm) calcDisconnectType() BleFsmDisconnectType {
+	switch bf.getState() {
+	case SESN_STATE_EXCHANGING_MTU:
+		return FSM_DISCONNECT_TYPE_IMMEDIATE_TIMEOUT
+
+	case SESN_STATE_DISCOVERED_CHR:
+		return FSM_DISCONNECT_TYPE_OPENED
+
+	case SESN_STATE_TERMINATING, SESN_STATE_CONN_CANCELLING:
+		return FSM_DISCONNECT_TYPE_REQUESTED
+
+	default:
+		return FSM_DISCONNECT_TYPE_UNOPENED
+	}
+}
+
 func (bf *BleFsm) connectListen(seq int) error {
 	bf.connChan = make(chan error, 1)
 
@@ -265,14 +290,18 @@ func (bf *BleFsm) connectListen(seq int) error {
 					}
 					bf.mtx.Unlock()
 
+					// Remember some fields before we clear them.
+					dt := bf.calcDisconnectType()
+					peer := *bf.peerDev
+
+					bf.setState(SESN_STATE_UNCONNECTED)
+					bf.peerDev = nil
+
 					for _, bl := range bls {
 						bl.ErrChan <- err
 					}
 
-					bf.setState(SESN_STATE_UNCONNECTED)
-					peer := *bf.peerDev
-					bf.peerDev = nil
-					bf.params.DisconnectCb(peer, err)
+					bf.params.DisconnectCb(dt, peer, err)
 					return
 
 				default:
@@ -601,9 +630,12 @@ func (bf *BleFsm) tryFillPeerDev() bool {
 	return false
 }
 
-func (bf *BleFsm) Start() error {
+// @return bool                 Whether another start attempt should be made;
+//         error                The error that caused the start attempt to
+//                                  fail; nil on success.
+func (bf *BleFsm) Start() (bool, error) {
 	if bf.getState() != SESN_STATE_UNCONNECTED {
-		return nmxutil.NewSesnAlreadyOpenError(
+		return false, nmxutil.NewSesnAlreadyOpenError(
 			"Attempt to open an already-open BLE session")
 	}
 
@@ -639,7 +671,7 @@ func (bf *BleFsm) Start() error {
 			}
 
 			if err != nil {
-				return err
+				return false, err
 			}
 
 		case SESN_STATE_CONNECTED:
@@ -650,7 +682,9 @@ func (bf *BleFsm) Start() error {
 				SESN_STATE_EXCHANGED_MTU,
 				cb)
 			if err != nil {
-				return err
+				bhe := nmxutil.ToBleHost(err)
+				retry := bhe != nil && bhe.Status == ERR_CODE_ENOTCONN
+				return retry, err
 			}
 
 		case SESN_STATE_EXCHANGED_MTU:
@@ -661,7 +695,7 @@ func (bf *BleFsm) Start() error {
 				SESN_STATE_DISCOVERED_SVC,
 				cb)
 			if err != nil {
-				return err
+				return false, err
 			}
 
 		case SESN_STATE_DISCOVERED_SVC:
@@ -675,22 +709,22 @@ func (bf *BleFsm) Start() error {
 				SESN_STATE_DISCOVERED_CHR,
 				cb)
 			if err != nil {
-				return err
+				return false, err
 			}
 
 			if err := bf.subscribe(); err != nil {
-				return err
+				return false, err
 			}
 
 		case SESN_STATE_DISCOVERED_CHR:
 			/* Open complete. */
-			return nil
+			return false, nil
 
 		case SESN_STATE_CONNECTING,
 			SESN_STATE_DISCOVERING_SVC,
 			SESN_STATE_DISCOVERING_CHR,
 			SESN_STATE_TERMINATING:
-			return fmt.Errorf("BleFsm already being opened")
+			return false, fmt.Errorf("BleFsm already being opened")
 		}
 	}
 }
@@ -726,6 +760,10 @@ func (bf *BleFsm) IsOpen() bool {
 	return bf.getState() == SESN_STATE_DISCOVERED_CHR
 }
 
+func (bf *BleFsm) IsClosed() bool {
+	return bf.getState() == SESN_STATE_UNCONNECTED
+}
+
 func (bf *BleFsm) TxNmp(payload []byte, nl *nmp.NmpListener,
 	timeout time.Duration) (nmp.NmpRsp, error) {
 

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/4a3a72aa/nmxact/nmble/ble_oic_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 365efc6..4001960 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -5,17 +5,18 @@ import (
 	"sync"
 	"time"
 
+	"mynewt.apache.org/newt/util"
 	. "mynewt.apache.org/newtmgr/nmxact/bledefs"
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
 	"mynewt.apache.org/newtmgr/nmxact/omp"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
-	"mynewt.apache.org/newt/util"
 )
 
 type BleOicSesn struct {
 	bf           *BleFsm
 	nls          map[*nmp.NmpListener]struct{}
 	od           *omp.OmpDispatcher
+	connTries    int
 	closeTimeout time.Duration
 	onCloseCb    sesn.BleOnCloseFn
 
@@ -27,6 +28,7 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
 	bos := &BleOicSesn{
 		nls:          map[*nmp.NmpListener]struct{}{},
 		od:           omp.NewOmpDispatcher(),
+		connTries:    cfg.Ble.ConnTries,
 		closeTimeout: cfg.Ble.CloseTimeout,
 		onCloseCb:    cfg.Ble.OnCloseCb,
 	}
@@ -47,14 +49,16 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
 	}
 
 	bos.bf = NewBleFsm(BleFsmParams{
-		Bx:           bx,
-		OwnAddrType:  cfg.Ble.OwnAddrType,
-		PeerSpec:     cfg.Ble.PeerSpec,
-		SvcUuid:      svcUuid,
-		ReqChrUuid:   reqChrUuid,
-		RspChrUuid:   rspChrUuid,
-		RxNmpCb:      func(d []byte) { bos.onRxNmp(d) },
-		DisconnectCb: func(p BleDev, e error) { bos.onDisconnect(p, e) },
+		Bx:          bx,
+		OwnAddrType: cfg.Ble.OwnAddrType,
+		PeerSpec:    cfg.Ble.PeerSpec,
+		SvcUuid:     svcUuid,
+		ReqChrUuid:  reqChrUuid,
+		RspChrUuid:  rspChrUuid,
+		RxNmpCb:     func(d []byte) { bos.onRxNmp(d) },
+		DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
+			bos.onDisconnect(dt, p, e)
+		},
 	})
 
 	return bos
@@ -80,16 +84,16 @@ func (bos *BleOicSesn) removeNmpListener(seq uint8) {
 }
 
 // Returns true if a new channel was assigned.
-func (bos *BleOicSesn) setCloseChan() bool {
+func (bos *BleOicSesn) setCloseChan() error {
 	bos.mx.Lock()
 	defer bos.mx.Unlock()
 
 	if bos.closeChan != nil {
-		return false
+		return fmt.Errorf("Multiple listeners waiting for session to close")
 	}
 
 	bos.closeChan = make(chan error, 1)
-	return true
+	return nil
 }
 
 func (bos *BleOicSesn) clearCloseChan() {
@@ -99,18 +103,51 @@ func (bos *BleOicSesn) clearCloseChan() {
 	bos.closeChan = nil
 }
 
+func (bos *BleOicSesn) listenForClose(timeout time.Duration) error {
+	select {
+	case <-bos.closeChan:
+		return nil
+	case <-time.After(timeout):
+		// Session never closed.
+		return fmt.Errorf("Timeout while waiting for session to close")
+	}
+}
+
+func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error {
+	if err := bos.setCloseChan(); err != nil {
+		return err
+	}
+	defer bos.clearCloseChan()
+
+	// If the session is already closed, we're done.
+	if bos.bf.IsClosed() {
+		return nil
+	}
+
+	// Block until close completes or timeout.
+	return bos.listenForClose(timeout)
+}
+
 func (bos *BleOicSesn) AbortRx(seq uint8) error {
 	return bos.od.FakeRxError(seq, fmt.Errorf("Rx aborted"))
 }
 
 func (bos *BleOicSesn) Open() error {
-	return bos.bf.Start()
+	var err error
+	for i := 0; i < bos.connTries; i++ {
+		var retry bool
+		retry, err = bos.bf.Start()
+		if !retry {
+			break
+		}
+	}
+
+	return err
 }
 
 func (bos *BleOicSesn) Close() error {
-	if !bos.setCloseChan() {
-		return bos.bf.closedError(
-			"Attempt to close an unopened BLE session")
+	if err := bos.setCloseChan(); err != nil {
+		return err
 	}
 	defer bos.clearCloseChan()
 
@@ -125,12 +162,7 @@ func (bos *BleOicSesn) Close() error {
 	}
 
 	// Block until close completes or timeout.
-	select {
-	case <-bos.closeChan:
-	case <-time.After(bos.closeTimeout):
-	}
-
-	return nil
+	return bos.listenForClose(bos.closeTimeout)
 }
 
 func (bos *BleOicSesn) IsOpen() bool {
@@ -141,7 +173,9 @@ func (bos *BleOicSesn) onRxNmp(data []byte) {
 	bos.od.Dispatch(data)
 }
 
-func (bos *BleOicSesn) onDisconnect(peer BleDev, err error) {
+func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
+	err error) {
+
 	for nl, _ := range bos.nls {
 		nl.ErrChan <- err
 	}
@@ -150,7 +184,10 @@ func (bos *BleOicSesn) onDisconnect(peer BleDev, err error) {
 	if bos.closeChan != nil {
 		bos.closeChan <- err
 	}
-	if bos.onCloseCb != nil {
+
+	// Only execute client's disconnect callback if the disconnect was
+	// unsolicited and the session was fully open.
+	if dt == FSM_DISCONNECT_TYPE_OPENED && bos.onCloseCb != nil {
 		bos.onCloseCb(bos, peer, err)
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/4a3a72aa/nmxact/nmble/ble_plain_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index a8a46d6..012a301 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -5,16 +5,17 @@ import (
 	"sync"
 	"time"
 
+	"mynewt.apache.org/newt/util"
 	. "mynewt.apache.org/newtmgr/nmxact/bledefs"
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
-	"mynewt.apache.org/newt/util"
 )
 
 type BlePlainSesn struct {
 	bf           *BleFsm
 	nls          map[*nmp.NmpListener]struct{}
 	nd           *nmp.NmpDispatcher
+	connTries    int
 	closeTimeout time.Duration
 	onCloseCb    sesn.BleOnCloseFn
 
@@ -26,6 +27,7 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
 	bps := &BlePlainSesn{
 		nls:          map[*nmp.NmpListener]struct{}{},
 		nd:           nmp.NewNmpDispatcher(),
+		connTries:    cfg.Ble.ConnTries,
 		closeTimeout: cfg.Ble.CloseTimeout,
 		onCloseCb:    cfg.Ble.OnCloseCb,
 	}
@@ -41,14 +43,16 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
 	}
 
 	bps.bf = NewBleFsm(BleFsmParams{
-		Bx:           bx,
-		OwnAddrType:  cfg.Ble.OwnAddrType,
-		PeerSpec:     cfg.Ble.PeerSpec,
-		SvcUuid:      svcUuid,
-		ReqChrUuid:   chrUuid,
-		RspChrUuid:   chrUuid,
-		RxNmpCb:      func(d []byte) { bps.onRxNmp(d) },
-		DisconnectCb: func(p BleDev, e error) { bps.onDisconnect(p, e) },
+		Bx:          bx,
+		OwnAddrType: cfg.Ble.OwnAddrType,
+		PeerSpec:    cfg.Ble.PeerSpec,
+		SvcUuid:     svcUuid,
+		ReqChrUuid:  chrUuid,
+		RspChrUuid:  chrUuid,
+		RxNmpCb:     func(d []byte) { bps.onRxNmp(d) },
+		DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
+			bps.onDisconnect(dt, p, e)
+		},
 	})
 
 	return bps
@@ -73,17 +77,16 @@ func (bps *BlePlainSesn) removeNmpListener(seq uint8) {
 	}
 }
 
-// Returns true if a new channel was assigned.
-func (bps *BlePlainSesn) setCloseChan() bool {
+func (bps *BlePlainSesn) setCloseChan() error {
 	bps.mx.Lock()
 	defer bps.mx.Unlock()
 
 	if bps.closeChan != nil {
-		return false
+		return fmt.Errorf("Multiple listeners waiting for session to close")
 	}
 
 	bps.closeChan = make(chan error, 1)
-	return true
+	return nil
 }
 
 func (bps *BlePlainSesn) clearCloseChan() {
@@ -93,18 +96,57 @@ func (bps *BlePlainSesn) clearCloseChan() {
 	bps.closeChan = nil
 }
 
+func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
+	select {
+	case <-bps.closeChan:
+		return nil
+	case <-time.After(timeout):
+		// Session never closed.
+		return fmt.Errorf("Timeout while waiting for session to close")
+	}
+}
+
+func (bps *BlePlainSesn) blockUntilClosed(timeout time.Duration) error {
+	if err := bps.setCloseChan(); err != nil {
+		return err
+	}
+	defer bps.clearCloseChan()
+
+	// If the session is already closed, we're done.
+	if bps.bf.IsClosed() {
+		return nil
+	}
+
+	// Block until close completes or timeout.
+	return bps.listenForClose(timeout)
+}
+
 func (bps *BlePlainSesn) AbortRx(seq uint8) error {
 	return bps.nd.FakeRxError(seq, fmt.Errorf("Rx aborted"))
 }
 
 func (bps *BlePlainSesn) Open() error {
-	return bps.bf.Start()
+	var err error
+	for i := 0; i < bps.connTries; i++ {
+		var retry bool
+		retry, err = bps.bf.Start()
+		if !retry {
+			break
+		}
+
+		if bps.blockUntilClosed(1*time.Second) != nil {
+			// Just close the session manually and report the original error.
+			bps.Close()
+			return err
+		}
+	}
+
+	return err
 }
 
 func (bps *BlePlainSesn) Close() error {
-	if !bps.setCloseChan() {
-		return bps.bf.closedError(
-			"Attempt to close an unopened BLE session")
+	if err := bps.setCloseChan(); err != nil {
+		return err
 	}
 	defer bps.clearCloseChan()
 
@@ -119,12 +161,7 @@ func (bps *BlePlainSesn) Close() error {
 	}
 
 	// Block until close completes or timeout.
-	select {
-	case <-bps.closeChan:
-	case <-time.After(bps.closeTimeout):
-	}
-
-	return nil
+	return bps.listenForClose(bps.closeTimeout)
 }
 
 func (bps *BlePlainSesn) IsOpen() bool {
@@ -135,16 +172,21 @@ func (bps *BlePlainSesn) onRxNmp(data []byte) {
 	bps.nd.Dispatch(data)
 }
 
-func (bps *BlePlainSesn) onDisconnect(peer BleDev, err error) {
+func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
+	err error) {
+
 	for nl, _ := range bps.nls {
 		nl.ErrChan <- err
 	}
 
-	// If the session is being closed, unblock the close() call.
+	// If someone is waiting for the session to close, unblock them.
 	if bps.closeChan != nil {
 		bps.closeChan <- err
 	}
-	if bps.onCloseCb != nil {
+
+	// Only execute client's disconnect callback if the disconnect was
+	// unsolicited and the session was fully open.
+	if dt == FSM_DISCONNECT_TYPE_OPENED && bps.onCloseCb != nil {
 		bps.onCloseCb(bps, peer, err)
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/4a3a72aa/nmxact/sesn/sesn_cfg.go
----------------------------------------------------------------------
diff --git a/nmxact/sesn/sesn_cfg.go b/nmxact/sesn/sesn_cfg.go
index e18711d..9c84db1 100644
--- a/nmxact/sesn/sesn_cfg.go
+++ b/nmxact/sesn/sesn_cfg.go
@@ -44,6 +44,7 @@ func BlePeerSpecName(name string) BlePeerSpec {
 type SesnCfgBle struct {
 	OwnAddrType  bledefs.BleAddrType
 	PeerSpec     BlePeerSpec
+	ConnTries    int
 	CloseTimeout time.Duration
 	OnCloseCb    BleOnCloseFn
 }
@@ -59,6 +60,7 @@ type SesnCfg struct {
 func NewSesnCfg() SesnCfg {
 	return SesnCfg{
 		Ble: SesnCfgBle{
+			ConnTries:    3,
 			CloseTimeout: 5 * time.Second,
 		},
 	}