You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by GitBox <gi...@apache.org> on 2018/05/22 14:58:31 UTC

[GitHub] sterlinghughes closed pull request #74: nmxact; add API to allow CoAP server side operation

sterlinghughes closed pull request #74: nmxact; add API to allow CoAP server side operation
URL: https://github.com/apache/mynewt-newtmgr/pull/74
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub would not
otherwise display its diff, so the diff is supplied below:

diff --git a/newtmgr/bll/bll_sesn.go b/newtmgr/bll/bll_sesn.go
index 1e959a11..175ee448 100644
--- a/newtmgr/bll/bll_sesn.go
+++ b/newtmgr/bll/bll_sesn.go
@@ -382,6 +382,14 @@ func (s *BllSesn) AbortRx(nmpSeq uint8) error {
 	return nil
 }
 
+func (s *BllSesn) RxAccept() (sesn.Sesn, *sesn.SesnCfg, error) {
+	return nil, nil, fmt.Errorf("Op not implemented yet")
+}
+
+func (s *BllSesn) RxCoap(opt sesn.TxOptions) (coap.Message, error) {
+	return nil, fmt.Errorf("Op not implemented yet")
+}
+
 // Performs a blocking transmit a single NMP message and listens for the
 // response.
 //     * nil: success.
diff --git a/newtmgr/config/mtech_lora_config.go b/newtmgr/config/mtech_lora_config.go
index 4194b040..682e5740 100644
--- a/newtmgr/config/mtech_lora_config.go
+++ b/newtmgr/config/mtech_lora_config.go
@@ -25,14 +25,17 @@ import (
 
 	"mynewt.apache.org/newt/util"
 	"mynewt.apache.org/newtmgr/newtmgr/nmutil"
+	"mynewt.apache.org/newtmgr/nmxact/lora"
 	"mynewt.apache.org/newtmgr/nmxact/mtech_lora"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 )
 
 func NewMtechLoraConfig() *mtech_lora.LoraConfig {
 	return &mtech_lora.LoraConfig{
-		Addr:  "",
-		SegSz: 0,
+		Addr:        "",
+		SegSz:       0,
+		ConfirmedTx: false,
+		Port:        lora.COAP_LORA_PORT,
 	}
 }
 
@@ -68,6 +71,12 @@ func ParseMtechLoraConnString(cs string) (*mtech_lora.LoraConfig, error) {
 			if err != nil {
 				return mc, util.FmtNewtError("Invalid confirmedtx: %s", v)
 			}
+		case "port":
+			port, err := strconv.ParseUint(v, 10, 8)
+			if err != nil {
+				return mc, util.FmtNewtError("Invalid port number: %s", v)
+			}
+			mc.Port = uint8(port)
 		default:
 			return nil, util.FmtNewtError("Unrecognized key: %s", k)
 		}
