You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/08/23 17:45:55 UTC
[mynewt-newtmgr] 03/05: nmxact - Poll bhd for sync status.
This is an automated email from the ASF dual-hosted git repository.
ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git
commit f5c50f1e07c8d09a115920584cdb9993832b4b54
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Tue Aug 22 17:03:16 2017 -0700
nmxact - Poll bhd for sync status.
---
nmxact/nmble/ble_act.go | 22 +++
nmxact/nmble/ble_scanner.go | 8 +-
nmxact/nmble/ble_sesn.go | 17 +--
nmxact/nmble/ble_util.go | 46 ++----
nmxact/nmble/ble_xport.go | 340 ++++++++++++++++++--------------------------
nmxact/nmble/conn.go | 48 ++++---
nmxact/nmble/sync.go | 202 ++++++++++++++++++++++++++
nmxact/nmxutil/block.go | 44 ++++--
8 files changed, 441 insertions(+), 286 deletions(-)
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 7e76686..7a8677a 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -1190,3 +1190,25 @@ func setPreferredMtu(x *BleXport, bl *Listener,
}
}
}
+
+func checkSync(x *BleXport, bl *Listener, r *BleSyncReq) (bool, error) {
+ j, err := json.Marshal(r)
+ if err != nil {
+ return false, err
+ }
+
+ if err := x.txNoSync(j); err != nil {
+ return false, err
+ }
+ for {
+ select {
+ case err := <-bl.ErrChan:
+ return false, err
+ case bm := <-bl.MsgChan:
+ switch msg := bm.(type) {
+ case *BleSyncRsp:
+ return msg.Synced, nil
+ }
+ }
+ }
+}
diff --git a/nmxact/nmble/ble_scanner.go b/nmxact/nmble/ble_scanner.go
index 7d8c7b4..3d2a05e 100644
--- a/nmxact/nmble/ble_scanner.go
+++ b/nmxact/nmble/ble_scanner.go
@@ -123,10 +123,8 @@ func (s *BleScanner) readHwId() (string, error) {
}
func (s *BleScanner) scan() (*scan.ScanPeer, error) {
- // Ensure subsequent calls to suspend() block.
- s.suspendBlocker.Block()
-
- // If the scanner is being suspended, unblock the suspend() call.
+ // Ensure subsequent calls to suspend() block until scanning has stopped.
+ s.suspendBlocker.Start()
defer s.suspendBlocker.Unblock(nil)
// Discover the first device which matches the specified predicate.
@@ -224,7 +222,7 @@ func (s *BleScanner) suspend() error {
}
// Block until scan is fully terminated.
- s.suspendBlocker.Wait(nmxutil.DURATION_FOREVER)
+ s.suspendBlocker.Wait(nmxutil.DURATION_FOREVER, nil)
s.discoverer = nil
s.bos = nil
diff --git a/nmxact/nmble/ble_sesn.go b/nmxact/nmble/ble_sesn.go
index 7ed350f..763997d 100644
--- a/nmxact/nmble/ble_sesn.go
+++ b/nmxact/nmble/ble_sesn.go
@@ -89,13 +89,10 @@ func (s *BleSesn) AbortRx(seq uint8) error {
return nil
}
+// Listens for disconnect in the background.
func (s *BleSesn) disconnectListen() {
- // Listen for disconnect in the background.
s.wg.Add(1)
go func() {
- // If the session is being closed, unblock the close() call.
- defer s.closeBlocker.Unblock(nil)
-
// Block until disconnect.
err := <-s.conn.DisconnectChan()
nmxutil.Assert(!s.IsOpen())
@@ -114,6 +111,9 @@ func (s *BleSesn) disconnectListen() {
if s.cfg.OnCloseCb != nil {
s.cfg.OnCloseCb(s, err)
}
+
+ // Finally, unblock the close() call, if any.
+ s.closeBlocker.Unblock(nil)
}()
}
@@ -199,7 +199,7 @@ func (s *BleSesn) openOnce() (bool, error) {
}
// Ensure subsequent calls to Close() block.
- s.closeBlocker.Block()
+ s.closeBlocker.Start()
// Listen for disconnect in the background.
s.disconnectListen()
@@ -270,13 +270,14 @@ func (s *BleSesn) OpenConnected(
if err := s.conn.Inherit(connHandle, eventListener); err != nil {
if !nmxutil.IsSesnAlreadyOpen(err) {
- s.closeBlocker.Unblock(nil)
+ //s.closeBlocker.Unblock(nil)
+ s.Close()
}
return err
}
// Ensure subsequent calls to Close() block.
- s.closeBlocker.Block()
+ s.closeBlocker.Start()
// Listen for disconnect in the background.
s.disconnectListen()
@@ -295,7 +296,7 @@ func (s *BleSesn) Close() error {
}
// Block until close completes.
- s.closeBlocker.Wait(nmxutil.DURATION_FOREVER)
+ s.closeBlocker.Wait(nmxutil.DURATION_FOREVER, nil)
return nil
}
diff --git a/nmxact/nmble/ble_util.go b/nmxact/nmble/ble_util.go
index 2903c41..6c814de 100644
--- a/nmxact/nmble/ble_util.go
+++ b/nmxact/nmble/ble_util.go
@@ -382,6 +382,14 @@ func NewFindChrReq() *BleFindChrReq {
}
}
+func NewSyncReq() *BleSyncReq {
+ return &BleSyncReq{
+ Op: MSG_OP_REQ,
+ Type: MSG_TYPE_SYNC,
+ Seq: NextSeq(),
+ }
+}
+
func ConnFindXact(x *BleXport, connHandle uint16) (BleConnDesc, error) {
r := NewBleConnFindReq()
r.ConnHandle = connHandle
@@ -835,41 +843,3 @@ func StopWaitingForMaster(bx *BleXport, prio MasterPrio, token interface{},
return fmt.Errorf("Invalid session priority: %+v", prio)
}
}
-
-type MasterPrio int
-
-const (
- // Lower number = higher priority.
- MASTER_PRIO_CONNECT = 0
- MASTER_PRIO_SCAN = 1
-)
-
-func AcquireMaster(bx *BleXport, prio MasterPrio, token interface{}) error {
- switch prio {
- case MASTER_PRIO_CONNECT:
- return bx.AcquireMasterConnect(token)
-
- case MASTER_PRIO_SCAN:
- return bx.AcquireMasterScan(token)
-
- default:
- return fmt.Errorf("Invalid session priority: %+v", prio)
- }
-}
-
-func StopWaitingForMaster(bx *BleXport, prio MasterPrio, token interface{},
- err error) error {
-
- switch prio {
- case MASTER_PRIO_CONNECT:
- bx.StopWaitingForMasterConnect(token, err)
- return nil
-
- case MASTER_PRIO_SCAN:
- bx.StopWaitingForMasterScan(token, err)
- return nil
-
- default:
- return fmt.Errorf("Invalid session priority: %+v", prio)
- }
-}
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index 36ad897..9261d21 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -21,7 +21,6 @@ package nmble
import (
"encoding/hex"
- "encoding/json"
"fmt"
"sync"
"time"
@@ -88,7 +87,7 @@ func NewXportCfg() XportCfg {
BlehostdRspTimeout: 10 * time.Second,
Restart: true,
SyncTimeout: 10 * time.Second,
- PreferredMtu: 264,
+ PreferredMtu: 512,
}
}
@@ -104,31 +103,32 @@ const (
// Implements xport.Xport.
type BleXport struct {
- cfg XportCfg
- d *Dispatcher
- client *unixchild.Client
- scanner *BleScanner
- state BleXportState
- stopChan chan struct{}
- shutdownChan chan bool
- readyBcast nmxutil.Bcaster
- master Master
- slave nmxutil.SingleResource
- randAddr *BleAddr
- mtx sync.Mutex
- advertiser *Advertiser
- cm ChrMgr
- sesns map[uint16]*BleSesn
+ advertiser *Advertiser
+ cfg XportCfg
+ client *unixchild.Client
+ cm ChrMgr
+ d *Dispatcher
+ master Master
+ mtx sync.Mutex
+ randAddr *BleAddr
+ readyBcast nmxutil.Bcaster
+ scanner *BleScanner
+ sesns map[uint16]*BleSesn
+ shutdownBlocker nmxutil.Blocker
+ slave nmxutil.SingleResource
+ state BleXportState
+ stopChan chan struct{}
+ syncer Syncer
+ wg sync.WaitGroup
}
func NewBleXport(cfg XportCfg) (*BleXport, error) {
bx := &BleXport{
- cfg: cfg,
- d: NewDispatcher(),
- shutdownChan: make(chan bool),
- readyBcast: nmxutil.Bcaster{},
- slave: nmxutil.NewSingleResource(),
- sesns: map[uint16]*BleSesn{},
+ cfg: cfg,
+ d: NewDispatcher(),
+ readyBcast: nmxutil.Bcaster{},
+ slave: nmxutil.NewSingleResource(),
+ sesns: map[uint16]*BleSesn{},
}
bx.advertiser = NewAdvertiser(bx)
@@ -182,139 +182,89 @@ func (bx *BleXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) {
return NewBleSesn(bx, cfg, MASTER_PRIO_CONNECT)
}
-func (bx *BleXport) addSyncListener() (*Listener, error) {
- key := TchKey(MSG_TYPE_SYNC_EVT, -1)
- nmxutil.LogAddListener(3, key, 0, "sync")
- return bx.AddListener(key)
-}
-
-func (bx *BleXport) addResetListener() (*Listener, error) {
- key := TchKey(MSG_TYPE_RESET_EVT, -1)
- nmxutil.LogAddListener(3, key, 0, "reset")
- return bx.AddListener(key)
-}
-
func (bx *BleXport) addAccessListener() (*Listener, error) {
key := TchKey(MSG_TYPE_ACCESS_EVT, -1)
nmxutil.LogAddListener(3, key, 0, "access")
return bx.AddListener(key)
}
-func (bx *BleXport) querySyncStatus() (bool, error) {
- req := &BleSyncReq{
- Op: MSG_OP_REQ,
- Type: MSG_TYPE_SYNC,
- Seq: NextSeq(),
- }
+func (bx *BleXport) shutdown(restart bool, err error) {
+ nmxutil.Assert(nmxutil.IsXport(err))
- j, err := json.Marshal(req)
- if err != nil {
- return false, err
- }
+ // Prevents repeated shutdowns without keeping the mutex locked throughout
+ // the duration of the shutdown.
+ //
+ // @return bool true if a shutdown was successfully initiated.
+ initiate := func() bool {
+ bx.mtx.Lock()
+ defer bx.mtx.Unlock()
- key := SeqKey(req.Seq)
- bl, err := bx.AddListener(key)
- if err != nil {
- return false, err
- }
- defer bx.RemoveListener(bl)
+ if bx.state == BLE_XPORT_STATE_STARTED ||
+ bx.state == BLE_XPORT_STATE_STARTING {
- if err := bx.txNoSync(j); err != nil {
- return false, err
- }
- for {
- select {
- case err := <-bl.ErrChan:
- return false, err
- case bm := <-bl.MsgChan:
- switch msg := bm.(type) {
- case *BleSyncRsp:
- return msg.Synced, nil
- }
+ bx.state = BLE_XPORT_STATE_STOPPING
+ return true
+ } else {
+ return false
}
}
-}
-
-func (bx *BleXport) initialSyncCheck() (bool, *Listener, error) {
- bl, err := bx.addSyncListener()
- if err != nil {
- return false, nil, err
- }
-
- synced, err := bx.querySyncStatus()
- if err != nil {
- bx.RemoveListener(bl)
- return false, nil, err
- }
-
- return synced, bl, nil
-}
-func (bx *BleXport) shutdown(restart bool, err error) {
- nmxutil.Assert(nmxutil.IsXport(err))
-
- log.Debugf("Shutting down BLE transport")
-
- bx.mtx.Lock()
-
- var fullyStarted bool
- var already bool
+ go func() {
+ log.Debugf("Shutting down BLE transport")
- switch bx.state {
- case BLE_XPORT_STATE_STARTED:
- already = false
- fullyStarted = true
- case BLE_XPORT_STATE_STARTING:
- already = false
- fullyStarted = false
- default:
- already = true
- }
+ success := initiate()
+ if !success {
+ // Shutdown already in progress.
+ return
+ }
- if !already {
- bx.state = BLE_XPORT_STATE_STOPPING
- }
+ bx.sesns = map[uint16]*BleSesn{}
- bx.mtx.Unlock()
+ // Indicate error to all clients who are waiting for the master
+ // resource.
+ log.Debugf("Aborting BLE master")
+ bx.master.Abort(err)
- if already {
- // Shutdown already in progress.
- return
- }
+ // Indicate an error to all of this transport's listeners. This
+ // prevents them from blocking endlessly while awaiting a BLE message.
+ log.Debugf("Stopping BLE dispatcher")
+ bx.d.ErrorAll(err)
- bx.sesns = map[uint16]*BleSesn{}
+ synced, err := bx.syncer.Refresh()
+ if err == nil && synced {
+ // Reset controller so that all outstanding connections terminate.
+ ResetXact(bx)
+ }
- // Indicate error to all clients who are waiting for the master resource.
- log.Debugf("Aborting BLE master")
- bx.master.Abort(err)
+ bx.syncer.Stop()
- // Indicate an error to all of this transport's listeners. This prevents
- // them from blocking endlessly while awaiting a BLE message.
- log.Debugf("Stopping BLE dispatcher")
- bx.d.ErrorAll(err)
+ // Stop all of this transport's goroutines.
+ close(bx.stopChan)
+ bx.wg.Wait()
- synced, err := bx.querySyncStatus()
- if err == nil && synced {
- // Reset controller so that all outstanding connections terminate.
- ResetXact(bx)
- }
+ // Stop the unixchild instance (blehostd + socket).
+ if bx.client != nil {
+ log.Debugf("Stopping unixchild")
+ bx.client.Stop()
+ }
- // Stop all of this transport's go routines.
- close(bx.stopChan)
+ bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED)
- // Stop the unixchild instance (blehostd + socket).
- if bx.client != nil {
- log.Debugf("Stopping unixchild")
- bx.client.Stop()
- }
+ // Indicate that the shutdown is complete. If restarts are enabled on
+ // this transport, this signals that the transport should be started
+ // again.
+ bx.shutdownBlocker.UnblockAndRestart(restart)
+ }()
+}
- bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED)
+func (bx *BleXport) waitForShutdown() bool {
+ itf, _ := bx.shutdownBlocker.Wait(nmxutil.DURATION_FOREVER, nil)
+ return itf.(bool)
+}
- // Indicate that the shutdown is complete. If restarts are enabled on this
- // transport, this signals that the transport should be started again.
- if fullyStarted {
- bx.shutdownChan <- restart
- }
+func (bx *BleXport) blockingShutdown(restart bool, err error) {
+ bx.shutdown(restart, err)
+ bx.waitForShutdown()
}
func (bx *BleXport) blockUntilReady() error {
@@ -374,7 +324,7 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
}
func (bx *BleXport) Stop() error {
- bx.shutdown(false, nmxutil.NewXportError("xport stopped"))
+ bx.blockingShutdown(false, nmxutil.NewXportError("xport stopped"))
return nil
}
@@ -386,12 +336,15 @@ func (bx *BleXport) startOnce() error {
bx.stopChan = make(chan struct{})
if err := bx.startUnixChild(); err != nil {
- bx.shutdown(true, err)
+ bx.blockingShutdown(true, err)
return err
}
// Listen for errors and data from the blehostd process.
+ bx.wg.Add(1)
go func() {
+ defer bx.wg.Done()
+
for {
select {
case err := <-bx.client.ErrChild:
@@ -411,49 +364,28 @@ func (bx *BleXport) startOnce() error {
}
}()
- synced, syncl, err := bx.initialSyncCheck()
- if err != nil {
- bx.shutdown(true, err)
+ if err := bx.syncer.Start(bx); err != nil {
+ bx.blockingShutdown(true, err)
return err
}
// Block until host and controller are synced.
- if !synced {
- SyncLoop:
- for {
- select {
- case err := <-syncl.ErrChan:
- bx.shutdown(true, err)
- return err
- case bm := <-syncl.MsgChan:
- switch msg := bm.(type) {
- case *BleSyncEvt:
- if msg.Synced {
- break SyncLoop
- }
- }
- case <-time.After(bx.cfg.SyncTimeout):
- err := nmxutil.NewXportError(
- "Timeout waiting for host <-> controller sync")
- bx.shutdown(true, err)
- return err
- case <-bx.stopChan:
- return nmxutil.NewXportError("Transport startup aborted")
- }
- }
+ if err := bx.syncer.BlockUntilSynced(
+ bx.cfg.SyncTimeout, bx.stopChan); err != nil {
+
+ err = nmxutil.NewXportError(
+ "Error waiting for host <-> controller sync: " + err.Error())
+ bx.blockingShutdown(true, err)
+ return err
}
// Host and controller are synced. Listen for events in the background:
// * sync loss
// * stack reset
// * GATT access
+ bx.wg.Add(1)
go func() {
- resetl, err := bx.addResetListener()
- if err != nil {
- bx.shutdown(true, err)
- return
- }
- defer bx.RemoveListener(resetl)
+ defer bx.wg.Done()
accessl, err := bx.addAccessListener()
if err != nil {
@@ -464,24 +396,8 @@ func (bx *BleXport) startOnce() error {
for {
select {
- case err := <-syncl.ErrChan:
- bx.shutdown(true, err)
- return
- case bm := <-syncl.MsgChan:
- switch msg := bm.(type) {
- case *BleSyncEvt:
- if !msg.Synced {
- bx.shutdown(true, nmxutil.NewXportError(
- "BLE host <-> controller sync lost"))
- }
- }
-
- case err := <-resetl.ErrChan:
- bx.shutdown(true, err)
- return
- case bm := <-resetl.MsgChan:
- switch msg := bm.(type) {
- case *BleResetEvt:
+ case reasonItf, ok := <-bx.syncer.ListenReset():
+ if ok {
// Only process the reset event if the transport is not
// already shutting down. If in mid-shutdown, the reset
// event was likely elicited by the shutdown itself.
@@ -489,23 +405,36 @@ func (bx *BleXport) startOnce() error {
if state == BLE_XPORT_STATE_STARTING ||
state == BLE_XPORT_STATE_STARTED {
+ reason := reasonItf.(int)
bx.shutdown(true, nmxutil.NewXportError(fmt.Sprintf(
"The BLE controller has been reset by the host; "+
"reason=%s (%d)",
- ErrCodeToString(msg.Reason), msg.Reason)))
- return
+ ErrCodeToString(reason), reason)))
}
}
- case err := <-accessl.ErrChan:
- bx.shutdown(true, err)
- return
- case bm := <-accessl.MsgChan:
- switch msg := bm.(type) {
- case *BleAccessEvt:
- if err := bx.cm.Access(bx, msg); err != nil {
- log.Debugf("Error sending access status: %s",
- err.Error())
+ case syncedItf, ok := <-bx.syncer.ListenSync():
+ if ok {
+ synced := syncedItf.(bool)
+ if !synced {
+ bx.shutdown(true, nmxutil.NewXportError(
+ "BLE host <-> controller sync lost"))
+ }
+ }
+
+ case err, ok := <-accessl.ErrChan:
+ if ok {
+ bx.shutdown(true, err)
+ }
+
+ case bm, ok := <-accessl.MsgChan:
+ if ok {
+ switch msg := bm.(type) {
+ case *BleAccessEvt:
+ if err := bx.cm.Access(bx, msg); err != nil {
+ log.Debugf("Error sending access status: %s",
+ err.Error())
+ }
}
}
@@ -519,7 +448,7 @@ func (bx *BleXport) startOnce() error {
if bx.randAddr == nil {
addr, err := GenRandAddrXact(bx)
if err != nil {
- bx.shutdown(true, err)
+ bx.blockingShutdown(true, err)
return err
}
@@ -528,20 +457,19 @@ func (bx *BleXport) startOnce() error {
// Set the random address on the controller.
if err := SetRandAddrXact(bx, *bx.randAddr); err != nil {
- bx.shutdown(true, err)
+ bx.blockingShutdown(true, err)
return err
}
// Set the preferred ATT MTU in the host.
if err := SetPreferredMtuXact(bx, bx.cfg.PreferredMtu); err != nil {
- bx.shutdown(true, err)
+ bx.blockingShutdown(true, err)
return err
}
if !bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STARTED) {
- bx.shutdown(true, err)
- return nmxutil.NewXportError(
- "Internal error; BLE transport in unexpected state")
+ bx.blockingShutdown(true, nmxutil.NewXportError(
+ "Internal error; BLE transport in unexpected state"))
}
return nil
@@ -552,6 +480,8 @@ func (bx *BleXport) Start() error {
return nmxutil.NewXportError("BLE xport started twice")
}
+ bx.shutdownBlocker.Start()
+
// Try to start the transport. If this first attempt fails, report the
// error and don't retry.
if err := bx.startOnce(); err != nil {
@@ -562,10 +492,12 @@ func (bx *BleXport) Start() error {
}
// Now that the first start attempt has succeeded, start a restart loop in
- // the background.
+ // the background. This goroutine does not participate in the wait group
+ // because it terminates itself independently of the others.
go func() {
// Block until transport shuts down.
- restart := <-bx.shutdownChan
+ restart := bx.waitForShutdown()
+
for {
// If restarts are disabled, or if the shutdown was a result of an
// explicit stop call (instead of an unexpected error), stop
@@ -587,7 +519,7 @@ func (bx *BleXport) Start() error {
err.Error())
} else {
// Success. Block until the transport shuts down.
- restart = <-bx.shutdownChan
+ restart = bx.waitForShutdown()
}
}
}()
diff --git a/nmxact/nmble/conn.go b/nmxact/nmble/conn.go
index 9e657f7..c8a98bf 100644
--- a/nmxact/nmble/conn.go
+++ b/nmxact/nmble/conn.go
@@ -84,11 +84,11 @@ func (c *Conn) abortNotifyListeners(err error) {
}
}
-func (c *Conn) shutdown(err error) {
+func (c *Conn) shutdown(delay time.Duration, err error) {
// Returns true if a shutdown was successfully initiated. Prevents
// repeated shutdowns without keeping the mutex locked throughout the
// duration of the shutdown.
- commit := func() bool {
+ initiate := func() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
@@ -101,16 +101,14 @@ func (c *Conn) shutdown(err error) {
return true
}
- // This function runs in a Go routine to prevent deadlock. The caller
- // likely needs to return and release the wait group before this function
- // can complete.
go func() {
- if !commit() {
- return
+ if delay > 0 {
+ time.Sleep(delay)
}
- c.connecting = false
- c.connHandle = BLE_CONN_HANDLE_NONE
+ if !initiate() {
+ return
+ }
StopWaitingForMaster(c.bx, c.prio, c, err)
@@ -119,6 +117,9 @@ func (c *Conn) shutdown(err error) {
c.wg.Wait()
+ c.connecting = false
+ c.connHandle = BLE_CONN_HANDLE_NONE
+
c.abortNotifyListeners(err)
c.disconnectChan <- err
@@ -126,12 +127,6 @@ func (c *Conn) shutdown(err error) {
}()
}
-// Forces a shutdown after the specified delay. If a shutdown happens in the
-// meantime, the delayed procedure is a no-op.
-func (c *Conn) shutdownIn(delay time.Duration, err error) {
-
-}
-
func (c *Conn) newDisconnectError(reason int) error {
str := fmt.Sprintf("BLE peer disconnected; "+
"reason=\"%s\" (%d) connection=%s",
@@ -155,7 +150,7 @@ func (c *Conn) eventListen(bl *Listener) error {
case err, ok := <-bl.ErrChan:
if ok {
- go c.shutdown(err)
+ c.shutdown(0, err)
}
return
@@ -194,7 +189,7 @@ func (c *Conn) eventListen(bl *Listener) error {
c.encBlocker.Unblock(err)
case *BleDisconnectEvt:
- go c.shutdown(c.newDisconnectError(msg.Reason))
+ c.shutdown(0, c.newDisconnectError(msg.Reason))
return
default:
@@ -696,12 +691,12 @@ func (c *Conn) InitiateSecurity() error {
}
defer c.rxvr.RemoveListener("security-initiate", bl)
- c.encBlocker.Block()
+ c.encBlocker.Start()
if err := securityInitiate(c.bx, bl, r); err != nil {
return err
}
- encErr, tmoErr := c.encBlocker.Wait(time.Second * 15)
+ encErr, tmoErr := c.encBlocker.Wait(time.Second*15, c.stopChan)
if encErr != nil {
return encErr.(error)
}
@@ -758,19 +753,30 @@ func (c *Conn) Stop() error {
c.mtx.Lock()
defer c.mtx.Unlock()
+ var shutdownDelay time.Duration
+ var shutdownErr error
+
if c.connHandle != BLE_CONN_HANDLE_NONE {
// Terminate the connection. On success, the conn object will shut
// down upon receipt of the disconnect event. On failure, just force a
// shutdown manually.
if err := c.terminate(); err != nil {
- go c.shutdown(err)
+ shutdownDelay = 0
+ shutdownErr = err
+ } else {
+ // Force a shutdown in 10 seconds in case we never receive a
+ // disconnect event.
+ shutdownDelay = 10 * time.Second
+ shutdownErr = fmt.Errorf("forced shutdown; disconnect timeout")
}
} else {
if c.connecting {
c.connCancel()
}
- go c.shutdown(fmt.Errorf("Stopped"))
+ shutdownDelay = 0
+ shutdownErr = fmt.Errorf("Stopped before connect complete")
}
+ c.shutdown(shutdownDelay, shutdownErr)
return nil
}
diff --git a/nmxact/nmble/sync.go b/nmxact/nmble/sync.go
new file mode 100644
index 0000000..95cae0f
--- /dev/null
+++ b/nmxact/nmble/sync.go
@@ -0,0 +1,202 @@
+package nmble
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "mynewt.apache.org/newtmgr/nmxact/nmxutil"
+)
+
+const syncPollRate = time.Second
+
+type Syncer struct {
+ x *BleXport
+ stopCh chan struct{}
+ wg sync.WaitGroup
+ synced bool
+ syncBlocker nmxutil.Blocker
+ mtx sync.Mutex
+
+ resetBcaster nmxutil.Bcaster
+ syncBcaster nmxutil.Bcaster
+}
+
+func (s *Syncer) Refresh() (bool, error) {
+ r := NewSyncReq()
+ bl, err := s.x.AddListener(SeqKey(r.Seq))
+ if err != nil {
+ return false, err
+ }
+ defer s.x.RemoveListener(bl)
+
+ synced, err := checkSync(s.x, bl, r)
+ if err != nil {
+ return false, err
+ }
+
+ s.setSynced(synced)
+ return synced, nil
+}
+
+func (s *Syncer) checkSyncLoop() {
+ doneCh := make(chan struct{})
+
+ s.wg.Add(1)
+ go func() {
+ defer s.wg.Done()
+
+ s.BlockUntilSynced(nmxutil.DURATION_FOREVER, s.stopCh)
+ close(doneCh)
+ }()
+
+ s.wg.Add(1)
+ go func() {
+ defer s.wg.Done()
+
+ for {
+ s.Refresh()
+
+ select {
+ case <-doneCh:
+ return
+
+ case <-s.stopCh:
+ return
+
+ case <-time.After(syncPollRate):
+ }
+ }
+ }()
+}
+
+func (s *Syncer) setSynced(synced bool) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ if synced == s.synced {
+ return
+ }
+
+ s.synced = synced
+ if s.synced {
+ s.syncBlocker.Unblock(nil)
+ } else {
+ s.syncBlocker.Start()
+
+ // Sync was lost; poll for sync status in the background until it is regained.
+ s.checkSyncLoop()
+ }
+ s.syncBcaster.Send(s.synced)
+}
+
+func (s *Syncer) addSyncListener() (*Listener, error) {
+ key := TchKey(MSG_TYPE_SYNC_EVT, -1)
+ nmxutil.LogAddListener(3, key, 0, "sync")
+ return s.x.AddListener(key)
+}
+
+func (s *Syncer) addResetListener() (*Listener, error) {
+ key := TchKey(MSG_TYPE_RESET_EVT, -1)
+ nmxutil.LogAddListener(3, key, 0, "reset")
+ return s.x.AddListener(key)
+}
+
+func (s *Syncer) listen() error {
+ errChan := make(chan error)
+
+ s.wg.Add(1)
+ go func() {
+ defer s.wg.Done()
+
+ // Initial actions can cause an error to be returned.
+ syncl, err := s.addSyncListener()
+ if err != nil {
+ errChan <- err
+ close(errChan)
+ return
+ }
+ defer s.x.RemoveListener(syncl)
+
+ resetl, err := s.addResetListener()
+ if err != nil {
+ errChan <- err
+ close(errChan)
+ return
+ }
+ defer s.x.RemoveListener(resetl)
+
+ // Initial actions complete.
+ close(errChan)
+
+ for {
+ select {
+ case <-syncl.ErrChan:
+ // XXX: errors from the sync listener are currently ignored.
+ case bm := <-syncl.MsgChan:
+ switch msg := bm.(type) {
+ case *BleSyncEvt:
+ s.setSynced(msg.Synced)
+ }
+
+ case <-resetl.ErrChan:
+ // XXX: errors from the reset listener are currently ignored.
+ case bm := <-resetl.MsgChan:
+ switch msg := bm.(type) {
+ case *BleResetEvt:
+ s.setSynced(false)
+ s.resetBcaster.Send(msg.Reason)
+ }
+
+ case <-s.stopCh:
+ return
+ }
+ }
+ }()
+
+ return <-errChan
+}
+
+func (s *Syncer) shutdown() {
+ s.syncBcaster.Clear()
+ s.resetBcaster.Clear()
+ s.syncBlocker.Unblock(nil)
+
+ s.stopCh = nil
+}
+
+func (s *Syncer) Start(x *BleXport) error {
+ s.x = x
+ s.stopCh = make(chan struct{})
+ s.syncBlocker.Start()
+ s.checkSyncLoop()
+ return s.listen()
+}
+
+func (s *Syncer) Stop() error {
+ if s.stopCh == nil {
+ return fmt.Errorf("Syncer already stopped")
+ }
+
+ close(s.stopCh)
+ s.wg.Wait()
+
+ s.shutdown()
+
+ return nil
+}
+
+func (s *Syncer) BlockUntilSynced(timeout time.Duration,
+ stopChan <-chan struct{}) error {
+
+ _, err := s.syncBlocker.Wait(timeout, stopChan)
+ return err
+}
+
+func (s *Syncer) ListenSync() chan interface{} {
+ return s.syncBcaster.Listen()
+}
+
+func (s *Syncer) ListenReset() chan interface{} {
+ return s.resetBcaster.Listen()
+}
diff --git a/nmxact/nmxutil/block.go b/nmxact/nmxutil/block.go
index 45b2e99..3ff5093 100644
--- a/nmxact/nmxutil/block.go
+++ b/nmxact/nmxutil/block.go
@@ -33,7 +33,23 @@ type Blocker struct {
val interface{}
}
-func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) {
+func (b *Blocker) unblockNoLock(val interface{}) {
+ if b.ch != nil {
+ b.val = val
+ close(b.ch)
+ b.ch = nil
+ }
+}
+
+func (b *Blocker) startNoLock() {
+ if b.ch == nil {
+ b.ch = make(chan struct{})
+ }
+}
+
+func (b *Blocker) Wait(timeout time.Duration, stopChan <-chan struct{}) (
+ interface{}, error) {
+
b.mtx.Lock()
ch := b.ch
b.mtx.Unlock()
@@ -42,6 +58,10 @@ func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) {
return b.val, nil
}
+ if stopChan == nil {
+ stopChan = make(chan struct{})
+ }
+
timer := time.NewTimer(timeout)
select {
case <-ch:
@@ -49,25 +69,29 @@ func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) {
return b.val, nil
case <-timer.C:
return nil, fmt.Errorf("timeout after %s", timeout.String())
+ case <-stopChan:
+ return nil, fmt.Errorf("aborted")
}
}
-func (b *Blocker) Block() {
+func (b *Blocker) Start() {
b.mtx.Lock()
defer b.mtx.Unlock()
- if b.ch == nil {
- b.ch = make(chan struct{})
- }
+ b.startNoLock()
}
func (b *Blocker) Unblock(val interface{}) {
b.mtx.Lock()
defer b.mtx.Unlock()
- if b.ch != nil {
- b.val = val
- close(b.ch)
- b.ch = nil
- }
+ b.unblockNoLock(val)
+}
+
+func (b *Blocker) UnblockAndRestart(val interface{}) {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+
+ b.unblockNoLock(val)
+ b.startNoLock()
}
--
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.