You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/12 19:50:22 UTC

qpid-proton git commit: PROTON-1045: Use of callbacks to handle accepted endpoints violates design goals.

Repository: qpid-proton
Updated Branches:
  refs/heads/master f3edf5752 -> 91fe6e0fd


PROTON-1045: Use of callbacks to handle accepted endpoints violates design goals.

Get rid of callback functions that run in the proton goroutine.
Use a channel for Incoming requests instead, so that all user code runs in user goroutines.
This is safer and more consistent.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/91fe6e0f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/91fe6e0f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/91fe6e0f

Branch: refs/heads/master
Commit: 91fe6e0fda07efa07f10a97d906eee114c86d6bf
Parents: f3edf57
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 11 10:22:01 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 11 21:54:25 2015 -0500

----------------------------------------------------------------------
 examples/go/electron/broker.go                  |  35 ++++--
 .../src/qpid.apache.org/electron/connection.go  |  85 ++++++++------
 .../go/src/qpid.apache.org/electron/handler.go  |  43 ++++---
 .../go/src/qpid.apache.org/electron/link.go     |  36 +++---
 .../qpid.apache.org/electron/messaging_test.go  | 116 +++++++++++--------
 .../go/src/qpid.apache.org/electron/receiver.go |  23 ++--
 .../go/src/qpid.apache.org/electron/sender.go   |  15 +--
 .../go/src/qpid.apache.org/electron/session.go  |  26 +++--
 .../go/src/qpid.apache.org/proton/wrappers.go   |  17 +++
 9 files changed, 233 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index f1dce17..f68deb1 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -78,26 +78,43 @@ func (b *broker) run() error {
 			util.Debugf("Accept error: %v", err)
 			continue
 		}
-		c, err := b.container.Connection(conn, electron.Server(), electron.Accepter(b.accept))
+		c, err := b.container.Connection(conn, electron.Server(), electron.AllowIncoming())
 		if err != nil {
 			util.Debugf("Connection error: %v", err)
 			continue
 		}
+		go b.accept(c) // Goroutine to accept incoming sessions and links.
 		util.Debugf("Accepted %v", c)
 	}
 }
 
 // accept remotely-opened endpoints (Session, Sender and Receiver)
 // and start goroutines to service them.
