You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/04/03 23:54:00 UTC
[3/6] incubator-mynewt-newtmgr git commit: nmxact - Automatically
restart BLE xport.
nmxact - Automatically restart BLE xport.
More work needs to be done here. If a client attempts to use the
transport while it is restarting, the attempt immediately fails.
It would be better if:
* The transmit attempt blocks until the transport is done
restarting.
* The transport gives up after X consecutive failures.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/2aeb4daa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/2aeb4daa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/2aeb4daa
Branch: refs/heads/master
Commit: 2aeb4daab6e2a13152314c23cd1c79fed5ebbdd2
Parents: db639f9
Author: Christopher Collins <cc...@apache.org>
Authored: Mon Apr 3 16:38:03 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Mon Apr 3 16:49:58 2017 -0700
----------------------------------------------------------------------
nmxact/nmble/ble_xport.go | 167 +++++++++++++++++++++++++++++++----------
nmxact/nmble/dispatch.go | 15 ++++
2 files changed, 144 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/2aeb4daa/nmxact/nmble/ble_xport.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index 16a4128..32ced04 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -53,21 +53,26 @@ const (
BLE_XPORT_STATE_STOPPED BleXportState = iota
BLE_XPORT_STATE_STARTING
BLE_XPORT_STATE_STARTED
+ BLE_XPORT_STATE_STOPPING
)
// Implements xport.Xport.
type BleXport struct {
- Bd *BleDispatcher
- client *unixchild.Client
- state BleXportState
+ Bd *BleDispatcher
+ client *unixchild.Client
+ state BleXportState
+ stopChan chan struct{}
+ shutdownChan chan bool
+ numStopListeners int
cfg XportCfg
}
func NewBleXport(cfg XportCfg) (*BleXport, error) {
bx := &BleXport{
- Bd: NewBleDispatcher(),
- cfg: cfg,
+ Bd: NewBleDispatcher(),
+ shutdownChan: make(chan bool),
+ cfg: cfg,
}
return bx, nil
@@ -177,18 +182,46 @@ func (bx *BleXport) initialSyncCheck() (bool, *BleListener, error) {
return synced, bl, nil
}
-func (bx *BleXport) onError(err error) {
- if !bx.setStateFrom(BLE_XPORT_STATE_STARTED, BLE_XPORT_STATE_STOPPED) &&
- !bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STOPPED) {
+func (bx *BleXport) shutdown(restart bool, err error) {
+ var fullyStarted bool
+ if bx.setStateFrom(BLE_XPORT_STATE_STARTED,
+ BLE_XPORT_STATE_STOPPING) {
+
+ fullyStarted = true
+ } else if bx.setStateFrom(BLE_XPORT_STATE_STARTING,
+ BLE_XPORT_STATE_STOPPING) {
+
+ fullyStarted = false
+ } else {
// Stop already in progress.
return
}
+
+ // Stop the unixchild instance (blehostd + socket).
if bx.client != nil {
bx.client.Stop()
+
+ // Unblock the unixchild instance.
bx.client.FromChild <- nil
}
+
+ // Indicate an error to all of this transport's listeners. This prevents
+ // them from blocking endlessly while awaiting a BLE message.
bx.Bd.ErrorAll(err)
+
+ // Stop all of this transport's go routines.
+ for i := 0; i < bx.numStopListeners; i++ {
+ bx.stopChan <- struct{}{}
+ }
+
+ bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED)
+
+ // Indicate that the shutdown is complete. If restarts are enabled on this
+ // transport, this signals that the transport should be started again.
+ if fullyStarted {
+ bx.shutdownChan <- restart
+ }
}
func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
@@ -202,47 +235,68 @@ func (bx *BleXport) getState() BleXportState {
}
func (bx *BleXport) Stop() error {
- bx.onError(nil)
+ bx.shutdown(false, nil)
return nil
}
-func (bx *BleXport) Start() error {
+func (bx *BleXport) startOnce() error {
if !bx.setStateFrom(BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_STARTING) {
return nmxutil.NewXportError("BLE xport started twice")
}
+ bx.stopChan = make(chan struct{})
+ bx.numStopListeners = 0
+ bx.Bd.Clear()
+
bx.createUnixChild()
if err := bx.client.Start(); err != nil {
if unixchild.IsUcAcceptError(err) {
- err = nmxutil.NewXportError("blehostd did not connect to socket; " +
- "controller not attached?")
+ err = nmxutil.NewXportError(
+ "blehostd did not connect to socket; " +
+ "controller not attached?")
} else {
+ panic(err.Error())
err = nmxutil.NewXportError(
"Failed to start child process: " + err.Error())
}
- bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STOPPED)
+ bx.shutdown(true, err)
return err
}
go func() {
- err := <-bx.client.ErrChild
- err = nmxutil.NewXportError("BLE transport error: " + err.Error())
- fmt.Printf("%s\n", err.Error())
- bx.onError(err)
+ bx.numStopListeners++
+ for {
+ select {
+ case err := <-bx.client.ErrChild:
+ err = nmxutil.NewXportError("BLE transport error: " +
+ err.Error())
+ go bx.shutdown(true, err)
+
+ case <-bx.stopChan:
+ return
+ }
+ }
}()
go func() {
+ bx.numStopListeners++
for {
- if b := bx.rx(); b == nil {
- // The error should have been reported to everyone interested.
- break
+ select {
+ case buf := <-bx.client.FromChild:
+ if len(buf) != 0 {
+ log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
+ bx.Bd.Dispatch(buf)
+ }
+
+ case <-bx.stopChan:
+ return
}
}
}()
synced, bl, err := bx.initialSyncCheck()
if err != nil {
- bx.Stop()
+ bx.shutdown(true, err)
return err
}
@@ -253,6 +307,7 @@ func (bx *BleXport) Start() error {
for {
select {
case err := <-bl.ErrChan:
+ bx.shutdown(true, err)
return err
case bm := <-bl.BleChan:
switch msg := bm.(type) {
@@ -262,34 +317,37 @@ func (bx *BleXport) Start() error {
}
}
case <-time.After(bx.cfg.SyncTimeout):
- bx.Stop()
- return nmxutil.NewXportError(
+ err := nmxutil.NewXportError(
"Timeout waiting for host <-> controller sync")
+ bx.shutdown(true, err)
+ return err
}
}
}
// Host and controller are synced. Listen for sync loss in the background.
go func() {
+ bx.numStopListeners++
for {
select {
case err := <-bl.ErrChan:
- bx.onError(err)
- return
+ go bx.shutdown(true, err)
case bm := <-bl.BleChan:
switch msg := bm.(type) {
case *BleSyncEvt:
if !msg.Synced {
- bx.onError(nmxutil.NewXportError(
+ go bx.shutdown(true, nmxutil.NewXportError(
"BLE host <-> controller sync lost"))
- return
}
}
+ case <-bx.stopChan:
+ return
}
}
}()
if !bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STARTED) {
+ bx.shutdown(true, err)
return nmxutil.NewXportError(
"Internal error; BLE transport in unexpected state")
}
@@ -297,6 +355,47 @@ func (bx *BleXport) Start() error {
return nil
}
+func (bx *BleXport) Start() error {
+ // Try to start the transport. If this first attempt fails, report the
+ // error and don't retry.
+ if err := bx.startOnce(); err != nil {
+ log.Debugf("Error starting BLE transport: %s",
+ err.Error())
+ return err
+ }
+
+ // Now that the first start attempt has succeeded, start a restart loop in
+ // the background.
+ go func() {
+ // Block until transport shuts down.
+ restart := <-bx.shutdownChan
+ for {
+ // If restarts are disabled, or if the shutdown was a result of an
+ // explicit stop call (instead of an unexpected error), stop
+ // restarting the transport.
+ if !bx.cfg.BlehostdRestart || !restart {
+ break
+ }
+
+ // Wait a second before the next restart. This is necessary to
+ // ensure the unix domain socket can be rebound.
+ time.Sleep(time.Second)
+
+ // Attempt to start the transport again.
+ if err := bx.startOnce(); err != nil {
+ // Start attempt failed.
+ log.Debugf("Error starting BLE transport: %s",
+ err.Error())
+ } else {
+ // Success. Block until the transport shuts down.
+ restart = <-bx.shutdownChan
+ }
+ }
+ }()
+
+ return nil
+}
+
func (bx *BleXport) txNoSync(data []byte) {
log.Debugf("Tx to blehostd:\n%s", hex.Dump(data))
bx.client.ToChild <- data
@@ -304,23 +403,15 @@ func (bx *BleXport) txNoSync(data []byte) {
func (bx *BleXport) Tx(data []byte) error {
if bx.getState() != BLE_XPORT_STATE_STARTED {
- return nmxutil.NewXportError("Attempt to transmit before BLE xport " +
- "fully started")
+ return nmxutil.NewXportError(
+ fmt.Sprintf("Attempt to transmit before BLE xport fully started; "+
+ "state=%d", bx.getState()))
}
bx.txNoSync(data)
return nil
}
-func (bx *BleXport) rx() []byte {
- buf := <-bx.client.FromChild
- if len(buf) != 0 {
- log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
- bx.Bd.Dispatch(buf)
- }
- return buf
-}
-
func (bx *BleXport) RspTimeout() time.Duration {
return bx.cfg.BlehostdRspTimeout
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/2aeb4daa/nmxact/nmble/dispatch.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index 546fc11..c9ef39e 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -267,7 +267,10 @@ func (bd *BleDispatcher) Dispatch(data []byte) {
return
}
+ bd.mutex.Lock()
_, listener := bd.findListener(base)
+ bd.mutex.Unlock()
+
if listener == nil {
log.Debugf(
"No BLE listener for op=%d type=%d seq=%d connHandle=%d",
@@ -295,3 +298,15 @@ func (bd *BleDispatcher) ErrorAll(err error) {
listener.ErrChan <- err
}
}
+
+func (bd *BleDispatcher) Clear() {
+ bd.mutex.Lock()
+ defer bd.mutex.Unlock()
+
+ for s, _ := range bd.seqMap {
+ delete(bd.seqMap, s)
+ }
+ for b, _ := range bd.baseMap {
+ delete(bd.baseMap, b)
+ }
+}