You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/04/03 23:53:59 UTC
[2/6] incubator-mynewt-newtmgr git commit: nmxact - Stability when
xport fails
nmxact - Stability when xport fails
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/db639f93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/db639f93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/db639f93
Branch: refs/heads/master
Commit: db639f9313a27c051c0b6dd9e13840549f3f5acd
Parents: 0384fcf
Author: Christopher Collins <cc...@apache.org>
Authored: Fri Mar 31 18:26:35 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Mon Apr 3 16:49:25 2017 -0700
----------------------------------------------------------------------
nmxact/nmble/ble_act.go | 16 +++----
nmxact/nmble/ble_fsm.go | 53 +++++++++++++----------
nmxact/nmble/ble_oic_sesn.go | 12 ++++++
nmxact/nmble/ble_plain_sesn.go | 6 +++
nmxact/nmble/ble_xport.go | 85 ++++++++++++++++++++-----------------
nmxact/nmp/dispatch.go | 1 +
6 files changed, 103 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_act.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 2aa6678..cb6cb00 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -56,7 +56,7 @@ func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error {
default:
}
- case <-bl.AfterTimeout(x.rspTimeout):
+ case <-bl.AfterTimeout(x.RspTimeout()):
return BhdTimeoutError(MSG_TYPE_TERMINATE)
}
}
@@ -92,7 +92,7 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error {
default:
}
- case <-bl.AfterTimeout(x.rspTimeout):
+ case <-bl.AfterTimeout(x.RspTimeout()):
return BhdTimeoutError(MSG_TYPE_TERMINATE)
}
}
@@ -148,7 +148,7 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) (
default:
}
- case <-bl.AfterTimeout(x.rspTimeout):
+ case <-bl.AfterTimeout(x.RspTimeout()):
return nil, BhdTimeoutError(MSG_TYPE_DISC_SVC_UUID)
}
}
@@ -198,7 +198,7 @@ func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) (
default:
}
- case <-bl.AfterTimeout(x.rspTimeout):
+ case <-bl.AfterTimeout(x.RspTimeout()):
return nil, BhdTimeoutError(MSG_TYPE_DISC_ALL_CHRS)
}
}
@@ -235,7 +235,7 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error {
default:
}
- case <-bl.AfterTimeout(x.rspTimeout):
+ case <-bl.AfterTimeout(x.RspTimeout()):
return BhdTimeoutError(MSG_TYPE_WRITE_CMD)
}
}
@@ -281,7 +281,7 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) (
default:
}
- case <-bl.AfterTimeout(x.rspTimeout):
+ case <-bl.AfterTimeout(x.RspTimeout()):
return 0, BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
}
}
@@ -320,7 +320,7 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq,
default:
}
- case <-bl.AfterTimeout(x.rspTimeout):
+ case <-bl.AfterTimeout(x.RspTimeout()):
return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
case <-abortChan:
@@ -356,7 +356,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error {
default:
}
- case <-bl.AfterTimeout(x.rspTimeout):
+ case <-bl.AfterTimeout(x.RspTimeout()):
return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index cf12bdc..39bfdfc 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -216,6 +216,30 @@ func (bf *BleFsm) calcDisconnectType() BleFsmDisconnectType {
}
}
+func (bf *BleFsm) onDisconnect(err error) {
+ log.Debugf(err.Error())
+
+ bf.mtx.Lock()
+ bls := make([]*BleListener, 0, len(bf.bls))
+ for bl, _ := range bf.bls {
+ bls = append(bls, bl)
+ }
+ bf.mtx.Unlock()
+
+ // Remember some fields before we clear them.
+ dt := bf.calcDisconnectType()
+ peer := *bf.peerDev
+
+ bf.setState(SESN_STATE_UNCONNECTED)
+ bf.peerDev = nil
+
+ for _, bl := range bls {
+ bl.ErrChan <- err
+ }
+
+ bf.params.DisconnectCb(dt, peer, err)
+}
+
func (bf *BleFsm) connectListen(seq int) error {
bf.connChan = make(chan error, 1)
@@ -228,7 +252,10 @@ func (bf *BleFsm) connectListen(seq int) error {
defer bf.removeBleSeqListener(seq)
for {
select {
- case <-bl.ErrChan:
+ case err := <-bl.ErrChan:
+ // Transport reported error. Assume all connections have
+ // dropped.
+ bf.onDisconnect(err)
return
case bm := <-bl.BleChan:
@@ -281,33 +308,13 @@ func (bf *BleFsm) connectListen(seq int) error {
case *BleDisconnectEvt:
err := bf.disconnectError(msg.Reason)
- log.Debugf(err.Error())
-
- bf.mtx.Lock()
- bls := make([]*BleListener, 0, len(bf.bls))
- for bl, _ := range bf.bls {
- bls = append(bls, bl)
- }
- bf.mtx.Unlock()
-
- // Remember some fields before we clear them.
- dt := bf.calcDisconnectType()
- peer := *bf.peerDev
-
- bf.setState(SESN_STATE_UNCONNECTED)
- bf.peerDev = nil
-
- for _, bl := range bls {
- bl.ErrChan <- err
- }
-
- bf.params.DisconnectCb(dt, peer, err)
+ bf.onDisconnect(err)
return
default:
}
- case <-bl.AfterTimeout(bf.params.Bx.rspTimeout):
+ case <-bl.AfterTimeout(bf.params.Bx.RspTimeout()):
bf.connChan <- BhdTimeoutError(MSG_TYPE_CONNECT)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_oic_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 4001960..5fa9291 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -5,6 +5,8 @@ import (
"sync"
"time"
+ log "github.com/Sirupsen/logrus"
+
"mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
@@ -135,11 +137,21 @@ func (bos *BleOicSesn) AbortRx(seq uint8) error {
func (bos *BleOicSesn) Open() error {
var err error
for i := 0; i < bos.connTries; i++ {
+ log.Debugf("Opening BLE session; try %d/%d", i+1, bos.connTries)
+
var retry bool
retry, err = bos.bf.Start()
if !retry {
break
}
+
+ if bos.blockUntilClosed(1*time.Second) != nil {
+ // Just close the session manually and report the original error.
+ bos.Close()
+ return err
+ }
+
+ log.Debugf("Connection to BLE peer dropped immediately; retrying")
}
return err
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_plain_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index 012a301..15419a9 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -5,6 +5,8 @@ import (
"sync"
"time"
+ log "github.com/Sirupsen/logrus"
+
"mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
@@ -128,6 +130,8 @@ func (bps *BlePlainSesn) AbortRx(seq uint8) error {
func (bps *BlePlainSesn) Open() error {
var err error
for i := 0; i < bps.connTries; i++ {
+ log.Debugf("Opening BLE session; try %d/%d", i+1, bps.connTries)
+
var retry bool
retry, err = bps.bf.Start()
if !retry {
@@ -139,6 +143,8 @@ func (bps *BlePlainSesn) Open() error {
bps.Close()
return err
}
+
+ log.Debugf("Connection to BLE peer dropped immediately; retrying")
}
return err
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_xport.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index ba38b9d..16a4128 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -9,9 +9,9 @@ import (
log "github.com/Sirupsen/logrus"
+ "mynewt.apache.org/newt/util/unixchild"
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/sesn"
- "mynewt.apache.org/newt/util/unixchild"
)
type XportCfg struct {
@@ -33,13 +33,17 @@ type XportCfg struct {
// Path of the BLE controller device (e.g., /dev/ttyUSB0).
DevPath string
+
+ // How long to allow for the host and controller to sync at startup.
+ SyncTimeout time.Duration
}
func NewXportCfg() XportCfg {
return XportCfg{
BlehostdAcceptTimeout: time.Second,
- BlehostdRestart: true,
BlehostdRspTimeout: time.Second,
+ BlehostdRestart: true,
+ SyncTimeout: 10 * time.Second,
}
}
@@ -57,31 +61,29 @@ type BleXport struct {
client *unixchild.Client
state BleXportState
- syncTimeout time.Duration
- rspTimeout time.Duration
+ cfg XportCfg
}
func NewBleXport(cfg XportCfg) (*BleXport, error) {
- config := unixchild.Config{
- SockPath: cfg.SockPath,
- ChildPath: cfg.BlehostdPath,
- ChildArgs: []string{cfg.DevPath, cfg.SockPath},
- Depth: 10,
- MaxMsgSz: 10240,
- AcceptTimeout: cfg.BlehostdAcceptTimeout,
- Restart: cfg.BlehostdRestart,
+ bx := &BleXport{
+ Bd: NewBleDispatcher(),
+ cfg: cfg,
}
- c := unixchild.New(config)
+ return bx, nil
+}
- bx := &BleXport{
- client: c,
- Bd: NewBleDispatcher(),
- syncTimeout: 10 * time.Second,
- rspTimeout: cfg.BlehostdRspTimeout,
+func (bx *BleXport) createUnixChild() {
+ config := unixchild.Config{
+ SockPath: bx.cfg.SockPath,
+ ChildPath: bx.cfg.BlehostdPath,
+ ChildArgs: []string{bx.cfg.DevPath, bx.cfg.SockPath},
+ Depth: 10,
+ MaxMsgSz: 10240,
+ AcceptTimeout: bx.cfg.BlehostdAcceptTimeout,
}
- return bx, nil
+ bx.client = unixchild.New(config)
}
func (bx *BleXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) {
@@ -182,11 +184,11 @@ func (bx *BleXport) onError(err error) {
// Stop already in progress.
return
}
- bx.Bd.ErrorAll(err)
if bx.client != nil {
bx.client.Stop()
bx.client.FromChild <- nil
}
+ bx.Bd.ErrorAll(err)
}
func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
@@ -209,24 +211,29 @@ func (bx *BleXport) Start() error {
return nmxutil.NewXportError("BLE xport started twice")
}
+ bx.createUnixChild()
if err := bx.client.Start(); err != nil {
- return nmxutil.NewXportError(
- "Failed to start child child process: " + err.Error())
+ if unixchild.IsUcAcceptError(err) {
+ err = nmxutil.NewXportError("blehostd did not connect to socket; " +
+ "controller not attached?")
+ } else {
+ err = nmxutil.NewXportError(
+ "Failed to start child process: " + err.Error())
+ }
+ bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STOPPED)
+ return err
}
go func() {
err := <-bx.client.ErrChild
- if unixchild.IsUcAcceptError(err) {
- err = fmt.Errorf("blehostd did not connect to socket; " +
- "controller not attached?")
- }
+ err = nmxutil.NewXportError("BLE transport error: " + err.Error())
+ fmt.Printf("%s\n", err.Error())
bx.onError(err)
- return
}()
go func() {
for {
- if _, err := bx.rx(); err != nil {
+ if b := bx.rx(); b == nil {
// The error should have been reported to everyone interested.
break
}
@@ -254,7 +261,7 @@ func (bx *BleXport) Start() error {
break SyncLoop
}
}
- case <-time.After(bx.syncTimeout):
+ case <-time.After(bx.cfg.SyncTimeout):
bx.Stop()
return nmxutil.NewXportError(
"Timeout waiting for host <-> controller sync")
@@ -305,15 +312,15 @@ func (bx *BleXport) Tx(data []byte) error {
return nil
}
-func (bx *BleXport) rx() ([]byte, error) {
- select {
- case err := <-bx.client.ErrChild:
- return nil, err
- case buf := <-bx.client.FromChild:
- if len(buf) != 0 {
- log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
- bx.Bd.Dispatch(buf)
- }
- return buf, nil
+func (bx *BleXport) rx() []byte {
+ buf := <-bx.client.FromChild
+ if len(buf) != 0 {
+ log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
+ bx.Bd.Dispatch(buf)
}
+ return buf
+}
+
+func (bx *BleXport) RspTimeout() time.Duration {
+ return bx.cfg.BlehostdRspTimeout
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmp/dispatch.go
----------------------------------------------------------------------
diff --git a/nmxact/nmp/dispatch.go b/nmxact/nmp/dispatch.go
index 7b1ad11..5b39a9b 100644
--- a/nmxact/nmp/dispatch.go
+++ b/nmxact/nmp/dispatch.go
@@ -39,6 +39,7 @@ func NewNmpListener() *NmpListener {
return &NmpListener{
RspChan: make(chan NmpRsp, 1),
ErrChan: make(chan error, 1),
+ tmoChan: make(chan time.Time, 1),
}
}