You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/18 20:48:10 UTC

[2/2] qpid-proton git commit: PROTON-1047: go: improved ack handling in electron API, add to broker example

PROTON-1047: go: improved ack handling in electron API, add to broker example

Sender interface changed
- SendAsync: flexible and efficient server-side ack handling across multiple Senders.
- SendSync, SendForget, SendWaitable: easy-to-use methods for common client cases.

electron broker.go, send.go examples demonstrate async send handling.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cd8ad96f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cd8ad96f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cd8ad96f

Branch: refs/heads/master
Commit: cd8ad96f2b3022344198abe8b76f0798758dbe9d
Parents: 41eac74
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 11 15:03:07 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 18 14:27:57 2015 -0500

----------------------------------------------------------------------
 examples/go/electron/broker.go                  | 133 +++++--
 examples/go/electron/receive.go                 |  11 +-
 examples/go/electron/send.go                    |  25 +-
 examples/go/proton/broker.go                    |   9 +-
 examples/go/util/queue.go                       |  10 +
 .../go/src/qpid.apache.org/amqp/error.go        |   2 +-
 .../src/qpid.apache.org/electron/connection.go  |  27 +-
 .../go/src/qpid.apache.org/electron/doc.go      |   3 +-
 .../go/src/qpid.apache.org/electron/endpoint.go |  31 +-
 .../go/src/qpid.apache.org/electron/handler.go  |  16 +-
 .../go/src/qpid.apache.org/electron/link.go     |  23 +-
 .../qpid.apache.org/electron/messaging_test.go  |  71 ++--
 .../go/src/qpid.apache.org/electron/receiver.go |  29 +-
 .../go/src/qpid.apache.org/electron/sender.go   | 361 +++++++++----------
 .../go/src/qpid.apache.org/electron/session.go  |  21 +-
 .../go/src/qpid.apache.org/electron/time.go     |   3 +-
 .../go/src/qpid.apache.org/proton/engine.go     |   7 +
 17 files changed, 430 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index f68deb1..66941b7 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -33,6 +33,7 @@ import (
 	"log"
 	"net"
 	"os"
+	"qpid.apache.org/amqp"
 	"qpid.apache.org/electron"
 )
 
