You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/04/03 23:54:00 UTC

[3/6] incubator-mynewt-newtmgr git commit: nmxact - Automatically restart BLE xport.

nmxact - Automatically restart BLE xport.

More work needs to be done here.  If a client attempts to use the
transport while it is restarting, the attempt immediately fails.

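It would be better if:
    * A transmit attempt made during a restart blocked until the
      transport finished restarting, instead of failing immediately.
    * The transport gave up after X consecutive failed restart
      attempts, instead of retrying indefinitely.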


Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/2aeb4daa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/2aeb4daa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/2aeb4daa

Branch: refs/heads/master
Commit: 2aeb4daab6e2a13152314c23cd1c79fed5ebbdd2
Parents: db639f9
Author: Christopher Collins <cc...@apache.org>
Authored: Mon Apr 3 16:38:03 2017 -0700
Committer: Christopher Collins <cc...@apache.org>
Committed: Mon Apr 3 16:49:58 2017 -0700

----------------------------------------------------------------------
 nmxact/nmble/ble_xport.go | 167 +++++++++++++++++++++++++++++++----------
 nmxact/nmble/dispatch.go  |  15 ++++
 2 files changed, 144 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/2aeb4daa/nmxact/nmble/ble_xport.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index 16a4128..32ced04 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -53,21 +53,26 @@ const (
 	BLE_XPORT_STATE_STOPPED BleXportState = iota
 	BLE_XPORT_STATE_STARTING
 	BLE_XPORT_STATE_STARTED
+	BLE_XPORT_STATE_STOPPING
 )
 
 // Implements xport.Xport.
 type BleXport struct {
-	Bd     *BleDispatcher
-	client *unixchild.Client
-	state  BleXportState
+	Bd               *BleDispatcher
+	client           *unixchild.Client
+	state            BleXportState
+	stopChan         chan struct{}
+	shutdownChan     chan bool
+	numStopListeners int
 
 	cfg XportCfg
 }
 
 func NewBleXport(cfg XportCfg) (*BleXport, error) {
 	bx := &BleXport{
-		Bd:  NewBleDispatcher(),
-		cfg: cfg,
+		Bd:           NewBleDispatcher(),
+		shutdownChan: make(chan bool),
+		cfg:          cfg,
 	}
 
 	return bx, nil
@@ -177,18 +182,46 @@ func (bx *BleXport) initialSyncCheck() (bool, *BleListener, error) {
 	return synced, bl, nil
 }
 
-func (bx *BleXport) onError(err error) {
-	if !bx.setStateFrom(BLE_XPORT_STATE_STARTED, BLE_XPORT_STATE_STOPPED) &&
-		!bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STOPPED) {
+func (bx *BleXport) shutdown(restart bool, err error) {
+	var fullyStarted bool
 
+	if bx.setStateFrom(BLE_XPORT_STATE_STARTED,
+		BLE_XPORT_STATE_STOPPING) {
+
+		fullyStarted = true
+	} else if bx.setStateFrom(BLE_XPORT_STATE_STARTING,
+		BLE_XPORT_STATE_STOPPING) {
+
+		fullyStarted = false
+	} else {
 		// Stop already in progress.
 		return
 	}
+
+	// Stop the unixchild instance (blehostd + socket).
 	if bx.client != nil {
 		bx.client.Stop()
+
+		// Unblock the unixchild instance.
 		bx.client.FromChild <- nil
 	}
+
+	// Indicate an error to all of this transport's listeners.  This prevents
+	// them from blocking endlessly while awaiting a BLE message.
 	bx.Bd.ErrorAll(err)
+
+	// Stop all of this transport's go routines.
+	for i := 0; i < bx.numStopListeners; i++ {
+		bx.stopChan <- struct{}{}
+	}
+
+	bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED)
+
+	// Indicate that the shutdown is complete.  If restarts are enabled on this
+	// transport, this signals that the transport should be started again.
+	if fullyStarted {
+		bx.shutdownChan <- restart
+	}
 }
 
 func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
@@ -202,47 +235,68 @@ func (bx *BleXport) getState() BleXportState {
 }
 
 func (bx *BleXport) Stop() error {
-	bx.onError(nil)
+	bx.shutdown(false, nil)
 	return nil
 }
 
-func (bx *BleXport) Start() error {
+func (bx *BleXport) startOnce() error {
 	if !bx.setStateFrom(BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_STARTING) {
 		return nmxutil.NewXportError("BLE xport started twice")
 	}
 
+	bx.stopChan = make(chan struct{})
+	bx.numStopListeners = 0
+	bx.Bd.Clear()
+
 	bx.createUnixChild()
 	if err := bx.client.Start(); err != nil {
 		if unixchild.IsUcAcceptError(err) {
-			err = nmxutil.NewXportError("blehostd did not connect to socket; " +
-				"controller not attached?")
+			err = nmxutil.NewXportError(
+				"blehostd did not connect to socket; " +
+					"controller not attached?")
 		} else {
+			panic(err.Error())
 			err = nmxutil.NewXportError(
 				"Failed to start child process: " + err.Error())
 		}
-		bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STOPPED)
+		bx.shutdown(true, err)
 		return err
 	}
 
 	go func() {
-		err := <-bx.client.ErrChild
-		err = nmxutil.NewXportError("BLE transport error: " + err.Error())
-		fmt.Printf("%s\n", err.Error())
-		bx.onError(err)
+		bx.numStopListeners++
+		for {
+			select {
+			case err := <-bx.client.ErrChild:
+				err = nmxutil.NewXportError("BLE transport error: " +
+					err.Error())
+				go bx.shutdown(true, err)
+
+			case <-bx.stopChan:
+				return
+			}
+		}
 	}()
 
 	go func() {
+		bx.numStopListeners++
 		for {
-			if b := bx.rx(); b == nil {
-				// The error should have been reported to everyone interested.
-				break
+			select {
+			case buf := <-bx.client.FromChild:
+				if len(buf) != 0 {
+					log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
+					bx.Bd.Dispatch(buf)
+				}
+
+			case <-bx.stopChan:
+				return
 			}
 		}
 	}()
 
 	synced, bl, err := bx.initialSyncCheck()
 	if err != nil {
-		bx.Stop()
+		bx.shutdown(true, err)
 		return err
 	}
 
@@ -253,6 +307,7 @@ func (bx *BleXport) Start() error {
 		for {
 			select {
 			case err := <-bl.ErrChan:
+				bx.shutdown(true, err)
 				return err
 			case bm := <-bl.BleChan:
 				switch msg := bm.(type) {
@@ -262,34 +317,37 @@ func (bx *BleXport) Start() error {
 					}
 				}
 			case <-time.After(bx.cfg.SyncTimeout):
-				bx.Stop()
-				return nmxutil.NewXportError(
+				err := nmxutil.NewXportError(
 					"Timeout waiting for host <-> controller sync")
+				bx.shutdown(true, err)
+				return err
 			}
 		}
 	}
 
 	// Host and controller are synced.  Listen for sync loss in the background.
 	go func() {
+		bx.numStopListeners++
 		for {
 			select {
 			case err := <-bl.ErrChan:
-				bx.onError(err)
-				return
+				go bx.shutdown(true, err)
 			case bm := <-bl.BleChan:
 				switch msg := bm.(type) {
 				case *BleSyncEvt:
 					if !msg.Synced {
-						bx.onError(nmxutil.NewXportError(
+						go bx.shutdown(true, nmxutil.NewXportError(
 							"BLE host <-> controller sync lost"))
-						return
 					}
 				}
+			case <-bx.stopChan:
+				return
 			}
 		}
 	}()
 
 	if !bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STARTED) {
+		bx.shutdown(true, err)
 		return nmxutil.NewXportError(
 			"Internal error; BLE transport in unexpected state")
 	}
@@ -297,6 +355,47 @@ func (bx *BleXport) Start() error {
 	return nil
 }
 
+func (bx *BleXport) Start() error {
+	// Try to start the transport.  If this first attempt fails, report the
+	// error and don't retry.
+	if err := bx.startOnce(); err != nil {
+		log.Debugf("Error starting BLE transport: %s",
+			err.Error())
+		return err
+	}
+
+	// Now that the first start attempt has succeeded, start a restart loop in
+	// the background.
+	go func() {
+		// Block until transport shuts down.
+		restart := <-bx.shutdownChan
+		for {
+			// If restarts are disabled, or if the shutdown was a result of an
+			// explicit stop call (instead of an unexpected error), stop
+			// restarting the transport.
+			if !bx.cfg.BlehostdRestart || !restart {
+				break
+			}
+
+			// Wait a second before the next restart.  This is necessary to
+			// ensure the unix domain socket can be rebound.
+			time.Sleep(time.Second)
+
+			// Attempt to start the transport again.
+			if err := bx.startOnce(); err != nil {
+				// Start attempt failed.
+				log.Debugf("Error starting BLE transport: %s",
+					err.Error())
+			} else {
+				// Success.  Block until the transport shuts down.
+				restart = <-bx.shutdownChan
+			}
+		}
+	}()
+
+	return nil
+}
+
 func (bx *BleXport) txNoSync(data []byte) {
 	log.Debugf("Tx to blehostd:\n%s", hex.Dump(data))
 	bx.client.ToChild <- data
@@ -304,23 +403,15 @@ func (bx *BleXport) txNoSync(data []byte) {
 
 func (bx *BleXport) Tx(data []byte) error {
 	if bx.getState() != BLE_XPORT_STATE_STARTED {
-		return nmxutil.NewXportError("Attempt to transmit before BLE xport " +
-			"fully started")
+		return nmxutil.NewXportError(
+			fmt.Sprintf("Attempt to transmit before BLE xport fully started; "+
+				"state=%d", bx.getState()))
 	}
 
 	bx.txNoSync(data)
 	return nil
 }
 
-func (bx *BleXport) rx() []byte {
-	buf := <-bx.client.FromChild
-	if len(buf) != 0 {
-		log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
-		bx.Bd.Dispatch(buf)
-	}
-	return buf
-}
-
 func (bx *BleXport) RspTimeout() time.Duration {
 	return bx.cfg.BlehostdRspTimeout
 }

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/2aeb4daa/nmxact/nmble/dispatch.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index 546fc11..c9ef39e 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -267,7 +267,10 @@ func (bd *BleDispatcher) Dispatch(data []byte) {
 		return
 	}
 
+	bd.mutex.Lock()
 	_, listener := bd.findListener(base)
+	bd.mutex.Unlock()
+
 	if listener == nil {
 		log.Debugf(
 			"No BLE listener for op=%d type=%d seq=%d connHandle=%d",
@@ -295,3 +298,15 @@ func (bd *BleDispatcher) ErrorAll(err error) {
 		listener.ErrChan <- err
 	}
 }
+
+func (bd *BleDispatcher) Clear() {
+	bd.mutex.Lock()
+	defer bd.mutex.Unlock()
+
+	for s, _ := range bd.seqMap {
+		delete(bd.seqMap, s)
+	}
+	for b, _ := range bd.baseMap {
+		delete(bd.baseMap, b)
+	}
+}