You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/03/30 01:51:38 UTC
[1/3] incubator-mynewt-newtmgr git commit: nmxact - Keep ble_fsm
config in params object.
Repository: incubator-mynewt-newtmgr
Updated Branches:
refs/heads/master e234593ab -> ff255cbfa
nmxact - Keep ble_fsm config in params object.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/f02ca2a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/f02ca2a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/f02ca2a1
Branch: refs/heads/master
Commit: f02ca2a1c152e9cce0131d46ff5055529cc82d68
Parents: e234593
Author: Christopher Collins <cc...@apache.org>
Authored: Wed Mar 29 12:34:38 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Wed Mar 29 12:34:38 2017 -0700
----------------------------------------------------------------------
nmxact/nmble/ble_fsm.go | 70 ++++++++++++++++++--------------------------
1 file changed, 28 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/f02ca2a1/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 9c96526..cfdb760 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -48,16 +48,9 @@ type BleFsmParams struct {
}
type BleFsm struct {
- bx *BleXport
- ownAddrType BleAddrType
- peerSpec sesn.BlePeerSpec
- peerDev *BleDev
- svcUuid BleUuid
- reqChrUuid BleUuid
- rspChrUuid BleUuid
- rxNmpCb BleRxNmpFn
- disconnectCb BleDisconnectFn
+ params BleFsmParams
+ peerDev *BleDev
connHandle int
nmpSvc *BleSvc
nmpReqChr *BleChr
@@ -75,14 +68,7 @@ type BleFsm struct {
func NewBleFsm(p BleFsmParams) *BleFsm {
bf := &BleFsm{
- bx: p.Bx,
- peerSpec: p.PeerSpec,
- ownAddrType: p.OwnAddrType,
- svcUuid: p.SvcUuid,
- reqChrUuid: p.ReqChrUuid,
- rspChrUuid: p.RspChrUuid,
- rxNmpCb: p.RxNmpCb,
- disconnectCb: p.DisconnectCb,
+ params: p,
bls: map[*BleListener]struct{}{},
attMtu: DFLT_ATT_MTU,
@@ -143,7 +129,7 @@ func (bf *BleFsm) addBleListener(base BleMsgBase) (*BleListener, error) {
bf.bls[bl] = struct{}{}
bf.mtx.Unlock()
- if err := bf.bx.Bd.AddListener(base, bl); err != nil {
+ if err := bf.params.Bx.Bd.AddListener(base, bl); err != nil {
delete(bf.bls, bl)
return nil, err
}
@@ -167,7 +153,7 @@ func (bf *BleFsm) addBleSeqListener(seq int) (*BleListener, error) {
}
func (bf *BleFsm) removeBleListener(base BleMsgBase) {
- bl := bf.bx.Bd.RemoveListener(base)
+ bl := bf.params.Bx.Bd.RemoveListener(base)
if bl != nil {
bf.mtx.Lock()
delete(bf.bls, bl)
@@ -286,13 +272,13 @@ func (bf *BleFsm) connectListen(seq int) error {
bf.setState(SESN_STATE_UNCONNECTED)
peer := *bf.peerDev
bf.peerDev = nil
- bf.disconnectCb(peer, err)
+ bf.params.DisconnectCb(peer, err)
return
default:
}
- case <-bl.AfterTimeout(bf.bx.rspTimeout):
+ case <-bl.AfterTimeout(bf.params.Bx.rspTimeout):
bf.connChan <- BhdTimeoutError(MSG_TYPE_CONNECT)
}
}
@@ -326,7 +312,7 @@ func (bf *BleFsm) nmpRspListen() error {
if bf.nmpRspChr != nil &&
msg.AttrHandle == bf.nmpRspChr.ValHandle {
- bf.rxNmpCb(msg.Data.Bytes)
+ bf.params.RxNmpCb(msg.Data.Bytes)
}
default:
@@ -339,7 +325,7 @@ func (bf *BleFsm) nmpRspListen() error {
func (bf *BleFsm) connect() error {
r := NewBleConnectReq()
- r.OwnAddrType = bf.ownAddrType
+ r.OwnAddrType = bf.params.OwnAddrType
r.PeerAddrType = bf.peerDev.AddrType
r.PeerAddr = bf.peerDev.Addr
@@ -347,7 +333,7 @@ func (bf *BleFsm) connect() error {
return err
}
- if err := connect(bf.bx, bf.connChan, r); err != nil {
+ if err := connect(bf.params.Bx, bf.connChan, r); err != nil {
return err
}
@@ -356,7 +342,7 @@ func (bf *BleFsm) connect() error {
func (bf *BleFsm) scan() error {
r := NewBleScanReq()
- r.OwnAddrType = bf.ownAddrType
+ r.OwnAddrType = bf.params.OwnAddrType
r.DurationMs = 15000
r.FilterPolicy = BLE_SCAN_FILT_NO_WL
r.Limited = false
@@ -388,13 +374,13 @@ func (bf *BleFsm) scan() error {
}
// Ask client if we should connect to this advertiser.
- if bf.peerSpec.ScanPred(r) {
+ if bf.params.PeerSpec.ScanPred(r) {
bf.peerDev = &r.Sender
abortChan <- struct{}{}
}
}
- if err := scan(bf.bx, bl, r, abortChan, scanCb); err != nil {
+ if err := scan(bf.params.Bx, bl, r, abortChan, scanCb); err != nil {
return err
}
@@ -411,7 +397,7 @@ func (bf *BleFsm) scanCancel() error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := scanCancel(bf.bx, bl, r); err != nil {
+ if err := scanCancel(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -452,7 +438,7 @@ func (bf *BleFsm) terminate() error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := terminate(bf.bx, bl, r); err != nil {
+ if err := terminate(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -474,7 +460,7 @@ func (bf *BleFsm) connCancel() error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := connCancel(bf.bx, bl, r); err != nil {
+ if err := connCancel(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -484,7 +470,7 @@ func (bf *BleFsm) connCancel() error {
func (bf *BleFsm) discSvcUuid() error {
r := NewBleDiscSvcUuidReq()
r.ConnHandle = bf.connHandle
- r.Uuid = bf.svcUuid
+ r.Uuid = bf.params.SvcUuid
bl, err := bf.addBleSeqListener(r.Seq)
if err != nil {
@@ -492,7 +478,7 @@ func (bf *BleFsm) discSvcUuid() error {
}
defer bf.removeBleSeqListener(r.Seq)
- bf.nmpSvc, err = discSvcUuid(bf.bx, bl, r)
+ bf.nmpSvc, err = discSvcUuid(bf.params.Bx, bl, r)
if err != nil {
return err
}
@@ -512,16 +498,16 @@ func (bf *BleFsm) discAllChrs() error {
}
defer bf.removeBleSeqListener(r.Seq)
- chrs, err := discAllChrs(bf.bx, bl, r)
+ chrs, err := discAllChrs(bf.params.Bx, bl, r)
if err != nil {
return err
}
for _, c := range chrs {
- if CompareUuids(bf.reqChrUuid, c.Uuid) == 0 {
+ if CompareUuids(bf.params.ReqChrUuid, c.Uuid) == 0 {
bf.nmpReqChr = c
}
- if CompareUuids(bf.rspChrUuid, c.Uuid) == 0 {
+ if CompareUuids(bf.params.RspChrUuid, c.Uuid) == 0 {
bf.nmpRspChr = c
}
}
@@ -529,13 +515,13 @@ func (bf *BleFsm) discAllChrs() error {
if bf.nmpReqChr == nil {
return fmt.Errorf(
"Peer doesn't support required characteristic: %s",
- bf.reqChrUuid.String())
+ bf.params.ReqChrUuid.String())
}
if bf.nmpRspChr == nil {
return fmt.Errorf(
"Peer doesn't support required characteristic: %s",
- bf.rspChrUuid.String())
+ bf.params.RspChrUuid.String())
}
return nil
@@ -551,7 +537,7 @@ func (bf *BleFsm) exchangeMtu() error {
}
defer bf.removeBleSeqListener(r.Seq)
- mtu, err := exchangeMtu(bf.bx, bl, r)
+ mtu, err := exchangeMtu(bf.params.Bx, bl, r)
if err != nil {
return err
}
@@ -572,7 +558,7 @@ func (bf *BleFsm) writeCmd(data []byte) error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := writeCmd(bf.bx, bl, r); err != nil {
+ if err := writeCmd(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -591,7 +577,7 @@ func (bf *BleFsm) subscribe() error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := writeCmd(bf.bx, bl, r); err != nil {
+ if err := writeCmd(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -607,8 +593,8 @@ func (bf *BleFsm) tryFillPeerDev() bool {
// If a peer address is specified, fill in the peer field now so the
// scanning step can be skipped. Otherwise, the peer field gets populated
// during scanning.
- if bf.peerSpec.ScanPred == nil {
- bf.peerDev = &bf.peerSpec.Dev
+ if bf.params.PeerSpec.ScanPred == nil {
+ bf.peerDev = &bf.params.PeerSpec.Dev
return true
}
[2/3] incubator-mynewt-newtmgr git commit: nmxact - Reconnect on
immediate spvtmo
Posted by cc...@apache.org.
nmxact - Reconnect on immediate spvtmo (supervision timeout)
A supervision timeout is much more likely to happen immediately after a
connection is established. This change hides these spurious failures
from the user by automatically retrying the attempt to open the session
(default: 3 tries).
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/4a3a72aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/4a3a72aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/4a3a72aa
Branch: refs/heads/master
Commit: 4a3a72aaf11a7346a4e0ca5537af122759f7f741
Parents: f02ca2a
Author: Christopher Collins <cc...@apache.org>
Authored: Wed Mar 29 18:49:04 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Wed Mar 29 18:51:13 2017 -0700
----------------------------------------------------------------------
nmxact/nmble/ble_fsm.go | 70 ++++++++++++++++++++-------
nmxact/nmble/ble_oic_sesn.go | 85 +++++++++++++++++++++++----------
nmxact/nmble/ble_plain_sesn.go | 94 +++++++++++++++++++++++++++----------
nmxact/sesn/sesn_cfg.go | 2 +
4 files changed, 185 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/4a3a72aa/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index cfdb760..cf12bdc 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -14,10 +14,10 @@ import (
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
-type BleSesnState int32
-
const DFLT_ATT_MTU = 23
+type BleSesnState int32
+
const (
SESN_STATE_UNCONNECTED BleSesnState = 0
SESN_STATE_SCANNING = 1
@@ -33,8 +33,17 @@ const (
SESN_STATE_CONN_CANCELLING = 11
)
+type BleFsmDisconnectType int
+
+const (
+ FSM_DISCONNECT_TYPE_UNOPENED BleFsmDisconnectType = iota
+ FSM_DISCONNECT_TYPE_IMMEDIATE_TIMEOUT
+ FSM_DISCONNECT_TYPE_OPENED
+ FSM_DISCONNECT_TYPE_REQUESTED
+)
+
type BleRxNmpFn func(data []byte)
-type BleDisconnectFn func(peer BleDev, err error)
+type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
type BleFsmParams struct {
Bx *BleXport
@@ -191,6 +200,22 @@ func (bf *BleFsm) action(
return nil
}
+func (bf *BleFsm) calcDisconnectType() BleFsmDisconnectType {
+ switch bf.getState() {
+ case SESN_STATE_EXCHANGING_MTU:
+ return FSM_DISCONNECT_TYPE_IMMEDIATE_TIMEOUT
+
+ case SESN_STATE_DISCOVERED_CHR:
+ return FSM_DISCONNECT_TYPE_OPENED
+
+ case SESN_STATE_TERMINATING, SESN_STATE_CONN_CANCELLING:
+ return FSM_DISCONNECT_TYPE_REQUESTED
+
+ default:
+ return FSM_DISCONNECT_TYPE_UNOPENED
+ }
+}
+
func (bf *BleFsm) connectListen(seq int) error {
bf.connChan = make(chan error, 1)
@@ -265,14 +290,18 @@ func (bf *BleFsm) connectListen(seq int) error {
}
bf.mtx.Unlock()
+ // Remember some fields before we clear them.
+ dt := bf.calcDisconnectType()
+ peer := *bf.peerDev
+
+ bf.setState(SESN_STATE_UNCONNECTED)
+ bf.peerDev = nil
+
for _, bl := range bls {
bl.ErrChan <- err
}
- bf.setState(SESN_STATE_UNCONNECTED)
- peer := *bf.peerDev
- bf.peerDev = nil
- bf.params.DisconnectCb(peer, err)
+ bf.params.DisconnectCb(dt, peer, err)
return
default:
@@ -601,9 +630,12 @@ func (bf *BleFsm) tryFillPeerDev() bool {
return false
}
-func (bf *BleFsm) Start() error {
+// @return bool Whether another start attempt should be made;
+// error The error that caused the start attempt to
+// fail; nil on success.
+func (bf *BleFsm) Start() (bool, error) {
if bf.getState() != SESN_STATE_UNCONNECTED {
- return nmxutil.NewSesnAlreadyOpenError(
+ return false, nmxutil.NewSesnAlreadyOpenError(
"Attempt to open an already-open BLE session")
}
@@ -639,7 +671,7 @@ func (bf *BleFsm) Start() error {
}
if err != nil {
- return err
+ return false, err
}
case SESN_STATE_CONNECTED:
@@ -650,7 +682,9 @@ func (bf *BleFsm) Start() error {
SESN_STATE_EXCHANGED_MTU,
cb)
if err != nil {
- return err
+ bhe := nmxutil.ToBleHost(err)
+ retry := bhe != nil && bhe.Status == ERR_CODE_ENOTCONN
+ return retry, err
}
case SESN_STATE_EXCHANGED_MTU:
@@ -661,7 +695,7 @@ func (bf *BleFsm) Start() error {
SESN_STATE_DISCOVERED_SVC,
cb)
if err != nil {
- return err
+ return false, err
}
case SESN_STATE_DISCOVERED_SVC:
@@ -675,22 +709,22 @@ func (bf *BleFsm) Start() error {
SESN_STATE_DISCOVERED_CHR,
cb)
if err != nil {
- return err
+ return false, err
}
if err := bf.subscribe(); err != nil {
- return err
+ return false, err
}
case SESN_STATE_DISCOVERED_CHR:
/* Open complete. */
- return nil
+ return false, nil
case SESN_STATE_CONNECTING,
SESN_STATE_DISCOVERING_SVC,
SESN_STATE_DISCOVERING_CHR,
SESN_STATE_TERMINATING:
- return fmt.Errorf("BleFsm already being opened")
+ return false, fmt.Errorf("BleFsm already being opened")
}
}
}
@@ -726,6 +760,10 @@ func (bf *BleFsm) IsOpen() bool {
return bf.getState() == SESN_STATE_DISCOVERED_CHR
}
+func (bf *BleFsm) IsClosed() bool {
+ return bf.getState() == SESN_STATE_UNCONNECTED
+}
+
func (bf *BleFsm) TxNmp(payload []byte, nl *nmp.NmpListener,
timeout time.Duration) (nmp.NmpRsp, error) {
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/4a3a72aa/nmxact/nmble/ble_oic_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 365efc6..4001960 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -5,17 +5,18 @@ import (
"sync"
"time"
+ "mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/omp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
- "mynewt.apache.org/newt/util"
)
type BleOicSesn struct {
bf *BleFsm
nls map[*nmp.NmpListener]struct{}
od *omp.OmpDispatcher
+ connTries int
closeTimeout time.Duration
onCloseCb sesn.BleOnCloseFn
@@ -27,6 +28,7 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
bos := &BleOicSesn{
nls: map[*nmp.NmpListener]struct{}{},
od: omp.NewOmpDispatcher(),
+ connTries: cfg.Ble.ConnTries,
closeTimeout: cfg.Ble.CloseTimeout,
onCloseCb: cfg.Ble.OnCloseCb,
}
@@ -47,14 +49,16 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
}
bos.bf = NewBleFsm(BleFsmParams{
- Bx: bx,
- OwnAddrType: cfg.Ble.OwnAddrType,
- PeerSpec: cfg.Ble.PeerSpec,
- SvcUuid: svcUuid,
- ReqChrUuid: reqChrUuid,
- RspChrUuid: rspChrUuid,
- RxNmpCb: func(d []byte) { bos.onRxNmp(d) },
- DisconnectCb: func(p BleDev, e error) { bos.onDisconnect(p, e) },
+ Bx: bx,
+ OwnAddrType: cfg.Ble.OwnAddrType,
+ PeerSpec: cfg.Ble.PeerSpec,
+ SvcUuid: svcUuid,
+ ReqChrUuid: reqChrUuid,
+ RspChrUuid: rspChrUuid,
+ RxNmpCb: func(d []byte) { bos.onRxNmp(d) },
+ DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
+ bos.onDisconnect(dt, p, e)
+ },
})
return bos
@@ -80,16 +84,16 @@ func (bos *BleOicSesn) removeNmpListener(seq uint8) {
}
// Assigns a new close channel; returns an error if one is already assigned.
-func (bos *BleOicSesn) setCloseChan() bool {
+func (bos *BleOicSesn) setCloseChan() error {
bos.mx.Lock()
defer bos.mx.Unlock()
if bos.closeChan != nil {
- return false
+ return fmt.Errorf("Multiple listeners waiting for session to close")
}
bos.closeChan = make(chan error, 1)
- return true
+ return nil
}
func (bos *BleOicSesn) clearCloseChan() {
@@ -99,18 +103,51 @@ func (bos *BleOicSesn) clearCloseChan() {
bos.closeChan = nil
}
+func (bos *BleOicSesn) listenForClose(timeout time.Duration) error {
+ select {
+ case <-bos.closeChan:
+ return nil
+ case <-time.After(timeout):
+ // Session never closed.
+ return fmt.Errorf("Timeout while waiting for session to close")
+ }
+}
+
+func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error {
+ if err := bos.setCloseChan(); err != nil {
+ return err
+ }
+ defer bos.clearCloseChan()
+
+ // If the session is already closed, we're done.
+ if bos.bf.IsClosed() {
+ return nil
+ }
+
+ // Block until close completes or timeout.
+ return bos.listenForClose(timeout)
+}
+
func (bos *BleOicSesn) AbortRx(seq uint8) error {
return bos.od.FakeRxError(seq, fmt.Errorf("Rx aborted"))
}
func (bos *BleOicSesn) Open() error {
- return bos.bf.Start()
+ var err error
+ for i := 0; i < bos.connTries; i++ {
+ var retry bool
+ retry, err = bos.bf.Start()
+ if !retry {
+ break
+ }
+ }
+
+ return err
}
func (bos *BleOicSesn) Close() error {
- if !bos.setCloseChan() {
- return bos.bf.closedError(
- "Attempt to close an unopened BLE session")
+ if err := bos.setCloseChan(); err != nil {
+ return err
}
defer bos.clearCloseChan()
@@ -125,12 +162,7 @@ func (bos *BleOicSesn) Close() error {
}
// Block until close completes or timeout.
- select {
- case <-bos.closeChan:
- case <-time.After(bos.closeTimeout):
- }
-
- return nil
+ return bos.listenForClose(bos.closeTimeout)
}
func (bos *BleOicSesn) IsOpen() bool {
@@ -141,7 +173,9 @@ func (bos *BleOicSesn) onRxNmp(data []byte) {
bos.od.Dispatch(data)
}
-func (bos *BleOicSesn) onDisconnect(peer BleDev, err error) {
+func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
+ err error) {
+
for nl, _ := range bos.nls {
nl.ErrChan <- err
}
@@ -150,7 +184,10 @@ func (bos *BleOicSesn) onDisconnect(peer BleDev, err error) {
if bos.closeChan != nil {
bos.closeChan <- err
}
- if bos.onCloseCb != nil {
+
+ // Only execute client's disconnect callback if the disconnect was
+ // unsolicited and the session was fully open.
+ if dt == FSM_DISCONNECT_TYPE_OPENED && bos.onCloseCb != nil {
bos.onCloseCb(bos, peer, err)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/4a3a72aa/nmxact/nmble/ble_plain_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index a8a46d6..012a301 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -5,16 +5,17 @@ import (
"sync"
"time"
+ "mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
- "mynewt.apache.org/newt/util"
)
type BlePlainSesn struct {
bf *BleFsm
nls map[*nmp.NmpListener]struct{}
nd *nmp.NmpDispatcher
+ connTries int
closeTimeout time.Duration
onCloseCb sesn.BleOnCloseFn
@@ -26,6 +27,7 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
bps := &BlePlainSesn{
nls: map[*nmp.NmpListener]struct{}{},
nd: nmp.NewNmpDispatcher(),
+ connTries: cfg.Ble.ConnTries,
closeTimeout: cfg.Ble.CloseTimeout,
onCloseCb: cfg.Ble.OnCloseCb,
}
@@ -41,14 +43,16 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
}
bps.bf = NewBleFsm(BleFsmParams{
- Bx: bx,
- OwnAddrType: cfg.Ble.OwnAddrType,
- PeerSpec: cfg.Ble.PeerSpec,
- SvcUuid: svcUuid,
- ReqChrUuid: chrUuid,
- RspChrUuid: chrUuid,
- RxNmpCb: func(d []byte) { bps.onRxNmp(d) },
- DisconnectCb: func(p BleDev, e error) { bps.onDisconnect(p, e) },
+ Bx: bx,
+ OwnAddrType: cfg.Ble.OwnAddrType,
+ PeerSpec: cfg.Ble.PeerSpec,
+ SvcUuid: svcUuid,
+ ReqChrUuid: chrUuid,
+ RspChrUuid: chrUuid,
+ RxNmpCb: func(d []byte) { bps.onRxNmp(d) },
+ DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
+ bps.onDisconnect(dt, p, e)
+ },
})
return bps
@@ -73,17 +77,16 @@ func (bps *BlePlainSesn) removeNmpListener(seq uint8) {
}
}
-// Returns true if a new channel was assigned.
-func (bps *BlePlainSesn) setCloseChan() bool {
+func (bps *BlePlainSesn) setCloseChan() error {
bps.mx.Lock()
defer bps.mx.Unlock()
if bps.closeChan != nil {
- return false
+ return fmt.Errorf("Multiple listeners waiting for session to close")
}
bps.closeChan = make(chan error, 1)
- return true
+ return nil
}
func (bps *BlePlainSesn) clearCloseChan() {
@@ -93,18 +96,57 @@ func (bps *BlePlainSesn) clearCloseChan() {
bps.closeChan = nil
}
+func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
+ select {
+ case <-bps.closeChan:
+ return nil
+ case <-time.After(timeout):
+ // Session never closed.
+ return fmt.Errorf("Timeout while waiting for session to close")
+ }
+}
+
+func (bps *BlePlainSesn) blockUntilClosed(timeout time.Duration) error {
+ if err := bps.setCloseChan(); err != nil {
+ return err
+ }
+ defer bps.clearCloseChan()
+
+ // If the session is already closed, we're done.
+ if bps.bf.IsClosed() {
+ return nil
+ }
+
+ // Block until close completes or timeout.
+ return bps.listenForClose(timeout)
+}
+
func (bps *BlePlainSesn) AbortRx(seq uint8) error {
return bps.nd.FakeRxError(seq, fmt.Errorf("Rx aborted"))
}
func (bps *BlePlainSesn) Open() error {
- return bps.bf.Start()
+ var err error
+ for i := 0; i < bps.connTries; i++ {
+ var retry bool
+ retry, err = bps.bf.Start()
+ if !retry {
+ break
+ }
+
+ if bps.blockUntilClosed(1*time.Second) != nil {
+ // Just close the session manually and report the original error.
+ bps.Close()
+ return err
+ }
+ }
+
+ return err
}
func (bps *BlePlainSesn) Close() error {
- if !bps.setCloseChan() {
- return bps.bf.closedError(
- "Attempt to close an unopened BLE session")
+ if err := bps.setCloseChan(); err != nil {
+ return err
}
defer bps.clearCloseChan()
@@ -119,12 +161,7 @@ func (bps *BlePlainSesn) Close() error {
}
// Block until close completes or timeout.
- select {
- case <-bps.closeChan:
- case <-time.After(bps.closeTimeout):
- }
-
- return nil
+ return bps.listenForClose(bps.closeTimeout)
}
func (bps *BlePlainSesn) IsOpen() bool {
@@ -135,16 +172,21 @@ func (bps *BlePlainSesn) onRxNmp(data []byte) {
bps.nd.Dispatch(data)
}
-func (bps *BlePlainSesn) onDisconnect(peer BleDev, err error) {
+func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
+ err error) {
+
for nl, _ := range bps.nls {
nl.ErrChan <- err
}
- // If the session is being closed, unblock the close() call.
+ // If someone is waiting for the session to close, unblock them.
if bps.closeChan != nil {
bps.closeChan <- err
}
- if bps.onCloseCb != nil {
+
+ // Only execute client's disconnect callback if the disconnect was
+ // unsolicited and the session was fully open.
+ if dt == FSM_DISCONNECT_TYPE_OPENED && bps.onCloseCb != nil {
bps.onCloseCb(bps, peer, err)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/4a3a72aa/nmxact/sesn/sesn_cfg.go
----------------------------------------------------------------------
diff --git a/nmxact/sesn/sesn_cfg.go b/nmxact/sesn/sesn_cfg.go
index e18711d..9c84db1 100644
--- a/nmxact/sesn/sesn_cfg.go
+++ b/nmxact/sesn/sesn_cfg.go
@@ -44,6 +44,7 @@ func BlePeerSpecName(name string) BlePeerSpec {
type SesnCfgBle struct {
OwnAddrType bledefs.BleAddrType
PeerSpec BlePeerSpec
+ ConnTries int
CloseTimeout time.Duration
OnCloseCb BleOnCloseFn
}
@@ -59,6 +60,7 @@ type SesnCfg struct {
func NewSesnCfg() SesnCfg {
return SesnCfg{
Ble: SesnCfgBle{
+ ConnTries: 3,
CloseTimeout: 5 * time.Second,
},
}
[3/3] incubator-mynewt-newtmgr git commit: newtmgr - revendor
Posted by cc...@apache.org.
newtmgr - revendor
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/ff255cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/ff255cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/ff255cbf
Branch: refs/heads/master
Commit: ff255cbfab3a81c2e012e6a7390bfe10c339f634
Parents: 4a3a72a
Author: Christopher Collins <cc...@apache.org>
Authored: Wed Mar 29 16:00:30 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Wed Mar 29 18:51:17 2017 -0700
----------------------------------------------------------------------
newtmgr/Godeps/Godeps.json | 36 ++---
.../newtmgr/nmxact/nmble/ble_fsm.go | 138 +++++++++++--------
.../newtmgr/nmxact/nmble/ble_oic_sesn.go | 85 ++++++++----
.../newtmgr/nmxact/nmble/ble_plain_sesn.go | 94 +++++++++----
.../newtmgr/nmxact/sesn/sesn_cfg.go | 2 +
5 files changed, 230 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/ff255cbf/newtmgr/Godeps/Godeps.json
----------------------------------------------------------------------
diff --git a/newtmgr/Godeps/Godeps.json b/newtmgr/Godeps/Godeps.json
index fbbd835..4461c33 100644
--- a/newtmgr/Godeps/Godeps.json
+++ b/newtmgr/Godeps/Godeps.json
@@ -96,48 +96,48 @@
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/bledefs",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmble",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmp",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmserial",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmxutil",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/omp",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/sesn",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/xact",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/xport",
- "Comment": "mynewt_0_9_0_tag-446-gabaa035",
- "Rev": "abaa03594e726b6d2b749bc405f4006eb4820c1b"
+ "Comment": "mynewt_0_9_0_tag-448-ga808bde",
+ "Rev": "a808bde2b242416e3ad2e52d125fa26f098dc6e1"
}
]
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/ff255cbf/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
index 9c96526..cf12bdc 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
@@ -14,10 +14,10 @@ import (
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
-type BleSesnState int32
-
const DFLT_ATT_MTU = 23
+type BleSesnState int32
+
const (
SESN_STATE_UNCONNECTED BleSesnState = 0
SESN_STATE_SCANNING = 1
@@ -33,8 +33,17 @@ const (
SESN_STATE_CONN_CANCELLING = 11
)
+type BleFsmDisconnectType int
+
+const (
+ FSM_DISCONNECT_TYPE_UNOPENED BleFsmDisconnectType = iota
+ FSM_DISCONNECT_TYPE_IMMEDIATE_TIMEOUT
+ FSM_DISCONNECT_TYPE_OPENED
+ FSM_DISCONNECT_TYPE_REQUESTED
+)
+
type BleRxNmpFn func(data []byte)
-type BleDisconnectFn func(peer BleDev, err error)
+type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
type BleFsmParams struct {
Bx *BleXport
@@ -48,16 +57,9 @@ type BleFsmParams struct {
}
type BleFsm struct {
- bx *BleXport
- ownAddrType BleAddrType
- peerSpec sesn.BlePeerSpec
- peerDev *BleDev
- svcUuid BleUuid
- reqChrUuid BleUuid
- rspChrUuid BleUuid
- rxNmpCb BleRxNmpFn
- disconnectCb BleDisconnectFn
+ params BleFsmParams
+ peerDev *BleDev
connHandle int
nmpSvc *BleSvc
nmpReqChr *BleChr
@@ -75,14 +77,7 @@ type BleFsm struct {
func NewBleFsm(p BleFsmParams) *BleFsm {
bf := &BleFsm{
- bx: p.Bx,
- peerSpec: p.PeerSpec,
- ownAddrType: p.OwnAddrType,
- svcUuid: p.SvcUuid,
- reqChrUuid: p.ReqChrUuid,
- rspChrUuid: p.RspChrUuid,
- rxNmpCb: p.RxNmpCb,
- disconnectCb: p.DisconnectCb,
+ params: p,
bls: map[*BleListener]struct{}{},
attMtu: DFLT_ATT_MTU,
@@ -143,7 +138,7 @@ func (bf *BleFsm) addBleListener(base BleMsgBase) (*BleListener, error) {
bf.bls[bl] = struct{}{}
bf.mtx.Unlock()
- if err := bf.bx.Bd.AddListener(base, bl); err != nil {
+ if err := bf.params.Bx.Bd.AddListener(base, bl); err != nil {
delete(bf.bls, bl)
return nil, err
}
@@ -167,7 +162,7 @@ func (bf *BleFsm) addBleSeqListener(seq int) (*BleListener, error) {
}
func (bf *BleFsm) removeBleListener(base BleMsgBase) {
- bl := bf.bx.Bd.RemoveListener(base)
+ bl := bf.params.Bx.Bd.RemoveListener(base)
if bl != nil {
bf.mtx.Lock()
delete(bf.bls, bl)
@@ -205,6 +200,22 @@ func (bf *BleFsm) action(
return nil
}
+func (bf *BleFsm) calcDisconnectType() BleFsmDisconnectType {
+ switch bf.getState() {
+ case SESN_STATE_EXCHANGING_MTU:
+ return FSM_DISCONNECT_TYPE_IMMEDIATE_TIMEOUT
+
+ case SESN_STATE_DISCOVERED_CHR:
+ return FSM_DISCONNECT_TYPE_OPENED
+
+ case SESN_STATE_TERMINATING, SESN_STATE_CONN_CANCELLING:
+ return FSM_DISCONNECT_TYPE_REQUESTED
+
+ default:
+ return FSM_DISCONNECT_TYPE_UNOPENED
+ }
+}
+
func (bf *BleFsm) connectListen(seq int) error {
bf.connChan = make(chan error, 1)
@@ -279,20 +290,24 @@ func (bf *BleFsm) connectListen(seq int) error {
}
bf.mtx.Unlock()
+ // Remember some fields before we clear them.
+ dt := bf.calcDisconnectType()
+ peer := *bf.peerDev
+
+ bf.setState(SESN_STATE_UNCONNECTED)
+ bf.peerDev = nil
+
for _, bl := range bls {
bl.ErrChan <- err
}
- bf.setState(SESN_STATE_UNCONNECTED)
- peer := *bf.peerDev
- bf.peerDev = nil
- bf.disconnectCb(peer, err)
+ bf.params.DisconnectCb(dt, peer, err)
return
default:
}
- case <-bl.AfterTimeout(bf.bx.rspTimeout):
+ case <-bl.AfterTimeout(bf.params.Bx.rspTimeout):
bf.connChan <- BhdTimeoutError(MSG_TYPE_CONNECT)
}
}
@@ -326,7 +341,7 @@ func (bf *BleFsm) nmpRspListen() error {
if bf.nmpRspChr != nil &&
msg.AttrHandle == bf.nmpRspChr.ValHandle {
- bf.rxNmpCb(msg.Data.Bytes)
+ bf.params.RxNmpCb(msg.Data.Bytes)
}
default:
@@ -339,7 +354,7 @@ func (bf *BleFsm) nmpRspListen() error {
func (bf *BleFsm) connect() error {
r := NewBleConnectReq()
- r.OwnAddrType = bf.ownAddrType
+ r.OwnAddrType = bf.params.OwnAddrType
r.PeerAddrType = bf.peerDev.AddrType
r.PeerAddr = bf.peerDev.Addr
@@ -347,7 +362,7 @@ func (bf *BleFsm) connect() error {
return err
}
- if err := connect(bf.bx, bf.connChan, r); err != nil {
+ if err := connect(bf.params.Bx, bf.connChan, r); err != nil {
return err
}
@@ -356,7 +371,7 @@ func (bf *BleFsm) connect() error {
func (bf *BleFsm) scan() error {
r := NewBleScanReq()
- r.OwnAddrType = bf.ownAddrType
+ r.OwnAddrType = bf.params.OwnAddrType
r.DurationMs = 15000
r.FilterPolicy = BLE_SCAN_FILT_NO_WL
r.Limited = false
@@ -388,13 +403,13 @@ func (bf *BleFsm) scan() error {
}
// Ask client if we should connect to this advertiser.
- if bf.peerSpec.ScanPred(r) {
+ if bf.params.PeerSpec.ScanPred(r) {
bf.peerDev = &r.Sender
abortChan <- struct{}{}
}
}
- if err := scan(bf.bx, bl, r, abortChan, scanCb); err != nil {
+ if err := scan(bf.params.Bx, bl, r, abortChan, scanCb); err != nil {
return err
}
@@ -411,7 +426,7 @@ func (bf *BleFsm) scanCancel() error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := scanCancel(bf.bx, bl, r); err != nil {
+ if err := scanCancel(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -452,7 +467,7 @@ func (bf *BleFsm) terminate() error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := terminate(bf.bx, bl, r); err != nil {
+ if err := terminate(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -474,7 +489,7 @@ func (bf *BleFsm) connCancel() error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := connCancel(bf.bx, bl, r); err != nil {
+ if err := connCancel(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -484,7 +499,7 @@ func (bf *BleFsm) connCancel() error {
func (bf *BleFsm) discSvcUuid() error {
r := NewBleDiscSvcUuidReq()
r.ConnHandle = bf.connHandle
- r.Uuid = bf.svcUuid
+ r.Uuid = bf.params.SvcUuid
bl, err := bf.addBleSeqListener(r.Seq)
if err != nil {
@@ -492,7 +507,7 @@ func (bf *BleFsm) discSvcUuid() error {
}
defer bf.removeBleSeqListener(r.Seq)
- bf.nmpSvc, err = discSvcUuid(bf.bx, bl, r)
+ bf.nmpSvc, err = discSvcUuid(bf.params.Bx, bl, r)
if err != nil {
return err
}
@@ -512,16 +527,16 @@ func (bf *BleFsm) discAllChrs() error {
}
defer bf.removeBleSeqListener(r.Seq)
- chrs, err := discAllChrs(bf.bx, bl, r)
+ chrs, err := discAllChrs(bf.params.Bx, bl, r)
if err != nil {
return err
}
for _, c := range chrs {
- if CompareUuids(bf.reqChrUuid, c.Uuid) == 0 {
+ if CompareUuids(bf.params.ReqChrUuid, c.Uuid) == 0 {
bf.nmpReqChr = c
}
- if CompareUuids(bf.rspChrUuid, c.Uuid) == 0 {
+ if CompareUuids(bf.params.RspChrUuid, c.Uuid) == 0 {
bf.nmpRspChr = c
}
}
@@ -529,13 +544,13 @@ func (bf *BleFsm) discAllChrs() error {
if bf.nmpReqChr == nil {
return fmt.Errorf(
"Peer doesn't support required characteristic: %s",
- bf.reqChrUuid.String())
+ bf.params.ReqChrUuid.String())
}
if bf.nmpRspChr == nil {
return fmt.Errorf(
"Peer doesn't support required characteristic: %s",
- bf.rspChrUuid.String())
+ bf.params.RspChrUuid.String())
}
return nil
@@ -551,7 +566,7 @@ func (bf *BleFsm) exchangeMtu() error {
}
defer bf.removeBleSeqListener(r.Seq)
- mtu, err := exchangeMtu(bf.bx, bl, r)
+ mtu, err := exchangeMtu(bf.params.Bx, bl, r)
if err != nil {
return err
}
@@ -572,7 +587,7 @@ func (bf *BleFsm) writeCmd(data []byte) error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := writeCmd(bf.bx, bl, r); err != nil {
+ if err := writeCmd(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -591,7 +606,7 @@ func (bf *BleFsm) subscribe() error {
}
defer bf.removeBleSeqListener(r.Seq)
- if err := writeCmd(bf.bx, bl, r); err != nil {
+ if err := writeCmd(bf.params.Bx, bl, r); err != nil {
return err
}
@@ -607,17 +622,20 @@ func (bf *BleFsm) tryFillPeerDev() bool {
// If a peer address is specified, fill in the peer field now so the
// scanning step can be skipped. Otherwise, the peer field gets populated
// during scanning.
- if bf.peerSpec.ScanPred == nil {
- bf.peerDev = &bf.peerSpec.Dev
+ if bf.params.PeerSpec.ScanPred == nil {
+ bf.peerDev = &bf.params.PeerSpec.Dev
return true
}
return false
}
-func (bf *BleFsm) Start() error {
+// @return bool Whether another start attempt should be made;
+// error The error that caused the start attempt to
+// fail; nil on success.
+func (bf *BleFsm) Start() (bool, error) {
if bf.getState() != SESN_STATE_UNCONNECTED {
- return nmxutil.NewSesnAlreadyOpenError(
+ return false, nmxutil.NewSesnAlreadyOpenError(
"Attempt to open an already-open BLE session")
}
@@ -653,7 +671,7 @@ func (bf *BleFsm) Start() error {
}
if err != nil {
- return err
+ return false, err
}
case SESN_STATE_CONNECTED:
@@ -664,7 +682,9 @@ func (bf *BleFsm) Start() error {
SESN_STATE_EXCHANGED_MTU,
cb)
if err != nil {
- return err
+ bhe := nmxutil.ToBleHost(err)
+ retry := bhe != nil && bhe.Status == ERR_CODE_ENOTCONN
+ return retry, err
}
case SESN_STATE_EXCHANGED_MTU:
@@ -675,7 +695,7 @@ func (bf *BleFsm) Start() error {
SESN_STATE_DISCOVERED_SVC,
cb)
if err != nil {
- return err
+ return false, err
}
case SESN_STATE_DISCOVERED_SVC:
@@ -689,22 +709,22 @@ func (bf *BleFsm) Start() error {
SESN_STATE_DISCOVERED_CHR,
cb)
if err != nil {
- return err
+ return false, err
}
if err := bf.subscribe(); err != nil {
- return err
+ return false, err
}
case SESN_STATE_DISCOVERED_CHR:
/* Open complete. */
- return nil
+ return false, nil
case SESN_STATE_CONNECTING,
SESN_STATE_DISCOVERING_SVC,
SESN_STATE_DISCOVERING_CHR,
SESN_STATE_TERMINATING:
- return fmt.Errorf("BleFsm already being opened")
+ return false, fmt.Errorf("BleFsm already being opened")
}
}
}
@@ -740,6 +760,10 @@ func (bf *BleFsm) IsOpen() bool {
return bf.getState() == SESN_STATE_DISCOVERED_CHR
}
+func (bf *BleFsm) IsClosed() bool {
+ return bf.getState() == SESN_STATE_UNCONNECTED
+}
+
func (bf *BleFsm) TxNmp(payload []byte, nl *nmp.NmpListener,
timeout time.Duration) (nmp.NmpRsp, error) {
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/ff255cbf/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
----------------------------------------------------------------------
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
index 365efc6..4001960 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
@@ -5,17 +5,18 @@ import (
"sync"
"time"
+ "mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/omp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
- "mynewt.apache.org/newt/util"
)
type BleOicSesn struct {
bf *BleFsm
nls map[*nmp.NmpListener]struct{}
od *omp.OmpDispatcher
+ connTries int
closeTimeout time.Duration
onCloseCb sesn.BleOnCloseFn
@@ -27,6 +28,7 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
bos := &BleOicSesn{
nls: map[*nmp.NmpListener]struct{}{},
od: omp.NewOmpDispatcher(),
+ connTries: cfg.Ble.ConnTries,
closeTimeout: cfg.Ble.CloseTimeout,
onCloseCb: cfg.Ble.OnCloseCb,
}
@@ -47,14 +49,16 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
}
bos.bf = NewBleFsm(BleFsmParams{
- Bx: bx,
- OwnAddrType: cfg.Ble.OwnAddrType,
- PeerSpec: cfg.Ble.PeerSpec,
- SvcUuid: svcUuid,
- ReqChrUuid: reqChrUuid,
- RspChrUuid: rspChrUuid,
- RxNmpCb: func(d []byte) { bos.onRxNmp(d) },
- DisconnectCb: func(p BleDev, e error) { bos.onDisconnect(p, e) },
+ Bx: bx,
+ OwnAddrType: cfg.Ble.OwnAddrType,
+ PeerSpec: cfg.Ble.PeerSpec,
+ SvcUuid: svcUuid,
+ ReqChrUuid: reqChrUuid,
+ RspChrUuid: rspChrUuid,
+ RxNmpCb: func(d []byte) { bos.onRxNmp(d) },
+ DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
+ bos.onDisconnect(dt, p, e)
+ },
})
return bos
@@ -80,16 +84,16 @@ func (bos *BleOicSesn) removeNmpListener(seq uint8) {
}
// Returns true if a new channel was assigned.
-func (bos *BleOicSesn) setCloseChan() bool {
+func (bos *BleOicSesn) setCloseChan() error {
bos.mx.Lock()
defer bos.mx.Unlock()
if bos.closeChan != nil {
- return false
+ return fmt.Errorf("Multiple listeners waiting for session to close")
}
bos.closeChan = make(chan error, 1)
- return true
+ return nil
}
func (bos *BleOicSesn) clearCloseChan() {
@@ -99,18 +103,51 @@ func (bos *BleOicSesn) clearCloseChan() {
bos.closeChan = nil
}
+func (bos *BleOicSesn) listenForClose(timeout time.Duration) error {
+ select {
+ case <-bos.closeChan:
+ return nil
+ case <-time.After(timeout):
+ // Session never closed.
+ return fmt.Errorf("Timeout while waiting for session to close")
+ }
+}
+
+func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error {
+ if err := bos.setCloseChan(); err != nil {
+ return err
+ }
+ defer bos.clearCloseChan()
+
+ // If the session is already closed, we're done.
+ if bos.bf.IsClosed() {
+ return nil
+ }
+
+ // Block until close completes or timeout.
+ return bos.listenForClose(timeout)
+}
+
func (bos *BleOicSesn) AbortRx(seq uint8) error {
return bos.od.FakeRxError(seq, fmt.Errorf("Rx aborted"))
}
func (bos *BleOicSesn) Open() error {
- return bos.bf.Start()
+ var err error
+ for i := 0; i < bos.connTries; i++ {
+ var retry bool
+ retry, err = bos.bf.Start()
+ if !retry {
+ break
+ }
+ }
+
+ return err
}
func (bos *BleOicSesn) Close() error {
- if !bos.setCloseChan() {
- return bos.bf.closedError(
- "Attempt to close an unopened BLE session")
+ if err := bos.setCloseChan(); err != nil {
+ return err
}
defer bos.clearCloseChan()
@@ -125,12 +162,7 @@ func (bos *BleOicSesn) Close() error {
}
// Block until close completes or timeout.
- select {
- case <-bos.closeChan:
- case <-time.After(bos.closeTimeout):
- }
-
- return nil
+ return bos.listenForClose(bos.closeTimeout)
}
func (bos *BleOicSesn) IsOpen() bool {
@@ -141,7 +173,9 @@ func (bos *BleOicSesn) onRxNmp(data []byte) {
bos.od.Dispatch(data)
}
-func (bos *BleOicSesn) onDisconnect(peer BleDev, err error) {
+func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
+ err error) {
+
for nl, _ := range bos.nls {
nl.ErrChan <- err
}
@@ -150,7 +184,10 @@ func (bos *BleOicSesn) onDisconnect(peer BleDev, err error) {
if bos.closeChan != nil {
bos.closeChan <- err
}
- if bos.onCloseCb != nil {
+
+ // Only execute client's disconnect callback if the disconnect was
+ // unsolicited and the session was fully open.
+ if dt == FSM_DISCONNECT_TYPE_OPENED && bos.onCloseCb != nil {
bos.onCloseCb(bos, peer, err)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/ff255cbf/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
----------------------------------------------------------------------
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
index a8a46d6..012a301 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
@@ -5,16 +5,17 @@ import (
"sync"
"time"
+ "mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
- "mynewt.apache.org/newt/util"
)
type BlePlainSesn struct {
bf *BleFsm
nls map[*nmp.NmpListener]struct{}
nd *nmp.NmpDispatcher
+ connTries int
closeTimeout time.Duration
onCloseCb sesn.BleOnCloseFn
@@ -26,6 +27,7 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
bps := &BlePlainSesn{
nls: map[*nmp.NmpListener]struct{}{},
nd: nmp.NewNmpDispatcher(),
+ connTries: cfg.Ble.ConnTries,
closeTimeout: cfg.Ble.CloseTimeout,
onCloseCb: cfg.Ble.OnCloseCb,
}
@@ -41,14 +43,16 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
}
bps.bf = NewBleFsm(BleFsmParams{
- Bx: bx,
- OwnAddrType: cfg.Ble.OwnAddrType,
- PeerSpec: cfg.Ble.PeerSpec,
- SvcUuid: svcUuid,
- ReqChrUuid: chrUuid,
- RspChrUuid: chrUuid,
- RxNmpCb: func(d []byte) { bps.onRxNmp(d) },
- DisconnectCb: func(p BleDev, e error) { bps.onDisconnect(p, e) },
+ Bx: bx,
+ OwnAddrType: cfg.Ble.OwnAddrType,
+ PeerSpec: cfg.Ble.PeerSpec,
+ SvcUuid: svcUuid,
+ ReqChrUuid: chrUuid,
+ RspChrUuid: chrUuid,
+ RxNmpCb: func(d []byte) { bps.onRxNmp(d) },
+ DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
+ bps.onDisconnect(dt, p, e)
+ },
})
return bps
@@ -73,17 +77,16 @@ func (bps *BlePlainSesn) removeNmpListener(seq uint8) {
}
}
-// Returns true if a new channel was assigned.
-func (bps *BlePlainSesn) setCloseChan() bool {
+func (bps *BlePlainSesn) setCloseChan() error {
bps.mx.Lock()
defer bps.mx.Unlock()
if bps.closeChan != nil {
- return false
+ return fmt.Errorf("Multiple listeners waiting for session to close")
}
bps.closeChan = make(chan error, 1)
- return true
+ return nil
}
func (bps *BlePlainSesn) clearCloseChan() {
@@ -93,18 +96,57 @@ func (bps *BlePlainSesn) clearCloseChan() {
bps.closeChan = nil
}
+func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
+ select {
+ case <-bps.closeChan:
+ return nil
+ case <-time.After(timeout):
+ // Session never closed.
+ return fmt.Errorf("Timeout while waiting for session to close")
+ }
+}
+
+func (bps *BlePlainSesn) blockUntilClosed(timeout time.Duration) error {
+ if err := bps.setCloseChan(); err != nil {
+ return err
+ }
+ defer bps.clearCloseChan()
+
+ // If the session is already closed, we're done.
+ if bps.bf.IsClosed() {
+ return nil
+ }
+
+ // Block until close completes or timeout.
+ return bps.listenForClose(timeout)
+}
+
func (bps *BlePlainSesn) AbortRx(seq uint8) error {
return bps.nd.FakeRxError(seq, fmt.Errorf("Rx aborted"))
}
func (bps *BlePlainSesn) Open() error {
- return bps.bf.Start()
+ var err error
+ for i := 0; i < bps.connTries; i++ {
+ var retry bool
+ retry, err = bps.bf.Start()
+ if !retry {
+ break
+ }
+
+ if bps.blockUntilClosed(1*time.Second) != nil {
+ // Just close the session manually and report the original error.
+ bps.Close()
+ return err
+ }
+ }
+
+ return err
}
func (bps *BlePlainSesn) Close() error {
- if !bps.setCloseChan() {
- return bps.bf.closedError(
- "Attempt to close an unopened BLE session")
+ if err := bps.setCloseChan(); err != nil {
+ return err
}
defer bps.clearCloseChan()
@@ -119,12 +161,7 @@ func (bps *BlePlainSesn) Close() error {
}
// Block until close completes or timeout.
- select {
- case <-bps.closeChan:
- case <-time.After(bps.closeTimeout):
- }
-
- return nil
+ return bps.listenForClose(bps.closeTimeout)
}
func (bps *BlePlainSesn) IsOpen() bool {
@@ -135,16 +172,21 @@ func (bps *BlePlainSesn) onRxNmp(data []byte) {
bps.nd.Dispatch(data)
}
-func (bps *BlePlainSesn) onDisconnect(peer BleDev, err error) {
+func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
+ err error) {
+
for nl, _ := range bps.nls {
nl.ErrChan <- err
}
- // If the session is being closed, unblock the close() call.
+ // If someone is waiting for the session to close, unblock them.
if bps.closeChan != nil {
bps.closeChan <- err
}
- if bps.onCloseCb != nil {
+
+ // Only execute client's disconnect callback if the disconnect was
+ // unsolicited and the session was fully open.
+ if dt == FSM_DISCONNECT_TYPE_OPENED && bps.onCloseCb != nil {
bps.onCloseCb(bps, peer, err)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/ff255cbf/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/sesn/sesn_cfg.go
----------------------------------------------------------------------
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/sesn/sesn_cfg.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/sesn/sesn_cfg.go
index e18711d..9c84db1 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/sesn/sesn_cfg.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/sesn/sesn_cfg.go
@@ -44,6 +44,7 @@ func BlePeerSpecName(name string) BlePeerSpec {
type SesnCfgBle struct {
OwnAddrType bledefs.BleAddrType
PeerSpec BlePeerSpec
+ ConnTries int
CloseTimeout time.Duration
OnCloseCb BleOnCloseFn
}
@@ -59,6 +60,7 @@ type SesnCfg struct {
func NewSesnCfg() SesnCfg {
return SesnCfg{
Ble: SesnCfgBle{
+ ConnTries: 3,
CloseTimeout: 5 * time.Second,
},
}