You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/04/10 22:15:18 UTC
[1/2] incubator-mynewt-newtmgr git commit: nmxact - ble_dual example:
open+close on each tx.
Repository: incubator-mynewt-newtmgr
Updated Branches:
refs/heads/master 0f1365665 -> 6c9269d72
nmxact - ble_dual example: open+close on each tx.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/744cb38f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/744cb38f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/744cb38f
Branch: refs/heads/master
Commit: 744cb38f1121ce3d777016cf69ee7f7d692dadea
Parents: 0f13656
Author: Christopher Collins <cc...@apache.org>
Authored: Sun Apr 9 14:25:55 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Sun Apr 9 14:25:55 2017 -0700
----------------------------------------------------------------------
nmxact/example/ble_dual/ble_dual.go | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/744cb38f/nmxact/example/ble_dual/ble_dual.go
----------------------------------------------------------------------
diff --git a/nmxact/example/ble_dual/ble_dual.go b/nmxact/example/ble_dual/ble_dual.go
index a0167c3..a69b8d5 100644
--- a/nmxact/example/ble_dual/ble_dual.go
+++ b/nmxact/example/ble_dual/ble_dual.go
@@ -21,12 +21,10 @@ package main
import (
"fmt"
- "math/rand"
"os"
"os/signal"
"sync"
"syscall"
- "time"
"mynewt.apache.org/newt/util"
"mynewt.apache.org/newtmgr/nmxact/bledefs"
@@ -96,6 +94,8 @@ func sendOne(s sesn.Sesn) {
eres := res.(*xact.EchoResult)
fmt.Printf("Peer echoed back: %s\n", eres.Rsp.Payload)
+
+ s.Close()
}
func main() {
@@ -153,17 +153,13 @@ func main() {
go func() {
for {
sendOne(s1)
- time.Sleep(time.Duration(rand.Uint32()%100) * time.Millisecond)
}
}()
wg.Add(1)
- //time.Sleep(2 * time.Second)
-
go func() {
for {
sendOne(s2)
- time.Sleep(time.Duration(rand.Uint32()%100) * time.Millisecond)
}
}()
[2/2] incubator-mynewt-newtmgr git commit: nmxact - Thread-safety
fixes.
Posted by cc...@apache.org.
nmxact - Thread-safety fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/6c9269d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/6c9269d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/6c9269d7
Branch: refs/heads/master
Commit: 6c9269d72f52bcb0f7b474cc18fe18d5d9934b3d
Parents: 744cb38
Author: Christopher Collins <cc...@apache.org>
Authored: Sun Apr 9 14:26:33 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Mon Apr 10 15:13:46 2017 -0700
----------------------------------------------------------------------
nmxact/example/ble_dual/ble_dual.go | 2 +-
nmxact/nmble/ble_act.go | 16 +-
nmxact/nmble/ble_fsm.go | 583 ++++++++++++++++++-------------
nmxact/nmble/ble_oic_sesn.go | 26 +-
nmxact/nmble/ble_plain_sesn.go | 41 +--
nmxact/nmble/ble_proto.go | 11 +-
nmxact/nmble/ble_util.go | 2 +-
nmxact/nmble/ble_xport.go | 57 +--
nmxact/nmble/dispatch.go | 6 +-
nmxact/nmserial/serial_xport.go | 4 +-
nmxact/nmxutil/nmxerr.go | 32 +-
nmxact/nmxutil/nmxutil.go | 77 ++++
nmxact/sesn/sesn.go | 4 +-
nmxact/sesn/sesn_cfg.go | 2 +-
14 files changed, 477 insertions(+), 386 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/example/ble_dual/ble_dual.go
----------------------------------------------------------------------
diff --git a/nmxact/example/ble_dual/ble_dual.go b/nmxact/example/ble_dual/ble_dual.go
index a69b8d5..bedff95 100644
--- a/nmxact/example/ble_dual/ble_dual.go
+++ b/nmxact/example/ble_dual/ble_dual.go
@@ -103,7 +103,7 @@ func main() {
params := nmble.NewXportCfg()
params.SockPath = "/tmp/blehostd-uds"
params.BlehostdPath = "blehostd.elf"
- params.DevPath = "/dev/cu.usbmodem142111"
+ params.DevPath = "/dev/cu.usbmodem141121"
x, err := nmble.NewBleXport(params)
if err != nil {
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_act.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 90ac53e..e5e336c 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -288,10 +288,11 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) (
}
}
-type scanFn func(r BleAdvReport)
+type scanSuccessFn func()
+type advRptFn func(r BleAdvReport)
-func scan(x *BleXport, bl *BleListener, r *BleScanReq,
- abortChan chan struct{}, scanCb scanFn) error {
+func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{},
+ scanSuccessCb scanSuccessFn, advRptCb advRptFn) error {
j, err := json.Marshal(r)
if err != nil {
@@ -313,17 +314,22 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq,
bl.Acked = true
if msg.Status != 0 {
return StatusError(MSG_OP_RSP, MSG_TYPE_SCAN, msg.Status)
+ } else {
+ scanSuccessCb()
}
case *BleScanEvt:
r := BleAdvReportFromScanEvt(msg)
- scanCb(r)
+ advRptCb(r)
+
+ case *BleScanTmoEvt:
+ return nmxutil.NewScanTmoError("scan duration expired")
default:
}
case <-bl.AfterTimeout(x.RspTimeout()):
- return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
+ return BhdTimeoutError(MSG_TYPE_SCAN)
case <-abortChan:
return nil
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index d198844..03956dd 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -3,6 +3,9 @@ package nmble
import (
"encoding/hex"
"fmt"
+ "os"
+ "path"
+ "runtime"
"sync"
"time"
@@ -14,6 +17,14 @@ import (
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
+var curId int
+
+var listenLog = &log.Logger{
+ Out: os.Stderr,
+ Formatter: new(log.TextFormatter),
+ Level: log.InfoLevel,
+}
+
const DFLT_ATT_MTU = 23
type BleSesnState int32
@@ -22,15 +33,13 @@ const (
SESN_STATE_UNCONNECTED BleSesnState = 0
SESN_STATE_SCANNING = 1
SESN_STATE_CONNECTING = 2
- SESN_STATE_CONNECTED = 3
- SESN_STATE_EXCHANGING_MTU = 4
- SESN_STATE_EXCHANGED_MTU = 5
- SESN_STATE_DISCOVERING_SVC = 6
- SESN_STATE_DISCOVERED_SVC = 7
- SESN_STATE_DISCOVERING_CHR = 8
- SESN_STATE_DISCOVERED_CHR = 9
- SESN_STATE_TERMINATING = 10
- SESN_STATE_CONN_CANCELLING = 11
+ SESN_STATE_EXCHANGE_MTU = 3
+ SESN_STATE_DISCOVER_SVC = 4
+ SESN_STATE_DISCOVER_CHR = 5
+ SESN_STATE_SUBSCRIBE = 6
+ SESN_STATE_DONE = 7
+ SESN_STATE_TERMINATING = 8
+ SESN_STATE_CONN_CANCELLING = 9
)
type BleFsmDisconnectType int
@@ -49,6 +58,7 @@ type BleFsmParams struct {
Bx *BleXport
OwnAddrType BleAddrType
PeerSpec sesn.BlePeerSpec
+ ConnTries int
SvcUuid BleUuid
ReqChrUuid BleUuid
RspChrUuid BleUuid
@@ -66,8 +76,20 @@ type BleFsm struct {
nmpRspChr *BleChr
attMtu int
connChan chan error
- mtx sync.Mutex
lastStateChange time.Time
+ id int
+ curErr error
+ errTimer *time.Timer
+
+ // Protects all accesses to the FSM state variable.
+ stateMtx sync.Mutex
+
+ // Protects all accesses to the bls map.
+ blsMtx sync.Mutex
+
+ // Prevents the session from being opened while it is still being reset
+ // (cleaned up).
+ openMtx sync.Mutex
// These variables must be protected by the mutex.
bls map[*BleListener]struct{}
@@ -80,8 +102,11 @@ func NewBleFsm(p BleFsmParams) *BleFsm {
bls: map[*BleListener]struct{}{},
attMtu: DFLT_ATT_MTU,
+ id: curId,
}
+ curId++
+
return bf
}
@@ -99,63 +124,36 @@ func (bf *BleFsm) closedError(msg string) error {
}
func (bf *BleFsm) getState() BleSesnState {
- bf.mtx.Lock()
- defer bf.mtx.Unlock()
+ bf.stateMtx.Lock()
+ defer bf.stateMtx.Unlock()
return bf.state
}
-func stateRequiresMaster(s BleSesnState) bool {
- return s == SESN_STATE_SCANNING || s == SESN_STATE_CONNECTING
-}
-
-func (bf *BleFsm) setStateNoLock(toState BleSesnState) error {
- if !stateRequiresMaster(bf.state) && stateRequiresMaster(toState) {
- if err := bf.params.Bx.AcquireMaster(); err != nil {
- return err
- }
- } else if stateRequiresMaster(bf.state) && !stateRequiresMaster(toState) {
- bf.params.Bx.ReleaseMaster()
- }
-
+func (bf *BleFsm) setStateNoLock(toState BleSesnState) {
bf.state = toState
bf.lastStateChange = time.Now()
-
- return nil
}
-func (bf *BleFsm) setState(toState BleSesnState) error {
- bf.mtx.Lock()
- defer bf.mtx.Unlock()
+func (bf *BleFsm) setState(toState BleSesnState) {
+ bf.stateMtx.Lock()
+ defer bf.stateMtx.Unlock()
- return bf.setStateNoLock(toState)
+ bf.setStateNoLock(toState)
}
-func (bf *BleFsm) transitionState(fromState BleSesnState,
- toState BleSesnState) error {
+func (bf *BleFsm) addBleListener(name string, base BleMsgBase) (
+ *BleListener, error) {
- bf.mtx.Lock()
- defer bf.mtx.Unlock()
+ _, file, line, _ := runtime.Caller(2)
+ file = path.Base(file)
+ listenLog.Debugf("[%d] {add-listener} [%s:%d] %s: base=%+v",
+ bf.id, file, line, name, base)
- if bf.state != fromState {
- return fmt.Errorf(
- "Can't set BleFsm state to %d; current state != required "+
- "value: %d",
- toState, fromState)
- }
-
- if err := bf.setStateNoLock(toState); err != nil {
- return err
- }
-
- return nil
-}
-
-func (bf *BleFsm) addBleListener(base BleMsgBase) (*BleListener, error) {
bl := NewBleListener()
- bf.mtx.Lock()
- defer bf.mtx.Unlock()
+ bf.blsMtx.Lock()
+ defer bf.blsMtx.Unlock()
if err := bf.params.Bx.Bd.AddListener(base, bl); err != nil {
return nil, err
@@ -165,32 +163,42 @@ func (bf *BleFsm) addBleListener(base BleMsgBase) (*BleListener, error) {
return bl, nil
}
-func (bf *BleFsm) addBleSeqListener(seq BleSeq) (*BleListener, error) {
+func (bf *BleFsm) addBleBaseListener(name string, base BleMsgBase) (
+ *BleListener, error) {
+
+ return bf.addBleListener(name, base)
+}
+
+func (bf *BleFsm) addBleSeqListener(name string, seq BleSeq) (
+ *BleListener, error) {
+
base := BleMsgBase{
Op: -1,
Type: -1,
Seq: seq,
ConnHandle: -1,
}
- bl, err := bf.addBleListener(base)
- if err != nil {
- return nil, err
- }
-
- return bl, nil
+ return bf.addBleListener(name, base)
}
-func (bf *BleFsm) removeBleListener(base BleMsgBase) {
- bf.mtx.Lock()
- defer bf.mtx.Unlock()
+func (bf *BleFsm) removeBleListener(name string, base BleMsgBase) {
+ _, file, line, _ := runtime.Caller(2)
+ file = path.Base(file)
+ listenLog.Debugf("[%d] {remove-listener} [%s:%d] %s: base=%+v",
+ bf.id, file, line, name, base)
+
+ bf.blsMtx.Lock()
+ defer bf.blsMtx.Unlock()
bl := bf.params.Bx.Bd.RemoveListener(base)
- if bl != nil {
- delete(bf.bls, bl)
- }
+ delete(bf.bls, bl)
}
-func (bf *BleFsm) removeBleSeqListener(seq BleSeq) {
+func (bf *BleFsm) removeBleBaseListener(name string, base BleMsgBase) {
+ bf.removeBleListener(name, base)
+}
+
+func (bf *BleFsm) removeBleSeqListener(name string, seq BleSeq) {
base := BleMsgBase{
Op: -1,
Type: -1,
@@ -198,30 +206,7 @@ func (bf *BleFsm) removeBleSeqListener(seq BleSeq) {
ConnHandle: -1,
}
- bf.removeBleListener(base)
-}
-
-func (bf *BleFsm) action(
- preState BleSesnState,
- inState BleSesnState,
- postState BleSesnState,
- cb func() error) error {
-
- if err := bf.transitionState(preState, inState); err != nil {
- return err
- }
-
- if err := cb(); err != nil {
- if err := bf.setState(preState); err != nil {
- return err
- }
- return err
- }
-
- if err := bf.setState(postState); err != nil {
- return err
- }
- return nil
+ bf.removeBleListener(name, base)
}
func (bf *BleFsm) logConnection() {
@@ -235,10 +220,10 @@ func (bf *BleFsm) logConnection() {
func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
switch state {
- case SESN_STATE_EXCHANGING_MTU:
+ case SESN_STATE_EXCHANGE_MTU:
return FSM_DISCONNECT_TYPE_IMMEDIATE_TIMEOUT
- case SESN_STATE_DISCOVERED_CHR:
+ case SESN_STATE_DONE:
return FSM_DISCONNECT_TYPE_OPENED
case SESN_STATE_TERMINATING, SESN_STATE_CONN_CANCELLING:
@@ -249,56 +234,107 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
}
}
-func (bf *BleFsm) resetState(err error) {
- bf.mtx.Lock()
+func (bf *BleFsm) errorAll(err error) {
+ bf.blsMtx.Lock()
+ defer bf.blsMtx.Unlock()
- if err := bf.setStateNoLock(SESN_STATE_UNCONNECTED); err != nil {
- // Change to unconnected state should never fail.
- panic(fmt.Sprintf(
- "BleFsm state change resulted in unexpected error: %s", err))
- }
- bf.peerDev = nil
-
- // Make a copy of all the listeners so we don't have to keep the mutex
- // locked while we send error signals to them.
- bls := make([]*BleListener, 0, len(bf.bls))
for bl, _ := range bf.bls {
- bls = append(bls, bl)
- }
-
- bf.mtx.Unlock()
-
- for _, bl := range bls {
bl.ErrChan <- err
}
+
+ bf.bls = map[*BleListener]struct{}{}
}
-func (bf *BleFsm) onDisconnect(err error) {
+func (bf *BleFsm) processErr() {
// Remember some fields before we clear them.
dt := calcDisconnectType(bf.state)
- peer := *bf.peerDev
- bf.resetState(err)
+ var peer BleDev
+ if bf.peerDev != nil {
+ peer = *bf.peerDev
+ }
+
+ err := bf.curErr
+ bf.reset(err)
+
+ bf.openMtx.Unlock()
bf.params.DisconnectCb(dt, peer, err)
}
+func (bf *BleFsm) onError(err error) {
+ if bf.curErr == nil {
+ // Subsequent start attempts will block until the reset is complete.
+ bf.openMtx.Lock()
+
+ bf.curErr = err
+ bf.errTimer = time.AfterFunc(time.Second, func() {
+ bf.processErr()
+ })
+ } else {
+ var replace bool
+ if nmxutil.IsXport(err) {
+ replace = true
+ } else if !nmxutil.IsXport(bf.curErr) &&
+ nmxutil.IsBleSesnDisconnect(err) {
+
+ replace = true
+ } else if !nmxutil.IsXport(bf.curErr) &&
+ !nmxutil.IsBleSesnDisconnect(bf.curErr) {
+
+ replace = true
+ } else {
+ replace = false
+ }
+
+ if replace {
+ if !bf.errTimer.Stop() {
+ <-bf.errTimer.C
+ }
+ bf.curErr = err
+ bf.errTimer.Reset(time.Second)
+ }
+ }
+}
+
+func (bf *BleFsm) reset(err error) {
+ bf.errorAll(err)
+
+ bf.stateMtx.Lock()
+ defer bf.stateMtx.Unlock()
+
+ bf.setStateNoLock(SESN_STATE_UNCONNECTED)
+ bf.peerDev = nil
+ bf.curErr = nil
+}
+
+// Blocks until the current reset is complete. If there is no reset in
+// progress, this function returns immediately. The purpose of this function
+// is to prevent the client from opening the session while it is still being
+// closed.
+func (bf *BleFsm) blockUntilReset() {
+ bf.openMtx.Lock()
+ bf.openMtx.Unlock()
+}
+
func (bf *BleFsm) connectListen(seq BleSeq) error {
bf.connChan = make(chan error, 1)
- bl, err := bf.addBleSeqListener(seq)
+ bl, err := bf.addBleSeqListener("connect", seq)
if err != nil {
return err
}
go func() {
- defer bf.removeBleSeqListener(seq)
+ defer func() {
+ bf.removeBleSeqListener("connect", seq)
+ }()
for {
select {
case err := <-bl.ErrChan:
// Transport reported error. Assume the connection has
// dropped.
- bf.onDisconnect(err)
+ bf.onError(err)
return
case bm := <-bl.BleChan:
@@ -313,6 +349,8 @@ func (bf *BleFsm) connectListen(seq BleSeq) error {
log.Debugf(str)
bf.connChan <- nmxutil.NewBleHostError(msg.Status, str)
return
+ } else {
+ bf.connChan <- nil
}
case *BleConnectEvt:
@@ -349,7 +387,7 @@ func (bf *BleFsm) connectListen(seq BleSeq) error {
case *BleDisconnectEvt:
err := bf.disconnectError(msg.Reason)
- bf.onDisconnect(err)
+ bf.onError(err)
return
default:
@@ -371,13 +409,15 @@ func (bf *BleFsm) nmpRspListen() error {
ConnHandle: int(bf.connHandle),
}
- bl, err := bf.addBleListener(base)
+ bl, err := bf.addBleBaseListener("nmp-rsp", base)
if err != nil {
return err
}
go func() {
- defer bf.removeBleListener(base)
+ defer func() {
+ bf.removeBleBaseListener("nmp-rsp", base)
+ }()
for {
select {
case <-bl.ErrChan:
@@ -406,15 +446,37 @@ func (bf *BleFsm) connect() error {
r.PeerAddrType = bf.peerDev.AddrType
r.PeerAddr = bf.peerDev.Addr
+ if err := bf.params.Bx.AcquireMaster(); err != nil {
+ return err
+ }
+ defer bf.params.Bx.ReleaseMaster()
+
if err := bf.connectListen(r.Seq); err != nil {
return err
}
if err := connect(bf.params.Bx, bf.connChan, r); err != nil {
+ bf.params.Bx.ReleaseMaster()
return err
}
- return nil
+ bf.state = SESN_STATE_CONNECTING
+
+ err := <-bf.connChan
+ if !nmxutil.IsXport(err) {
+ // The transport did not restart; always attempt to cancel the connect
+ // operation. In most cases, the host has already stopped connecting
+ // and will respond with an "ealready" error that can be ignored.
+ if err := bf.connCancel(); err != nil {
+ bhe := nmxutil.ToBleHost(err)
+ if bhe == nil || bhe.Status != ERR_CODE_EALREADY {
+ log.Errorf("Failed to cancel connect in progress: %s",
+ err.Error())
+ }
+ }
+ }
+
+ return err
}
func (bf *BleFsm) scan() error {
@@ -426,16 +488,26 @@ func (bf *BleFsm) scan() error {
r.Passive = false
r.FilterDuplicates = true
- bl, err := bf.addBleSeqListener(r.Seq)
+ if err := bf.params.Bx.AcquireMaster(); err != nil {
+ return err
+ }
+ defer bf.params.Bx.ReleaseMaster()
+
+ bl, err := bf.addBleSeqListener("scan", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("scan", r.Seq)
+ }()
abortChan := make(chan struct{}, 1)
+ // This function gets called when scanning begins.
+ scanSuccessCb := func() { bf.state = SESN_STATE_SCANNING }
+
// This function gets called for each incoming advertisement.
- scanCb := func(r BleAdvReport) {
+ advRptCb := func(r BleAdvReport) {
// Ask client if we should connect to this advertiser.
if bf.params.PeerSpec.ScanPred(r) {
bf.peerDev = &r.Sender
@@ -443,22 +515,33 @@ func (bf *BleFsm) scan() error {
}
}
- if err := scan(bf.params.Bx, bl, r, abortChan, scanCb); err != nil {
- return err
+ err = scan(bf.params.Bx, bl, r, abortChan, scanSuccessCb, advRptCb)
+ if !nmxutil.IsXport(err) {
+ // The transport did not restart; always attempt to cancel the scan
+ // operation. In most cases, the host has already stopped scanning
+ // and will respond with an "ealready" error that can be ignored.
+ if err := bf.scanCancel(); err != nil {
+ bhe := nmxutil.ToBleHost(err)
+ if bhe == nil || bhe.Status != ERR_CODE_EALREADY {
+ log.Errorf("Failed to cancel scan in progress: %s",
+ err.Error())
+ }
+ }
}
- // Scanning still in progress; cancel the operation.
- return bf.scanCancel()
+ return err
}
func (bf *BleFsm) scanCancel() error {
r := NewBleScanCancelReq()
- bl, err := bf.addBleSeqListener(r.Seq)
+ bl, err := bf.addBleSeqListener("scan-cancel", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("scan-cancel", r.Seq)
+ }()
if err := scanCancel(bf.params.Bx, bl, r); err != nil {
return err
@@ -468,8 +551,8 @@ func (bf *BleFsm) scanCancel() error {
}
func (bf *BleFsm) terminateSetState() error {
- bf.mtx.Lock()
- defer bf.mtx.Unlock()
+ bf.stateMtx.Lock()
+ defer bf.stateMtx.Unlock()
switch bf.state {
case SESN_STATE_UNCONNECTED,
@@ -480,11 +563,7 @@ func (bf *BleFsm) terminateSetState() error {
return fmt.Errorf(
"BLE terminate failed; session already being closed")
default:
- if err := bf.setStateNoLock(SESN_STATE_TERMINATING); err != nil {
- // Change to terminating state should never fail.
- panic(fmt.Sprintf(
- "BleFsm state change resulted in unexpected error: %s", err))
- }
+ bf.setStateNoLock(SESN_STATE_TERMINATING)
}
return nil
@@ -499,11 +578,13 @@ func (bf *BleFsm) terminate() error {
r.ConnHandle = bf.connHandle
r.HciReason = ERR_CODE_HCI_REM_USER_CONN_TERM
- bl, err := bf.addBleSeqListener(r.Seq)
+ bl, err := bf.addBleSeqListener("terminate", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("terminate", r.Seq)
+ }()
if err := terminate(bf.params.Bx, bl, r); err != nil {
return err
@@ -513,19 +594,15 @@ func (bf *BleFsm) terminate() error {
}
func (bf *BleFsm) connCancel() error {
- if err := bf.transitionState(
- SESN_STATE_CONNECTING,
- SESN_STATE_CONN_CANCELLING); err != nil {
-
- return fmt.Errorf("BLE connect cancel failed; not connecting")
- }
-
r := NewBleConnCancelReq()
- bl, err := bf.addBleSeqListener(r.Seq)
+
+ bl, err := bf.addBleSeqListener("conn-cancel", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("conn-cancel", r.Seq)
+ }()
if err := connCancel(bf.params.Bx, bl, r); err != nil {
return err
@@ -539,11 +616,13 @@ func (bf *BleFsm) discSvcUuid() error {
r.ConnHandle = bf.connHandle
r.Uuid = bf.params.SvcUuid
- bl, err := bf.addBleSeqListener(r.Seq)
+ bl, err := bf.addBleSeqListener("disc-svc-uuid", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("disc-svc-uuid", r.Seq)
+ }()
bf.nmpSvc, err = discSvcUuid(bf.params.Bx, bl, r)
if err != nil {
@@ -559,11 +638,13 @@ func (bf *BleFsm) discAllChrs() error {
r.StartHandle = bf.nmpSvc.StartHandle
r.EndHandle = bf.nmpSvc.EndHandle
- bl, err := bf.addBleSeqListener(r.Seq)
+ bl, err := bf.addBleSeqListener("disc-all-chrs", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("disc-all-chrs", r.Seq)
+ }()
chrs, err := discAllChrs(bf.params.Bx, bl, r)
if err != nil {
@@ -598,11 +679,13 @@ func (bf *BleFsm) exchangeMtu() error {
r := NewBleExchangeMtuReq()
r.ConnHandle = bf.connHandle
- bl, err := bf.addBleSeqListener(r.Seq)
+ bl, err := bf.addBleSeqListener("exchange-mtu", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("exchange-mtu", r.Seq)
+ }()
mtu, err := exchangeMtu(bf.params.Bx, bl, r)
if err != nil {
@@ -619,11 +702,13 @@ func (bf *BleFsm) writeCmd(data []byte) error {
r.AttrHandle = bf.nmpReqChr.ValHandle
r.Data.Bytes = data
- bl, err := bf.addBleSeqListener(r.Seq)
+ bl, err := bf.addBleSeqListener("write-cmd", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("write-cmd", r.Seq)
+ }()
if err := writeCmd(bf.params.Bx, bl, r); err != nil {
return err
@@ -638,11 +723,13 @@ func (bf *BleFsm) subscribe() error {
r.AttrHandle = bf.nmpRspChr.ValHandle + 1
r.Data.Bytes = []byte{1, 0}
- bl, err := bf.addBleSeqListener(r.Seq)
+ bl, err := bf.addBleSeqListener("subscribe", r.Seq)
if err != nil {
return err
}
- defer bf.removeBleSeqListener(r.Seq)
+ defer func() {
+ bf.removeBleSeqListener("subscribe", r.Seq)
+ }()
if err := writeCmd(bf.params.Bx, bl, r); err != nil {
return err
@@ -668,109 +755,105 @@ func (bf *BleFsm) tryFillPeerDev() bool {
return false
}
-// @return bool Whether another start attempt should be made;
-// error The error that caused the start attempt to
-// fail; nil on success.
-func (bf *BleFsm) Start() (bool, error) {
- if !bf.IsClosed() {
- return false, nmxutil.NewSesnAlreadyOpenError(
- "Attempt to open an already-open BLE session")
- }
+func (bf *BleFsm) executeState() (bool, error) {
+ bf.stateMtx.Lock()
+ defer bf.stateMtx.Unlock()
- for {
- state := bf.getState()
- switch state {
- case SESN_STATE_UNCONNECTED:
- var err error
-
- // Determine if we can immediately initiate a connection, or if we
- // need to scan for a peer first. If the client specified a peer
- // address, or if we have already successfully scanned, we initiate
- // a connection now. Otherwise, we need to scan to determine which
- // peer meets the specified scan criteria.
- bf.tryFillPeerDev()
- if bf.peerDev == nil {
- // Peer not inferred yet. Initiate scan.
- cb := func() error { return bf.scan() }
- err = bf.action(
- SESN_STATE_UNCONNECTED,
- SESN_STATE_SCANNING,
- SESN_STATE_UNCONNECTED,
- cb)
- } else {
- // We already know the address we want to connect to. Initiate
- // a connection.
- cb := func() error { return bf.connect() }
- err = bf.action(
- SESN_STATE_UNCONNECTED,
- SESN_STATE_CONNECTING,
- SESN_STATE_CONNECTED,
- cb)
+ switch bf.state {
+ case SESN_STATE_UNCONNECTED:
+ // Determine if we can immediately initiate a connection, or if we
+ // need to scan for a peer first. If the client specified a peer
+ // address, or if we have already successfully scanned, we initiate
+ // a connection now. Otherwise, we need to scan to determine which
+ // peer meets the specified scan criteria.
+ bf.tryFillPeerDev()
+ if bf.peerDev == nil {
+ // Peer not inferred yet. Initiate scan.
+ if err := bf.scan(); err != nil {
+ return false, err
}
-
- if err != nil {
- log.Info("[%p] FAILED FROM UNCONNECTED STATE: %s", bf, err.Error())
- bf.resetState(err)
+ bf.state = SESN_STATE_UNCONNECTED
+ } else {
+ // We already know the address we want to connect to. Initiate
+ // a connection.
+ if err := bf.connect(); err != nil {
return false, err
}
+ bf.state = SESN_STATE_EXCHANGE_MTU
+ }
- case SESN_STATE_CONNECTED:
- cb := func() error { return bf.exchangeMtu() }
- err := bf.action(
- SESN_STATE_CONNECTED,
- SESN_STATE_EXCHANGING_MTU,
- SESN_STATE_EXCHANGED_MTU,
- cb)
- if err != nil {
- bhe := nmxutil.ToBleHost(err)
- retry := bhe != nil && bhe.Status == ERR_CODE_ENOTCONN
- bf.resetState(err)
- return retry, err
- }
+ case SESN_STATE_EXCHANGE_MTU:
+ if err := bf.exchangeMtu(); err != nil {
+ bhe := nmxutil.ToBleHost(err)
+ retry := bhe != nil && bhe.Status == ERR_CODE_ENOTCONN
+ return retry, err
+ }
+ bf.state = SESN_STATE_DISCOVER_SVC
- case SESN_STATE_EXCHANGED_MTU:
- cb := func() error { return bf.discSvcUuid() }
- err := bf.action(
- SESN_STATE_EXCHANGED_MTU,
- SESN_STATE_DISCOVERING_SVC,
- SESN_STATE_DISCOVERED_SVC,
- cb)
- if err != nil {
- bf.resetState(err)
- return false, err
- }
+ case SESN_STATE_DISCOVER_SVC:
+ if err := bf.discSvcUuid(); err != nil {
+ return false, err
+ }
+ bf.state = SESN_STATE_DISCOVER_CHR
- case SESN_STATE_DISCOVERED_SVC:
- cb := func() error {
- return bf.discAllChrs()
- }
+ case SESN_STATE_DISCOVER_CHR:
+ if err := bf.discAllChrs(); err != nil {
+ return false, err
+ }
+ bf.state = SESN_STATE_SUBSCRIBE
- err := bf.action(
- SESN_STATE_DISCOVERED_SVC,
- SESN_STATE_DISCOVERING_CHR,
- SESN_STATE_DISCOVERED_CHR,
- cb)
- if err != nil {
- bf.resetState(err)
- return false, err
- }
+ case SESN_STATE_SUBSCRIBE:
+ if err := bf.subscribe(); err != nil {
+ return false, err
+ }
+ bf.state = SESN_STATE_DONE
- if err := bf.subscribe(); err != nil {
- bf.resetState(err)
- return false, err
- }
+ case SESN_STATE_DONE:
+ /* Open complete. */
+ return false, fmt.Errorf("BleFsm already done being opened")
+
+ default:
+ return false, fmt.Errorf("BleFsm already being opened")
+ }
+
+ return false, nil
+}
+
+func (bf *BleFsm) startOnce() (bool, error) {
+ bf.blockUntilReset()
+
+ if !bf.IsClosed() {
+ return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
+ "Attempt to open an already-open BLE session (state=%d)",
+ bf.getState()))
+ }
- case SESN_STATE_DISCOVERED_CHR:
- /* Open complete. */
+ for {
+ retry, err := bf.executeState()
+ if err != nil {
+ bf.onError(err)
+ return retry, err
+ } else if bf.getState() == SESN_STATE_DONE {
return false, nil
+ }
+ }
+}
- case SESN_STATE_CONNECTING,
- SESN_STATE_DISCOVERING_SVC,
- SESN_STATE_DISCOVERING_CHR,
- SESN_STATE_TERMINATING:
- return false, fmt.Errorf("BleFsm already being opened")
+// @return bool Whether another start attempt should be made;
+// error The error that caused the start attempt to
+// fail; nil on success.
+func (bf *BleFsm) Start() error {
+ var err error
+
+ for i := 0; i < bf.params.ConnTries; i++ {
+ var retry bool
+ retry, err = bf.startOnce()
+ if !retry {
+ break
}
}
+
+ return err
}
// @return bool true if stop complete;
@@ -787,10 +870,8 @@ func (bf *BleFsm) Stop() (bool, error) {
bf.closedError("Attempt to close an unopened BLE session")
case SESN_STATE_CONNECTING:
- if err := bf.connCancel(); err != nil {
- return false, err
- }
- return true, nil
+ bf.onError(fmt.Errorf("Connection attempt cancelled"))
+ return false, nil
default:
if err := bf.terminate(); err != nil {
@@ -801,7 +882,7 @@ func (bf *BleFsm) Stop() (bool, error) {
}
func (bf *BleFsm) IsOpen() bool {
- return bf.getState() == SESN_STATE_DISCOVERED_CHR
+ return bf.getState() == SESN_STATE_DONE
}
func (bf *BleFsm) IsClosed() bool {
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_oic_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index e5139be..0c7b052 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -5,8 +5,6 @@ import (
"sync"
"time"
- log "github.com/Sirupsen/logrus"
-
"mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
@@ -18,7 +16,6 @@ type BleOicSesn struct {
bf *BleFsm
nls map[*nmp.NmpListener]struct{}
od *omp.OmpDispatcher
- connTries int
closeTimeout time.Duration
onCloseCb sesn.BleOnCloseFn
@@ -30,7 +27,6 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
bos := &BleOicSesn{
nls: map[*nmp.NmpListener]struct{}{},
od: omp.NewOmpDispatcher(),
- connTries: cfg.Ble.ConnTries,
closeTimeout: cfg.Ble.CloseTimeout,
onCloseCb: cfg.Ble.OnCloseCb,
}
@@ -54,6 +50,7 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
Bx: bx,
OwnAddrType: cfg.Ble.OwnAddrType,
PeerSpec: cfg.Ble.PeerSpec,
+ ConnTries: cfg.Ble.ConnTries,
SvcUuid: svcUuid,
ReqChrUuid: reqChrUuid,
RspChrUuid: rspChrUuid,
@@ -139,26 +136,7 @@ func (bos *BleOicSesn) AbortRx(seq uint8) error {
}
func (bos *BleOicSesn) Open() error {
- var err error
- for i := 0; i < bos.connTries; i++ {
- log.Debugf("Opening BLE session; try %d/%d", i+1, bos.connTries)
-
- var retry bool
- retry, err = bos.bf.Start()
- if !retry {
- break
- }
-
- if bos.blockUntilClosed(1*time.Second) != nil {
- // Just close the session manually and report the original error.
- bos.Close()
- return err
- }
-
- log.Debugf("Connection to BLE peer dropped immediately; retrying")
- }
-
- return err
+ return bos.bf.Start()
}
func (bos *BleOicSesn) Close() error {
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_plain_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index 803eb45..9e1f70d 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -5,8 +5,6 @@ import (
"sync"
"time"
- log "github.com/Sirupsen/logrus"
-
"mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
@@ -17,7 +15,6 @@ type BlePlainSesn struct {
bf *BleFsm
nls map[*nmp.NmpListener]struct{}
nd *nmp.NmpDispatcher
- connTries int
closeTimeout time.Duration
onCloseCb sesn.BleOnCloseFn
@@ -29,7 +26,6 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
bps := &BlePlainSesn{
nls: map[*nmp.NmpListener]struct{}{},
nd: nmp.NewNmpDispatcher(),
- connTries: cfg.Ble.ConnTries,
closeTimeout: cfg.Ble.CloseTimeout,
onCloseCb: cfg.Ble.OnCloseCb,
}
@@ -48,6 +44,7 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
Bx: bx,
OwnAddrType: cfg.Ble.OwnAddrType,
PeerSpec: cfg.Ble.PeerSpec,
+ ConnTries: cfg.Ble.ConnTries,
SvcUuid: svcUuid,
ReqChrUuid: chrUuid,
RspChrUuid: chrUuid,
@@ -112,46 +109,12 @@ func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
}
}
-func (bps *BlePlainSesn) blockUntilClosed(timeout time.Duration) error {
- if err := bps.setCloseChan(); err != nil {
- return err
- }
- defer bps.clearCloseChan()
-
- // If the session is already closed, we're done.
- if bps.bf.IsClosed() {
- return nil
- }
-
- // Block until close completes or times out.
- return bps.listenForClose(timeout)
-}
-
func (bps *BlePlainSesn) AbortRx(seq uint8) error {
return bps.nd.FakeRxError(seq, fmt.Errorf("Rx aborted"))
}
func (bps *BlePlainSesn) Open() error {
- var err error
- for i := 0; i < bps.connTries; i++ {
- log.Debugf("Opening BLE session; try %d/%d", i+1, bps.connTries)
-
- var retry bool
- retry, err = bps.bf.Start()
- if !retry {
- break
- }
-
- if bps.blockUntilClosed(1*time.Second) != nil {
- // Just close the session manually and report the original error.
- bps.Close()
- return err
- }
-
- log.Debugf("Connection to BLE peer dropped immediately; retrying")
- }
-
- return err
+ return bps.bf.Start()
}
func (bps *BlePlainSesn) Close() error {
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_proto.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_proto.go b/nmxact/nmble/ble_proto.go
index 69c456f..6a99feb 100644
--- a/nmxact/nmble/ble_proto.go
+++ b/nmxact/nmble/ble_proto.go
@@ -241,7 +241,8 @@ const (
MSG_TYPE_NOTIFY_RX_EVT = 2055
MSG_TYPE_MTU_CHANGE_EVT = 2056
MSG_TYPE_SCAN_EVT = 2057
- MSG_TYPE_ENC_CHANGE_EVT = 2058
+ MSG_TYPE_SCAN_TMO_EVT = 2058
+ MSG_TYPE_ENC_CHANGE_EVT = 2059
)
var MsgOpStringMap = map[MsgOp]string{
@@ -276,6 +277,7 @@ var MsgTypeStringMap = map[MsgType]string{
MSG_TYPE_NOTIFY_RX_EVT: "notify_rx_evt",
MSG_TYPE_MTU_CHANGE_EVT: "mtu_change_evt",
MSG_TYPE_SCAN_EVT: "scan_evt",
+ MSG_TYPE_SCAN_TMO_EVT: "scan_tmo_evt",
MSG_TYPE_ENC_CHANGE_EVT: "enc_change_evt",
}
@@ -693,6 +695,13 @@ type BleScanEvt struct {
DataMfgData BleBytes `json:"data_mfg_data"`
}
+type BleScanTmoEvt struct {
+ // Header
+ Op MsgOp `json:"op"`
+ Type MsgType `json:"type"`
+ Seq BleSeq `json:"seq"`
+}
+
type BleScanCancelReq struct {
// Header
Op MsgOp `json:"op"`
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_util.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_util.go b/nmxact/nmble/ble_util.go
index f0a6c04..66c8c6b 100644
--- a/nmxact/nmble/ble_util.go
+++ b/nmxact/nmble/ble_util.go
@@ -30,7 +30,7 @@ func BhdTimeoutError(rspType MsgType) error {
MsgTypeToString(rspType))
log.Debug(str)
- return nmxutil.NewXportTimeoutError(str)
+ return nmxutil.NewXportError(str)
}
func StatusError(op MsgOp, msgType MsgType, status int) error {
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_xport.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index f5a9272..d87f0c8 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -91,8 +91,7 @@ type BleXport struct {
shutdownChan chan bool
readyChan chan error
numReadyListeners int
- masterQueue [](chan error)
- masterActive bool
+ master nmxutil.SingleResource
randAddr *BleAddr
mtx sync.Mutex
@@ -104,7 +103,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
Bd: NewBleDispatcher(),
shutdownChan: make(chan bool),
readyChan: make(chan error),
- masterQueue: [](chan error){},
+ master: nmxutil.NewSingleResource(),
cfg: cfg,
}
@@ -216,6 +215,12 @@ func (bx *BleXport) initialSyncCheck() (bool, *BleListener, error) {
}
func (bx *BleXport) shutdown(restart bool, err error) {
+ if !nmxutil.IsXport(err) {
+ panic(fmt.Sprintf(
+ "BleXport.shutdown() received error that isn't an XportError: %+v",
+ err))
+ }
+
bx.mtx.Lock()
var fullyStarted bool
@@ -246,15 +251,12 @@ func (bx *BleXport) shutdown(restart bool, err error) {
bx.client.Stop()
}
+ bx.master.Abort(err)
+
// Indicate an error to all of this transport's listeners. This prevents
// them from blocking endlessly while awaiting a BLE message.
bx.Bd.ErrorAll(err)
- for _, listener := range bx.masterQueue {
- listener <- err
- }
- bx.masterQueue = [](chan error){}
-
// Stop all of this transport's go routines.
for i := 0; i < bx.numStopListeners; i++ {
bx.stopChan <- struct{}{}
@@ -311,7 +313,7 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
case BLE_XPORT_STATE_STARTED:
bx.notifyReadyListeners(nil)
case BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_DORMANT:
- bx.notifyReadyListeners(fmt.Errorf("BLE transport stopped"))
+ bx.notifyReadyListeners(nmxutil.NewXportError("BLE transport stopped"))
default:
}
@@ -319,7 +321,7 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
}
func (bx *BleXport) Stop() error {
- bx.shutdown(false, nil)
+ bx.shutdown(false, nmxutil.NewXportError("xport stopped"))
return nil
}
@@ -524,40 +526,9 @@ func (bx *BleXport) RspTimeout() time.Duration {
}
func (bx *BleXport) AcquireMaster() error {
- bx.mtx.Lock()
-
- if !bx.masterActive {
- bx.masterActive = true
- bx.mtx.Unlock()
- return nil
- }
-
- listener := make(chan error)
- bx.masterQueue = append(bx.masterQueue, listener)
-
- bx.mtx.Unlock()
-
- return <-listener
+ return bx.master.Acquire()
}
func (bx *BleXport) ReleaseMaster() {
- bx.mtx.Lock()
-
- if !bx.masterActive {
- bx.mtx.Unlock()
- return
- }
-
- if len(bx.masterQueue) == 0 {
- bx.masterActive = false
- bx.mtx.Unlock()
- return
- }
-
- listener := bx.masterQueue[0]
- bx.masterQueue = bx.masterQueue[1:]
-
- bx.mtx.Unlock()
-
- listener <- nil
+ bx.master.Release()
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/dispatch.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index 40ec8f2..83f1f0f 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -109,6 +109,7 @@ func discChrEvtCtor() BleMsg { return &BleDiscChrEvt{} }
func notifyRxEvtCtor() BleMsg { return &BleNotifyRxEvt{} }
func mtuChangeEvtCtor() BleMsg { return &BleMtuChangeEvt{} }
func scanEvtCtor() BleMsg { return &BleScanEvt{} }
+func scanTmoEvtCtor() BleMsg { return &BleScanTmoEvt{} }
var msgCtorMap = map[OpTypePair]msgCtor{
{MSG_OP_RSP, MSG_TYPE_ERR}: errRspCtor,
@@ -136,6 +137,7 @@ var msgCtorMap = map[OpTypePair]msgCtor{
{MSG_OP_EVT, MSG_TYPE_NOTIFY_RX_EVT}: notifyRxEvtCtor,
{MSG_OP_EVT, MSG_TYPE_MTU_CHANGE_EVT}: mtuChangeEvtCtor,
{MSG_OP_EVT, MSG_TYPE_SCAN_EVT}: scanEvtCtor,
+ {MSG_OP_EVT, MSG_TYPE_SCAN_TMO_EVT}: scanTmoEvtCtor,
}
func NewBleDispatcher() *BleDispatcher {
@@ -256,8 +258,8 @@ func decodeBleMsg(data []byte) (BleMsgBase, BleMsg, error) {
cb := msgCtorMap[opTypePair]
if cb == nil {
return base, nil, fmt.Errorf(
- "Unrecognized op+type pair: %s, %s",
- MsgOpToString(base.Op), MsgTypeToString(base.Type))
+ "Unrecognized op+type pair:") // %s, %s",
+ //MsgOpToString(base.Op), MsgTypeToString(base.Type))
}
msg := cb()
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmserial/serial_xport.go
----------------------------------------------------------------------
diff --git a/nmxact/nmserial/serial_xport.go b/nmxact/nmserial/serial_xport.go
index 7dbb5ca..37f991f 100644
--- a/nmxact/nmserial/serial_xport.go
+++ b/nmxact/nmserial/serial_xport.go
@@ -12,9 +12,9 @@ import (
"github.com/joaojeronimo/go-crc16"
"github.com/tarm/serial"
+ "mynewt.apache.org/newt/util"
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/sesn"
- "mynewt.apache.org/newt/util"
)
type XportCfg struct {
@@ -209,7 +209,7 @@ func (sx *SerialXport) Rx() ([]byte, error) {
if err == nil {
// Scanner hit EOF, so we'll need to create a new one. This only
// happens on timeouts.
- err = nmxutil.NewXportTimeoutError(
+ err = nmxutil.NewXportError(
"Timeout reading from serial connection")
sx.scanner = bufio.NewScanner(sx.port)
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmxutil/nmxerr.go
----------------------------------------------------------------------
diff --git a/nmxact/nmxutil/nmxerr.go b/nmxact/nmxutil/nmxerr.go
index aa9da23..28930e3 100644
--- a/nmxact/nmxutil/nmxerr.go
+++ b/nmxact/nmxutil/nmxerr.go
@@ -89,38 +89,44 @@ func IsSesnClosed(err error) bool {
return ok
}
-// Represents a low-level transport error.
-type XportError struct {
+type ScanTmoError struct {
Text string
}
-func NewXportError(text string) *XportError {
- return &XportError{text}
+func NewScanTmoError(text string) *ScanTmoError {
+ return &ScanTmoError{
+ Text: text,
+ }
}
-func (e *XportError) Error() string {
+func (e *ScanTmoError) Error() string {
return e.Text
}
-func IsXport(err error) bool {
- _, ok := err.(*XportError)
+func IsScanTmo(err error) bool {
+ _, ok := err.(*ScanTmoError)
return ok
}
-type XportTimeoutError struct {
+// Represents a low-level transport error.
+type XportError struct {
Text string
}
-func NewXportTimeoutError(text string) *XportTimeoutError {
- return &XportTimeoutError{text}
+func NewXportError(text string) *XportError {
+ return &XportError{text}
}
-func (e *XportTimeoutError) Error() string {
+func (e *XportError) Error() string {
return e.Text
}
-func IsXportTimeout(err error) bool {
- _, ok := err.(*XportTimeoutError)
+func IsXport(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ _, ok := err.(*XportError)
return ok
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmxutil/nmxutil.go
----------------------------------------------------------------------
diff --git a/nmxact/nmxutil/nmxutil.go b/nmxact/nmxutil/nmxutil.go
index ef1ecd4..cb299e9 100644
--- a/nmxact/nmxutil/nmxutil.go
+++ b/nmxact/nmxutil/nmxutil.go
@@ -23,3 +23,80 @@ func NextNmpSeq() uint8 {
return val
}
+
+type SingleResource struct {
+ acquired bool
+ waitQueue [](chan error)
+ mtx sync.Mutex
+}
+
+func NewSingleResource() SingleResource {
+ return SingleResource{
+ waitQueue: [](chan error){},
+ }
+}
+
+func (s *SingleResource) removeWaiter(waiter chan error) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ for i, w := range s.waitQueue {
+ if w == waiter {
+ s.waitQueue = append(s.waitQueue[:i], s.waitQueue[i+1:]...)
+ }
+ }
+}
+
+func (s *SingleResource) Acquire() error {
+ s.mtx.Lock()
+
+ if !s.acquired {
+ s.acquired = true
+ s.mtx.Unlock()
+ return nil
+ }
+
+ w := make(chan error)
+ s.waitQueue = append(s.waitQueue, w)
+
+ s.mtx.Unlock()
+
+ err := <-w
+ if err != nil {
+ s.removeWaiter(w)
+ return err
+ }
+
+ return nil
+}
+
+func (s *SingleResource) Release() {
+ s.mtx.Lock()
+
+ if !s.acquired {
+ s.mtx.Unlock()
+ return
+ }
+
+ if len(s.waitQueue) == 0 {
+ s.acquired = false
+ s.mtx.Unlock()
+ return
+ }
+
+ w := s.waitQueue[0]
+ s.waitQueue = s.waitQueue[1:]
+
+ s.mtx.Unlock()
+
+ w <- nil
+}
+
+func (s *SingleResource) Abort(err error) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ for _, w := range s.waitQueue {
+ w <- err
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/sesn/sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/sesn/sesn.go b/nmxact/sesn/sesn.go
index 0782e63..ea3f43a 100644
--- a/nmxact/sesn/sesn.go
+++ b/nmxact/sesn/sesn.go
@@ -77,9 +77,7 @@ func TxNmp(s Sesn, m *nmp.NmpMsg, o TxOptions) (nmp.NmpRsp, error) {
return r, nil
}
- if (!nmxutil.IsNmpTimeout(err) && !nmxutil.IsXportTimeout(err)) ||
- i >= retries {
-
+ if !nmxutil.IsNmpTimeout(err) || i >= retries {
return nil, err
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/sesn/sesn_cfg.go
----------------------------------------------------------------------
diff --git a/nmxact/sesn/sesn_cfg.go b/nmxact/sesn/sesn_cfg.go
index 9c84db1..3c57f24 100644
--- a/nmxact/sesn/sesn_cfg.go
+++ b/nmxact/sesn/sesn_cfg.go
@@ -61,7 +61,7 @@ func NewSesnCfg() SesnCfg {
return SesnCfg{
Ble: SesnCfgBle{
ConnTries: 3,
- CloseTimeout: 5 * time.Second,
+ CloseTimeout: 15 * time.Second,
},
}
}