You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/08/23 17:45:55 UTC

[mynewt-newtmgr] 03/05: nmxact - Poll bhd for sync status.

This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit f5c50f1e07c8d09a115920584cdb9993832b4b54
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Tue Aug 22 17:03:16 2017 -0700

    nmxact - Poll bhd for sync status.
---
 nmxact/nmble/ble_act.go     |  22 +++
 nmxact/nmble/ble_scanner.go |   8 +-
 nmxact/nmble/ble_sesn.go    |  17 +--
 nmxact/nmble/ble_util.go    |  46 ++----
 nmxact/nmble/ble_xport.go   | 340 ++++++++++++++++++--------------------------
 nmxact/nmble/conn.go        |  48 ++++---
 nmxact/nmble/sync.go        | 202 ++++++++++++++++++++++++++
 nmxact/nmxutil/block.go     |  44 ++++--
 8 files changed, 441 insertions(+), 286 deletions(-)

diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 7e76686..7a8677a 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -1190,3 +1190,25 @@ func setPreferredMtu(x *BleXport, bl *Listener,
 		}
 	}
 }
+
+func checkSync(x *BleXport, bl *Listener, r *BleSyncReq) (bool, error) {
+	j, err := json.Marshal(r)
+	if err != nil {
+		return false, err
+	}
+
+	if err := x.txNoSync(j); err != nil {
+		return false, err
+	}
+	for {
+		select {
+		case err := <-bl.ErrChan:
+			return false, err
+		case bm := <-bl.MsgChan:
+			switch msg := bm.(type) {
+			case *BleSyncRsp:
+				return msg.Synced, nil
+			}
+		}
+	}
+}
diff --git a/nmxact/nmble/ble_scanner.go b/nmxact/nmble/ble_scanner.go
index 7d8c7b4..3d2a05e 100644
--- a/nmxact/nmble/ble_scanner.go
+++ b/nmxact/nmble/ble_scanner.go
@@ -123,10 +123,8 @@ func (s *BleScanner) readHwId() (string, error) {
 }
 
 func (s *BleScanner) scan() (*scan.ScanPeer, error) {
-	// Ensure subsequent calls to suspend() block.
-	s.suspendBlocker.Block()
-
-	// If the scanner is being suspended, unblock the suspend() call.
+	// Ensure subsequent calls to suspend() block until scanning has stopped.
+	s.suspendBlocker.Start()
 	defer s.suspendBlocker.Unblock(nil)
 
 	// Discover the first device which matches the specified predicate.
@@ -224,7 +222,7 @@ func (s *BleScanner) suspend() error {
 	}
 
 	// Block until scan is fully terminated.
-	s.suspendBlocker.Wait(nmxutil.DURATION_FOREVER)
+	s.suspendBlocker.Wait(nmxutil.DURATION_FOREVER, nil)
 
 	s.discoverer = nil
 	s.bos = nil
diff --git a/nmxact/nmble/ble_sesn.go b/nmxact/nmble/ble_sesn.go
index 7ed350f..763997d 100644
--- a/nmxact/nmble/ble_sesn.go
+++ b/nmxact/nmble/ble_sesn.go
@@ -89,13 +89,10 @@ func (s *BleSesn) AbortRx(seq uint8) error {
 	return nil
 }
 
+// Listens for disconnect in the background.
 func (s *BleSesn) disconnectListen() {
-	// Listen for disconnect in the background.
 	s.wg.Add(1)
 	go func() {
-		// If the session is being closed, unblock the close() call.
-		defer s.closeBlocker.Unblock(nil)
-
 		// Block until disconnect.
 		err := <-s.conn.DisconnectChan()
 		nmxutil.Assert(!s.IsOpen())
@@ -114,6 +111,9 @@ func (s *BleSesn) disconnectListen() {
 		if s.cfg.OnCloseCb != nil {
 			s.cfg.OnCloseCb(s, err)
 		}
+
+		// Finally, unblock the close() call, if any.
+		s.closeBlocker.Unblock(nil)
 	}()
 }
 
@@ -199,7 +199,7 @@ func (s *BleSesn) openOnce() (bool, error) {
 	}
 
 	// Ensure subsequent calls to Close() block.
-	s.closeBlocker.Block()
+	s.closeBlocker.Start()
 
 	// Listen for disconnect in the background.
 	s.disconnectListen()
@@ -270,13 +270,14 @@ func (s *BleSesn) OpenConnected(
 
 	if err := s.conn.Inherit(connHandle, eventListener); err != nil {
 		if !nmxutil.IsSesnAlreadyOpen(err) {
-			s.closeBlocker.Unblock(nil)
+			//s.closeBlocker.Unblock(nil)
+			s.Close()
 		}
 		return err
 	}
 
 	// Ensure subsequent calls to Close() block.
-	s.closeBlocker.Block()
+	s.closeBlocker.Start()
 
 	// Listen for disconnect in the background.
 	s.disconnectListen()
@@ -295,7 +296,7 @@ func (s *BleSesn) Close() error {
 	}
 
 	// Block until close completes.
-	s.closeBlocker.Wait(nmxutil.DURATION_FOREVER)
+	s.closeBlocker.Wait(nmxutil.DURATION_FOREVER, nil)
 	return nil
 }
 
diff --git a/nmxact/nmble/ble_util.go b/nmxact/nmble/ble_util.go
index 2903c41..6c814de 100644
--- a/nmxact/nmble/ble_util.go
+++ b/nmxact/nmble/ble_util.go
@@ -382,6 +382,14 @@ func NewFindChrReq() *BleFindChrReq {
 	}
 }
 
+func NewSyncReq() *BleSyncReq {
+	return &BleSyncReq{
+		Op:   MSG_OP_REQ,
+		Type: MSG_TYPE_SYNC,
+		Seq:  NextSeq(),
+	}
+}
+
 func ConnFindXact(x *BleXport, connHandle uint16) (BleConnDesc, error) {
 	r := NewBleConnFindReq()
 	r.ConnHandle = connHandle
@@ -835,41 +843,3 @@ func StopWaitingForMaster(bx *BleXport, prio MasterPrio, token interface{},
 		return fmt.Errorf("Invalid session priority: %+v", prio)
 	}
 }
-
-type MasterPrio int
-
-const (
-	// Lower number = higher priority.
-	MASTER_PRIO_CONNECT = 0
-	MASTER_PRIO_SCAN    = 1
-)
-
-func AcquireMaster(bx *BleXport, prio MasterPrio, token interface{}) error {
-	switch prio {
-	case MASTER_PRIO_CONNECT:
-		return bx.AcquireMasterConnect(token)
-
-	case MASTER_PRIO_SCAN:
-		return bx.AcquireMasterScan(token)
-
-	default:
-		return fmt.Errorf("Invalid session priority: %+v", prio)
-	}
-}
-
-func StopWaitingForMaster(bx *BleXport, prio MasterPrio, token interface{},
-	err error) error {
-
-	switch prio {
-	case MASTER_PRIO_CONNECT:
-		bx.StopWaitingForMasterConnect(token, err)
-		return nil
-
-	case MASTER_PRIO_SCAN:
-		bx.StopWaitingForMasterScan(token, err)
-		return nil
-
-	default:
-		return fmt.Errorf("Invalid session priority: %+v", prio)
-	}
-}
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index 36ad897..9261d21 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -21,7 +21,6 @@ package nmble
 
 import (
 	"encoding/hex"
-	"encoding/json"
 	"fmt"
 	"sync"
 	"time"
@@ -88,7 +87,7 @@ func NewXportCfg() XportCfg {
 		BlehostdRspTimeout:    10 * time.Second,
 		Restart:               true,
 		SyncTimeout:           10 * time.Second,
-		PreferredMtu:          264,
+		PreferredMtu:          512,
 	}
 }
 
@@ -104,31 +103,32 @@ const (
 
 // Implements xport.Xport.
 type BleXport struct {
-	cfg          XportCfg
-	d            *Dispatcher
-	client       *unixchild.Client
-	scanner      *BleScanner
-	state        BleXportState
-	stopChan     chan struct{}
-	shutdownChan chan bool
-	readyBcast   nmxutil.Bcaster
-	master       Master
-	slave        nmxutil.SingleResource
-	randAddr     *BleAddr
-	mtx          sync.Mutex
-	advertiser   *Advertiser
-	cm           ChrMgr
-	sesns        map[uint16]*BleSesn
+	advertiser      *Advertiser
+	cfg             XportCfg
+	client          *unixchild.Client
+	cm              ChrMgr
+	d               *Dispatcher
+	master          Master
+	mtx             sync.Mutex
+	randAddr        *BleAddr
+	readyBcast      nmxutil.Bcaster
+	scanner         *BleScanner
+	sesns           map[uint16]*BleSesn
+	shutdownBlocker nmxutil.Blocker
+	slave           nmxutil.SingleResource
+	state           BleXportState
+	stopChan        chan struct{}
+	syncer          Syncer
+	wg              sync.WaitGroup
 }
 
 func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	bx := &BleXport{
-		cfg:          cfg,
-		d:            NewDispatcher(),
-		shutdownChan: make(chan bool),
-		readyBcast:   nmxutil.Bcaster{},
-		slave:        nmxutil.NewSingleResource(),
-		sesns:        map[uint16]*BleSesn{},
+		cfg:        cfg,
+		d:          NewDispatcher(),
+		readyBcast: nmxutil.Bcaster{},
+		slave:      nmxutil.NewSingleResource(),
+		sesns:      map[uint16]*BleSesn{},
 	}
 
 	bx.advertiser = NewAdvertiser(bx)
@@ -182,139 +182,89 @@ func (bx *BleXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) {
 	return NewBleSesn(bx, cfg, MASTER_PRIO_CONNECT)
 }
 
-func (bx *BleXport) addSyncListener() (*Listener, error) {
-	key := TchKey(MSG_TYPE_SYNC_EVT, -1)
-	nmxutil.LogAddListener(3, key, 0, "sync")
-	return bx.AddListener(key)
-}
-
-func (bx *BleXport) addResetListener() (*Listener, error) {
-	key := TchKey(MSG_TYPE_RESET_EVT, -1)
-	nmxutil.LogAddListener(3, key, 0, "reset")
-	return bx.AddListener(key)
-}
-
 func (bx *BleXport) addAccessListener() (*Listener, error) {
 	key := TchKey(MSG_TYPE_ACCESS_EVT, -1)
 	nmxutil.LogAddListener(3, key, 0, "access")
 	return bx.AddListener(key)
 }
 
-func (bx *BleXport) querySyncStatus() (bool, error) {
-	req := &BleSyncReq{
-		Op:   MSG_OP_REQ,
-		Type: MSG_TYPE_SYNC,
-		Seq:  NextSeq(),
-	}
+func (bx *BleXport) shutdown(restart bool, err error) {
+	nmxutil.Assert(nmxutil.IsXport(err))
 
-	j, err := json.Marshal(req)
-	if err != nil {
-		return false, err
-	}
+	// Prevents repeated shutdowns without keeping the mutex locked throughout
+	// the duration of the shutdown.
+	//
+	// @return bool             true if a shutdown was successfully initiated.
+	initiate := func() bool {
+		bx.mtx.Lock()
+		defer bx.mtx.Unlock()
 
-	key := SeqKey(req.Seq)
-	bl, err := bx.AddListener(key)
-	if err != nil {
-		return false, err
-	}
-	defer bx.RemoveListener(bl)
+		if bx.state == BLE_XPORT_STATE_STARTED ||
+			bx.state == BLE_XPORT_STATE_STARTING {
 
-	if err := bx.txNoSync(j); err != nil {
-		return false, err
-	}
-	for {
-		select {
-		case err := <-bl.ErrChan:
-			return false, err
-		case bm := <-bl.MsgChan:
-			switch msg := bm.(type) {
-			case *BleSyncRsp:
-				return msg.Synced, nil
-			}
+			bx.state = BLE_XPORT_STATE_STOPPING
+			return true
+		} else {
+			return false
 		}
 	}
-}
-
-func (bx *BleXport) initialSyncCheck() (bool, *Listener, error) {
-	bl, err := bx.addSyncListener()
-	if err != nil {
-		return false, nil, err
-	}
-
-	synced, err := bx.querySyncStatus()
-	if err != nil {
-		bx.RemoveListener(bl)
-		return false, nil, err
-	}
-
-	return synced, bl, nil
-}
 
-func (bx *BleXport) shutdown(restart bool, err error) {
-	nmxutil.Assert(nmxutil.IsXport(err))
-
-	log.Debugf("Shutting down BLE transport")
-
-	bx.mtx.Lock()
-
-	var fullyStarted bool
-	var already bool
+	go func() {
+		log.Debugf("Shutting down BLE transport")
 
-	switch bx.state {
-	case BLE_XPORT_STATE_STARTED:
-		already = false
-		fullyStarted = true
-	case BLE_XPORT_STATE_STARTING:
-		already = false
-		fullyStarted = false
-	default:
-		already = true
-	}
+		success := initiate()
+		if !success {
+			// Shutdown already in progress.
+			return
+		}
 
-	if !already {
-		bx.state = BLE_XPORT_STATE_STOPPING
-	}
+		bx.sesns = map[uint16]*BleSesn{}
 
-	bx.mtx.Unlock()
+		// Indicate error to all clients who are waiting for the master
+		// resource.
+		log.Debugf("Aborting BLE master")
+		bx.master.Abort(err)
 
-	if already {
-		// Shutdown already in progress.
-		return
-	}
+		// Indicate an error to all of this transport's listeners.  This
+		// prevents them from blocking endlessly while awaiting a BLE message.
+		log.Debugf("Stopping BLE dispatcher")
+		bx.d.ErrorAll(err)
 
-	bx.sesns = map[uint16]*BleSesn{}
+		synced, err := bx.syncer.Refresh()
+		if err == nil && synced {
+			// Reset controller so that all outstanding connections terminate.
+			ResetXact(bx)
+		}
 
-	// Indicate error to all clients who are waiting for the master resource.
-	log.Debugf("Aborting BLE master")
-	bx.master.Abort(err)
+		bx.syncer.Stop()
 
-	// Indicate an error to all of this transport's listeners.  This prevents
-	// them from blocking endlessly while awaiting a BLE message.
-	log.Debugf("Stopping BLE dispatcher")
-	bx.d.ErrorAll(err)
+		// Stop all of this transport's go routines.
+		close(bx.stopChan)
+		bx.wg.Wait()
 
-	synced, err := bx.querySyncStatus()
-	if err == nil && synced {
-		// Reset controller so that all outstanding connections terminate.
-		ResetXact(bx)
-	}
+		// Stop the unixchild instance (blehostd + socket).
+		if bx.client != nil {
+			log.Debugf("Stopping unixchild")
+			bx.client.Stop()
+		}
 
-	// Stop all of this transport's go routines.
-	close(bx.stopChan)
+		bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED)
 
-	// Stop the unixchild instance (blehostd + socket).
-	if bx.client != nil {
-		log.Debugf("Stopping unixchild")
-		bx.client.Stop()
-	}
+		// Indicate that the shutdown is complete.  If restarts are enabled on
+		// this transport, this signals that the transport should be started
+		// again.
+		bx.shutdownBlocker.UnblockAndRestart(restart)
+	}()
+}
 
-	bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED)
+func (bx *BleXport) waitForShutdown() bool {
+	itf, _ := bx.shutdownBlocker.Wait(nmxutil.DURATION_FOREVER, nil)
+	return itf.(bool)
+}
 
-	// Indicate that the shutdown is complete.  If restarts are enabled on this
-	// transport, this signals that the transport should be started again.
-	if fullyStarted {
-		bx.shutdownChan <- restart
-	}
+func (bx *BleXport) blockingShutdown(restart bool, err error) {
+	bx.shutdown(restart, err)
+	bx.waitForShutdown()
 }
 
 func (bx *BleXport) blockUntilReady() error {
@@ -374,7 +324,7 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
 }
 
 func (bx *BleXport) Stop() error {
-	bx.shutdown(false, nmxutil.NewXportError("xport stopped"))
+	bx.blockingShutdown(false, nmxutil.NewXportError("xport stopped"))
 	return nil
 }
 
@@ -386,12 +336,15 @@ func (bx *BleXport) startOnce() error {
 	bx.stopChan = make(chan struct{})
 
 	if err := bx.startUnixChild(); err != nil {
-		bx.shutdown(true, err)
+		bx.blockingShutdown(true, err)
 		return err
 	}
 
 	// Listen for errors and data from the blehostd process.
+	bx.wg.Add(1)
 	go func() {
+		defer bx.wg.Done()
+
 		for {
 			select {
 			case err := <-bx.client.ErrChild:
@@ -411,49 +364,28 @@ func (bx *BleXport) startOnce() error {
 		}
 	}()
 
-	synced, syncl, err := bx.initialSyncCheck()
-	if err != nil {
-		bx.shutdown(true, err)
+	if err := bx.syncer.Start(bx); err != nil {
+		bx.blockingShutdown(true, err)
 		return err
 	}
 
 	// Block until host and controller are synced.
-	if !synced {
-	SyncLoop:
-		for {
-			select {
-			case err := <-syncl.ErrChan:
-				bx.shutdown(true, err)
-				return err
-			case bm := <-syncl.MsgChan:
-				switch msg := bm.(type) {
-				case *BleSyncEvt:
-					if msg.Synced {
-						break SyncLoop
-					}
-				}
-			case <-time.After(bx.cfg.SyncTimeout):
-				err := nmxutil.NewXportError(
-					"Timeout waiting for host <-> controller sync")
-				bx.shutdown(true, err)
-				return err
-			case <-bx.stopChan:
-				return nmxutil.NewXportError("Transport startup aborted")
-			}
-		}
+	if err := bx.syncer.BlockUntilSynced(
+		bx.cfg.SyncTimeout, bx.stopChan); err != nil {
+
+		err = nmxutil.NewXportError(
+			"Error waiting for host <-> controller sync: " + err.Error())
+		bx.blockingShutdown(true, err)
+		return err
 	}
 
 	// Host and controller are synced.  Listen for events in the background:
 	//     * sync loss
 	//     * stack reset
 	//     * GATT access
+	bx.wg.Add(1)
 	go func() {
-		resetl, err := bx.addResetListener()
-		if err != nil {
-			bx.shutdown(true, err)
-			return
-		}
-		defer bx.RemoveListener(resetl)
+		defer bx.wg.Done()
 
 		accessl, err := bx.addAccessListener()
 		if err != nil {
@@ -464,24 +396,8 @@ func (bx *BleXport) startOnce() error {
 
 		for {
 			select {
-			case err := <-syncl.ErrChan:
-				bx.shutdown(true, err)
-				return
-			case bm := <-syncl.MsgChan:
-				switch msg := bm.(type) {
-				case *BleSyncEvt:
-					if !msg.Synced {
-						bx.shutdown(true, nmxutil.NewXportError(
-							"BLE host <-> controller sync lost"))
-					}
-				}
-
-			case err := <-resetl.ErrChan:
-				bx.shutdown(true, err)
-				return
-			case bm := <-resetl.MsgChan:
-				switch msg := bm.(type) {
-				case *BleResetEvt:
+			case reasonItf, ok := <-bx.syncer.ListenReset():
+				if ok {
 					// Only process the reset event if the transport is not
 					// already shutting down.  If in mid-shutdown, the reset
 					// event was likely elicited by the shutdown itself.
@@ -489,23 +405,36 @@ func (bx *BleXport) startOnce() error {
 					if state == BLE_XPORT_STATE_STARTING ||
 						state == BLE_XPORT_STATE_STARTED {
 
+						reason := reasonItf.(int)
 						bx.shutdown(true, nmxutil.NewXportError(fmt.Sprintf(
 							"The BLE controller has been reset by the host; "+
 								"reason=%s (%d)",
-							ErrCodeToString(msg.Reason), msg.Reason)))
-						return
+							ErrCodeToString(reason), reason)))
 					}
 				}
 
-			case err := <-accessl.ErrChan:
-				bx.shutdown(true, err)
-				return
-			case bm := <-accessl.MsgChan:
-				switch msg := bm.(type) {
-				case *BleAccessEvt:
-					if err := bx.cm.Access(bx, msg); err != nil {
-						log.Debugf("Error sending access status: %s",
-							err.Error())
+			case syncedItf, ok := <-bx.syncer.ListenSync():
+				if ok {
+					synced := syncedItf.(bool)
+					if !synced {
+						bx.shutdown(true, nmxutil.NewXportError(
+							"BLE host <-> controller sync lost"))
+					}
+				}
+
+			case err, ok := <-accessl.ErrChan:
+				if ok {
+					bx.shutdown(true, err)
+				}
+
+			case bm, ok := <-accessl.MsgChan:
+				if ok {
+					switch msg := bm.(type) {
+					case *BleAccessEvt:
+						if err := bx.cm.Access(bx, msg); err != nil {
+							log.Debugf("Error sending access status: %s",
+								err.Error())
+						}
 					}
 				}
 
@@ -519,7 +448,7 @@ func (bx *BleXport) startOnce() error {
 	if bx.randAddr == nil {
 		addr, err := GenRandAddrXact(bx)
 		if err != nil {
-			bx.shutdown(true, err)
+			bx.blockingShutdown(true, err)
 			return err
 		}
 
@@ -528,20 +457,19 @@ func (bx *BleXport) startOnce() error {
 
 	// Set the random address on the controller.
 	if err := SetRandAddrXact(bx, *bx.randAddr); err != nil {
-		bx.shutdown(true, err)
+		bx.blockingShutdown(true, err)
 		return err
 	}
 
 	// Set the preferred ATT MTU in the host.
 	if err := SetPreferredMtuXact(bx, bx.cfg.PreferredMtu); err != nil {
-		bx.shutdown(true, err)
+		bx.blockingShutdown(true, err)
 		return err
 	}
 
 	if !bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STARTED) {
-		bx.shutdown(true, err)
-		return nmxutil.NewXportError(
-			"Internal error; BLE transport in unexpected state")
+		bx.blockingShutdown(true, nmxutil.NewXportError(
+			"Internal error; BLE transport in unexpected state"))
 	}
 
 	return nil
@@ -552,6 +480,8 @@ func (bx *BleXport) Start() error {
 		return nmxutil.NewXportError("BLE xport started twice")
 	}
 
+	bx.shutdownBlocker.Start()
+
 	// Try to start the transport.  If this first attempt fails, report the
 	// error and don't retry.
 	if err := bx.startOnce(); err != nil {
@@ -562,10 +492,12 @@ func (bx *BleXport) Start() error {
 	}
 
 	// Now that the first start attempt has succeeded, start a restart loop in
-	// the background.
+	// the background.  This Go routine does not participate in the wait group
+	// because it terminates itself independent of the others.
 	go func() {
 		// Block until transport shuts down.
-		restart := <-bx.shutdownChan
+		restart := bx.waitForShutdown()
+
 		for {
 			// If restarts are disabled, or if the shutdown was a result of an
 			// explicit stop call (instead of an unexpected error), stop
@@ -587,7 +519,7 @@ func (bx *BleXport) Start() error {
 					err.Error())
 			} else {
 				// Success.  Block until the transport shuts down.
-				restart = <-bx.shutdownChan
+				restart = bx.waitForShutdown()
 			}
 		}
 	}()
diff --git a/nmxact/nmble/conn.go b/nmxact/nmble/conn.go
index 9e657f7..c8a98bf 100644
--- a/nmxact/nmble/conn.go
+++ b/nmxact/nmble/conn.go
@@ -84,11 +84,11 @@ func (c *Conn) abortNotifyListeners(err error) {
 	}
 }
 
-func (c *Conn) shutdown(err error) {
+func (c *Conn) shutdown(delay time.Duration, err error) {
 	// Returns true if a shutdown was successfully initiated.  Prevents
 	// repeated shutdowns without keeping the mutex locked throughout the
 	// duration of the shutdown.
-	commit := func() bool {
+	initiate := func() bool {
 		c.mtx.Lock()
 		defer c.mtx.Unlock()
 
@@ -101,16 +101,14 @@ func (c *Conn) shutdown(err error) {
 		return true
 	}
 
-	// This function runs in a Go routine to prevent deadlock.  The caller
-	// likely needs to return and release the wait group before this function
-	// can complete.
 	go func() {
-		if !commit() {
-			return
+		if delay > 0 {
+			time.Sleep(delay)
 		}
 
-		c.connecting = false
-		c.connHandle = BLE_CONN_HANDLE_NONE
+		if !initiate() {
+			return
+		}
 
 		StopWaitingForMaster(c.bx, c.prio, c, err)
 
@@ -119,6 +117,9 @@ func (c *Conn) shutdown(err error) {
 
 		c.wg.Wait()
 
+		c.connecting = false
+		c.connHandle = BLE_CONN_HANDLE_NONE
+
 		c.abortNotifyListeners(err)
 
 		c.disconnectChan <- err
@@ -126,12 +127,6 @@ func (c *Conn) shutdown(err error) {
 	}()
 }
 
-// Forces a shutdown after the specified delay.  If a shutdown happens in the
-// meantime, the delayed procedure is a no-op.
-func (c *Conn) shutdownIn(delay time.Duration, err error) {
-
-}
-
 func (c *Conn) newDisconnectError(reason int) error {
 	str := fmt.Sprintf("BLE peer disconnected; "+
 		"reason=\"%s\" (%d) connection=%s",
@@ -155,7 +150,7 @@ func (c *Conn) eventListen(bl *Listener) error {
 
 			case err, ok := <-bl.ErrChan:
 				if ok {
-					go c.shutdown(err)
+					c.shutdown(0, err)
 				}
 				return
 
@@ -194,7 +189,7 @@ func (c *Conn) eventListen(bl *Listener) error {
 					c.encBlocker.Unblock(err)
 
 				case *BleDisconnectEvt:
-					go c.shutdown(c.newDisconnectError(msg.Reason))
+					c.shutdown(0, c.newDisconnectError(msg.Reason))
 					return
 
 				default:
@@ -696,12 +691,12 @@ func (c *Conn) InitiateSecurity() error {
 	}
 	defer c.rxvr.RemoveListener("security-initiate", bl)
 
-	c.encBlocker.Block()
+	c.encBlocker.Start()
 	if err := securityInitiate(c.bx, bl, r); err != nil {
 		return err
 	}
 
-	encErr, tmoErr := c.encBlocker.Wait(time.Second * 15)
+	encErr, tmoErr := c.encBlocker.Wait(time.Second*15, c.stopChan)
 	if encErr != nil {
 		return encErr.(error)
 	}
@@ -758,19 +753,30 @@ func (c *Conn) Stop() error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 
+	var shutdownDelay time.Duration
+	var shutdownErr error
+
 	if c.connHandle != BLE_CONN_HANDLE_NONE {
 		// Terminate the connection.  On success, the conn object will shut
 		// down upon receipt of the disconnect event.  On failure, just force a
 		// shutdown manually.
 		if err := c.terminate(); err != nil {
-			go c.shutdown(err)
+			shutdownDelay = 0
+			shutdownErr = err
+		} else {
+			// Force a shutdown in 10 seconds in case we never receive a
+			// disconnect event.
+			shutdownDelay = 10 * time.Second
+			shutdownErr = fmt.Errorf("forced shutdown; disconnect timeout")
 		}
 	} else {
 		if c.connecting {
 			c.connCancel()
 		}
-		go c.shutdown(fmt.Errorf("Stopped"))
+		shutdownDelay = 0
+		shutdownErr = fmt.Errorf("Stopped before connect complete")
 	}
 
+	c.shutdown(shutdownDelay, shutdownErr)
 	return nil
 }
diff --git a/nmxact/nmble/sync.go b/nmxact/nmble/sync.go
new file mode 100644
index 0000000..95cae0f
--- /dev/null
+++ b/nmxact/nmble/sync.go
@@ -0,0 +1,202 @@
+package nmble
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
+)
+
+const syncPollRate = time.Second
+
+type Syncer struct {
+	x           *BleXport
+	stopCh      chan struct{}
+	wg          sync.WaitGroup
+	synced      bool
+	syncBlocker nmxutil.Blocker
+	mtx         sync.Mutex
+
+	resetBcaster nmxutil.Bcaster
+	syncBcaster  nmxutil.Bcaster
+}
+
+func (s *Syncer) Refresh() (bool, error) {
+	r := NewSyncReq()
+	bl, err := s.x.AddListener(SeqKey(r.Seq))
+	if err != nil {
+		return false, err
+	}
+	defer s.x.RemoveListener(bl)
+
+	synced, err := checkSync(s.x, bl, r)
+	if err != nil {
+		return false, err
+	}
+
+	s.setSynced(synced)
+	return synced, nil
+}
+
+func (s *Syncer) checkSyncLoop() {
+	doneCh := make(chan struct{})
+
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+
+		s.BlockUntilSynced(nmxutil.DURATION_FOREVER, s.stopCh)
+		close(doneCh)
+	}()
+
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+
+		for {
+			s.Refresh()
+
+			select {
+			case <-doneCh:
+				return
+
+			case <-s.stopCh:
+				return
+
+			case <-time.After(syncPollRate):
+			}
+		}
+	}()
+}
+
+func (s *Syncer) setSynced(synced bool) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	if synced == s.synced {
+		return
+	}
+
+	s.synced = synced
+	if s.synced {
+		s.syncBlocker.Unblock(nil)
+	} else {
+		s.syncBlocker.Start()
+
+		// Listen for sync loss and reset in the background.
+		s.checkSyncLoop()
+	}
+	s.syncBcaster.Send(s.synced)
+}
+
+func (s *Syncer) addSyncListener() (*Listener, error) {
+	key := TchKey(MSG_TYPE_SYNC_EVT, -1)
+	nmxutil.LogAddListener(3, key, 0, "sync")
+	return s.x.AddListener(key)
+}
+
+func (s *Syncer) addResetListener() (*Listener, error) {
+	key := TchKey(MSG_TYPE_RESET_EVT, -1)
+	nmxutil.LogAddListener(3, key, 0, "reset")
+	return s.x.AddListener(key)
+}
+
+func (s *Syncer) listen() error {
+	errChan := make(chan error)
+
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+
+		// Initial actions can cause an error to be returned.
+		syncl, err := s.addSyncListener()
+		if err != nil {
+			errChan <- err
+			close(errChan)
+			return
+		}
+		defer s.x.RemoveListener(syncl)
+
+		resetl, err := s.addResetListener()
+		if err != nil {
+			errChan <- err
+			close(errChan)
+			return
+		}
+		defer s.x.RemoveListener(resetl)
+
+		// Initial actions complete.
+		close(errChan)
+
+		for {
+			select {
+			case <-syncl.ErrChan:
+				// XXX
+			case bm := <-syncl.MsgChan:
+				switch msg := bm.(type) {
+				case *BleSyncEvt:
+					s.setSynced(msg.Synced)
+				}
+
+			case <-resetl.ErrChan:
+				// XXX
+			case bm := <-resetl.MsgChan:
+				switch msg := bm.(type) {
+				case *BleResetEvt:
+					s.setSynced(false)
+					s.resetBcaster.Send(msg.Reason)
+				}
+
+			case <-s.stopCh:
+				return
+			}
+		}
+	}()
+
+	return <-errChan
+}
+
+func (s *Syncer) shutdown() {
+	s.syncBcaster.Clear()
+	s.resetBcaster.Clear()
+	s.syncBlocker.Unblock(nil)
+
+	s.stopCh = nil
+}
+
+func (s *Syncer) Start(x *BleXport) error {
+	s.x = x
+	s.stopCh = make(chan struct{})
+	s.syncBlocker.Start()
+	s.checkSyncLoop()
+	return s.listen()
+}
+
+func (s *Syncer) Stop() error {
+	if s.stopCh == nil {
+		return fmt.Errorf("Syncer already stopped")
+	}
+
+	close(s.stopCh)
+	s.wg.Wait()
+
+	s.shutdown()
+
+	return nil
+}
+
+func (s *Syncer) BlockUntilSynced(timeout time.Duration,
+	stopChan <-chan struct{}) error {
+
+	_, err := s.syncBlocker.Wait(timeout, stopChan)
+	return err
+}
+
+func (s *Syncer) ListenSync() chan interface{} {
+	return s.syncBcaster.Listen()
+}
+
+func (s *Syncer) ListenReset() chan interface{} {
+	return s.resetBcaster.Listen()
+}
diff --git a/nmxact/nmxutil/block.go b/nmxact/nmxutil/block.go
index 45b2e99..3ff5093 100644
--- a/nmxact/nmxutil/block.go
+++ b/nmxact/nmxutil/block.go
@@ -33,7 +33,23 @@ type Blocker struct {
 	val interface{}
 }
 
-func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) {
+func (b *Blocker) unblockNoLock(val interface{}) {
+	if b.ch != nil {
+		b.val = val
+		close(b.ch)
+		b.ch = nil
+	}
+}
+
+func (b *Blocker) startNoLock() {
+	if b.ch == nil {
+		b.ch = make(chan struct{})
+	}
+}
+
+func (b *Blocker) Wait(timeout time.Duration, stopChan <-chan struct{}) (
+	interface{}, error) {
+
 	b.mtx.Lock()
 	ch := b.ch
 	b.mtx.Unlock()
@@ -42,6 +58,10 @@ func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) {
 		return b.val, nil
 	}
 
+	if stopChan == nil {
+		stopChan = make(chan struct{})
+	}
+
 	timer := time.NewTimer(timeout)
 	select {
 	case <-ch:
@@ -49,25 +69,29 @@ func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) {
 		return b.val, nil
 	case <-timer.C:
 		return nil, fmt.Errorf("timeout after %s", timeout.String())
+	case <-stopChan:
+		return nil, fmt.Errorf("aborted")
 	}
 }
 
-func (b *Blocker) Block() {
+func (b *Blocker) Start() {
 	b.mtx.Lock()
 	defer b.mtx.Unlock()
 
-	if b.ch == nil {
-		b.ch = make(chan struct{})
-	}
+	b.startNoLock()
 }
 
 func (b *Blocker) Unblock(val interface{}) {
 	b.mtx.Lock()
 	defer b.mtx.Unlock()
 
-	if b.ch != nil {
-		b.val = val
-		close(b.ch)
-		b.ch = nil
-	}
+	b.unblockNoLock(val)
+}
+
+func (b *Blocker) UnblockAndRestart(val interface{}) {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	b.unblockNoLock(val)
+	b.startNoLock()
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.