You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/07/14 02:27:43 UTC
[mynewt-newtmgr] 01/02: nmxact - Remove some mutexes
This is an automated email from the ASF dual-hosted git repository.
ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git
commit cf276ea9920ba3429e84648445981402acbd9abd
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Wed Jul 12 18:33:41 2017 -0700
nmxact - Remove some mutexes
---
nmxact/example/ble_loop/ble_loop.go | 12 ++-
nmxact/example/ble_scan/ble_scan.go | 7 +-
nmxact/nmble/ble_act.go | 4 +-
nmxact/nmble/ble_fsm.go | 93 ++++++++++++++---------
nmxact/nmble/ble_oic_sesn.go | 145 +++++++++++-------------------------
nmxact/nmble/ble_plain_sesn.go | 109 +++++++++++++--------------
nmxact/nmble/ble_xport.go | 110 +++++++++++++--------------
nmxact/nmble/discover.go | 2 +-
nmxact/nmble/dispatch.go | 20 +++--
nmxact/nmble/receiver.go | 3 +
nmxact/nmxutil/bcast.go | 43 +++++++++++
nmxact/nmxutil/err_funnel.go | 55 ++------------
12 files changed, 284 insertions(+), 319 deletions(-)
diff --git a/nmxact/example/ble_loop/ble_loop.go b/nmxact/example/ble_loop/ble_loop.go
index 7a930a9..9e79d29 100644
--- a/nmxact/example/ble_loop/ble_loop.go
+++ b/nmxact/example/ble_loop/ble_loop.go
@@ -26,8 +26,11 @@ import (
"syscall"
"time"
+ log "github.com/Sirupsen/logrus"
+
"mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmble"
+ "mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/sesn"
"mynewt.apache.org/newtmgr/nmxact/xact"
"mynewt.apache.org/newtmgr/nmxact/xport"
@@ -58,11 +61,14 @@ func configExitHandler(x xport.Xport, s sesn.Sesn) {
}
func main() {
+ //nmxutil.SetLogLevel(log.DebugLevel)
+ nmxutil.SetLogLevel(log.InfoLevel)
+
// Initialize the BLE transport.
params := nmble.NewXportCfg()
params.SockPath = "/tmp/blehostd-uds"
- params.BlehostdPath = "blehostd.elf"
- params.DevPath = "/dev/cu.usbmodem142111"
+ params.BlehostdPath = "blehostd"
+ params.DevPath = "/dev/cu.usbmodem142121"
x, err := nmble.NewBleXport(params)
if err != nil {
@@ -83,7 +89,7 @@ func main() {
// * Peer has name "nimble-bleprph"
// * We use a random address.
dev, err := nmble.DiscoverDeviceWithName(
- x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "nimble-bleprph")
+ x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "c4")
if err != nil {
fmt.Fprintf(os.Stderr, "error discovering device: %s\n", err.Error())
os.Exit(1)
diff --git a/nmxact/example/ble_scan/ble_scan.go b/nmxact/example/ble_scan/ble_scan.go
index 73ca24f..3a01146 100644
--- a/nmxact/example/ble_scan/ble_scan.go
+++ b/nmxact/example/ble_scan/ble_scan.go
@@ -70,8 +70,8 @@ func main() {
// Initialize the BLE transport.
params := nmble.NewXportCfg()
params.SockPath = "/tmp/blehostd-uds"
- params.BlehostdPath = "blehostd.elf"
- params.DevPath = "/dev/cu.usbmodem14221"
+ params.BlehostdPath = "blehostd"
+ params.DevPath = "/dev/cu.usbmodem142121"
x, err := nmble.NewBleXport(params)
if err != nil {
@@ -105,9 +105,6 @@ func main() {
for {
sc := scan.BleOmpScanCfg(scanCb)
- sc.Ble.ScanPred = func(adv bledefs.BleAdvReport) bool {
- return adv.Fields.Name != nil && *adv.Fields.Name == "c5"
- }
if err := scanner.Start(sc); err != nil {
fmt.Fprintf(os.Stderr, "error starting scan: %s\n", err.Error())
os.Exit(1)
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 2c4b71d..e51b8e9 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -324,8 +324,8 @@ func exchangeMtu(x *BleXport, bl *Listener, r *BleExchangeMtuReq) (
}
}
-func actScan(x *BleXport, bl *Listener, r *BleScanReq,
- abortChan chan struct{}, advRptCb BleAdvRptFn) error {
+func actScan(x *BleXport, bl *Listener, r *BleScanReq, abortChan chan struct{},
+ advRptCb BleAdvRptFn) error {
const rspType = MSG_TYPE_SCAN
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 52b163e..44a7908 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -48,8 +48,11 @@ const (
FSM_DISCONNECT_TYPE_REQUESTED
)
-type BleRxDataFn func(data []byte)
-type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
+type BleDisconnectEntry struct {
+ Dt BleFsmDisconnectType
+ Peer BleDev
+ Err error
+}
type BleFsmParamsCentral struct {
PeerDev BleDev
@@ -58,15 +61,13 @@ type BleFsmParamsCentral struct {
}
type BleFsmParams struct {
- Bx *BleXport
- OwnAddrType BleAddrType
- EncryptWhen BleEncryptWhen
- Central BleFsmParamsCentral
- SvcUuids []BleUuid
- ReqChrUuid BleUuid
- RspChrUuid BleUuid
- RxDataCb BleRxDataFn
- DisconnectCb BleDisconnectFn
+ Bx *BleXport
+ OwnAddrType BleAddrType
+ EncryptWhen BleEncryptWhen
+ Central BleFsmParamsCentral
+ SvcUuids []BleUuid
+ ReqChrUuid BleUuid
+ RspChrUuid BleUuid
}
type BleFsm struct {
@@ -83,8 +84,9 @@ type BleFsm struct {
errFunnel nmxutil.ErrFunnel
id uint32
- // Conveys changes in encrypted state.
- encChan chan error
+ encBcast nmxutil.Bcaster
+ disconnectChan chan BleDisconnectEntry
+ rxNmpChan chan []byte
}
func NewBleFsm(p BleFsmParams) *BleFsm {
@@ -99,7 +101,6 @@ func NewBleFsm(p BleFsmParams) *BleFsm {
bf.errFunnel.AccumDelay = 250 * time.Millisecond
bf.errFunnel.LessCb = fsmErrorLess
- bf.errFunnel.ProcCb = func(err error) { bf.processErr(err) }
return bf
}
@@ -163,7 +164,14 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
}
}
-func (bf *BleFsm) processErr(err error) {
+// Listens for an error in the state machine. On error, the session is
+// considered disconnected and the error is reported to the client.
+func (bf *BleFsm) listenForError() {
+ err := <-bf.errFunnel.Wait()
+
+ // Stop listening for NMP responses.
+ close(bf.rxNmpChan)
+
// Remember some fields before we clear them.
dt := calcDisconnectType(bf.state)
@@ -175,8 +183,10 @@ func (bf *BleFsm) processErr(err error) {
// Wait for all listeners to get removed.
bf.rxer.WaitUntilNoListeners()
- bf.errFunnel.Reset()
- bf.params.DisconnectCb(dt, bf.peerDev, err)
+ bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err}
+ close(bf.disconnectChan)
+
+ bf.disconnectChan = make(chan BleDisconnectEntry)
}
// Listens for events in the background.
@@ -214,9 +224,9 @@ func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
log.Debugf("Connection encrypted; conn_handle=%d",
msg.ConnHandle)
}
- if bf.encChan != nil {
- bf.encChan <- err
- }
+
+ // Notify any listeners of the encryption change event.
+ bf.encBcast.SendAndClear(err)
case *BleDisconnectEvt:
err := bf.disconnectError(msg.Reason)
@@ -259,7 +269,7 @@ func (bf *BleFsm) nmpRspListen() error {
if bf.nmpRspChr != nil &&
msg.AttrHandle == bf.nmpRspChr.ValHandle {
- bf.params.RxDataCb(msg.Data.Bytes)
+ bf.rxNmpChan <- msg.Data.Bytes
}
}
}
@@ -437,16 +447,14 @@ func (bf *BleFsm) encInitiate() error {
}
defer bf.rxer.RemoveSeqListener("enc-initiate", r.Seq)
- bf.encChan = make(chan error)
- defer func() { bf.encChan = nil }()
-
// Initiate the encryption procedure.
if err := encInitiate(bf.params.Bx, bl, r); err != nil {
return err
}
// Block until the procedure completes.
- return <-bf.encChan
+ itf := <-bf.encBcast.Listen()
+ return itf.(error)
}
func (bf *BleFsm) discAllChrs() error {
@@ -621,6 +629,14 @@ func (bf *BleFsm) executeState() (bool, error) {
return false, nil
}
+func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry {
+ return bf.disconnectChan
+}
+
+func (bf *BleFsm) RxNmpChan() <-chan []byte {
+ return bf.rxNmpChan
+}
+
func (bf *BleFsm) startOnce() (bool, error) {
if !bf.IsClosed() {
return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -628,15 +644,16 @@ func (bf *BleFsm) startOnce() (bool, error) {
bf.state))
}
- bf.errFunnel.Start()
-
for {
retry, err := bf.executeState()
if err != nil {
bf.errFunnel.Insert(err)
- err = bf.errFunnel.Wait()
+ err = <-bf.errFunnel.Wait()
return retry, err
} else if bf.state == SESN_STATE_DONE {
+ // We are fully connected. Listen for errors in the background.
+ go bf.listenForError()
+
return false, nil
}
}
@@ -648,6 +665,9 @@ func (bf *BleFsm) startOnce() (bool, error) {
func (bf *BleFsm) Start() error {
var err error
+ bf.disconnectChan = make(chan BleDisconnectEntry)
+ bf.rxNmpChan = make(chan []byte)
+
for i := 0; i < bf.params.Central.ConnTries; i++ {
var retry bool
retry, err = bf.startOnce()
@@ -656,12 +676,16 @@ func (bf *BleFsm) Start() error {
}
}
- return err
+ if err != nil {
+ return err
+ }
+
+ return nil
}
// @return bool true if stop complete;
// false if disconnect is now pending.
-func (bf *BleFsm) Stop() (bool, error) {
+func (bf *BleFsm) Stop() error {
state := bf.state
switch state {
@@ -669,19 +693,18 @@ func (bf *BleFsm) Stop() (bool, error) {
SESN_STATE_TERMINATING,
SESN_STATE_CONN_CANCELLING:
- return false,
- bf.closedError("Attempt to close an unopened BLE session")
+ return bf.closedError("Attempt to close an unopened BLE session")
case SESN_STATE_CONNECTING:
bf.connCancel()
bf.errFunnel.Insert(fmt.Errorf("Connection attempt cancelled"))
- return false, nil
+ return nil
default:
if err := bf.terminate(); err != nil {
- return false, err
+ return err
}
- return false, nil
+ return nil
}
}
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 9db1c84..e1bf28b 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -2,7 +2,6 @@ package nmble
import (
"fmt"
- "sync"
"time"
"github.com/runtimeco/go-coap"
@@ -22,8 +21,7 @@ type BleOicSesn struct {
closeTimeout time.Duration
onCloseCb sesn.OnCloseFn
- closeChan chan error
- mtx sync.Mutex
+ closeChan chan struct{}
}
func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
@@ -52,136 +50,81 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
ReqChrUuid: reqChrUuid,
RspChrUuid: rspChrUuid,
EncryptWhen: cfg.Ble.EncryptWhen,
- RxDataCb: func(d []byte) { bos.onRxNmp(d) },
- DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
- bos.onDisconnect(dt, p, e)
- },
})
return bos
}
-// Returns true if a new channel was assigned.
-func (bos *BleOicSesn) setCloseChan() error {
- bos.mtx.Lock()
- defer bos.mtx.Unlock()
-
- if bos.closeChan != nil {
- return fmt.Errorf("Multiple listeners waiting for session to close")
- }
-
- bos.closeChan = make(chan error, 1)
- return nil
-}
-
-func (bos *BleOicSesn) clearCloseChan() {
- bos.mtx.Lock()
- defer bos.mtx.Unlock()
-
- bos.closeChan = nil
+func (bos *BleOicSesn) AbortRx(seq uint8) error {
+ return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
}
-func (bos *BleOicSesn) listenForClose(timeout time.Duration) error {
- select {
- case <-bos.closeChan:
- return nil
- case <-time.After(timeout):
- // Session never closed.
- return fmt.Errorf("Timeout while waiting for session to close")
- }
-}
+func (bos *BleOicSesn) Open() error {
+ // This channel gets closed when the session closes.
+ bos.closeChan = make(chan struct{})
-func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error {
- if err := bos.setCloseChan(); err != nil {
+ if err := bos.bf.Start(); err != nil {
+ close(bos.closeChan)
return err
}
- defer bos.clearCloseChan()
- // If the session is already closed, we're done.
- if bos.bf.IsClosed() {
- return nil
- }
-
- // Block until close completes or times out.
- return bos.listenForClose(timeout)
-}
-
-func (bos *BleOicSesn) AbortRx(seq uint8) error {
- return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
-}
-
-func (bos *BleOicSesn) Open() error {
d, err := omp.NewDispatcher(true, 3)
if err != nil {
+ close(bos.closeChan)
return err
}
bos.d = d
- if err := bos.bf.Start(); err != nil {
+ // Listen for disconnect in the background.
+ go func() {
+ // Block until disconnect.
+ entry := <-bos.bf.DisconnectChan()
+
+ // Signal error to all listeners.
+ bos.d.ErrorAll(entry.Err)
bos.d.Stop()
- return err
- }
+
+ // If the session is being closed, unblock the close() call.
+ close(bos.closeChan)
+
+ // Only execute the client's disconnect callback if the disconnect was
+ // unsolicited.
+ if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
+ bos.onCloseCb(bos, entry.Err)
+ }
+ }()
+
+ // Listen for NMP responses in the background.
+ go func() {
+ for {
+ data, ok := <-bos.bf.RxNmpChan()
+ if !ok {
+ // Disconnected.
+ return
+ } else {
+ bos.d.Dispatch(data)
+ }
+ }
+ }()
+
return nil
}
func (bos *BleOicSesn) Close() error {
- if err := bos.setCloseChan(); err != nil {
- return err
- }
- defer bos.clearCloseChan()
-
- done, err := bos.bf.Stop()
+ err := bos.bf.Stop()
if err != nil {
return err
}
- if done {
- // Close complete.
- return nil
- }
-
- // Block until close completes or times out.
- return bos.listenForClose(bos.closeTimeout)
+ // Block until close completes.
+ <-bos.closeChan
+ return nil
}
func (bos *BleOicSesn) IsOpen() bool {
return bos.bf.IsOpen()
}
-func (bos *BleOicSesn) onRxNmp(data []byte) {
- bos.d.Dispatch(data)
-}
-
-// Called by the FSM when a blehostd disconnect event is received.
-func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
- err error) {
-
- bos.d.ErrorAll(err)
-
- bos.mtx.Lock()
-
- // If the session is being closed, unblock the close() call.
- if bos.closeChan != nil {
- bos.closeChan <- err
- }
-
- bos.mtx.Unlock()
-
- // Only stop the dispatcher and execute client's disconnect callback if the
- // disconnect was unsolicited and the session was fully open. If the
- // session wasn't fully open, the dispatcher will get stopped when the fsm
- // start function returns an error (right after this function returns).
- if dt == FSM_DISCONNECT_TYPE_OPENED || dt == FSM_DISCONNECT_TYPE_REQUESTED {
- bos.d.Stop()
- }
-
- if dt == FSM_DISCONNECT_TYPE_OPENED {
- if bos.onCloseCb != nil {
- bos.onCloseCb(bos, err)
- }
- }
-}
-
func (bos *BleOicSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
return omp.EncodeOmpTcp(m)
}
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index b69d4f7..782fce0 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -16,25 +16,17 @@ type BlePlainSesn struct {
closeTimeout time.Duration
onCloseCb sesn.OnCloseFn
- closeChan chan error
+ closeChan chan struct{}
}
func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
bps := &BlePlainSesn{
- d: nmp.NewDispatcher(1),
closeTimeout: cfg.Ble.CloseTimeout,
onCloseCb: cfg.OnCloseCb,
}
- svcUuid, err := ParseUuid(NmpPlainSvcUuid)
- if err != nil {
- panic(err.Error())
- }
-
- chrUuid, err := ParseUuid(NmpPlainChrUuid)
- if err != nil {
- panic(err.Error())
- }
+ svcUuid, _ := ParseUuid(NmpPlainSvcUuid)
+ chrUuid, _ := ParseUuid(NmpPlainChrUuid)
bps.bf = NewBleFsm(BleFsmParams{
Bx: bx,
@@ -48,84 +40,83 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
ReqChrUuid: chrUuid,
RspChrUuid: chrUuid,
EncryptWhen: cfg.Ble.EncryptWhen,
- RxDataCb: func(d []byte) { bps.onRxNmp(d) },
- DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
- bps.onDisconnect(dt, p, e)
- },
})
return bps
}
-func (bps *BlePlainSesn) setCloseChan() error {
- if bps.closeChan != nil {
- return fmt.Errorf("Multiple listeners waiting for session to close")
- }
-
- bps.closeChan = make(chan error, 1)
- return nil
-}
-
-func (bps *BlePlainSesn) clearCloseChan() {
- bps.closeChan = nil
-}
-
-func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
- select {
- case <-bps.closeChan:
- return nil
- case <-time.After(timeout):
- // Session never closed.
- return fmt.Errorf("Timeout while waiting for session to close")
- }
-}
-
func (bps *BlePlainSesn) AbortRx(seq uint8) error {
return bps.d.ErrorOne(seq, fmt.Errorf("Rx aborted"))
}
func (bps *BlePlainSesn) Open() error {
- return bps.bf.Start()
-}
+ // This channel gets closed when the session closes.
+ bps.closeChan = make(chan struct{})
-func (bps *BlePlainSesn) Close() error {
- if err := bps.setCloseChan(); err != nil {
+ if err := bps.bf.Start(); err != nil {
+ close(bps.closeChan)
return err
}
- defer bps.clearCloseChan()
- done, err := bps.bf.Stop()
+ bps.d = nmp.NewDispatcher(3)
+
+ // Listen for disconnect in the background.
+ go func() {
+ // Block until disconnect.
+ entry := <-bps.bf.DisconnectChan()
+
+ // Signal error to all listeners.
+ bps.d.ErrorAll(entry.Err)
+
+ // If the session is being closed, unblock the close() call.
+ close(bps.closeChan)
+
+ // Only execute the client's disconnect callback if the disconnect was
+ // unsolicited.
+ if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
+ bps.onCloseCb(bps, entry.Err)
+ }
+ }()
+
+ // Listen for NMP responses in the background.
+ go func() {
+ for {
+ data, ok := <-bps.bf.RxNmpChan()
+ if !ok {
+ // Disconnected.
+ return
+ } else {
+ bps.d.Dispatch(data)
+ }
+ }
+ }()
+
+ return nil
+}
+
+func (bps *BlePlainSesn) Close() error {
+ err := bps.bf.Stop()
if err != nil {
return err
}
- if done {
- // Close complete.
- return nil
- }
-
- // Block until close completes or times out.
- return bps.listenForClose(bps.closeTimeout)
+ // Block until close completes.
+ <-bps.closeChan
+ return nil
}
func (bps *BlePlainSesn) IsOpen() bool {
return bps.bf.IsOpen()
}
-func (bps *BlePlainSesn) onRxNmp(data []byte) {
- bps.d.Dispatch(data)
-}
-
// Called by the FSM when a blehostd disconnect event is received.
func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
err error) {
bps.d.ErrorAll(err)
- // If someone is waiting for the session to close, unblock them.
- if bps.closeChan != nil {
- bps.closeChan <- err
- }
+ // If the session is being closed, unblock the close() call.
+ close(bps.closeChan)
// Only execute client's disconnect callback if the disconnect was
// unsolicited and the session was fully open.
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index db0a8e3..67b2d60 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -84,19 +84,17 @@ const (
// Implements xport.Xport.
type BleXport struct {
- Bd *Dispatcher
- client *unixchild.Client
- state BleXportState
- stopChan chan struct{}
- numStopListeners int
- shutdownChan chan bool
- readyChan chan error
- numReadyListeners int
- master nmxutil.SingleResource
- slave nmxutil.SingleResource
- randAddr *BleAddr
- mtx sync.Mutex
- scanner *BleScanner
+ Bd *Dispatcher
+ client *unixchild.Client
+ state BleXportState
+ stopChan chan struct{}
+ shutdownChan chan bool
+ readyBcast nmxutil.Bcaster
+ master nmxutil.SingleResource
+ slave nmxutil.SingleResource
+ randAddr *BleAddr
+ stateMtx sync.Mutex
+ scanner *BleScanner
cfg XportCfg
}
@@ -105,7 +103,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
bx := &BleXport{
Bd: NewDispatcher(),
shutdownChan: make(chan bool),
- readyChan: make(chan error),
+ readyBcast: nmxutil.Bcaster{},
master: nmxutil.NewSingleResource(),
slave: nmxutil.NewSingleResource(),
cfg: cfg,
@@ -114,7 +112,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
return bx, nil
}
-func (bx *BleXport) createUnixChild() {
+func (bx *BleXport) startUnixChild() error {
config := unixchild.Config{
SockPath: bx.cfg.SockPath,
ChildPath: bx.cfg.BlehostdPath,
@@ -125,6 +123,20 @@ func (bx *BleXport) createUnixChild() {
}
bx.client = unixchild.New(config)
+
+ if err := bx.client.Start(); err != nil {
+ if unixchild.IsUcAcceptError(err) {
+ err = nmxutil.NewXportError(
+ "blehostd did not connect to socket; " +
+ "controller not attached?")
+ } else {
+ err = nmxutil.NewXportError(
+ "Failed to start child process: " + err.Error())
+ }
+ return err
+ }
+
+ return nil
}
func (bx *BleXport) BuildScanner() (scan.Scanner, error) {
@@ -240,7 +252,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
log.Debugf("Shutting down BLE transport")
- bx.mtx.Lock()
+ bx.stateMtx.Lock()
var fullyStarted bool
var already bool
@@ -260,7 +272,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
bx.state = BLE_XPORT_STATE_STOPPING
}
- bx.mtx.Unlock()
+ bx.stateMtx.Unlock()
if already {
// Shutdown already in progress.
@@ -283,10 +295,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
}
// Stop all of this transport's go routines.
- log.Debugf("Waiting for BLE transport goroutines to complete")
- for i := 0; i < bx.numStopListeners; i++ {
- bx.stopChan <- struct{}{}
- }
+ close(bx.stopChan)
// Stop the unixchild instance (blehostd + socket).
if bx.client != nil {
@@ -304,37 +313,32 @@ func (bx *BleXport) shutdown(restart bool, err error) {
}
func (bx *BleXport) blockUntilReady() error {
- bx.mtx.Lock()
+ var ch chan interface{}
+
+ bx.stateMtx.Lock()
switch bx.state {
case BLE_XPORT_STATE_STARTED:
// Already started; don't block.
- bx.mtx.Unlock()
+ bx.stateMtx.Unlock()
return nil
case BLE_XPORT_STATE_DORMANT:
// Not in the process of starting; the user will be waiting forever.
- bx.mtx.Unlock()
+ bx.stateMtx.Unlock()
return fmt.Errorf("Attempt to use BLE transport without starting it")
default:
+ ch = bx.readyBcast.Listen()
}
+ bx.stateMtx.Unlock()
- bx.numReadyListeners++
- bx.mtx.Unlock()
-
- return <-bx.readyChan
-}
-
-func (bx *BleXport) notifyReadyListeners(err error) {
- for i := 0; i < bx.numReadyListeners; i++ {
- bx.readyChan <- err
- }
- bx.numReadyListeners = 0
+ itf := <-ch
+ return itf.(error)
}
func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
- bx.mtx.Lock()
- defer bx.mtx.Unlock()
+ bx.stateMtx.Lock()
+ defer bx.stateMtx.Unlock()
if bx.state != from {
return false
@@ -343,9 +347,10 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
bx.state = to
switch bx.state {
case BLE_XPORT_STATE_STARTED:
- bx.notifyReadyListeners(nil)
+ bx.readyBcast.SendAndClear(nil)
case BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_DORMANT:
- bx.notifyReadyListeners(nmxutil.NewXportError("BLE transport stopped"))
+ bx.readyBcast.SendAndClear(
+ nmxutil.NewXportError("BLE transport stopped"))
default:
}
@@ -362,32 +367,21 @@ func (bx *BleXport) startOnce() error {
return nmxutil.NewXportError("BLE xport started twice")
}
- bx.stopChan = make(chan struct{})
- bx.numStopListeners = 0
-
- bx.createUnixChild()
- if err := bx.client.Start(); err != nil {
- if unixchild.IsUcAcceptError(err) {
- err = nmxutil.NewXportError(
- "blehostd did not connect to socket; " +
- "controller not attached?")
- } else {
- err = nmxutil.NewXportError(
- "Failed to start child process: " + err.Error())
- }
+ if err := bx.startUnixChild(); err != nil {
bx.shutdown(true, err)
return err
}
+ bx.stopChan = make(chan struct{})
+
// Listen for errors and data from the blehostd process.
go func() {
- bx.numStopListeners++
for {
select {
case err := <-bx.client.ErrChild:
err = nmxutil.NewXportError("BLE transport error: " +
err.Error())
- go bx.shutdown(true, err)
+ bx.shutdown(true, err)
case buf := <-bx.client.FromChild:
if len(buf) != 0 {
@@ -433,16 +427,16 @@ func (bx *BleXport) startOnce() error {
// Host and controller are synced. Listen for sync loss in the background.
go func() {
- bx.numStopListeners++
for {
select {
case err := <-bl.ErrChan:
- go bx.shutdown(true, err)
+ bx.shutdown(true, err)
+ return
case bm := <-bl.MsgChan:
switch msg := bm.(type) {
case *BleSyncEvt:
if !msg.Synced {
- go bx.shutdown(true, nmxutil.NewXportError(
+ bx.shutdown(true, nmxutil.NewXportError(
"BLE host <-> controller sync lost"))
}
}
@@ -452,7 +446,7 @@ func (bx *BleXport) startOnce() error {
}
}()
- // Generate a new random address is none was specified.
+ // Generate a new random address if none was specified.
if bx.randAddr == nil {
addr, err := GenRandAddrXact(bx)
if err != nil {
diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go
index 3858e75..2ac19ba 100644
--- a/nmxact/nmble/discover.go
+++ b/nmxact/nmble/discover.go
@@ -108,7 +108,7 @@ func (d *Discoverer) Stop() error {
return nmxutil.NewAlreadyError("Attempt to stop inactive discoverer")
}
- ch <- struct{}{}
+ close(ch)
return nil
}
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index ba59f81..435b123 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -55,7 +55,7 @@ type Listener struct {
func NewListener() *Listener {
return &Listener{
MsgChan: make(chan Msg, 16),
- ErrChan: make(chan error, 4),
+ ErrChan: make(chan error, 1),
TmoChan: make(chan time.Time, 1),
}
}
@@ -70,10 +70,14 @@ func (bl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
return bl.TmoChan
}
-func (bl *Listener) Stop() {
+func (bl *Listener) Close() {
if bl.timer != nil {
bl.timer.Stop()
}
+
+ close(bl.MsgChan)
+ close(bl.ErrChan)
+ close(bl.TmoChan)
}
type Dispatcher struct {
@@ -234,7 +238,7 @@ func (d *Dispatcher) RemoveListener(base MsgBase) *Listener {
base, bl := d.findListener(base)
if bl != nil {
- bl.Stop()
+ bl.Close()
if base.Seq != BLE_SEQ_NONE {
delete(d.seqMap, base.Seq)
} else {
@@ -264,8 +268,8 @@ func decodeMsg(data []byte) (MsgBase, Msg, error) {
cb := msgCtorMap[opTypePair]
if cb == nil {
return base, nil, fmt.Errorf(
- "Unrecognized op+type pair:") // %s, %s",
- //MsgOpToString(base.Op), MsgTypeToString(base.Type))
+ "Unrecognized op+type pair: %s, %s",
+ MsgOpToString(base.Op), MsgTypeToString(base.Type))
}
msg := cb()
@@ -298,6 +302,10 @@ func (d *Dispatcher) Dispatch(data []byte) {
}
func (d *Dispatcher) ErrorAll(err error) {
+ if err == nil {
+ panic("NIL ERROR")
+ }
+
d.mtx.Lock()
m1 := d.seqMap
@@ -310,8 +318,10 @@ func (d *Dispatcher) ErrorAll(err error) {
for _, bl := range m1 {
bl.ErrChan <- err
+ bl.Close()
}
for _, bl := range m2 {
bl.ErrChan <- err
+ bl.Close()
}
}
diff --git a/nmxact/nmble/receiver.go b/nmxact/nmble/receiver.go
index d089ba8..8660393 100644
--- a/nmxact/nmble/receiver.go
+++ b/nmxact/nmble/receiver.go
@@ -94,6 +94,9 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) {
}
func (r *Receiver) ErrorAll(err error) {
+ if err == nil {
+ panic("NIL ERROR")
+ }
r.mtx.Lock()
defer r.mtx.Unlock()
diff --git a/nmxact/nmxutil/bcast.go b/nmxact/nmxutil/bcast.go
new file mode 100644
index 0000000..a7d68c0
--- /dev/null
+++ b/nmxact/nmxutil/bcast.go
@@ -0,0 +1,43 @@
+package nmxutil
+
+import (
+ "sync"
+)
+
+type Bcaster struct {
+ chs [](chan interface{})
+ mtx sync.Mutex
+}
+
+func (b *Bcaster) Listen() chan interface{} {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+
+ ch := make(chan interface{})
+ b.chs = append(b.chs, ch)
+
+ return ch
+}
+
+func (b *Bcaster) Send(val interface{}) {
+ b.mtx.Lock()
+ chs := b.chs
+ b.mtx.Unlock()
+
+ for _, ch := range chs {
+ ch <- val
+ close(ch)
+ }
+}
+
+func (b *Bcaster) Clear() {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+
+ b.chs = nil
+}
+
+func (b *Bcaster) SendAndClear(val interface{}) {
+ b.Send(val)
+ b.Clear()
+}
diff --git a/nmxact/nmxutil/err_funnel.go b/nmxact/nmxutil/err_funnel.go
index 58b0ca2..6d6b5a3 100644
--- a/nmxact/nmxutil/err_funnel.go
+++ b/nmxact/nmxutil/err_funnel.go
@@ -1,7 +1,6 @@
package nmxutil
import (
- "fmt"
"sync"
"time"
)
@@ -13,26 +12,15 @@ type ErrProcFn func(err error)
// reported.
type ErrFunnel struct {
LessCb ErrLessFn
- ProcCb ErrProcFn
AccumDelay time.Duration
mtx sync.Mutex
resetMtx sync.Mutex
curErr error
errTimer *time.Timer
- started bool
waiters [](chan error)
}
-func (f *ErrFunnel) Start() {
- f.resetMtx.Lock()
-
- f.mtx.Lock()
- defer f.mtx.Unlock()
-
- f.started = true
-}
-
func (f *ErrFunnel) Insert(err error) {
if err == nil {
panic("ErrFunnel nil insert")
@@ -41,10 +29,6 @@ func (f *ErrFunnel) Insert(err error) {
f.mtx.Lock()
defer f.mtx.Unlock()
- if !f.started {
- panic("ErrFunnel insert without start")
- }
-
if f.curErr == nil {
f.curErr = err
f.errTimer = time.AfterFunc(f.AccumDelay, func() {
@@ -61,18 +45,6 @@ func (f *ErrFunnel) Insert(err error) {
}
}
-func (f *ErrFunnel) Reset() {
- f.mtx.Lock()
- defer f.mtx.Unlock()
-
- if f.started {
- f.started = false
- f.curErr = nil
- f.errTimer.Stop()
- f.resetMtx.Unlock()
- }
-}
-
func (f *ErrFunnel) timerExp() {
f.mtx.Lock()
@@ -88,35 +60,18 @@ func (f *ErrFunnel) timerExp() {
panic("ErrFunnel timer expired but no error")
}
- f.ProcCb(err)
-
for _, w := range waiters {
w <- err
+ close(w)
}
}
-func (f *ErrFunnel) Wait() error {
- var err error
- var c chan error
+func (f *ErrFunnel) Wait() chan error {
+ c := make(chan error)
f.mtx.Lock()
-
- if !f.started {
- if f.curErr == nil {
- err = fmt.Errorf("Wait on unstarted ErrFunnel")
- } else {
- err = f.curErr
- }
- } else {
- c = make(chan error)
- f.waiters = append(f.waiters, c)
- }
-
+ f.waiters = append(f.waiters, c)
f.mtx.Unlock()
- if err != nil {
- return err
- } else {
- return <-c
- }
+ return c
}
--
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.