-func (b *broker) accept(i electron.Incoming) {
-	switch i := i.(type) {
-	case *electron.IncomingSender:
-		go b.sender(i.AcceptSender())
-	case *electron.IncomingReceiver:
-		go b.receiver(i.AcceptReceiver(100, true)) // Pre-fetch 100 messages
-	default:
-		i.Accept()
+func (b *broker) accept(c electron.Connection) {
+	for in := range c.Incoming() {
+		switch in := in.(type) {
+
+		case *electron.IncomingSender:
+			if in.Source() == "" {
+				util.Debugf("sender has no source: %s", in)
+				break
+			}
+			go b.sender(in.Accept().(electron.Sender))
+
+		case *electron.IncomingReceiver:
+			if in.Target() == "" {
+				util.Debugf("receiver has no target: %s", in)
+				break
+			}
+			in.SetPrefetch(true)
+			in.SetCapacity(*credit) // Pre-fetch up to credit window.
+			go b.receiver(in.Accept().(electron.Receiver))
+
+		default:
+			in.Accept() // Accept sessions unconditionally
+		}
 	}
+	util.Debugf("incoming closed: %s", c)
 }
 
 // sender pops messages from a queue and sends them.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 8573f28..3462b92 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -24,7 +24,6 @@ import "C"
 
 import (
 	"net"
-	"qpid.apache.org/amqp"
 	"qpid.apache.org/proton"
 	"sync"
 	"time"
@@ -58,12 +57,30 @@ type Connection interface {
 
 	// WaitTimeout is like Wait but returns Timeout if the timeout expires.
 	WaitTimeout(time.Duration) error
+
+	// Incoming returns a channel for incoming endpoints opened by the remote end.
+	//
+	// To enable, pass AllowIncoming() when creating the Connection. Otherwise all
+	// incoming endpoint requests are automatically rejected and Incoming()
+	// returns nil.
+	//
+	// An Incoming value can be an *IncomingSession, *IncomingSender or
+	// *IncomingReceiver.  You must call Accept() to open the endpoint or Reject()
+	// to close it with an error. The specific Incoming types have additional
+	// methods to configure the endpoint.
+	//
+	// Delay in receiving from Incoming() or calling Accept/Reject will block
+	// proton. Normally you should have a dedicated goroutine receive from this
+	// channel and start a new goroutine to serve each endpoint accepted.  The
+	// channel is closed when the Connection closes.
+	//
+	Incoming() <-chan Incoming
 }
 
-// ConnectionOption can be passed when creating a connection.
+// ConnectionOption can be passed when creating a connection to configure various options.
 type ConnectionOption func(*connection)
 
-// Server setting puts the connection in server mode.
+// Server returns a ConnectionOption to put the connection in server mode.
 //
 // A server connection will do protocol negotiation to accept a incoming AMQP
 // connection. Normally you would call this for a connection created by
@@ -71,24 +88,19 @@ type ConnectionOption func(*connection)
 //
 func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } }
 
-// Accepter provides a function to be called when a connection receives an incoming
-// request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver.
-//
-// The accept() function must not block or use the accepted endpoint.
-// It can pass the endpoint to another goroutine for processing.
-//
-// By default all incoming endpoints are rejected.
-func Accepter(accept func(Incoming)) ConnectionOption {
-	return func(c *connection) { c.accept = accept }
+// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests.
+// See Connection.Incoming()
+func AllowIncoming() ConnectionOption {
+	return func(c *connection) { c.incoming = make(chan Incoming) }
 }
 
 type connection struct {
 	endpoint
-	listenOnce, defaultSessionOnce, closeOnce sync.Once
+	defaultSessionOnce, closeOnce sync.Once
 
 	container   *container
 	conn        net.Conn
-	accept      func(Incoming)
+	incoming    chan Incoming
 	handler     *handler
 	engine      *proton.Engine
 	err         proton.ErrorHolder
@@ -99,7 +111,7 @@ type connection struct {
 }
 
 func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
-	c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})}
+	c := &connection{container: cont, conn: conn, done: make(chan struct{})}
 	c.handler = newHandler(c)
 	var err error
 	c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
@@ -173,38 +185,45 @@ func (c *connection) WaitTimeout(timeout time.Duration) error {
 	return c.Error()
 }
 
+func (c *connection) Incoming() <-chan Incoming { return c.incoming }
+
 // Incoming is the interface for incoming requests to open an endpoint.
 // Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
 type Incoming interface {
-	// Accept the endpoint with default settings.
-	//
-	// You must not use the returned endpoint in the accept() function that
-	// receives the Incoming value, but you can pass it to other goroutines.
-	//
-	// Implementing types provide type-specific Accept functions that take additional settings.
+	// Accept and open the endpoint.
 	Accept() Endpoint
 
 	// Reject the endpoint with an error
 	Reject(error)
 
-	error(string, ...interface{}) error
+	// wait for and call the accept function, call in proton goroutine.
+	wait() error
+	pEndpoint() proton.Endpoint
 }
 
-// Common state for incoming endpoints, record accept or reject error.
 type incoming struct {
-	err      error
-	accepted bool
+	endpoint proton.Endpoint
+	acceptCh chan func() error
 }
 
-func (i *incoming) Reject(err error) { i.err = err }
+func makeIncoming(e proton.Endpoint) incoming {
+	return incoming{endpoint: e, acceptCh: make(chan func() error)}
+}
+
+func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } }
+
+// Call in proton goroutine: wait for and call the accept function sent by the app goroutine.
+func (in *incoming) wait() error { return (<-in.acceptCh)() }
+
+func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
 