@@ -52,7 +53,12 @@ var qsize = flag.Int("qsize", 1000, "Max queue size")
 func main() {
 	flag.Usage = usage
 	flag.Parse()
-	b := &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
+	b := &broker{
+		queues:    util.MakeQueues(*qsize),
+		container: electron.NewContainer(""),
+		acks:      make(chan electron.Outcome),
+		sent:      make(chan sentMessage),
+	}
 	if err := b.run(); err != nil {
 		log.Fatal(err)
 	}
@@ -60,18 +66,31 @@ func main() {
 
 // State for the broker
 type broker struct {
-	queues    util.Queues
-	container electron.Container
+	queues    util.Queues           // A collection of queues.
+	container electron.Container    // electron.Container manages AMQP connections.
+	sent      chan sentMessage      // Channel to record sent messages.
+	acks      chan electron.Outcome // Channel to receive the Outcome of sent messages.
+}
+
+// Record of a sent message and the queue it came from.
+// If a message is rejected or not acknowledged due to a failure, we will put it back on the queue.
+type sentMessage struct {
+	m amqp.Message
+	q util.Queue
 }
 
-// Listens for connections and starts an electron.Connection for each one.
+// run listens for incoming net.Conn connections and starts an electron.Connection for each one.
 func (b *broker) run() error {
 	listener, err := net.Listen("tcp", *addr)
 	if err != nil {
 		return err
 	}
 	defer listener.Close()
-	fmt.Printf("Listening on %s\n", listener.Addr())
+	fmt.Printf("Listening on %v\n", listener.Addr())
+
+	go b.acknowledgements() // Handles acknowledgements for all connections.
+
+	// Start a goroutine for each new connection
 	for {
 		conn, err := listener.Accept()
 		if err != nil {
@@ -83,69 +102,107 @@ func (b *broker) run() error {
 			util.Debugf("Connection error: %v", err)
 			continue
 		}
-		go b.accept(c) // Goroutine to accept incoming sessions and links.
+		cc := &connection{b, c}
+		go cc.run() // Handle the connection
 		util.Debugf("Accepted %v", c)
 	}
 }
 
-// accept remotely-opened endpoints (Session, Sender and Receiver)
+// State for a broker connection
+type connection struct {
+	broker     *broker
+	connection electron.Connection
+}
+
+// accept remotely-opened endpoints (Session, Sender and Receiver) on a connection
 // and start goroutines to service them.
-func (b *broker) accept(c electron.Connection) {
-	for in := range c.Incoming() {
+func (c *connection) run() {
+	for in := range c.connection.Incoming() {
 		switch in := in.(type) {
 
 		case *electron.IncomingSender:
 			if in.Source() == "" {
-				util.Debugf("sender has no source: %s", in)
-				break
+				in.Reject(fmt.Errorf("no source"))
+			} else {
+				go c.sender(in.Accept().(electron.Sender))
 			}
-			go b.sender(in.Accept().(electron.Sender))
 
 		case *electron.IncomingReceiver:
 			if in.Target() == "" {
-				util.Debugf("receiver has no target: %s", in)
-				break
+				in.Reject(fmt.Errorf("no target"))
+			} else {
+				in.SetPrefetch(true)
+				in.SetCapacity(*credit) // Pre-fetch up to credit window.
+				go c.receiver(in.Accept().(electron.Receiver))
 			}
-			in.SetPrefetch(true)
-			in.SetCapacity(*credit) // Pre-fetch up to credit window.
-			go b.receiver(in.Accept().(electron.Receiver))
 
 		default:
 			in.Accept() // Accept sessions unconditionally
 		}
 	}
-	util.Debugf("incoming closed: %s", c)
+	util.Debugf("incoming closed: %v", c.connection)
+}
+
+// receiver receives messages and pushes to a queue.
+func (c *connection) receiver(receiver electron.Receiver) {
+	q := c.broker.queues.Get(receiver.Target())
+	for {
+		if rm, err := receiver.Receive(); err == nil {
+			util.Debugf("%v: received %v", receiver, util.FormatMessage(rm.Message))
+			q <- rm.Message
+			rm.Accept()
+		} else {
+			util.Debugf("%v error: %v", receiver, err)
+			break
+		}
+	}
 }
 
 // sender pops messages from a queue and sends them.
-func (b *broker) sender(sender electron.Sender) {
-	q := b.queues.Get(sender.Source())
+func (c *connection) sender(sender electron.Sender) {
+	q := c.broker.queues.Get(sender.Source())
 	for {
-		m, ok := <-q
-		if !ok { // Queue closed
+		if sender.Error() != nil {
+			util.Debugf("%v closed: %v", sender, sender.Error())
 			return
 		}
-		if err := sender.SendForget(m); err == nil {
-			util.Debugf("%s send: %s", sender, util.FormatMessage(m))
-		} else {
-			util.Debugf("%s error: %s", sender, err)
-			q <- m // Put it back on the queue.
-			return
+		select {
+
+		case m := <-q:
+			util.Debugf("%v: sent %v", sender, util.FormatMessage(m))
+			sm := sentMessage{m, q}
+			c.broker.sent <- sm                    // Record sent message
+			sender.SendAsync(m, c.broker.acks, sm) // Receive outcome on c.broker.acks with Value sm
+
+		case <-sender.Done(): // sender closed: break only leaves the select; the sender.Error() check at the top of the loop then returns
+			break
 		}
 	}
 }
 
-// receiver receives messages and pushes to a queue.
-func (b *broker) receiver(receiver electron.Receiver) {
-	q := b.queues.Get(receiver.Target())
+// acknowledgements keeps track of sent messages and receives outcomes.
+//
+// We could have handled outcomes separately per-connection, per-sender or even
+// per-message. Message outcomes are returned via channels defined by the user
+// so they can be grouped in any way that suits the application.
+func (b *broker) acknowledgements() {
+	sentMap := make(map[sentMessage]bool)
 	for {
-		if rm, err := receiver.Receive(); err == nil {
-			util.Debugf("%s: received %s", receiver, util.FormatMessage(rm.Message))
-			q <- rm.Message
-			rm.Accept()
-		} else {
-			util.Debugf("%s error: %s", receiver, err)
-			break
+		select {
+		case sm, ok := <-b.sent: // A local sender records that it has sent a message.
+			if ok {
+				sentMap[sm] = true
+			} else {
+				return // Closed
+			}
+		case outcome := <-b.acks: // The message outcome is available
+			sm := outcome.Value.(sentMessage)
+			delete(sentMap, sm)
+			if outcome.Status != electron.Accepted { // Error, release or rejection
+				sm.q.PutBack(sm.m) // Put the message back on the queue.
+				util.Debugf("message %v put back, status %v, error %v",
+					util.FormatMessage(sm.m), outcome.Status, outcome.Error)
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index f7d41fa..d631726 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -85,15 +85,12 @@ func main() {
 
 			// Loop receiving messages and sending them to the main() goroutine
 			for {
-				rm, err := r.Receive()
-				switch err {
-				case electron.Closed:
-					util.Debugf("closed %s", urlStr)
+				if rm, err := r.Receive(); err != nil {
+					util.Debugf("closed %v: %v", urlStr, err)
 					return
-				case nil:
+				} else {
+					rm.Accept()
 					messages <- rm.Message
-				default:
-					log.Fatal(err)
 				}
 			}
 		}(urlStr)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index c9bdbc9..4c9a0bf 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -42,11 +42,6 @@ Send messages to each URL concurrently with body "<url-path>-<n>" where n is the
 
 var count = flag.Int64("count", 1, "Send this may messages per address.")
 
-type sent struct {
-	name        string
-	sentMessage electron.SentMessage
-}
-
 func main() {
 	flag.Usage = usage
 	flag.Parse()
@@ -58,9 +53,10 @@ func main() {
 		os.Exit(1)
 	}
 
-	sentChan := make(chan sent) // Channel to receive all the delivery receipts.
-	var wait sync.WaitGroup     // Used by main() to wait for all goroutines to end.
-	wait.Add(len(urls))         // Wait for one goroutine per URL.
+	sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
+
+	var wait sync.WaitGroup
+	wait.Add(len(urls)) // Wait for one goroutine per URL.
 
 	_, prog := path.Split(os.Args[0])
 	container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid()))
@@ -91,9 +87,7 @@ func main() {
 				m := amqp.NewMessage()
 				body := fmt.Sprintf("%v-%v", url.Path, i)
 				m.Marshal(body)
-				sentMessage, err := s.Send(m)
-				util.ExitIf(err)
-				sentChan <- sent{body, sentMessage}
+				s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
 			}
 		}(urlStr)
 	}
@@ -102,12 +96,11 @@ func main() {
 	expect := int(*count) * len(urls)
 	util.Debugf("Started senders, expect %v acknowledgements\n", expect)
 	for i := 0; i < expect; i++ {
-		d := <-sentChan
-		disposition, err := d.sentMessage.Disposition()
-		if err != nil {
-			util.Debugf("acknowledgement[%v] %v error: %v\n", i, d.name, err)
+		out := <-sentChan // Outcome of async sends.
+		if out.Error != nil {
+			util.Debugf("acknowledgement[%v] %v error: %v\n", i, out.Value, out.Error)
 		} else {
-			util.Debugf("acknowledgement[%v]  %v (%v)\n", i, d.name, disposition)
+			util.Debugf("acknowledgement[%v]  %v (%v)\n", i, out.Value, out.Status)
 		}
 	}
 	fmt.Printf("Received all %v acknowledgements\n", expect)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
index 3eb5880..6458665 100644
--- a/examples/go/proton/broker.go
+++ b/examples/go/proton/broker.go
@@ -24,6 +24,9 @@ under the License.
 // messages to queues or subscribe to receive messages from them.
 //
 
+// TODO: show how to handle acknowledgements from receivers and put rejected or
+// un-acknowledged messages back on their queues.
+
 package main
 
 import (
@@ -277,7 +280,7 @@ func (s *sender) sendable() {
 // run runs in a separate goroutine. It monitors the queue for messages and injects
 // a function to send them when there is credit
 func (s *sender) run() {
-	var q chan amqp.Message // q is nil initially as we have no credit.
+	var q util.Queue // q is nil initially as we have no credit.
 	for {
 		select {
 		case _, ok := <-s.credit:
@@ -293,7 +296,7 @@ func (s *sender) run() {
 			q = nil                      // Assume all credit will be used used, will be signaled otherwise.
 			s.h.injecter.Inject(func() { // Inject handler function to actually send
 				if s.h.senders[s.l] != s { // The sender has been closed by the remote end.
-					go func() { q <- m }() // Put the message back on the queue but don't block
+					q.PutBack(m) // Put the message back on the queue but don't block
 					return
 				}
 				if s.sendOne(m) != nil {
@@ -322,7 +325,7 @@ func (s *sender) sendOne(m amqp.Message) error {
 		delivery.Settle() // Pre-settled, unreliable.
 		util.Debugf("link %s sent %s", s.l, util.FormatMessage(m))
 	} else {
-		go func() { s.q <- m }() // Put the message back on the queue but don't block
+		s.q.PutBack(m) // Put the message back on the queue, don't block
 	}
 	return err
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/util/queue.go
----------------------------------------------------------------------
diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go
index d844c0d..2eaba72 100644
--- a/examples/go/util/queue.go
+++ b/examples/go/util/queue.go
@@ -27,6 +27,16 @@ import (
 // Use a buffered channel as a very simple queue.
 type Queue chan amqp.Message
 
+// Put a message back on the queue, does not block.
+func (q Queue) PutBack(m amqp.Message) {
+	select {
+	case q <- m:
+	default:
+		// Not an efficient implementation but ensures we don't block the caller.
+		go func() { q <- m }()
+	}
+}
+
 // Concurrent-safe map of queues.
 type Queues struct {
 	queueSize int

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
index 4096cdc..349fc41 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
@@ -39,7 +39,7 @@ import (
 type Error struct{ Name, Description string }
 
 // Error implements the Go error interface for AMQP error errors.
-func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
+func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, c.Description) }
 
 // Errorf makes a Error with name and formatted description as per fmt.Sprintf
 func Errorf(name, format string, arg ...interface{}) Error {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 3462b92..8a9e6cd 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -69,10 +69,10 @@ type Connection interface {
 	// to close it with an error. The specific Incoming types have additional
 	// methods to configure the endpoint.
 	//
-	// Delay in receiving from Incoming() or calling Accept/Reject will block
-	// proton. Normally you should have a dedicated goroutine receive from this
-	// channel and start a new goroutine to serve each endpoint accepted.  The
-	// channel is closed when the Connection closes.
+	// Not receiving from Incoming() or not calling Accept/Reject will block the
+	// electron event loop. Normally you would have a dedicated goroutine receive
+	// from Incoming() and start new goroutines to serve each incoming endpoint.
+	// The channel is closed when the Connection closes.
 	//
 	Incoming() <-chan Incoming
 }
@@ -103,15 +103,13 @@ type connection struct {
 	incoming    chan Incoming
 	handler     *handler
 	engine      *proton.Engine
-	err         proton.ErrorHolder
 	eConnection proton.Connection
 
 	defaultSession Session
-	done           chan struct{}
 }
 
 func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
-	c := &connection{container: cont, conn: conn, done: make(chan struct{})}
+	c := &connection{container: cont, conn: conn}
 	c.handler = newHandler(c)
 	var err error
 	c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
@@ -121,12 +119,20 @@ func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption)
 	for _, set := range setting {
 		set(c)
 	}
-	c.str = c.engine.String()
+	c.endpoint = makeEndpoint(c.engine.String())
 	c.eConnection = c.engine.Connection()
-	go func() { c.engine.Run(); close(c.done) }()
+	go c.run()
 	return c, nil
 }
 
+func (c *connection) run() {
+	c.engine.Run()
+	if c.incoming != nil {
+		close(c.incoming)
+	}
+	c.closed(Closed)
+}
+
 func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
 
 func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) }
@@ -134,6 +140,9 @@ func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect
 func (c *connection) Session(setting ...SessionOption) (Session, error) {
 	var s Session
 	err := c.engine.InjectWait(func() error {
+		if c.Error() != nil {
+			return c.Error()
+		}
 		eSession, err := c.engine.Connection().Session()
 		if err == nil {
 			eSession.Open()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index eaa6e7a..a484900 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -35,7 +35,8 @@ Send() and Receive() messages.
 You can create an AMQP server connection by calling Connection.Server() and
 Connection.Listen() before calling Connection.Open(). A server connection can
 negotiate protocol security details and can accept incoming links opened from
-the remote end of the connection
+the remote end of the connection.
+
 */
 package electron
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index 057e572..f04b240 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -48,15 +48,31 @@ type Endpoint interface {
 
 	// Connection containing the endpoint
 	Connection() Connection
+
+	// Done returns a channel that will close when the endpoint closes.
+	// Error() will contain the reason.
+	Done() <-chan struct{}
 }
 
 type endpoint struct {
-	err proton.ErrorHolder
-	str string // Must be set by the value that embeds endpoint.
+	err  proton.ErrorHolder
+	str  string // Must be set by the value that embeds endpoint.
+	done chan struct{}
+}
+
+func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} }
+
+func (e *endpoint) closed(err error) {
+	e.err.Set(err)
+	e.err.Set(Closed)
+	close(e.done)
 }
 
 func (e *endpoint) String() string { return e.str }
-func (e *endpoint) Error() error   { return e.err.Get() }
+
+func (e *endpoint) Error() error { return e.err.Get() }
+
+func (e *endpoint) Done() <-chan struct{} { return e.done }
 
 // Call in proton goroutine to close an endpoint locally
 // handler will complete the close when remote end closes.
@@ -65,3 +81,12 @@ func localClose(ep proton.Endpoint, err error) {
 		proton.CloseError(ep, err)
 	}
 }
+
+// Used to indicate that a channel has closed which normally is because the endpoint is closed.
+func errorOrClosed(e Endpoint) error {
+	if e.Error() != nil {
+		return e.Error()
+	} else {
+		return Closed
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 1b1164c..1586026 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -30,7 +30,7 @@ type handler struct {
 	delegator    *proton.MessagingAdapter
 	connection   *connection
 	links        map[proton.Link]Link
-	sentMessages map[proton.Delivery]*sentMessage
+	sentMessages map[proton.Delivery]sentMessage
 	sessions     map[proton.Session]*session
 }
 
@@ -38,7 +38,7 @@ func newHandler(c *connection) *handler {
 	h := &handler{
 		connection:   c,
 		links:        make(map[proton.Link]Link),
-		sentMessages: make(map[proton.Delivery]*sentMessage),
+		sentMessages: make(map[proton.Delivery]sentMessage),
 		sessions:     make(map[proton.Session]*session),
 	}
 	h.delegator = proton.NewMessagingAdapter(h)
@@ -64,8 +64,10 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		}
 
 	case proton.MSettled:
-		if sm := h.sentMessages[e.Delivery()]; sm != nil {
-			sm.settled(nil)
+		if sm, ok := h.sentMessages[e.Delivery()]; ok {
+			d := e.Delivery().Remote()
+			sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value}
+			delete(h.sentMessages, e.Delivery())
 		}
 
 	case proton.MSendable:
@@ -123,15 +125,19 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
 
 		err := h.connection.Error()
+
 		for l, _ := range h.links {
 			h.linkClosed(l, err)
 		}
+		h.links = nil
 		for _, s := range h.sessions {
 			s.closed(err)
 		}
+		h.sessions = nil
 		for _, sm := range h.sentMessages {
-			sm.settled(err)
+			sm.ack <- Outcome{Unacknowledged, err, sm.value}
 		}
+		h.sentMessages = nil
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index cadc2c1..91efa8e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -144,7 +144,6 @@ type link struct {
 
 	session *session
 	eLink   proton.Link
-	done    chan struct{} // Closed when link is closed
 }
 
 func (l *link) Source() string           { return l.source }
@@ -167,7 +166,6 @@ func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error)
 		isSender: isSender,
 		capacity: 1,
 		prefetch: false,
-		done:     make(chan struct{}),
 	}
 	for _, set := range setting {
 		set(&l)
@@ -188,8 +186,8 @@ func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error)
 	l.eLink.Target().SetAddress(l.target)
 	l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
 	l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
-	l.str = l.eLink.String()
 	l.eLink.Open()
+	l.endpoint = makeEndpoint(l.eLink.String())
 	return l, nil
 }
 
@@ -213,23 +211,18 @@ func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
 			rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
 			capacity:  1,
 			prefetch:  false,
-			done:      make(chan struct{}),
+			endpoint:  makeEndpoint(eLink.String()),
 		},
 	}
-	l.str = eLink.String()
 	return l
 }
 
-// Called in proton goroutine on closed or disconnected
-func (l *link) closed(err error) {
-	l.err.Set(err)
-	l.err.Set(Closed) // If no error set, mark as closed.
-	close(l.done)
-}
-
 // Not part of Link interface but use by Sender and Receiver.
 func (l *link) Credit() (credit int, err error) {
 	err = l.engine().InjectWait(func() error {
+		if l.Error() != nil {
+			return l.Error()
+		}
 		credit = l.eLink.Credit()
 		return nil
 	})
@@ -240,7 +233,11 @@ func (l *link) Credit() (credit int, err error) {
 func (l *link) Capacity() int { return l.capacity }
 
 func (l *link) Close(err error) {
-	l.engine().Inject(func() { localClose(l.eLink, err) })
+	l.engine().Inject(func() {
+		if l.Error() == nil {
+			localClose(l.eLink, err)
+		}
+	})
 }
 
 func (l *link) open() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index 3b5a062..5af57e8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -80,7 +80,6 @@ func closeClientServer(client Session, server Connection) {
 
 // Send a message one way with a client sender and server receiver, verify ack.
 func TestClientSendServerReceive(t *testing.T) {
-	timeout := time.Second * 2
 	nLinks := 3
 	nMessages := 3
 
@@ -116,15 +115,14 @@ func TestClientSendServerReceive(t *testing.T) {
 
 	for i := 0; i < nLinks; i++ {
 		for j := 0; j < nMessages; j++ {
-			var sm SentMessage
-
 			// Client send
+			ack := make(chan Outcome, 1)
 			sendDone := make(chan struct{})
 			go func() {
 				defer close(sendDone)
 				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
 				var err error
-				sm, err = s[i].Send(m)
+				s[i].SendAsync(m, ack, "testing")
 				if err != nil {
 					t.Fatal(err)
 				}
@@ -141,16 +139,19 @@ func TestClientSendServerReceive(t *testing.T) {
 
 			// Should not be acknowledged on client yet
 			<-sendDone
-			if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d {
-				t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err)
+			select {
+			case <-ack:
+				t.Errorf("unexpected ack")
+			default:
 			}
-			// Server ack
-			if err := rm.Acknowledge(Rejected); err != nil {
+
+			// Server send ack
+			if err := rm.Reject(); err != nil {
 				t.Error(err)
 			}
 			// Client get ack.
-			if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d {
-				t.Errorf("want [rejected/nil] got [%v/%v]", d, err)
+			if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected {
+				t.Error("unexpected ack: ", a.Status, a.Error, a.Value)
 			}
 		}
 	}
@@ -166,12 +167,10 @@ func TestClientReceiver(t *testing.T) {
 				s := in.Accept().(Sender)
 				go func() {
 					for i := int32(0); i < int32(nMessages); i++ {
-						sm, err := s.Send(amqp.NewMessageWith(i))
-						if err != nil {
-							t.Error(err)
+						out := s.SendSync(amqp.NewMessageWith(i))
+						if out.Error != nil {
+							t.Error(out.Error)
 							return
-						} else {
-							sm.Disposition() // Sync send.
 						}
 					}
 					s.Close(nil)
@@ -235,10 +234,10 @@ func TestTimeouts(t *testing.T) {
 	short := time.Millisecond
 	long := time.Second
 	m := amqp.NewMessage()
-	if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, expect timeout.
+	if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout.
 		t.Error("want Timeout got", err)
 	}
-	if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, expect timeout.
+	if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout.
 		t.Error("want Timeout got", err)
 	}
 	// Test receive with timeout
@@ -250,17 +249,15 @@ func TestTimeouts(t *testing.T) {
 		t.Error("want Timeout got", err)
 	}
 	// There is now a credit on the link due to receive
-	sm, err := snd.SendTimeout(m, long)
-	if err != nil {
-		t.Fatal(err)
-	}
+	ack := make(chan Outcome)
+	snd.SendAsyncTimeout(m, ack, nil, short)
 	// Disposition should timeout
-	if _, err = sm.DispositionTimeout(long); err != Timeout {
-		t.Error("want Timeout got", err)
-	}
-	if _, err = sm.DispositionTimeout(short); err != Timeout {
-		t.Error("want Timeout got", err)
+	select {
+	case <-ack:
+		t.Errorf("want Timeout got %#v", ack)
+	case <-time.After(short):
 	}
+
 	// Receive and accept
 	rm, err := rcv.ReceiveTimeout(long)
 	if err != nil {
@@ -268,9 +265,8 @@ func TestTimeouts(t *testing.T) {
 	}
 	rm.Accept()
 	// Sender get ack
-	d, err := sm.DispositionTimeout(long)
-	if err != nil || d != Accepted {
-		t.Errorf("want (rejected, nil) got (%v, %v)", d, err)
+	if a := <-ack; a.Status != Accepted || a.Error != nil {
+		t.Errorf("want (accepted, nil) got %#v", a)
 	}
 }
 
@@ -329,7 +325,7 @@ type result struct {
 func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) }
 
 func doSend(snd Sender, results chan result) {
-	_, err := snd.Send(amqp.NewMessage())
+	err := snd.SendSync(amqp.NewMessage()).Error
 	results <- result{"send", err}
 }
 
@@ -338,9 +334,8 @@ func doReceive(rcv Receiver, results chan result) {
 	results <- result{"receive", err}
 }
 
-func doDisposition(sm SentMessage, results chan result) {
-	_, err := sm.Disposition()
-	results <- result{"disposition", err}
+func doDisposition(ack <-chan Outcome, results chan result) {
+	results <- result{"disposition", (<-ack).Error}
 }
 
 // Test that closing Links interrupts blocked link functions.
@@ -391,9 +386,8 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
 	// Connection.Close() interrupts Send, Receive, Disposition.
 	snd, rcv := pairs.senderReceiver()
 	go doReceive(rcv, results)
-	sm, err := snd.Send(amqp.NewMessage())
-	fatalIf(t, err)
-	go doDisposition(sm, results)
+	ack := snd.SendWaitable(amqp.NewMessage())
+	go doDisposition(ack, results)
 	snd, rcv = pairs.senderReceiver()
 	go doSend(snd, results)
 	rcv, snd = pairs.receiverSender()
@@ -416,9 +410,8 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
 	// Connection.Close() interrupts Send, Receive, Disposition.
 	snd, rcv := pairs.senderReceiver()
 	go doReceive(rcv, results)
-	sm, err := snd.Send(amqp.NewMessage())
-	fatalIf(t, err)
-	go doDisposition(sm, results)
+	ack := snd.SendWaitable(amqp.NewMessage())
+	go doDisposition(ack, results)
 	snd, rcv = pairs.senderReceiver()
 	go doSend(snd, results)
 	rcv, snd = pairs.receiverSender()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 4ff83b4..22bdc7e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -77,6 +77,9 @@ type prefetchPolicy struct{}
 
 func (p prefetchPolicy) Flow(r *receiver) {
 	r.engine().Inject(func() {
+		if r.Error() != nil {
+			return
+		}
 		_, _, max := r.credit()
 		if max > 0 {
 			r.eLink.Flow(max)
@@ -94,6 +97,9 @@ type noPrefetchPolicy struct{ waiting int }
 
 func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
 	r.engine().Inject(func() {
+		if r.Error() != nil {
+			return
+		}
 		len, credit, max := r.credit()
 		add := p.waiting - (len + credit)
 		if add > max {
@@ -202,20 +208,23 @@ type ReceivedMessage struct {
 	receiver  Receiver
 }
 
-// Acknowledge a ReceivedMessage with the given disposition code.
-func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
-	return rm.receiver.(*receiver).engine().InjectWait(func() error {
-		// Settle doesn't return an error but if the receiver is broken the settlement won't happen.
-		rm.eDelivery.SettleAs(uint64(disposition))
-		return rm.receiver.Error()
+// acknowledge settles a ReceivedMessage with the given delivery status.
+func (rm *ReceivedMessage) acknowledge(status uint64) error {
+	return rm.receiver.(*receiver).engine().Inject(func() {
+		// Deliveries are valid as long as the connection is, unless settled.
+		rm.eDelivery.SettleAs(uint64(status))
 	})
 }
 
-// Accept is short for Acknowledge(Accpeted)
-func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
+// Accept tells the sender that we take responsibility for processing the message.
+func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) }
+
+// Reject tells the sender we consider the message invalid and unusable.
+func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) }
 
-// Reject is short for Acknowledge(Rejected)
-func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
+// Release tells the sender we will not process the message but some other
+// receiver might.
+func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) }
 
 // IncomingReceiver is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a receiver link.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 11f019b..573e9da 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -25,278 +25,237 @@ import "C"
 import (
 	"qpid.apache.org/amqp"
 	"qpid.apache.org/proton"
-	"reflect"
 	"time"
 )
 
 // Sender is a Link that sends messages.
+//
+// The result of sending a message is provided by an Outcome value.
+//
+// A sender can buffer messages up to the credit limit provided by the remote receiver.
+// Send* methods will block if the buffer is full until there is space.
+// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
+//
 type Sender interface {
 	Link
 
-	// Send a message without waiting for acknowledgement. Returns a SentMessage.
-	// use SentMessage.Disposition() to wait for acknowledgement and get the
-	// disposition code.
-	//
-	// If the send buffer is full, send blocks until there is space in the buffer.
-	Send(m amqp.Message) (sm SentMessage, err error)
+	// SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
+	// Returns an Outcome, which may contain an error if the message could not be sent.
+	SendSync(m amqp.Message) Outcome
+
+	// SendWaitable puts a message in the send buffer and returns a channel that
+	// you can use to wait for the Outcome of just that message. The channel is
+	// buffered so you can receive from it whenever you want without blocking anything.
+	SendWaitable(m amqp.Message) <-chan Outcome
+
+	// SendForget buffers a message for sending and returns, with no notification of the outcome.
+	SendForget(m amqp.Message)
 
-	// SendTimeout is like send but only waits up to timeout for buffer space.
+	// SendAsync puts a message in the send buffer and returns immediately.  An
+	// Outcome with Value = value will be sent to the ack channel when the remote
+	// receiver has acknowledged the message or if there is an error.
 	//
-	// Returns Timeout error if the timeout expires and the message has not been sent.
-	SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error)
+	// You can use the same ack channel for many calls to SendAsync(), possibly on
+	// many Senders. The channel will receive the outcomes in the order they
+	// become available. The channel should be buffered and/or served by dedicated
+	// goroutines to avoid blocking the connection.
+	//
+	// If ack == nil no Outcome is sent.
+	SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
 
-	// Send a message and forget it, there will be no acknowledgement.
-	// If the send buffer is full, send blocks until there is space in the buffer.
-	SendForget(m amqp.Message) error
+	SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)
 
-	// SendForgetTimeout is like send but only waits up to timeout for buffer space.
-	// Returns Timeout error if the timeout expires and the message has not been sent.
-	SendForgetTimeout(m amqp.Message, timeout time.Duration) error
+	SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome
 
-	// Credit indicates how many messages the receiving end of the link can accept.
-	//
-	// On a Sender credit can be negative, meaning that messages in excess of the
-	// receiver's credit limit have been buffered locally till credit is available.
-	Credit() (int, error)
-}
+	SendForgetTimeout(m amqp.Message, timeout time.Duration)
 
-type sendMessage struct {
-	m  amqp.Message
-	sm SentMessage
+	SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
 }
 
-type sender struct {
-	link
-	credit chan struct{} // Signal available credit.
+// Outcome provides information about the outcome of sending a message.
+type Outcome struct {
+	// Status of the message: was it sent, how was it acknowledged.
+	Status SentStatus
+	// Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise.
+	Error error
+	// Value provided by the application in SendAsync()
+	Value interface{}
 }
 
-// Disposition indicates the outcome of a settled message delivery.
-type Disposition uint64
+// SentStatus indicates the status of a sent message.
+type SentStatus int
 
 const (
-	// No disposition available: pre-settled, not yet acknowledged or an error occurred
-	NoDisposition Disposition = 0
+	// Message was never sent
+	Unsent SentStatus = iota
+	// Message was sent but never acknowledged. It may or may not have been received.
+	Unacknowledged
+	// Message was sent pre-settled, no remote outcome is available.
+	Presettled
 	// Message was accepted by the receiver
-	Accepted = proton.Accepted
+	Accepted
 	// Message was rejected as invalid by the receiver
-	Rejected = proton.Rejected
-	// Message was not processed by the receiver but may be processed by some other receiver.
-	Released = proton.Released
+	Rejected
+	// Message was not processed by the receiver but may be valid for a different receiver
+	Released
+	// Receiver responded with an unrecognized status.
+	Unknown
 )
 
-// String human readable name for a Disposition.
-func (d Disposition) String() string {
-	switch d {
-	case NoDisposition:
-		return "no-disposition"
+// String returns a human-readable name for a SentStatus.
+func (s SentStatus) String() string {
+	switch s {
+	case Unsent:
+		return "unsent"
+	case Unacknowledged:
+		return "unacknowledged"
 	case Accepted:
 		return "accepted"
 	case Rejected:
 		return "rejected"
 	case Released:
 		return "released"
-	default:
+	case Unknown:
 		return "unknown"
+	default:
+		return "invalid"
 	}
 }
 
-func (s *sender) Send(m amqp.Message) (SentMessage, error) {
-	return s.SendTimeout(m, Forever)
-}
-
-func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) {
-	var sm SentMessage
-	if s.sndSettle == SndSettled {
-		sm = nil
-	} else {
-		sm = newSentMessage(s.session.connection)
+// Convert proton delivery state code to SentStatus value
+func sentStatus(d uint64) SentStatus {
+	switch d {
+	case proton.Accepted:
+		return Accepted
+	case proton.Rejected:
+		return Rejected
+	case proton.Released, proton.Modified:
+		return Released
+	default:
+		return Unknown
 	}
-	return s.sendInternal(sendMessage{m, sm}, timeout)
-}
-
-func (s *sender) SendForget(m amqp.Message) error {
-	return s.SendForgetTimeout(m, Forever)
 }
 
-func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error {
-	snd := sendMessage{m, nil}
-	_, err := s.sendInternal(snd, timeout)
-	return err
+// Sender implementation, held by handler.
+type sender struct {
+	link
+	credit chan struct{} // Signal available credit.
 }
 
-func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
-	if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit
-		if err == Closed {
+func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
+	// wait for credit
+	if _, err := timedReceive(s.credit, t); err != nil {
+		if err == Closed && s.Error != nil {
 			err = s.Error()
-			assert(err != nil)
 		}
-		return nil, err
+		ack <- Outcome{Unsent, err, v}
+		return
 	}
-	if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil {
-		return nil, err
-	}
-	return snd.sm, nil
-}
-
-// Send a message. Handler goroutine
-func (s *sender) doSend(snd sendMessage) {
-	delivery, err := s.eLink.Send(snd.m)
-	switch sm := snd.sm.(type) {
-	case nil:
-		delivery.Settle()
-	case *sentMessage:
-		sm.delivery = delivery
-		if err != nil {
-			sm.settled(err)
-		} else {
-			s.handler().sentMessages[delivery] = sm
+	// Send a message in handler goroutine
+	err := s.engine().Inject(func() {
+		if s.Error() != nil {
+			if ack != nil {
+				ack <- Outcome{Unsent, s.Error(), v}
+			}
+			return
 		}
-	default:
-		assert(false, "bad SentMessage type %T", snd.sm)
-	}
-	if s.eLink.Credit() > 0 {
-		s.sendable() // Signal credit.
+		if delivery, err := s.eLink.Send(m); err == nil {
+			if ack != nil { // We must report an outcome
+				if s.SndSettle() == SndSettled {
+					delivery.Settle() // Pre-settle if required
+					ack <- Outcome{Presettled, nil, v}
+				} else {
+					s.handler().sentMessages[delivery] = sentMessage{ack, v}
+				}
+			} else { // ack == nil, can't report outcome
+				if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to.
+					delivery.Settle()
+				}
+			}
+		} else { // err != nil
+			if ack != nil {
+				ack <- Outcome{Unsent, err, v}
+			}
+		}
+		if s.eLink.Credit() > 0 { // Signal there is still credit
+			s.sendable()
+		}
+	})
+	if err != nil && ack != nil {
+		ack <- Outcome{Unsent, err, v}
 	}
 }
 
-// Signal the sender has credit. Any goroutine.
+// Set credit flag if not already set. Non-blocking, any goroutine
 func (s *sender) sendable() {
 	select { // Non-blocking
-	case s.credit <- struct{}{}: // Set the flag if not already set.
+	case s.credit <- struct{}{}:
 	default:
 	}
 }
 
-func (s *sender) closed(err error) {
-	s.link.closed(err)
-	close(s.credit)
+func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome {
+	out := make(chan Outcome, 1)
+	s.SendAsyncTimeout(m, out, nil, t)
+	return out
 }
 
-func newSender(l link) *sender {
-	s := &sender{link: l, credit: make(chan struct{}, 1)}
-	s.handler().addLink(s.eLink, s)
-	s.link.open()
-	return s
-}
-
-// SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
-type SentMessage interface {
-
-	// Disposition blocks till the message is acknowledged and returns the
-	// disposition state.
-	//
-	// NoDisposition with Error() != nil means the Connection was closed before
-	// the message was acknowledged.
-	//
-	// NoDisposition with Error() == nil means the message was pre-settled or
-	// Forget() was called.
-	Disposition() (Disposition, error)
-
-	// DispositionTimeout is like Disposition but gives up after timeout, see Timeout.
-	DispositionTimeout(time.Duration) (Disposition, error)
-
-	// Forget interrupts any call to Disposition on this SentMessage and tells the
-	// peer we are no longer interested in the disposition of this message.
-	Forget()
-
-	// Error returns the error that closed the disposition, or nil if there was no error.
-	// If the disposition closed because the connection closed, it will return Closed.
-	Error() error
-
-	// Value is an optional value you wish to associate with the SentMessage. It
-	// can be the message itself or some form of identifier.
-	Value() interface{}
-	SetValue(interface{})
-}
-
-// SentMessageSet is a concurrent-safe set of sent messages that can be checked
-// to get the next completed sent message
-type SentMessageSet struct {
-	cases []reflect.SelectCase
-	sm    []SentMessage
-	done  chan SentMessage
-}
-
-func (s *SentMessageSet) Add(sm SentMessage) {
-	s.sm = append(s.sm, sm)
-	s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sm.(*sentMessage).done)})
+func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) {
+	s.SendAsyncTimeout(m, nil, nil, t)
 }
 
-// Wait waits up to timeout and returns the next SentMessage that has a valid dispositionb
-// or an error.
-func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) {
-	s.cases = s.cases[:len(s.sm)] // Remove previous timeout cases
-	if timeout == 0 {             // Non-blocking
-		s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault})
-	} else {
-		s.cases = append(s.cases,
-			reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
+func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome {
+	deadline := time.Now().Add(t)
+	ack := s.SendWaitableTimeout(m, t)
+	t = deadline.Sub(time.Now()) // Adjust for time already spent.
+	if t < 0 {
+		t = 0
 	}
-	chosen, _, _ := reflect.Select(s.cases)
-	if chosen > len(s.sm) {
-		return nil, Timeout
+	if out, err := timedReceive(ack, t); err == nil {
+		return out.(Outcome)
 	} else {
-		sm := s.sm[chosen]
-		s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...)
-		return sm, nil
+		if err == Closed && s.Error() != nil {
+			err = s.Error()
+		}
+		return Outcome{Unacknowledged, err, nil}
 	}
 }
 
-// SentMessage implementation
-type sentMessage struct {
-	connection  *connection
-	done        chan struct{}
-	delivery    proton.Delivery
-	disposition Disposition
-	err         error
-	value       interface{}
+func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) {
+	s.SendAsyncTimeout(m, ack, v, Forever)
 }
 
-func newSentMessage(c *connection) *sentMessage {
-	return &sentMessage{connection: c, done: make(chan struct{})}
+func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome {
+	return s.SendWaitableTimeout(m, Forever)
 }
 
-func (sm *sentMessage) SetValue(v interface{}) { sm.value = v }
-func (sm *sentMessage) Value() interface{}     { return sm.value }
-func (sm *sentMessage) Disposition() (Disposition, error) {
-	<-sm.done
-	return sm.disposition, sm.err
+func (s *sender) SendForget(m amqp.Message) {
+	s.SendForgetTimeout(m, Forever)
 }
 
-func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
-	if _, err := timedReceive(sm.done, timeout); err == Timeout {
-		return sm.disposition, Timeout
-	} else {
-		return sm.disposition, sm.err
-	}
+func (s *sender) SendSync(m amqp.Message) Outcome {
+	return <-s.SendWaitable(m)
 }
 
-func (sm *sentMessage) Forget() {
-	sm.connection.engine.Inject(func() {
-		sm.delivery.Settle()
-		delete(sm.connection.handler.sentMessages, sm.delivery)
-	})
-	sm.finish()
+// handler goroutine
+func (s *sender) closed(err error) {
+	s.link.closed(err)
+	close(s.credit)
 }
 
-func (sm *sentMessage) settled(err error) {
-	if sm.delivery.Settled() {
-		sm.disposition = Disposition(sm.delivery.Remote().Type())
-	}
-	sm.err = err
-	sm.finish()
+func newSender(l link) *sender {
+	s := &sender{link: l, credit: make(chan struct{}, 1)}
+	s.handler().addLink(s.eLink, s)
+	s.link.open()
+	return s
 }
 
-func (sm *sentMessage) finish() {
-	select {
-	case <-sm.done: // No-op if already closed
-	default:
-		close(sm.done)
-	}
+// sentMessage records a sent message on the handler.
+type sentMessage struct {
+	ack   chan<- Outcome
+	value interface{}
 }
 
-func (sm *sentMessage) Error() error { return sm.err }
-
 // IncomingSender is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a sender link.
 type IncomingSender struct {
@@ -307,3 +266,9 @@ type IncomingSender struct {
 func (in *IncomingSender) Accept() Endpoint {
 	return in.accept(func() Endpoint { return newSender(in.link) })
 }
+
+// Call in injected functions to check if the sender is valid.
+func (s *sender) valid() bool {
+	s2, ok := s.handler().links[s.eLink].(*sender)
+	return ok && s2 == s
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index d347c99..18d8bc8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -53,7 +53,7 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses
 	s := &session{
 		connection: c,
 		eSession:   es,
-		endpoint:   endpoint{str: es.String()},
+		endpoint:   makeEndpoint(es.String()),
 	}
 	for _, set := range setting {
 		set(s)
@@ -67,12 +67,20 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses
 func (s *session) Connection() Connection     { return s.connection }
 func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
 func (s *session) engine() *proton.Engine     { return s.connection.engine }
+
 func (s *session) Close(err error) {
-	s.engine().Inject(func() { localClose(s.eSession, err) })
+	s.engine().Inject(func() {
+		if s.Error() == nil {
+			localClose(s.eSession, err)
+		}
+	})
 }
 
 func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
 	err = s.engine().InjectWait(func() error {
+		if s.Error() != nil {
+			return s.Error()
+		}
 		l, err := localLink(s, true, setting...)
 		if err == nil {
 			snd = newSender(l)
@@ -84,6 +92,9 @@ func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
 
 func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
 	err = s.engine().InjectWait(func() error {
+		if s.Error() != nil {
+			return s.Error()
+		}
 		l, err := localLink(s, false, setting...)
 		if err == nil {
 			rcv = newReceiver(l)
@@ -93,12 +104,6 @@ func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
 	return
 }
 
-// Called from handler on closed.
-func (s *session) closed(err error) {
-	s.err.Set(err)
-	s.err.Set(Closed)
-}
-
 // IncomingSender is sent on the Connection.Incoming() channel when there is an
 // incoming request to open a session.
 type IncomingSession struct {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
index 0015185..51bfbc5 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
@@ -21,6 +21,7 @@ package electron
 
 import (
 	"fmt"
+	"math"
 	"reflect"
 	"time"
 )
@@ -41,7 +42,7 @@ import (
 var Timeout = fmt.Errorf("timeout")
 
 // Forever can be used as a timeout parameter to indicate wait forever.
-const Forever time.Duration = -1
+const Forever time.Duration = math.MaxInt64
 
 // timedReceive receives on channel (which can be a chan of any type), waiting
 // up to timeout.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 95d70e9..2e67ef7 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -315,6 +315,13 @@ func (eng *Engine) Run() error {
 	wait.Wait()
 	close(eng.running) // Signal goroutines have exited and Error is set.
 
+	// Execute any injected functions for side effects on application data structures.
+	inject := eng.inject
+	eng.inject = nil // Further calls to Inject() will return an error.
+	for f := range inject {
+		f()
+	}
+
 	if !eng.connection.IsNil() {
 		eng.connection.Free()
 	}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org