You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/07/14 22:50:04 UTC
[mynewt-newtmgr] branch master updated (854a8d9 -> 097bb15)
This is an automated email from the ASF dual-hosted git repository.
ccollins pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git.
from 854a8d9 newtmgr - revendor
new d748b02 nmxact - Stop scanning for Iotivity UUID.
new 097bb15 nmxact - Some more concurrency fixes.
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
nmxact/nmble/ble_fsm.go | 90 +++++++++++++++++++++++++++---------------
nmxact/nmble/ble_oic_sesn.go | 17 ++++++--
nmxact/nmble/ble_plain_sesn.go | 9 +++--
nmxact/nmble/ble_scanner.go | 4 +-
nmxact/nmble/ble_xport.go | 4 +-
nmxact/nmxutil/nmxutil.go | 8 ++++
nmxact/scan/scan.go | 8 ----
7 files changed, 89 insertions(+), 51 deletions(-)
--
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.
[mynewt-newtmgr] 01/02: nmxact - Stop scanning for Iotivity UUID.
Posted by cc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git
commit d748b023adde1a2be5462652ba0b8cab9626e224
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Fri Jul 14 15:01:55 2017 -0700
nmxact - Stop scanning for Iotivity UUID.
---
nmxact/scan/scan.go | 8 --------
1 file changed, 8 deletions(-)
diff --git a/nmxact/scan/scan.go b/nmxact/scan/scan.go
index 318e777..ba1a081 100644
--- a/nmxact/scan/scan.go
+++ b/nmxact/scan/scan.go
@@ -57,14 +57,6 @@ func BleOmpScanCfg(ScanCb ScanFn) Cfg {
}
}
- iotUuid, _ := bledefs.ParseUuid(bledefs.OmpUnsecSvcUuid)
- for _, u128 := range adv.Fields.Uuids128 {
- u := bledefs.BleUuid{U128: u128}
- if bledefs.CompareUuids(u, iotUuid) == 0 {
- return true
- }
- }
-
return false
},
},
--
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.
[mynewt-newtmgr] 02/02: nmxact - Some more concurrency fixes.
Posted by cc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git
commit 097bb1581655f130bbc9d9519ac6d3b33025edeb
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Fri Jul 14 15:02:15 2017 -0700
nmxact - Some more concurrency fixes.
---
nmxact/nmble/ble_fsm.go | 90 +++++++++++++++++++++++++++---------------
nmxact/nmble/ble_oic_sesn.go | 17 ++++++--
nmxact/nmble/ble_plain_sesn.go | 9 +++--
nmxact/nmble/ble_scanner.go | 4 +-
nmxact/nmble/ble_xport.go | 4 +-
nmxact/nmxutil/nmxutil.go | 8 ++++
6 files changed, 89 insertions(+), 43 deletions(-)
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 44a7908..4bf20b3 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -4,6 +4,7 @@ import (
"encoding/hex"
"fmt"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -73,19 +74,21 @@ type BleFsmParams struct {
type BleFsm struct {
params BleFsmParams
- connHandle uint16
- peerDev BleDev
- nmpSvc *BleSvc
- nmpReqChr *BleChr
- nmpRspChr *BleChr
- attMtu int
- state BleSesnState
- rxer *Receiver
- errFunnel nmxutil.ErrFunnel
- id uint32
+ connHandle uint16
+ peerDev BleDev
+ nmpSvc *BleSvc
+ nmpReqChr *BleChr
+ nmpRspChr *BleChr
+ prevDisconnect BleDisconnectEntry
+ attMtu int
+ state BleSesnState
+ rxer *Receiver
+ errFunnel nmxutil.ErrFunnel
+ id uint32
+ wg sync.WaitGroup
encBcast nmxutil.Bcaster
- disconnectChan chan BleDisconnectEntry
+ disconnectChan chan struct{}
rxNmpChan chan []byte
}
@@ -164,17 +167,7 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType {
}
}
-// Listens for an error in the state machine. On error, the session is
-// considered disconnected and the error is reported to the client.
-func (bf *BleFsm) listenForError() {
- err := <-bf.errFunnel.Wait()
-
- // Stop listening for NMP responses.
- close(bf.rxNmpChan)
-
- // Remember some fields before we clear them.
- dt := calcDisconnectType(bf.state)
-
+func (bf *BleFsm) shutdown(err error) {
bf.params.Bx.StopWaitingForMaster(bf, err)
bf.rxer.ErrorAll(err)
@@ -182,17 +175,36 @@ func (bf *BleFsm) listenForError() {
// Wait for all listeners to get removed.
bf.rxer.WaitUntilNoListeners()
+ bf.wg.Wait()
- bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err}
+ close(bf.rxNmpChan)
close(bf.disconnectChan)
+}
+
+// Listens for an error in the state machine. On error, the session is
+// considered disconnected and the error is reported to the client.
+func (bf *BleFsm) listenForError() {
+ bf.wg.Add(1)
+
+ go func() {
+ err := <-bf.errFunnel.Wait()
+
+ dt := calcDisconnectType(bf.state)
+ bf.prevDisconnect = BleDisconnectEntry{dt, bf.peerDev, err}
- bf.disconnectChan = make(chan BleDisconnectEntry)
+ bf.wg.Done()
+ bf.shutdown(err)
+ }()
}
// Listens for events in the background.
func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error {
+ bf.wg.Add(1)
+
go func() {
defer bf.rxer.RemoveSeqListener("connect", seq)
+ defer bf.wg.Done()
+
for {
select {
case err := <-bl.ErrChan:
@@ -254,8 +266,12 @@ func (bf *BleFsm) nmpRspListen() error {
return err
}
+ bf.wg.Add(1)
+
go func() {
defer bf.rxer.RemoveBaseListener("nmp-rsp", base)
+ defer bf.wg.Done()
+
for {
select {
case <-bl.ErrChan:
@@ -629,7 +645,7 @@ func (bf *BleFsm) executeState() (bool, error) {
return false, nil
}
-func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry {
+func (bf *BleFsm) DisconnectChan() <-chan struct{} {
return bf.disconnectChan
}
@@ -637,6 +653,10 @@ func (bf *BleFsm) RxNmpChan() <-chan []byte {
return bf.rxNmpChan
}
+func (bf *BleFsm) PrevDisconnect() BleDisconnectEntry {
+ return bf.prevDisconnect
+}
+
func (bf *BleFsm) startOnce() (bool, error) {
if !bf.IsClosed() {
return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -644,16 +664,23 @@ func (bf *BleFsm) startOnce() (bool, error) {
bf.state))
}
+ bf.disconnectChan = make(chan struct{})
+ bf.rxNmpChan = make(chan []byte)
+
+ bf.listenForError()
+
for {
retry, err := bf.executeState()
if err != nil {
- bf.errFunnel.Insert(err)
- err = <-bf.errFunnel.Wait()
+ // If stop fails, assume the connection wasn't established and
+ // force an error.
+ if bf.Stop() != nil {
+ bf.errFunnel.Insert(err)
+ }
+ <-bf.disconnectChan
return retry, err
} else if bf.state == SESN_STATE_DONE {
// We are fully connected. Listen for errors in the background.
- go bf.listenForError()
-
return false, nil
}
}
@@ -665,9 +692,6 @@ func (bf *BleFsm) startOnce() (bool, error) {
func (bf *BleFsm) Start() error {
var err error
- bf.disconnectChan = make(chan BleDisconnectEntry)
- bf.rxNmpChan = make(chan []byte)
-
for i := 0; i < bf.params.Central.ConnTries; i++ {
var retry bool
retry, err = bf.startOnce()
@@ -677,6 +701,8 @@ func (bf *BleFsm) Start() error {
}
if err != nil {
+ nmxutil.Assert(!bf.IsOpen())
+ nmxutil.Assert(bf.IsClosed())
return err
}
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index e1bf28b..8964865 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -2,6 +2,7 @@ package nmble
import (
"fmt"
+ "sync"
"time"
"github.com/runtimeco/go-coap"
@@ -20,6 +21,7 @@ type BleOicSesn struct {
d *omp.Dispatcher
closeTimeout time.Duration
onCloseCb sesn.OnCloseFn
+ wg sync.WaitGroup
closeChan chan struct{}
}
@@ -76,26 +78,33 @@ func (bos *BleOicSesn) Open() error {
bos.d = d
// Listen for disconnect in the background.
+ bos.wg.Add(1)
go func() {
// Block until disconnect.
- entry := <-bos.bf.DisconnectChan()
+ <-bos.bf.DisconnectChan()
+ pd := bos.bf.PrevDisconnect()
// Signal error to all listeners.
- bos.d.ErrorAll(entry.Err)
+ bos.d.ErrorAll(pd.Err)
bos.d.Stop()
+ bos.wg.Done()
+ bos.wg.Wait()
// If the session is being closed, unblock the close() call.
close(bos.closeChan)
// Only execute the client's disconnect callback if the disconnect was
// unsolicited.
- if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
- bos.onCloseCb(bos, entry.Err)
+ if pd.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil {
+ bos.onCloseCb(bos, pd.Err)
}
}()
// Listen for NMP responses in the background.
+ bos.wg.Add(1)
go func() {
+ defer bos.wg.Done()
+
for {
data, ok := <-bos.bf.RxNmpChan()
if !ok {
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index 782fce0..0d77d63 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -63,18 +63,19 @@ func (bps *BlePlainSesn) Open() error {
// Listen for disconnect in the background.
go func() {
// Block until disconnect.
- entry := <-bps.bf.DisconnectChan()
+ <-bps.bf.DisconnectChan()
+ pd := bps.bf.PrevDisconnect()
// Signal error to all listeners.
- bps.d.ErrorAll(entry.Err)
+ bps.d.ErrorAll(pd.Err)
// If the session is being closed, unblock the close() call.
close(bps.closeChan)
// Only execute the client's disconnect callback if the disconnect was
// unsolicited.
- if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
- bps.onCloseCb(bps, entry.Err)
+ if pd.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil {
+ bps.onCloseCb(bps, pd.Err)
}
}()
diff --git a/nmxact/nmble/ble_scanner.go b/nmxact/nmble/ble_scanner.go
index da8532c..6d4ee0f 100644
--- a/nmxact/nmble/ble_scanner.go
+++ b/nmxact/nmble/ble_scanner.go
@@ -114,7 +114,9 @@ func (s *BleScanner) scan() (*scan.ScanPeer, error) {
return nil, nil
}
- s.connect(*dev)
+ if err := s.connect(*dev); err != nil {
+ return nil, err
+ }
defer s.bos.Close()
// Now we are connected (and paired if required). Read the peer's hardware
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index 67b2d60..1a8f45e 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -367,13 +367,13 @@ func (bx *BleXport) startOnce() error {
return nmxutil.NewXportError("BLE xport started twice")
}
+ bx.stopChan = make(chan struct{})
+
if err := bx.startUnixChild(); err != nil {
bx.shutdown(true, err)
return err
}
- bx.stopChan = make(chan struct{})
-
// Listen for errors and data from the blehostd process.
go func() {
for {
diff --git a/nmxact/nmxutil/nmxutil.go b/nmxact/nmxutil/nmxutil.go
index 00e28c4..078a06d 100644
--- a/nmxact/nmxutil/nmxutil.go
+++ b/nmxact/nmxutil/nmxutil.go
@@ -17,6 +17,8 @@ import (
const DURATION_FOREVER time.Duration = math.MaxInt64
+var Debug bool
+
var nextNmpSeq uint8
var nmpSeqBeenRead bool
var nextOicSeq uint8
@@ -41,6 +43,12 @@ func SetLogLevel(level log.Level) {
ListenLog.Level = level
}
+func Assert(cond bool) {
+ if Debug && !cond {
+ panic("Failed assertion")
+ }
+}
+
func NextNmpSeq() uint8 {
seqMutex.Lock()
defer seqMutex.Unlock()
--
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.