-func (i *incoming) error(fmt string, arg ...interface{}) error {
-	switch {
-	case i.err != nil:
-		return i.err
-	case !i.accepted:
-		return amqp.Errorf(amqp.NotAllowed, fmt, arg...)
-	default:
+// Called in app goroutine to send an accept function to proton and return the resulting endpoint.
+func (in *incoming) accept(f func() Endpoint) Endpoint {
+	done := make(chan Endpoint)
+	in.acceptCh <- func() error {
+		ep := f()
+		done <- ep
 		return nil
 	}
+	return <-done
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 754a221..1b1164c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -49,9 +49,8 @@ func newHandler(c *connection) *handler {
 	h.delegator.AutoOpen = false
 	return h
 }
-
-func (h *handler) internalError(ep proton.Endpoint, msg string) {
-	proton.CloseError(ep, amqp.Errorf(amqp.InternalError, "%s %s", msg, ep))
+func (h *handler) linkError(l proton.Link, msg string) {
+	proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l))
 }
 
 func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
@@ -61,7 +60,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		if r, ok := h.links[e.Link()].(*receiver); ok {
 			r.message(e.Delivery())
 		} else {
-			h.internalError(e.Link(), "no receiver for link")
+			h.linkError(e.Link(), "no receiver")
 		}
 
 	case proton.MSettled:
@@ -73,16 +72,12 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		if s, ok := h.links[e.Link()].(*sender); ok {
 			s.sendable()
 		} else {
-			h.internalError(e.Link(), "no sender for link")
+			h.linkError(e.Link(), "no sender")
 		}
 
 	case proton.MSessionOpening:
 		if e.Session().State().LocalUninit() { // Remotely opened
-			incoming := &IncomingSession{h: h, pSession: e.Session()}
-			h.connection.accept(incoming)
-			if err := incoming.error("rejected session %s", e.Session()); err != nil {
-				proton.CloseError(e.Session(), err)
-			}
+			h.incoming(newIncomingSession(h, e.Session()))
 		}
 
 	case proton.MSessionClosed:
@@ -101,19 +96,13 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		}
 		ss := h.sessions[l.Session()]
 		if ss == nil {
-			h.internalError(e.Link(), "no session for link")
+			h.linkError(e.Link(), "no session")
 			break
 		}
-		var incoming Incoming
 		if l.IsReceiver() {
-			incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
+			h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
 		} else {
-			incoming = &IncomingSender{makeIncomingLink(ss, l)}
-		}
-		h.connection.accept(incoming)
-		if err := incoming.error("rejected link %s", e.Link()); err != nil {
-			proton.CloseError(l, err)
-			break
+			h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
 		}
 
 	case proton.MLinkClosing:
@@ -146,6 +135,22 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 	}
 }
 
+func (h *handler) incoming(in Incoming) {
+	var err error
+	if h.connection.incoming != nil {
+		h.connection.incoming <- in
+		err = in.wait()
+	} else {
+		err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
+			in.pEndpoint().Type(), in.pEndpoint().String())
+	}
+	if err == nil {
+		in.pEndpoint().Open()
+	} else {
+		proton.CloseError(in.pEndpoint(), err)
+	}
+}
+
 func (h *handler) linkClosed(l proton.Link, err error) {
 	if link := h.links[l]; link != nil {
 		link.closed(err)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 4835cb9..cadc2c1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -60,36 +60,37 @@ type Link interface {
 	open()
 }
 
-// LinkOption can be passed when creating a sender or receiver link.
+// LinkOption can be passed when creating a sender or receiver link to set optional configuration.
 type LinkOption func(*link)
 
-// Source sets address that messages are coming from.
+// Source returns a LinkOption that sets address that messages are coming from.
 func Source(s string) LinkOption { return func(l *link) { l.source = s } }
 
-// Target sets address that messages are going to.
+// Target returns a LinkOption that sets address that messages are going to.
 func Target(s string) LinkOption { return func(l *link) { l.target = s } }
 
-// LinkName sets the link name.
+// LinkName returns a LinkOption that sets the link name.
 func LinkName(s string) LinkOption { return func(l *link) { l.target = s } }
 
-// SndSettle sets the send settle mode
+// SndSettle returns a LinkOption that sets the send settle mode
 func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } }
 
-// RcvSettle sets the send settle mode
+// RcvSettle returns a LinkOption that sets the receive settle mode
 func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } }
 
-// SndSettleMode defines when the sending end of the link settles message delivery.
+// SndSettleMode defines when the sending end of the link settles message
+// delivery.
 type SndSettleMode proton.SndSettleMode
 
-// Capacity sets the link capacity
+// Capacity returns a LinkOption that sets the link capacity
 func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } }
 
-// Prefetch sets a receivers pre-fetch flag. Not relevant for a sender.
+// Prefetch returns a LinkOption that sets a receiver's pre-fetch flag. Not relevant for a sender.
 func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } }
 
