You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/04/11 05:32:18 UTC
incubator-mynewt-newtmgr git commit: nmxact - misc cleanup.
Repository: incubator-mynewt-newtmgr
Updated Branches:
refs/heads/master 6c9269d72 -> e5dcf07e4
nmxact - misc cleanup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/e5dcf07e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/e5dcf07e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/e5dcf07e
Branch: refs/heads/master
Commit: e5dcf07e47fc4198cadf9c77304dc41772095f68
Parents: 6c9269d
Author: Christopher Collins <cc...@apache.org>
Authored: Mon Apr 10 22:25:22 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Mon Apr 10 22:30:19 2017 -0700
----------------------------------------------------------------------
newtmgr/newtmgr.go | 7 +-
nmxact/bledefs/bledefs.go | 6 +-
nmxact/example/ble_dual/ble_dual.go | 8 +-
nmxact/nmble/ble_act.go | 103 +++++++-------
nmxact/nmble/ble_fsm.go | 225 ++++++++++++-------------------
nmxact/nmble/ble_proto.go | 4 +-
nmxact/nmble/ble_xport.go | 1 +
nmxact/nmxutil/nmxutil.go | 77 +++++++++++
nmxact/sesn/sesn_cfg.go | 2 +-
9 files changed, 229 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/newtmgr/newtmgr.go
----------------------------------------------------------------------
diff --git a/newtmgr/newtmgr.go b/newtmgr/newtmgr.go
index 0307020..f1f10b5 100644
--- a/newtmgr/newtmgr.go
+++ b/newtmgr/newtmgr.go
@@ -25,10 +25,10 @@ import (
"os/signal"
"syscall"
+ "mynewt.apache.org/newt/util"
"mynewt.apache.org/newtmgr/newtmgr/cli"
"mynewt.apache.org/newtmgr/newtmgr/config"
"mynewt.apache.org/newtmgr/nmxact/nmserial"
- "mynewt.apache.org/newt/util"
)
func main() {
@@ -38,11 +38,6 @@ func main() {
}
onExit := func() {
- s, err := cli.GetSesnIfOpen()
- if err == nil {
- s.Close()
- }
-
x, err := cli.GetXportIfOpen()
if err == nil {
// Don't attempt to close a serial transport. Attempting to close
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/bledefs/bledefs.go
----------------------------------------------------------------------
diff --git a/nmxact/bledefs/bledefs.go b/nmxact/bledefs/bledefs.go
index a64da61..b811dc0 100644
--- a/nmxact/bledefs/bledefs.go
+++ b/nmxact/bledefs/bledefs.go
@@ -48,7 +48,7 @@ var BleAddrTypeStringMap = map[BleAddrType]string{
func BleAddrTypeToString(addrType BleAddrType) string {
s := BleAddrTypeStringMap[addrType]
if s == "" {
- panic(fmt.Sprintf("Invalid BleAddrType: %d", int(addrType)))
+ return "???"
}
return s
@@ -241,7 +241,7 @@ var BleScanFilterPolicyStringMap = map[BleScanFilterPolicy]string{
func BleScanFilterPolicyToString(filtPolicy BleScanFilterPolicy) string {
s := BleScanFilterPolicyStringMap[filtPolicy]
if s == "" {
- panic(fmt.Sprintf("Invalid BleScanFilterPolicy: %d", int(filtPolicy)))
+ return "???"
}
return s
@@ -295,7 +295,7 @@ var BleAdvEventTypeStringMap = map[BleAdvEventType]string{
func BleAdvEventTypeToString(advEventType BleAdvEventType) string {
s := BleAdvEventTypeStringMap[advEventType]
if s == "" {
- panic(fmt.Sprintf("Invalid BleAdvEventType: %d", int(advEventType)))
+ return "???"
}
return s
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/example/ble_dual/ble_dual.go
----------------------------------------------------------------------
diff --git a/nmxact/example/ble_dual/ble_dual.go b/nmxact/example/ble_dual/ble_dual.go
index bedff95..7a43b45 100644
--- a/nmxact/example/ble_dual/ble_dual.go
+++ b/nmxact/example/ble_dual/ble_dual.go
@@ -36,10 +36,6 @@ import (
func configExitHandler(x xport.Xport, s sesn.Sesn) {
onExit := func() {
- if s.IsOpen() {
- s.Close()
- }
-
x.Stop()
}
@@ -70,8 +66,8 @@ func sendOne(s sesn.Sesn) {
if !s.IsOpen() {
// Connect to the peer (open the session).
if err := s.Open(); err != nil {
- fmt.Fprintf(os.Stderr, "error starting BLE session: %s\n",
- err.Error())
+ fmt.Fprintf(os.Stderr, "error starting BLE session: %s (%+v)\n",
+ err.Error(), err)
return
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmble/ble_act.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index e5e336c..e924d2a 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -28,6 +28,8 @@ func connect(x *BleXport, connChan chan error, r *BleConnectReq) error {
// Blocking
func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error {
+ const rspType = MSG_TYPE_TERMINATE
+
j, err := json.Marshal(r)
if err != nil {
return err
@@ -47,9 +49,7 @@ func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error {
case *BleTerminateRsp:
bl.Acked = true
if msg.Status != 0 {
- return StatusError(MSG_OP_RSP,
- MSG_TYPE_TERMINATE,
- msg.Status)
+ return StatusError(MSG_OP_RSP, rspType, msg.Status)
} else {
return nil
}
@@ -58,12 +58,14 @@ func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error {
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BhdTimeoutError(MSG_TYPE_TERMINATE)
+ return BhdTimeoutError(rspType)
}
}
}
func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error {
+ const rspType = MSG_TYPE_CONN_CANCEL
+
j, err := json.Marshal(r)
if err != nil {
return err
@@ -83,9 +85,7 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error {
case *BleConnCancelRsp:
bl.Acked = true
if msg.Status != 0 {
- return StatusError(MSG_OP_RSP,
- MSG_TYPE_CONN_CANCEL,
- msg.Status)
+ return StatusError(MSG_OP_RSP, rspType, msg.Status)
} else {
return nil
}
@@ -94,7 +94,7 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error {
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BhdTimeoutError(MSG_TYPE_TERMINATE)
+ return BhdTimeoutError(rspType)
}
}
}
@@ -103,6 +103,9 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error {
func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) (
*BleSvc, error) {
+ const rspType = MSG_TYPE_DISC_SVC_UUID
+ const evtType = MSG_TYPE_DISC_SVC_EVT
+
j, err := json.Marshal(r)
if err != nil {
return nil, err
@@ -123,9 +126,7 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) (
case *BleDiscSvcUuidRsp:
bl.Acked = true
if msg.Status != 0 {
- return nil, StatusError(MSG_OP_RSP,
- MSG_TYPE_DISC_SVC_UUID,
- msg.Status)
+ return nil, StatusError(MSG_OP_RSP, rspType, msg.Status)
}
case *BleDiscSvcEvt:
@@ -141,16 +142,14 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) (
}
return svc, nil
default:
- return nil, StatusError(MSG_OP_EVT,
- MSG_TYPE_DISC_SVC_EVT,
- msg.Status)
+ return nil, StatusError(MSG_OP_EVT, evtType, msg.Status)
}
default:
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return nil, BhdTimeoutError(MSG_TYPE_DISC_SVC_UUID)
+ return nil, BhdTimeoutError(rspType)
}
}
}
@@ -159,6 +158,9 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) (
func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) (
[]*BleChr, error) {
+ const rspType = MSG_TYPE_DISC_ALL_CHRS
+ const evtType = MSG_TYPE_DISC_CHR_EVT
+
j, err := json.Marshal(r)
if err != nil {
return nil, err
@@ -179,9 +181,7 @@ func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) (
case *BleDiscAllChrsRsp:
bl.Acked = true
if msg.Status != 0 {
- return nil, StatusError(MSG_OP_RSP,
- MSG_TYPE_DISC_ALL_CHRS,
- msg.Status)
+ return nil, StatusError(MSG_OP_RSP, rspType, msg.Status)
}
case *BleDiscChrEvt:
@@ -191,22 +191,22 @@ func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) (
case ERR_CODE_EDONE:
return chrs, nil
default:
- return nil, StatusError(MSG_OP_EVT,
- MSG_TYPE_DISC_CHR_EVT,
- msg.Status)
+ return nil, StatusError(MSG_OP_EVT, evtType, msg.Status)
}
default:
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return nil, BhdTimeoutError(MSG_TYPE_DISC_ALL_CHRS)
+ return nil, BhdTimeoutError(rspType)
}
}
}
// Blocking.
func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error {
+ const rspType = MSG_TYPE_WRITE_CMD
+
j, err := json.Marshal(r)
if err != nil {
return err
@@ -226,9 +226,7 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error {
case *BleWriteCmdRsp:
bl.Acked = true
if msg.Status != 0 {
- return StatusError(MSG_OP_RSP,
- MSG_TYPE_WRITE_CMD,
- msg.Status)
+ return StatusError(MSG_OP_RSP, rspType, msg.Status)
} else {
return nil
}
@@ -237,7 +235,7 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error {
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BhdTimeoutError(MSG_TYPE_WRITE_CMD)
+ return BhdTimeoutError(rspType)
}
}
}
@@ -246,6 +244,9 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error {
func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) (
int, error) {
+ const rspType = MSG_TYPE_EXCHANGE_MTU
+ const evtType = MSG_TYPE_MTU_CHANGE_EVT
+
j, err := json.Marshal(r)
if err != nil {
return 0, err
@@ -265,16 +266,12 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) (
case *BleExchangeMtuRsp:
bl.Acked = true
if msg.Status != 0 {
- return 0, StatusError(MSG_OP_RSP,
- MSG_TYPE_EXCHANGE_MTU,
- msg.Status)
+ return 0, StatusError(MSG_OP_RSP, rspType, msg.Status)
}
case *BleMtuChangeEvt:
if msg.Status != 0 {
- return 0, StatusError(MSG_OP_EVT,
- MSG_TYPE_MTU_CHANGE_EVT,
- msg.Status)
+ return 0, StatusError(MSG_OP_EVT, evtType, msg.Status)
} else {
return int(msg.Mtu), nil
}
@@ -283,7 +280,7 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) (
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return 0, BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
+ return 0, BhdTimeoutError(rspType)
}
}
}
@@ -291,9 +288,12 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) (
type scanSuccessFn func()
type advRptFn func(r BleAdvReport)
-func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{},
+func scan(x *BleXport, bl *BleListener, r *BleScanReq,
+ abortChan chan struct{},
scanSuccessCb scanSuccessFn, advRptCb advRptFn) error {
+ const rspType = MSG_TYPE_SCAN
+
j, err := json.Marshal(r)
if err != nil {
return err
@@ -313,7 +313,7 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{},
case *BleScanRsp:
bl.Acked = true
if msg.Status != 0 {
- return StatusError(MSG_OP_RSP, MSG_TYPE_SCAN, msg.Status)
+ return StatusError(MSG_OP_RSP, rspType, msg.Status)
} else {
scanSuccessCb()
}
@@ -329,7 +329,7 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{},
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BhdTimeoutError(MSG_TYPE_SCAN)
+ return BhdTimeoutError(rspType)
case <-abortChan:
return nil
@@ -338,6 +338,8 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{},
}
func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error {
+ const rspType = MSG_TYPE_SCAN_CANCEL
+
j, err := json.Marshal(r)
if err != nil {
return err
@@ -357,7 +359,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error {
case *BleScanCancelRsp:
bl.Acked = true
if msg.Status != 0 {
- return StatusError(MSG_OP_RSP, MSG_TYPE_SCAN, msg.Status)
+ return StatusError(MSG_OP_RSP, rspType, msg.Status)
}
return nil
@@ -365,7 +367,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error {
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
+ return BhdTimeoutError(rspType)
}
}
}
@@ -373,7 +375,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error {
func connFind(x *BleXport, bl *BleListener, r *BleConnFindReq) (
BleConnDesc, error) {
- const msgType = MSG_TYPE_CONN_FIND
+ const rspType = MSG_TYPE_CONN_FIND
j, err := json.Marshal(r)
if err != nil {
@@ -395,7 +397,7 @@ func connFind(x *BleXport, bl *BleListener, r *BleConnFindReq) (
bl.Acked = true
if msg.Status != 0 {
return BleConnDesc{},
- StatusError(MSG_OP_RSP, msgType, msg.Status)
+ StatusError(MSG_OP_RSP, rspType, msg.Status)
}
return BleDescFromConnFindRsp(msg), nil
@@ -404,7 +406,7 @@ func connFind(x *BleXport, bl *BleListener, r *BleConnFindReq) (
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BleConnDesc{}, BhdTimeoutError(msgType)
+ return BleConnDesc{}, BhdTimeoutError(rspType)
}
}
}
@@ -415,6 +417,8 @@ func connFind(x *BleXport, bl *BleListener, r *BleConnFindReq) (
func genRandAddr(x *BleXport, bl *BleListener, r *BleGenRandAddrReq) (
BleAddr, error) {
+ const rspType = MSG_TYPE_GEN_RAND_ADDR
+
j, err := json.Marshal(r)
if err != nil {
return BleAddr{}, err
@@ -432,8 +436,7 @@ func genRandAddr(x *BleXport, bl *BleListener, r *BleGenRandAddrReq) (
bl.Acked = true
if msg.Status != 0 {
return BleAddr{},
- StatusError(MSG_OP_RSP, MSG_TYPE_GEN_RAND_ADDR,
- msg.Status)
+ StatusError(MSG_OP_RSP, rspType, msg.Status)
}
return msg.Addr, nil
@@ -441,7 +444,7 @@ func genRandAddr(x *BleXport, bl *BleListener, r *BleGenRandAddrReq) (
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BleAddr{}, BhdTimeoutError(MSG_TYPE_GEN_RAND_ADDR)
+ return BleAddr{}, BhdTimeoutError(rspType)
}
}
}
@@ -450,7 +453,7 @@ func genRandAddr(x *BleXport, bl *BleListener, r *BleGenRandAddrReq) (
// when the transport is starting up, and therefore does not require the
// transport to be synced. Only the transport should call this function.
func setRandAddr(x *BleXport, bl *BleListener, r *BleSetRandAddrReq) error {
- const msgType = MSG_TYPE_SET_RAND_ADDR
+ const rspType = MSG_TYPE_SET_RAND_ADDR
j, err := json.Marshal(r)
if err != nil {
@@ -468,7 +471,7 @@ func setRandAddr(x *BleXport, bl *BleListener, r *BleSetRandAddrReq) error {
case *BleSetRandAddrRsp:
bl.Acked = true
if msg.Status != 0 {
- return StatusError(MSG_OP_RSP, msgType, msg.Status)
+ return StatusError(MSG_OP_RSP, rspType, msg.Status)
}
return nil
@@ -476,7 +479,7 @@ func setRandAddr(x *BleXport, bl *BleListener, r *BleSetRandAddrReq) error {
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BhdTimeoutError(msgType)
+ return BhdTimeoutError(rspType)
}
}
}
@@ -487,7 +490,7 @@ func setRandAddr(x *BleXport, bl *BleListener, r *BleSetRandAddrReq) error {
func setPreferredMtu(x *BleXport, bl *BleListener,
r *BleSetPreferredMtuReq) error {
- const msgType = MSG_TYPE_SET_PREFERRED_MTU
+ const rspType = MSG_TYPE_SET_PREFERRED_MTU
j, err := json.Marshal(r)
if err != nil {
@@ -505,7 +508,7 @@ func setPreferredMtu(x *BleXport, bl *BleListener,
case *BleSetPreferredMtuRsp:
bl.Acked = true
if msg.Status != 0 {
- return StatusError(MSG_OP_RSP, msgType, msg.Status)
+ return StatusError(MSG_OP_RSP, rspType, msg.Status)
}
return nil
@@ -513,7 +516,7 @@ func setPreferredMtu(x *BleXport, bl *BleListener,
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BhdTimeoutError(msgType)
+ return BhdTimeoutError(rspType)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 03956dd..19f626d 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -7,6 +7,7 @@ import (
"path"
"runtime"
"sync"
+ "sync/atomic"
"time"
log "github.com/Sirupsen/logrus"
@@ -17,7 +18,11 @@ import (
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
-var curId int
+var nextId uint32
+
+func getNextId() uint32 {
+ return atomic.AddUint32(&nextId, 1) - 1
+}
var listenLog = &log.Logger{
Out: os.Stderr,
@@ -69,31 +74,23 @@ type BleFsmParams struct {
type BleFsm struct {
params BleFsmParams
- peerDev *BleDev
- connHandle uint16
- nmpSvc *BleSvc
- nmpReqChr *BleChr
- nmpRspChr *BleChr
- attMtu int
- connChan chan error
- lastStateChange time.Time
- id int
- curErr error
- errTimer *time.Timer
+ peerDev *BleDev
+ connHandle uint16
+ nmpSvc *BleSvc
+ nmpReqChr *BleChr
+ nmpRspChr *BleChr
+ attMtu int
+ connChan chan error
+ bls map[*BleListener]struct{}
+ state BleSesnState
+ errFunnel nmxutil.ErrFunnel
+ id uint32
// Protects all accesses to the FSM state variable.
stateMtx sync.Mutex
// Protects all accesses to the bls map.
blsMtx sync.Mutex
-
- // Prevents the session from being opened while it is still being reset
- // (cleaned up).
- openMtx sync.Mutex
-
- // These variables must be protected by the mutex.
- bls map[*BleListener]struct{}
- state BleSesnState
}
func NewBleFsm(p BleFsmParams) *BleFsm {
@@ -102,10 +99,12 @@ func NewBleFsm(p BleFsmParams) *BleFsm {
bls: map[*BleListener]struct{}{},
attMtu: DFLT_ATT_MTU,
- id: curId,
+ id: getNextId(),
}
- curId++
+ bf.errFunnel.AccumDelay = time.Second
+ bf.errFunnel.LessCb = fsmErrorLess
+ bf.errFunnel.ProcCb = func(err error) { bf.processErr(err) }
return bf
}
@@ -118,9 +117,8 @@ func (bf *BleFsm) disconnectError(reason int) error {
}
func (bf *BleFsm) closedError(msg string) error {
- return nmxutil.NewSesnClosedError(fmt.Sprintf(
- "%s; state=%d last-state-change=%s",
- msg, bf.getState(), bf.lastStateChange))
+ return nmxutil.NewSesnClosedError(
+ fmt.Sprintf("%s; state=%d", msg, bf.getState()))
}
func (bf *BleFsm) getState() BleSesnState {
@@ -130,16 +128,11 @@ func (bf *BleFsm) getState() BleSesnState {
return bf.state
}
-func (bf *BleFsm) setStateNoLock(toState BleSesnState) {
- bf.state = toState
- bf.lastStateChange = time.Now()
-}
-
func (bf *BleFsm) setState(toState BleSesnState) {
bf.stateMtx.Lock()
defer bf.stateMtx.Unlock()
- bf.setStateNoLock(toState)
+ bf.state = toState
}
func (bf *BleFsm) addBleListener(name string, base BleMsgBase) (
@@ -218,6 +211,23 @@ func (bf *BleFsm) logConnection() {
log.Debugf("BLE connection attempt succeeded; %s", desc.String())
}
+func fsmErrorLess(a error, b error) bool {
+ aIsXport := nmxutil.IsXport(a)
+ bIsXport := nmxutil.IsXport(b)
+ aIsDisc := nmxutil.IsBleSesnDisconnect(a)
+ bIsDisc := nmxutil.IsBleSesnDisconnect(b)
+
+ if aIsXport {
+ return false
+ }
+
+ if aIsDisc {
+ return !bIsXport && !bIsDisc
+ }
+
+ return false
+}
+
func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
switch state {
case SESN_STATE_EXCHANGE_MTU:
@@ -238,14 +248,15 @@ func (bf *BleFsm) errorAll(err error) {
bf.blsMtx.Lock()
defer bf.blsMtx.Unlock()
- for bl, _ := range bf.bls {
+ bls := bf.bls
+ bf.bls = map[*BleListener]struct{}{}
+
+ for bl, _ := range bls {
bl.ErrChan <- err
}
-
- bf.bls = map[*BleListener]struct{}{}
}
-func (bf *BleFsm) processErr() {
+func (bf *BleFsm) processErr(err error) {
// Remember some fields before we clear them.
dt := calcDisconnectType(bf.state)
@@ -254,67 +265,15 @@ func (bf *BleFsm) processErr() {
peer = *bf.peerDev
}
- err := bf.curErr
- bf.reset(err)
-
- bf.openMtx.Unlock()
-
- bf.params.DisconnectCb(dt, peer, err)
-}
-
-func (bf *BleFsm) onError(err error) {
- if bf.curErr == nil {
- // Subsequent start attempts will block until the reset is complete.
- bf.openMtx.Lock()
-
- bf.curErr = err
- bf.errTimer = time.AfterFunc(time.Second, func() {
- bf.processErr()
- })
- } else {
- var replace bool
- if nmxutil.IsXport(err) {
- replace = true
- } else if !nmxutil.IsXport(bf.curErr) &&
- nmxutil.IsBleSesnDisconnect(err) {
-
- replace = true
- } else if !nmxutil.IsXport(bf.curErr) &&
- !nmxutil.IsBleSesnDisconnect(bf.curErr) {
-
- replace = true
- } else {
- replace = false
- }
-
- if replace {
- if !bf.errTimer.Stop() {
- <-bf.errTimer.C
- }
- bf.curErr = err
- bf.errTimer.Reset(time.Second)
- }
- }
-}
-
-func (bf *BleFsm) reset(err error) {
bf.errorAll(err)
bf.stateMtx.Lock()
- defer bf.stateMtx.Unlock()
+ bf.state = SESN_STATE_UNCONNECTED
+ bf.stateMtx.Unlock()
- bf.setStateNoLock(SESN_STATE_UNCONNECTED)
bf.peerDev = nil
- bf.curErr = nil
-}
-// Blocks until the current reset is complete. If there is no reset in
-// progress, this function returns immediately. The purpose of this function
-// is to prevent the client from opening the session while it is still being
-// closed.
-func (bf *BleFsm) blockUntilReset() {
- bf.openMtx.Lock()
- bf.openMtx.Unlock()
+ go bf.params.DisconnectCb(dt, peer, err)
}
func (bf *BleFsm) connectListen(seq BleSeq) error {
@@ -326,15 +285,12 @@ func (bf *BleFsm) connectListen(seq BleSeq) error {
}
go func() {
- defer func() {
- bf.removeBleSeqListener("connect", seq)
- }()
+ defer bf.removeBleSeqListener("connect", seq)
for {
select {
case err := <-bl.ErrChan:
- // Transport reported error. Assume the connection has
- // dropped.
- bf.onError(err)
+ bf.connChan <- err
+ bf.errFunnel.Insert(err)
return
case bm := <-bl.BleChan:
@@ -347,7 +303,9 @@ func (bf *BleFsm) connectListen(seq BleSeq) error {
ErrCodeToString(msg.Status), msg.Status,
bf.peerDev.String())
log.Debugf(str)
- bf.connChan <- nmxutil.NewBleHostError(msg.Status, str)
+ err := nmxutil.NewBleHostError(msg.Status, str)
+ bf.connChan <- err
+ bf.errFunnel.Insert(err)
return
} else {
bf.connChan <- nil
@@ -360,6 +318,7 @@ func (bf *BleFsm) connectListen(seq BleSeq) error {
bf.logConnection()
if err := bf.nmpRspListen(); err != nil {
bf.connChan <- err
+ bf.errFunnel.Insert(err)
return
}
bf.connChan <- nil
@@ -369,7 +328,9 @@ func (bf *BleFsm) connectListen(seq BleSeq) error {
ErrCodeToString(msg.Status), msg.Status,
bf.peerDev.String())
log.Debugf(str)
- bf.connChan <- nmxutil.NewBleHostError(msg.Status, str)
+ err := nmxutil.NewBleHostError(msg.Status, str)
+ bf.connChan <- err
+ bf.errFunnel.Insert(err)
return
}
@@ -387,14 +348,16 @@ func (bf *BleFsm) connectListen(seq BleSeq) error {
case *BleDisconnectEvt:
err := bf.disconnectError(msg.Reason)
- bf.onError(err)
+ bf.errFunnel.Insert(err)
return
default:
}
case <-bl.AfterTimeout(bf.params.Bx.RspTimeout()):
- bf.connChan <- BhdTimeoutError(MSG_TYPE_CONNECT)
+ err := BhdTimeoutError(MSG_TYPE_CONNECT)
+ bf.connChan <- err
+ bf.errFunnel.Insert(err)
}
}
}()
@@ -415,13 +378,13 @@ func (bf *BleFsm) nmpRspListen() error {
}
go func() {
- defer func() {
- bf.removeBleBaseListener("nmp-rsp", base)
- }()
+ defer bf.removeBleBaseListener("nmp-rsp", base)
for {
select {
case <-bl.ErrChan:
- // The session encountered an error; stop listening.
+ if err != nil {
+ bf.errFunnel.Insert(err)
+ }
return
case bm := <-bl.BleChan:
switch msg := bm.(type) {
@@ -446,6 +409,7 @@ func (bf *BleFsm) connect() error {
r.PeerAddrType = bf.peerDev.AddrType
r.PeerAddr = bf.peerDev.Addr
+ // Initiating a connection requires dedicated master privileges.
if err := bf.params.Bx.AcquireMaster(); err != nil {
return err
}
@@ -455,11 +419,17 @@ func (bf *BleFsm) connect() error {
return err
}
+ // Tell blehostd to initiate connection.
if err := connect(bf.params.Bx, bf.connChan, r); err != nil {
- bf.params.Bx.ReleaseMaster()
+ bhe := nmxutil.ToBleHost(err)
+ if bhe != nil && bhe.Status == ERR_CODE_EDONE {
+ // Already connected.
+ return nil
+ }
return err
}
+ // Connection operation now in progress.
bf.state = SESN_STATE_CONNECTING
err := <-bf.connChan
@@ -488,6 +458,7 @@ func (bf *BleFsm) scan() error {
r.Passive = false
r.FilterDuplicates = true
+ // Scanning requires dedicated master privileges.
if err := bf.params.Bx.AcquireMaster(); err != nil {
return err
}
@@ -497,9 +468,7 @@ func (bf *BleFsm) scan() error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("scan", r.Seq)
- }()
+ defer bf.removeBleSeqListener("scan", r.Seq)
abortChan := make(chan struct{}, 1)
@@ -539,9 +508,7 @@ func (bf *BleFsm) scanCancel() error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("scan-cancel", r.Seq)
- }()
+ defer bf.removeBleSeqListener("scan-cancel", r.Seq)
if err := scanCancel(bf.params.Bx, bl, r); err != nil {
return err
@@ -563,7 +530,7 @@ func (bf *BleFsm) terminateSetState() error {
return fmt.Errorf(
"BLE terminate failed; session already being closed")
default:
- bf.setStateNoLock(SESN_STATE_TERMINATING)
+ bf.state = SESN_STATE_TERMINATING
}
return nil
@@ -582,9 +549,7 @@ func (bf *BleFsm) terminate() error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("terminate", r.Seq)
- }()
+ defer bf.removeBleSeqListener("terminate", r.Seq)
if err := terminate(bf.params.Bx, bl, r); err != nil {
return err
@@ -600,9 +565,7 @@ func (bf *BleFsm) connCancel() error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("conn-cancel", r.Seq)
- }()
+ defer bf.removeBleSeqListener("conn-cancel", r.Seq)
if err := connCancel(bf.params.Bx, bl, r); err != nil {
return err
@@ -620,9 +583,7 @@ func (bf *BleFsm) discSvcUuid() error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("disc-svc-uuid", r.Seq)
- }()
+ defer bf.removeBleSeqListener("disc-svc-uuid", r.Seq)
bf.nmpSvc, err = discSvcUuid(bf.params.Bx, bl, r)
if err != nil {
@@ -642,9 +603,7 @@ func (bf *BleFsm) discAllChrs() error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("disc-all-chrs", r.Seq)
- }()
+ defer bf.removeBleSeqListener("disc-all-chrs", r.Seq)
chrs, err := discAllChrs(bf.params.Bx, bl, r)
if err != nil {
@@ -683,9 +642,7 @@ func (bf *BleFsm) exchangeMtu() error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("exchange-mtu", r.Seq)
- }()
+ defer bf.removeBleSeqListener("exchange-mtu", r.Seq)
mtu, err := exchangeMtu(bf.params.Bx, bl, r)
if err != nil {
@@ -706,9 +663,7 @@ func (bf *BleFsm) writeCmd(data []byte) error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("write-cmd", r.Seq)
- }()
+ defer bf.removeBleSeqListener("write-cmd", r.Seq)
if err := writeCmd(bf.params.Bx, bl, r); err != nil {
return err
@@ -727,9 +682,7 @@ func (bf *BleFsm) subscribe() error {
if err != nil {
return err
}
- defer func() {
- bf.removeBleSeqListener("subscribe", r.Seq)
- }()
+ defer bf.removeBleSeqListener("subscribe", r.Seq)
if err := writeCmd(bf.params.Bx, bl, r); err != nil {
return err
@@ -820,7 +773,7 @@ func (bf *BleFsm) executeState() (bool, error) {
}
func (bf *BleFsm) startOnce() (bool, error) {
- bf.blockUntilReset()
+ bf.errFunnel.BlockUntilReset()
if !bf.IsClosed() {
return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -831,7 +784,7 @@ func (bf *BleFsm) startOnce() (bool, error) {
for {
retry, err := bf.executeState()
if err != nil {
- bf.onError(err)
+ bf.errFunnel.Insert(err)
return retry, err
} else if bf.getState() == SESN_STATE_DONE {
return false, nil
@@ -870,7 +823,7 @@ func (bf *BleFsm) Stop() (bool, error) {
bf.closedError("Attempt to close an unopened BLE session")
case SESN_STATE_CONNECTING:
- bf.onError(fmt.Errorf("Connection attempt cancelled"))
+ bf.errFunnel.Insert(fmt.Errorf("Connection attempt cancelled"))
return false, nil
default:
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmble/ble_proto.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_proto.go b/nmxact/nmble/ble_proto.go
index 6a99feb..a43348d 100644
--- a/nmxact/nmble/ble_proto.go
+++ b/nmxact/nmble/ble_proto.go
@@ -793,7 +793,7 @@ func ErrCodeToString(e int) string {
func MsgOpToString(op MsgOp) string {
s := MsgOpStringMap[op]
if s == "" {
- panic(fmt.Sprintf("Invalid MsgOp: %d", int(op)))
+ return "???"
}
return s
@@ -812,7 +812,7 @@ func MsgOpFromString(s string) (MsgOp, error) {
func MsgTypeToString(msgType MsgType) string {
s := MsgTypeStringMap[msgType]
if s == "" {
- panic(fmt.Sprintf("Invalid MsgType: %d", int(msgType)))
+ return "???"
}
return s
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmble/ble_xport.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index d87f0c8..d78a000 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -251,6 +251,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
bx.client.Stop()
}
+ // Indicate error to all clients who are waiting for the master resource.
bx.master.Abort(err)
// Indicate an error to all of this transport's listeners. This prevents
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmxutil/nmxutil.go
----------------------------------------------------------------------
diff --git a/nmxact/nmxutil/nmxutil.go b/nmxact/nmxutil/nmxutil.go
index cb299e9..e63a7e2 100644
--- a/nmxact/nmxutil/nmxutil.go
+++ b/nmxact/nmxutil/nmxutil.go
@@ -3,6 +3,7 @@ package nmxutil
import (
"math/rand"
"sync"
+ "time"
)
var nextNmpSeq uint8
@@ -100,3 +101,79 @@ func (s *SingleResource) Abort(err error) {
w <- err
}
}
+
+type ErrLessFn func(a error, b error) bool
+type ErrProcFn func(err error)
+
+// Aggregates errors that occur close in time. The most severe error gets
+// reported.
+type ErrFunnel struct {
+ LessCb ErrLessFn
+ ProcCb ErrProcFn
+ AccumDelay time.Duration
+
+ mtx sync.Mutex
+ resetMtx sync.Mutex
+ curErr error
+ errTimer *time.Timer
+}
+
+func (f *ErrFunnel) Insert(err error) {
+ if err == nil {
+ panic("ErrFunnel nil insert")
+ }
+
+ f.mtx.Lock()
+ defer f.mtx.Unlock()
+
+ if f.curErr == nil {
+ // Subsequent use attempts will block until the funnel is inactive.
+ f.resetMtx.Lock()
+
+ f.curErr = err
+ f.errTimer = time.AfterFunc(f.AccumDelay, func() {
+ f.timerExp()
+ })
+ } else {
+ if f.LessCb(f.curErr, err) {
+ if !f.errTimer.Stop() {
+ <-f.errTimer.C
+ }
+ f.curErr = err
+ f.errTimer.Reset(f.AccumDelay)
+ }
+ }
+}
+
+func (f *ErrFunnel) resetNoLock() {
+ if f.curErr != nil {
+ f.curErr = nil
+ f.errTimer.Stop()
+ f.resetMtx.Unlock()
+ }
+}
+
+func (f *ErrFunnel) Reset() {
+ f.mtx.Lock()
+ defer f.mtx.Unlock()
+
+ f.resetNoLock()
+}
+
+func (f *ErrFunnel) BlockUntilReset() {
+ f.resetMtx.Lock()
+ f.resetMtx.Unlock()
+}
+
+func (f *ErrFunnel) timerExp() {
+ f.mtx.Lock()
+ defer f.mtx.Unlock()
+
+ if f.curErr == nil {
+ panic("ErrFunnel timer expired but no error")
+ }
+
+ f.ProcCb(f.curErr)
+
+ f.resetNoLock()
+}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/sesn/sesn_cfg.go
----------------------------------------------------------------------
diff --git a/nmxact/sesn/sesn_cfg.go b/nmxact/sesn/sesn_cfg.go
index 3c57f24..60cdc81 100644
--- a/nmxact/sesn/sesn_cfg.go
+++ b/nmxact/sesn/sesn_cfg.go
@@ -61,7 +61,7 @@ func NewSesnCfg() SesnCfg {
return SesnCfg{
Ble: SesnCfgBle{
ConnTries: 3,
- CloseTimeout: 15 * time.Second,
+ CloseTimeout: 30 * time.Second,
},
}
}