You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/07/15 00:44:01 UTC
[mynewt-newtmgr] 02/02: newtmgr - Revendor.
This is an automated email from the ASF dual-hosted git repository.
ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git
commit 16457af3638bdf7a4aee40c5010d8509cfe2ce05
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Fri Jul 14 15:51:17 2017 -0700
newtmgr - Revendor.
---
newtmgr/Godeps/Godeps.json | 48 +++++------
.../newtmgr/nmxact/nmble/ble_fsm.go | 92 ++++++++++++++--------
.../newtmgr/nmxact/nmble/ble_oic_sesn.go | 24 ++++--
.../newtmgr/nmxact/nmble/ble_plain_sesn.go | 42 +++++-----
.../newtmgr/nmxact/nmble/ble_scanner.go | 4 +-
.../newtmgr/nmxact/nmble/ble_util.go | 20 ++---
.../newtmgr/nmxact/nmble/ble_xport.go | 31 +++++---
.../newtmgr/nmxact/nmble/discover.go | 8 +-
.../newtmgr/nmxact/nmble/dispatch.go | 2 +
.../newtmgr/nmxact/nmble/receiver.go | 28 +++----
.../newtmgr/nmxact/nmp/dispatch.go | 10 ++-
.../newtmgr/nmxact/nmxutil/nmxutil.go | 8 ++
.../newtmgr/nmxact/oic/dispatch.go | 17 +++-
.../newtmgr/nmxact/omp/dispatch.go | 8 ++
.../mynewt.apache.org/newtmgr/nmxact/scan/scan.go | 8 --
15 files changed, 211 insertions(+), 139 deletions(-)
diff --git a/newtmgr/Godeps/Godeps.json b/newtmgr/Godeps/Godeps.json
index a89fd26..d205eef 100644
--- a/newtmgr/Godeps/Godeps.json
+++ b/newtmgr/Godeps/Godeps.json
@@ -126,63 +126,63 @@
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/bledefs",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmble",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmp",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmserial",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmxutil",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/oic",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/omp",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/scan",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/sesn",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/udp",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/xact",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
},
{
"ImportPath": "mynewt.apache.org/newtmgr/nmxact/xport",
- "Comment": "mynewt_0_9_0_tag-549-g46f62fa",
- "Rev": "46f62fa33fed18f2962a147e21cf4a9bd6ac7c66"
+ "Comment": "mynewt_0_9_0_tag-552-g62a3d9e",
+ "Rev": "62a3d9ece4c5ad7b8b357d66ee8a61e4e40c4063"
}
]
}
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
index 44a7908..6721eb0 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_fsm.go
@@ -4,6 +4,7 @@ import (
"encoding/hex"
"fmt"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -73,19 +74,21 @@ type BleFsmParams struct {
type BleFsm struct {
params BleFsmParams
- connHandle uint16
- peerDev BleDev
- nmpSvc *BleSvc
- nmpReqChr *BleChr
- nmpRspChr *BleChr
- attMtu int
- state BleSesnState
- rxer *Receiver
- errFunnel nmxutil.ErrFunnel
- id uint32
+ connHandle uint16
+ peerDev BleDev
+ nmpSvc *BleSvc
+ nmpReqChr *BleChr
+ nmpRspChr *BleChr
+ prevDisconnect BleDisconnectEntry
+ attMtu int
+ state BleSesnState
+ rxer *Receiver
+ errFunnel nmxutil.ErrFunnel
+ id uint32
+ wg sync.WaitGroup
encBcast nmxutil.Bcaster
- disconnectChan chan BleDisconnectEntry
+ disconnectChan chan struct{}
rxNmpChan chan []byte
}
@@ -164,35 +167,44 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
}
}
-// Listens for an error in the state machine. On error, the session is
-// considered disconnected and the error is reported to the client.
-func (bf *BleFsm) listenForError() {
- err := <-bf.errFunnel.Wait()
-
- // Stop listening for NMP responses.
- close(bf.rxNmpChan)
-
- // Remember some fields before we clear them.
- dt := calcDisconnectType(bf.state)
-
+func (bf *BleFsm) shutdown(err error) {
bf.params.Bx.StopWaitingForMaster(bf, err)
- bf.rxer.ErrorAll(err)
+ bf.rxer.RemoveAll("shutdown")
bf.state = SESN_STATE_UNCONNECTED
// Wait for all listeners to get removed.
bf.rxer.WaitUntilNoListeners()
+ bf.wg.Wait()
- bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err}
+ close(bf.rxNmpChan)
close(bf.disconnectChan)
+}
+
+// Listens for an error in the state machine. On error, the session is
+// considered disconnected and the error is reported to the client.
+func (bf *BleFsm) listenForError() {
+ bf.wg.Add(1)
+
+ go func() {
+ err := <-bf.errFunnel.Wait()
+
+ dt := calcDisconnectType(bf.state)
+ bf.prevDisconnect = BleDisconnectEntry{dt, bf.peerDev, err}
- bf.disconnectChan = make(chan BleDisconnectEntry)
+ bf.wg.Done()
+ bf.shutdown(err)
+ }()
}
// Listens for events in the background.
func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
+ bf.wg.Add(1)
+
go func() {
+ defer bf.wg.Done()
defer bf.rxer.RemoveSeqListener("connect", seq)
+
for {
select {
case err := <-bl.ErrChan:
@@ -254,8 +266,12 @@ func (bf *BleFsm) nmpRspListen() error {
return err
}
+ bf.wg.Add(1)
+
go func() {
+ defer bf.wg.Done()
defer bf.rxer.RemoveBaseListener("nmp-rsp", base)
+
for {
select {
case <-bl.ErrChan:
@@ -629,7 +645,7 @@ func (bf *BleFsm) executeState() (bool, error) {
return false, nil
}
-func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry {
+func (bf *BleFsm) DisconnectChan() <-chan struct{} {
return bf.disconnectChan
}
@@ -637,6 +653,10 @@ func (bf *BleFsm) RxNmpChan() <-chan []byte {
return bf.rxNmpChan
}
+func (bf *BleFsm) PrevDisconnect() BleDisconnectEntry {
+ return bf.prevDisconnect
+}
+
func (bf *BleFsm) startOnce() (bool, error) {
if !bf.IsClosed() {
return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -644,16 +664,23 @@ func (bf *BleFsm) startOnce() (bool, error) {
bf.state))
}
+ bf.disconnectChan = make(chan struct{})
+ bf.rxNmpChan = make(chan []byte)
+
+ bf.listenForError()
+
for {
retry, err := bf.executeState()
if err != nil {
- bf.errFunnel.Insert(err)
- err = <-bf.errFunnel.Wait()
+ // If stop fails, assume the connection wasn't established and
+ // force an error.
+ if bf.Stop() != nil {
+ bf.errFunnel.Insert(err)
+ }
+ <-bf.disconnectChan
return retry, err
} else if bf.state == SESN_STATE_DONE {
// We are fully connected. Listen for errors in the background.
- go bf.listenForError()
-
return false, nil
}
}
@@ -665,9 +692,6 @@ func (bf *BleFsm) startOnce() (bool, error) {
func (bf *BleFsm) Start() error {
var err error
- bf.disconnectChan = make(chan BleDisconnectEntry)
- bf.rxNmpChan = make(chan []byte)
-
for i := 0; i < bf.params.Central.ConnTries; i++ {
var retry bool
retry, err = bf.startOnce()
@@ -677,6 +701,8 @@ func (bf *BleFsm) Start() error {
}
if err != nil {
+ nmxutil.Assert(!bf.IsOpen())
+ nmxutil.Assert(bf.IsClosed())
return err
}
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
index e1bf28b..bbdafb1 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_oic_sesn.go
@@ -2,6 +2,7 @@ package nmble
import (
"fmt"
+ "sync"
"time"
"github.com/runtimeco/go-coap"
@@ -20,6 +21,7 @@ type BleOicSesn struct {
d *omp.Dispatcher
closeTimeout time.Duration
onCloseCb sesn.OnCloseFn
+ wg sync.WaitGroup
closeChan chan struct{}
}
@@ -76,26 +78,34 @@ func (bos *BleOicSesn) Open() error {
bos.d = d
// Listen for disconnect in the background.
+ bos.wg.Add(1)
go func() {
+ // If the session is being closed, unblock the close() call.
+ defer close(bos.closeChan)
+
// Block until disconnect.
- entry := <-bos.bf.DisconnectChan()
+ <-bos.bf.DisconnectChan()
+ nmxutil.Assert(!bos.IsOpen())
+ pd := bos.bf.PrevDisconnect()
// Signal error to all listeners.
- bos.d.ErrorAll(entry.Err)
+ bos.d.ErrorAll(pd.Err)
bos.d.Stop()
-
- // If the session is being closed, unblock the close() call.
- close(bos.closeChan)
+ bos.wg.Done()
+ bos.wg.Wait()
// Only execute the client's disconnect callback if the disconnect was
// unsolicited.
- if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
- bos.onCloseCb(bos, entry.Err)
+ if pd.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
+ bos.onCloseCb(bos, pd.Err)
}
}()
// Listen for NMP responses in the background.
+ bos.wg.Add(1)
go func() {
+ defer bos.wg.Done()
+
for {
data, ok := <-bos.bf.RxNmpChan()
if !ok {
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
index 782fce0..9268c3d 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_plain_sesn.go
@@ -2,11 +2,13 @@ package nmble
import (
"fmt"
+ "sync"
"time"
"mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/nmp"
+ "mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
@@ -15,6 +17,7 @@ type BlePlainSesn struct {
d *nmp.Dispatcher
closeTimeout time.Duration
onCloseCb sesn.OnCloseFn
+ wg sync.WaitGroup
closeChan chan struct{}
}
@@ -61,25 +64,34 @@ func (bps *BlePlainSesn) Open() error {
bps.d = nmp.NewDispatcher(3)
// Listen for disconnect in the background.
+ bps.wg.Add(1)
go func() {
+ // If the session is being closed, unblock the close() call.
+ defer close(bps.closeChan)
+
// Block until disconnect.
- entry := <-bps.bf.DisconnectChan()
+ <-bps.bf.DisconnectChan()
+ nmxutil.Assert(!bps.IsOpen())
- // Signal error to all listeners.
- bps.d.ErrorAll(entry.Err)
+ pd := bps.bf.PrevDisconnect()
- // If the session is being closed, unblock the close() call.
- close(bps.closeChan)
+ // Signal error to all listeners.
+ bps.d.ErrorAll(pd.Err)
+ bps.wg.Done()
+ bps.wg.Wait()
// Only execute the client's disconnect callback if the disconnect was
// unsolicited.
- if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
- bps.onCloseCb(bps, entry.Err)
+ if pd.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
+ bps.onCloseCb(bps, pd.Err)
}
}()
// Listen for NMP responses in the background.
+ bps.wg.Add(1)
go func() {
+ defer bps.wg.Done()
+
for {
data, ok := <-bps.bf.RxNmpChan()
if !ok {
@@ -109,22 +121,6 @@ func (bps *BlePlainSesn) IsOpen() bool {
return bps.bf.IsOpen()
}
-// Called by the FSM when a blehostd disconnect event is received.
-func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
- err error) {
-
- bps.d.ErrorAll(err)
-
- // If the session is being closed, unblock the close() call.
- close(bps.closeChan)
-
- // Only execute client's disconnect callback if the disconnect was
- // unsolicited and the session was fully open.
- if dt == FSM_DISCONNECT_TYPE_OPENED && bps.onCloseCb != nil {
- bps.onCloseCb(bps, err)
- }
-}
-
func (bps *BlePlainSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
return nmp.EncodeNmpPlain(m)
}
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go
index da8532c..6d4ee0f 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go
@@ -114,7 +114,9 @@ func (s *BleScanner) scan() (*scan.ScanPeer, error) {
return nil, nil
}
- s.connect(*dev)
+ if err := s.connect(*dev); err != nil {
+ return nil, err
+ }
defer s.bos.Close()
// Now we are connected (and paired if required). Read the peer's hardware
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go
index b6d2eea..3550e33 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go
@@ -305,10 +305,10 @@ func ConnFindXact(x *BleXport, connHandle uint16) (BleConnDesc, error) {
}
bl := NewListener()
- if err := x.Bd.AddListener(base, bl); err != nil {
+ if err := x.AddListener(base, bl); err != nil {
return BleConnDesc{}, err
}
- defer x.Bd.RemoveListener(base)
+ defer x.RemoveListener(base)
return connFind(x, bl, r)
}
@@ -323,10 +323,10 @@ func GenRandAddrXact(x *BleXport) (BleAddr, error) {
}
bl := NewListener()
- if err := x.Bd.AddListener(base, bl); err != nil {
+ if err := x.AddListener(base, bl); err != nil {
return BleAddr{}, err
}
- defer x.Bd.RemoveListener(base)
+ defer x.RemoveListener(base)
return genRandAddr(x, bl, r)
}
@@ -343,10 +343,10 @@ func SetRandAddrXact(x *BleXport, addr BleAddr) error {
}
bl := NewListener()
- if err := x.Bd.AddListener(base, bl); err != nil {
+ if err := x.AddListener(base, bl); err != nil {
return err
}
- defer x.Bd.RemoveListener(base)
+ defer x.RemoveListener(base)
return setRandAddr(x, bl, r)
}
@@ -363,10 +363,10 @@ func SetPreferredMtuXact(x *BleXport, mtu uint16) error {
}
bl := NewListener()
- if err := x.Bd.AddListener(base, bl); err != nil {
+ if err := x.AddListener(base, bl); err != nil {
return err
}
- defer x.Bd.RemoveListener(base)
+ defer x.RemoveListener(base)
return setPreferredMtu(x, bl, r)
}
@@ -382,10 +382,10 @@ func ResetXact(x *BleXport) error {
}
bl := NewListener()
- if err := x.Bd.AddListener(base, bl); err != nil {
+ if err := x.AddListener(base, bl); err != nil {
return err
}
- defer x.Bd.RemoveListener(base)
+ defer x.RemoveListener(base)
return reset(x, bl, r)
}
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
index 67b2d60..188a78f 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go
@@ -84,7 +84,8 @@ const (
// Implements xport.Xport.
type BleXport struct {
- Bd *Dispatcher
+ cfg XportCfg
+ d *Dispatcher
client *unixchild.Client
state BleXportState
stopChan chan struct{}
@@ -95,13 +96,11 @@ type BleXport struct {
randAddr *BleAddr
stateMtx sync.Mutex
scanner *BleScanner
-
- cfg XportCfg
}
func NewBleXport(cfg XportCfg) (*BleXport, error) {
bx := &BleXport{
- Bd: NewDispatcher(),
+ d: NewDispatcher(),
shutdownChan: make(chan bool),
readyBcast: nmxutil.Bcaster{},
master: nmxutil.NewSingleResource(),
@@ -171,7 +170,7 @@ func (bx *BleXport) addSyncListener() (*Listener, error) {
Seq: BLE_SEQ_NONE,
ConnHandle: -1,
}
- if err := bx.Bd.AddListener(base, bl); err != nil {
+ if err := bx.d.AddListener(base, bl); err != nil {
return nil, err
}
@@ -185,7 +184,7 @@ func (bx *BleXport) removeSyncListener() {
Seq: BLE_SEQ_NONE,
ConnHandle: -1,
}
- bx.Bd.RemoveListener(base)
+ bx.d.RemoveListener(base)
}
func (bx *BleXport) querySyncStatus() (bool, error) {
@@ -207,10 +206,10 @@ func (bx *BleXport) querySyncStatus() (bool, error) {
Seq: req.Seq,
ConnHandle: -1,
}
- if err := bx.Bd.AddListener(base, bl); err != nil {
+ if err := bx.d.AddListener(base, bl); err != nil {
return false, err
}
- defer bx.Bd.RemoveListener(base)
+ defer bx.d.RemoveListener(base)
if err := bx.txNoSync(j); err != nil {
return false, err
@@ -286,7 +285,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
// Indicate an error to all of this transport's listeners. This prevents
// them from blocking endlessly while awaiting a BLE message.
log.Debugf("Stopping BLE dispatcher")
- bx.Bd.ErrorAll(err)
+ bx.d.ErrorAll(err)
synced, err := bx.querySyncStatus()
if err == nil && synced {
@@ -367,13 +366,13 @@ func (bx *BleXport) startOnce() error {
return nmxutil.NewXportError("BLE xport started twice")
}
+ bx.stopChan = make(chan struct{})
+
if err := bx.startUnixChild(); err != nil {
bx.shutdown(true, err)
return err
}
- bx.stopChan = make(chan struct{})
-
// Listen for errors and data from the blehostd process.
go func() {
for {
@@ -386,7 +385,7 @@ func (bx *BleXport) startOnce() error {
case buf := <-bx.client.FromChild:
if len(buf) != 0 {
log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
- bx.Bd.Dispatch(buf)
+ bx.d.Dispatch(buf)
}
case <-bx.stopChan:
@@ -542,6 +541,14 @@ func (bx *BleXport) Tx(data []byte) error {
return bx.txNoSync(data)
}
+func (bx *BleXport) AddListener(base MsgBase, listener *Listener) error {
+ return bx.d.AddListener(base, listener)
+}
+
+func (bx *BleXport) RemoveListener(base MsgBase) *Listener {
+ return bx.d.RemoveListener(base)
+}
+
func (bx *BleXport) RspTimeout() time.Duration {
return bx.cfg.BlehostdRspTimeout
}
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
index 2ac19ba..c8d4a3f 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go
@@ -40,10 +40,10 @@ func (d *Discoverer) scanCancel() error {
}
bl := NewListener()
- if err := d.params.Bx.Bd.AddListener(base, bl); err != nil {
+ if err := d.params.Bx.AddListener(base, bl); err != nil {
return err
}
- defer d.params.Bx.Bd.RemoveListener(base)
+ defer d.params.Bx.RemoveListener(base)
if err := scanCancel(d.params.Bx, bl, r); err != nil {
return err
@@ -79,10 +79,10 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error {
}
bl := NewListener()
- if err := d.params.Bx.Bd.AddListener(base, bl); err != nil {
+ if err := d.params.Bx.AddListener(base, bl); err != nil {
return err
}
- defer d.params.Bx.Bd.RemoveListener(base)
+ defer d.params.Bx.RemoveListener(base)
d.abortChan = make(chan struct{}, 1)
defer func() { d.abortChan = nil }()
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
index 435b123..76bd18c 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/dispatch.go
@@ -80,6 +80,8 @@ func (bl *Listener) Close() {
close(bl.TmoChan)
}
+// The dispatcher is the owner of the listeners it points to. Only the
+// dispatcher writes to these listeners.
type Dispatcher struct {
seqMap map[BleSeq]*Listener
baseMap map[MsgBase]*Listener
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
index 8660393..231ad5e 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/receiver.go
@@ -6,11 +6,15 @@ import (
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
)
+// The receiver never writes to any of its listeners. It only maintains a set
+// of listeners so that their lifetimes can be tracked and to facilitate their
+// removal from the BLE transport.
+
type Receiver struct {
id uint32
bx *BleXport
logDepth int
- bls map[*Listener]struct{}
+ bls map[*Listener]MsgBase
mtx sync.Mutex
wg sync.WaitGroup
}
@@ -20,7 +24,7 @@ func NewReceiver(id uint32, bx *BleXport, logDepth int) *Receiver {
id: id,
bx: bx,
logDepth: logDepth + 3,
- bls: map[*Listener]struct{}{},
+ bls: map[*Listener]MsgBase{},
}
}
@@ -34,11 +38,11 @@ func (r *Receiver) addListener(name string, base MsgBase) (
r.mtx.Lock()
defer r.mtx.Unlock()
- if err := r.bx.Bd.AddListener(base, bl); err != nil {
+ if err := r.bx.AddListener(base, bl); err != nil {
return nil, err
}
- r.bls[bl] = struct{}{}
+ r.bls[bl] = base
r.wg.Add(1)
return bl, nil
@@ -68,7 +72,7 @@ func (r *Receiver) removeListener(name string, base MsgBase) *Listener {
r.mtx.Lock()
defer r.mtx.Unlock()
- bl := r.bx.Bd.RemoveListener(base)
+ bl := r.bx.RemoveListener(base)
delete(r.bls, bl)
if bl != nil {
@@ -93,18 +97,14 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) {
r.removeListener(name, base)
}
-func (r *Receiver) ErrorAll(err error) {
- if err == nil {
- panic("NIL ERROR")
- }
+func (r *Receiver) RemoveAll(name string) {
r.mtx.Lock()
- defer r.mtx.Unlock()
-
bls := r.bls
- r.bls = map[*Listener]struct{}{}
+ r.bls = map[*Listener]MsgBase{}
+ r.mtx.Unlock()
- for bl, _ := range bls {
- bl.ErrChan <- err
+ for _, base := range bls {
+ r.removeListener(name, base)
}
}
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmp/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmp/dispatch.go
index fee372e..9051b5b 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmp/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmp/dispatch.go
@@ -53,12 +53,18 @@ func (nl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
return nl.tmoChan
}
-func (nl *Listener) Stop() {
+func (nl *Listener) Close() {
if nl.timer != nil {
nl.timer.Stop()
}
+
+ close(nl.RspChan)
+ close(nl.ErrChan)
+ close(nl.tmoChan)
}
+// The dispatcher is the owner of the listeners it points to. Only the
+// dispatcher writes to these listeners.
type Dispatcher struct {
seqListenerMap map[uint8]*Listener
reassembler *Reassembler
@@ -97,7 +103,7 @@ func (d *Dispatcher) RemoveListener(seq uint8) *Listener {
nl := d.seqListenerMap[seq]
if nl != nil {
- nl.Stop()
+ nl.Close()
delete(d.seqListenerMap, seq)
}
return nl
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/nmxutil.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/nmxutil.go
index 00e28c4..078a06d 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/nmxutil.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/nmxutil.go
@@ -17,6 +17,8 @@ import (
const DURATION_FOREVER time.Duration = math.MaxInt64
+var Debug bool
+
var nextNmpSeq uint8
var nmpSeqBeenRead bool
var nextOicSeq uint8
@@ -41,6 +43,12 @@ func SetLogLevel(level log.Level) {
ListenLog.Level = level
}
+func Assert(cond bool) {
+ if Debug && !cond {
+ panic("Failed assertion")
+ }
+}
+
func NextNmpSeq() uint8 {
seqMutex.Lock()
defer seqMutex.Unlock()
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/oic/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/oic/dispatch.go
index aacc277..810abe7 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/oic/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/oic/dispatch.go
@@ -72,6 +72,18 @@ func (ol *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
return ol.tmoChan
}
+func (ol *Listener) Close() {
+ if ol.timer != nil {
+ ol.timer.Stop()
+ }
+
+ close(ol.RspChan)
+ close(ol.ErrChan)
+ close(ol.tmoChan)
+}
+
+// The dispatcher is the owner of the listeners it points to. Only the
+// dispatcher writes to these listeners.
type Dispatcher struct {
tokenListenerMap map[Token]*Listener
reassembler *Reassembler
@@ -124,7 +136,10 @@ func (d *Dispatcher) RemoveListener(token []byte) *Listener {
}
ol := d.tokenListenerMap[ot]
- delete(d.tokenListenerMap, ot)
+ if ol != nil {
+ ol.Close()
+ delete(d.tokenListenerMap, ot)
+ }
return ol
}
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/omp/dispatch.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/omp/dispatch.go
index d0ba7d4..7647048 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/omp/dispatch.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/omp/dispatch.go
@@ -20,16 +20,21 @@
package omp
import (
+ "sync"
+
log "github.com/Sirupsen/logrus"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/oic"
)
+// The dispatcher is the owner of the listeners it points to. Only the
+// dispatcher writes to these listeners.
type Dispatcher struct {
nmpd *nmp.Dispatcher
oicd *oic.Dispatcher
stopCh chan struct{}
+ wg sync.WaitGroup
}
func NewDispatcher(isTcp bool, logDepth int) (*Dispatcher, error) {
@@ -56,8 +61,10 @@ func (r *Dispatcher) addOmpListener() error {
return err
}
+ r.wg.Add(1)
go func() {
defer r.RemoveOicListener(nil)
+ defer r.wg.Done()
for {
select {
@@ -83,6 +90,7 @@ func (r *Dispatcher) addOmpListener() error {
func (r *Dispatcher) Stop() {
r.stopCh <- struct{}{}
+ r.wg.Wait()
}
func (r *Dispatcher) Dispatch(data []byte) bool {
diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go
index 318e777..ba1a081 100644
--- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go
+++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go
@@ -57,14 +57,6 @@ func BleOmpScanCfg(ScanCb ScanFn) Cfg {
}
}
- iotUuid, _ := bledefs.ParseUuid(bledefs.OmpUnsecSvcUuid)
- for _, u128 := range adv.Fields.Uuids128 {
- u := bledefs.BleUuid{U128: u128}
- if bledefs.CompareUuids(u, iotUuid) == 0 {
- return true
- }
- }
-
return false
},
},
--
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.