-// AtMostOnce sets "fire and forget" mode, messages are sent but no
-// acknowledgment is received, messages can be lost if there is a network
-// failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
+// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
+// are sent but no acknowledgment is received, messages can be lost if there is
+// a network failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst
 func AtMostOnce() LinkOption {
 	return func(l *link) {
 		SndSettle(SndSettled)(l)
@@ -97,11 +98,11 @@ func AtMostOnce() LinkOption {
 	}
 }
 
-// AtLeastOnce requests acknowledgment for every message, acknowledgment
-// indicates the message was definitely received. In the event of a
-// failure, unacknowledged messages can be re-sent but there is a chance
-// that the message will be received twice in this case.
-// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
+// AtLeastOnce returns a LinkOption that requests acknowledgment for every
+// message, acknowledgment indicates the message was definitely received. In the
+// event of a failure, unacknowledged messages can be re-sent but there is a
+// chance that the message will be received twice in this case.  Sets
+// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
 func AtLeastOnce() LinkOption {
 	return func(l *link) {
 		SndSettle(SndUnsettled)(l)
@@ -200,6 +201,7 @@ type incomingLink struct {
 // Set up a link from an incoming proton.Link.
 func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
 	l := incomingLink{
+		incoming: makeIncoming(eLink),
 		link: link{
 			session:   sn,
 			isSender:  eLink.IsSender(),

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index c7ff290..3b5a062 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -39,22 +39,22 @@ func fatalIf(t *testing.T, err error) {
 	}
 }
 
-// Start a server, return listening addr and channel for incoming Connection.
-func newServer(t *testing.T, cont Container, accept func(Incoming)) (net.Addr, <-chan Connection) {
+// Start a server, return listening addr and channel for incoming Connections.
+func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) {
 	listener, err := net.Listen("tcp", "")
 	fatalIf(t, err)
 	addr := listener.Addr()
 	ch := make(chan Connection)
 	go func() {
 		conn, err := listener.Accept()
-		c, err := cont.Connection(conn, Server(), Accepter(accept))
+		c, err := cont.Connection(conn, Server(), AllowIncoming())
 		fatalIf(t, err)
 		ch <- c
 	}()
 	return addr, ch
 }
 
-// Return open an client connection and session, return the session.
+// Open a client connection and session, return the session.
 func newClient(t *testing.T, cont Container, addr net.Addr) Session {
 	conn, err := net.Dial(addr.Network(), addr.String())
 	fatalIf(t, err)
@@ -66,8 +66,8 @@ func newClient(t *testing.T, cont Container, addr net.Addr) Session {
 }
 
 // Return client and server ends of the same connection.
-func newClientServer(t *testing.T, accept func(Incoming)) (client Session, server Connection) {
-	addr, ch := newServer(t, NewContainer("test-server"), accept)
+func newClientServer(t *testing.T) (client Session, server Connection) {
+	addr, ch := newServer(t, NewContainer("test-server"))
 	client = newClient(t, NewContainer("test-client"), addr)
 	return client, <-ch
 }
@@ -85,19 +85,22 @@ func TestClientSendServerReceive(t *testing.T) {
 	nMessages := 3
 
 	rchan := make(chan Receiver, nLinks)
-	client, server := newClientServer(t, func(i Incoming) {
-		switch i := i.(type) {
-		case *IncomingReceiver:
-			rchan <- i.AcceptReceiver(1, false)
-		default:
-			i.Accept()
+	client, server := newClientServer(t)
+	go func() {
+		for in := range server.Incoming() {
+			switch in := in.(type) {
+			case *IncomingReceiver:
+				in.SetCapacity(1)
+				in.SetPrefetch(false)
+				rchan <- in.Accept().(Receiver)
+			default:
+				in.Accept()
+			}
 		}
-	})
-
-	defer func() {
-		closeClientServer(client, server)
 	}()
 
+	defer func() { closeClientServer(client, server) }()
+
 	s := make([]Sender, nLinks)
 	for i := 0; i < nLinks; i++ {
 		var err error
@@ -155,26 +158,29 @@ func TestClientSendServerReceive(t *testing.T) {
 
 func TestClientReceiver(t *testing.T) {
 	nMessages := 3
-	client, server := newClientServer(t, func(i Incoming) {
-		switch i := i.(type) {
-		case *IncomingSender:
-			s := i.AcceptSender()
-			go func() {
-				for i := int32(0); i < int32(nMessages); i++ {
-					sm, err := s.Send(amqp.NewMessageWith(i))
-					if err != nil {
-						t.Error(err)
-						return
-					} else {
-						sm.Disposition() // Sync send.
+	client, server := newClientServer(t)
+	go func() {
+		for in := range server.Incoming() {
+			switch in := in.(type) {
+			case *IncomingSender:
+				s := in.Accept().(Sender)
+				go func() {
+					for i := int32(0); i < int32(nMessages); i++ {
+						sm, err := s.Send(amqp.NewMessageWith(i))
+						if err != nil {
+							t.Error(err)
+							return
+						} else {
+							sm.Disposition() // Sync send.
+						}
 					}
-				}
-				s.Close(nil)
-			}()
-		default:
-			i.Accept()
+					s.Close(nil)
+				}()
+			default:
+				in.Accept()
+			}
 		}
-	})
+	}()
 
 	r, err := client.Receiver(Source("foo"))
 	if err != nil {
@@ -203,14 +209,19 @@ func TestClientReceiver(t *testing.T) {
 func TestTimeouts(t *testing.T) {
 	var err error
 	rchan := make(chan Receiver, 1)
-	client, server := newClientServer(t, func(i Incoming) {
-		switch i := i.(type) {
-		case *IncomingReceiver:
-			rchan <- i.AcceptReceiver(1, false) // Issue credit only on receive
-		default:
-			i.Accept()
+	client, server := newClientServer(t)
+	go func() {
+		for i := range server.Incoming() {
+			switch i := i.(type) {
+			case *IncomingReceiver:
+				i.SetCapacity(1)
+				i.SetPrefetch(false)
+				rchan <- i.Accept().(Receiver) // Issue credit only on receive
+			default:
+				i.Accept()
+			}
 		}
-	})
+	}()
 	defer func() { closeClientServer(client, server) }()
 
 	// Open client sender
@@ -274,16 +285,21 @@ type pairs struct {
 
 func newPairs(t *testing.T) *pairs {
 	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
-	p.client, p.server = newClientServer(t, func(i Incoming) {
-		switch i := i.(type) {
-		case *IncomingReceiver:
-			p.rchan <- i.AcceptReceiver(1, false)
-		case *IncomingSender:
-			p.schan <- i.AcceptSender()
-		default:
-			i.Accept()
+	p.client, p.server = newClientServer(t)
+	go func() {
+		for i := range p.server.Incoming() {
+			switch i := i.(type) {
+			case *IncomingReceiver:
+				i.SetCapacity(1)
+				i.SetPrefetch(false)
+				p.rchan <- i.Accept().(Receiver)
+			case *IncomingSender:
+				p.schan <- i.Accept().(Sender)
+			default:
+				i.Accept()
+			}
 		}
-	})
+	}()
 	return p
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 9f854cf..4ff83b4 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -119,6 +119,7 @@ type receiver struct {
 	policy policy
 }
 
+// Call in proton goroutine
 func newReceiver(l link) *receiver {
 	r := &receiver{link: l}
 	if r.capacity < 1 {
@@ -216,23 +217,19 @@ func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
 // Reject is short for Acknowledge(Rejected)
 func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
 
-// IncomingReceiver is passed to the accept() function given to Connection.Listen()
-// when there is an incoming request for a receiver link.
+// IncomingReceiver is sent on the Connection.Incoming() channel when there is
+// an incoming request to open a receiver link.
 type IncomingReceiver struct {
 	incomingLink
 }
 
-// Link provides information about the incoming link.
-func (i *IncomingReceiver) Link() Link { return i }
+// SetCapacity sets the capacity of the incoming receiver, call before Accept()
+func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity }
 
-// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver.
-func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver {
-	i.capacity = capacity
-	i.prefetch = prefetch
-	return i.Accept().(Receiver)
-}
+// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()
+func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch }
 
-func (i *IncomingReceiver) Accept() Endpoint {
-	i.accepted = true
-	return newReceiver(i.link)
+// Accept accepts an incoming receiver endpoint
+func (in *IncomingReceiver) Accept() Endpoint {
+	return in.accept(func() Endpoint { return newReceiver(in.link) })
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 98304c1..11f019b 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -297,18 +297,13 @@ func (sm *sentMessage) finish() {
 
 func (sm *sentMessage) Error() error { return sm.err }
 
-// IncomingSender is passed to the accept() function given to Connection.Listen()
-// when there is an incoming request for a sender link.
+// IncomingSender is sent on the Connection.Incoming() channel when there is
+// an incoming request to open a sender link.
 type IncomingSender struct {
 	incomingLink
 }
 
-// Link provides information about the incoming link.
-func (i *IncomingSender) Link() Link { return i }
-
-func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) }
-
-func (i *IncomingSender) Accept() Endpoint {
-	i.accepted = true
-	return newSender(i.link)
+// Accept accepts an incoming sender endpoint
+func (in *IncomingSender) Accept() Endpoint {
+	return in.accept(func() Endpoint { return newSender(in.link) })
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index 785a582..d347c99 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -44,7 +44,8 @@ type session struct {
 // SessionOption can be passed when creating a Session
 type SessionOption func(*session)
 
-// IncomingCapacity sets the size (in bytes) of the sessions incoming data buffer..
+// IncomingCapacity returns a SessionOption that sets the size (in bytes) of
+// the session's incoming data buffer.
 func IncomingCapacity(cap uint) SessionOption { return func(s *session) { s.capacity = cap } }
 
 // in proton goroutine
@@ -70,8 +71,6 @@ func (s *session) Close(err error) {
 	s.engine().Inject(func() { localClose(s.eSession, err) })
 }
 
-func (s *session) SetCapacity(bytes uint) { s.capacity = bytes }
-
 func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
 	err = s.engine().InjectWait(func() error {
 		l, err := localLink(s, true, setting...)
@@ -100,8 +99,8 @@ func (s *session) closed(err error) {
 	s.err.Set(Closed)
 }
 
-// IncomingSession is passed to the accept() function given to Connection.Listen()
-// when there is an incoming session request.
+// IncomingSession is sent on the Connection.Incoming() channel when there is an
+// incoming request to open a session.
 type IncomingSession struct {
 	incoming
 	h        *handler
@@ -109,13 +108,16 @@ type IncomingSession struct {
 	capacity uint
 }
 
-// AcceptCapacity sets the session buffer capacity of an incoming session in bytes.
-func (i *IncomingSession) AcceptSession(bytes uint) Session {
-	i.capacity = bytes
-	return i.Accept().(Session)
+func newIncomingSession(h *handler, ps proton.Session) *IncomingSession {
+	return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps}
 }
 
-func (i *IncomingSession) Accept() Endpoint {
-	i.accepted = true
-	return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity))
+// SetCapacity sets the session buffer capacity of an incoming session in bytes.
+func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes }
+
+// Accept an incoming session endpoint.
+func (in *IncomingSession) Accept() Endpoint {
+	return in.accept(func() Endpoint {
+		return newSession(in.h.connection, in.pSession, IncomingCapacity(in.capacity))
+	})
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 1a1fd96..0b881c1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -168,6 +168,8 @@ type Endpoint interface {
 	RemoteCondition() Condition
 	// Human readable name
 	String() string
+	// Human readable endpoint type "link", "session" etc.
+	Type() string
 }
 
 // CloseError sets an error condition on an endpoint and closes the endpoint
@@ -257,6 +259,15 @@ func (l Link) String() string {
 	}
 }
 
+func (l Link) Type() string {
+	if l.IsSender() {
+		return "sender-link"
+	} else {
+		return "receiver-link"
+	}
+
+}
+
 func cPtr(b []byte) *C.char {
 	if len(b) == 0 {
 		return nil
@@ -285,6 +296,10 @@ func (c Connection) String() string {
 	return fmt.Sprintf("%x", c.pn)
 }
 
+func (c Connection) Type() string {
+	return "connection"
+}
+
 // Head functions don't follow the normal naming conventions so missed by the generator.
 
 func (c Connection) LinkHead(s State) Link {
@@ -313,6 +328,8 @@ func (s Session) String() string {
 	return fmt.Sprintf("%s/%p", s.Connection(), s.pn)
 }
 
+func (s Session) Type() string { return "session" }
+
 // Error returns an instance of amqp.Error or nil.
 func (c Condition) Error() error {
 	if c.IsNil() || !c.IsSet() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org