@@ -80,6 +89,7 @@ func FillMtechLoraSesnCfg(mc *mtech_lora.LoraConfig, sc *sesn.SesnCfg) error {
 	sc.Lora.Addr = mc.Addr
 	sc.Lora.SegSz = mc.SegSz
 	sc.Lora.ConfirmedTx = mc.ConfirmedTx
+	sc.Lora.Port = mc.Port
 	if nmutil.DeviceName != "" {
 		sc.Lora.Addr = nmutil.DeviceName
 	}
diff --git a/nmxact/lora/lora_coap.go b/nmxact/lora/lora_coap.go
index fbd62061..e69f669e 100644
--- a/nmxact/lora/lora_coap.go
+++ b/nmxact/lora/lora_coap.go
@@ -29,3 +29,4 @@ type CoapLoraFrag struct {
 }
 
 const COAP_LORA_LAST_FRAG uint8 = 0x80
+const COAP_LORA_PORT = 0xbb
diff --git a/nmxact/mgmt/transceiver.go b/nmxact/mgmt/transceiver.go
index b7ebe528..da2f8316 100644
--- a/nmxact/mgmt/transceiver.go
+++ b/nmxact/mgmt/transceiver.go
@@ -45,6 +45,7 @@ type Transceiver struct {
 	od *omp.Dispatcher
 
 	isTcp bool
+	proto sesn.MgmtProto
 	wg    sync.WaitGroup
 }
 
@@ -53,6 +54,7 @@ func NewTransceiver(txFilterCb, rxFilterCb nmcoap.MsgFilter, isTcp bool, mgmtPro
 
 	t := &Transceiver{
 		isTcp: isTcp,
+		proto: mgmtProto,
 	}
 
 	if mgmtProto == sesn.MGMT_PROTO_NMP {
@@ -182,6 +184,9 @@ func (t *Transceiver) TxOic(txCb TxFn, req coap.Message, mtu int,
 	default:
 		rspExpected = false
 	}
+	if t.proto == sesn.MGMT_PROTO_COAP_SERVER {
+		rspExpected = false
+	}
 
 	var ol *nmcoap.Listener
 	if rspExpected {
@@ -304,6 +309,10 @@ func (t *Transceiver) DispatchCoap(data []byte) {
 	t.od.Dispatch(data)
 }
 
+func (t *Transceiver) ProcessCoapReq(data []byte) (coap.Message, error) {
+	return t.od.ProcessCoapReq(data)
+}
+
 func (t *Transceiver) ErrorOne(seq uint8, err error) {
 	if t.nd != nil {
 		t.nd.ErrorOne(seq, err)
@@ -326,3 +335,7 @@ func (t *Transceiver) AbortRx(seq uint8) {
 func (t *Transceiver) Stop() {
 	t.od.Stop()
 }
+
+func (t *Transceiver) MgmtProto() sesn.MgmtProto {
+	return t.proto
+}
diff --git a/nmxact/mtech_lora/listen.go b/nmxact/mtech_lora/listen.go
index 64dc9459..89698dd2 100644
--- a/nmxact/mtech_lora/listen.go
+++ b/nmxact/mtech_lora/listen.go
@@ -22,7 +22,7 @@ package mtech_lora
 import (
 	"bytes"
 	"fmt"
-	"time"
+	"strconv"
 
 	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 )
@@ -32,6 +32,13 @@ type ListenerKey struct {
 	Type   string
 }
 
+func TgtPortKey(tgt string, port uint8, msgType string) ListenerKey {
+	return ListenerKey{
+		Target: tgt + strconv.Itoa(int(port)),
+		Type:   msgType,
+	}
+}
+
 func TgtKey(tgt string, msgType string) ListenerKey {
 	return ListenerKey{
 		Target: tgt,
@@ -47,50 +54,33 @@ func TypeKey(msgType string) ListenerKey {
 }
 
 type Listener struct {
-	MsgChan chan []byte
-	MtuChan chan int
-	ErrChan chan error
-	TmoChan chan time.Time
-	Acked   bool
+	MsgChan  chan []byte
+	MtuChan  chan int
+	ConnChan chan *LoraSesn
 
+	RefCnt   int
 	Data     *bytes.Buffer
 	NextFrag uint8
 	Crc      uint16
-
-	timer *time.Timer
 }
 
 func NewListener() *Listener {
 	return &Listener{
-		MsgChan: make(chan []byte, 16),
-		MtuChan: make(chan int, 1),
-		ErrChan: make(chan error, 1),
-		TmoChan: make(chan time.Time, 1),
+		MsgChan:  make(chan []byte, 16),
+		MtuChan:  make(chan int, 1),
+		ConnChan: make(chan *LoraSesn, 4),
 
+		RefCnt: 1,
 		Data: bytes.NewBuffer([]byte{}),
 	}
 }
 
-func (ll *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time {
-	fn := func() {
-		if !ll.Acked {
-			ll.TmoChan <- time.Now()
-		}
-	}
-	ll.timer = time.AfterFunc(tmo, fn)
-	return ll.TmoChan
-}
-
 func (ll *Listener) Close() {
-	// This provokes a race condition.  The timer may get initialized at any
-	// time.
-	if ll.timer != nil {
-		ll.timer.Stop()
-	}
 
-	// Mark the command as acked in case the race condition mentioned above
-	// occurred.  If the timer goes off, nothing will happen.
-	ll.Acked = true
+	ll.RefCnt--
+	if ll.RefCnt > 0 {
+		return
+	}
 
 	close(ll.MsgChan)
 	for {
@@ -106,18 +96,13 @@ func (ll *Listener) Close() {
 		}
 	}
 
-	close(ll.ErrChan)
-	for {
-		if _, ok := <-ll.ErrChan; !ok {
-			break
-		}
-	}
-
-	close(ll.TmoChan)
+	close(ll.ConnChan)
 	for {
-		if _, ok := <-ll.TmoChan; !ok {
+		l, ok := <-ll.ConnChan
+		if !ok {
 			break
 		}
+		l.Close()
 	}
 }
 
@@ -134,12 +119,12 @@ func NewListenerMap() *ListenerMap {
 	}
 }
 
-func (lm *ListenerMap) FindListener(tgt string, msgType string) (
+func (lm *ListenerMap) FindListener(tgt string, port uint8, msgType string) (
 	ListenerKey, *Listener) {
 
 	var key ListenerKey
 
-	key = TgtKey(tgt, msgType)
+	key = TgtPortKey(tgt, port, msgType)
 	if listener := lm.k2l[key]; listener != nil {
 		return key, listener
 	}
@@ -196,15 +181,99 @@ func (lm *ListenerMap) RemoveKey(key ListenerKey) *Listener {
 	return listener
 }
 
-func (lm *ListenerMap) ExtractAll() []*Listener {
-	listeners := make([]*Listener, 0, len(lm.l2k))
+func (lm *ListenerMap) Dump() {
+	fmt.Printf(" key -> listener\n")
+	for key, l := range lm.k2l {
+		fmt.Printf("  %s: %p,%d\n", key, l, l.RefCnt)
+	}
+	fmt.Printf(" listener -> key\n")
+	for l, key := range lm.l2k {
+		fmt.Printf("  %p %s\n", l, key)
+	}
+}
 
-	for listener, _ := range lm.l2k {
-		listeners = append(listeners, listener)
+// not thread safe
+type ListenerSlice struct {
+	k2l map[ListenerKey][]*Listener
+	l2k map[*Listener]ListenerKey
+}
+
+func NewListenerSlice() *ListenerSlice {
+	return &ListenerSlice{
+		k2l: map[ListenerKey][]*Listener{},
+		l2k: map[*Listener]ListenerKey{},
 	}
+}
 
-	lm.k2l = map[ListenerKey]*Listener{}
-	lm.l2k = map[*Listener]ListenerKey{}
+func (lm *ListenerSlice) FindListener(tgt string, msgType string) (
+	ListenerKey, []*Listener) {
+
+	var key ListenerKey
+
+	key = TgtKey(tgt, msgType)
+	if lSlice := lm.k2l[key]; lSlice != nil {
+		return key, lSlice
+	}
+
+	key = TypeKey(msgType)
+	if lSlice := lm.k2l[key]; lSlice != nil {
+		return key, lSlice
+	}
 
-	return listeners
+	return key, nil
+}
+
+func (lm *ListenerSlice) AddListener(key ListenerKey, listener *Listener) error {
+	if _, ok := lm.l2k[listener]; ok {
+		nmxutil.Assert(false)
+		return fmt.Errorf("Duplicate Lora listener: %#v", key)
+	}
+
+	lm.k2l[key] = append(lm.k2l[key], listener)
+	lm.l2k[listener] = key
+
+	return nil
+}
+
+func (lm *ListenerSlice) deleteListener(key ListenerKey, listener *Listener) {
+	nmxutil.Assert(lm.l2k[listener] == key)
+
+	for i, elem := range lm.k2l[key] {
+		if elem == listener {
+			slen := len(lm.k2l[key])
+			lm.k2l[key][i] = lm.k2l[key][slen-1]
+			lm.k2l[key][slen-1] = nil
+			lm.k2l[key] = lm.k2l[key][:slen-1]
+			if slen == 1 {
+				delete(lm.k2l, key)
+			}
+			break
+		}
+	}
+	delete(lm.l2k, listener)
+}
+
+func (lm *ListenerSlice) RemoveListener(listener *Listener) *ListenerKey {
+	key, ok := lm.l2k[listener]
+	if !ok {
+		return nil
+	}
+
+	lm.deleteListener(key, listener)
+	return &key
+}
+
+func (ls *ListenerSlice) Dump() {
+	fmt.Printf(" key -> listener\n")
+	for key, sl := range ls.k2l {
+		fmt.Printf("  %s\n\t", key)
+		for i, elem := range sl {
+			fmt.Printf("[%d %p] ", i, elem)
+		}
+		fmt.Printf("\n")
+	}
+	fmt.Printf(" listener -> key\n")
+	for l, key := range ls.l2k {
+		fmt.Printf("  %p %s\n", l, key)
+	}
 }
diff --git a/nmxact/mtech_lora/mtech_lora_sesn.go b/nmxact/mtech_lora/mtech_lora_sesn.go
index 5906d031..3eca22f2 100644
--- a/nmxact/mtech_lora/mtech_lora_sesn.go
+++ b/nmxact/mtech_lora/mtech_lora_sesn.go
@@ -25,6 +25,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"sync"
+	"time"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/joaojeronimo/go-crc16"
@@ -41,31 +42,40 @@ import (
 )
 
 type LoraSesn struct {
-	cfg      sesn.SesnCfg
-	txvr     *mgmt.Transceiver
-	isOpen   bool
-	mtu      int
-	xport    *LoraXport
-	listener *Listener
-	wg       sync.WaitGroup
-	stopChan chan struct{}
+	cfg           sesn.SesnCfg
+	txvr          *mgmt.Transceiver
+	isOpen        bool
+	mtu           int
+	xport         *LoraXport
+	msgListener   *Listener
+	reassListener *Listener
+	tgtListener   *Listener
+	wg            sync.WaitGroup
+	stopChan      chan struct{}
 
 	txFilterCb nmcoap.MsgFilter
 	rxFilterCb nmcoap.MsgFilter
 }
 
 type mtechLoraTx struct {
-	Port uint16 `json:"port"`
+	Port uint8  `json:"port"`
 	Data string `json:"data"`
 	Ack  bool   `json:"ack"`
 }
 
 func NewLoraSesn(cfg sesn.SesnCfg, lx *LoraXport) (*LoraSesn, error) {
-	addr, err := NormalizeAddr(cfg.Lora.Addr)
-	if err != nil {
-		return nil, fmt.Errorf("Invalid Lora address %s\n", cfg.Lora.Addr)
+	if cfg.Lora.Addr != "" || cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
+		// Allow server to bind to wildcard address
+		addr, err := NormalizeAddr(cfg.Lora.Addr)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid Lora address %s\n", cfg.Lora.Addr)
+		}
+		cfg.Lora.Addr = addr
+	}
+
+	if cfg.Lora.Port < 1 || cfg.Lora.Port > 223 {
+		return nil, fmt.Errorf("Invalid Lora Port %d\n", cfg.Lora.Addr)
 	}
-	cfg.Lora.Addr = addr
 	s := &LoraSesn{
 		cfg:        cfg,
 		xport:      lx,
@@ -83,37 +93,72 @@ func (s *LoraSesn) Open() error {
 			"Attempt to open an already-open Lora session")
 	}
 
-	key := TgtKey(s.cfg.Lora.Addr, "rx")
-	s.xport.Lock()
-
 	txFilterCb, rxFilterCb := s.Filters()
-	txvr, err := mgmt.NewTransceiver(txFilterCb, rxFilterCb, false, s.cfg.MgmtProto, 3)
+	txvr, err := mgmt.NewTransceiver(txFilterCb, rxFilterCb, false,
+		s.cfg.MgmtProto, 3)
 	if err != nil {
 		return err
 	}
 	s.txvr = txvr
 	s.stopChan = make(chan struct{})
-	s.listener = NewListener()
 
-	err = s.xport.listenMap.AddListener(key, s.listener)
+	msgType := "rsp"
+	if s.cfg.MgmtProto == sesn.MGMT_PROTO_COAP_SERVER {
+		msgType = "req"
+	}
+
+	s.xport.Lock()
+
+	msgKey := TgtPortKey(s.cfg.Lora.Addr, s.cfg.Lora.Port, msgType)
+	s.msgListener = NewListener()
+	err = s.xport.msgMap.AddListener(msgKey, s.msgListener)
 	if err != nil {
+		s.xport.Unlock()
 		s.txvr.Stop()
 		return err
 	}
+
+	if s.cfg.Lora.Addr != "" {
+		msgKey, l := s.xport.reassMap.FindListener(s.cfg.Lora.Addr,
+			s.cfg.Lora.Port, "reass")
+		if l != nil {
+			l.RefCnt++
+		} else {
+			msgKey = TgtPortKey(s.cfg.Lora.Addr, s.cfg.Lora.Port, "reass")
+			l = NewListener()
+			s.xport.reassMap.AddListener(msgKey, l)
+		}
+		s.reassListener = l
+
+		tgtKey := TgtKey(s.cfg.Lora.Addr, "rx")
+		s.tgtListener = NewListener()
+		err = s.xport.tgtMap.AddListener(tgtKey, s.tgtListener)
+		if err != nil {
+			s.xport.Unlock()
+			s.closeListeners()
+			s.txvr.Stop()
+			return err
+		}
+	}
+
 	s.xport.Unlock()
 
+	if s.cfg.MgmtProto == sesn.MGMT_PROTO_COAP_SERVER {
+		s.isOpen = true
+		return nil
+	}
 	s.wg.Add(1)
 	go func() {
 		defer s.wg.Done()
-		defer s.xport.listenMap.RemoveListener(s.listener)
+		defer s.closeListeners()
 
 		for {
 			select {
-			case msg, ok := <-s.listener.MsgChan:
+			case msg, ok := <-s.msgListener.MsgChan:
 				if ok {
 					s.txvr.DispatchCoap(msg)
 				}
-			case mtu, ok := <-s.listener.MtuChan:
+			case mtu, ok := <-s.tgtListener.MtuChan:
 				if ok {
 					if s.mtu != mtu {
 						log.Debugf("Setting mtu for %s %d",
@@ -130,6 +175,21 @@ func (s *LoraSesn) Open() error {
 	return nil
 }
 
+func (s *LoraSesn) closeListeners() {
+	s.xport.Lock()
+	s.xport.msgMap.RemoveListener(s.msgListener)
+	if s.tgtListener != nil {
+		s.xport.tgtMap.RemoveListener(s.tgtListener)
+	}
+	if s.reassListener != nil {
+		s.reassListener.Close()
+		if s.reassListener.RefCnt == 0 {
+			s.xport.reassMap.RemoveListener(s.reassListener)
+		}
+	}
+	s.xport.Unlock()
+}
+
 func (s *LoraSesn) Close() error {
 	if s.isOpen == false {
 		return nmxutil.NewSesnClosedError(
@@ -140,12 +200,23 @@ func (s *LoraSesn) Close() error {
 	s.txvr.ErrorAll(fmt.Errorf("manual close"))
 	s.txvr.Stop()
 	close(s.stopChan)
-	s.listener.Close()
+	s.msgListener.Close()
+	if s.tgtListener != nil {
+		s.tgtListener.Close()
+	}
 	s.wg.Wait()
-	s.xport.Tx([]byte(fmt.Sprintf("lora/%s/flush", DenormalizeAddr(s.cfg.Lora.Addr))))
+
+	if s.cfg.Lora.Addr != "" {
+		s.xport.Tx([]byte(fmt.Sprintf("lora/%s/flush",
+			DenormalizeAddr(s.cfg.Lora.Addr))))
+	}
 	s.stopChan = nil
 	s.txvr = nil
 
+	if s.cfg.MgmtProto == sesn.MGMT_PROTO_COAP_SERVER {
+		s.closeListeners()
+	}
+
 	return nil
 }
 
@@ -216,7 +287,7 @@ func (s *LoraSesn) sendFragments(b []byte) error {
 		base64.StdEncoding.Encode(seg64, seg.Bytes())
 
 		msg := mtechLoraTx{
-			Port: OIC_LORA_PORT,
+			Port: s.cfg.Lora.Port,
 			Ack:  s.cfg.Lora.ConfirmedTx,
 			Data: string(seg64),
 		}
@@ -241,7 +312,8 @@ func (s *LoraSesn) TxNmpOnce(m *nmp.NmpMsg, opt sesn.TxOptions) (
 	nmp.NmpRsp, error) {
 
 	if !s.IsOpen() {
-		return nil, fmt.Errorf("Attempt to transmit over closed Lora session")
+		return nil, nmxutil.NewSesnClosedError(
+			"Attempt to transmit over closed Lora session")
 	}
 
 	txFunc := func(b []byte) error {
@@ -259,7 +331,8 @@ func (s *LoraSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
 	opt sesn.TxOptions) (coap.COAPCode, []byte, error) {
 
 	if !s.IsOpen() {
-		return 0, nil, fmt.Errorf("Attempt to transmit over closed Lora session")
+		return 0, nil, nmxutil.NewSesnClosedError(
+			"Attempt to transmit over closed Lora session")
 	}
 	txFunc := func(b []byte) error {
 		return s.sendFragments(b)
@@ -287,6 +360,85 @@ func (s *LoraSesn) CoapIsTcp() bool {
 	return false
 }
 
+func (s *LoraSesn) RxAccept() (sesn.Sesn, *sesn.SesnCfg, error) {
+	if !s.isOpen {
+		return nil, nil, nmxutil.NewSesnClosedError(
+			"Attempt to listen for data from closed connection")
+	}
+	if s.cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
+		return nil, nil, fmt.Errorf("Invalid operation for %s", s.cfg.MgmtProto)
+	}
+	if s.tgtListener != nil {
+		return nil, nil, fmt.Errorf("RxAccept() only wildcard destinations")
+	}
+
+	s.wg.Add(1)
+	defer s.wg.Done()
+	for {
+		select {
+		case cl_s, ok := <-s.msgListener.ConnChan:
+			if !ok {
+				continue
+			}
+			cl_s.cfg.Lora.ConfirmedTx = s.cfg.Lora.ConfirmedTx
+			cl_s.cfg.Lora.SegSz = s.cfg.Lora.SegSz
+			cl_s.cfg.TxFilterCb = s.cfg.TxFilterCb
+			cl_s.cfg.RxFilterCb = s.cfg.RxFilterCb
+			return cl_s, &cl_s.cfg, nil
+		case <-s.stopChan:
+			return nil, nil, fmt.Errorf("Session closed")
+		}
+	}
+}
+
+func (s *LoraSesn) RxCoap(opt sesn.TxOptions) (coap.Message, error) {
+	if !s.isOpen {
+		return nil, nmxutil.NewSesnClosedError(
+			"Attempt to listen for data from closed connection")
+	}
+	if s.cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
+		return nil, fmt.Errorf("Invalid operation for %s", s.cfg.MgmtProto)
+	}
+	if s.tgtListener == nil {
+		return nil, fmt.Errorf("RxCoap() only connected sessions")
+	}
+
+	waitTmoChan := time.After(opt.Timeout)
+	s.wg.Add(1)
+	defer s.wg.Done()
+	for {
+		select {
+		case data, ok := <-s.msgListener.MsgChan:
+			if !ok {
+				continue
+			}
+			msg, err := s.txvr.ProcessCoapReq(data)
+			if err != nil {
+				return nil, err
+			}
+			if msg != nil {
+				return msg, nil
+			}
+		case mtu, ok := <-s.tgtListener.MtuChan:
+			if ok {
+				if s.mtu != mtu {
+					log.Debugf("Setting mtu for %s %d",
+						s.cfg.Lora.Addr, mtu)
+				}
+				s.mtu = mtu
+			}
+		case _, ok := <-waitTmoChan:
+			waitTmoChan = nil
+			if ok {
+				return nil, nmxutil.NewRspTimeoutError(
+					"RxCoap() timed out")
+			}
+		case <-s.stopChan:
+			return nil, fmt.Errorf("Session closed")
+		}
+	}
+}
+
 func (s *LoraSesn) Filters() (nmcoap.MsgFilter, nmcoap.MsgFilter) {
 	return s.txFilterCb, s.rxFilterCb
 }
diff --git a/nmxact/mtech_lora/mtech_lora_xport.go b/nmxact/mtech_lora/mtech_lora_xport.go
index c848f7d9..8985d7cb 100644
--- a/nmxact/mtech_lora/mtech_lora_xport.go
+++ b/nmxact/mtech_lora/mtech_lora_xport.go
@@ -30,6 +30,7 @@ import (
 	"sync"
 
 	log "github.com/Sirupsen/logrus"
+	"github.com/runtimeco/go-coap"
 	"github.com/ugorji/go/codec"
 
 	"mynewt.apache.org/newtmgr/nmxact/lora"
@@ -41,19 +42,22 @@ type LoraConfig struct {
 	Addr        string
 	SegSz       int
 	ConfirmedTx bool
+	Port        uint8
 }
 
 type LoraJoinedCb func(dev LoraConfig)
 
 type LoraXport struct {
 	sync.Mutex
-	cfg       *LoraXportCfg
-	started   bool
-	rxConn    *net.UDPConn
-	txConn    *net.UDPConn
-	listenMap *ListenerMap
-	exitChan  chan int
-	joinCb    LoraJoinedCb
+	cfg      *LoraXportCfg
+	started  bool
+	rxConn   *net.UDPConn
+	txConn   *net.UDPConn
+	msgMap   *ListenerMap
+	reassMap *ListenerMap
+	tgtMap   *ListenerSlice
+	exitChan chan int
+	joinCb   LoraJoinedCb
 }
 
 type LoraXportCfg struct {
@@ -63,7 +67,7 @@ type LoraXportCfg struct {
 
 type LoraData struct {
 	Data string `codec:"data"`
-	Port int    `codec:"port"`
+	Port uint8  `codec:"port"`
 }
 
 type LoraPacketSent struct {
@@ -86,7 +90,6 @@ const MAX_PACKET_SIZE_IN = 2048
 const MAX_PACKET_SIZE_OUT = 256
 const UDP_RX_PORT = 1784
 const UDP_TX_PORT = 1786
-const OIC_LORA_PORT = 0xbb
 
 func NewXportCfg() *LoraXportCfg {
 	return &LoraXportCfg{
@@ -96,9 +99,12 @@ func NewXportCfg() *LoraXportCfg {
 }
 
 func NewLoraXport(cfg *LoraXportCfg) *LoraXport {
+	log.SetLevel(log.DebugLevel)
 	return &LoraXport{
-		cfg:       cfg,
-		listenMap: NewListenerMap(),
+		cfg:      cfg,
+		msgMap:   NewListenerMap(),
+		reassMap: NewListenerMap(),
+		tgtMap:   NewListenerSlice(),
 	}
 }
 
@@ -131,20 +137,50 @@ func DenormalizeAddr(addr string) string {
 }
 
 func (lx *LoraXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) {
-	if cfg.Lora.Addr == "" {
+	if cfg.Lora.Addr == "" && cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
 		return nil, fmt.Errorf("Need an address of endpoint")
 	}
 	return NewLoraSesn(cfg, lx)
 }
 
-func (lx *LoraXport) reass(dev string, data []byte) {
+func (lx *LoraXport) acceptServerSesn(sl *Listener, dev string, port uint8) (*LoraSesn, error) {
+	sc := sesn.NewSesnCfg()
+	sc.MgmtProto = sesn.MGMT_PROTO_COAP_SERVER
+	sc.Lora.Addr = dev
+	sc.Lora.Port = port
+
+	ls, err := NewLoraSesn(sc, lx)
+	if err != nil {
+		return nil, fmt.Errorf("NewSesn():%v", err)
+	}
+	err = ls.Open()
+	if err != nil {
+		return nil, fmt.Errorf("Open():%v", err)
+	}
+	sl.ConnChan <- ls
+	return ls, nil
+}
+
+func (lx *LoraXport) reass(dev string, port uint8, data []byte) {
 	lx.Lock()
 	defer lx.Unlock()
 
-	_, l := lx.listenMap.FindListener(dev, "rx")
+	var err error
+	_, l := lx.reassMap.FindListener(dev, port, "reass")
 	if l == nil {
-		log.Debugf("No LoRa listener for %s", dev)
-		return
+		_, l = lx.msgMap.FindListener("", port, "req")
+		if l == nil {
+			log.Debugf("No LoRa listener for %s", dev)
+			return
+		}
+		lx.Unlock()
+		s, err := lx.acceptServerSesn(l, dev, port)
+		lx.Lock()
+		if err != nil {
+			log.Errorf("Cannot create server sesn: %v", err)
+			return
+		}
+		l = s.reassListener
 	}
 
 	str := hex.Dump(data)
@@ -155,7 +191,7 @@ func (lx *LoraXport) reass(dev string, data []byte) {
 	var frag lora.CoapLoraFrag
 	var sFrag lora.CoapLoraFragStart
 
-	err := binary.Read(bufR, binary.LittleEndian, &frag)
+	err = binary.Read(bufR, binary.LittleEndian, &frag)
 	if err != nil {
 		log.Debugf("Can't read header")
 		return
@@ -187,8 +223,35 @@ func (lx *LoraXport) reass(dev string, data []byte) {
 	}
 
 	if (frag.FragNum & lora.COAP_LORA_LAST_FRAG) != 0 {
-		l.MsgChan <- l.Data.Bytes()
+		data := l.Data.Bytes()
 		l.Data.Reset()
+
+		code := coap.COAPCode(data[1])
+		if code <= coap.DELETE {
+			_, l = lx.msgMap.FindListener(dev, port, "req")
+			if l == nil {
+				_, l = lx.msgMap.FindListener("", port, "req")
+				if l == nil {
+					log.Debugf("No LoRa listener for %s", dev)
+					return
+				}
+				lx.Unlock()
+				s, err := lx.acceptServerSesn(l, dev, port)
+				lx.Lock()
+				if err != nil {
+					log.Errorf("Cannot create server sesn: %v", err)
+					return
+				}
+				l = s.msgListener
+			}
+		} else {
+			_, l = lx.msgMap.FindListener(dev, port, "rsp")
+		}
+		if l != nil {
+			l.MsgChan <- data
+		} else {
+			log.Debugf("No LoRa listener for %s", dev)
+		}
 	}
 }
 
@@ -196,15 +259,17 @@ func (lx *LoraXport) dataRateSeen(dev string, dataRate string) {
 	lx.Lock()
 	defer lx.Unlock()
 
-	_, l := lx.listenMap.FindListener(dev, "rx")
-	if l == nil {
+	_, lSlice := lx.tgtMap.FindListener(dev, "rx")
+	if len(lSlice) == 0 {
 		return
 	}
 	mtu, ok := LoraDataRateMapUS[dataRate]
 	if !ok {
 		mtu = lx.minMtu()
 	}
-	l.MtuChan <- mtu
+	for _, l := range lSlice {
+		l.MtuChan <- mtu
+	}
 }
 
 /*
@@ -239,16 +304,12 @@ func (lx *LoraXport) processData(data string) {
 			return
 		}
 
-		if msg.Port != OIC_LORA_PORT {
-			log.Debugf("loraxport rx: ignoring data to port %d", msg.Port)
-			return
-		}
 		dec, err := base64.StdEncoding.DecodeString(msg.Data)
 		if err != nil {
 			log.Debugf("loraxport rx: error decoding base64: %v", err)
 			return
 		}
-		lx.reass(dev, dec)
+		lx.reass(dev, msg.Port, dec)
 	case "packet_sent":
 		var sent LoraPacketSent
 
@@ -285,6 +346,8 @@ func (lx *LoraXport) Start() error {
 	addr, err = net.ResolveUDPAddr("udp",
 		fmt.Sprintf("127.0.0.1:%d", lx.cfg.AppPortDown))
 	if err != nil {
+		lx.rxConn.Close()
+		lx.rxConn = nil
 		return fmt.Errorf("Failure resolving name for UDP session: %s",
 			err.Error())
 	}
@@ -345,6 +408,12 @@ func (lx *LoraXport) Stop() error {
 	return nil
 }
 
+func (lx *LoraXport) DumpListeners() {
+	lx.msgMap.Dump()
+	lx.reassMap.Dump()
+	lx.tgtMap.Dump()
+}
+
 func (lx *LoraXport) Tx(bytes []byte) error {
 	log.Debugf("loraxport tx: %s", bytes)
 	_, err := lx.txConn.Write(bytes)
diff --git a/nmxact/nmble/ble_sesn.go b/nmxact/nmble/ble_sesn.go
index 219554cc..f3753603 100644
--- a/nmxact/nmble/ble_sesn.go
+++ b/nmxact/nmble/ble_sesn.go
@@ -122,6 +122,14 @@ func (s *BleSesn) TxCoapObserve(m coap.Message,
 	return s.Ns.TxCoapObserve(m, resType, opt, NotifCb, stopsignal)
 }
 
+func (s *BleSesn) RxAccept() (sesn.Sesn, *sesn.SesnCfg, error) {
+	return s.Ns.RxAccept()
+}
+
+func (s *BleSesn) RxCoap(opt sesn.TxOptions) (coap.Message, error) {
+	return s.Ns.RxCoap(opt)
+}
+
 func (s *BleSesn) Filters() (nmcoap.MsgFilter, nmcoap.MsgFilter) {
 	return s.Ns.Filters()
 }
diff --git a/nmxact/nmble/naked_sesn.go b/nmxact/nmble/naked_sesn.go
index fa49e927..e1caf532 100644
--- a/nmxact/nmble/naked_sesn.go
+++ b/nmxact/nmble/naked_sesn.go
@@ -733,6 +733,14 @@ func (s *NakedSesn) ensureSecurity(encReqd bool, authReqd bool) error {
 	return nil
 }
 
+func (s *NakedSesn) RxAccept() (sesn.Sesn, *sesn.SesnCfg, error) {
+	return nil, nil, fmt.Errorf("Op not implemented yet")
+}
+
+func (s *NakedSesn) RxCoap(opt sesn.TxOptions) (coap.Message, error) {
+	return nil, fmt.Errorf("Op not implemented yet")
+}
+
 func (s *NakedSesn) Filters() (nmcoap.MsgFilter, nmcoap.MsgFilter) {
 	return s.txFilterCb, s.rxFilterCb
 }
diff --git a/nmxact/nmcoap/dispatch.go b/nmxact/nmcoap/dispatch.go
index 7dfb4f89..06ab2239 100644
--- a/nmxact/nmcoap/dispatch.go
+++ b/nmxact/nmcoap/dispatch.go
@@ -176,6 +176,14 @@ func (d *Dispatcher) Dispatch(data []byte) bool {
 	return d.dispatchRsp(ot, m)
 }
 
+func (d *Dispatcher) ProcessCoapReq(data []byte) (coap.Message, error) {
+	m := d.rxer.Rx(data)
+	if m == nil {
+		return nil, nil
+	}
+	return m, nil
+}
+
 func (d *Dispatcher) ErrorOne(token Token, err error) error {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
diff --git a/nmxact/nmserial/serial_sesn.go b/nmxact/nmserial/serial_sesn.go
index e2fad788..76443c77 100644
--- a/nmxact/nmserial/serial_sesn.go
+++ b/nmxact/nmserial/serial_sesn.go
@@ -22,6 +22,7 @@ package nmserial
 import (
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/runtimeco/go-coap"
 
@@ -40,9 +41,14 @@ type SerialSesn struct {
 	isOpen bool
 
 	// This mutex ensures:
-	//     * each response get matched up with its corresponding request.
 	//     * accesses to isOpen are protected.
-	m sync.Mutex
+	m  sync.Mutex
+	wg sync.WaitGroup
+
+	errChan  chan error
+	msgChan  chan []byte
+	connChan chan *SerialSesn
+	stopChan chan struct{}
 
 	txFilterCb nmcoap.MsgFilter
 	rxFilterCb nmcoap.MsgFilter
@@ -67,35 +73,103 @@ func NewSerialSesn(sx *SerialXport, cfg sesn.SesnCfg) (*SerialSesn, error) {
 
 func (s *SerialSesn) Open() error {
 	s.m.Lock()
-	defer s.m.Unlock()
 
 	if s.isOpen {
+		s.m.Unlock()
 		return nmxutil.NewSesnAlreadyOpenError(
 			"Attempt to open an already-open serial session")
 	}
 
-	txvr, err := mgmt.NewTransceiver(s.cfg.TxFilterCb, s.cfg.RxFilterCb, false, s.cfg.MgmtProto, 3)
+	txvr, err := mgmt.NewTransceiver(s.cfg.TxFilterCb, s.cfg.RxFilterCb, false,
+		s.cfg.MgmtProto, 3)
 	if err != nil {
+		s.m.Unlock()
 		return err
 	}
 	s.txvr = txvr
+	s.errChan = make(chan error)
+	s.msgChan = make(chan []byte, 16)
+	s.connChan = make(chan *SerialSesn, 4)
+	s.stopChan = make(chan struct{})
 
 	s.isOpen = true
+	s.m.Unlock()
+	if s.cfg.MgmtProto == sesn.MGMT_PROTO_COAP_SERVER {
+		return nil
+	}
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+
+		for {
+			select {
+			case msg, ok := <-s.msgChan:
+				if !ok {
+					continue
+				}
+				if s.cfg.MgmtProto == sesn.MGMT_PROTO_OMP {
+					s.txvr.DispatchCoap(msg)
+				} else if s.cfg.MgmtProto == sesn.MGMT_PROTO_NMP {
+					s.txvr.DispatchNmpRsp(msg)
+				}
+			case <-s.errChan:
+				// XXX pass it on
+			case <-s.stopChan:
+				return
+			}
+		}
+	}()
 	return nil
 }
 
 func (s *SerialSesn) Close() error {
 	s.m.Lock()
-	defer s.m.Unlock()
 
 	if !s.isOpen {
+		s.m.Unlock()
 		return nmxutil.NewSesnClosedError(
 			"Attempt to close an unopened serial session")
 	}
 
+	s.isOpen = false
 	s.txvr.ErrorAll(fmt.Errorf("closed"))
 	s.txvr.Stop()
-	s.isOpen = false
+	close(s.stopChan)
+	close(s.connChan)
+	s.sx.Lock()
+	if s == s.sx.acceptSesn {
+		s.sx.acceptSesn = nil
+
+	}
+	if s == s.sx.reqSesn {
+		s.sx.reqSesn = nil
+	}
+	s.sx.Unlock()
+	s.m.Unlock()
+
+	s.wg.Wait()
+	s.stopChan = nil
+	s.txvr = nil
+
+	for {
+		s, ok := <-s.connChan
+		if !ok {
+			break
+		}
+		s.Close()
+	}
+	close(s.msgChan)
+	for {
+		if _, ok := <-s.msgChan; !ok {
+			break
+		}
+	}
+	close(s.errChan)
+	for {
+		if _, ok := <-s.errChan; !ok {
+			break
+		}
+	}
 
 	return nil
 }
@@ -125,26 +199,20 @@ func (s *SerialSesn) AbortRx(seq uint8) error {
 func (s *SerialSesn) TxNmpOnce(m *nmp.NmpMsg, opt sesn.TxOptions) (
 	nmp.NmpRsp, error) {
 
-	s.m.Lock()
-	defer s.m.Unlock()
-
 	if !s.isOpen {
 		return nil, nmxutil.NewSesnClosedError(
 			"Attempt to transmit over closed serial session")
 	}
 
 	txFn := func(b []byte) error {
-		if err := s.sx.Tx(b); err != nil {
-			return err
-		}
+		return s.sx.Tx(b)
+	}
 
-		rsp, err := s.sx.Rx()
-		if err != nil {
-			return err
-		}
-		s.txvr.DispatchNmpRsp(rsp)
-		return nil
+	err := s.sx.setRspSesn(s)
+	if err != nil {
+		return nil, err
 	}
+	defer s.sx.setRspSesn(nil)
 
 	return s.txvr.TxNmp(txFn, m, s.MtuOut(), opt.Timeout)
 }
@@ -152,18 +220,20 @@ func (s *SerialSesn) TxNmpOnce(m *nmp.NmpMsg, opt sesn.TxOptions) (
 func (s *SerialSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
 	opt sesn.TxOptions) (coap.COAPCode, []byte, error) {
 
+	if !s.isOpen {
+		return 0, nil, nmxutil.NewSesnClosedError(
+			"Attempt to transmit over closed serial session")
+	}
+
 	txFn := func(b []byte) error {
-		if err := s.sx.Tx(b); err != nil {
-			return err
-		}
+		return s.sx.Tx(b)
+	}
 
-		rsp, err := s.sx.Rx()
-		if err != nil {
-			return err
-		}
-		s.txvr.DispatchCoap(rsp)
-		return nil
+	err := s.sx.setRspSesn(s)
+	if err != nil {
+		return 0, nil, err
 	}
+	defer s.sx.setRspSesn(nil)
 
 	rsp, err := s.txvr.TxOic(txFn, m, s.MtuOut(), opt.Timeout)
 	if err != nil {
@@ -188,6 +258,82 @@ func (s *SerialSesn) CoapIsTcp() bool {
 	return false
 }
 
+func (s *SerialSesn) RxAccept() (sesn.Sesn, *sesn.SesnCfg, error) {
+	if !s.isOpen {
+		return nil, nil, nmxutil.NewSesnClosedError(
+			"Attempt to listen for data from closed connection")
+	}
+	if s.cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
+		return nil, nil, fmt.Errorf("Invalid operation for %s", s.cfg.MgmtProto)
+	}
+
+	err := s.sx.setAcceptSesn(s)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	s.wg.Add(1)
+	defer s.wg.Done()
+	for {
+		select {
+		case cl_s, ok := <-s.connChan:
+			if !ok {
+				continue
+			}
+			return cl_s, &cl_s.cfg, nil
+		case <-s.stopChan:
+			return nil, nil, fmt.Errorf("Session closed")
+		}
+	}
+}
+
+func (s *SerialSesn) RxCoap(opt sesn.TxOptions) (coap.Message, error) {
+	if !s.isOpen {
+		return nil, nmxutil.NewSesnClosedError(
+			"Attempt to listen for data from closed connection")
+	}
+	if s.cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
+		return nil, fmt.Errorf("Invalid operation for %s", s.cfg.MgmtProto)
+	}
+	if s.sx.reqSesn != s {
+		return nil, fmt.Errorf("Invalid operation")
+	}
+	waitTmoChan := time.After(opt.Timeout)
+	s.wg.Add(1)
+	defer s.wg.Done()
+	for {
+		select {
+		case data, ok := <-s.msgChan:
+			if !ok {
+				continue
+			}
+			msg, err := s.txvr.ProcessCoapReq(data)
+			if err != nil {
+				return nil, err
+			}
+			if msg != nil {
+				return msg, nil
+			}
+		case _, ok := <-waitTmoChan:
+			waitTmoChan = nil
+			if ok {
+				return nil, nmxutil.NewRspTimeoutError(
+					"RxCoap() timed out")
+			}
+		case err, ok := <-s.errChan:
+			if !ok {
+				continue
+			}
+			if err == errTimeout {
+				continue
+			}
+			return nil, err
+		case <-s.stopChan:
+			return nil, fmt.Errorf("Session closed")
+		}
+	}
+}
+
 func (s *SerialSesn) Filters() (nmcoap.MsgFilter, nmcoap.MsgFilter) {
 	return s.txFilterCb, s.rxFilterCb
 }
diff --git a/nmxact/nmserial/serial_xport.go b/nmxact/nmserial/serial_xport.go
index 09a94c00..5afd0718 100644
--- a/nmxact/nmserial/serial_xport.go
+++ b/nmxact/nmserial/serial_xport.go
@@ -24,15 +24,17 @@ import (
 	"encoding/base64"
 	"encoding/binary"
 	"encoding/hex"
+	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/joaojeronimo/go-crc16"
+	"github.com/runtimeco/go-coap"
 	"github.com/tarm/serial"
 
 	"mynewt.apache.org/newt/util"
-	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 )
 
@@ -43,10 +45,12 @@ type XportCfg struct {
 	ReadTimeout time.Duration
 }
 
+var errTimeout error = errors.New("Timeout reading from serial connection")
+
 func NewXportCfg() *XportCfg {
 	return &XportCfg{
 		ReadTimeout: 10 * time.Second,
-		Mtu: 512,
+		Mtu:         512,
 	}
 }
 
@@ -55,6 +59,14 @@ type SerialXport struct {
 	port    *serial.Port
 	scanner *bufio.Scanner
 
+	wg sync.WaitGroup
+	sync.Mutex
+	closing bool
+
+	reqSesn    *SerialSesn
+	acceptSesn *SerialSesn
+	rspSesn    *SerialSesn
+
 	pkt *Packet
 }
 
@@ -68,6 +80,26 @@ func (sx *SerialXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) {
 	return NewSerialSesn(sx, cfg)
 }
 
+func (sx *SerialXport) acceptServerSesn(sl *SerialSesn) (*SerialSesn, error) {
+	sc := sesn.NewSesnCfg()
+	sc.MgmtProto = sesn.MGMT_PROTO_COAP_SERVER
+	sc.TxFilterCb = sl.cfg.TxFilterCb
+	sc.RxFilterCb = sl.cfg.RxFilterCb
+
+	s, err := NewSerialSesn(sx, sc)
+	if err != nil {
+		return nil, fmt.Errorf("NewSesn():%v", err)
+	}
+	err = s.Open()
+	if err != nil {
+		return nil, fmt.Errorf("Open():%v", err)
+	}
+	sl.connChan <- s
+	sx.setReqSesn(s)
+
+	return s, nil
+}
+
 func (sx *SerialXport) Start() error {
 	c := &serial.Config{
 		Name:        sx.cfg.DevPath,
@@ -86,15 +118,109 @@ func (sx *SerialXport) Start() error {
 		return err
 	}
 
-	// Most of the reading will be done line by line, use the
-	// bufio.Scanner to do this
-	sx.scanner = bufio.NewScanner(sx.port)
+	sx.wg.Add(1)
+	go func() {
+		defer sx.wg.Done()
 
+		// Most of the reading will be done line by line, use the
+		// bufio.Scanner to do this
+		sx.scanner = bufio.NewScanner(sx.port)
+
+		for {
+			msg, err := sx.Rx()
+			sx.Lock()
+			if err != nil {
+				if sx.rspSesn != nil {
+					sx.rspSesn.errChan <- err
+				}
+			}
+			if sx.closing {
+				sx.Unlock()
+				return
+			}
+			if msg == nil {
+				sx.Unlock()
+				continue
+			}
+			if len(msg) >= 4 {
+				if sx.reqSesn != nil || sx.acceptSesn != nil {
+					msgType := coap.COAPCode(msg[1])
+					if msgType <= coap.DELETE {
+						if sx.reqSesn != nil {
+							sx.reqSesn.msgChan <- msg
+							sx.Unlock()
+							continue
+						}
+						if sx.acceptSesn != nil {
+							s, err := sx.acceptServerSesn(
+								sx.acceptSesn)
+							if err != nil {
+								log.Errorf("Cannot create server sesn: %v", err)
+								sx.Unlock()
+								continue
+							}
+							s.msgChan <- msg
+							sx.Unlock()
+							continue
+						}
+					}
+				}
+			}
+			if sx.rspSesn != nil {
+				sx.rspSesn.msgChan <- msg
+			}
+			sx.Unlock()
+		}
+
+	}()
+	return nil
+}
+
+func (sx *SerialXport) setRspSesn(s *SerialSesn) error {
+	sx.Lock()
+	defer sx.Unlock()
+
+	if sx.closing {
+		return fmt.Errorf("Transport closed")
+	}
+	if s != nil && sx.rspSesn != nil {
+		return fmt.Errorf("Transport busy")
+	}
+	sx.rspSesn = s
+	return nil
+}
+
+func (sx *SerialXport) setAcceptSesn(s *SerialSesn) error {
+	sx.Lock()
+	defer sx.Unlock()
+
+	if sx.closing {
+		return fmt.Errorf("Transport closed")
+	}
+	if sx.acceptSesn != nil && s != sx.acceptSesn {
+		return fmt.Errorf("Transport busy")
+	}
+	sx.acceptSesn = s
+	return nil
+}
+
+func (sx *SerialXport) setReqSesn(s *SerialSesn) error {
+	if sx.closing {
+		return fmt.Errorf("Transport closed")
+	}
+	if sx.reqSesn != nil && s != sx.reqSesn {
+		return fmt.Errorf("Transport busy")
+	}
+	sx.reqSesn = s
 	return nil
 }
 
 func (sx *SerialXport) Stop() error {
-	return sx.port.Close()
+	sx.closing = true
+
+	err := sx.port.Close()
+	sx.wg.Wait()
+	return err
 }
 
 func (sx *SerialXport) txRaw(bytes []byte) error {
@@ -226,8 +352,7 @@ func (sx *SerialXport) Rx() ([]byte, error) {
 	if err == nil {
 		// Scanner hit EOF, so we'll need to create a new one.  This only
 		// happens on timeouts.
-		err = nmxutil.NewXportError(
-			"Timeout reading from serial connection")
+		err = errTimeout
 		sx.scanner = bufio.NewScanner(sx.port)
 	}
 	return nil, err
diff --git a/nmxact/omp/dispatch.go b/nmxact/omp/dispatch.go
index cea579b3..4e7088b5 100644
--- a/nmxact/omp/dispatch.go
+++ b/nmxact/omp/dispatch.go
@@ -24,6 +24,7 @@ import (
 	"sync/atomic"
 
 	log "github.com/Sirupsen/logrus"
+        "github.com/runtimeco/go-coap"
 
 	"mynewt.apache.org/newtmgr/nmxact/nmcoap"
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
@@ -109,6 +110,10 @@ func (d *Dispatcher) Dispatch(data []byte) bool {
 	return d.oicd.Dispatch(data)
 }
 
+func (d *Dispatcher) ProcessCoapReq(data []byte) (coap.Message, error) {
+	return d.oicd.ProcessCoapReq(data)
+}
+
 func (d *Dispatcher) AddOicListener(token []byte) (*nmcoap.Listener, error) {
 	return d.oicd.AddListener(token)
 }
diff --git a/nmxact/omp/omp.go b/nmxact/omp/omp.go
index 7dbf1afe..bd8b1085 100644
--- a/nmxact/omp/omp.go
+++ b/nmxact/omp/omp.go
@@ -48,7 +48,8 @@ type OicMsg struct {
 func DecodeOmp(m coap.Message, rxFilterCb nmcoap.MsgFilter) (nmp.NmpRsp, error) {
 
 	// Ignore non-responses.
-	if m.Code() == coap.GET || m.Code() == coap.PUT || m.Code() == coap.POST {
+	if m.Code() == coap.GET || m.Code() == coap.PUT || m.Code() == coap.POST ||
+		m.Code() == coap.DELETE {
 		return nil, nil
 	}
 
diff --git a/nmxact/sesn/sesn.go b/nmxact/sesn/sesn.go
index 8e1bbba7..7bd1f40f 100644
--- a/nmxact/sesn/sesn.go
+++ b/nmxact/sesn/sesn.go
@@ -91,6 +91,9 @@ type Sesn interface {
 
 	// XXX AbortResource(seq uint8) error
 
+	RxAccept() (Sesn, *SesnCfg, error)
+	RxCoap(opt TxOptions) (coap.Message, error)
+
 	////// Internal to nmxact:
 
 	// Performs a blocking transmit a single NMP message and listens for the
diff --git a/nmxact/sesn/sesn_cfg.go b/nmxact/sesn/sesn_cfg.go
index 5f753abf..32003833 100644
--- a/nmxact/sesn/sesn_cfg.go
+++ b/nmxact/sesn/sesn_cfg.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"mynewt.apache.org/newtmgr/nmxact/bledefs"
+	"mynewt.apache.org/newtmgr/nmxact/lora"
 	"mynewt.apache.org/newtmgr/nmxact/nmcoap"
 )
 
@@ -32,8 +33,19 @@ type MgmtProto int
 const (
 	MGMT_PROTO_NMP MgmtProto = iota
 	MGMT_PROTO_OMP
+	MGMT_PROTO_COAP_SERVER
 )
 
+var mgmtProtoMap = map[MgmtProto]string{
+	MGMT_PROTO_NMP:         "nmp",
+	MGMT_PROTO_OMP:         "omp",
+	MGMT_PROTO_COAP_SERVER: "coapserver",
+}
+
+func (r MgmtProto) String() string {
+	return mgmtProtoMap[r]
+}
+
 type ResourceType int
 
 const (
@@ -90,6 +102,7 @@ type SesnCfgLora struct {
 	Addr        string
 	SegSz       int
 	ConfirmedTx bool
+	Port        uint8
 }
 
 type SesnCfg struct {
@@ -124,6 +137,7 @@ func NewSesnCfg() SesnCfg {
 		},
 		Lora: SesnCfgLora{
 			ConfirmedTx: false,
+			Port:        lora.COAP_LORA_PORT,
 		},
 	}
 }
diff --git a/nmxact/udp/udp_sesn.go b/nmxact/udp/udp_sesn.go
index 5532a476..6f4a7922 100644
--- a/nmxact/udp/udp_sesn.go
+++ b/nmxact/udp/udp_sesn.go
@@ -170,6 +170,14 @@ func (s *UdpSesn) CoapIsTcp() bool {
 	return false
 }
 
+func (s *UdpSesn) RxAccept() (sesn.Sesn, *sesn.SesnCfg, error) {
+	return nil, nil, fmt.Errorf("Op not implemented yet")
+}
+
+func (s *UdpSesn) RxCoap(opt sesn.TxOptions) (coap.Message, error) {
+	return nil, fmt.Errorf("Op not implemented yet")
+}
+
 func (s *UdpSesn) Filters() (nmcoap.MsgFilter, nmcoap.MsgFilter) {
 	return s.txFilterCb, s.rxFilterCb
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services