You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/12 19:50:22 UTC
qpid-proton git commit: PROTON-1045: Use of callbacks to handle
accepted endpoints violates design goals.
Repository: qpid-proton
Updated Branches:
refs/heads/master f3edf5752 -> 91fe6e0fd
PROTON-1045: Use of callbacks to handle accepted endpoints violates design goals.
Get rid of use of callback functions that run in proton goroutine.
Use a channel instead for Incoming requests, all user code runs in user goroutines.
Safer and more consistent.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/91fe6e0f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/91fe6e0f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/91fe6e0f
Branch: refs/heads/master
Commit: 91fe6e0fda07efa07f10a97d906eee114c86d6bf
Parents: f3edf57
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 11 10:22:01 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 11 21:54:25 2015 -0500
----------------------------------------------------------------------
examples/go/electron/broker.go | 35 ++++--
.../src/qpid.apache.org/electron/connection.go | 85 ++++++++------
.../go/src/qpid.apache.org/electron/handler.go | 43 ++++---
.../go/src/qpid.apache.org/electron/link.go | 36 +++---
.../qpid.apache.org/electron/messaging_test.go | 116 +++++++++++--------
.../go/src/qpid.apache.org/electron/receiver.go | 23 ++--
.../go/src/qpid.apache.org/electron/sender.go | 15 +--
.../go/src/qpid.apache.org/electron/session.go | 26 +++--
.../go/src/qpid.apache.org/proton/wrappers.go | 17 +++
9 files changed, 233 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index f1dce17..f68deb1 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -78,26 +78,43 @@ func (b *broker) run() error {
util.Debugf("Accept error: %v", err)
continue
}
- c, err := b.container.Connection(conn, electron.Server(), electron.Accepter(b.accept))
+ c, err := b.container.Connection(conn, electron.Server(), electron.AllowIncoming())
if err != nil {
util.Debugf("Connection error: %v", err)
continue
}
+ go b.accept(c) // Goroutine to accept incoming sessions and links.
util.Debugf("Accepted %v", c)
}
}
// accept remotely-opened endpoints (Session, Sender and Receiver)
// and start goroutines to service them.
-func (b *broker) accept(i electron.Incoming) {
- switch i := i.(type) {
- case *electron.IncomingSender:
- go b.sender(i.AcceptSender())
- case *electron.IncomingReceiver:
- go b.receiver(i.AcceptReceiver(100, true)) // Pre-fetch 100 messages
- default:
- i.Accept()
+func (b *broker) accept(c electron.Connection) {
+ for in := range c.Incoming() {
+ switch in := in.(type) {
+
+ case *electron.IncomingSender:
+ if in.Source() == "" {
+ util.Debugf("sender has no source: %s", in)
+ break
+ }
+ go b.sender(in.Accept().(electron.Sender))
+
+ case *electron.IncomingReceiver:
+ if in.Target() == "" {
+ util.Debugf("receiver has no target: %s", in)
+ break
+ }
+ in.SetPrefetch(true)
+ in.SetCapacity(*credit) // Pre-fetch up to credit window.
+ go b.receiver(in.Accept().(electron.Receiver))
+
+ default:
+ in.Accept() // Accept sessions unconditionally
+ }
}
+ util.Debugf("incoming closed: %s", c)
}
// sender pops messages from a queue and sends them.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 8573f28..3462b92 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -24,7 +24,6 @@ import "C"
import (
"net"
- "qpid.apache.org/amqp"
"qpid.apache.org/proton"
"sync"
"time"
@@ -58,12 +57,30 @@ type Connection interface {
// WaitTimeout is like Wait but returns Timeout if the timeout expires.
WaitTimeout(time.Duration) error
+
+ // Incoming returns a channel for incoming endpoints opened by the remote end.
+ //
+ // To enable, pass AllowIncoming() when creating the Connection. Otherwise all
+ // incoming endpoint requests are automatically rejected and Incoming()
+ // returns nil.
+ //
+ // An Incoming value can be an *IncomingSession, *IncomingSender or
+ // *IncomingReceiver. You must call Accept() to open the endpoint or Reject()
+ // to close it with an error. The specific Incoming types have additional
+ // methods to configure the endpoint.
+ //
+ // Delay in receiving from Incoming() or calling Accept/Reject will block
+ // proton. Normally you should have a dedicated goroutine receive from this
+ // channel and start a new goroutine to serve each endpoint accepted. The
+ // channel is closed when the Connection closes.
+ //
+ Incoming() <-chan Incoming
}
-// ConnectionOption can be passed when creating a connection.
+// ConnectionOption can be passed when creating a connection to configure various options
type ConnectionOption func(*connection)
-// Server setting puts the connection in server mode.
+// Server returns a ConnectionOption to put the connection in server mode.
//
// A server connection will do protocol negotiation to accept a incoming AMQP
// connection. Normally you would call this for a connection created by
@@ -71,24 +88,19 @@ type ConnectionOption func(*connection)
//
func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } }
-// Accepter provides a function to be called when a connection receives an incoming
-// request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver.
-//
-// The accept() function must not block or use the accepted endpoint.
-// It can pass the endpoint to another goroutine for processing.
-//
-// By default all incoming endpoints are rejected.
-func Accepter(accept func(Incoming)) ConnectionOption {
- return func(c *connection) { c.accept = accept }
+// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests.
+// See Connection.Incoming()
+func AllowIncoming() ConnectionOption {
+ return func(c *connection) { c.incoming = make(chan Incoming) }
}
type connection struct {
endpoint
- listenOnce, defaultSessionOnce, closeOnce sync.Once
+ defaultSessionOnce, closeOnce sync.Once
container *container
conn net.Conn
- accept func(Incoming)
+ incoming chan Incoming
handler *handler
engine *proton.Engine
err proton.ErrorHolder
@@ -99,7 +111,7 @@ type connection struct {
}
func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
- c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})}
+ c := &connection{container: cont, conn: conn, done: make(chan struct{})}
c.handler = newHandler(c)
var err error
c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
@@ -173,38 +185,45 @@ func (c *connection) WaitTimeout(timeout time.Duration) error {
return c.Error()
}
+func (c *connection) Incoming() <-chan Incoming { return c.incoming }
+
// Incoming is the interface for incoming requests to open an endpoint.
// Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
type Incoming interface {
- // Accept the endpoint with default settings.
- //
- // You must not use the returned endpoint in the accept() function that
- // receives the Incoming value, but you can pass it to other goroutines.
- //
- // Implementing types provide type-specific Accept functions that take additional settings.
+ // Accept and open the endpoint.
Accept() Endpoint
// Reject the endpoint with an error
Reject(error)
- error(string, ...interface{}) error
+ // wait for and call the accept function, call in proton goroutine.
+ wait() error
+ pEndpoint() proton.Endpoint
}
-// Common state for incoming endpoints, record accept or reject error.
type incoming struct {
- err error
- accepted bool
+ endpoint proton.Endpoint
+ acceptCh chan func() error
}
-func (i *incoming) Reject(err error) { i.err = err }
+func makeIncoming(e proton.Endpoint) incoming {
+ return incoming{endpoint: e, acceptCh: make(chan func() error)}
+}
+
+func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } }
+
+// Call in proton goroutine, wait for and call the accept function sent by the app goroutine.
+func (in *incoming) wait() error { return (<-in.acceptCh)() }
+
+func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
-func (i *incoming) error(fmt string, arg ...interface{}) error {
- switch {
- case i.err != nil:
- return i.err
- case !i.accepted:
- return amqp.Errorf(amqp.NotAllowed, fmt, arg...)
- default:
+// Called in app goroutine to send an accept function to proton and return the resulting endpoint.
+func (in *incoming) accept(f func() Endpoint) Endpoint {
+ done := make(chan Endpoint)
+ in.acceptCh <- func() error {
+ ep := f()
+ done <- ep
return nil
}
+ return <-done
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 754a221..1b1164c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -49,9 +49,8 @@ func newHandler(c *connection) *handler {
h.delegator.AutoOpen = false
return h
}
-
-func (h *handler) internalError(ep proton.Endpoint, msg string) {
- proton.CloseError(ep, amqp.Errorf(amqp.InternalError, "%s %s", msg, ep))
+func (h *handler) linkError(l proton.Link, msg string) {
+ proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l))
}
func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
@@ -61,7 +60,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
if r, ok := h.links[e.Link()].(*receiver); ok {
r.message(e.Delivery())
} else {
- h.internalError(e.Link(), "no receiver for link")
+ h.linkError(e.Link(), "no receiver")
}
case proton.MSettled:
@@ -73,16 +72,12 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
if s, ok := h.links[e.Link()].(*sender); ok {
s.sendable()
} else {
- h.internalError(e.Link(), "no sender for link")
+ h.linkError(e.Link(), "no sender")
}
case proton.MSessionOpening:
if e.Session().State().LocalUninit() { // Remotely opened
- incoming := &IncomingSession{h: h, pSession: e.Session()}
- h.connection.accept(incoming)
- if err := incoming.error("rejected session %s", e.Session()); err != nil {
- proton.CloseError(e.Session(), err)
- }
+ h.incoming(newIncomingSession(h, e.Session()))
}
case proton.MSessionClosed:
@@ -101,19 +96,13 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
}
ss := h.sessions[l.Session()]
if ss == nil {
- h.internalError(e.Link(), "no session for link")
+ h.linkError(e.Link(), "no session")
break
}
- var incoming Incoming
if l.IsReceiver() {
- incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
+ h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
} else {
- incoming = &IncomingSender{makeIncomingLink(ss, l)}
- }
- h.connection.accept(incoming)
- if err := incoming.error("rejected link %s", e.Link()); err != nil {
- proton.CloseError(l, err)
- break
+ h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
}
case proton.MLinkClosing:
@@ -146,6 +135,22 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
}
}
+func (h *handler) incoming(in Incoming) {
+ var err error
+ if h.connection.incoming != nil {
+ h.connection.incoming <- in
+ err = in.wait()
+ } else {
+ err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
+ in.pEndpoint().Type(), in.pEndpoint().String())
+ }
+ if err == nil {
+ in.pEndpoint().Open()
+ } else {
+ proton.CloseError(in.pEndpoint(), err)
+ }
+}
+
func (h *handler) linkClosed(l proton.Link, err error) {
if link := h.links[l]; link != nil {
link.closed(err)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 4835cb9..cadc2c1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -60,36 +60,37 @@ type Link interface {
open()
}
-// LinkOption can be passed when creating a sender or receiver link.
+// LinkOption can be passed when creating a sender or receiver link to set optional configuration.
type LinkOption func(*link)
-// Source sets address that messages are coming from.
+// Source returns a LinkOption that sets address that messages are coming from.
func Source(s string) LinkOption { return func(l *link) { l.source = s } }
-// Target sets address that messages are going to.
+// Target returns a LinkOption that sets address that messages are going to.
func Target(s string) LinkOption { return func(l *link) { l.target = s } }
-// LinkName sets the link name.
+// LinkName returns a LinkOption that sets the link name.
func LinkName(s string) LinkOption { return func(l *link) { l.target = s } }
-// SndSettle sets the send settle mode
+// SndSettle returns a LinkOption that sets the send settle mode
func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } }
-// RcvSettle sets the send settle mode
+// RcvSettle returns a LinkOption that sets the receive settle mode
func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } }
-// SndSettleMode defines when the sending end of the link settles message delivery.
+// SndSettleMode defines when the sending end of the link settles message
+// delivery.
type SndSettleMode proton.SndSettleMode
-// Capacity sets the link capacity
+// Capacity returns a LinkOption that sets the link capacity
func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } }
-// Prefetch sets a receivers pre-fetch flag. Not relevant for a sender.
+// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender.
func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } }
-// AtMostOnce sets "fire and forget" mode, messages are sent but no
-// acknowledgment is received, messages can be lost if there is a network
-// failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
+// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
+// are sent but no acknowledgment is received, messages can be lost if there is
+// a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
func AtMostOnce() LinkOption {
return func(l *link) {
SndSettle(SndSettled)(l)
@@ -97,11 +98,11 @@ func AtMostOnce() LinkOption {
}
}
-// AtLeastOnce requests acknowledgment for every message, acknowledgment
-// indicates the message was definitely received. In the event of a
-// failure, unacknowledged messages can be re-sent but there is a chance
-// that the message will be received twice in this case.
-// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
+// AtLeastOnce returns a LinkOption that requests acknowledgment for every
+// message, acknowledgment indicates the message was definitely received. In the
+// event of a failure, unacknowledged messages can be re-sent but there is a
+// chance that the message will be received twice in this case. Sets
+// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
func AtLeastOnce() LinkOption {
return func(l *link) {
SndSettle(SndUnsettled)(l)
@@ -200,6 +201,7 @@ type incomingLink struct {
// Set up a link from an incoming proton.Link.
func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
l := incomingLink{
+ incoming: makeIncoming(eLink),
link: link{
session: sn,
isSender: eLink.IsSender(),
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index c7ff290..3b5a062 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -39,22 +39,22 @@ func fatalIf(t *testing.T, err error) {
}
}
-// Start a server, return listening addr and channel for incoming Connection.
-func newServer(t *testing.T, cont Container, accept func(Incoming)) (net.Addr, <-chan Connection) {
+// Start a server, return listening addr and channel for incoming Connections.
+func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) {
listener, err := net.Listen("tcp", "")
fatalIf(t, err)
addr := listener.Addr()
ch := make(chan Connection)
go func() {
conn, err := listener.Accept()
- c, err := cont.Connection(conn, Server(), Accepter(accept))
+ c, err := cont.Connection(conn, Server(), AllowIncoming())
fatalIf(t, err)
ch <- c
}()
return addr, ch
}
-// Return open an client connection and session, return the session.
+// Open a client connection and session, return the session.
func newClient(t *testing.T, cont Container, addr net.Addr) Session {
conn, err := net.Dial(addr.Network(), addr.String())
fatalIf(t, err)
@@ -66,8 +66,8 @@ func newClient(t *testing.T, cont Container, addr net.Addr) Session {
}
// Return client and server ends of the same connection.
-func newClientServer(t *testing.T, accept func(Incoming)) (client Session, server Connection) {
- addr, ch := newServer(t, NewContainer("test-server"), accept)
+func newClientServer(t *testing.T) (client Session, server Connection) {
+ addr, ch := newServer(t, NewContainer("test-server"))
client = newClient(t, NewContainer("test-client"), addr)
return client, <-ch
}
@@ -85,19 +85,22 @@ func TestClientSendServerReceive(t *testing.T) {
nMessages := 3
rchan := make(chan Receiver, nLinks)
- client, server := newClientServer(t, func(i Incoming) {
- switch i := i.(type) {
- case *IncomingReceiver:
- rchan <- i.AcceptReceiver(1, false)
- default:
- i.Accept()
+ client, server := newClientServer(t)
+ go func() {
+ for in := range server.Incoming() {
+ switch in := in.(type) {
+ case *IncomingReceiver:
+ in.SetCapacity(1)
+ in.SetPrefetch(false)
+ rchan <- in.Accept().(Receiver)
+ default:
+ in.Accept()
+ }
}
- })
-
- defer func() {
- closeClientServer(client, server)
}()
+ defer func() { closeClientServer(client, server) }()
+
s := make([]Sender, nLinks)
for i := 0; i < nLinks; i++ {
var err error
@@ -155,26 +158,29 @@ func TestClientSendServerReceive(t *testing.T) {
func TestClientReceiver(t *testing.T) {
nMessages := 3
- client, server := newClientServer(t, func(i Incoming) {
- switch i := i.(type) {
- case *IncomingSender:
- s := i.AcceptSender()
- go func() {
- for i := int32(0); i < int32(nMessages); i++ {
- sm, err := s.Send(amqp.NewMessageWith(i))
- if err != nil {
- t.Error(err)
- return
- } else {
- sm.Disposition() // Sync send.
+ client, server := newClientServer(t)
+ go func() {
+ for in := range server.Incoming() {
+ switch in := in.(type) {
+ case *IncomingSender:
+ s := in.Accept().(Sender)
+ go func() {
+ for i := int32(0); i < int32(nMessages); i++ {
+ sm, err := s.Send(amqp.NewMessageWith(i))
+ if err != nil {
+ t.Error(err)
+ return
+ } else {
+ sm.Disposition() // Sync send.
+ }
}
- }
- s.Close(nil)
- }()
- default:
- i.Accept()
+ s.Close(nil)
+ }()
+ default:
+ in.Accept()
+ }
}
- })
+ }()
r, err := client.Receiver(Source("foo"))
if err != nil {
@@ -203,14 +209,19 @@ func TestClientReceiver(t *testing.T) {
func TestTimeouts(t *testing.T) {
var err error
rchan := make(chan Receiver, 1)
- client, server := newClientServer(t, func(i Incoming) {
- switch i := i.(type) {
- case *IncomingReceiver:
- rchan <- i.AcceptReceiver(1, false) // Issue credit only on receive
- default:
- i.Accept()
+ client, server := newClientServer(t)
+ go func() {
+ for i := range server.Incoming() {
+ switch i := i.(type) {
+ case *IncomingReceiver:
+ i.SetCapacity(1)
+ i.SetPrefetch(false)
+ rchan <- i.Accept().(Receiver) // Issue credit only on receive
+ default:
+ i.Accept()
+ }
}
- })
+ }()
defer func() { closeClientServer(client, server) }()
// Open client sender
@@ -274,16 +285,21 @@ type pairs struct {
func newPairs(t *testing.T) *pairs {
p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
- p.client, p.server = newClientServer(t, func(i Incoming) {
- switch i := i.(type) {
- case *IncomingReceiver:
- p.rchan <- i.AcceptReceiver(1, false)
- case *IncomingSender:
- p.schan <- i.AcceptSender()
- default:
- i.Accept()
+ p.client, p.server = newClientServer(t)
+ go func() {
+ for i := range p.server.Incoming() {
+ switch i := i.(type) {
+ case *IncomingReceiver:
+ i.SetCapacity(1)
+ i.SetPrefetch(false)
+ p.rchan <- i.Accept().(Receiver)
+ case *IncomingSender:
+ p.schan <- i.Accept().(Sender)
+ default:
+ i.Accept()
+ }
}
- })
+ }()
return p
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 9f854cf..4ff83b4 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -119,6 +119,7 @@ type receiver struct {
policy policy
}
+// Call in proton goroutine
func newReceiver(l link) *receiver {
r := &receiver{link: l}
if r.capacity < 1 {
@@ -216,23 +217,19 @@ func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
// Reject is short for Acknowledge(Rejected)
func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
-// IncomingReceiver is passed to the accept() function given to Connection.Listen()
-// when there is an incoming request for a receiver link.
+// IncomingReceiver is sent on the Connection.Incoming() channel when there is
+// an incoming request to open a receiver link.
type IncomingReceiver struct {
incomingLink
}
-// Link provides information about the incoming link.
-func (i *IncomingReceiver) Link() Link { return i }
+// SetCapacity sets the capacity of the incoming receiver, call before Accept()
+func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity }
-// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver.
-func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver {
- i.capacity = capacity
- i.prefetch = prefetch
- return i.Accept().(Receiver)
-}
+// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()
+func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch }
-func (i *IncomingReceiver) Accept() Endpoint {
- i.accepted = true
- return newReceiver(i.link)
+// Accept accepts an incoming receiver endpoint
+func (in *IncomingReceiver) Accept() Endpoint {
+ return in.accept(func() Endpoint { return newReceiver(in.link) })
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 98304c1..11f019b 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -297,18 +297,13 @@ func (sm *sentMessage) finish() {
func (sm *sentMessage) Error() error { return sm.err }
-// IncomingSender is passed to the accept() function given to Connection.Listen()
-// when there is an incoming request for a sender link.
+// IncomingSender is sent on the Connection.Incoming() channel when there is
+// an incoming request to open a sender link.
type IncomingSender struct {
incomingLink
}
-// Link provides information about the incoming link.
-func (i *IncomingSender) Link() Link { return i }
-
-func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) }
-
-func (i *IncomingSender) Accept() Endpoint {
- i.accepted = true
- return newSender(i.link)
+// Accept accepts an incoming sender endpoint
+func (in *IncomingSender) Accept() Endpoint {
+ return in.accept(func() Endpoint { return newSender(in.link) })
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index 785a582..d347c99 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -44,7 +44,8 @@ type session struct {
// SessionOption can be passed when creating a Session
type SessionOption func(*session)
-// IncomingCapacity sets the size (in bytes) of the sessions incoming data buffer..
+// IncomingCapacity returns a SessionOption that sets the size (in bytes) of
+// the session's incoming data buffer.
func IncomingCapacity(cap uint) SessionOption { return func(s *session) { s.capacity = cap } }
// in proton goroutine
@@ -70,8 +71,6 @@ func (s *session) Close(err error) {
s.engine().Inject(func() { localClose(s.eSession, err) })
}
-func (s *session) SetCapacity(bytes uint) { s.capacity = bytes }
-
func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
err = s.engine().InjectWait(func() error {
l, err := localLink(s, true, setting...)
@@ -100,8 +99,8 @@ func (s *session) closed(err error) {
s.err.Set(Closed)
}
-// IncomingSession is passed to the accept() function given to Connection.Listen()
-// when there is an incoming session request.
+// IncomingSession is sent on the Connection.Incoming() channel when there is an
+// incoming request to open a session.
type IncomingSession struct {
incoming
h *handler
@@ -109,13 +108,16 @@ type IncomingSession struct {
capacity uint
}
-// AcceptCapacity sets the session buffer capacity of an incoming session in bytes.
-func (i *IncomingSession) AcceptSession(bytes uint) Session {
- i.capacity = bytes
- return i.Accept().(Session)
+func newIncomingSession(h *handler, ps proton.Session) *IncomingSession {
+ return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps}
}
-func (i *IncomingSession) Accept() Endpoint {
- i.accepted = true
- return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity))
+// SetCapacity sets the session buffer capacity of an incoming session in bytes.
+func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes }
+
+// Accept an incoming session endpoint.
+func (in *IncomingSession) Accept() Endpoint {
+ return in.accept(func() Endpoint {
+ return newSession(in.h.connection, in.pSession, IncomingCapacity(in.capacity))
+ })
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 1a1fd96..0b881c1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -168,6 +168,8 @@ type Endpoint interface {
RemoteCondition() Condition
// Human readable name
String() string
+ // Human readable endpoint type "link", "session" etc.
+ Type() string
}
// CloseError sets an error condition on an endpoint and closes the endpoint
@@ -257,6 +259,15 @@ func (l Link) String() string {
}
}
+func (l Link) Type() string {
+ if l.IsSender() {
+ return "sender-link"
+ } else {
+ return "receiver-link"
+ }
+
+}
+
func cPtr(b []byte) *C.char {
if len(b) == 0 {
return nil
@@ -285,6 +296,10 @@ func (c Connection) String() string {
return fmt.Sprintf("%x", c.pn)
}
+func (c Connection) Type() string {
+ return "connection"
+}
+
// Head functions don't follow the normal naming conventions so missed by the generator.
func (c Connection) LinkHead(s State) Link {
@@ -313,6 +328,8 @@ func (s Session) String() string {
return fmt.Sprintf("%s/%p", s.Connection(), s.pn)
}
+func (s Session) Type() string { return "session" }
+
// Error returns an instance of amqp.Error or nil.
func (c Condition) Error() error {
if c.IsNil() || !c.IsSet() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org