You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/08/21 20:07:15 UTC
[mynewt-newtmgr] 01/02: nmxact - Fragment all outgoing messages.
This is an automated email from the ASF dual-hosted git repository.
ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git
commit 3aeccd56c5fa9dc8777ee61974c44f4c79824cfb
Author: Christopher Collins <cc...@apache.org>
AuthorDate: Mon Aug 21 12:52:44 2017 -0700
nmxact - Fragment all outgoing messages.
---
newtmgr/bll/bll_sesn.go | 19 +++++++--------
nmxact/mgmt/transceiver.go | 37 ++++++++++++++++++-----------
nmxact/nmble/ble_oic_svr.go | 21 ++++++++++-------
nmxact/nmble/ble_sesn.go | 19 +++++++--------
nmxact/nmble/ble_util.go | 53 ------------------------------------------
nmxact/nmserial/serial_sesn.go | 16 +++++++------
nmxact/nmxutil/nmxutil.go | 15 ++++++++++++
nmxact/sesn/sesn.go | 10 ++++----
nmxact/udp/udp_sesn.go | 12 +++++-----
nmxact/xact/fs.go | 5 ++--
nmxact/xact/image.go | 5 ++--
11 files changed, 95 insertions(+), 117 deletions(-)
diff --git a/newtmgr/bll/bll_sesn.go b/newtmgr/bll/bll_sesn.go
index eb86414..2917db0 100644
--- a/newtmgr/bll/bll_sesn.go
+++ b/newtmgr/bll/bll_sesn.go
@@ -30,6 +30,7 @@ import (
"github.com/runtimeco/go-coap"
"golang.org/x/net/context"
+ "mynewt.apache.org/newt/util"
"mynewt.apache.org/newtmgr/newtmgr/nmutil"
"mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/mgmt"
@@ -281,14 +282,12 @@ func (s *BllSesn) IsOpen() bool {
// Retrieves the maximum data payload for incoming NMP responses.
func (s *BllSesn) MtuIn() int {
- mtu, _ := nmble.MtuIn(s.cfg.MgmtProto, s.attMtu)
- return mtu
+ return int(s.attMtu) - nmble.NOTIFY_CMD_BASE_SZ
}
// Retrieves the maximum data payload for outgoing NMP requests.
func (s *BllSesn) MtuOut() int {
- mtu, _ := nmble.MtuOut(s.cfg.MgmtProto, s.attMtu)
- return mtu
+ return util.IntMin(s.MtuIn(), bledefs.BLE_ATT_ATTR_MAX_LEN)
}
// Stops a receive operation in progress. This must be called from a
@@ -298,10 +297,6 @@ func (s *BllSesn) AbortRx(nmpSeq uint8) error {
return nil
}
-func (s *BllSesn) EncodeNmpMsg(msg *nmp.NmpMsg) ([]byte, error) {
- return nmble.EncodeMgmtMsg(s.cfg.MgmtProto, msg)
-}
-
// Performs a blocking transmit of a single NMP message and listens for the
// response.
// * nil: success.
@@ -324,7 +319,7 @@ func (s *BllSesn) TxNmpOnce(msg *nmp.NmpMsg, opt sesn.TxOptions) (
return s.cln.WriteCharacteristic(s.nmpReqChr, b, true)
}
- return s.txvr.TxNmp(txRaw, msg, opt.Timeout)
+ return s.txvr.TxNmp(txRaw, msg, s.MtuOut(), opt.Timeout)
}
func (s *BllSesn) resReqChr(resType sesn.ResourceType) (
@@ -357,7 +352,7 @@ func (s *BllSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
return s.cln.WriteCharacteristic(chr, b, true)
}
- rsp, err := s.txvr.TxOic(txRaw, m, opt.Timeout)
+ rsp, err := s.txvr.TxOic(txRaw, m, s.MtuOut(), opt.Timeout)
if err != nil {
return 0, nil, err
} else if rsp == nil {
@@ -366,3 +361,7 @@ func (s *BllSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
return rsp.Code(), rsp.Payload(), nil
}
}
+
+func (s *BllSesn) MgmtProto() sesn.MgmtProto {
+ return s.cfg.MgmtProto
+}
diff --git a/nmxact/mgmt/transceiver.go b/nmxact/mgmt/transceiver.go
index fbc28ec..6d6cb82 100644
--- a/nmxact/mgmt/transceiver.go
+++ b/nmxact/mgmt/transceiver.go
@@ -49,7 +49,7 @@ func NewTransceiver(isTcp bool, mgmtProto sesn.MgmtProto, logDepth int) (
return t, nil
}
-func (t *Transceiver) txPlain(txCb TxFn, req *nmp.NmpMsg,
+func (t *Transceiver) txPlain(txCb TxFn, req *nmp.NmpMsg, mtu int,
timeout time.Duration) (nmp.NmpRsp, error) {
nl, err := t.nd.AddListener(req.Hdr.Seq)
@@ -65,8 +65,11 @@ func (t *Transceiver) txPlain(txCb TxFn, req *nmp.NmpMsg,
log.Debugf("Tx NMP request: %s", hex.Dump(b))
- if err := txCb(b); err != nil {
- return nil, err
+ frags := nmxutil.Fragment(b, mtu)
+ for _, frag := range frags {
+ if err := txCb(frag); err != nil {
+ return nil, err
+ }
}
// Now wait for NMP response.
@@ -82,8 +85,8 @@ func (t *Transceiver) txPlain(txCb TxFn, req *nmp.NmpMsg,
}
}
-func (t *Transceiver) txOmp(txCb TxFn, req *nmp.NmpMsg, timeout time.Duration) (
- nmp.NmpRsp, error) {
+func (t *Transceiver) txOmp(txCb TxFn, req *nmp.NmpMsg, mtu int,
+ timeout time.Duration) (nmp.NmpRsp, error) {
nl, err := t.od.AddNmpListener(req.Hdr.Seq)
if err != nil {
@@ -103,8 +106,11 @@ func (t *Transceiver) txOmp(txCb TxFn, req *nmp.NmpMsg, timeout time.Duration) (
log.Debugf("Tx OMP request: %s", hex.Dump(b))
- if err := txCb(b); err != nil {
- return nil, err
+ frags := nmxutil.Fragment(b, mtu)
+ for _, frag := range frags {
+ if err := txCb(frag); err != nil {
+ return nil, err
+ }
}
// Now wait for NMP response.
@@ -120,17 +126,17 @@ func (t *Transceiver) txOmp(txCb TxFn, req *nmp.NmpMsg, timeout time.Duration) (
}
}
-func (t *Transceiver) TxNmp(txCb TxFn, req *nmp.NmpMsg, timeout time.Duration) (
- nmp.NmpRsp, error) {
+func (t *Transceiver) TxNmp(txCb TxFn, req *nmp.NmpMsg, mtu int,
+ timeout time.Duration) (nmp.NmpRsp, error) {
if t.nd != nil {
- return t.txPlain(txCb, req, timeout)
+ return t.txPlain(txCb, req, mtu, timeout)
} else {
- return t.txOmp(txCb, req, timeout)
+ return t.txOmp(txCb, req, mtu, timeout)
}
}
-func (t *Transceiver) TxOic(txCb TxFn, req coap.Message,
+func (t *Transceiver) TxOic(txCb TxFn, req coap.Message, mtu int,
timeout time.Duration) (coap.Message, error) {
b, err := oic.Encode(req)
@@ -158,8 +164,11 @@ func (t *Transceiver) TxOic(txCb TxFn, req coap.Message,
}
log.Debugf("Tx OIC request: %s", hex.Dump(b))
- if err := txCb(b); err != nil {
- return nil, err
+ frags := nmxutil.Fragment(b, mtu)
+ for _, frag := range frags {
+ if err := txCb(frag); err != nil {
+ return nil, err
+ }
}
if !rspExpected {
diff --git a/nmxact/nmble/ble_oic_svr.go b/nmxact/nmble/ble_oic_svr.go
index 6dc609f..c0c6d9f 100644
--- a/nmxact/nmble/ble_oic_svr.go
+++ b/nmxact/nmble/ble_oic_svr.go
@@ -4,6 +4,7 @@ import (
log "github.com/Sirupsen/logrus"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
+ "mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/oic"
)
@@ -44,30 +45,34 @@ func (b *BleOicSvr) Rx(access BleGattAccess) uint8 {
s := b.x.findSesn(access.ConnHandle)
if s == nil {
// The sender is no longer connected.
+ log.Debugf("Failed to send CoAP response; peer no longer "+
+ "connected (conn_handle=%d)", access.ConnHandle)
return ERR_CODE_ATT_UNLIKELY
}
data, err := ml.MarshalBinary()
if err != nil {
+ log.Debugf("Failed to send CoAP response; error marshalling "+
+ "response: %s", err.Error())
return ERR_CODE_ATT_UNLIKELY
}
_, valHandle, err := FindChrXact(b.x, b.rspSvcUuid, b.rspChrUuid)
if err != nil {
+ log.Debugf("Failed to send CoAP response; cannot find response "+
+ "characteristic: (s=%s c=%s)",
+ b.rspSvcUuid.String(), b.rspChrUuid.String())
return ERR_CODE_ATT_UNLIKELY
}
mtu := s.MtuOut()
- for off := 0; off < len(data); off += mtu {
- chunkEnd := off + mtu
- if chunkEnd > len(data) {
- chunkEnd = len(data)
- }
- chunk := data[off:chunkEnd]
-
+ frags := nmxutil.Fragment(data, mtu)
+ for _, frag := range frags {
if err := NotifyXact(b.x, access.ConnHandle, valHandle,
- chunk); err != nil {
+ frag); err != nil {
+ log.Debugf("Failed to send CoAP response; failed to send "+
+ "fragment: %s", err.Error())
return ERR_CODE_ATT_UNLIKELY
}
}
diff --git a/nmxact/nmble/ble_sesn.go b/nmxact/nmble/ble_sesn.go
index 705c76b..33098bc 100644
--- a/nmxact/nmble/ble_sesn.go
+++ b/nmxact/nmble/ble_sesn.go
@@ -26,6 +26,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/runtimeco/go-coap"
+ "mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
"mynewt.apache.org/newtmgr/nmxact/mgmt"
"mynewt.apache.org/newtmgr/nmxact/nmp"
@@ -298,18 +299,16 @@ func (s *BleSesn) IsOpen() bool {
return s.conn.IsConnected()
}
-func (s *BleSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
- return EncodeMgmtMsg(s.cfg.MgmtProto, m)
-}
-
func (s *BleSesn) MtuIn() int {
- mtu, _ := MtuIn(s.cfg.MgmtProto, s.conn.AttMtu())
- return mtu
+ return int(s.conn.AttMtu()) - NOTIFY_CMD_BASE_SZ
}
func (s *BleSesn) MtuOut() int {
- mtu, _ := MtuOut(s.cfg.MgmtProto, s.conn.AttMtu())
- return mtu
+ return util.IntMin(s.MtuIn(), BLE_ATT_ATTR_MAX_LEN)
+}
+
+func (s *BleSesn) MgmtProto() sesn.MgmtProto {
+ return s.cfg.MgmtProto
}
func (s *BleSesn) ConnInfo() (BleConnDesc, error) {
@@ -332,7 +331,7 @@ func (s *BleSesn) TxNmpOnce(req *nmp.NmpMsg, opt sesn.TxOptions) (
return s.conn.WriteChrNoRsp(chr, b, "nmp")
}
- return s.txvr.TxNmp(txRaw, req, opt.Timeout)
+ return s.txvr.TxNmp(txRaw, req, s.MtuOut(), opt.Timeout)
}
func (s *BleSesn) TxCoapOnce(m coap.Message,
@@ -352,7 +351,7 @@ func (s *BleSesn) TxCoapOnce(m coap.Message,
return s.conn.WriteChrNoRsp(chr, b, "coap")
}
- rsp, err := s.txvr.TxOic(txRaw, m, opt.Timeout)
+ rsp, err := s.txvr.TxOic(txRaw, m, s.MtuOut(), opt.Timeout)
if err != nil {
return 0, nil, err
} else if rsp == nil {
diff --git a/nmxact/nmble/ble_util.go b/nmxact/nmble/ble_util.go
index d1d0bfb..7bb1e6f 100644
--- a/nmxact/nmble/ble_util.go
+++ b/nmxact/nmble/ble_util.go
@@ -27,12 +27,9 @@ import (
log "github.com/Sirupsen/logrus"
- "mynewt.apache.org/newt/util"
. "mynewt.apache.org/newtmgr/nmxact/bledefs"
- "mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/oic"
- "mynewt.apache.org/newtmgr/nmxact/omp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
@@ -800,53 +797,3 @@ func BuildMgmtChrs(mgmtProto sesn.MgmtProto) (BleMgmtChrs, error) {
return mgmtChrs, nil
}
-
-func MtuIn(mgmtProto sesn.MgmtProto, attMtu uint16) (int, error) {
- var mtu int
-
- switch mgmtProto {
- case sesn.MGMT_PROTO_NMP:
- mtu = int(attMtu) - NOTIFY_CMD_BASE_SZ - nmp.NMP_HDR_SIZE
-
- case sesn.MGMT_PROTO_OMP:
- mtu = int(attMtu) - NOTIFY_CMD_BASE_SZ - omp.OMP_MSG_OVERHEAD -
- nmp.NMP_HDR_SIZE
-
- default:
- return 0, fmt.Errorf("Invalid management protocol: %s", mgmtProto)
- }
-
- return mtu, nil
-}
-
-func MtuOut(mgmtProto sesn.MgmtProto, attMtu uint16) (int, error) {
- var mtu int
-
- switch mgmtProto {
- case sesn.MGMT_PROTO_NMP:
- mtu = int(attMtu) - NOTIFY_CMD_BASE_SZ - nmp.NMP_HDR_SIZE
-
- case sesn.MGMT_PROTO_OMP:
- mtu = int(attMtu) - NOTIFY_CMD_BASE_SZ - omp.OMP_MSG_OVERHEAD -
- nmp.NMP_HDR_SIZE
-
- default:
- return 0, fmt.Errorf("Invalid management protocol: %s", mgmtProto)
- }
-
- return util.IntMin(mtu, BLE_ATT_ATTR_MAX_LEN), nil
-}
-
-func EncodeMgmtMsg(mgmtProto sesn.MgmtProto, m *nmp.NmpMsg) ([]byte, error) {
- switch mgmtProto {
- case sesn.MGMT_PROTO_NMP:
- return nmp.EncodeNmpPlain(m)
-
- case sesn.MGMT_PROTO_OMP:
- return omp.EncodeOmpTcp(m)
-
- default:
- return nil,
- fmt.Errorf("invalid management protocol: %+v", mgmtProto)
- }
-}
diff --git a/nmxact/nmserial/serial_sesn.go b/nmxact/nmserial/serial_sesn.go
index 97a9c69..2cc5692 100644
--- a/nmxact/nmserial/serial_sesn.go
+++ b/nmxact/nmserial/serial_sesn.go
@@ -33,6 +33,7 @@ import (
)
type SerialSesn struct {
+ cfg sesn.SesnCfg
sx *SerialXport
txvr *mgmt.Transceiver
isOpen bool
@@ -45,7 +46,8 @@ type SerialSesn struct {
func NewSerialSesn(sx *SerialXport, cfg sesn.SesnCfg) (*SerialSesn, error) {
s := &SerialSesn{
- sx: sx,
+ cfg: cfg,
+ sx: sx,
}
txvr, err := mgmt.NewTransceiver(false, cfg.MgmtProto, 3)
@@ -108,10 +110,6 @@ func (s *SerialSesn) AbortRx(seq uint8) error {
return nil
}
-func (s *SerialSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
- return omp.EncodeOmpDgram(m)
-}
-
func (s *SerialSesn) TxNmpOnce(m *nmp.NmpMsg, opt sesn.TxOptions) (
nmp.NmpRsp, error) {
@@ -136,7 +134,7 @@ func (s *SerialSesn) TxNmpOnce(m *nmp.NmpMsg, opt sesn.TxOptions) (
return nil
}
- return s.txvr.TxNmp(txFn, m, opt.Timeout)
+ return s.txvr.TxNmp(txFn, m, s.MtuOut(), opt.Timeout)
}
func (s *SerialSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
@@ -155,7 +153,7 @@ func (s *SerialSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
return nil
}
- rsp, err := s.txvr.TxOic(txFn, m, opt.Timeout)
+ rsp, err := s.txvr.TxOic(txFn, m, s.MtuOut(), opt.Timeout)
if err != nil {
return 0, nil, err
} else if rsp == nil {
@@ -164,3 +162,7 @@ func (s *SerialSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
return rsp.Code(), rsp.Payload(), nil
}
}
+
+func (s *SerialSesn) MgmtProto() sesn.MgmtProto {
+ return s.cfg.MgmtProto
+}
diff --git a/nmxact/nmxutil/nmxutil.go b/nmxact/nmxutil/nmxutil.go
index f49e3f8..954cc94 100644
--- a/nmxact/nmxutil/nmxutil.go
+++ b/nmxact/nmxutil/nmxutil.go
@@ -132,6 +132,21 @@ func StopAndDrainTimer(timer *time.Timer) {
}
}
+func Fragment(b []byte, mtu int) [][]byte {
+ frags := [][]byte{}
+
+ for off := 0; off < len(b); off += mtu {
+ fragEnd := off + mtu
+ if fragEnd > len(b) {
+ fragEnd = len(b)
+ }
+ frag := b[off:fragEnd]
+ frags = append(frags, frag)
+ }
+
+ return frags
+}
+
var nextId uint32
func GetNextId() uint32 {
diff --git a/nmxact/sesn/sesn.go b/nmxact/sesn/sesn.go
index ab5fb9e..9fd67ae 100644
--- a/nmxact/sesn/sesn.go
+++ b/nmxact/sesn/sesn.go
@@ -71,11 +71,13 @@ type Sesn interface {
// Indicates whether the session is currently open.
IsOpen() bool
- // Retrieves the maximum data payload for outgoing NMP requests.
+ // Retrieves the maximum data payload for incoming data packets.
+ MtuIn() int
+
+ // Retrieves the maximum data payload for outgoing data packets.
MtuOut() int
- // Retrieves the maximum data payload for incoming NMP responses.
- MtuIn() int
+ MgmtProto() MgmtProto
// Stops a receive operation in progress. This must be called from a
// separate thread, as sesn receive operations are blocking.
@@ -85,8 +87,6 @@ type Sesn interface {
////// Internal to nmxact:
- EncodeNmpMsg(msg *nmp.NmpMsg) ([]byte, error)
-
// Performs a blocking transmit of a single NMP message and listens for the
// response.
// * nil: success.
diff --git a/nmxact/udp/udp_sesn.go b/nmxact/udp/udp_sesn.go
index d6447f2..feae2a6 100644
--- a/nmxact/udp/udp_sesn.go
+++ b/nmxact/udp/udp_sesn.go
@@ -101,10 +101,6 @@ func (s *UdpSesn) MtuOut() int {
nmp.NMP_HDR_SIZE
}
-func (s *UdpSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
- return omp.EncodeOmpDgram(m)
-}
-
func (s *UdpSesn) TxNmpOnce(m *nmp.NmpMsg, opt sesn.TxOptions) (
nmp.NmpRsp, error) {
@@ -116,7 +112,7 @@ func (s *UdpSesn) TxNmpOnce(m *nmp.NmpMsg, opt sesn.TxOptions) (
_, err := s.conn.WriteToUDP(b, s.addr)
return err
}
- return s.txvr.TxNmp(txRaw, m, opt.Timeout)
+ return s.txvr.TxNmp(txRaw, m, s.MtuOut(), opt.Timeout)
}
func (s *UdpSesn) AbortRx(seq uint8) error {
@@ -132,7 +128,7 @@ func (s *UdpSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
return err
}
- rsp, err := s.txvr.TxOic(txRaw, m, opt.Timeout)
+ rsp, err := s.txvr.TxOic(txRaw, m, s.MtuOut(), opt.Timeout)
if err != nil {
return 0, nil, err
} else if rsp == nil {
@@ -141,3 +137,7 @@ func (s *UdpSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
return rsp.Code(), rsp.Payload(), nil
}
}
+
+func (s *UdpSesn) MgmtProto() sesn.MgmtProto {
+ return s.cfg.MgmtProto
+}
diff --git a/nmxact/xact/fs.go b/nmxact/xact/fs.go
index 66d18a3..47fb798 100644
--- a/nmxact/xact/fs.go
+++ b/nmxact/xact/fs.go
@@ -22,6 +22,7 @@ package xact
import (
"fmt"
+ "mynewt.apache.org/newtmgr/nmxact/mgmt"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
@@ -141,7 +142,7 @@ func nextFsUploadReq(s sesn.Sesn, name string, data []byte, off int) (
// First, build a request without data to determine how much data could
// fit.
empty := buildFsUploadReq(name, len(data), nil, off)
- emptyEnc, err := s.EncodeNmpMsg(empty.Msg())
+ emptyEnc, err := mgmt.EncodeMgmt(s.MgmtProto(), empty.Msg())
if err != nil {
return nil, err
}
@@ -156,7 +157,7 @@ func nextFsUploadReq(s sesn.Sesn, name string, data []byte, off int) (
// not be valid for some encodings (e.g., CBOR uses variable-length fields
// to encode byte string lengths).
r := buildFsUploadReq(name, len(data), data[off:off+room], off)
- enc, err := s.EncodeNmpMsg(r.Msg())
+ enc, err := mgmt.EncodeMgmt(s.MgmtProto(), r.Msg())
if err != nil {
return nil, err
}
diff --git a/nmxact/xact/image.go b/nmxact/xact/image.go
index 2fd2d9c..78880dd 100644
--- a/nmxact/xact/image.go
+++ b/nmxact/xact/image.go
@@ -22,6 +22,7 @@ package xact
import (
"fmt"
+ "mynewt.apache.org/newtmgr/nmxact/mgmt"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
@@ -80,7 +81,7 @@ func nextImageUploadReq(s sesn.Sesn, data []byte, off int) (
// First, build a request without data to determine how much data could
// fit.
empty := buildImageUploadReq(len(data), nil, off)
- emptyEnc, err := s.EncodeNmpMsg(empty.Msg())
+ emptyEnc, err := mgmt.EncodeMgmt(s.MgmtProto(), empty.Msg())
if err != nil {
return nil, err
}
@@ -101,7 +102,7 @@ func nextImageUploadReq(s sesn.Sesn, data []byte, off int) (
// not be valid for some encodings (e.g., CBOR uses variable-length fields
// to encode byte string lengths).
r := buildImageUploadReq(len(data), data[off:off+room], off)
- enc, err := s.EncodeNmpMsg(r.Msg())
+ enc, err := mgmt.EncodeMgmt(s.MgmtProto(), r.Msg())
if err != nil {
return nil, err
}
--
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <co...@mynewt.apache.org>.